spring集成kafka并对消息进行监听

这篇具有很好参考价值的文章主要介绍了spring集成kafka并对消息进行监听。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

spring集成kafka


需要依赖zookeeper,需提前启动

在server.properties文件中配置kafka连接zookeeper相关信息

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

在zookeeper.properties中配置zookeeper所需配置

# 数据文件保存地址
dataDir=/tmp/zookeeper
# 客户端端口
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# 设置此功能端口将不在冲突
admin.enableServer=false
# admin.serverPort=8080

kafka本地安装启动

windows下载kafka二进制包到本机:http://kafka.apache.org/downloads
2、在config下面的server.properties文件,修改:
listeners=PLAINTEXT://localhost:9092
log.dirs=F:\kafka_2.13-2.5.0\logs
3、在bin同级目录下打开shell窗口,启动kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4、创建主题 查看可用主题
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
5、删除指定topic
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic topic_kedacom_icms_alarm_jy_3725
5.1、如果出现临时存储的topic需要到zookeeper删除指定的topic
#查看存储的topic
ls /brokers/topics
#删除指定的topic
rmr /brokers/topics/topicName
6、另起窗口,开启指定topic
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_kedacom_icms_alarm_jy_3725
7、另起窗口、开启生产端
.\bin\windows\kafka-console-producer.bat --broker-list 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_jy_3725
8、另起窗口,开启消费端
chcp 65001
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_sj_3725 --from-beginning
如果遇到文本过长 指令识别错误,是因为存放目录过长不规范引起

pom文件

#在选择版本,高版本会提示缺少anntnationprocess...
   <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.8.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
  </dependencies>

生产配置

/**
 * @Auther: lyp
 * @Date: 2021/11/22 15:46
 */
 @Configuration
 @EnableKafka
 public class KafkaProducerConfig {

     @Value("${bootstrap.servers}")
     private String bootstrapServers;

     public KafkaProducerConfig(){
         System.out.println("kafka--------------------------------生产配置");
     }

     /**
     * 创建生产值消息工厂
     */
     @Bean
     public ProducerFactory<Integer, String> producerFactory() {
         return new DefaultKafkaProducerFactory(producerProperties());
     }

    /**
     * 生产基本配置
     */
    @Bean
     public Map<String, Object> producerProperties() {
         Map<String, Object> props = new HashMap<String, Object>();
         //设置kafka访问地址
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         //消息转化
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
         //重试次数
         props.put(ProducerConfig.RETRIES_CONFIG,1);
         //分批处理内存设置
         props.put(ProducerConfig.BATCH_SIZE_CONFIG,1048576);
         props.put(ProducerConfig.LINGER_MS_CONFIG,1);
         //使用内存配置
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432L);
         //确认标志符使用配置
         props.put(ProducerConfig.ACKS_CONFIG,"all");
         return props;
     }
 
     @Bean
     public KafkaTemplate<Integer, String> kafkaTemplate() {
         KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
         kafkaTemplate.setDefaultTopic(KafkaSendEnum.ALARM_WARN_PUSH.getTopic());
         return kafkaTemplate;
     }
 
 }

消费者配置

package com.huating.jfp.msg.api.kafka.config;

import com.huating.jfp.msg.api.kafka.construct.KafkaConsumerEnum;
import com.huating.jfp.msg.api.kafka.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lyp
 * @ClassName KafkaConsumerConfig
 * @description: 消费者配置
 * @datetime 2022年 07月 20日 9:15
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    public KafkaConsumerConfig() {
        System.out.println("kafka消费者配置加载...");
    }

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        //Kafka服务地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //消费组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerEnum.SD_SJ.getGroupId());
        //关闭自动提交位移
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //设置间隔时间,默认5000ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        //Key反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 					                                             "org.apache.kafka.common.serialization.StringSerializer");
        //Value反序列化
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                                                                   "org.apache.kafka.common.serialization.StringSerializer");
        //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
	   //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区		下的数据
	   //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的			offset,则抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>                                                                  kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new                                                                          ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public KafkaConsumerListener kafkaConsumerListener() {
        return new KafkaConsumerListener();
    }
}

