Kafka事务

这篇具有很好参考价值的文章主要介绍了Kafka事务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

Kafka事务,微服务,kafka,数据库,分布式

事务操作API

Producer接口中定义了以下5个事务相关方法:

  • initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
  • beginTransaction(开始事务):启动一个Kafka事务
  • sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
  • commitTransaction(提交事务):提交事务
  • abortTransaction(放弃事务):取消事务

事务相关属性配置 

生产者

// 配置事务的id,开启了事务会默认开启幂等性
props.put("transactional.id", "first-transactional");

消费者

// 1. 消费者需要设置隔离级别
props.put("isolation.level","read_committed");
//  2. 关闭自动提交
 props.put("enable.auto.commit", "false");

Kafka事务编程

在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:

姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01

我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

启动生产者控制台程序模拟数据

# 创建名为ods_user和dwd_user的主题
bin/kafka-topics.sh --create --bootstrap-server 192.168.2.3:9092 --topic ods_user
bin/kafka-topics.sh --create --bootstrap-server 192.168.2.3:9092 --topic dwd_user
# 生产数据到 ods_user
bin/kafka-console-producer.sh --broker-list 192.168.2.3:9092 --topic ods_user
# 从dwd_user消费数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.3:9092 --topic dwd_user --from-beginning  --isolation-level read_committed

编写创建消费者代码

编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。

实现步骤:

创建Kafka消费者配置

 Properties props = new Properties();
 props.setProperty("bootstrap.servers", "192.168.2.3:9092");
 props.setProperty("group.id", "ods_user");
 props.put("isolation.level","read_committed");
 props.setProperty("enable.auto.commit", "false");
 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

创建消费者,并订阅 ods_user 主题

  // 1. 创建消费者
    public static Consumer<String, String> createConsumer() {
        // 1. 创建Kafka消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.2.3:9092");
        props.setProperty("group.id", "ods_user");
        props.put("isolation.level","read_committed");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. 创建Kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        consumer.subscribe(Arrays.asList("ods_user"));
        
        return consumer;
}

编写创建生产者代码

编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

创建生产者配置

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.2.3:9092");
props.put("transactional.id", "dwd_user");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

创建生产者对象

public static Producer<String, String> createProduceer() {
        // 1. 创建生产者配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.2.3:9092");
        props.put("transactional.id", "dwd_user");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建生产者
        Producer<String, String> producer = new KafkaProducer<>(props);
        return producer;
    }

编写代码消费并生产数据

实现步骤:

  1. 调用之前实现的方法,创建消费者、生产者对象
  2. 生产者调用initTransactions初始化事务
  3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
    1. 生产者开启事务
    2. 消费者拉取消息
    3. 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
    4. 生产消息到dwd_user topic中
    5. 提交偏移量到事务中
    6. 提交事务
    7. 捕获异常,如果出现异常,则取消事务
public static void main(String[] args) {
        Consumer<String, String> consumer = createConsumer();
        Producer<String, String> producer = createProducer();
        // 初始化事务
        producer.initTransactions();

        while(true) {
            try {
                // 1. 开启事务
                producer.beginTransaction();
                // 2. 定义Map结构,用于保存分区对应的offset
                Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
                // 2. 拉取消息
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    // 3. 保存偏移量
                    offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    // 4. 进行转换处理
                    String[] fields = record.value().split(",");
                    fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
                    String message = fields[0] + "," + fields[1] + "," + fields[2];
                    // 5. 生产消息到dwd_user
                    producer.send(new ProducerRecord<>("dwd_user", message));
                }
                // 6. 提交偏移量到事务
                producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
                // 7. 提交事务
                producer.commitTransaction();
            } catch (Exception e) {
                // 8. 放弃事务
                producer.abortTransaction();
            }
        }
    }

测试

往之前启动的console-producer中写入消息进行测试,同时检查console-consumer是否能够接收到消息:

Kafka事务,微服务,kafka,数据库,分布式

