FlinkSql写入/读取Kafka

这篇具有很好参考价值的文章主要介绍了FlinkSql写入/读取Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

FlinkSql读取iceberg数据写入kafka

  1. 创建写入kafka的sink表
create table dws_mtrl_tree( 
        ODS_QYBM     INT,                       
        ODS_QYMC  STRING,                           
       MTRL_CODE  STRING,                           
       UPPER_MTRL_CODE  STRING)
        with ( 
        'connector'='kafka', 
        'topic'='dws_mtrl_tree',  
       'properties.bootstrap.servers'='xx.xxx.xxx.xxx:9092', 
        'format' = 'json',
        'sink.partitioner'='round-robin' 
        )
  1. 创建catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://xx.xxx.xxx.xxx:9083',
  'clients'='5',
  'hive-conf-dir'='/opt/softwares/hadoop-3.1.1/etc/hadoop/',
  'warehouse'='hdfs://xx.xxx.xxx.xxx:8020/user/hive/warehouse/hive_catalog',
  'property-version'='1'
);

  1. 插入数据
insert into  dws_mtrl_tree select * from hive_catalog.dws_hgdd. dws_mtrl_tree;

发现kafka中已有数据
flinksql读取kafka数据,flink,大数据,kafka

FlinkSql读取kafka数据写入iceberg

  1. 创建连接Kafka的Source表
CREATE TABLE kafka_dws_mtrl_tree (  
       ODS_QYBM  INT,                       
       ODS_QYMC  STRING,                           
       MTRL_CODE  STRING,                           
       UPPER_MTRL_CODE  STRING
    )with(
    'connector' = 'kafka',
    'topic' = 'dws_mtrl_tree',
    'properties.bootstrap.servers' = 'xx.xxx.xxx.xxx:9092',
	'properties.group.id' = 'consumergroup',
     'format' = 'json',
	 'scan.startup.mode' = 'earliest-offset'
);

  1. 创建iceberg表
CREATE TABLE DWS_MTRL_TREE (  
    ODS_QYBM int, 
	ODS_QYMC string, 
	MTRL_CODE string, 
	UPPER_MTRL_CODE string
 )with(
  'connector'='iceberg',
 'catalog-name'='hive_catalog',  
 'catalog-type'='hive', 
 'catalog-database'='DWS_HGDD',
 'warehouse'='hdfs://xx.xxx.xxx.xxx:8020/user/hive/warehouse/hive_catalog', 
 'format-version'='2' 
 );

3.插入数据

insert into  DWS_MTRL_TREE select * from kafka_dws_mtrl_tree;
  • 问题:
    报错如下:org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId=‘null’, inTransaction=false, closed=false}
    Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic dws_mtrl_tree not present in metadata after 60000 ms.

flinksql读取kafka数据,flink,大数据,kafka
flinksql读取kafka数据,flink,大数据,kafka
排查原因:
通过查看taskmanager的logs发现 :
Error connecting to node node02:9092 (id: 43 rack: null)
flinksql读取kafka数据,flink,大数据,kafka
解决办法:
在/etc/hosts文件中添加映射,问题解决文章来源地址https://www.toymoban.com/news/detail-606198.html

xx.xxx.xxx.xxx node01
xx.xxx.xxx.xxx node02
xx.xxx.xxx.xxx node03

到了这里,关于FlinkSql写入/读取Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2.2 如何使用FlinkSQL读取&写入到文件系统(HDFS\Local\Hive)

    目录 1、文件系统 SQL 连接器 2、如何指定文件系统类型 3、如何指定文件格式 4、读取文件系统 4.1 开启 目录监控  4.2 可用的 Metadata 5、写出文件系统 5.1 创建分区表 5.2 滚动策略、文件合并、分区提交 5.3 指定 Sink Parallelism 6、示例_通过FlinkSQL读取kafka在写入hive表 6.1、创建

    2024年02月07日
    浏览(29)
  • 【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1.with参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置 true 开启流读模式

    2024年02月14日
    浏览(38)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 实现 Kafka 数据写入 ClickHouse

    需求描述: 1、数据从 Kafka 写入 ClickHouse。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、先在 ClickHouse 中创建表然后动态获取 ClickHouse 的表结构。 5、Kafka 数据为 Json 格式,通过 FlatMap 扁平

    2024年02月03日
    浏览(34)
  • 【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive

    需求描述: 1、数据从 Kafka 写入 Hive。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Flink 集成 Kafka 写入 Hive 需要进行 checkpoint 才能落盘至 HDFS。 5、先在 Hive 中创建表然后动态获取 Hive 的表

    2024年02月03日
    浏览(43)
  • flink:通过table api把文件中读取的数据写入MySQL

    当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 文件info.txt

    2024年03月15日
    浏览(33)
  • 【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

    问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    浏览(25)
  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(30)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(45)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(40)
  • Flink读取kafka数据报错

    报错如下: 解决办法: 修改/usr/local/wyh/kafka/kafka_2.12-2.8.1/config下面的server.properties,默认该配置是被注释掉的额,所以需要放开注释并且配置Host:

    2024年02月13日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包