目录
1、Kafka的四个角色解释
2、Kafka与zookeeper的关系与环境搭建
3、Kafka入门小案例
4、Kafka分区机制
4.1、Topic在分区下如何存储消息
4.2、消息的分区策略
5、Kafka高可用设计方案
5.1、集群
5.2、备份机制(Replication)
5.2.1、两种追随者
6、生产者详解
6.1、参数配置
7、消费者详解
7.1、消费者组
7.2、消息有序性
7.3、提交偏移量带来的问题及解决方案
7.3.1、自动提交
重复消费
消息丢失
7.3.2、手动提交
同步提交
异步提交
同步加异步
8、封装消息的方式
1、Kafka的四个角色解释
Kafka官网
kafka官网:http://kafka.apache.org/
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
topic:对于生产者生产的消息分门别类,每一类消息称之为一个主题(Topic)
consumer:订阅主题并负责消费主题里生产者生产的消息称之为主题消费者(consumer)
broker:生产者发布的消息保存在一组服务器中,称之为Kafka集群Kafka Cluster,集群中的每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题topic,并从Broker拉数据,从而消费这些已发布的消息
2、Kafka与zookeeper的关系与环境搭建
Kafka对于zookeeper是强依赖,zookeeper相当于Kafka的注册中心保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器:
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器:
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
注意:换成自己镜像的IP地址
3、Kafka入门小案例
POM.XML
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
生产者文章来源:https://www.toymoban.com/news/detail-808682.html
application.yml
spring:
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
/**
* @author JiaWei
* 生产者
*/
public class ProducerQuikStart {
public static void main(String[] args) {
//Kafka配置信息
Properties prop = new Properties();
//连接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
//key和value进行序列化网络传输
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//创建Kafka生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
//发送消息
/**
* 1.主题
* 2.key
* 3.消息
*/
ProducerRecord<String,String> kvproducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
producer.send(kvproducerRecord);
//释放
producer.close();
}
}
消费者
spring:
kafka:
bootstrap-servers: 192.168.200.130:9092
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
同一个消费者组里的消费者同时订阅一个主题,再有最新的消息时,这条消息只能被一个消费者消费
如果想让一条消息被所有消费者接收,那么就需要给多个消费者在不同的消费者组里面
/**
* @author JiaWei
* 消费者
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
//kafka消费者配置信息
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
//key 和 value反序列化
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组 当前消费者在那个消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
//消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
//订阅主题
consumer.subscribe(Collections.singletonList("topic-first"));
//拉取消息
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
4、Kafka分区机制
概述:分区就相当于一个文件夹其中的文件就是Topic文件里的内容就是消息 ,且分区建立在Borker服务器中,多个Borker建立成Kafka集群,分区解决的问题就是假如所有的Topic都存储在一台服务器上如果数据量特别大可能会存储不了,所以Kafka为了存储更多的数据不受一台服务器的限制,允许你的Topic存储在不同机器Broker上,所以在创建Topic时可以存储在不同的分区下面。
4.1、Topic在分区下如何存储消息
发送消息时可以把topic存储在3个分区下面,每个分区下面消息都是有顺序的,并且有对应的编号进行记录且是连续自增的,它称之为偏移量(offset),就可以知道你的消息在分区的什么位置
4.2、消息的分区策略
5、Kafka高可用设计方案
5.1、集群
一个Kafka集群由多个Broker组成,这样集群一台机器宕机,其他机器上的Broker也依然能够对外提供服务
5.2、备份机制(Replication)
备份机制解决消息丢失,消息持久化
生产者在连接上Kafka集群后发送消息会先在领导者副本(Leader Replica)进行存储,而后由领导者副本将消息同步到追随者副本(Follower Replica)中进行备份防止消息丢失,且追随者副本有多个。
5.2.1、两种追随者
追随者有两种分别是ISR和普通分别为同步和异步,同步数据为强一致性,异步保证可用性
如果leader失效,优先选取ISR列表中的follower因为数据较新,如果ISR列表的follower都不行了,次选其他follower
极端情况:所有副本都失效了
6、生产者详解
生产者发送消息有两种方式
同步发送:如果消息特别多消费者处理太慢就会造成阻塞
异步发送:消费者消费的慢也不会造成阻塞,不用等到消费者消费完消息,生产者才会发送下一条消息
6.1、参数配置
消息确认机制ack,开发中一般不会设置默认值为acks=1也就是leader在接收到消息保存后返回给生产者就完事了。
生产者消息重发机制retries
leader如果长时间没有给生产者回执,那么生产者就会重复发送消息,我们则可以设置次数,默认情况下,生产者会在每次重试之间等待100ms
参数详解-消息压缩
降低消息传输网络带宽的开销,也会降低存储消息的空间
7、消费者详解
7.1、消费者组
指的是由一个或多个消费者组成的群体,一条消息在topic上分发给消费者组中的消费者,如果所有消费者都在一个消费者组中则消息只能被一个消费者消费且只有一个消费者能接收到消息,反之如果想让所有消费者都接收到消息那么就给每个消费者分配一个消费者组。
7.2、消息有序性
由于生产消息和消费消息是同时进行的他保证不了消息的顺序性,如果想要保证则需要单独指定分区以及对应唯一的消费者因为消息是有offerset偏移量的消费者按照偏移量进行消费就能保证消费的顺序性
7.3、提交偏移量带来的问题及解决方案
消费者消费完消息会提交偏移量标注消费到分区的什么位置分为两种方式
7.3.1、自动提交
Kafka会默认每五秒主动提交一次偏移量,这种方式面临两个问题 重复消费及消息丢失
重复消费
同一个消费者组中的消费者负责消费对应的分区消息,如果消费者组中的其中一个消费者挂掉了,那么它负责的分区中的消息就由同一个消费者组中的消费者负责进行消费,如果由于网络波动,突然宕机、等原因,造成挂掉消费者提交的偏移量是在10的位置但是你实际提交的是5的位置还没来得及提交你就挂掉了,那么接手你负责分区的消费者就会从5的位置进行消费,这样就造成重复消费了
消息丢失
同一个消费者组中的消费者负责消费对应的分区消息,如果消费者组中的其中一个消费者挂掉了,那么它负责的分区中的消息就由同一个消费者组中的消费者负责进行消费,如果由于网络波动、突然宕机、等原因,造成消费者其实才消费到分区偏移量5的位置但是它实际提交的偏移量是10的位置,那么接手挂掉消费者对应分区的消费者就会从10的位置往后消费,那么10之前的消息还没来得及消费就被丢失了
7.3.2、手动提交
同步提交
同步提交偏移量如果出现异常可能造成阻塞,那么他就会一直等响应然后进行重试提交直到成功,在消费完消息的代码下将提交偏移量代码写上
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
try {
consumer.commitSync();//同步提交当前最新的偏移量
}catch (CommitFailedException e){
System.out.println("记录提交失败的异常:"+e);
}
}
}
异步提交
异步提交不会造成阻塞,但是异步提交不会进行重试,异步没有重试是因为如果同时存在多个异步提交可能会导致偏移量的覆盖,就好比现在都能去提交并没有造成阻塞,多个提交同时进行就可能会重复覆盖,所以没有重试
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e!=null){
System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
}
}
});
}
同步加异步
如果异步提交失败之后他先去记录日志,然后因为异步已经提交失败了那么同步再去提交,这样就能保证偏移量成功的提交,所以说这两种方式结合起来会比较好,保证能够重试也能保证不阻塞
try {
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync();
}
}catch (Exception e){+
e.printStackTrace();
System.out.println("记录错误信息:"+e);
}finally {
try {
consumer.commitSync();
}finally {
consumer.close();
}
}
8、封装消息的方式
利用阿里fastJSON的方式来进行消息的发送与消费
生产者
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
Map<String, Object> map = new HashMap<>();
map.put("articleId", wmNews.getArticleId());
map.put("enable", dto.getEnable());
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
消费者文章来源地址https://www.toymoban.com/news/detail-808682.html
#controller
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message){
if (message != null){
Map map = JSONObject.parseObject(message, Map.class);
apArticleConfigService.updateByMap(map);
}
}
#Service
public interface ApArticleConfigService extends IService<ApArticleConfig> {
void updateByMap(Map map);
}
#ServiceImpl
@Override
public void updateByMap(Map map) {
Object articleId = map.get("articleId");
Object enable = map.get("enable");
Boolean isDown = true;
if (enable.equals(1)){
isDown = false;
}
Wrappers.<ApArticleConfig>lambdaUpdate().
set(ApArticleConfig::getIsDown, isDown).
eq(ApArticleConfig::getArticleId, articleId);
}
到了这里,关于一篇搞定Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!