创建topic工具类

/**
 * @author lyp
 */
public class KafkaTopicUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicUtil.class);

    /**
     * 功能描述:创建topic,并返回创建结果
     * @param: topicName
     * @return: boolean
     * @auther: lyp
     * @date: 2021/11/12 16:06
     */
    public static boolean createTopics(String bootstrapServers,String topicName,int partitions,short replication) {
        boolean res = false;
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
            AdminClient adminClient = KafkaAdminClient.create(properties);
            NewTopic newTopic = new NewTopic(topicName, partitions, replication);
            adminClient.createTopics(Arrays.asList(newTopic));
            logger.info("创建Topic:"+topicName+"成功!");
            res = true;
        } catch (Exception e) {
            e.printStackTrace();
            logger.info("创建异常!");
        }
        return res;
    }

    /**
     * 功能描述:获取当前kafka所存在的topic列表
     * @return: set
     * @auther: lyp
     * @date: 2021/11/12 16:07
     */
    public static Set<String> getTopics(String bootstrapServers){
        Set<String> nameSet = new HashSet<>();
        try {
            Properties properties = new Properties();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            AdminClient adminClient = KafkaAdminClient.create(properties);
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            KafkaFuture<Set<String>> names = listTopicsResult.names();
            nameSet = names.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return nameSet;
    }
}

生产业务

public interface KafkaProduceService {

    /**设备报警消息发送*/
    boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo);
}
/**
 * @author lyp
 */
@Service("kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProduceService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProduceService.class);

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    @Value("${topic.name}")
    private String topicName;

    @Value("${srcUnit.code}")
    private String srcUnitCode;

    @Value("${srcUnit.name}")
    private String srcUnitName;

    @Override
    public boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo) {
        boolean res = false;
        Map<String, Object> reportData = new HashMap<>();
        reportData.put("command","reportAlarm");
        deviceWarnInfo.setSrcUnitCode(srcUnitCode);
        deviceWarnInfo.setSrcUnitName(srcUnitName);
        reportData.put("data",deviceWarnInfo);
        //判断是否存在当前主题
        Set<String> topics = KafkaTopicUtil.getTopics(bootstrapServers);
        if (!topics.contains(KafkaSendEnum.ALARM_WARN_PUSH.getTopic())){
            if (!KafkaTopicUtil.createTopics(bootstrapServers,topicName,1,(short)1)){
                logger.info("topic创建失败,消息发送不成功!");
                return res;
            }
        }

        KafkaTemplate kafkaTemplate = SpringContextUtil.getBean("kafkaTemplate");
        ListenableFuture send = kafkaTemplate.sendDefault(topicName, JSONArray.toJSONString(reportData));
        send.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage()+"发送失败!原因:"+ex.getCause());
                System.out.println("发送失败!");
            }

            @Override
            public void onSuccess(Object result) {
                logger.info("消息发送成功"+result.toString());
                System.out.println("发送成功!");
            }
        });
        return res;
    }

}

消费业务

消息接收类
package com.huating.jfp.msg.api.kafka.entity;

/**
 * @author lyp
 * @ClassName MesBody
 * @description: 消息实体
 * @datetime 2022年 07月 21日 14:48
 */
@Data
public class MesBody {
	//类型标记字段
    private String command;
	//消息实体字段
    private String data;
}


监听类
package com.huating.jfp.msg.api.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import com.alibaba.fastjson.JSONObject;
import com.huating.jfp.msg.api.kafka.construct.KafkaMesType;
import com.huating.jfp.msg.api.kafka.construct.KafkaTopics;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.MesBody;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 * @ClassName KafkaConsumerListener
 * @description: 主题监听
 * @datetime 2022年 07月 20日 9:27
 */

