1,
spring配置kafka网址
2,listener
@Component
public class OrderMsgListener {
@KafkaListener(topics = "order",groupId = "order-service")
public void listen(ConsumerRecord record){
System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到
}
@KafkaListener(groupId = "order-service-2",topicPartitions = {
@TopicPartition(topic = "order",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0")
})
})//这一块注解表示可以监听kafka之前的消息
public void listenAll(ConsumerRecord record){
System.out.println("收到partion-0消息:"+record);
}
}
groupId表示分组,不同组的消费者不是竞争关系
3,
@KafkaListener(groupId = "order-service-2",topicPartitions = {
@TopicPartition(topic = "order",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0")
})
})
这段代码使用了Spring Kafka提供的注解@KafkaListener
来定义一个Kafka消费者。具体的配置如下:
-
groupId = "order-service-2"
:指定该消费者所属的消费者组ID,即"order-service-2"。 -
topicPartitions
:表示要订阅的主题和分区信息,是一个数组。 -
@TopicPartition(topic = "order", partitionOffsets = { ... })
:表示订阅名为"order"主题的特定分区。 -
@PartitionOffset(partition = "0", initialOffset = "0")
:表示订阅的分区为0,并设置初始偏移量为0。
这段代码将创建一个Kafka消费者,用于订阅名为"order"的Kafka主题下的0号分区,并从初始偏移量0开始消费消息。
在实际应用中,您可能会根据需要添加其他的@TopicPartition
和@PartitionOffset
注解可以订阅多个主题和分区,并指定每个分区的初始偏移量。
值得注意的是,上述代码是使用Spring Kafka提供的注解方式来创建Kafka消费者。通过该注解,您可以方便地定义多个消费者,并且框架会自动处理与Kafka的连接、消费消息等底层细节。
4,文章来源:https://www.toymoban.com/news/detail-607776.html
主题:topics;消息是发送给某个主题文章来源地址https://www.toymoban.com/news/detail-607776.html
到了这里,关于kafka消息监听的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!