笔记:配置多个kafka生产者和消费者

这篇具有很好参考价值的文章主要介绍了笔记:配置多个kafka生产者和消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx;

本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。

适用场景:需要消费来源于不同kafka的消息、需要在不同的kafka生产消息。

1、配置自定义Kafka Properties信息

custom.kafka.test.bootstrap-servers = my-server1,my-server2
custom.kafka.test.consumer.group-id = my-consumer
custom.kafka.test.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer
custom.kafka.test.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
custom.kafka.test.consumer.enable-auto-commit = false
custom.kafka.test.consumer.auto-offset-reset = latest
custom.kafka.test.producer.key-serializer = org.apache.kafka.common.serialization.LongSerializer
custom.kafka.test.producer.value-serializer = org.springframework.kafka.support.serializer.JsonSerializer
custom.kafka.test.listener.ack-mode = manual

2、代码定义生产者和消费者

当然也可以只定义生产者或者只定义消费者,按需进行,以下示例是同时定义生产者和消费者。

import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.util.ObjectUtils;

import java.util.Map;

/**
 * 自定义kafka配置
 */
@Configuration
public class CustomKafkaConfig {

    /** 生产者 */
    @Bean("kafkaCustomTemplate")
    public KafkaTemplate<String, Object> kafkaCustomTemplate(
            @Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
        return new KafkaTemplate<>(producerFactory(customKafkaProperties));
    }

    private ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        kafkaProperties.getProducer().setAcks(StringUtils.isBlank(kafkaProperties.getProducer().getAcks()) ? "all" : kafkaProperties.getProducer().getAcks());
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    /** 消费者 */
    @Bean("kafkaCustomContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, String>> kafkaCustomContainerFactory(
            @Autowired @Qualifier("customKafkaProperties") KafkaProperties customKafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(customKafkaProperties));
        factory.setConcurrency(ObjectUtils.isEmpty(customKafkaProperties.getListener().getConcurrency()) ?
                Runtime.getRuntime().availableProcessors() : customKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtils.isEmpty(customKafkaProperties.getListener().getAckMode()) ?
                ContainerProperties.AckMode.MANUAL : customKafkaProperties.getListener().getAckMode());
        return factory;
    }

    private ConsumerFactory<Long, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /** 配置文件*/
    @ConfigurationProperties(prefix = "custom.kafka.test")
    @Bean("customKafkaProperties")
    public KafkaProperties customKafkaProperties() {
        return new KafkaProperties();
    }

    // @Primary 要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的自动装配不懂要选哪个而报错
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka")
    @Bean
    public KafkaProperties kafkaProperties() {
        return new KafkaProperties();
    }

    @Primary
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(@Autowired ProducerFactory<?, ?> kafkaProducerFactory, @Autowired KafkaProperties kafkaProperties,
                                             ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

}

Tips:

1)消费者

     需要实现自定义的KafkaListenerContainerFactory Bean

2)生产者

     需要实现自定义的KafkaTemplate Bean

3)@Primary

     @Autowired注解默认是根据类型Type来自动注入的,当有多个相同类型的bean时,使用@Primary来赋予bean更高的优先级。

3、应用

1)消费者

@Component
@Slf4j
public class TestKafkaListener {

    @KafkaListener(
            topics = {"myTestTopic"},
            containerFactory = "kafkaCustomContainerFactory")
    public void testReceive(ConsumerRecord<Long, String> record, Acknowledgment ack) {
        // 业务代码 start
        // ...
        // 业务代码 end
        ack.acknowledge();
    }

}

2)生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;


@RefreshScope
@Service
@Transactional
@Slf4j
public class TestKafkaService implements TestKafkaServiceI {


    @Qualifier("kafkaCustomTemplate")
    @Autowired
    private KafkaTemplate<String, Object> kafkaCustomTemplate;

    @Override
    public void testSend(String jsonParam) {
        // 发送kafka消息
        TestKafkaEvent<String> event = new TestKafkaEvent<>(jsonParam);
        try {
            kafkaCustomTemplate.send(event.getProducerRecord());
        }
        catch (Exception e) {
            throw new RuntimeException("发送消息失败");
        }
    }
}

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.Serializable;

@Slf4j
public class TestKafkaEvent<T extends Serializable> {

    private T source;

    @Override
    public ProducerRecord<String, Object> getProducerRecord() {
        log.info("发送消息: {}", getSource());
        return new ProducerRecord<>("my-tes-topic", getSource());
    }
    private TestKafkaEvent(){}

    public TestKafkaEvent(T source) {
        this.source = source;
    }

    public T getSource() {
        return this.source;
    }

    public void setSource(T source) {
        this.source = source;
    }
}

参考:聊聊在springboot项目中如何配置多个kafka消费者文章来源地址https://www.toymoban.com/news/detail-613238.html

到了这里,关于笔记:配置多个kafka生产者和消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka生产者消费者练习

    需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    浏览(43)
  • Kafka系列之:Kafka生产者和消费者

    batch.size:只有数据积累到batch.size之后,sender才会发送数据,默认16K。 linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。 0:生产者发送过来的数据,不需要等数据罗盘应答。 1:生产者发送过来的

    2023年04月09日
    浏览(42)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(42)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(53)
  • Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(70)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(41)
  • 【Rust 基础篇】Rust 通道实现单个消费者多个生产者模式

    在 Rust 中,我们可以使用通道(Channel)来实现单个消费者多个生产者模式,简称为 MPMC。MPMC 是一种常见的并发模式,适用于多个线程同时向一个通道发送数据,而另一个线程从通道中消费数据的场景。本篇博客将详细介绍 Rust 中单个消费者多个生产者模式的实现方法,包含

    2024年02月16日
    浏览(39)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(86)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(45)
  • 如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

    场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议 rabbitmq实现思路: 选型:发布订阅模式(Publish/Subscribe) 一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,

    2023年04月25日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包