SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

这篇具有很好参考价值的文章主要介绍了SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

 更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。

一、背景

        因业务发展需要,需要对接kafka,快速批量接收消息日志,避免消息日志累积过多,必须做到数据处理后,动态插入到库表(相同表结构,不同表名)下,并且还要支持批量事务提交,实现消息快速消费。(注意:源码文章最后有获取方式)

spring连接kafka代码,kafka,java,java,开发语言,kafka,spring boot,redis

二、核心代码

2.1、开启批量、并发消费

kafka:
    bootstrap-servers: 10.1.*.*:9092     #服务器的ip及端口,可以写多个,服务器之间用“:”间隔
    producer: #生产者配置 
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: #消费者配置
      #指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
      group-id: myGroup                 #设置消费者的组id default:Group
      enable-auto-commit: true  #设置自动提交offset
      auto-commit-interval: 2000  #默认值为5000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #值的反序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      auto-offset-reset: latest
      max-poll-records: 2000  #批量一次最大拉取数据量 默认500
    listener:
      # poll-timeout: 1000
      type: batch  # 开启批量消费
      concurrency: 3  #指定listener 容器中的线程数,用于提高并发量
    properties:
      session:
        timeout:
          ms: 120000  #默认10000
        max:
          poll:
            interval:
              ms: 600000  #默认300000(5分钟)

       说明:type: batch  # 开启批量消费, max-poll-records: 2000,批量消费每次最多消费记录数。这里设置 max-poll-records是2000,并不是说如果没有达到2000条消息,我们就一直等待。而是说一次poll最多返回的记录数为2000。concurrency: 3  #指定listener 容器中的线程数,用于提高并发量。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。例如:设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition。

2.2、多线程异步配置

    具体配置参加前面文章:SpringBoot使用@Async实现多线程异步

    注意:在启动类上需要加上注解@EnableAsync,开启异步。

spring连接kafka代码,kafka,java,java,开发语言,kafka,spring boot,redis

2.3、redis相关配置

1、yml相关配置:

spring:
  redis:
    # 地址
    host: 127.0.0.1
    # 端口,默认为6379
    port: 6379
    # 密码
    # 连接超时时间
    timeout: 10s
    lettuce:
      pool:
        # 连接池中的最小空闲连接
        min-idle: 0
        # 连接池中的最大空闲连接
        max-idle: 8
        # 连接池的最大数据库连接数
        max-active: 8
        # #连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1ms

2、RedisConfig配置

package com.wonders.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 〈自定义redis序列化方式〉
 * @author yangyalin
 * @create 2018/11/1
 * @since 1.0.0
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    /**
     * @Author yangyalin
     * @Description redisTemplate序列化使用的jdkSerializeable, 存储二进制字节码(默认), 所以自定义序列化类
     * 用于存储可视化内容
     * @Date 15:07 2018/11/1
     * @Param [redisConnectionFactory]
     * @return org.springframework.data.redis.core.RedisTemplate<java.lang.Object,java.lang.Object>
     **/
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<Object,Object> redisTemplate=new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        //使用jackson2JsonRedisSerializer替换默认序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper=new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        //设置key和value的序列化规则
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

2.4、动态表名

    <!--插入到kafka日志临时表中-->
    <insert id="insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">
      INSERT INTO ${logTableName}("EVN_LOG_ID", "TABLE_NAME", "OPERATION", "PK_VALUE1", "PK_VALUE2",
           "PK_VALUE3", "PK_VALUE4", "PK_VALUE5", "TRANS_FLAG", "PKS", "BASE_CODE", "PLA_BRANCH_CODE",
           "CREATE_TIME","MSG_PRODUCE_TIME")
      VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},
            #{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},
            #{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR}, 'Y',
            #{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},
            #{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})
    </insert>

    说明:1、#{} :会根据参数的类型进行处理,当传入String类型,则会为参数加上双引号(占位符);2、${} :将参数取出不做任何处理,直接放入语句中,就是简单的字符串替换(替换符)。

2.5、sql批量提交

