kafka复习:(24)consume-transform-produce模式

这篇具有很好参考价值的文章主要介绍了kafka复习:(24)consume-transform-produce模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.*;

public class KafkaTest24 {
    public static final String brokerList = "k8s-master:9092";

    public static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //必须配置手动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        return props;
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
        return props;
    }

    //先从source-topic消费,再往sink-topic生产
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
        consumer.subscribe(Collections.singletonList("source-topic"));
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());


        //初始化事务
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (!records.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                //开启事务
                producer.beginTransaction();
                try {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            ProducerRecord<String, String> producerRecord =
                                    new ProducerRecord<>("sink-topic", record.key(), record.value());
                            producer.send(producerRecord);
                            System.out.println("sent :" + record.value());
                        }
                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
                    }
                    // 提交消费位移
                    // consume-transform-produce模式,此处的group id 必须要配置成consumer 中配置的group id
                    producer.sendOffsetsToTransaction(offsets, "groupId");
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    producer.abortTransaction();
                }
            }
        }
    }
}

文章来源地址https://www.toymoban.com/news/detail-682623.html

到了这里,关于kafka复习:(24)consume-transform-produce模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 图解系列 图解Kafka之Producer

    开局一张图,其他全靠吹 发送消息流程如下 : 指定bootstrap.servers,地址的格式为 host:port 。它会连接bootstrap.servers参数指定的所有Broker,Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与

    2024年02月09日
    浏览(35)
  • Kafka_02_Producer详解

    Producer (生产者): 生产并发送消息到Broker(推送) Producer是 多线程安全 的(建议通过池化以提高性能) Producer实例后可发送多条消息(可对应多个ProducerRecord) // 0.9之后的版本是基于Java实现(之前是Scala实现) Producer客户端发送消息大致逻辑: 配置Producer客户端参数并创建该Producer实例 构

    2024年02月01日
    浏览(37)
  • Kafka中的producer拦截器

    在Kafka中,拦截器一共有两种。分别是生产者端的和消费者端的。本文介绍生产者端的拦截器 Kafka Producer拦截器(Interceptor)主要用于实现clients端的定制化控制逻辑。对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如

    2024年02月16日
    浏览(33)
  • Kafka之Producer网络传输

    在Kafka的组成部分(Broker、Consumer、Producer)中,设计理念迥异,每个部分都有自己独特的思考。而把这些部分有机地组织起来,使其成为一个整体的便是「网络传输」。区别于其他消息队列的方式( RocketMQ处理网络部分直接使用成熟的组件Netty ),Kafka则是直接对java的NIO进行

    2024年03月23日
    浏览(39)
  • 谈谈 Kafka 的幂等性 Producer

    使用消息队列,我们肯定希望不丢消息,也就是消息队列组件,需要保证消息的可靠交付。消息交付的可靠性保障,有以下三种承诺: 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。 至少一次(at least once):消息不会丢失,但有可能被重复发送。 精确一次

    2024年02月14日
    浏览(38)
  • 【Kafka】Kafka consumer lag 为负数

    最近对Kafka 集群部署了 Kafka_exporter 监控,并集成了 granfana 图标展示。 发现 Consumer Group Lag 有时候为负数。 于是进行一番查询,并总结整理下。 从下图可以看出, consumer group 值有时候出现负数的情况。 消息过期了(超过默认7天),已经被清理掉了,这时候 topic 最新的 end

    2024年02月13日
    浏览(49)
  • 图解Kafka Producer常用性能优化配置参数

    bootstrap.servers:Kafka broker服务器地址列表, , 分开,可不必写全,Kafka内部有自动感知Kafka broker的机制 client.dns.lookup:客户端寻找bootstrap地址的方式,支持两种方式: resolve_canonical_bootstrap_servers_only:依据bootstrap.servers提供的主机名(hostname),根据主机上的名称服务返回其IP地址

    2024年02月03日
    浏览(36)
  • Failed to construct kafka producer

    问题重述: org.apache.kafka.common.KafkaException: Failed to construct kafka producer Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers Kafka连接不上 解决办法: 将kafka信息输入到hosts文件中 文件地址 使用Notepad++打开,输入ip地址和kafka名称

    2024年01月18日
    浏览(32)
  • Kafka中Consumer源码解读

    本课程的核心技术点如下: 1、consumer初始化 2、如何选举Consumer Leader 3、Consumer Leader是如何制定分区方案 4、Consumer如何拉取数据 5、Consumer的自动偏移量提交 从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法 这个方法的前面代码部分都是一些配置,我们分析源码要抓核心

    2024年02月09日
    浏览(34)
  • 简单聊聊Kafka的Consumer

    我们知道消息队列一般有两种实现方式,(1)Push(推模式) (2)Pull(拉模式),那么 Kafka Consumer 究竟采用哪种方式进行消费的呢? 其实 Kafka Consumer 采用的是主动拉取 Broker 数据进行消费的即 Pull 模式 。这两种方式各有优劣,我们来分析一下: 1)、为什么不采用Push模式?**如果是选择

    2024年01月15日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包