使用场景 处理大规模的消息,大数据,事件采集,日志收集等,不过使用延迟消息比较麻烦对比其他的消息队列的话。高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition。每个消费组 对分区进行消费
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
基本概念
1、消费者:(Consumer):主动从Broker拉数据,从而消费这些已发布的消息
2、生产者:(Producer) :向broker发布消息的应用程序
3、AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于kafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
4、话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名;
5、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
特征
- kafka支持消息持久化
- 消费端是主动拉取数据,消费状态和订阅关系由客户端负责维护
- 消息消费完后,不会立即删除,会保留历史消息
- topic中所有的分区的消费顺序是随机的但是,在单个分区内的消费顺序是固定的。
使用kafka中如何保证消息的有序性
目前的做法都是保证发送的消息发送到同一个partition,这里顺序消费又涉及到consumer单线程和多线程消费的情况。
在单线程的情况下
同一个topic,同一个分区:
Kafka的消息在分区内是严格有序的。也就是说我们可以把同一笔订单的所有消息,按照生成的顺序一个个发送到同一个topic的同一个分区。那么consumer就能顺序的消费到同一笔订单的消息。但是这种情况下提升不了系统的吞吐量,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。
在多线程的情况下
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
要保证顺序其实就是在消费者获取到消息的时候在进行一次类似kafka的分区操作,发送到不同的队列,然后我们开启多个线程去消费对应队列里面的数据。
使用kafka中如何保证不丢数据
Producer
使用异步发送或者同步发送 获取消息发送结果获取,加入消息发送失败重试机制
1)同步模式:有3种状态保证消息被安全生产,但是在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。
确认机制设置为-1,也就是让消息写入leader和所有的副本。
2)异步模式:当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立即丢弃掉。如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,这样也能保证数据不会丢失。
broker端
acks设置为0:broker接收消息立即返回,当消息还没写入磁盘宕机时,容易丢失数据
acks设置为1:等待broker的ack,如果leader落盘了就返回ack,如果follower同步完成前leader挂了就会丢失未同步的数据
acks设置为-1:等待所有leader和follower都落盘后返回ack,如果follower已同步,但是broker返回ack前leader挂了,则会重复发送消息。
Consumer端
可以优化成手动提交offset。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。可以通过enable.auto.commit设置为false,关闭闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。
使用kafka中如何保证不重复消费
生产者端的话
幂等可以保证单个生产者向同一个集群中同一个topic投递的数据发送的消息,不会丢失,而且不会重复。
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number
Kafka 从 0.11.0 开始可以配置
enable.idempotence=true
主要是消费端的幂等
消费者端的话
首先Kafka Broker上存储的消息,都有一个Offset标记。
然后kafka的消费者是通过offSet标记来维护当前已经消费的数据,
每消费一批数据,Kafka Broker就会更新OffSet的值,避免重复消费。
默认情况下,消息消费完以后,会自动提交Offset的值,避免重复消费。
Kafka消费端的自动提交逻辑有一个默认的5秒间隔,也就是说在5秒之后的下一次向Broker拉取消息的时候提交。
所以在Consumer消费的过程中,应用程序被强制kill掉或者宕机,可能会导致Offset没提交,从而产生重复提交的问题。
除此之外,还有另外一种情况也会出现重复消费。
在Kafka里面有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。
Consumer端会从分配的Partition里面去消费消息,如果Consumer在默认的5分钟内没办法处理完这一批消息。
就会触发Kafka的Rebalance机制,从而导致Offset自动提交失败。
而在重新Rebalance之后,Consumer还是会从之前没提交的Offset位置开始消费,也会导致消息重复消费的问题。
重复消费实现可以自己做幂等操作,建立去重表等。
使用docker安装一下kafka 因为kafka需要依赖zookeeper
使用docker-compose运行一个只有一个ZooKeeper node和一个Kafka broker的开发环境
使用一下网上的yml文件
version: '2'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
container_name: zookeeper
# kafka version: 1.1.0
# scala version: 2.12
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 10.225.137.105 //ip地址
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
#KAFKA_CREATE_TOPICS: "test1_topic,test2_topic"
depends_on:
- zoo1
container_name: kafka
启动容器
在docker-compose.yml所在的目录执行以下命令:
docker-compose up -d
docker ps -a 可以查看到容器已经启动
再安装一下kafka的map管理web页面
docker run -d -p 8089:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --name kafka-map dushixiang/kafka-map:latest
kafka-map: 一个美观简洁且强大的kafka web管理工具。
进入kafka容器
docker exec -it kafka /bin/bash
创建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic ly --zookeeper zoo1:2181 --replication-factor 1 --partitions 1
在map页面查看一下
进入kafka容器进行手动脚本发送消息测试
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=ly --broker-list kafka1:9092
1
1111
2222
3333
查看一下map页面topic的消息是否发送成功
手动脚本消费一下从offset等于0开始消费
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic ly
按offset消费
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --partition 0 --offset 3 --topic ly
kafka消费过的数据不会立马删除
kafka删除数据有两种方式
- 按照时间,超过一段时间后删除过期消息
- 按照消息大小,消息数量超过一定大小后删除最旧的数据 kafka删除数据的最小单位:segment
下面使用代码进行简单的kafka使用
php版 使用compose 安装
# 安装 kafka-php
composer require nmred/kafka-php
pruducer.php
<?php
/*
* @Author: yue
* @Date: 2022/7/1 9:52
* @LastEditTime: 2022/7/1 9:52
* @LastEditors: yue
* @Description: 生产者
*/
require_once '../vendor/autoload.php';
/* 创建一个配置实例 */
$config = \Kafka\ProducerConfig::getInstance();
/* Topic的元信息刷新的间隔 */
$config->setMetadataRefreshIntervalMs(10000);
/* 设置broker的地址 */
$config->setMetadataBrokerList('10.225.137.105:9092');
/* 设置broker的代理版本 */
$config->setBrokerVersion('1.0.0');
/* 只需要leader确认消息 */
$config->setRequiredAck(1);
/* 选择异步 */
$config->setIsAsyn(false);
/* 每500毫秒发送消息 */
$config->setProduceInterval(500);
/* 创建一个生产者实例 */
$producer = new \Kafka\Producer();
for ($i = 0; $i < 100; $i++) {
$producer->send([
[
'topic' => 'code_demo',
'value' => 'test' . $i,
],
]);
}
进入kafka-map查看一下是否发送成功
consumer.php
<?php
/*
* @Author: yue
* @Date: 2022/7/1 10:06
* @LastEditTime: 2022/7/1 10:06
* @LastEditors: yue
* @Description: 消费者
*/
require_once '../vendor/autoload.php';
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.225.137.105:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['code_demo']);
$consumer = new \Kafka\Consumer();
$consumer->start(function ($topic, $part, $message) {
var_dump($topic,$part,$message);
});
可以看到已经消费成功了
go版
用kafka-map让分区扩个容 增加到2个分区
producer.go 使用 sarama第三方库
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"10.225.137.105:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
for i := 100; i < 110; i++ {
//创建消息
var s string
msg := &sarama.ProducerMessage{}
msg.Topic = "code_demo"
s = "test" +fmt.Sprintf("%d",i) + "go"
msg.Value = sarama.StringEncoder(s)
//发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
}
执行一下
可以看到有些进入分区0 分区1
进入kafka-map查看一下是否发送成功
分区0
分区1
发送消息成功
consumer.go 需要遍历所有分区 每个分区创建一个消费者实现消费
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
var wg sync.WaitGroup
func main() {
consumer, err := sarama.NewConsumer([]string{"10.225.137.105:9092"}, nil)
if err != nil {
fmt.Println("consumer connect err:", err)
return
}
defer consumer.Close()
//获取 kafka 主题
partitions, err := consumer.Partitions("code_demo")
if err != nil {
fmt.Println("get partitions failed, err:", err)
return
}
for _, p := range partitions {
//sarama.OffsetNewest:从当前的偏移量开始消费,sarama.OffsetOldest:从最老的偏移量开始消费
partitionConsumer, err := consumer.ConsumePartition("code_demo", p, sarama.OffsetNewest)
if err != nil {
fmt.Println("partitionConsumer err:", err)
continue
}
wg.Add(1)
go func() {
for m := range partitionConsumer.Messages() {
fmt.Printf("key: %s, message: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
}
wg.Done()
}()
}
wg.Wait()
}
文章来源:https://www.toymoban.com/news/detail-428852.html
消费成功文章来源地址https://www.toymoban.com/news/detail-428852.html
到了这里,关于kafka简单搭建和基本使用介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!