一、简介
- Kafka 是一个分布式流处理平台由 LinkedIn 公司开发的,遵循 Apache 开源协议。
- Kafka 主要是用来处理实时数据流,可以发布、订阅、存储和处理数据。
应用场景:
- 日志收集:用于分布式日志系统,例如 ELK。
- 消息系统:可以将 Kafka 作为消息队列使用。
- 流处理:将 Kafka 与 Flink 或 Spark 等流处理引擎配合使用。
二、架构介绍
1. 组件
- Producer:发送数据到 Kafka 集群。
- Consumer:从 Kafka 集群消费数据。
- Broker:Kafka 集群中的每个服务器就叫做 Broker。
- Topic:物理上不同的消息类别;逻辑上一个 Topic 包含多个 Partition。
- Partition:物理上的概念,每个 Partition 对应一个文件夹,该文件夹下存储着该 Partition 的所有消息。
- Offset:Kafka 采用了分布式的提交日志机制,消费者消费数据时会记录已经消费的位置,即 Offset。
- ZooKeeper:Kafka 使用 ZooKeeper 来存储集群的配置信息,及 Broker、Producer、Consumer 等各种节点的状态信息。
2. 集群
- Kafka 集群由多个 Broker 组成,每个 Broker 在集群中都有一个唯一的编号。
- 一个 Broker 可以容纳多个 Partition,同一个 Topic 的不同 Partition 分散到不同的 Broker 上组成分布式集群。
- Kafka 集群会自动调整 Partition 的数量,并将 Partition 均匀分配给各个 Broker。
3. 数据存储结构
- Kafka 消息被保存在 Partition 中,每个 Partition 对应一个目录,里面是多个 Segment,Segment 文件的大小和发送速率有关系。一个 Partition 有多个 Segments 是因为 Kafka 采取了文件系统批量读写机制
代码示例:
public class KafkaDemo {
public static void main(String[] args) {
//1.创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(getProperties());
//2.创建消息
ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key", "value");
try {
//3.发送消息
producer.send(record).get();
System.out.println("Sent message successfully");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
//4.关闭连接
producer.close();
}
}
/**
* 获取Kafka配置信息
*
* @return 配置信息
*/
private static Properties getProperties() {
Properties props = new Properties();
//设置Kafka地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//设置消息Key和Value的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return props;
}
}
三、Kafka消息传递原理
1. 消息生产者
Kafka生产者将数据以消息的形式发送到Kafka集群。生产者可以将消息发送到一个指定的主题(topic),也可以选择在发送时指定分区(partition)。当生产者需要发送消息时,它先与Kafka集群上的一个Broker建立TCP连接,然后将消息发送到该Broker。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) throws InterruptedException {
// 配置Kafka生产者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到指定主题
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Hello World-" + i));
Thread.sleep(1000); // 每秒发送一条消息
}
producer.close(); // 关闭Kafka生产者实例
}
}
2. 消息消费者
Kafka消费者从Kafka集群中的一个或多个分区中消费消息。消费者可以随时订阅一个或多个主题,并在每个主题中定位到特定分区。
import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group"); // 与生产者所在的组相同
props.put("enable.auto.commit", "true"); // 自动提交偏移量
props.put("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic")); // 订阅指定主题
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
}
}
// consumer.close();
}
}
3. 主题与分区
Kafka的主题(topic)是Kafka用于区分消息类型和类别的单位。每个主题都由一个或多个分区(partition)组成,分区是存储在Kafka集群中不同节点上的数据容器。每个主题的消息可以分布在不同的分区中。
4. 副本机制
Kafka的副本机制是为了保证消息的高可用性和数据的持久性。当一个分区的消息被发送到Kafka集群后,它会被复制到多个副本(replica)中。每个分区都有一个或多个副本,其中有且仅有一个被标记为“首领副本”(leader replica),负责读写该分区的数据。其他副本被称为“追随者副本”(follower replica),它们只能复制首领副本的数据,并借此提高系统的可靠性和容错性。
四、消息传递过程
1. 消息发送流程
消息发送者将消息发送到Kafka主题(topic),然后由Kafka Producer将消息分区并写入到Broker中的指定分区中。在发送消息之前,Producer需要从Zookeeper中获取集群元数据信息,包括Broker列表和主题分区的信息。具体流程如下:
-
消息发送者通过Producer API将消息发送到指定topic中。
String topic = "test_topic"; String message = "Hello, Kafka!"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); producer.send(record);
-
Producer根据消息的key值使用Partitioner算法将同一个key的消息发送到同一个分区里,保证消息的有序性。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int numPartitions = cluster.partitionCountForTopic(topic); if (keyBytes == null) { Random random = new Random(); return random.nextInt(numPartitions); } int hash = Utils.murmur2(keyBytes); return hash % numPartitions; }
-
Producer将记录存储在缓冲区中,如果缓冲区已满,则会调用send方法将缓冲区中的内容批量发送到Broker中。
2. 消息存储流程
消息被存储在Broker的一个或多个分区中,分区中的每条消息都有一个唯一的偏移量(offset),并按照其他参数(如消息的时间戳)进行排序存储。当Consumer消费分区中的消息时,可以根据偏移量来读取消息,保证消息的顺序性。
在Broker上保存的消息是以一种高效而紧凑的格式进行编码的,称为RecordBatch,它可以将多个Producer的相关记录分组在一起,以便有效地压缩提交到Broker的数据传输量。
3. 消息消费流程
Consumer订阅并读取特定主题(topic)的消息。消费者(Consumer)从Broker中拉取特定分区的消息,并对其进行处理。具体流程如下:
-
消费者向Kafka集群发送Fetch请求,获取数据。
-
Broker收到Fetch请求后,从指定的分区和偏移量(offset)开始读取消息,然后将数据返回给Consumer。
-
Consumer获取到数据后,进行处理和消费,同时记录每个Partition的下一个可拉取的偏移量,并定期将其提交到Zookeeper中,用于在Consumer发生故障或重启后重新读取未被处理的消息。
String topicName = "test_topic";
String groupId = "test_group";
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", groupId);
props.setProperty("auto.commit.enable", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
五、性能优化
1.硬件方面的优化
磁盘
- 在使用Kafka时,建议使用SSD磁盘,因为SSD的I/O性能比HDD磁盘更好。
- 另外,可以使用多块磁盘,将Kafka数据分散到不同的磁盘上,以减少单个磁盘的负担。
内存
- 将足够的内存分配给Kafka Broker进程,以便它可以缓存更多的消息。
- 在Kafka Producer写入消息时,可以开启压缩功能,减少传输的数据量,节省内存空间。
CPU
- Kafka Broker对CPU的要求通常不高,但在高负载下还是需要注意CPU的使用率。
- 在多核CPU的机器上,可以通过增加Broker实例数或者增加分区数来充分利用CPU资源。
2.Kafka配置优化
Producer端
- acks参数:设置消息确认的级别。acks=0表示不等待服务器确认;acks=1表示只需得到Kafka集群中一台服务器的确认;acks=all表示需要得到Kafka集群中全部服务器的确认。确认级别越高,消息的耗时就会增加,但是可以提供更好的数据安全性。
- batch.size参数:设置批处理大小。较小的批处理大小可以降低延迟,但也会增加CPU开销。建议根据实际情况调整该参数。
- compression.type参数:设置压缩方式。可选的压缩方式包括none(默认)、gzip、snappy和lz4。生产者使用压缩功能可以减少传输的数据量,提高传输效率。
Broker端
- message.max.bytes参数:设置单个消息的最大大小。如果消息大小超过了该限制,Kafka将拒绝该消息。
- num.io.threads参数:设置Broker处理I/O请求的线程数。增加该参数的值可以提高Broker的并发能力,但是也会增加CPU的使用率。
3. 消费者优化
- Group ID:消费者组是Kafka中消费者的逻辑分组,对于同一组内的消费者来说,每个分区只会被其中一台消费者消费。因此,合理设置Group ID可以提高消费者的有效性。
- Fetch Size:每次从Kafka Broker读取的消息数量。过大的Fetch Size会导致消费端的延迟增加,过小的Fetch Size则会增加网络开销。可以通过调节该参数来达到最佳的消费效率。
- 处理策略:消费者有两种处理消息的方式,即poll()和push()。其中,poll()需要由应用主动调用,而push()则是在后台由Kafka的Consumer线程自动触发。通常情况下,使用poll()比push()更为灵活,适用于大部分场景。
4. 代码示例
以下是Kafka Producer端实现批量写入消息的Java代码示例:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerSample {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 16384);
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "myTopic";
for (int i = 0; i < 10000; i++) {
String msg = "My message No." + i;
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
producer.send(record);
}
producer.close();
}
}
六、Kafka的优缺点
1. Kafka的优点
- 高吞吐量、低延迟: Kafka通过partition和consumer group的概念来实现负载均衡,支持分布式部署,能够实现高吞吐量和低延迟。
- 高扩展性: Kafka集群内所有节点都是对等的,新的节点可以很容易地加入到集群中,扩展集群的容量,并且不会中断已经运行的服务。
- 持久化存储: Kafka数据以文件的形式保留在磁盘中,可靠性较高,即使一些节点失效,数据依然不会丢失,非常适合大规模数据的持续存储和离线分析处理。
- 可靠性高: Kafka支持数据备份和副本机制,通过数据的复制和备份来提高其稳定性,保障数据不会丢失。
- 消息传输具有多样性: Kafka支持多种协议的传输,可以与不同类型的应用程序集成,例如支持HTTP RESTful API,各种编程语言的客户端和其他一些补充工具等。
2. Kafka的缺点
- 部署和配置较为复杂: Kafka的集群需要进行配置和部署,需要一定的技术力量,对于较小的企业来说,可能需要投入大量的精力和时间才能完成部署和配置。
- 需要对数据进行处理: Kafka只是一个消息传递平台,不直接对数据进行处理,需要用户自己编写代码进行数据处理,因此需求较高的技术人员才能使用。
- 没有自动管理: Kafka集群需要手动做一些管理,例如,当某个节点失效后,需要重新平衡partition的负载等。
七、Kafka的应用案例
Kafka是一个开源的分布式消息系统,在大数据领域有着广泛的应用。下面介绍Kafka的三个应用案例。
1. 网络爬虫
网络爬虫的核心功能是从互联网上抓取数据并进行分析或保存。Kafka可以作为网络爬虫的消息队列,负责将被爬取数据传输给爬虫程序。当网络爬虫处理完数据后,将数据发送到Kafka中,供后续处理程序使用。
具体实现时,需要先创建一个名为spider
的Kafka主题,然后在爬虫程序中编写生产者代码,将爬取到的数据发送至该主题。以下是Java示例代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SpiderProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer =
new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>(
"spider", Integer.toString(i), "data-" + Integer.toString(i)));
producer.close();
}
}
2. 数据统计
Kafka除了可以作为消息队列,还可以作为数据缓存,可以处理大量的数据流。在数据统计过程中,Kafka既可以作为生产者将收集到的数据发送到主题中,也可以作为消费者从主题中获取数据并进行分析、统计等操作。具体实现时,需要先创建一个名为data
的Kafka主题,然后在收集数据的程序中编写生产者代码,将数据发送至该主题。再在处理数据的程序中编写消费者代码,从该主题中获取数据以完成数据统计。
以下是Java示例代码:文章来源地址https://www.toymoban.com/news/detail-563522.html
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class DataConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("data"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class DataProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer =
new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>(
"data", Integer.toString(i), "data-" + Integer.toString(i)));
producer.close();
}
}
3. 实时监控
Kafka可以在实时监控中作为传输媒介,将源数据流发送到消费者,以满足分布式的需求。具体实现时,需要先创建一个名为metrics
的Kafka主题,然后在生产者程序中将监控数据发送至该主题。再在监控中心中编写消费者代码,从该主题中获取数据并进行分析、监控等操作。文章来源:https://www.toymoban.com/news/detail-563522.html
以下是Java示例代码:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MetricsConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("metrics"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MetricsProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer =
new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>(
"metrics", Integer.toString(i), "metrics-" + Integer.toString(i)));
producer.close();
}
}
到了这里,关于Kafka底层原理探秘的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!