springboot整合kafka多数据源

这篇具有很好参考价值的文章主要介绍了springboot整合kafka多数据源。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目背景

在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。

依赖

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: server001.bbd:9092

在使用的时候直接注入,然后就可以使用里面的方法了

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

springboot整合kafka多数据源,spring boot,kafka

多数据源情况下

本篇文章主要讲的是在多数据源下的使用,和单机的有所不同,我也看了网上的一些博客,但是当我去按照网上的配置的时候,总是会报错 kafakTemplate这个bean找不到,所以没办法只有按照springboot自动配置里面的来改
springboot整合kafka多数据源,spring boot,kafka文章来源地址https://www.toymoban.com/news/detail-649560.html

package com.ddb.zggz.config;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.io.IOException;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    private final KafkaProperties properties;

    private final KafkaSecondProperties kafkaSecondProperties;



    public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {
        this.properties = properties;
        this.kafkaSecondProperties = kafkaSecondProperties;
    }

    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaSecondTemplate")
    public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                                   @Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,
                                                   ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaProducerListener")
    @Primary
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }


    @Bean("kafkaSecondProducerListener")
    public ProducerListener<Object, Object> kafkaSecondProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean("kafkaConsumerFactory")
    @Primary
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondConsumerFactory")
    public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.kafkaSecondProperties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }


    @Bean("zwKafkaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaSecondConsumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }


    @Bean("kafkaProducerFactory")
    @Primary
    public ProducerFactory<Object, Object> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondProducerFactory")
    public ProducerFactory<Object, Object> kafkaSecondProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.kafkaSecondProperties.buildProducerProperties());
        String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean("kafkaAdmin")
    @Primary
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }

}


生产者


package com.ddb.zggz.event;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
@Slf4j
public class KafkaPushEvent {


    @Resource
    private KafkaTemplate<String, String> kafkaSecondTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ApplicationConfiguration configuration;


    public void pushEvent(PushParam param) {
        ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;
        if ("zw".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if ("net".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if (sendResultListenableFuture == null){
            throw new IllegalArgumentException("kakfa发送消息失败");
        }
        sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka发送的message报错,发送数据:{}", param);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka发送的message成功,发送数据:{}", param);
            }
        });


    }


}

消费者

package com.ddb.zggz.event;

import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;


@Component
@Slf4j
public class SendMessageListener {

    @Autowired
    private GzApprovalService gzApprovalService;

    @Autowired
    private GzServiceService gzServiceService;

    @KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")
    @RetryableTopic(include = {Exception.class},
            backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000)
    )
    public void listen(ConsumerRecord<?, ?> consumerRecord) {
        String value = (String) consumerRecord.value();
        PushParam pushParam = JSONObject.parseObject(value, PushParam.class);

        //版本提审
        if ("version-approval".equals(pushParam.getEvent())) {
            ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);
            gzApprovalService.approval(approvalDTO);
        }

        //服务下架
        if (pushParam.getEvent().equals("server-OffShelf-gzt")) {
            OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);
            gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());

        }

    }
    @DltHandler
    public void processMessage(String message) {

    }
}

消息体

package com.ddb.zggz.event;

import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * @author bbd
 */
@Data
public class PushParam implements Serializable {

    /**
     * 发送的消息数据
     */
    private Object data;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime = LocalDateTime.now();

    /**
     * 事件名称,用于消费者处理相关业务
     */
    private String event;


    /**
     * 保存版本参数
     */
    public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzH5VersionManage);
        pushParam.setEvent("save-version");
        return pushParam;
    }

    /**
     * 保存服务参数
     */
    public static PushParam toKafkaServer(GzService gzService) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzService);
        pushParam.setEvent("save-server");
        return pushParam;
    }

到了这里,关于springboot整合kafka多数据源的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot入门(07):整合 MySQL 和 Druid数据源 | 全网最详细保姆级教学(两万字)

            作为现代Web应用开发的重要技术栈之一,Spring Boot在快速构建可靠、高效、易维护的应用方面具有独特的优势。而在实际开发中,数据库作为系统的重要组成部分,对于数据源的选择和配置也是至关重要的。本篇文章将全面介绍如何使用Spring Boot整合MySQL和Druid数据

    2024年02月12日
    浏览(50)
  • springboot整合多数据源的配置以及动态切换数据源,注解切换数据源

    在许多应用程序中,可能需要使用多个数据库或数据源来处理不同的业务需求。Spring Boot提供了简便的方式来配置和使用多数据源,使开发人员能够轻松处理多个数据库连接。如果你的项目中可能需要随时切换数据源的话,那我这篇文章可能能帮助到你 ℹ️:这里对于pom文件

    2024年02月10日
    浏览(49)
  • 【Spring Boot 3】【数据源】自定义多数据源

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年02月01日
    浏览(60)
  • 【Spring Boot 3】【数据源】自定义JPA数据源

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月21日
    浏览(70)
  • SpringBoot整合MybatisPlus多数据源

    相信在很多使用MybatisPlus框架的小伙伴都会遇到多数据源的配置问题,并且官网也给出了推荐使用多数据源 (dynamic-datasource-spring-boot-starter) 组件来实现。由于最近项目也在使用这个组件来实现多数据源切换,因此想了解一下该组件是如何运行的,经过自己的调试,简单记录一

    2024年02月13日
    浏览(35)
  • 【Spring Boot 3】【数据源】自定义JDBC多数据源

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月23日
    浏览(49)
  • 【Spring Boot 3】【数据源】自定义JPA多数据源

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月22日
    浏览(80)
  • MyBatis整合Springboot多数据源实现

    数据源,实际就是数据库连接池,负责管理数据库连接,在 Springboot 中,数据源通常以一个 bean 的形式存在于 IOC 容器中,也就是我们可以通过依赖注入的方式拿到数据源,然后再从数据源中获取数据库连接。 那么什么是多数据源呢,其实就是 IOC 容器中有多个数据源的 bea

    2023年04月22日
    浏览(61)
  • SpringBoot整合Druid配置多数据源

    目录 1.初始化项目 1.1.初始化工程 1.2.添加依赖 1.3.配置yml文件 1.4.Spring Boot 启动类中添加 @MapperScan 注解,扫描 Mapper 文件夹 1.5.配置使用数据源 1.5.1.注解方式 1.5.2.基于AOP手动实现多数据源原生的方式 2.结果展示 Mybatis-Plus:简介 | MyBatis-Plus (baomidou.com) 在正式开始之前,先初始

    2024年02月01日
    浏览(51)
  • Spring Boot 配置双数据源

    Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 需求: 1.基本步骤 添加依赖 添加 Spring Boot 和数据库驱动的依赖 配置数据源 在 application.properties 或 application.yml 中分别配

    2024年01月22日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包