Spring integration with Kafka
Kafka depends on ZooKeeper, which must be started before the broker.
Configure Kafka's connection to ZooKeeper in the server.properties file:
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
Configure ZooKeeper itself in zookeeper.properties:
# directory where ZooKeeper stores its snapshot data
dataDir=/tmp/zookeeper
# port on which clients connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# with the admin server disabled, its port will no longer conflict with other services
admin.enableServer=false
# admin.serverPort=8080
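The Kafka distribution bundles a ZooKeeper server; before starting the broker you can start it on Windows with the bundled script, run from the Kafka root directory:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties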
Local Kafka installation and startup (Windows)
1. Download the Kafka binary package to your machine: http://kafka.apache.org/downloads
2. In config/server.properties, modify:
listeners=PLAINTEXT://localhost:9092
log.dirs=F:\kafka_2.13-2.5.0\logs
3. Open a shell in the directory that contains bin (the Kafka root) and start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. List the available topics:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
5. Delete a specific topic:
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic topic_kedacom_icms_alarm_jy_3725
5.1. If the topic lingers (it is only marked for deletion), remove it directly in ZooKeeper; open the ZooKeeper CLI first, as shown in the sketch after these commands:
# list the stored topics
ls /brokers/topics
# delete the specified topic
rmr /brokers/topics/topicName
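These two commands are ZooKeeper CLI commands rather than Windows shell commands. Kafka ships a ZooKeeper client that can be opened as follows (host and port assumed to match the configuration above); on newer ZooKeeper versions, deleteall replaces the deprecated rmr:
.\bin\windows\zookeeper-shell.bat localhost:2181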
6. In another window, create the topic:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_kedacom_icms_alarm_jy_3725
7. In another window, start a console producer:
.\bin\windows\kafka-console-producer.bat --broker-list 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_jy_3725
8. In another window, start a console consumer (chcp 65001 first switches the console code page to UTF-8 so non-ASCII text displays correctly):
chcp 65001
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 189.1.0.55:9092 --topic topic_kedacom_icms_spdc_sj_3725 --from-beginning
If you see "The input line is too long" or the command is not recognized, the usual cause is that the Kafka installation path is too long or irregular.
Maven dependencies (pom file)
Note on versions: with newer spring-kafka versions this setup complained about a missing annotation processor, so the versions below are used.
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
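The Spring beans below read their settings from property placeholders such as ${bootstrap.servers}. A minimal sketch of a matching properties file is shown here; every key is inferred from the @Value annotations used later, and every value is only an illustrative assumption:
# Kafka broker address
bootstrap.servers=localhost:9092
# default topic used by the producer (illustrative)
topic.name=topic_kedacom_icms_alarm_jy_3725
# consumer group id and the comma-separated topics to listen to (illustrative)
group.id=sd_sj_group
consumer.topics=topic_kedacom_icms_alarm_jy_3725
# reporting-unit code and name attached to alarm messages (illustrative)
srcUnit.code=3725
srcUnit.name=exampleUnit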
Producer configuration
/**
* @author lyp
* @date 2021/11/22 15:46
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${bootstrap.servers}")
private String bootstrapServers;
public KafkaProducerConfig(){
System.out.println("Kafka producer configuration loaded");
}
/**
* Create the producer factory.
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerProperties());
}
/**
* Basic producer properties.
*/
@Bean
public Map<String, Object> producerProperties() {
Map<String, Object> props = new HashMap<String, Object>();
// Kafka broker address(es)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// key/value serialization
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// number of retries on transient send failures
props.put(ProducerConfig.RETRIES_CONFIG, 1);
// maximum batch size in bytes
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);
// how long to wait (ms) for more records before sending a batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// total memory available for buffering unsent records, in bytes
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
// acknowledgment level: "all" waits for the full in-sync replica set
props.put(ProducerConfig.ACKS_CONFIG, "all");
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory(), true);
// messages sent with sendDefault() go to this topic
kafkaTemplate.setDefaultTopic(KafkaSendEnum.ALARM_WARN_PUSH.getTopic());
return kafkaTemplate;
}
}
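KafkaSendEnum is referenced above but not shown in the article. A minimal sketch of what such an enum could look like (the constant name is taken from the code above; the topic value is an assumption):
public enum KafkaSendEnum {
    /** device alarm push (topic name is an assumed example) */
    ALARM_WARN_PUSH("topic_kedacom_icms_alarm_jy_3725");

    private final String topic;

    KafkaSendEnum(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return topic;
    }
}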
Consumer configuration
package com.huating.jfp.msg.api.kafka.config;
import com.huating.jfp.msg.api.kafka.construct.KafkaConsumerEnum;
import com.huating.jfp.msg.api.kafka.listener.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author lyp
* @ClassName KafkaConsumerConfig
* @description: Consumer configuration
* @datetime 2022-07-20 9:15
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${bootstrap.servers}")
private String bootstrapServers;
public KafkaConsumerConfig() {
System.out.println("Kafka consumer configuration loaded...");
}
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<String, Object>();
// Kafka broker address(es)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// consumer group
props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerEnum.SD_SJ.getGroupId());
// disable automatic offset commits
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// auto-commit interval (only used when auto-commit is enabled), default 5000 ms
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// key deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// value deserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
// latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced records
// none: resume from committed offsets; if any partition has no committed offset, throw an exception
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public KafkaConsumerListener kafkaConsumerListener() {
return new KafkaConsumerListener();
}
}
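KafkaConsumerEnum is likewise not shown in the article; a possible shape, with the group id value being an assumption:
public enum KafkaConsumerEnum {
    /** consumer group for provincial data exchange (group id is an assumed example) */
    SD_SJ("sd_sj_group");

    private final String groupId;

    KafkaConsumerEnum(String groupId) {
        this.groupId = groupId;
    }

    public String getGroupId() {
        return groupId;
    }
}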
Utility class for creating topics
/**
* @author lyp
*/
public class KafkaTopicUtil {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicUtil.class);
/**
* Create a topic and return whether creation succeeded.
* @param: topicName
* @return: boolean
* @author lyp
* @date 2021/11/12 16:06
*/
public static boolean createTopics(String bootstrapServers, String topicName, int partitions, short replication) {
boolean res = false;
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
try (AdminClient adminClient = KafkaAdminClient.create(properties)) {
NewTopic newTopic = new NewTopic(topicName, partitions, replication);
// wait for the creation result so the return value reflects what actually happened
adminClient.createTopics(Arrays.asList(newTopic)).all().get();
logger.info("Topic " + topicName + " created successfully");
res = true;
} catch (Exception e) {
logger.error("Failed to create topic " + topicName, e);
}
return res;
}
/**
* Return the set of topics that currently exist on the Kafka cluster.
* @return: set
* @author lyp
* @date 2021/11/12 16:07
*/
public static Set<String> getTopics(String bootstrapServers) {
Set<String> nameSet = new HashSet<>();
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient adminClient = KafkaAdminClient.create(properties)) {
ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> names = listTopicsResult.names();
nameSet = names.get();
} catch (Exception e) {
logger.error("Failed to list topics", e);
}
return nameSet;
}
}
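A short usage example of the utility class above (broker address and topic name are placeholders):
// list the existing topics, then create one if it is missing
Set<String> topics = KafkaTopicUtil.getTopics("localhost:9092");
if (!topics.contains("demo_topic")) {
    // 1 partition, replication factor 1
    KafkaTopicUtil.createTopics("localhost:9092", "demo_topic", 1, (short) 1);
}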
Producer business logic
public interface KafkaProduceService {
/** Send a device alarm message. */
boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo);
}
/**
* @author lyp
*/
@Service("kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProduceService {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class);
@Value("${bootstrap.servers}")
private String bootstrapServers;
@Value("${topic.name}")
private String topicName;
@Value("${srcUnit.code}")
private String srcUnitCode;
@Value("${srcUnit.name}")
private String srcUnitName;
@Override
public boolean sendWarnMessage(DeviceWarnInfo deviceWarnInfo) {
boolean res = false;
Map<String, Object> reportData = new HashMap<>();
reportData.put("command", "reportAlarm");
deviceWarnInfo.setSrcUnitCode(srcUnitCode);
deviceWarnInfo.setSrcUnitName(srcUnitName);
reportData.put("data", deviceWarnInfo);
// check whether the topic already exists and create it if necessary
Set<String> topics = KafkaTopicUtil.getTopics(bootstrapServers);
if (!topics.contains(KafkaSendEnum.ALARM_WARN_PUSH.getTopic())){
if (!KafkaTopicUtil.createTopics(bootstrapServers, topicName, 1, (short) 1)){
logger.info("Topic creation failed, message not sent");
return res;
}
}
KafkaTemplate kafkaTemplate = SpringContextUtil.getBean("kafkaTemplate");
// the default topic was set in KafkaProducerConfig, so topicName is passed here as the record key
ListenableFuture send = kafkaTemplate.sendDefault(topicName, JSONArray.toJSONString(reportData));
// the send is asynchronous: the callback only logs the outcome, and res is still false at this point
send.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error("Send failed: " + ex.getMessage() + ", cause: " + ex.getCause());
System.out.println("Send failed!");
}
@Override
public void onSuccess(Object result) {
logger.info("Message sent successfully: " + result);
System.out.println("Send succeeded!");
}
});
return res;
}
}
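DeviceWarnInfo itself is not included in the article. From its use above it at least carries the reporting-unit fields; a minimal sketch of its shape (an assumption, with the remaining alarm fields omitted):
@Data
public class DeviceWarnInfo {
    /** code and name of the reporting unit, filled in before sending */
    private String srcUnitCode;
    private String srcUnitName;
    // ... remaining alarm fields omitted
}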
Consumer business logic
Message payload class
package com.huating.jfp.msg.api.kafka.entity;
/**
* @author lyp
* @ClassName MesBody
* @description: Message entity
* @datetime 2022-07-21 14:48
*/
@Data
public class MesBody {
// message type marker
private String command;
// message payload, itself a JSON string
private String data;
}
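For reference, the kind of JSON the listener below maps onto MesBody (values are purely illustrative); note that data is carried as a JSON string and parsed a second time by the listener:
{"command":"reportAlarm","data":"{\"srcUnitCode\":\"3725\",\"srcUnitName\":\"exampleUnit\"}"}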
Listener class
package com.huating.jfp.msg.api.kafka.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import com.alibaba.fastjson.JSONObject;
import com.huating.jfp.msg.api.kafka.construct.KafkaMesType;
import com.huating.jfp.msg.api.kafka.construct.KafkaTopics;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.MesBody;
import com.huating.jfp.msg.api.kafka.entity.Notice;
/**
* @author lyp
* @ClassName KafkaConsumerListener
* @description: Topic listener
* @datetime 2022-07-20 9:27
*/
public class KafkaConsumerListener {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
@Autowired
private KafkaConsumerService consumerService;
/**
* Listen on the configured topics (comma-separated for multiple topics).
* groupId: consumer group id
* topics: array of topics to listen to
* topic: a single topic to listen to
*/
@KafkaListener(groupId = "${group.id}", topics = "#{'${consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void listener(ConsumerRecord<String, String> consumerRecord) {
logger.info("开始消费" + KafkaTopics.SD_JY_DUTY_TOPIC.getTopicName() + "的消息{}", consumerRecord.value());
MesBody mesBody = JSONObject.parseObject(consumerRecord.value(), MesBody.class);
logger.error("kafka监听-当前消息类型:"+mesBody.getCommand());
//督查督办
if (mesBody.getCommand().equals(KafkaMesType.SD_INSPECTOR_ISSUE.getMesCode())
|| mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_DISPOSE.getMesCode())
|| mesBody.getCommand().equals(KafkaMesType.SD_INSPECT_RES.getMesCode())) {
logger.error("督查督办监听消息处理开始----->----->");
InspectorIssue inspectorIssue = JSONObject.parseObject(mesBody.getData(), InspectorIssue.class);
consumerService.inspectorListener(inspectorIssue);
}
// notice/bulletin (通知通报) messages
if (mesBody.getCommand().equals(KafkaMesType.SD_NOTICE_ISSUE.getMesCode())) {
logger.error("Start handling notice message");
Notice notice = JSONObject.parseObject(mesBody.getData(), Notice.class);
consumerService.noticeListener(notice);
}
}
}
Business processing
package com.huating.jfp.msg.api.kafka.consumer.service;
import com.huating.jfp.msg.api.kafka.entity.InspectorIssue;
import com.huating.jfp.msg.api.kafka.entity.Notice;
/**
* @author lyp
*/
public interface KafkaConsumerService {
/**
* Handle inspection issuance and inspection closure messages.
*
* @param inspectorIssue
*/
void inspectorListener(InspectorIssue inspectorIssue);
/**
* Handle notice/bulletin issuance messages.
*
* @param notice
*/
void noticeListener(Notice notice);
}
package com.huating.jfp.msg.api.kafka.consumer.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.huating.jfp.common.dao.MsgConfigureDao;
import com.huating.jfp.common.dao.MsgDao;
import com.huating.jfp.common.entity.Msg;
import com.huating.jfp.common.entity.MsgConfigure;
import com.huating.jfp.common.entity.MsgReceive;
import com.huating.jfp.common.service.MsgReceiveService;
import com.huating.jfp.core.base.ViewPublicRewrite;
import com.huating.jfp.msg.api.http.servcie.HttpRequestService;
import com.huating.jfp.msg.api.kafka.consumer.service.KafkaConsumerService;
import com.huating.jfp.msg.api.kafka.dao.InspectorEventMapper;
import com.huating.jfp.msg.api.kafka.dao.NoticeMapper;
import com.huating.jfp.msg.api.kafka.entity.*;
import com.huating.jfp.msg.api.kafka.producer.service.KafkaProducerService;
import com.huating.jfp.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author lyp
* @ClassName KafkaConsumerServiceImpl
* @description: Consumer implementation
* @datetime 2022-07-20 9:13
*/
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);
private static final String SRC_UNIT_CODE = "gaol_code";
private static final String SRC_UNIT_NAME = "gaol_name";
@Autowired
private InspectorEventMapper inspectorEventMapper;
@Autowired
private HttpRequestService httpRequestService;
@Autowired
private NoticeMapper noticeMapper;
@Autowired
private MsgDao msgMapper;
@Autowired
private MsgConfigureDao msgConfigureMapper;
@Autowired
private MsgReceiveService msgReceiveService;
@Autowired
private ViewPublicRewrite vp;
@Override
public void inspectorListener(InspectorIssue inspectorIssue) {
if (!StrUtil.isEmpty(inspectorIssue.getUuid())) {
if (!StrUtil.isEmpty(inspectorIssue.getDubanTime())) {
logger.error("督办下发处理");
InspectorEventDispose inspectorEventDispose = new InspectorEventDispose();
//督查督办
String uuid = StringUtil.getUUID();
inspectorEventDispose.setIedUuid(uuid);
inspectorEventDispose.setIedIeUuid(inspectorIssue.getUuid());
inspectorEventDispose.setIedExpireTime(inspectorIssue.getDubanTime());
inspectorEventDispose.setIedContent(inspectorIssue.getContent());
//督办下发持久化
inspectorEventMapper.insertDispose(inspectorEventDispose);
logger.error("督办下发数据新增完成");
//督办文件持久化
List<FileEntity> files = inspectorIssue.getFiles();
List<InspectorFile> fileList = new ArrayList<>();
downloadFile(files, fileList, uuid);
inspectorEventMapper.insertFiles(fileList);
logger.error("督办下发完成");
} else if (!StrUtil.isEmpty(inspectorIssue.getSrcUnitCode())) {
logger.error("Handling inspection (督查) issuance");
InspectorEvent inspectorEvent = new InspectorEvent();
// inspection record
inspectorEvent.setIeUuid(inspectorIssue.getUuid());
inspectorEvent.setIeAreaCode(vp.getBusinessValue("gaol_code"));
inspectorEvent.setIeEventType(inspectorIssue.getType());
inspectorEvent.setIeDescribe(inspectorIssue.getContent());
inspectorEvent.setIeGrabTime(inspectorIssue.getPublishTime());
inspectorEvent.setIeExpireTime(inspectorIssue.getQxTime());
inspectorEvent.setIeCusNunmber(vp.getBusinessValue("base_cus"));
inspectorEvent.setIeNature(inspectorIssue.getNature());
inspectorEvent.setIeIsSj(0);
// persist the inspection record
inspectorEventMapper.insertSynData(inspectorEvent);
logger.error("Inspection record inserted");
// persist the attached files
List<FileEntity> files = inspectorIssue.getFiles();
List<InspectorFile> fileList = new ArrayList<>();
downloadFile(files, fileList, inspectorIssue.getUuid());
inspectorEventMapper.insertFiles(fileList);
logger.error("Inspection files inserted");
logger.error("Inspection issuance handled");
} else {
// inspection closure (办结)
if (inspectorEventMapper.searchIsSj(inspectorIssue.getUuid()) > 0) {
// mark the inspection as closed
inspectorEventMapper.updateState("3", inspectorIssue.getUuid());
logger.error("Inspection closed");
}
}
}
}
@Override
public void noticeListener(Notice notice) {
logger.error("通知通报下发开始处理");
//通知通报持久化
noticeMapper.insertData(notice);
Msg msg = new Msg();
String uuid = StringUtil.getUUID();
msg.setMUuid(uuid);
MsgConfigure msgConfigure = new MsgConfigure();
msgConfigure.setMcCode("NOTIC_ISSUE");
MsgConfigure config = msgConfigureMapper.selectByData(msgConfigure).get(0);
msg.setMcUuid(config.getMcUuid());
msg.setMcMsglevel(config.getMcMsglevel());
msg.setMStatus(Byte.parseByte(notice.getFeedback() == 0 ? "1" : "0"));
msg.setMParam(notice.getUuid());
msg.setMContent(notice.getTitle());
msg.setCreateTime(new Date());
if (notice.getFeedback() == 0) {
msg.setMHandleTime(new Date());
msg.setMHandleUser("No action required for this notice");
}
msgMapper.insertMsg(msg);
MsgReceive msgReceive = new MsgReceive();
msgReceive.setMrUuid(StringUtil.getUUID());
msgReceive.setmUuid(uuid);
msgReceiveService.insertMsgReceive(msgReceive);
// persist the attached files
List<FileEntity> files = notice.getFiles();
noticeDownloadFile(files, notice.getUuid());
noticeMapper.insertFiles(files);
}
private void downloadFile(List<FileEntity> files, List<InspectorFile> fileList, String uuid) {
logger.error("文件下载开始");
if (!files.isEmpty()) {
for (FileEntity file : files) {
InspectorFile inspectorFile = new InspectorFile();
String fileName = file.getFileName();
logger.error(fileName);
inspectorFile.setIfFileName(fileName);
String last = fileName.substring(fileName.lastIndexOf("."));
if (last.equals(".jpg") || last.equals(".JPG") || last.equals(".png") || last.equals(".gif") || last.equals(".bmp")) {
inspectorFile.setIfFileType(1);
} else {
inspectorFile.setIfFileType(2);
}
inspectorFile.setIfSourceType(1);
inspectorFile.setIfIeUuid(uuid);
// TODO: confirm how the provincial bureau supplies details for other file types
inspectorFile.setIfPath(file.getFileName());
String fileId = file.getFileId();
// download the file
String token = httpRequestService.sendPostMessage();
boolean res = httpRequestService.downloadFile(fileId, token, vp.getCusBusinessValue("duty_file_disk_mapping_path", "1000") + "/dutyUpLoad/");
if (res) {
fileList.add(inspectorFile);
}
}
}
}
private void noticeDownloadFile(List<FileEntity> files, String uuid) {
files.stream().forEach((file) -> {
file.setParentId(uuid);
String token = httpRequestService.sendPostMessage();
httpRequestService.downloadFile(file.getFileId(), token, vp.getBusinessValue("notice_file_disk_mapping_path") + "/noticeUpLoad/");
});
}
public boolean checkSj() {
return vp.getBusinessValue("is_sj") != null &&
Boolean.parseBoolean(vp.getBusinessValue("is_sj"));
}
}
Asynchronous, synchronous, and one-way sending
Kafka messages can be sent synchronously, asynchronously, or one-way (fire-and-forget). In the old Scala producer, the producer.type parameter selects synchronous or asynchronous sending, and request.required.acks controls the one-way behavior.
producer.type=sync is the default (synchronous).
Asynchronous mode needs the accompanying settings below:
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | How long the producer buffers messages in async mode. Setting it to 1000, for example, buffers 1 s of data before sending it in one go, which greatly increases broker throughput at the cost of latency. |
queue.buffering.max.messages | 10000 | Maximum number of messages the producer may buffer in async mode; beyond this the producer either blocks or drops messages. |
queue.enqueue.timeout.ms | -1 | How long the producer blocks once the limits above are reached. 0 means never block and drop messages immediately when the buffer is full; -1 means block indefinitely and never drop messages. |
batch.num.messages | 200 | Number of messages accumulated per batch in async mode; the producer sends once this count is reached (the per-batch send size). |
Pushing data in batches greatly improves throughput: the Kafka producer accumulates messages in memory and sends them as one batch request. The batch size is controlled by the producer parameter batch.num.messages; increasing it reduces the number of network requests and disk I/O operations, though the exact value is a trade-off between efficiency and latency. Newer client versions also expose a batch.size parameter (a batch size in bytes) for the same purpose.
To send synchronously in code, call get() on the result of each send: producer.send returns a Future, and Future.get() blocks until the broker acknowledges the record.
kafkaTemplate.send(topic, key, value).get();
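A slightly fuller sketch of a blocking send with basic error handling (topic, key, and value are placeholders; imports of SendResult and TimeUnit are assumed):
try {
    // block until the broker acknowledges the record, or give up after 10 seconds
    SendResult<String, String> result =
            kafkaTemplate.send("demo_topic", "key", "value").get(10, TimeUnit.SECONDS);
    logger.info("sent to partition {} at offset {}",
            result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
} catch (Exception e) {
    logger.error("synchronous send failed", e);
}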
For asynchronous sending, simply register a callback to be notified whether the send succeeded:
ListenableFuture future = kafkaTemplate.send(topic, value);
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error("Send failed: " + ex.getMessage() + ", cause: " + ex.getCause());
}
@Override
public void onSuccess(Object result) {
logger.info("消息发送成功"+result.toString());
}
});
Message reliability
Producers can send messages to Kafka asynchronously and in parallel, but a producer normally receives a response after each send: either the offset of the message or an error raised during sending. The key parameter here is "request.required.acks", which determines how many replica acknowledgments the leader partition must obtain (a small config sketch follows the list):
- acks=0: the producer does not wait for any response from the broker, so it cannot know whether the message was sent successfully; data may be lost, but this setting gives the highest throughput.
- acks=1: the producer gets an acknowledgment once the leader partition has received the message; reliability is better because the client waits until the broker confirms receipt.
- acks=-1: the producer gets an acknowledgment only after all replicas of the partition have received the message, which provides the strongest reliability guarantee.
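As an illustration, the durability-oriented end of that spectrum looks roughly like this in configuration (values are examples; min.insync.replicas is a broker/topic-level setting that usually accompanies acks=all):
# producer side
acks=all
retries=3
# broker/topic side: how many in-sync replicas must acknowledge a write when acks=all
min.insync.replicas=2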