探究:kafka生产者/消费者与多线程安全

这篇具有很好参考价值的文章主要介绍了探究:kafka生产者/消费者与多线程安全。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1. 多线程安全

1.1. 生产者是多线程安全的么?

1.1. 消费者是多线程安全的么?

2. 消费者规避多线程安全方案

2.1. 每个线程维护一个kafkaConsumer

2.2. [单/多]kafkaConsumer实例 + 多worker线程

2.3.方案优缺点对比


1. 多线程安全

1.1. 生产者是多线程安全的么?

        Kafka生产者是线程安全的,可以在多个线程中共享一个Kafka生产者实例。这是因为Kafka生产者实例内部使用了一些同步机制来保证线程安全,例如使用了线程安全的队列来缓存消息,使用了同步锁来保护共享资源的访问等。

        同时,Kafka生产者的send()方法是非阻塞的,可以在多个线程中并发调用,不会阻塞线程。Kafka生产者还提供了异步发送和同步发送两种发送方式,可以根据实际需求选择不同的发送方式。

然而,如果多个线程共享同一个Kafka生产者实例,需要注意以下几点:

  1. 同一个线程中不要同时调用send()方法和flush()方法,可能会导致消息发送顺序不一致。

  2. 不同线程中调用send()方法时,需要注意消息的顺序,可以使用Kafka的分区机制来保证消息的顺序。

  3. 如果多个线程发送的消息都是针对同一个主题或分区,可能会导致消息的重复发送或丢失。因此,建议在多线程情况下,使用Kafka的分区机制,将消息发送到不同的分区中,以避免消息的重复和丢失。

        综上所述,Kafka生产者是线程安全的,可以在多个线程中共享一个Kafka生产者实例,但需要注意消息的顺序和分区的使用,以保证消息的可靠性和顺序性。

1.1. 消费者是多线程安全的么?

        Kafka消费者是非线程安全的,主要原因是因为:Kafka消费者使用了内部的状态来跟踪消费进度和偏移量。这种状态包括消费者的位置,消费者的偏移量,以及消费者的订阅主题和分区等信息。如果多个线程同时访问同一个Kafka消费者实例,就会导致这些状态信息的不一致,从而导致消费进度出现错误。

2. 消费者规避多线程安全方案

背景:Kafka消费者中具有poll()方法,该方法是一个阻塞方法,如果在同一个线程中多次调用poll()方法,会导致消费者阻塞在poll()方法中,无法及时消费新到达的消息。因此,为了提高Kafka消费者的并发性能,通常使用多个消费者线程来消费+处理消息。为了保证Kafka消费者的线程安全性,通常采用以下2种方案。

方案的本质:一个线程只能绑定一个消费者实例(不能多个线程共用一个消费者实例)

2.1. 每个线程创建一个Consumer

探究:kafka生产者/消费者与多线程安全

 1. 创建多个线程,去消费topic

2. 每个线程绑定固定数量的分区(最好的情况是一个消费者绑定一个分区)

探究:kafka生产者/消费者与多线程安全

        代码比较简单,唯一需要关注的点:创建的消费者线程和分区的数量可能不相等,此时,可以采用「分区副本自动分配策略」,该策略会将消费者线程和分区绑定在一起。

场景讨论:

        按照现在的代码,假设消费者线程获取了分区A 100个msg,然后处理,若还未处理完,此时,offset未提交。因为此时offset还未提交,消费者线程还会去分区A调用poll,获取到的msg依然会是刚才的100个msg。存在消费重复的场景,因此,消费者需要做好幂等处理。

        上面的代码,不会出现这个场景:消费者获取了消息1,2,3,3处理完成了,执行了提交偏移,但是消息1还未提交的场景。

代码示例:

package com.bie.kafka.kafkaThrea;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 *
 *       1、KafkaConsumer是非线程安全的,KafkaProducer是线程安全的。
 *       2、该案例是创建多个线程,每个线程维护一个KafkaConsumer实例
 *           用户创建多个线程消费topic数据,使用分区副本自动分配策略,将消费者线程与分区进行绑定
 *       3、ConsumerRunnable,消费线程类,执行真正的消费任务
 */
