1. Complete code
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Create the TableLoader that points at the Hadoop-catalog table
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");

        // 2. Build the DataStream<RowData> with FlinkSource
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)       // false = batch read, true = streaming read
                // .streaming(true)
                // .asOfTimestamp()     // batch read of the snapshot current at a timestamp
                // .startSnapshotId()   // streaming read starting from a snapshot
                .build();

        // 3. Convert each RowData to a Tuple2 (column 0 is an int, column 1 is a long) and print it
        stream.map(r -> Tuple2.of(r.getInt(0), r.getLong(1)))
                .returns(Types.TUPLE(Types.INT, Types.LONG))
                .print();

        env.execute();
    }
}
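The commented-out asOfTimestamp() line hints at a batch "time travel" read: scanning the snapshot that was current at a given point in time. A minimal sketch, reusing env and tableLoader from the class above; pointInTime is a hypothetical placeholder, not a value from the original article:

// Time-travel batch read: scan the snapshot that was current at `pointInTime`
// (milliseconds since the epoch). The value below is a hypothetical placeholder;
// use a real point in time from your table's history.
long pointInTime = System.currentTimeMillis() - 60 * 60 * 1000L;

DataStream<RowData> asOf = FlinkSource.forRowData()
        .env(env)                    // the StreamExecutionEnvironment from the class above
        .tableLoader(tableLoader)    // the TableLoader from the class above
        .streaming(false)            // time travel is a bounded (batch) read
        .asOfTimestamp(pointInTime)
        .build();

asOf.print();                        // submit with env.execute() as in the complete example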
- streaming(false): the source is bounded; the job reads the table's current snapshot once as a batch and then finishes.
- streaming(true): the source is unbounded; the job keeps running, monitors the table, and emits the rows of newly committed snapshots (see the sketch after this list).
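The incremental variant hinted at by the commented-out startSnapshotId() line combines streaming(true) with a starting snapshot. A minimal sketch, again reusing env and tableLoader from the class above; snapshotId is a hypothetical placeholder for a real snapshot id of the table:

// Incremental streaming read: start from `snapshotId` and keep emitting the rows
// of every newly committed snapshot while the job runs.
long snapshotId = 1234567890L;       // hypothetical placeholder for a real snapshot id

DataStream<RowData> incremental = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(true)             // unbounded: the source keeps watching the table
        .startSnapshotId(snapshotId)
        .build();

incremental.print();                 // submit with env.execute() as in the complete example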
2. The output looks like this
The 11> prefix in front of each tuple is the index of the parallel subtask of the print() sink that emitted the record; the tuples are the (int, long) columns read from the table.
11> (2,3)
11> (4,5)