1. Kafka Introduction
1.1. Producer Introduction
1.1.1. Producer Partitioning Strategies
- Round-robin strategy: messages are assigned to partitions in order. Round-robin balances load extremely well, always spreading messages as evenly as possible across all partitions, which is why it is the most sensible partitioning strategy in the general case. (the default, and the most common)
- Random strategy: each message is placed on a randomly chosen partition.
- Key-ordering strategy: every Kafka message can carry a key. Once a key is set, all messages with that key are guaranteed to land in the same partition, and because messages within a single partition are processed in order, ordering per key is preserved.
For example, when companies upload real-time data, using the company id as the key keeps each company's data in a single partition and guarantees it is consumed in order.
Without a key, the 17:30 data might land in partition 1 and the 17:31 data in partition 2; if partition 1 holds more messages than partition 2, the 17:31 data can end up being consumed before the 17:30 data. A sketch of how a custom partitioning strategy plugs in is shown below.
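To make this concrete, here is a minimal sketch of a custom strategy implemented against Kafka's Partitioner interface, using the random strategy as the example. The class name RandomPartitioner is illustrative and not part of the project code below; round-robin and key-ordering are what the built-in default partitioner already provides.
package com.example.springbootkafka.config;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class RandomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Pick any partition of the topic uniformly at random
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }
    @Override
    public void close() { }
    @Override
    public void configure(Map<String, ?> configs) { }
}
A class like this would be registered on the producer via the partitioner.class property (ProducerConfig.PARTITIONER_CLASS_CONFIG). For key-ordering no custom partitioner is needed at all: simply send with a key, as the sendMessage(topic, key, data) method in section 2.5 does.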
2. Spring Boot Kafka Integration Code
The project places all classes under com.example.springbootkafka, in the packages config, kafka, service, service.impl and controller, as the package declarations below show.
2.1. Add the pom Dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.2</version>
</dependency>
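These two versions are a matched pair: spring-kafka 2.8.x is the line managed by Spring Boot 2.6/2.7 and is built against kafka-clients 3.1.x. On Spring Boot 3.x you would move to spring-kafka 3.x instead.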
2.2. Add the Consumer Configuration Class KafkaConsumerConfig.java
package com.example.springbootkafka.config;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrapServers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.topics}")
private List<String> topics;
@Value("${spring.kafka.consumer.groupId}")
private String groupId;
@Value("${spring.kafka.consumer.sessionTimeOut}")
private String sessionTimeOut;
@Value("${spring.kafka.consumer.enableAutoCommit}")
private String enableAutoCommit;
@Value("${spring.kafka.consumer.autoCommitInterval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.maxPollRecords}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.maxPollInterval}")
private String maxPollInterval;
@Value("${spring.kafka.consumer.heartbeatInterval}")
private String heartbeatInterval;
@Value("${spring.kafka.consumer.keyDeserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.valueDeserializer}")
private String valueDeserializer;
@Value("${spring.kafka.consumer.autoOffsetReset}")
private String autoOffsetReset;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Number of consumer threads in this container; across multiple service instances the topic's partitions are split among them
factory.setConcurrency(3);
// Batch mode: the listener receives a List of records per poll
factory.setBatchListener(true);
ContainerProperties containerProperties = factory.getContainerProperties();
// Manual offset commits: ack.acknowledge() in the listener commits immediately
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> consumerConfigs = consumerConfigs();
log.info("消费者的配置信息:{}", JSONObject.toJSONString(consumerConfigs));
return new DefaultKafkaConsumerFactory<>(consumerConfigs);
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// Broker addresses
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Whether offsets are auto-committed
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
// Auto-commit interval (only relevant when auto-commit is enabled)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
// Session timeout: the broker triggers a rebalance if no heartbeat arrives within this window
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
// Key deserializer
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
// Value deserializer
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
// Heartbeat interval
propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
// Consumer group id
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Where to start when there is no committed offset (earliest/latest)
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// Maximum number of records returned by one poll()
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
// Maximum time between two poll() calls before the consumer is considered dead
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
return propsMap;
}
}
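Note that the manual ack modes only work with enable.auto.commit set to false (the container rejects the combination otherwise). With MANUAL_IMMEDIATE each ack.acknowledge() call commits the offset right away, while plain MANUAL queues the acks and commits them together after the batch is processed.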
2.3. Add the Consumer Implementation Consumer.java
- If the concurrency attribute is not set on @KafkaListener, the default concurrency is 1: the consumer runs in a single thread and processes one message (or batch) at a time. Setting concurrency to "1" preserves message ordering but limits the consumer's throughput.
- Setting concurrency to a value greater than "1" makes the consumer use multiple threads to process messages concurrently. This raises throughput, but the processing order of messages can no longer be guaranteed.
package com.example.springbootkafka.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class Consumer {
@KafkaListener(topics = {"#{T(java.util.Arrays).asList('${spring.kafka.consumer.topics}'.split(','))}"},
groupId = "${spring.kafka.consumer.groupId}",
containerFactory = "kafkaListenerContainerFactory",
concurrency = "1",
properties = {"${spring.kafka.consumer.autoOffsetReset}"})
public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
for (ConsumerRecord<String, String> record : records) {
log.info("topic_test consumed: Topic:" + record.topic() + ",Message:" + record.value());
}
// Manually commit the offsets; with a batch listener a single acknowledge() covers the whole batch
ack.acknowledge();
}
}
2.4. Add the ProducerService Interface
package com.example.springbootkafka.service;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;
import java.util.concurrent.ExecutionException;
public interface ProducerService {
/**
* Send a message synchronously, blocking until the broker acknowledges it
*
* @param topic target topic
* @param data  message payload
* @throws ExecutionException
* @throws InterruptedException
*/
void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;
/**
* Send an ordinary asynchronous message
*
* @param topic target topic
* @param data  message payload
*/
void sendMessage(String topic, String data);
/**
* Send a message with extra metadata via a full ProducerRecord
*
* @param record record carrying topic, partition, headers, key and value
*/
void sendMessage(ProducerRecord<String, String> record);
/**
* Send a Spring Messaging Message (the topic and other details travel in the headers)
*
* @param message message with payload and headers
*/
void sendMessage(Message<String> message);
/**
* Send a keyed message (all messages with the same key go to the same partition)
*
* @param topic target topic
* @param key   message key
* @param data  message payload
*/
void sendMessage(String topic, String key, String data);
/**
* Send a keyed message to a specific partition
*/
void sendMessage(String topic, Integer partition, String key, String data);
/**
* Send a keyed message to a specific partition with an explicit timestamp
*/
void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}
2.5. Add the Producer Implementation ProducerServiceImpl.java
package com.example.springbootkafka.service.impl;
import com.example.springbootkafka.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
@Component
@Slf4j
public class ProducerServiceImpl implements ProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* Send a message synchronously, blocking until the broker acknowledges it
*
* @param topic target topic
* @param data  message payload
* @throws ExecutionException
* @throws InterruptedException
*/
@Override
public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.debug("sendSyncMessage: synchronous send succeeded, topic: {}", recordMetadata.topic());
}
/**
* Send an ordinary asynchronous message
*
* @param topic target topic
* @param data  message payload
*/
@Override
public void sendMessage(String topic, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
future.addCallback(
success -> log.info("sendMessage: message sent to topic {} successfully!", topic),
failure -> log.error("sendMessage: send failed: {}", failure.getMessage())
);
}
/**
* Send a message with extra metadata via a full ProducerRecord
*
* @param record record carrying topic, partition, headers, key and value
*/
@Override
public void sendMessage(ProducerRecord<String, String> record) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("Send failed: {}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
log.debug("Send succeeded! topic: {}, partition: {}", metadata.topic(), metadata.partition());
}
});
}
/**
* Send a Spring Messaging Message (the topic and other details travel in the headers)
*
* @param message message with payload and headers
*/
@Override
public void sendMessage(Message<String> message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("Send failed: {}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
log.debug("Send succeeded! topic: {}, partition: {}", metadata.topic(), metadata.partition());
}
});
}
/**
* Send a keyed message (all messages with the same key go to the same partition)
*
* @param topic target topic
* @param key   message key
* @param data  message payload
*/
@Override
public void sendMessage(String topic, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
log.info("Sending to: {}, payload: {}", topic, data);
future.addCallback(
success -> log.debug("Send succeeded!"),
failure -> log.error("Send failed: {}", failure.getMessage())
);
}
@Override
public void sendMessage(String topic, Integer partition, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息失败!失败原因是:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
log.debug("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
}
});
}
@Override
public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("Send failed: {}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> sendResult) {
RecordMetadata metadata = sendResult.getRecordMetadata();
log.debug("Send succeeded! topic: {}, partition: {}", metadata.topic(), metadata.partition());
}
});
}
}
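One caveat on the callback style above: it matches spring-kafka 2.8.x, where KafkaTemplate.send() returns a ListenableFuture. From spring-kafka 3.0 (the Spring Boot 3.x line) send() returns a CompletableFuture instead, so the addCallback(...) calls would become whenComplete((result, ex) -> ...).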
2.6. Add the application.yml Configuration
server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.80.251:9092
    producer:
      batch-size: 16384 #batch size, 16 KB by default
      acks: all #ack level: how many replicas must persist a message before the write counts as successful. 1 (default): only the partition leader; 0: do not wait for any broker response; -1/all: wait for every in-sync replica (ISR)
      retries: 3 #number of send retries
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 #send buffer size, 32 MB by default
      client-id: abcdefg #client ID
      compression-type: none #message compression: none (default), gzip, snappy or lz4
      properties:
        retry.backoff.ms: 100 #interval between retries, default 100 ms
        linger.ms: 0 #default 0: how long to wait for more records to join a batch before sending
        max.request.size: 1048576 #default 1 MB: maximum size of a request/message
        connections.max.idle.ms: 540000 #default 9 minutes: close idle connections after this long
        receive.buffer.bytes: 32768 #default 32 KB: socket receive buffer size; -1 means use the OS default
        send.buffer.bytes: 131072 #default 128 KB: socket send buffer size; -1 means use the OS default
        request.timeout.ms: 30000 #default 30000 ms: how long to wait for a request response
    consumer:
      bootstrapServers: 192.168.80.251:9092
      topics: testTopic1,testTopic2
      groupId: test
      #the background heartbeat thread must report within 30 seconds, otherwise the group rebalances
      sessionTimeOut: 30000
      #disable Kafka's auto-commit; Spring still commits the offsets when ack.acknowledge() is called
      enableAutoCommit: false
      #auto-commit interval (ignored here, since auto-commit is disabled)
      autoCommitInterval: 1000
      maxPollRecords: 50
      #maximum of 300 seconds between two polls; if processing a batch takes longer, the consumer is removed from the group
      maxPollInterval: 300000
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      autoOffsetReset: latest
      #heartbeat interval
      heartbeatInterval: 10000
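A note on this consumer block: keys such as topics, sessionTimeOut and maxPollInterval are not part of Spring Boot's own spring.kafka property schema; they exist to feed the @Value placeholders in KafkaConsumerConfig. Because that class defines its own bean named kafkaListenerContainerFactory, Boot's auto-configured factory backs off and the factory built there is the one the @KafkaListener actually uses.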
2.7. Add the Controller Layer
package com.example.springbootkafka.controller;
import com.example.springbootkafka.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/test")
public class Controller {
@Autowired
ProducerService producerService;
@GetMapping("/sendMsg")
public Integer sendMsg(@RequestParam("msg") String msg){
producerService.sendMessage("testTopic1","key",msg);
return HttpStatus.OK.value();
}
}
2.8. Start the Project and Test
Call the endpoint directly: 127.0.0.1:8080/test/sendMsg?msg=11231231231
Console output like the following indicates that the message was produced and consumed successfully.
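With the log statements in the code above, a successful round trip prints roughly the following (timestamps, thread and logger names will differ in your run):
Sending to: testTopic1, payload: 11231231231
topic_test consumed: Topic:testTopic1,Message:11231231231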
3. Starting ZooKeeper and Kafka with docker-compose
The docker-compose.yaml file is shown below. After creating it, change 192.168.80.251 to your own VM's IP and start the containers with docker-compose up -d. ZooKeeper should generally come up before Kafka; if Kafka fails to start, try restarting it with docker restart kafka.
version: "3.3"
services:
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
restart: always
networks:
- zkp-kafka
ports:
- "2181:2181"
deploy:
replicas: 1
update_config:
parallelism: 2
delay: 10s
restart_policy:
condition: on-failure
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.80.251:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
ports:
- 9092:9092
networks:
- zkp-kafka
depends_on:
- zookeeper
deploy:
replicas: 1
update_config:
parallelism: 2
delay: 10s
restart_policy:
condition: on-failure
networks:
zkp-kafka:
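Once both containers are up, a quick smoke test is to list the topics from inside the Kafka container. The /opt/kafka/bin path is an assumption about the wurstmeister/kafka image layout; adjust it if your image differs:
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list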
4. Gitee Repository
Repository: https://gitee.com/wangyunchao6/spring-boot-kafka.git