Flink使用 KafkaSource消费 Kafka中的数据

这篇具有很好参考价值的文章主要介绍了Flink使用 KafkaSource消费 Kafka中的数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.前言

目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
//指定kafka的Broker地址
properties.setProperty("bootstrap.servers", "192.168.xx.xx:9092");
//指定组ID
properties.setProperty("group.id", "flink-demo");
//如果没有记录偏移量,第一次从最开始消费
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic005", new SimpleStringSchema(), properties);

新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下:

'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer' is deprecated 

2.如何使用 KafkaSource

新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kfkSource = KafkaSource.<String>builder()
    .setBootstrapServers("192.168.xx.xx:9092")
    .setGroupId("flink-demo")
    .setTopics("topic005")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStreamSource<String> lines = env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), "kafka source");

3.总结

开发者在工作中应该尽量避免使用被标记为 deprecated的方法或者类,要及时进行更换,保持代码的活力。文章来源地址https://www.toymoban.com/news/detail-606351.html

到了这里,关于Flink使用 KafkaSource消费 Kafka中的数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

    1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行

    2024年02月13日
    浏览(10)
  • 使用Flink处理Kafka中的数据

    目录         使用Flink处理Kafka中的数据 前提:  一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据           具体代码(scala)                 具体执行代码①                 重要逻

    2024年01月23日
    浏览(8)
  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(11)
  • 大数据-玩转数据-FLINK-从kafka消费数据

    大数据-玩转数据-Kafka安装 运行本段代码,等待kafka产生数据进行消费。

    2024年02月14日
    浏览(11)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(14)
  • Idea本地跑flink任务时,总是重复消费kafka的数据(kafka->mysql)

    Idea本地跑flink任务时,总是重复消费kafka的数据(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中执行任务时,没法看到JobManager的错误,以至于我以为是什么特殊的原因导致任务总是反复消费。在close方法中,增加日志,发现jdbc连接被关闭了。 重新消费,jdbc连接又启动了。 注意,在Flink的函数中,open和close方法

    2024年02月07日
    浏览(10)
  • flink1.16使用消费/生产kafka之DataStream

    flink高级版本后,消费kafka数据一种是Datastream 一种之tableApi。 上官网 Kafka | Apache Flink 引入依赖 flink和kafka的连接器,里面内置了kafka-client 使用方法 很简单一目了然。 topic和partition  反序列化 其实就是实现接口 DeserializationSchema 的deserialize()方法 把byte转为你想要的类型。 起

    2024年02月16日
    浏览(8)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(12)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中

    前言 题目: 一、读题分析 二、处理过程   1.数据处理部分: 2.HBaseSink(未经测试,不能证明其正确性,仅供参考!) 三、重难点分析 总结  什么是HBase? 本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果

    2024年02月03日
    浏览(11)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层

    大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层

    前言 题目: 一、读题分析 二、处理过程 三、重难点分析 总结  本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示     提示:以下是本

    2024年02月04日
    浏览(20)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包