spring boot配置双Kafka方法

这篇具有很好参考价值的文章主要介绍了spring boot配置双Kafka方法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

第一步:application.yml的配置

server:
  port: 8080
spring:
  application:
    name: demo 
  kafka:
    one:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        group-id: xxxx
        enable-auto-commit: true
    two:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        group-id: xxxx
        enable-auto-commit: true

第二步:配置config

@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> xxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(20);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}
@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> xxxxxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(6);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        //        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}

注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行

第三步:

@Resource
private KafkaTemplate<String, String> xxxOneTemplate;
@Resource
private KafkaTemplate<String, String> xxxxTwoTemplate;

直接在你要用到的类中直接引用就行。

跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有研究成功就不献丑了。文章来源地址https://www.toymoban.com/news/detail-504180.html

到了这里,关于spring boot配置双Kafka方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【配置nacos】使用application.yml配置文件来配置spring-cloud-starter-alibaba-nacos-config

    1.首先修改pom.xml文件,引入spring-cloud-starter-alibaba-nacos-config依赖 2.在应用的 /src/main/resources/application.yml 配置文件中配置 Nacos Config 元数据 这里注意一下, 官方文档上面给的使用bootstrap.properties 来进行配置的, 我是使用的application.yml 来进行配置的。 【出现的问题以及解决】

    2024年01月24日
    浏览(39)
  • Spring Boot读取yml或者properties配置信息

    编写配置类 开始使用

    2024年02月14日
    浏览(44)
  • 【Spring Boot】application 配置文件(2)

    server.servlet.session. timeout =1800  定义了 HTTP 会话的超时时间,单位是秒,在此设置中,会话的超时时间被设置为 1800秒 即30分钟。这意味着 ,如果30分钟内用户没有与服务器进行任何活动(如请求页面),那么会话将自动过期。  spring.jackson.time-zone=GMT+8 这个设置指定了jackson 库

    2024年01月23日
    浏览(30)
  • Spring Boot配置文件:properties 与 yml 的竞争

    本文,我们就要来介绍一下Spring Boot中的配置文件 ,在学习了本文之后,可以很清楚地知道如何在Spring Boot中去配置项目的一些系统设置 首先我们先来聊聊什么是配置文件? 配置文件是一种 文本文件 ,通常用于存储程序的参数和设置。它包含了程序运行所需的各种选项和配

    2024年02月04日
    浏览(51)
  • 在Spring Boot微服务使用jasypt-spring-boot加密和解密yml配置文件

    记录 :424 场景 :在Spring Boot微服务,使用jasypt-spring-boot加密和解密yml配置文件中的配置信息。 版本 :JDK 1.8,Spring Boot 2.6.3,jasypt-1.9.3,jasypt-spring-boot-2.1.2, jasypt-spring-boot-3.0.5。 开源地址 :https://github.com/ulisesbocchio/jasypt-spring-boot 1.在Spring Boot微服务使用jasypt-spring-boot-3.0.5版本

    2024年02月09日
    浏览(59)
  • Spring Boot Application.properties和yaml配置文件

    全局配置文件能够对一些默认配置值进行修改。Spring Boot使用一个application.properties或者application.yaml的文件作为全局配置文件,该文件存放在src/main/resource目录或者类路径的/config,一般会选择resource目录。 使用Spring Initializr方式创建项目——PropertiesDemo 单击【Next】按钮 添加W

    2024年01月24日
    浏览(60)
  • Spring Boot微服务从yml文件中加载配置(使用@Value和@ConfigurationProperties)

    记录 :398 场景 :在Spring Boot的微服务中从application.yml等yml文件中加载自定义配置内容。使用@Value直接加载单个配置。使用@ConfigurationProperties注解把一个或者多个配置加载为Java对象。 版本 :JDK 1.8,SpringBoot 2.6.3 1.使用@Value注解加载配置 使用注解@RestController、@Service、@Component等

    2024年02月12日
    浏览(30)
  • 【Spring Boot】项目端口号冲突解决方法,一步到位

    启动项目遇到以下问题: Description: Web server failed to start. Port 8080 was already in use. Action: Identify and stop the process that’s listening on port 8080 or configure this application to listen on another port. Process finished with exit code 1 找到要启动的项目的配置文件 application.properties ,没有的话可以在 resource

    2024年02月02日
    浏览(40)
  • application.yml基础配置以及读取

    1.SpringBoot提供了3种配置文件的格式 properties(传统格式/默认格式) yml(主流格式) yaml 1. SpringBoot中导入对应starter后,提供对应配置属性 2.书写SpringBoot配置采用+提示形式书写 properties优先级yml优先级yaml优先级 不同配置文件中相同配置按照加载优先级相互覆盖,不同配置文

    2023年04月08日
    浏览(28)
  • application.yml mybatis最简单的配置

    2024年02月09日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包