怎么做到Kafka顺序读写

这篇具有很好参考价值的文章主要介绍了怎么做到Kafka顺序读写。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、业务场景

一个大的binlog数据库,还原出来了很多SQL语句
binlog生成SQL语句方式

SQL语句需要顺序执行,因为不顺序执行,比如先新增了一条数据,才有可能修改这条数据,假如先执行修改操作,后执行新增操作,那这个数据就错了

2、技术方案选型

  1. 如果表的binlog文件很小,直接执行就可以了;
  2. 如果表的binlog太大,那直接执行效率非常低,而且如果某个是否读写出了问题都不知道要从哪里重新执行(如执行时机器出现问题,执行SQL不仅需要读入binlog文件,还要执行SQL语句,频繁的执行SQL可能导致数据库CPU等飙升,可能存在一些执行错误的问题);
  3. 借助消息队列,可以重复消费来实现,并且通过代码抓取异常来判断是否消费成功,是否重复消费,并且也可以做削峰填谷,批量消费,控制流量,防止MySQL宕机或拒绝连接等;

3、消息队列选型

选择kafka,因为kafka写入同一个partition(目录)的数据是顺序的,写入的offset是递增的,我们只需要保证同一个partition消费是顺序的就可以了
Kafka原理

4、设计思路

前提条件:存在物理外键的表,先不用建物理外键,等SQL执行完成,最后补充物理外键,否则可能执行SQL因为数据外键问题失败

发送端

  1. 把同一张表,通过表名求hash,发送到指定的分区,这样,同一个表名的执行SQL在队列的partition中肯定是有序的;

消费端文章来源地址https://www.toymoban.com/news/detail-568947.html

  1. 使用单线程,完全没有问题,但性能太低;
  2. 直接使用多线程数组,根据表名取Hashing.consistentHash(表名.hashCode()),这样就能保证同一张表的SQL顺序执行,不能直接使用线程池,线程池可能会多个线程同时拉取一个Partition的同一张表数据,虽然取出SQL是顺序的,但SQL的执行效率,分配到CPU的执行时间不是顺序的,这样就会造成执行SQL乱序;
  3. 批量拉取,使用手动提交,使用CountDownLatch保证所有的线程执行完成才能提交偏移量;
  4. 线程池执行SQL,如果某个SQL消费异常,抓取异常(如果是重试能解决的异常就通过重试解决,比如数据库连接过多等),记录失败SQL,停止消费,提交偏移量,存在执行失败的SQL,抛出异常,停止继续消费,分析SQL执行失败原因,订正消费代码,手动执行完成失败SQL,之后重新执行,继续开始消费;
@Bean
public ExecutorService[] executorServices() {
    ExecutorService[] pools = new ExecutorService[threadCorePoolSize];
    for (int i = 0; i < pools.length; i++) {
        pools[i] = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(1),
                new ThreadFactoryBuilder().setNameFormat("pool-sql-" + i + "-consume-thread-%d").build(),
                (r, executor) -> {
                     try {
                         executor.getQueue().put(r);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
               });
    }
    return pools;
}

@Resource(name = "executorServices")
private ExecutorService[] executorServices;

@KafkaListener(id = "executeBinLogSql", topics = "binlog.execute.sql", containerFactory = "batchKafkaListenerContainerFactory", groupId = CONSUMER_GROUP_ID)
public void subscribe(List<ConsumerRecord<String, byte[]>> consumerRecords, Acknowledgment ack) throws Exception {
     long beginTime = Clock.systemDefaultZone().millis();
     CountDownLatch countDownLatch = new CountDownLatch(consumerRecords.size());
     List<String> failedSqlList = Lists.newArrayListWithExpectedSize(consumerRecords.size());
     consumerRecords.forEach(consumerRecord -> {
        if (consumerRecord == null) {
             return;
        }
        String tableName = StringUtils.defaultIfEmpty(consumerRecord.key(), "");
        if (StringUtils.isEmpty(tableName)) {
            return;
        }
        int hash = Hashing.consistentHash(tableName.hashCode(), executorServices.length);
        executorServices[hash].execute(() -> {
            String sql = null;
            try {
                 sql = JsonSerializer.deserialize(consumerRecord.value(), KAFKA_RECORD_TYPE_REFERENCE);
            } catch (Exception e) {
                LOGGER.warn("序列化异常,value:{}", consumerRecord.value());
            }
            if (StringUtils.isEmpty(sql)) {
                 return;
            }
            try {
                 sqlProcessor.process(sql);
            } catch (Exception e) {
                 LOGGER.error("执行SQL失败 sql: {}", sql, e);
                 failedSqlList.add(sql);
            } finally {
                 countDownLatch.countDown();
            }
        });
    });
    try {
        countDownLatch.await(30, TimeUnit.MINUTES);
        ack.acknowledge();
        if (!CollectionUtils.isEmpty(failedSqlList)) {
            throw new Exception("存在执行失败的SQL");
        }
    } catch (Exception e) {
        LOGGER.error("终止线程 failedSqlList: {}", failedSqlList, e);
        System.exit(-1);
    }
    long endTime = Clock.systemDefaultZone().millis();
    LOGGER.debug("执行【" + consumerRecords.size() + "】条数据所用时间为:" + (endTime - beginTime));
}