逐个测试一下消息:

张三,1,1980-10-09
李四,0,1985-11-01

模拟异常测试事务

// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1));
// 4. 进行转换处理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
String message = fields[0] + "," + fields[1] + "," + fields[2];

// 模拟异常
int i = 1/0;

// 5. 生产消息到dwd_user
producer.send(new ProducerRecord<>("dwd_user", message));

启动程序一次,抛出异常。

再启动程序一次,还是抛出异常。

直到我们处理该异常为止。

我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。文章来源地址https://www.toymoban.com/news/detail-819809.html

到了这里,关于Kafka事务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式系统的多数据库,实现分布式事务回滚(1.7.0 seata整合2.0.4nacos)

    1、解决的应用场景是分布式事务,每个服务有独立的数据库。 2、例如:A服务的数据库是A1,B服务的数据库是B2,A服务通过feign接口调用B服务,B涉及提交数据到B2,业务是在B提交数据之后,在A服务内报错。 所以,希望B能回滚事务。这就是跨库的数据回滚 seata下载地址 注意

    2024年02月11日
    浏览(43)
  • 【flume实时采集mysql数据库的数据到kafka】

    最近做了flume实时采集mysql数据到kafka的实验,做个笔记,防止忘记 !!!建议从头看到尾,因为一些简单的东西我在前面提了,后面没提。 Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013 flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502 编写配置

    2024年02月03日
    浏览(60)
  • 关于Maxwell与Kafka和数据库的监控

    1.Maxwell的配置 其实就是配置两端的配置信息,都要能连接上,然后才能去传输数据 config.properties 2.群起脚本 Kafka消费者开始等待消费 4.全量同步 每条数据都有时间戳,这个时间戳是同步的时间 5.为什么Kafka后面需要拦截器? 零点漂移,当为一天末点时,时间戳还是末点,但是到第二个

    2024年02月11日
    浏览(33)
  • 基于Canal+kafka监听数据库变化的最佳实践

    1、前言         工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务。我们通常的操作通过业务埋点、接口的调用或者中间件完成。         但是状态变化的入口比较多的时候,就很容易漏掉

    2023年04月08日
    浏览(85)
  • 实时Flink的数据库与Kafka集成优化案例

    在现代数据处理系统中,实时数据处理和分析是至关重要的。Apache Flink是一个流处理框架,可以用于实时数据处理和分析。在许多场景下,Flink需要与数据库和Kafka等消息系统进行集成,以实现更高效的数据处理。本文将讨论Flink与数据库和Kafka集成的优化案例,并提供实际示

    2024年02月20日
    浏览(44)
  • TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

    快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群 创建 changefeed,将 TiDB 增量数据输出至 Kafka 使用 go-tpc 写入数据到上游 TiDB 使用 Kafka console consumer 观察数据被写入到指定的 Topic (可选)配置 Flink 集群消费 Kafka 内数据 部署包含 TiCDC 的 TiDB 集群 在实验或测试环境中,可以使用 TiU

    2024年02月12日
    浏览(58)
  • 分布式消息服务kafka

    什么是消息中间件? 消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。 例如:在淘宝购物时,订单系统处理完订单后,把订单消息发送到消息中间件中,由消息中间件将订单消息分发到下

    2024年02月01日
    浏览(48)
  • 火山引擎云搜索服务升级云原生新架构;提供数十亿级分布式向量数据库能力

    从互联网发展伊始,搜索技术就绽放出了惊人的社会和经济价值。随着信息社会快速发展,数据呈爆炸式增长,搜索技术通过数据收集与处理,满足信息共享与快速检索的需求。 云搜索服务 ESCloud 是火山引擎提供的 完全托管在线分布式搜索服务 ,兼容 Elasticsearch、Kibana 等软

    2024年02月16日
    浏览(43)
  • 分布式运用之Filebeat+Kafka+ELK 的服务部署

    Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。 topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,

    2024年02月06日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包