Flink DataStream之从Kafka读数据

这篇具有很好参考价值的文章主要介绍了Flink DataStream之从Kafka读数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • 搭建Kafka

参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客

  •  启动zookeeper
[root@localhost kafka_2.12-2.8.1]# pwd
/usr/local/wyh/kafka/kafka_2.12-2.8.1
[root@localhost kafka_2.12-2.8.1]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 启动kafka
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-server-start.sh config/server.properties &
  • 查看进程

Flink DataStream之从Kafka读数据,大数据之Flink,kafka,flink,kafka,大数据

  •  创建topic
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-flink-topic
  • 查看topic列表
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181                                             test-flink-topic
  • 导入pom依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
</dependency>
  • 新建类
package test01;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReadKafka {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从kafka读
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//这里的泛型需要指定从kafka读数据的数据类型
                .setBootstrapServers("192.168.126.128:9092")//设置kafka server
                .setGroupId("test-consumer-group")//设置consumer groupid
                .setTopics("test-flink-topic")//设置要读取数据的topic
                .setValueOnlyDeserializer(new SimpleStringSchema())//从Kafka读数据时需要进行反序列化,由于kafka的数据一般是存在value中的,不是key中,所以这里我们使用的序列化器是只对Value进行反序列化。这里的参数是用的String类型的反序列化器,因为在前面build时我们设置了要读取的数据类型是String类型。
                .setStartingOffsets(OffsetsInitializer.latest())//设置flink读取kafka数据的读取策略,这里设置的是从最新数据消费
                .build();

        streamExecutionEnvironment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
                        .print();
        streamExecutionEnvironment.execute();
    }
}
  • 启动程序
  • 在终端向kafka生产数据,同时观察程序控制台flink的读取情况
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test-flink-topic

Flink DataStream之从Kafka读数据,大数据之Flink,kafka,flink,kafka,大数据

 如图说明flink从kafka成功读取数据。文章来源地址https://www.toymoban.com/news/detail-536761.html

到了这里,关于Flink DataStream之从Kafka读数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink生产数据到kafka

    文章目录 前言 一、版本 二、使用步骤 1.maven引入库 2.上代码 近期开始学习Flink程序开发,使用java语言,此文以生产数据至kafka为例记录下遇到的问题以及代码实现,若有错误请提出。 Flink版本:1.15.4 kafka版本:3.0.0 以下代码将Flink环境初始化、配置、生产数据至kafka代码放在

    2023年04月26日
    浏览(36)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • Flink读取kafka数据报错

    报错如下: 解决办法: 修改/usr/local/wyh/kafka/kafka_2.12-2.8.1/config下面的server.properties,默认该配置是被注释掉的额,所以需要放开注释并且配置Host:

    2024年02月13日
    浏览(49)
  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(37)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 6.2、Flink数据写入到Kafka

    目录 1、添加POM依赖 2、API使用说明 3、序列化器 3.1 使用预定义的序列化器 3.2 使用自定义的序列化器 4、容错保证级别 4.1 至少一次 的配置 4.2 精确一次 的配置 5、这是一个完整的入门案例 Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    2024年02月09日
    浏览(48)
  • 使用Flink处理Kafka中的数据

    目录         使用Flink处理Kafka中的数据 前提:  一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据           具体代码(scala)                 具体执行代码①                 重要逻

    2024年01月23日
    浏览(54)
  • flink 从kafka读取数据报错

    报错: Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandl

    2024年02月02日
    浏览(45)
  • 大数据-玩转数据-FLINK-从kafka消费数据

    大数据-玩转数据-Kafka安装 运行本段代码,等待kafka产生数据进行消费。

    2024年02月14日
    浏览(38)
  • 数据流处理框架Flink与Kafka

    在大数据时代,数据流处理技术已经成为了一种重要的技术手段,用于处理和分析大量实时数据。Apache Flink和Apache Kafka是两个非常重要的开源项目,它们在数据流处理领域具有广泛的应用。本文将深入探讨Flink和Kafka的关系以及它们在数据流处理中的应用,并提供一些最佳实践

    2024年04月23日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包