Flink实现同时消费多个kafka topic,并输出到多个topic

这篇具有很好参考价值的文章主要介绍了Flink实现同时消费多个kafka topic,并输出到多个topic。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.说明

1)代码使用的flink版本为1.16.1,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换;
2)本次编写的两个方案,均只适用于数据源topic来自同一个集群,且kafka消费组相同,暂未研究flink的connect算子join多条流

2.依赖引用

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.1</flink.version>
        <hutool.version>5.8.15</hutool.version>
    </properties>
    
 <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <!--flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-lang3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-lang3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

3. 方案一:适用于sink topic存在跨集群等kafka生产者配置信息不相同的情况

代码涉及Hadoop相关环境,若无该环境的同学,可以设置为本地路径

3.1配置文件

# 输入topic列表
newInputTopic=hive_data_input_topic
# 输出topic列表
newOutputTopic=topic-test

3.2 java代码

public static void main(String[] args) throws Exception {
		// 设置操作HDFS的用户
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // 获取命令行参数,args[0] 为配置文件路径 input/customer.properties
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
        String inputTopic = parameterTool.get("newInputTopic");
        String outputTopic = parameterTool.get("newOutputTopic");
        // 构建输入topic
        ArrayList<String> inputTopicList = new ArrayList<>();
        inputTopicList.add("canal_mysql_input_topic");
        if (!StringUtils.isNullOrWhitespaceOnly(inputTopic)) {
            inputTopicList.add(inputTopic);
        }
        // 构建输出topic
        Map<String, String> hashMap = new HashMap<>();
        hashMap.put("ap_article", "canal_input_topic");
        hashMap.put("ap_user", "cast_topic_input");
        if (!StringUtils.isNullOrWhitespaceOnly(outputTopic)) {
            hashMap.put("hive_table_orders", "topic-test");
        }

        // 构建配置
        Configuration configuration = new Configuration();
        // 设定本地flink dashboard的webUi访问端口,即http://localhost:9091
        configuration.setString("rest.port", "9091");
        // 设定从指定的checkpoint恢复,此处为HDFS路径,可更换为本地路径"file:///D:\\test\\flink-tuning\\checkpoint\\jobId\\chk-xx"
        String savePointPath = "hdfs://masterNode:8020/flink-tuning/checkpoint/b66ee8431170f07764db0e777c58848a/chk-36";
        // 设置savepoint路径,以及是否允许本次提交的程序有新增有状态算子,必须给原来的算子配置uid作为唯一标识,否则会出现问题
        SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
        SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);
        
		// 获取执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // 开启检查点,设置检查点间隔时间
        environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // 设置状态后端类型
        environment.setStateBackend(new HashMapStateBackend());
        CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
        // 设置checkpoint文件存放路径,设置本地路径:file:///D:\\test\\flink-tuning\\checkpoint
        checkpointConfig.setCheckpointStorage("hdfs://masterNode:8020/flink-tuning/checkpoint");
        // 设置并发数,同时最多可以有几个checkpoint执行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // checkpoint失败次数,超过此次数,job挂掉(checkpoint不会重试,会等待下一个checkpoint)
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        // 超时多久没完成checkpoint,任务失败
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        // 手动cancel掉job时,保留在外部系统的checkpoint不会被删除
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 从kafka读取数据
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.200.130:9092")
                .setTopics(inputTopicList)
                .setGroupId("group-test-savepoint")
                // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        SingleOutputStreamOperator<String> streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                .uid("kafka_source")	// 最好设置一下算子的id
                .setParallelism(5); // 设置并行度 = topic分区数

        // 此处使用循环,会开辟map键值对个数的算子链,多个filter --> sink算子链,详情见下图
      	// map中可配置topic所属集群,以及鉴权信息等,此处省略
        for (String key : hashMap.keySet()) {
        	// filter算子根据数据中的表名table与topic之间的映射关系,过滤数据
            SingleOutputStreamOperator<String> outputStreamOperator = streamSource.filter(vo -> {
                JSONObject jsonObject = JSONUtil.parseObj(vo);
                String tableName = (String) jsonObject.get("table");
                return tableName.equals(key);
            }).uid("filter-" + key).setParallelism(5);

			// 构建kafka sink
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            		// kafka集群,可根据不同topic所在集群不同,动态更换ip
                    .setBootstrapServers("192.168.200.130:9092")
                    // 自定义kafka序列化器
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    		// 根据映射获取输出topic
                            .setTopic(hashMap.get(key))
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                     // 一致性语义:至少一次
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();
            // sink算子
            outputStreamOperator.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
        }

        // 执行
        environment.execute();

