怎么做到Kafka顺序读写

这篇具有很好参考价值的文章主要介绍了怎么做到Kafka顺序读写。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、业务场景

一个大的binlog数据库,还原出来了很多SQL语句
binlog生成SQL语句方式

SQL语句需要顺序执行,因为不顺序执行,比如先新增了一条数据,才有可能修改这条数据,假如先执行修改操作,后执行新增操作,那这个数据就错了

2、技术方案选型

  1. 如果表的binlog文件很小,直接执行就可以了;
  2. 如果表的binlog太大,那直接执行效率非常低,而且如果某个是否读写出了问题都不知道要从哪里重新执行(如执行时机器出现问题,执行SQL不仅需要读入binlog文件,还要执行SQL语句,频繁的执行SQL可能导致数据库CPU等飙升,可能存在一些执行错误的问题);
  3. 借助消息队列,可以重复消费来实现,并且通过代码抓取异常来判断是否消费成功,是否重复消费,并且也可以做削峰填谷,批量消费,控制流量,防止MySQL宕机或拒绝连接等;

3、消息队列选型

选择kafka,因为kafka写入同一个partition(目录)的数据是顺序的,写入的offset是递增的,我们只需要保证同一个partition消费是顺序的就可以了
Kafka原理

4、设计思路

前提条件:存在物理外键的表,先不用建物理外键,等SQL执行完成,最后补充物理外键,否则可能执行SQL因为数据外键问题失败

发送端

  1. 把同一张表,通过表名求hash,发送到指定的分区,这样,同一个表名的执行SQL在队列的partition中肯定是有序的;

消费端文章来源地址https://www.toymoban.com/news/detail-568947.html

  1. 使用单线程,完全没有问题,但性能太低;
  2. 直接使用多线程数组,根据表名取Hashing.consistentHash(表名.hashCode()),这样就能保证同一张表的SQL顺序执行,不能直接使用线程池,线程池可能会多个线程同时拉取一个Partition的同一张表数据,虽然取出SQL是顺序的,但SQL的执行效率,分配到CPU的执行时间不是顺序的,这样就会造成执行SQL乱序;
  3. 批量拉取,使用手动提交,使用CountDownLatch保证所有的线程执行完成才能提交偏移量;
  4. 线程池执行SQL,如果某个SQL消费异常,抓取异常(如果是重试能解决的异常就通过重试解决,比如数据库连接过多等),记录失败SQL,停止消费,提交偏移量,存在执行失败的SQL,抛出异常,停止继续消费,分析SQL执行失败原因,订正消费代码,手动执行完成失败SQL,之后重新执行,继续开始消费;
@Bean
public ExecutorService[] executorServices() {
    ExecutorService[] pools = new ExecutorService[threadCorePoolSize];
    for (int i = 0; i < pools.length; i++) {
        pools[i] = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(1),
                new ThreadFactoryBuilder().setNameFormat("pool-sql-" + i + "-consume-thread-%d").build(),
                (r, executor) -> {
                     try {
                         executor.getQueue().put(r);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
               });
    }
    return pools;
}

@Resource(name = "executorServices")
private ExecutorService[] executorServices;

