Iceberg从入门到精通系列之十:flink sql往Iceberg表插入数据,Batch模式和Streaming模式查询数据

这篇具有很好参考价值的文章主要介绍了Iceberg从入门到精通系列之十:flink sql往Iceberg表插入数据,Batch模式和Streaming模式查询数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、INSERT INTO

CREATE TABLE `stu` (id int,name string, age int)
PARTITIONED BY (age)

insert into stu values(3,'杀sheng',16),(4,'鸣人',19)

二、INSERT OVERWRITE

仅支持Flink的Batch模式

SET execution.runtime-mode = batch;

INSERT OVERWRITE sample VALUES (1,'a');

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;

三、UPSERT

当将数据写入v2表格时,Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。

建表时指定

CREATE TABLE `hive_catalog`.`test`.`sample5`(
`id` INT UNIQUE COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2',
'write.upsert.enabled'='true'
);

UPSERT模式下,如果对表进行分区,则分区字段必须是主键。

insert into sample5 values(1,'a');
insert into sample5 values(2,'b');
SET sql-client.execution.result-mode=tableau;
select * from sample5;
insert into sample5 values(2,'c');

四、查询Batch模式

Batch模式:

SET execution.runtime-mode = batch;
select * from sample;

五、查询Streaming模式

Streaming模式:

SET execution.runtime-mode = streaming;
SET table.dynamic-table-options.enabled=true;
SET sql-client.execution.result-mode=tableau;

从当前快照读取所有记录,然后从该快照读取增量数据

SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;

读取指定快照id(不包含)后的增量数据

SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s','start-snapshot-id'='384023852058202')*/;

六、读取Kafka流插入到iceberg表中

下载flink-connector-kafka:

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

创建iceberg表:

CREATE TABLE `hive_catalog`.`test`.`sample5`(
`id` INT UNIQUE COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
)

创建kafka topic对应的表:

create table default_catalog.default_database.kafka(
id int,
data string
) with(
'connector' = 'kafka',
'topic' = 'testKafkaTopic',
'properties.zookeeper.connect'='hadoop1:2101',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'json',
'properties.group.id'='iceberg',
'scan.startup.mode'='earliest-offset'
);

流式读取:

SET sql-client.execution.result-mode=tableau;
SET execution.runtime-mode = streaming;

插入数据

insert into hive_catalog.test1.sample5 select * from default_catalog.default_database.kafka;

查询数据

SELECT * FROM sample5 /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;

topic有最新数据时候,就能源源不断查询到最新数据。文章来源地址https://www.toymoban.com/news/detail-519844.html

到了这里,关于Iceberg从入门到精通系列之十:flink sql往Iceberg表插入数据,Batch模式和Streaming模式查询数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Iceberg从入门到精通系列之七:Flink SQL创建Catalog

    type:必须是iceberg catalog-type:内置了hive和hadoop两种catalog,也可以使用catalog-impl来自定义catalog。 catalog-impl:自定义catalog实现的全限定类名。如果未设置catalog-type,则必须设置。 property-version:描述属性版本的版本号。此属性可用于向后兼容,以防属性格式更改。当前属性版本

    2024年02月11日
    浏览(58)
  • Iceberg从入门到精通系列之三:创建Iceberg表、修改表结构、插入数据、删除表

    Hive语法创建分区表,不会在元数据创建分区,而是将分区数据转换为Iceberg标识分区。 这种情况下不能使用Iceberg的分区转换,例如:days(timestamp),如果想要使用Iceberg格式表的分区转换标识分区,需要使用Spark或者Flink引擎创建表。 只支持HiveCatalog表修改表属性,Iceberg表属性和

    2024年02月11日
    浏览(89)
  • Iceberg从入门到精通系列之五:Zeppelin集成iceberg,创建iceberg普通表和分区表,并插入数据

    Zeppelin支持Flink SQL Flink SQL支持iceberg Zeppelin集成Flink SQL后,就可以在Zeppelin上创建iceberg表了 下面演示下Zeppelin集成iceberg后,创建表,插入数据的方便性。

    2024年02月11日
    浏览(48)
  • Iceberg从入门到精通系列之六:Flink集成Iceberg

    下载Flink: https://www.apache.org/dyn/closer.lua/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz 下载Iceberg flink jar包:iceberg-flink-runtime-1.17-1.3.0.jar https://iceberg.apache.org/releases/ 修改配置文件flink-conf.yaml local模式 修改workers 至此FLink成功集成Iceberg

    2024年02月16日
    浏览(43)
  • Oracle数据库从入门到精通系列之十八:Oracle进程

    Oracle中的每个进程都要执行一个特定的任务(或一组任务),每个进程都会为自己分配内存(PGA)来完成它的任务。 一个Oracle实例主要有以下3类进程: 服务器进程(server process)。 后台进程(background process)。 从属进程(slave process)。 这些进程根据客户端的请求来完成工作。 专用服务

    2024年02月09日
    浏览(59)
  • Iceberg从入门到精通系列之二:Iceberg集成Hive

    理解Iceberg核心概念可以阅读博主下面这篇技术博客: Iceberg从入门到精通系列之一:Iceberg核心概念理解 拷贝Iceberg的jar包到Hive的auxlib目录中 启动hdfs 启动yarn 启动historyserver Hive的元数据服务是一种存储和管理Hive表格和数据定义的中央服务,它允许用户定义表格、分区和桶等元

    2024年02月12日
    浏览(35)
  • Iceberg从入门到精通系列之二十二:Spark DDL

    要在 Spark 中使用 Iceberg,请首先配置 Spark 目录。 Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。 Spark 3 可以使用 USINGiceberg 子句在任何 Iceberg 目录中创建表: Iceberg会将Spark中的列类型转换为对应的Iceberg类型。详细信息请查看创建表的类型兼容性部分。 PARTITIONE

    2024年02月19日
    浏览(44)
  • Kubernetes(K8s)从入门到精通系列之十四:安装工具

    Kubernetes 命令行工具 kubectl, 让你可以对 Kubernetes 集群运行命令。 你可以使用 kubectl 来部署应用、监测和管理集群资源以及查看日志。 kind 让你能够在本地计算机上运行 Kubernetes。 使用这个工具需要你安装 Docker 或者 Podman。 与 kind 类似,minikube 是一个工具, 能让你在本地运

    2024年02月14日
    浏览(43)
  • Kubernetes(K8s)从入门到精通系列之十一:安装kubeadm

    一台兼容的 Linux 主机。Kubernetes 项目为基于 Debian 和 Red Hat 的 Linux 发行版以及一些不提供包管理器的发行版提供通用的指令。 每台机器 2 GB 或更多的 RAM(如果少于这个数字将会影响你应用的运行内存)。 CPU 2 核心及以上。 集群中的所有机器的网络彼此均能相互连接(公网和

    2024年02月14日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包