public class KafkaConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
    @Autowired
    private KafkaConsumerService consumerService;

    /**
     * 功能描述: 监听指定topic,多个使用,
     * groupId:分组id
     * topics:监听当前topic数组
     * topic:监听单个topic
     */
    @KafkaListener(groupId = "${group.id}",topics = "#{'${consumer.topics}'.split(',')}",containerFactory = "")
    public void listener(ConsumerRecord<String, String> consumerRecord) {
        logger.info("开始消费" + KafkaTopics.SD_JY_DUTY_TOPIC.getTopicName() + "的消息{}", consumerRecord.value());
        MesBody mesBody = JSONObject.parseObject(consumerRecord.value(), MesBody.class);
        logger.error("kafka监听-当前消息类型:"+mesBody.getCommand());
        //督查督办
        if (mesBody.getCommand().equals(KafkaMesType.SD_INSPECTOR_ISSUE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_DISPOSE.getMesCode())
                || mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_RES.getMesCode())) {
            logger.error("督查督办监听消息处理开始----->----->");
            InspectorIssue inspectorIssue = JSONObject.parseObject(mesBody.getData(), InspectorIssue.class);
            consumerService.inspectorListener(inspectorIssue);
        }

        //通知通报
        if (mesBody.getCommand().equals(KafkaMesType.SD_NOTICE_ISSUE.getMesCode())) {
            logger.error("通知通报开始监听");
            Notice notice = JSONObject.parseObject(mesBody.getData(), Notice.class);
            consumerService.noticeListener(notice);
        }
    }
}

业务处理
package com.huating.jfp.msg.api.kafka.consumer.service;


import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.Notice;

/**
 * @author lyp
 */
public interface KafkaConsumerService {

    /**
     * 功能描述: 督查下发 督查办结监听处理
     *
     * @param inspectorIssue
     */
    void inspectorListener(InspectorIssue inspectorIssue);

    /**
     * 功能描述: 通知通报下发监听
     *
     * @param notice
     */
    void noticeListener(Notice notice);
}