到了这里,关于怎么做到Kafka顺序读写的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(54)
  • 2023-07-10:Kafka如何做到消息不丢失?

    2023-07-10:Kafka如何做到消息不丢失? 答案2023-07-10: Kafka采用多种机制来确保消息的不丢失,其中包括副本机制、ISR(In-Sync Replicas)机制以及ACK机制等。 1.副本机制 Kafka通过副本机制来确保消息不会丢失。在Kafka中,每个分区都可以配置多个副本,每个副本保存分区的完整拷

    2024年02月15日
    浏览(42)
  • Kafka面试】Kafka如何保证消费的顺序性?

    消费者组的某个消费者可能负责消费 一个topic的多个分区 。每个分区都维护了偏移量(都是从0开始的),在消息存储时按照一定的策略来找到不同的分区进行存储,消费同样如此,并不能保证消息的顺序性。 要想保证顺序性,可以只提供一个分区,或者相同的业务只在一个

    2024年02月15日
    浏览(41)
  • 【kafka面试题2】如何保证kafka消息的顺序性

    如何保证kafka消息的顺序性呢,其实整体的策略就是:我们 让需要有序的消息发送到同一个分区Partition。 为什么说让有序的消息发送到同一个分区Partition就行呢,,下面我们来详细分析一下子。 首先 ,我们知道kafka消息的收发是基于Topic(主题),消息通过Topic进行分类。单

    2024年02月13日
    浏览(43)
  • python 读写kafka

    测试 测试

    2024年01月20日
    浏览(43)
  • java操作kafka读写操作

    1,前提,kafka 的 server.properties里面开通了 2,防火墙 3,java代码 生产者

    2024年02月13日
    浏览(38)
  • 如何保证Kafka顺序消费

    在Kafka中Partition(分区)是真正保存消息的地方,发送的消息都存放在这里。Partition(分区)又存在于Topic(主题)中,并且一个Topic(主题)可以指定多个Partition(分区)。 在Kafka中,只保证Partition(分区)内有序,不保证Topic所有分区都是有序的。 所以 Kafka 要保证消息的消费顺序,可

    2024年02月15日
    浏览(40)
  • kafka-顺序消息实现

    场景 在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证 解决方案 生产者将相同的

    2024年01月25日
    浏览(37)
  • Kafka 如何实现顺序消息

    本文所有的讨论均在如下版本进行,其他版本可能会有所不同。 Kafka: 3.6.0 Pulsar: 2.9.0 RabbitMQ 3.7.8 RocketMQ 5.0 Go1.21 github.com/segmentio/kafka-go v0.4.45 Kafka 只能保证单一分区内的顺序消息,无法保证多分区间的顺序消息。具体来说,要在 Kafka 完全实现顺序消息,至少需要保证以下几个

    2024年02月03日
    浏览(39)
  • kafka 如何保证消息的顺序消费

    在Kafka分布式集群中,要保证消息的顺序消费,您可以采取以下措施: 分区策略 :Kafka的主题可以分为多个分区,每个分区内的消息是有序的。因此,首先要确保生产者将相关的消息发送到同一个分区。这可以通过生产者的分区策略来实现。默认情况下,Kafka会使用基于消息

    2024年02月06日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包