public class ConsumerRunnable implements Runnable {

    // 每个线程维护私有的kafkaConsumer实例
    private final KafkaConsumer<String, String> consumer;

    /**
     * 默认每个消费者的配置参数初始化
     *
     * @param brokerList
     * @param groupId
     * @param topic
     */
    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        // 带参数的构造方法
        Properties props = new Properties();
        // kafka的列表
        props.put("bootstrap.servers", brokerList);
        // 消费者组编号
        props.put("group.id", groupId);
        // 自动提交
        props.put("enable.auto.commit", true);
        // 提交提交每个一秒钟
        props.put("auto.commit.interval.ms", "1000");
        // 反序列化key
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化value
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 将配置信息进行初始化操作
        this.consumer = new KafkaConsumer<>(props);
        // 定义响应的主题信息topic:使用分区副本自动分配策略
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        // 消费者保持一直消费的状态
        while (true) {
            // 将获取到消费的信息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
            // 遍历出每个消费的消息
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息:deal msg
            }
            // 提交offset
        }
    }
}
package com.bie.kafka.kafkaThrea;

import java.util.ArrayList;
import java.util.List;

/**
 *       1、消费线程管理类,创建多个线程类执行消费任务
 */
public class ConsumerGroup {

    // 消费者群组,多消费者。
    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        // 初始化消费者组
        consumers = new ArrayList<>(consumerNum);
        // 初始化消费者,创建多少个消费者
        for (int i = 0; i < consumerNum; i++) {
            // 根据消费者构造方法,创建消费者实例
            ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
            // 将创建的消费者实例添加到消费者组中
            consumers.add(consumerRunnable);
        }
    }

    public void execute() {
        // 将消费者组里面的消费者遍历出来
        for (ConsumerRunnable task : consumers) {
            // 创建一个消费者线程,并且启动该线程
            new Thread(task).start();
        }
    }
}
package com.bie.kafka.kafkaThrea;


public class ConsumerMain {

    public static void main(String[] args) {
        // kafka即broker列表
        String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
        // group组名称
        String groupId = "group1";
        // topic主题名称
        String topic = "topic1";
        // 消费者的数量
        int consumerNum = 3;
        // 通过构造器创建出一个对象
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        // 执行execute的方法,创建出ConsumerRunnable消费者实例。多线程多消费者实例
        consumerGroup.execute();
    }
}

上面代码,在实际工程应用中存在问题,描述见下:

  1. 每个消费者线程,先执行poll拉取一批batchsize命令后
  2. 批量处理这批消息
  3. 提交offset

        步骤2,处理batchsize的消息,可能中间的某一个失败了,但是步骤3提交了整体的offset,会导致失败的消息丢失了。

解决方案:处理每个消息的流程中,增加重试机制(本地消息表),保证该消息能执行成功

2.2. 消费者组 + worker线程池

探究:kafka生产者/消费者与多线程安全

 与2.1方案一的区别在于,将「消息的获取」与「消息的处理」解耦开

1. 消息的获取:维护一个or多个kafkaConsumer实例,获取消息,获取到消息后,将消息丢到消息处理线程池中

2. 消息的处理:创建一个线程池,里面存放了worker线程,每个worker线程处理获取到的消息

探究:kafka生产者/消费者与多线程安全

ConsumerWorker类

        worker线程:执行msg的处理,更新offsets信息

package huxi.test.consumer.multithreaded;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
 
import java.util.List;
import java.util.Map;
 