package com.huating.jfp.msg.api.kafka.consumer.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.huating.jfp.common.dao.MsgConfigureDao;
import com.huating.jfp.common.dao.MsgDao;
import com.huating.jfp.common.entity.Msg;
import com.huating.jfp.common.entity.MsgConfigure;
import com.huating.jfp.common.entity.MsgReceive;
import com.huating.jfp.common.service.MsgReceiveService;
import com.huating.jfp.core.base.ViewPublicRewrite;
import com.huating.jfp.msg.api.http.servcie.HttpRequestService;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.dao.InspectorEventMapper;
import com.huating.jfp.msg.api.kafka.dao.NoticeMapper;
import com.huating.jfp.msg.api.kafka.entity.*;
import com.huating.jfp.msg.api.kafka.producer.service.KafkaProducerService;
import com.huating.jfp.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author lyp
 * @ClassName KafkaConsumerServiceImpl
 * @description: 消费实现
 * @datetime 2022年 07月 20日 9:13
 */
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
    private static final String SRC_UNIT_CODE = "gaol_code";
    private static final String SRC_UNIT_NAME = "gaol_name";
    @Autowired
    private InspectorEventMapper inspectorEventMapper;
    @Autowired
    private HttpRequestService httpRequestService;
    @Autowired
    private NoticeMapper noticeMapper;

    @Autowired
    private MsgDao msgMapper;

    @Autowired
    private MsgConfigureDao msgConfigureMapper;
    @Autowired
    private MsgReceiveService msgReceiveService;
    @Autowired
    private ViewPublicRewrite vp;

    @Override
    public void inspectorListener(InspectorIssue inspectorIssue) {
        if (!StrUtil.isEmpty(inspectorIssue.getUuid())) {
            if (!StrUtil.isEmpty(inspectorIssue.getDubanTime())) {
                logger.error("督办下发处理");
                InspectorEventDispose inspectorEventDispose = new InspectorEventDispose();
                //督查督办
                String uuid = StringUtil.getUUID();
                inspectorEventDispose.setIedUuid(uuid);
                inspectorEventDispose.setIedIeUuid(inspectorIssue.getUuid());
                inspectorEventDispose.setIedExpireTime(inspectorIssue.getDubanTime());
                inspectorEventDispose.setIedContent(inspectorIssue.getContent());

                //督办下发持久化
                inspectorEventMapper.insertDispose(inspectorEventDispose);
                logger.error("督办下发数据新增完成");
                //督办文件持久化
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, uuid);
                inspectorEventMapper.insertFiles(fileList);
                logger.error("督办下发完成");
            } else if (!StrUtil.isEmpty(inspectorIssue.getSrcUnitCode())) {
                logger.error("督查下发处理");
                InspectorEvent inspectorEvent = new InspectorEvent();
                //督查下发
                inspectorEvent.setIeUuid(inspectorIssue.getUuid());
                inspectorEvent.setIeAreaCode(vp.getBusinessValue("gaol_code"));
                inspectorEvent.setIeEventType(inspectorIssue.getType());
                inspectorEvent.setIeDescribe(inspectorIssue.getContent());
                inspectorEvent.setIeGrabTime(inspectorIssue.getPublishTime());
                inspectorEvent.setIeExpireTime(inspectorIssue.getQxTime());
                inspectorEvent.setIeCusNunmber(vp.getBusinessValue("base_cus"));
                inspectorEvent.setIeNature(inspectorIssue.getNature());
                inspectorEvent.setIeIsSj(0);
                //督查下发持久化
                inspectorEventMapper.insertSynData(inspectorEvent);
                logger.error("督查下发数据新增成功");
                //督查文件持久化
                List<FileEntity> files = inspectorIssue.getFiles();
                List<InspectorFile> fileList = new ArrayList<>();
                downloadFile(files, fileList, inspectorIssue.getUuid());
                inspectorEventMapper.insertFiles(fileList);
                logger.error("督查文件数据新增成功");
                logger.error("督查下发完成");
            } else {
                //督查办结
                if (inspectorEventMapper.searchIsSj(inspectorIssue.getUuid()) > 0) {
                    //修改督查状态为办结
                    inspectorEventMapper.updateState("3", inspectorIssue.getUuid());
                    logger.error("督查办结完成");
                }
            }
        }
    }

    @Override
    public void noticeListener(Notice notice) {
        logger.error("通知通报下发开始处理");
        //通知通报持久化
        noticeMapper.insertData(notice);

        Msg msg = new Msg();
        String uuid = StringUtil.getUUID();
        msg.setMUuid(uuid);

        MsgConfigure msgConfigure = new MsgConfigure();
        msgConfigure.setMcCode("NOTIC_ISSUE");
        MsgConfigure config = msgConfigureMapper.selectByData(msgConfigure).get(0);

        msg.setMcUuid(config.getMcUuid());
        msg.setMcMsglevel(config.getMcMsglevel());
        msg.setMStatus(Byte.parseByte(notice.getFeedback() == 0 ? "1" : "0"));
        msg.setMParam(notice.getUuid());
        msg.setMContent(notice.getTitle());
        msg.setCreateTime(new Date());
        if (notice.getFeedback() == 0) {
            msg.setMHandleTime(new Date());
            msg.setMHandleUser("当前通知通报无需处置");
        }
        msgMapper.insertMsg(msg);

        MsgReceive msgReceive = new MsgReceive();
        msgReceive.setMrUuid(StringUtil.getUUID());
        msgReceive.setmUuid(uuid);
        msgReceiveService.insertMsgReceive(msgReceive);

        //文件持久化
        List<FileEntity> files = notice.getFiles();
        noticeDownloadFile(files, notice.getUuid());
        noticeMapper.insertFiles(files);

    }

    private void downloadFile(List<FileEntity> files, List<InspectorFile> fileList, String uuid) {
        logger.error("文件下载开始");
        if (!files.isEmpty()) {
            for (FileEntity file : files) {
                InspectorFile inspectorFile = new InspectorFile();
                String fileName = file.getFileName();
                logger.error(fileName);
                inspectorFile.setIfFileName(fileName);
                String last = fileName.substring(fileName.lastIndexOf("."));
                if (last.equals(".jpg") || last.equals(".JPG") || last.equals(".png") || last.equals(".gif") || last.equals(".bmp")) {
                    inspectorFile.setIfFileType(1);
                } else {
                    inspectorFile.setIfFileType(2);
                }
                inspectorFile.setIfSourceType(1);
                inspectorFile.setIfIeUuid(uuid);
                //需要确定省局的其他类型文件详情
                inspectorFile.setIfPath(file.getFileName());
                String fileId = file.getFileId();
                //文件下载
                String token = httpRequestService.sendPostMessage();
                boolean res = httpRequestService.downloadFile(fileId, token, vp.getCusBusinessValue("duty_file_disk_mapping_path", "1000") + "/dutyUpLoad/");
                if (res) {
                    fileList.add(inspectorFile);
                }
            }
        }
    }

    private void noticeDownloadFile(List<FileEntity> files, String uuid) {
        files.stream().forEach((file) -> {
            file.setParentId(uuid);
            String token = httpRequestService.sendPostMessage();
            httpRequestService.downloadFile(file.getFileId(), token, vp.getBusinessValue("notice_file_disk_mapping_path") + "/noticeUpLoad/");
        });
    }

    public boolean checkSj() {
        return vp.getBusinessValue("is_sj") != null &&
                Boolean.parseBoolean(vp.getBusinessValue("is_sj"));
    }
}