@KafkaListener(id = "executeBinLogSql", topics = "binlog.execute.sql", containerFactory = "batchKafkaListenerContainerFactory", groupId = CONSUMER_GROUP_ID)
public void subscribe(List<ConsumerRecord<String, byte[]>> consumerRecords, Acknowledgment ack) throws Exception {
     long beginTime = Clock.systemDefaultZone().millis();
     CountDownLatch countDownLatch = new CountDownLatch(consumerRecords.size());
     List<String> failedSqlList = Lists.newArrayListWithExpectedSize(consumerRecords.size());
     consumerRecords.forEach(consumerRecord -> {
        if (consumerRecord == null) {
             return;
        }
        String tableName = StringUtils.defaultIfEmpty(consumerRecord.key(), "");
        if (StringUtils.isEmpty(tableName)) {
            return;
        }
        int hash = Hashing.consistentHash(tableName.hashCode(), executorServices.length);
        executorServices[hash].execute(() -> {
            String sql = null;
            try {
                 sql = JsonSerializer.deserialize(consumerRecord.value(), KAFKA_RECORD_TYPE_REFERENCE);
            } catch (Exception e) {
                LOGGER.warn("序列化异常,value:{}", consumerRecord.value());
            }
            if (StringUtils.isEmpty(sql)) {
                 return;
            }
            try {
                 sqlProcessor.process(sql);
            } catch (Exception e) {
                 LOGGER.error("执行SQL失败 sql: {}", sql, e);
                 failedSqlList.add(sql);
            } finally {
                 countDownLatch.countDown();
            }
        });
    });
    try {
        countDownLatch.await(30, TimeUnit.MINUTES);
        ack.acknowledge();
        if (!CollectionUtils.isEmpty(failedSqlList)) {
            throw new Exception("存在执行失败的SQL");
        }
    } catch (Exception e) {
        LOGGER.error("终止线程 failedSqlList: {}", failedSqlList, e);
        System.exit(-1);
    }
    long endTime = Clock.systemDefaultZone().millis();
    LOGGER.debug("执行【" + consumerRecords.size() + "】条数据所用时间为:" + (endTime - beginTime));
}

到了这里,关于怎么做到Kafka顺序读写的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(36)
  • 2023-07-10:Kafka如何做到消息不丢失?

    2023-07-10:Kafka如何做到消息不丢失? 答案2023-07-10: Kafka采用多种机制来确保消息的不丢失,其中包括副本机制、ISR(In-Sync Replicas)机制以及ACK机制等。 1.副本机制 Kafka通过副本机制来确保消息不会丢失。在Kafka中,每个分区都可以配置多个副本,每个副本保存分区的完整拷

    2024年02月15日
    浏览(29)
  • Kafka面试】Kafka如何保证消费的顺序性?

    消费者组的某个消费者可能负责消费 一个topic的多个分区 。每个分区都维护了偏移量(都是从0开始的),在消息存储时按照一定的策略来找到不同的分区进行存储,消费同样如此,并不能保证消息的顺序性。 要想保证顺序性,可以只提供一个分区,或者相同的业务只在一个

    2024年02月15日
    浏览(30)
  • 【kafka面试题2】如何保证kafka消息的顺序性

    如何保证kafka消息的顺序性呢,其实整体的策略就是:我们 让需要有序的消息发送到同一个分区Partition。 为什么说让有序的消息发送到同一个分区Partition就行呢,,下面我们来详细分析一下子。 首先 ,我们知道kafka消息的收发是基于Topic(主题),消息通过Topic进行分类。单

    2024年02月13日
    浏览(32)
  • python 读写kafka

    测试 测试

    2024年01月20日
    浏览(30)
  • java操作kafka读写操作

    1,前提,kafka 的 server.properties里面开通了 2,防火墙 3,java代码 生产者

    2024年02月13日
    浏览(25)
  • 如何保证Kafka顺序消费

    在Kafka中Partition(分区)是真正保存消息的地方,发送的消息都存放在这里。Partition(分区)又存在于Topic(主题)中,并且一个Topic(主题)可以指定多个Partition(分区)。 在Kafka中,只保证Partition(分区)内有序,不保证Topic所有分区都是有序的。 所以 Kafka 要保证消息的消费顺序,可

    2024年02月15日
    浏览(28)
  • Kafka 如何实现顺序消息

    本文所有的讨论均在如下版本进行,其他版本可能会有所不同。 Kafka: 3.6.0 Pulsar: 2.9.0 RabbitMQ 3.7.8 RocketMQ 5.0 Go1.21 github.com/segmentio/kafka-go v0.4.45 Kafka 只能保证单一分区内的顺序消息,无法保证多分区间的顺序消息。具体来说,要在 Kafka 完全实现顺序消息,至少需要保证以下几个

    2024年02月03日
    浏览(28)
  • kafka-顺序消息实现

    场景 在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证 解决方案 生产者将相同的

    2024年01月25日
    浏览(26)
  • Kafka 如何保证消息的消费顺序

    我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是: 更改用户会员等级。 根据会员等级计算订单价格。 假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

    2024年02月13日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包