public class ConsumerWorker<K, V> implements Runnable {
 
    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
    public ConsumerWorker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = record;
        this.offsets = offsets;
    }
 
    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            // 获取到分区的消息记录
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            // 遍历获取到的消息记录
            for (ConsumerRecord<K, V> record : partitionRecords) {
                // 消息处理逻辑
            }
 
            /* 下面操作,是更新各个分区的offset信息到offsets变量,并没有真正的提交位移 */
            /* 真正的更新位移操作,不是在worker线程,而是在消费者线程 */

            // 待更新的位移
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // 同步锁,锁住offsets位移
            synchronized (offsets) {
                // 如果offsets位移不包含partition这个key信息,就将位移信息设置到map集合里面
                if (!offsets.containsKey(partition)) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    // 否则,offsets位移包含partition这个key信息,获取到offsets的位置信息
                    long curr = offsets.get(partition).offset();
                    if (curr <= lastOffset + 1) { // 如果获取到的位置信息小于等于上一次位移信息大小,将这个partition的位置信息设置到map集合中。并保存到broker中。
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}

 ConsumerThreadHandler类

        一个主线程,定时去poll消息,然后将消息投递到worker线程中,最后,当offsets信息发生变更后,提交offset

package huxi.test.consumer.multithreaded;
 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
 
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class ConsumerThreadHandler<K, V> {
 
    // KafkaConsumer实例
    private final KafkaConsumer<K, V> consumer;
    // ExecutorService实例
    private ExecutorService executors;
    // 位移信息offsets
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
        // 构造kafkaConsumer配置
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false"); // 非自动提交位移信息
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // 创建kafkaConsumer,赋值给成员变量consumer
        consumer = new KafkaConsumer<>(props);

        // 消费者订阅消息,并实现重平衡rebalance
        // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
        // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            // 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(offsets); // 提交位移
            }

            // rebalance完成后会调用onPartitionsAssigned方法。
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                offsets.clear(); // 清除位移信息
            }
        });
    }
 
    /**
     * 消费主方法
     * @param threadNumber  线程池中线程数
     */
    public void consume(int threadNumber) {
        // 创建一个worker线程池,线程数量为threadNumber个
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 只有一个消费者线程
        try {
            // 消费者一直处于等待状态,等待消息消费
            while (true) {
                // 从主题中获取消息
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                // 如果获取到的消息不为空
                if (!records.isEmpty()) {
                    // submit: 将一批msg交给worker线程处理,消息处理完后,更新offsets信息
                    executors.submit(new ConsumerWorker<>(records, offsets));
                }
                // 调用提交位移信息
                commitOffsets();
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();  // 调用提交位移信息
            consumer.close(); // 关闭consumer
        }
    }
 
    private void commitOffsets() {
        Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
        // 保证线程安全、同步锁,锁住offsets
        synchronized (offsets) {
            // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
            if (offsets.isEmpty()) {
                return;
            }
            // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
            unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
            // 清除位移信息offsets
            offsets.clear();
        }
        // 将封装好的位移信息unmodfiedMap集合进行同步提交
        // 手动提交位移信息
        consumer.commitSync(unmodfiedMap);
    }
 
    public void close() {
        consumer.wakeup();
        // 关闭ExecutorService实例
        executors.shutdown();
    }
}

Main类

        包装了所有的工具类,启动整个程序

package huxi.test.consumer.multithreaded;
 
public class Main {
 
    public static void main(String[] args) {
        // broker列表、topic、group id
        String brokerList = "localhost:9092";
        String topic = "test-topic";
        String groupID = "test-group";
        
        // 1. 根据ConsumerThreadHandler构造方法,创建出消费者handler
        final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();
 
        // 创建线程的匿名内部类
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 执行consume,在此线程中执行消费者消费消息。
                handler.consume(cpuCount); 
            }
        };
        new Thread(runnable).start();  // 3. 该函数会调用上面的run()方法
 
        try {
            // 20秒后自动停止该测试程序
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // swallow this exception
        }
        System.out.println("Starting to close the consumer...");
        handler.close();
    }
}  

2.3.方案优缺点对比

优点 缺点
方案1

实现简单

速度较快,因为无线程交互开销

方便位移管理

易于维护分区之间的消息消费顺序

socket连接开销大

consumer受限于topic的分区数,扩展性差

broker端处理负载高(因为发往broker的请求较多)

reblance可能性增大

方案2

消息获取和处理解耦

可独立扩展consumer和worker数,伸缩性好

实现负载

难于维护分区内的消息顺序

处理链路变长,导致位移管理困难

worker线程异常可能导致消费数据丢失文章来源地址https://www.toymoban.com/news/detail-426212.html

到了这里,关于探究:kafka生产者/消费者与多线程安全的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(53)
  • Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(78)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(44)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(87)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(48)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(58)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(49)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(47)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(47)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包