监听器(文章来源:https://www.toymoban.com/news/detail-743663.html)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
/**
* 监听消息
*/
@KafkaListener(topics = {订阅主题,可订阅多个}, groupId = "分组,不能为空",containerFactory="kafkaListenerContainerFactory")
public void kafkaListener(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
try {
log.info("这是消费者在消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
if (Objects.equals(EVENT, record.topic())) {
Object value = record.value();
log.info("iot消息---");
ack.acknowledge();
}
} catch (Exception e){
log.error(String.format("kafka监听消息失败:%s-%s", record.topic() ,record.value()));
}
Kafka 配置(文章来源地址:https://www.toymoban.com/news/detail-743663.html)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Spring configuration that wires a Kafka producer and the consumer-side
 * infrastructure (consumer properties, consumer factory and listener container
 * factory) using String keys and String values.
 */
@Configuration
public class KafkaMQConfig {

    // Bootstrap servers to connect to (placeholder value, replace with the real address list).
    private String servers = "监听服务器";

    // Consumer group identifier.
    private String groupId = "";

    // Where to start reading when no committed offset exists.
    private String autoOffsetReset = "latest";

    // How offsets are committed; "false" disables auto commit.
    private String enableAutoCommit = "false";

    // Maximum number of records fetched from Kafka in a single poll.
    private String maxPollRecords = "65535";

    // Consumer heartbeat interval, in milliseconds.
    private String heartbeatInterval = "3000";

    // Maximum allowed interval between two polls, in milliseconds.
    private String maxPollIntervalMs = "50000";

    /**
     * Creates the Kafka producer bean with String key and value serializers.
     *
     * <p>The producer is thread-safe, so one instance per process is normally
     * enough; more instances may be created for throughput, preferably no more
     * than five.
     *
     * @return a producer configured against {@code servers}
     */
    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Properties props = new Properties();
        // Endpoint to connect to; obtain the endpoint of the topic from the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Serialization of Kafka message keys and values.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Maximum time a request may wait, in milliseconds.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        return new KafkaProducer<>(props);
    }

    /**
     * Builds the consumer-side configuration.
     *
     * @return a mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a consumer factory for String keys and values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory used by {@code @KafkaListener} methods.
     *
     * <p>The ack mode is set to {@code MANUAL_IMMEDIATE}, matching
     * {@code enableAutoCommit = "false"} above.
     *
     * @return the listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
到了这里,关于springboot+kafka消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!