整体描述
之前写过一篇使用docker搭建kafka服务的文章,使用centos搭建kafka服务器Docker,本文主要简单将一下在springboot框架下,接收kafka服务器发过来的消息。
版本对应
由于使用springboot,管理版本时和springboot绑定的,我目前用的是springboot2.7,kafka的版本是2.1,这个版本也没啥影响,因为kafka服务器是向下兼容的,也就是说你的kafka服务器的版本是3.1,kafka客户端的版本使用3.1以下的,就都可以。
具体接入
1. pom引用
直接使用springboot框架带的kafka客户端,不指定版本号,引入的默认版本就是和springboot版本有关的。这块我看网上有指定版本号会报错的,因为springboot和kafka的版本是有对应关系的,如果引入的kafka版本和当前使用的springboot版本不兼容,就会报错。具体版本对应关系可以自己去网上搜一下。
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. kafka参数配置
配置kafka参数,这块有两种方式,一是直接在配置文件里写,还有就是在代码里写,两种方式都可以,我这里就直接在springboot的config里写了,添加KafkaConsumerConfig.java:
/**
* Kafka消费者配置类
*
* @author thcb
* @date 2023-05-24
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
public final static String BOOTSTRAP_SERVERS = "192.168.1.100:9092";
public final static String GROUP_ID = "test_group";
@Bean
@Conditional(KafkaCondition.class)
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者工厂
factory.setConsumerFactory(consumerFactory());
// 消费者组中线程数量
factory.setConcurrency(3);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
// 当使用批量监听器时需要设置为true
factory.setBatchListener(true);
return factory;
}
@Bean
@Conditional(KafkaCondition.class)
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
@Conditional(KafkaCondition.class)
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// Kafka地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 是否自动提交offset偏移量(默认true)
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交的频率(ms)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超时设置
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
//请求超时时间
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
// 键的反序列化方式
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 值的反序列化方式
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset偏移量规则设置:
// (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}
}
注意BOOTSTRAP_SERVERS 需要根据自己的kafka服务器地址配置。这块由于使用的是config配置,所以在kafka服务无法访问时,springboot程序就会启动失败,这块根据实际情况处理吧,由于我现在的项目,kafka是第三方接口,用来接收第三方数据的,所以kafka服务器无法访问,不应该影响程序启动。所以这块我加了一个判断,如果kadka服务无法访问,就不进行kafka相关的初始化操作,也是使用springboot带的注解实现的。
3. 添加Conditional注解
这个注解就是提供一个判断,如果判断通过,就执行注解的内容,如果不通过,就不执行。
这块我们可以在注解里加一个判断kafka服务器的操作:
/**
* kafka动态启动
* kafka代理服务器正常时启动kafka服务
* kafka代理服务器不可用时,不启动kafka服务
*
* @author thcb
* @date 2023-05-24
*/
public class KafkaCondition implements Condition {
public static final Logger log = LoggerFactory.getLogger(KafkaCondition.class);
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
URI uri = URI.create("http://" + KafkaConsumerConfig.BOOTSTRAP_SERVERS);
String host = uri.getHost();
int port1 = uri.getPort();
boolean b = this.isHostConnectable(host, port1);
log.info("matches:{}", b);
return b;
}
/**
* 判断kafka服务器,能否正常连接
*
* @param host
* @param port
* @return
*/
public boolean isHostConnectable(String host, int port) {
log.info("isHostConnectable:host:{},port:{}", host, port);
Socket socket = new Socket();
try {
//判断kafka网络是否能联通,不能连通则返回false
socket.connect(new InetSocketAddress(host, port), 2000);
} catch (IOException e) {
log.error("isHostConnectable:{}", ExceptionUtil.getExceptionMessage(e));
return false;
} finally {
try {
socket.close();
} catch (IOException e) {
log.error("isHostConnectable:{}", ExceptionUtil.getExceptionMessage(e));
}
}
return true;
}
}
然后就在使用kafka的地方都加上这个注解,其实在KafkaConsumerConfig的配置类里,就已经加上了。
4. 添加listener
创建一个监听,直接监听kafka消息就可以了。其中主题根据kafka服务端发的主题确定,GROUP_ID 就是配置文件里设置的。
/**
* Kafka消费者Listener
*
* @author thcb
* @date 2023-05-24
*/
@Component
@Conditional(KafkaCondition.class)
public class EimpKafkaConsumerListener {
public static final Logger log = LoggerFactory.getLogger(EimpKafkaConsumerListener.class);
public final static String TOPIC = "test.topic";
public final static String GROUP_ID = "test_group";
//监听kafka消费
@KafkaListener(topics = TOPIC, groupId = GROUP_ID, containerFactory = "kafkaListenerContainerFactory")
@Conditional(KafkaCondition.class)
public void onMessage(String message) {
log.info("EimpKafkaConsumerListener onMessage:{}", message);
}
}
这样就可以在程序里接收kafka的消息了,在前面我写的那个文章里,有开启服务端Demo的方法,可以在服务端模拟发消息,在程序的log里就能收到了。文章来源:https://www.toymoban.com/news/detail-743755.html
总结
服务器搭建起来之后,接收kafka消息就简单多了。本文主要将接收kafka消息的方式整理了一下,还加了对kafka服务器是否可用的判断。文章来源地址https://www.toymoban.com/news/detail-743755.html
到了这里,关于在Springboot中接收kafka消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!