public void batchInsert(List<KafkaMsgInfo> kafkaMsgInfoList) throws Exception{
        //如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交
        // 创建session实列
        SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");
        // 开启批量处理模式 BATCH 、关闭自动提交事务 false
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);
        KafkaMsgConfigMapper KafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);
        int BATCH = 1000;
        for (int i = 0,size=kafkaMsgInfoList.size(); i < size; i++) {
            //循环插入 + 开启批处理模式
            KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));
            if (i != 0 && i % BATCH == 0) {
                sqlSession .commit();
            }
        }
        // 一次性提交事务
        sqlSession.commit();
        // 关闭资源
        sqlSession.close();
    }
2.6、业务代码
 @KafkaListener(topics = {"${mykafka.topics:mytopic}"})
    public void myMQConsumer(List<String> msgList){
        log.info("接收到的消息条数size:"+msgList.size());
        //计算程序耗时时间
        StopWatch stopWatch = new StopWatch();
        // 开始计时
        stopWatch.start();
        this.getKafkaMsgAndDel(msgList);  //2、接收kafka日志并解析
        stopWatch.stop();
        log.info("本次任务耗时(秒):" + stopWatch.getLastTaskTimeMillis()/1000 + "s");
    }

三、测试结果

spring连接kafka代码,kafka,java,java,开发语言,kafka,spring boot,redis

序号 kafka数量(万条) 消耗(秒)
1 1 3
2 10 13
3 100 120

 

更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。

回复:源码,可以获取该项目对应的源码及表结构,开箱即可使用。

spring连接kafka代码,kafka,java,java,开发语言,kafka,spring boot,redis

spring连接kafka代码,kafka,java,java,开发语言,kafka,spring boot,redis

推荐文章:

    1、SpringBoot使用@Async实现多线程异步;

    2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;

    3、SpringBoot用线程池ThreadPoolExecutor处理百万级数据。文章来源地址https://www.toymoban.com/news/detail-682758.html

到了这里,关于SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(73)
  • Kafka是什么,以及如何使用SpringBoot对接Kafka

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 防止消息丢失与消息重复——Kafka可靠性分析及优化实践 继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是

    2024年02月08日
    浏览(32)
  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(55)
  • ChatGPT工作提效之使用python开发对接百度地图开放平台API的实战方案(批量路线规划、批量获取POI、突破数量有限制、批量地理编码)

    ChatGPT工作提效之初探路径独孤九剑遇强则强 ChatGPT工作提效之在程序开发中的巧劲和指令(创建MySQL语句、PHP语句、Javascript用法、python的交互) ChatGPT工作提效之生成开发需求和报价单并转为Excel格式 ChatGPT工作提效之小鹅通二次开发批量API对接解决方案(学习记录同步、用户注

    2024年02月06日
    浏览(49)
  • 【并发编程】线程池多线程异步去分页调用其他服务接口获取海量数据

    前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。 刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些

    2024年02月13日
    浏览(36)
  • Spring-Kafka如何实现批量消费消息并且不丢失数据

    先给答案: 某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同

    2024年02月13日
    浏览(46)
  • Selenium处理异步加载请求获取XHR消息体的2种方法

    目录 通过Log读取XHR 简单使用示例 异步加载情况下,不涉及浏览器全局的加载,因此selenium会直接往下执行,这就导致异步结果还没返回,脚本就继续执行了。 构造chrome driver: 通过log来获取xhr: 其中,上述中“message”的消息如下: 通过requestId可以获得详细的消息体: Git

    2023年04月08日
    浏览(34)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(48)
  • SpringBoot异步任务获取HttpServletRequest

    在使用框架日常开发中需要在controller中进行一些异步操作减少请求时间,但是发现在使用@Anysc注解后会出现Request对象无法获取的情况,本文就此情况给出完整的解决方案 @Anysc注解会开启一个新的线程,主线程的Request和子线程是不共享的,所以获取为null 在使用springboot的自定

    2024年02月21日
    浏览(52)
  • springboot 集成 kafka批量消费数据

    yaml配置文件

    2024年02月13日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包