Overview
This example configures only the Consumer side; no Producer is set up. For a Producer, refer to the commented-out code in configuration class 1 (section 3.1) and the producer sketch that follows it.
1. Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
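If the project inherits from spring-boot-starter-parent (or imports the Spring Boot BOM), the spring-kafka version is managed for you and can be omitted as above. Otherwise, pin a version explicitly; the version below is only a placeholder and should be aligned with your Spring Boot release:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- placeholder version; align it with your Spring Boot release -->
    <version>2.8.11</version>
</dependency>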
2. YAML configuration
spring:
  kafka:
    one:
      # test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      consumer:
        # Default consumer group id. In Kafka, consumers within the same group
        # never read the same message; group.id names the group.
        group-id: defaultName
        # Disable auto-commit (offsets are committed manually in the listener)
        enable-auto-commit: false
        # Where to start when the group has no committed offset:
        # "earliest" replays from the start of the log, "latest" starts at the end.
        # (The old names smallest/largest only apply to pre-0.9 clients.)
        auto-offset-reset: latest
        # Key/value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      # test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # Default consumer group id, same rules as above
        group-id: defaultName_two
        # Disable auto-commit
        enable-auto-commit: false
        # "earliest" replays from the start of the log, "latest" starts at the end
        auto-offset-reset: latest
        # Key/value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
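Note that only cluster one carries the SASL properties here. If the second cluster is also secured, mirror the same properties block under two and read the values with @Value in K2kafkaConfiguration the same way K1kafkaConfiguration does. A sketch, assuming the second cluster uses the same SCRAM mechanism (the credentials are placeholders, not from the original):

    two:
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username2" password="password2";

Also note that because these keys sit under the custom prefixes spring.kafka.one and spring.kafka.two, Spring Boot's Kafka auto-configuration will not pick them up; that is why the manual configuration classes in step 3 are needed.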
3. Create the configuration classes
3.1 Configuration class 1
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class K1kafkaConfiguration {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.one.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.one.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.one.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.one.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.one.properties.sasl.jaas.config}")
    private String jaasConfig;

    //@Value("${spring.kafka.one.producer.linger-ms}")
    //private Integer lingerMs;
    //@Value("${spring.kafka.one.producer.max-request-size}")
    //private Integer maxRequestSize;
    //@Value("${spring.kafka.one.producer.batch-size}")
    //private Integer batchSize;
    //@Value("${spring.kafka.one.producer.buffer-memory}")
    //private Integer bufferMemory;

    //@Bean
    //public KafkaTemplate<String, String> kafkaOneTemplate() {
    //    return new KafkaTemplate<>(producerFactory());
    //}

    // @Primary makes this the default factory: it is the one used when a
    // @KafkaListener does not name a containerFactory explicitly.
    @Bean
    @Primary
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumer threads
        factory.setConcurrency(1);
        // Enable batch listening
        //factory.setBatchListener(true);
        // Discard records rejected by the filter strategy
        //factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        // Manual offset commits: the listener must call ack.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // Record filter strategy
        //factory.setRecordFilterStrategy(new RecordFilterStrategy() {
        //    @Override
        //    public boolean filter(ConsumerRecord consumerRecord) {
        //        String msg = consumerRecord.value().toString();
        //        if (Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0) {
        //            return false;
        //        }
        //        // Returning true discards the record
        //        return true;
        //    }
        //});
        return factory;
    }

    //private ProducerFactory<String, String> producerFactory() {
    //    return new DefaultKafkaProducerFactory<>(producerConfigs());
    //}

    // Consumer factory bean for the first cluster
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    //private Map<String, Object> producerConfigs() {
    //    Map<String, Object> props = new HashMap<>();
    //    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    //    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    //    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    //    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    //    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    //    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    return props;
    //}

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // SASL/SCRAM authentication settings
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", mechanism);
        props.put("sasl.jaas.config", jaasConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
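The commented-out lines above sketch the matching Producer for cluster one. Enabled, they would look roughly like the following. This is a sketch under two assumptions not in the original: the spring.kafka.one.producer.* keys (linger-ms, max-request-size, batch-size, buffer-memory) are added to the YAML, and the secured cluster requires the same SASL settings for producers.

    @Value("${spring.kafka.one.producer.linger-ms}")
    private Integer lingerMs;
    @Value("${spring.kafka.one.producer.max-request-size}")
    private Integer maxRequestSize;
    @Value("${spring.kafka.one.producer.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.one.producer.buffer-memory}")
    private Integer bufferMemory;

    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // assumption: the secured cluster requires SASL for producers too
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", mechanism);
        props.put("sasl.jaas.config", jaasConfig);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

Sending is then just kafkaOneTemplate.send("default_topic", "hello"). The extra imports needed are org.apache.kafka.clients.producer.ProducerConfig, org.apache.kafka.common.serialization.StringSerializer, and org.springframework.kafka.core.{KafkaTemplate, DefaultKafkaProducerFactory, ProducerFactory}.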
3.2 Configuration class 2
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class K2kafkaConfiguration {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.two.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.two.consumer.max-poll-records}")
    private String maxPollRecords;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumer threads
        factory.setConcurrency(1);
        // Enable batch listening
        //factory.setBatchListener(true);
        // Discard records rejected by the filter strategy
        //factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        // Manual offset commits: the listener must call ack.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
    }

    // Deliberately not a @Bean: K1kafkaConfiguration already exposes a bean
    // named consumerFactory, and a second bean with the same name would clash.
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
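Both factories leave setBatchListener commented out. If you enable it (factory.setBatchListener(true) in kafkaTwoContainerFactory()), the listener signature changes to receive a whole poll at once. A sketch of what that would look like, not part of the original example; it belongs in a @Component like the consumers in step 4 and additionally needs java.util.List imported:

    // requires factory.setBatchListener(true) in kafkaTwoContainerFactory()
    @KafkaListener(topics = "${spring.kafka.two.topic}", containerFactory = "kafkaTwoContainerFactory")
    public void listenBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        records.forEach(r -> log.info("batch message: {}", r.value()));
        // one manual commit covers the whole batch
        ack.acknowledge();
    }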
4. Set up the consumers
4.1 Consumer 1
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer {

    @KafkaListener(topics = "${spring.kafka.one.topic}", groupId = "defaultName", containerFactory = "kafkaListenerContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer] received Kafka message: {}", record.value());
        log.debug("[Consumer] full record: {}", record);
        // Commit the offset manually; required because the container uses
        // AckMode.MANUAL_IMMEDIATE, otherwise offsets are never committed
        ack.acknowledge();
    }
}
4.2 Consumer 2
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer2 {

    @KafkaListener(topics = "${spring.kafka.two.topic}", groupId = "defaultName_two", containerFactory = "kafkaTwoContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer2] received Kafka message: {}", record.value());
        log.debug("[Consumer2] full record: {}", record);
        // Commit the offset manually; required because the container uses
        // AckMode.MANUAL_IMMEDIATE, otherwise offsets are never committed
        ack.acknowledge();
    }
}
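To check the whole chain end to end, a throwaway runner can push a message through the producer sketched after configuration class 1 (kafkaOneTemplate is the hypothetical bean from that sketch, not part of the original consumer-only setup):

import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaSmokeTest implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaOneTemplate;

    public KafkaSmokeTest(KafkaTemplate<String, String> kafkaOneTemplate) {
        this.kafkaOneTemplate = kafkaOneTemplate;
    }

    @Override
    public void run(String... args) {
        // should be logged by Consumer shortly after startup
        kafkaOneTemplate.send("default_topic", "hello from smoke test");
    }
}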