Java实现Kafka消费者及消息异步回调方式

这篇具有很好参考价值的文章主要介绍了Java实现Kafka消费者及消息异步回调方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka 在创建消费者进行消费数据时,由于可以理解成为是一个kafka 的单独线程,所以在Kafka消费数据时想要在外部对消费到的数据进行业务处理时是获取不到的,所以就需要实现一个消息回调的接口来进行数据的保存及使用。

消息回调接口实现代码如下

/**
 * 消息队列接收消息回调
 */
public interface TestCallBack {
    /**
     * 消息队列接收消息回调
     *
     * @param s 消息列表
     */
    void callBack(String s);
}

Kafka消费者代码实现如下

public class KafkaTest extends Thread{

    private String topic;

    private String ip;

    private TestCallBack testCallBack;

    /**
     * kafka停止消费标识
     */
    public static Map<String, Boolean> kafka = new ConcurrentHashMap<>();

    public KafkaTest(String topic, String ip, TestCallBack testCallBack){
        this.topic = topic;
        this.ip = ip;
        this.testCallBack = testCallBack;
    }
    public void run(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        KafkaConsumer<String, String> indexKafkaConsumer = new KafkaConsumer<>(props);
        indexKafkaConsumer.subscribe(Arrays.asList(topic));
        KafkaTest.kafka.put(ip+topic, true);
        while (kafka.get(ip+topic)){
            ConsumerRecords<String, String> poll = indexKafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : poll) {
                String value = consumerRecord.value();
                testCallBack.callBack(value);
            }
        }
    }
}

其中

 ConsumerConfig.GROUP_ID_CONFIG 为消费者组Id  不同的消费者组每次消费都是新数据否则会出现消费者1消费到的数据以后 消费者2消费不到数据的情况

ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为自动提交offset

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 为设置消费顺序是否重头开始消费

三种情况如下

latest (默认)
earliest
none
三者均有共同定义:
对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费

意思就是,只要这个消费者组消费过了,不管auto.offset.reset指定成什么值,效果都一样,每次启动都是已有的最新的offset开始接着往后消费

不同的点为:

latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据
就是说如果这个topic有历史消息,现在新启动了一个消费者组,且auto.offset.reset=latest,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时topic有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交offset
这时候如果该消费者组意外下线了,topic仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的offset处开始接着消费,此时走的就是共同定义

earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费
就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。
一旦该消费者组消费过topic后,此时就有该消费者组的offset了,这种情况下即使指定了auto.offset.reset=earliest,再重新启动该消费者组,效果是与latest一样的,也就是此时走的是共同的定义

none:对于同一个消费者组,若没有提交过offset,会抛异常
一般生产环境基本用不到该参数

触发创建Kafka消费者代码如下

@RestController
@RequestMapping("/kafka")
public class KafkaDemo {

    /**
     * 线程缓存工具
     */
    public static Map<String, Thread> threadHashMap = new ConcurrentHashMap<>();

    /**
     *  Kafka消费者创建消费
     * @param ip kafka连接ip
     * @param topic kafka消费topic
     */
    @GetMapping("/start")
    public void start(@RequestParam("ip") String ip, @RequestParam("topic") String topic) {

        KafkaTest kafkaDemo = new KafkaTest(topic, ip, new TestCallBack() {
            @Override
            public void callBack(String s) {
                System.out.println("Kafka消费到的消息 : " + s);
            }
        });
        threadHashMap.put("10.4.130.71:9092/kafkaDemo", kafkaDemo);
        kafkaDemo.start();
        System.out.println(123);
    }

    /**
     *  Kafka消费者停止
     * @param ip kafka连接ip
     * @param topic kafka消费topic
     */
    @GetMapping("/stop")
    public void stop(@RequestParam("ip") String ip, @RequestParam("topic") String topic) {
        Thread thread = threadHashMap.get(ip + topic);
        KafkaTest.kafka.put(ip + topic, false);
        thread.isInterrupted();
    }

}

 在调用创建Kafka消费者时可以进行回调方式实现,这样就可以在外部进行消费者的消费数据进行后续逻辑实现文章来源地址https://www.toymoban.com/news/detail-462560.html

到了这里,关于Java实现Kafka消费者及消息异步回调方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(34)
  • kafka-python 消费者消费不到消息

    使用 group_id=”consumer_group_id_001“ 和  auto_offset_reset=\\\"earliest\\\"    生产者发完消息后,在close中  先执行 producer.flush() ,再执行 producer.close() 使用offset 观看消息是否写到kafka中。    

    2024年02月10日
    浏览(26)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(31)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(35)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(30)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(32)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(27)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(30)
  • Kafka 入门到起飞 - Kafka怎么做到保障消息不会重复消费的? 消费者组是什么?

    消费者 : 1、订阅Topic(主题) 2、从订阅的Topic消费(pull)消息, 3、将消费消息的offset(偏移量)保存在Kafka内置的一Topic名字是_consumer_offsets的主题中,在Kafka的logs文件下能看到这👟文件,存放的是消息的偏移量数据 消费者组 : 1、订阅同一个Topic的消费者可以加入到一个

    2024年02月15日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包