异步 同步 ONEWAY

kafka消息发送方式有同步、异步和ONEWAY三种方式,producer.type参数指定同步或者异步,request.require.acks指定ONEWAY。

producer.type=sync默认同步

设置异步需配套配置

Property Default Description
queue.buffering.max.ms 5000 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages 10000 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms -1 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages 200 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

在代码中如果需要同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send().get("key",value);

异步发送只需要在发送成功获取消息是否成功即可:

ListenableFuture future = kafkaTemplate.send();
future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error(ex.getMessage()+"发送失败!原因:"+ex.getCause());
            }
            @Override
            public void onSuccess(Object result) {
                logger.info("消息发送成功"+result.toString());
            }
        });

消息可靠性

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:文章来源地址https://www.toymoban.com/news/detail-775283.html

  • 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
  • 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。
  • 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

到了这里,关于spring集成kafka并对消息进行监听的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(48)
  • 深入理解Spring Kafka中@KafkaListener注解的参数与使用方式

    Apache Kafka作为一个强大的消息代理系统,与Spring框架的集成使得在分布式应用中处理消息变得更加简单和灵活。Spring Kafka提供了 @KafkaListener 注解,为开发者提供了一种声明式的方式来定义消息监听器。在本文中,我们将深入探讨 @KafkaListener 注解的各种参数以及它们的使用方

    2024年01月16日
    浏览(51)
  • 实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq

    前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多

    2024年02月08日
    浏览(54)
  • spring cloud steam 整合kafka 进行消息发送与接收

    spring cloud steam : Binder和Binding Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于

    2024年02月10日
    浏览(43)
  • 使用 Spring Kafka 进行非阻塞重试的集成测试

    ​Kafka的非阻塞重试是通过为主题配置重试主题来实现的。如果需要,还可以配置额外的死信主题。如果所有重试都耗尽,事件将被转发到DLT。在公共领域中有很多资源可用于了解技术细节。对于代码中的重试机制编写集成测试确实是一项具有挑战性的工作。以下是一些测试

    2024年02月10日
    浏览(38)
  • kafka消息监听

    1, spring配置kafka网址 2,listener groupId表示分组,不同组的消费者不是竞争关系 3, 这段代码使用了Spring Kafka提供的注解 @KafkaListener 来定义一个 Kafka消费者 。具体的配置如下: groupId = \\\"order-service-2\\\" :指定该消费者所属的消费者组ID,即\\\"order-service-2\\\"。 topicPartitions :表示要订

    2024年02月15日
    浏览(27)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(46)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(42)
  • Java集成消息队列Kafka

    在使用Maven构建Java项目时,你可以通过添加Kafka的Maven依赖来引入Kafka相关的库。下面是Kafka的Maven坐标: 将上述依赖坐标添加到你的项目的pom.xml文件中,即可下载并引入Kafka客户端库。请注意,版本号可能会有所不同,你可以根据自己的需求选择最合适的版本。 另外,如果你

    2024年01月18日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包