[springboot配置Kafka] springboot配置多个kafka,包含账号密码

这篇具有很好参考价值的文章主要介绍了[springboot配置Kafka] springboot配置多个kafka,包含账号密码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

说明

本示例只配置了Consumer没有配置Producer,可参考配置文件_1中注释内容部分文章来源地址https://www.toymoban.com/news/detail-558310.html

1.引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

2.yml配置

spring:
  kafka:
    one:
    #测试环境
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      consumer:
        # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
        group-id: defaultName
        #关闭自动提交
        enable-auto-commit: false
        #重置消费者的offset
        # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
        auto-offset-reset: latest
        #key value 的反序列化
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      #测试环境
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
        group-id: defaultName_two
        #关闭自动提交
        enable-auto-commit: false
        #重置消费者的offset
        # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
        auto-offset-reset: latest
        #key value 的反序列化
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5

3.新建配置文件

3.1配置文件_1

@Configuration
@EnableKafka
public class K1kafkaConfiguration {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.one.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.one.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.one.properties.security.protocol}")
    private String securityprotocol;
    @Value("${spring.kafka.one.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.one.properties.sasl.jaas.config}")
    private String jaasconfig;


    //@Value("${spring.kafka.one.producer.linger-ms}")
    //private Integer lingerMs;
    //@Value("${spring.kafka.one.producer.max-request-size}")
    //private Integer maxRequestSize;
    //@Value("${spring.kafka.one.producer.batch-size}")
    //private Integer batchSize;
    //@Value("${spring.kafka.one.producer.buffer-memory}")
    //private Integer bufferMemory;


    //@Bean
    //public KafkaTemplate<String, String> kafkaOneTemplate() {
    //    return new KafkaTemplate<>(producerFactory());
    //}
    @Bean
    @Primary
//理解为默认优先选择当前容器下的消费者工厂
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        //并发数量
        factory.setConcurrency(1);
        //开启批量监听
         //factory.setBatchListener(type);
        // 被过滤的消息将被丢弃
        // factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        //设置手动提交ackMode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // 设置记录筛选策略
        //factory.setRecordFilterStrategy(new RecordFilterStrategy() {
        //    @Override
        //    public boolean filter(ConsumerRecord consumerRecord) {
        //        String msg = consumerRecord.value().toString();
        //        if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){
        //            return false;
        //        }
        //        // 返回true消息将会被丢弃
        //        return true;
        //    }
        //});
        return factory;
    }

    //private ProducerFactory<String, String> producerFactory() {
    //    return new DefaultKafkaProducerFactory<>(producerConfigs());
    //}
    @Bean//第一个消费者工厂的bean
    public ConsumerFactory<Integer, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    //private Map<String, Object> producerConfigs() {
    //    Map<String, Object> props = new HashMap<>();
    //    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    //    props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
    //    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    //    props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
    //    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
    //    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    return props;
    //}


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol",securityprotocol);
        props.put("sasl.mechanism",mechanism);
        props.put("sasl.jaas.config",jaasconfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

3.2配置文件_2

@Configuration
@EnableKafka
public class K2kafkaConfiguration {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.two.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.two.consumer.max-poll-records}")
    private String maxPollRecords;


  

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        //并发数量
        factory.setConcurrency(1);
        //开启批量监听
        //factory.setBatchListener(type);
        // 被过滤的消息将被丢弃
        // factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        //设置手动提交ackMode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
    }


    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

 

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

4.设置消费

4.1 设置消费_1

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer {

    @KafkaListener(topics = "#{'${spring.kafka.one.topic}'}", groupId = "defaultName",containerFactory = "kafkaListenerContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer] 接收到kafka消息:{}",record.value());
        System.out.println(record);
        System.out.println(record.value());
        //手动提交offset
        //ack.acknowledge();
    }

4.2 设置消费_2

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer2 {

    @KafkaListener(topics = "#{'${spring.kafka.two.topic}'}", groupId = "defaultName_two",containerFactory = "kafkaTwoContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer2 ] 接收到kafka消息:{}",record.value());
        System.out.println(record);
        System.out.println(record.value());
        //手动提交offset
        //ack.acknowledge();
    }

到了这里,关于[springboot配置Kafka] springboot配置多个kafka,包含账号密码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka安装配置(包含内外网设置)

    这里使用的kafka安装包:kafka_2.13-3.4.0.tgz kafka版本2.2+的版本,已经不需要依赖zookeeper来查看、创建topic,新版本使用--bootstrap-server替换老版本的--zookeeper-server。  配置完内网可以正常得去消费或者添加kfka数据 如果配置了advertised.listeners就无法正常访问消费 因为我的外网通讯虽然

    2024年02月06日
    浏览(38)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(86)
  • kafka 设置用户密码和通过SpringBoot测试

    当前Kafka认证方式采用 动态增加用户 协议。 自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka群集的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此文主要介绍SASL方式。 1)SASL验证: 验证方式 Kafka版本 特点 SASL/PLAIN 0.10.0.0 不能动态增加用户

    2024年02月17日
    浏览(39)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(54)
  • springBoot配置文件账号密码加密存储(springCloud nacos)

            最近公司有要求,项目中的配置文件不允许明文存储,全部要改为密文,收集了一些资料,在这里做下记录总结。 在Application启动主类中加入启动注解 @EnableEncryptableProperties 注:加密因子password配置在配置文件中也会涉及到安全问题,更安全的做法是:将其作为系

    2024年02月16日
    浏览(35)
  • 手动配置 kafka 用户密码,认证方式等的方式

    部分场景会指定使用某一kafka 来提高安全性,这里就不得不使用用户密码认证方式等来控制

    2024年02月02日
    浏览(32)
  • 为Kafka添加用户名密码支持并配置ssl协议

    在使用kafka过程中,出于安全考虑需要给kafka配置支持用户名密码验证(zookeeper使用kafka自带)和SSL验证 推荐下载kafka的kafka_2.13-3.5.0版本,本配置方法基于kafka_2.13-3.5.0版本,其他版本可能略有调整。kafka安装目录为/opt/kafka,jdk安装目录为/opt/jdk。 1.编写生成证书的脚本,详细步骤

    2024年02月08日
    浏览(66)
  • Springboot 配置使用 Kafka

    不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,yml 配置,Config 工厂代码配置都有,batch-size、acks、offset、auto-commit、trusted-packages、poll-timeout、linger 应有尽有,批量消费、开启事务、定义批量消费数量、延时发送、失败重试、异常处理你还想要什么 As we all kno

    2024年02月02日
    浏览(27)
  • Springboot Kafka 集成配置

    Springboot 配置使用 Kafka 前言 一、Linux 安装 Kafka 二、构建项目 三、引入依赖 四、配置文件 生产者 yml 方式 Config 方式 消费者 yml 方式 Config 方式 五、开始写代码 生产者 发送 成功回调和异常处理 消费者 接收 异常处理 七、开始测试 测试普通单条消息 测试消费者异常处理 测试

    2024年02月08日
    浏览(47)
  • SpringBoot 集成 Kafka 配置

    自定义分区器 生产者 消费者 配置文件 自定义分区器 生产者 消费者配置 消费数据过滤 消费异常处理类 消费者配置  @KafkaListener

    2024年02月15日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包