前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。
一、消息监听器
1.1、消息监听器接口
消息监听器顾名思义用来接收消息,它是使用消息监听容器的必须条件。目前有8个消息监听器:
- 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); }
- 使用手动提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); }
- 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。 提供对 Consumer 对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); }
- 使用手动提交方法之一,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。 提供对 Consumer 对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
- 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
(使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。)(注意:这个接收所有)public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); }
- 使用手动提交方法之一,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。(注意:这个接收所有)
public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); }
- 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
(使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。) 提供对 Consumer 对象的访问。(注意:这个接收所有)public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); }
- 使用手动提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
提供对 Consumer 对象的访问。(注意:这个接收所有)public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
1、Consumer对象不是线程安全的;2、不要在这个过程中操作消费者位置和/或监听器中已提交偏移量。
1.2、消息监听器容器
消息监听容器有两种,一种是单线程消费,一种是多线程消费。
1.2.1、单线程
单线程实现为 KafkaMessageListenerContainer,KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。
- 使用 KafkaMessageListenerContainer
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
通过 ContainerProperties 可以对主题和分区以及其他信息进行配置
//主题设置
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
//监听器设置
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
//消费者工厂配置
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
权限配置:authorizationExceptionRetryInterval。这是一个容器属性,它会从KafkaConsumer中获取获取信息,当配置的用户被拒绝读取特定主题时,
就会发生触发 AuthorizationException。
1.2.2、多线程
多线程实现为 ConcurrentMessageListenerContainer ,ConcurrentMessageListenerContainer实际上是在给一个或多个KafkaMessageListenerContainer实例提供多线程消费,
本质上最后进行工作的还是KafkaMessageListenerContainer,故此它的实现和 KafkaMessageListenerContainer类似,
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它有一个concurrency属性,通过设置这个属性可以指定创建几个 KafkaMessageListenerContainer实例。
假设我们有3个主题,每个主题有5个分区。同时设置 container.setConcurrency(15),我们希望的是有15个线程活动着,实际上只有5个活着,
这是因为 Kafka 中默认的 PartitionAssignor是RangeAssignor。 在SpringBoot中可以设置: spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor 来更改策略。
1.3、偏移量
-
自动提交
设置 enable.auto.commit 消费者属性为 true 即可。这个也是默认状态。 -
手动提交
设置 enable.auto.commit 消费者属性为 false 即可;同时注意设置 AckMode。以下是spring支持的集中类型说明(无事务)- RECORD:当侦听器处理记录后返回时提交偏移量。
- BATCH:当返回的所有记录都poll()处理完毕后提交偏移量。
- TIMEpoll():只要超出了ackTime自上次提交以来的记录,则在处理完返回的所有记录后提交偏移量。
- COUNTpoll():只要ackCount自上次提交以来已收到记录,则在处理完返回的所有记录后提交偏移量。
- COUNT_TIMETIME:与和类似COUNT,但如果任一条件为 ,则执行提交true。
- MANUAL:消息监听者对acknowledge()负责Acknowledgment。之后,BATCH应用与 相同的语义。
- MANUAL_IMMEDIATEAcknowledgment.acknowledge():当监听器调用该方法时立即提交偏移量。
-
如何提交文章来源:https://www.toymoban.com/news/detail-726156.html
public interface Acknowledgment {
void acknowledge();
}
如果要提交部分批次,使用nack(),使用事务时,设置AckMode为MANUAL; 调用nack()会将成功处理的记录的偏移量发送到事务。
nack()只能在调用侦听器的消费者线程上调用。文章来源地址https://www.toymoban.com/news/detail-726156.html
到了这里,关于消息监听器和消息监听容器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!