3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)

Flink实现同时消费多个kafka topic,并输出到多个topic

4.方案二:适用于输入及输出topic都用属于一个集群的场景

4.1 配置文件同上

4.2 Java代码

public static void main(String[] args) throws Exception {
		// 环境配置同上,故此处省略。。。

        // 从kafka读取数据
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.200.130:9092")
                .setTopics(inputTopicList)
                .setGroupId("group-test-savepoint")
                // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        SingleOutputStreamOperator<String> streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                .uid("kafka_source")	// 最好设置一下算子的id
                .setParallelism(5); // 设置并行度 = topic分区数
                
        // 输出到kafka,此处没有循环,只会产生一条算子链
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.200.130:9092")	// 输出topic的kafka集群固定
                .setRecordSerializer((KafkaRecordSerializationSchema<String>) (data, context, timestamp) -> {
                    JSONObject jsonObject = JSONUtil.parseObj(data);
                    // 获取表名
                    String table = (String) jsonObject.get("table");
                    // 获取topic
                    String topic = hashMap.get(table);
                    return new ProducerRecord<>(topic, data.getBytes(StandardCharsets.UTF_8));
                })
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // sink算子
        streamSource.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
        }
        // 执行
        environment.execute();

5. 业务使用场景:

Flink实现同时消费多个kafka topic,并输出到多个topic文章来源地址https://www.toymoban.com/news/detail-419653.html

到了这里,关于Flink实现同时消费多个kafka topic,并输出到多个topic的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中

    https://blog.csdn.net/qinqinde123/article/details/128130131 flume监听到有新文件出现的时候,会将文件内容推送到kakfa的topic中,但是如果文件夹中有不同类型的文件,直接推送到kafka的同一个topic中,如果根据内容无法区分不同类型的文件,那就需要根据文件名称来区分。flume本身根据配置

    2023年04月08日
    浏览(66)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中

    前言 题目: 一、读题分析 二、处理过程   1.数据处理部分: 2.HBaseSink(未经测试,不能证明其正确性,仅供参考!) 三、重难点分析 总结  什么是HBase? 本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果

    2024年02月03日
    浏览(43)
  • Flink1.17.1消费kafka3.5中的数据出现问题Failed to get metadata for topics [flink].

    问题呈现 Failed to get metadata for topics [flink]. at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) at org.apache.flink.connecto

    2024年02月11日
    浏览(58)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层

    前言 题目: 一、读题分析 二、处理过程 三、重难点分析 总结  本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果以及数据的展示无法给出,可参照我以往的博客其中有相同数据源展示     提示:以下是本

    2024年02月04日
    浏览(48)
  • Kafka/Spark-01消费topic到写出到topic

    消费者代码 注意点 consumerConfigs是定义的可变的map的类型的,具体如下 consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下 Subscribe传参需要指定泛型,这边指定string,

    2024年02月09日
    浏览(35)
  • Kafka - Topic 消费状态常用命令

    replication-factor:指定副本数量 partitions:指定分区 查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数 这里同样需要根据新、旧版本的consumer,分别指

    2024年01月25日
    浏览(54)
  • Kafka某个Topic无法消费问题

    12月28日,公司测试环境Kafka的task.build.metadata.flow这个topic突然无法消费。 其他topic都正常使用,这个topic只有一个分区,并且只有一个消费者 首先登录服务器,运行kafka的cli命令,查看消费者组的详情。 由上图可以发现,task.build.metadata.flow这个topic,最新offset是2,但是当前o

    2024年02月03日
    浏览(51)
  • springboot整合rocketmq:一个消费者组怎么订阅多个topic

            一个消费者组中的所有消费者订阅关系,可以多个topic,多个tag,但是必须一致,否则就倒沫子了,如下图:  下面贴了如下结构的代码  一个消费组(消费者)订阅多个topic的代码(只写了一个消费组的,其他类似): 结果:

    2024年02月15日
    浏览(54)
  • kafka如何动态消费新增topic主题

    一、解决痛点 使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务 。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件 二、组件

    2023年04月21日
    浏览(41)
  • 如何查看Kafka的Topic消费情况

    进入kafka安装目录,然后执行以下命令  2.10为Scala版本,0.10.0.2.5.3.0为kafka版本  

    2024年02月10日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包