kafka复习:(23)事务

这篇具有很好参考价值的文章主要介绍了kafka复习:(23)事务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、生产者,开启事务。

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaTest22 {
    public static void main(String[] args) {
        Properties properties= new Properties();

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "2");
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "4");
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransaction");

        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> producerRecord=new ProducerRecord<>("study2024",0,"fff","hello sister,now is: "+ new Date());
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try{
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            long offset = 0;
            try {
                offset = future.get().offset();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(offset);

            Thread.sleep(60000);
            kafkaProducer.commitTransaction();
        } catch (Exception ex){
            kafkaProducer.abortTransaction();
        }


        kafkaProducer.close();
    }
}

二、消费者,设置隔离级别为"read_committed"

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaTest23 {

    private static Properties getProperties(){
        Properties properties=new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        //properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
        return properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());
        String topic="study2024";
        myConsumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));
            for(ConsumerRecord record: consumerRecords){
                System.out.println(record.value());
                System.out.println("record offset is: "+record.offset());
            }

        }



    }
}

三、运行结果,按照上述配置,当生产者发送消息并从kafka broker获取到offset后就会sleep,在生产者sleep的时候,消费者是获取不到消息的,只有sleep完成并提交事务之后,消费者才会获取到消息文章来源地址https://www.toymoban.com/news/detail-682757.html

到了这里,关于kafka复习:(23)事务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式软件架构——分布式事务TCC和SAGA

    TCC 是另一种常见的分布式事务机制,它是“ Try-Confirm-Cancel ”三个单词的缩写,是由数据库专家 Pat Helland 在 2007 年撰写的论文《Life beyond Distributed Transactions: An Apostate’s Opinion》中提出。 前面介绍的可靠消息队列虽然能保证最终的结果是相对可靠的,过程也足够简单(相对于

    2024年02月12日
    浏览(44)
  • 【分布式】java实现分布式事务的五种方案

    用户支付完成会将支付状态及订单状态保存在订单数据库中,由订单服务去维护订单数据库。由库存服务去维护库存数据库的信息。下图是系统结构图: 如何实现两个分布式服务(订单服务、库存服务)共同完成一件事即订单支付成功自动减库存,这里的关键是如何保证两个

    2024年04月11日
    浏览(49)
  • 微服务·数据一致-事务与分布式事务

    事务是计算机科学和数据库管理中的一个关键概念,用于确保数据的一致性和可靠想。事务管理是大多数应用程序和数据库系统中不可或缺的一部分。分布式事务扩展了事务的概念,用于多个分布式系统和服务的数据一致性管理。本调查报告将深入探讨事务和分布式事务的概

    2024年02月09日
    浏览(46)
  • 【高级篇】分布式事务

    本地事务,也就是传统的 单机事务 。在传统数据库事务中,必须要满足四个原则: 分布式事务 ,就是指不是在单个服务或单个数据库架构下,产生的事务,例如: 跨数据源的分布式事务 跨服务的分布式事务 综合情况 在数据库水平拆分、服务垂直拆分之后,一个业务操作

    2024年02月08日
    浏览(44)
  • Springboot分布式事务

    1. 概念 本地事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器位于同一节点相同数据库上。 又称为传统事务。它是一个操作序列,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。例如,银行转账工作:从一个帐号扣款并使另一个帐

    2024年02月09日
    浏览(41)
  • 分布式事务详解

    随着互联网的发展,软件系统由原来的单体应用转变为分布式应用。分布式系统把一个单体应用拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作。这种分布式系统下不同服务之间通过远程协作完成的事务称之为分布式事务,例如用户注册送

    2024年02月19日
    浏览(39)
  • 分布式事务 Seata

    事务(Transaction)是计算机科学中的一个重要概念,主要是指一个 完整的、不可分割的操作序列 。在关系型数据库中,事务通常用于描述对数据库进行的一系列操作的执行单元。 事务的ACID特性 : 原子性(Atomicity):事务是一个原子操作,要么全部执行,要么全部回滚。如

    2024年02月17日
    浏览(51)
  • Seata分布式事务

    本地事务,也就是传统的单机事务。在传统数据库事务中,必须要满足四个原则: 分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如: 跨数据源的分布式事务 跨服务的分布式事务 综合情况 完成上面的操作需要访问三个不同的微服务和三个不同的

    2024年02月09日
    浏览(44)
  • 分布式事务的华丽进化

    说到分布式事务,大家并不陌生。在实际工作中,用得比较多的还是柔性分布式事务,今天主要把在工作中运用到的几种柔性分布式事务的场景及实现方式做一个简单介绍,也可以看做是柔性分布式事务的一个演进过程。 这种方式适合业务内自己使用,当方法内的任务一个逻

    2024年02月13日
    浏览(33)
  • 聊聊什么是分布式事务

    分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上,以上是百度百科的解释。 简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务

    2024年02月08日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包