「Kafka」生产者篇

这篇具有很好参考价值的文章主要介绍了「Kafka」生产者篇。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

「Kafka」生产者篇

生产者发送消息流程

在消息发送的过程中,涉及到了 两个线程 ——main 线程Sender 线程

在 main 线程中创建了 一个 双端队列 RecordAccumulator

main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

  • main线程创建 Producer 对象,调用 send 函数发送消息,经过:
    • 拦截器 Interceptors(可选项,扩展一些额外功能)
    • 序列化器 Serializer(为什么不用Java的序列化?因为大数据传输需要更轻量的序列化方式)
    • 分区器 Partitioner,需要判断发送到哪个分区
  • 一个分区就会创建一个双端队列 RecordAccumulator,创建队列都是在内存里完成的,总大小默认为 32m
    • 双端队列 RecordAccumulator 还有一个内存池的概念,每次 send 数据到队列后,在存放数据的时候会从内存池中取出内存,数据发送到kafka后释放内存归还到内存池;一端创建内存,另一端释放内存,这也是它为什么设计为双端队列。
  • Sender线程从队列中拉取数据
    • 每次批处理batch.size的大小默认为 16k,延迟时间 linger.ms 默认为 0ms,没有延迟。
      • 这两个条件是 或 的关系,两个条件达到任意一个就可以发送数据。
    • 以节点的方式, key:value => Broker1:(队列数据...) 的格式发送给对应的 kafka 服务器,如果kafka没有应答,默认每个broker节点队列最多缓存 5 个请求,后续 生产经验—数据乱序 的章节会讲这个作用。
  • Selector负责打通底层的链路,IO输入流 => IO输出流,经过Selector发送到kafka集群,kafka集群进行副本的同步。
  • 如果kafka集群收到数据后,会返回 ack,有3种模式,如上图。
    • 如果ack返回成功,则先清理掉缓存的Request请求,然后清理到对应队列中的数据。
    • 如果ack返回失败,则进行 retries 重试,默认重试次数是int的最大值(死磕),一直发Request请求,直到重试成功。
    • 详细讲解请参考下文的 生产经验—数据可靠性。

生产者重要参数列表

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

异步发送

  • 同步发送:外部数据发送到 RecordAccumulator 队列中,等待这批数据都发送到 kafka 集群,再返回。
  • 异步发送:外部数据发送到 RecordAccumulator 队列中,不管这些数据有没有发送到 kafka 集群,直接返回。
    • 默认为异步发送

普通异步发送

编写不带回调函数的代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
	public static void main(String[] args) throws InterruptedException {
		// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
		// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		// key,value 序列化(必须):key.serializer,value.serializer
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
		
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
            // 这里只指定了topic和value
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
		}
		
		// 5. 关闭资源
		kafkaProducer.close();
	}
}

回调异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。

如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
    // 添加回调 Callback
    kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
        // 该方法在 Producer 收到 ack 时调用,为异步调用
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                // 没有异常,输出信息到控制台
                System.out.println("主题:" + metadata.topic() + "->" +
                                "分区:" + metadata.partition());
            } else {
                // 出现异常打印
                exception.printStackTrace();
            }
        }
    });
    // 延迟一会会看到数据发往不同分区
    Thread.sleep(2);
}

同步发送

只需在异步发送的基础上,再调用一下 get() 方法即可。

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

生产者分区

分区好处

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

可以通过机器的存储能力自定义分区数据,比如 broker0 存储 20T 数据,broker1和2分别存储 40T 数据。

生产者发送消息的分区策略

可阅读:详解Kafka分区机制原理|Kafka 系列 二

默认的分区器 DefaultPartitioner

/**
 * The default partitioning strategy: 默认分区策略
 * 如果你指定了分区,则直接用这个分区
 * 如果没指定分区,但有key,则按照key的hash值 % 分区数
 * 如果既没指定分区也没指定key,则按照粘性分区处理。
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {
	...
}

ProducerRecord 类的构造方法就表示了这 3 种分区策略:

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

自定义分区器

  • 定义类实现 Partitioner 接口

  • 重写 partition() 方法

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import java.util.Map;
    
    /**
     * 1. 实现接口 Partitioner
     * 2. 实现3个方法: partition、close、configure
     * 3. 编写 partition 方法,返回分区号
     */
    public class MyPartitioner implements Partitioner {
    	/**
    	 * 返回信息对应的分区
    	 * @param topic 主题
    	 * @param key 消息的 key
    	 * @param keyBytes 消息的 key 序列化后的字节数组
    	 * @param value 消息的 value
    	 * @param valueBytes 消息的 value 序列化后的字节数组
    	 * @param cluster 集群元数据可以查看分区信息
    	 * @return
    	 */
    	@Override
    	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    		// 获取消息
    		String msgValue = value.toString();
    		// 创建 partition
    		int partition;
    		// 判断消息是否包含 atguigu
    		if (msgValue.contains("atguigu")) {
    			partition = 0;
    		} else {
    			partition = 1;
    		}
    		// 返回分区号
    		return partition;
    	}
    	
    	// 关闭资源
    	@Override
    	public void close() {
    	}
    	
    	// 配置方法
    	@Override
    	public void configure(Map<String, ?> configs) {
    	}
    }
    
  • 使用分区器的方法,在生产者的配置中添加分区器参数

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallbackPartitions {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 添加自定义分区器
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e == null) {
                            System.out.println("主题:" + metadata.topic() + "->" +
                                    "分区:" + metadata.partition());
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }
    

生产者如何提高吞吐量

  1. 合理调整 batch.sizelinger.ms 的参数值
  2. 采用数据压缩
  3. 调整缓冲区大小

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducerParameters {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms:等待时间,默认 0ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
        }
        
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

生产经验—数据可靠性

回顾发送流程

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

数据可靠性主要根据 kafka 集群返回给我们的 ack

ack 应答原理

  • ack=0,不需要等待数据落盘应答,一直发送给 kafka,很容易丢数据。
    • 数据发送到 Leader 后,Leader 挂掉了,此时数据还在内存中,未落盘,数据丢失。
  • ack=1,不需要等待 kafka 主从同步完成,Leader 收到数据落盘后应答。
    • Leader 成功落盘,但还未同步给 Follower,Leader 挂了,数据丢失。
  • ack=-1,需要等待 Leader 和 ISR 队列里面的所有节点收齐数据后应答。

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

数据完全可靠条件

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

注意,这里的“副本”并不是指的 Follower;在 Kafka 中,副本分为 Leader 副本和 Follower 副本。Leader 副本负责处理消息,而 Follower 副本则简单地复制 Leader 副本的数据。

也就是一个分区至少要有 1 个 Leader 和 1 个 Follower,ISR 队列最少也要有 1 个 Leader 和 1 个 Follower。

一个分区至少有 1 个 Leader,所以每个 Partition 都会有一个 ISR,而且是由 Leader 动态维护。

可靠性总结

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  • acks=1,生产者发送过来数据 Leader 应答,可靠性中等,效率中等;
  • acks=-1,生产者发送过来数据 Leader 和 ISR 队列里面所有 Follwer 应答,可靠性高,效率低;
  • 在生产环境中,
    • acks=0,很少使用;
    • acks=1,一般用于传输普通日志,允许丢个别数据;
    • acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

代码实现

// 设置 acks=-1
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

拓展:

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

生产者将数据发送给 Leader,并且完成同步给 Follower,此时回复 ack 时,Leader 挂了,kafka 会挑一个 Follower 成为新的 Leader,因为生产者没有收到 ack,此时就会认为他的数据没有发送到 kafka,就会进行重试,导致新 Leader 重复接收了两份数据。

生产经验—数据去重

数据传递语义

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

幂等性

幂等性原理

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

如何使用幂等性

开启参数 enable.idempotence,默认为 true(默认开启)。

生产者事务

幂等性只能保证单分区单会话的不重复,一旦 kafka 挂掉重启,还是有可能产生重复数据。如果想完全去重,就必须使用事务。

Kafka 事务原理

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

  • 幂等性:如果 kafka 挂掉重启,会重新生成一个 PID,所以可能会有重复。
  • 事务:kafka 根据全局唯一的 transactional.id 会划分到50个分区中的某一个分区,这些分区的信息是存储在一个特殊 Topic 里的,而 Topic 的底层就是硬盘,所以即使客户端挂掉了,重启后也能继续处理未完成的事务,因为有 transactional.id 存在。

Kafka 的事务一共有如下 5 个 API:

// 1. 初始化事务
void initTransactions();

// 2. 开启事务
void beginTransaction() throws ProducerFencedException;

// 3. 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4. 提交事务
void commitTransaction() throws ProducerFencedException;

// 5. 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个 Producer,使用事务保证消息的仅一次发送:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class CustomProducerTransactions {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置事务 id(必须),事务 id 任意起名,要求全局唯一
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 初始化事务
        kafkaProducer.initTransactions();

        // 开启事务
        kafkaProducer.beginTransaction();

        try {
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                // 发送消息
                kafkaProducer.send(new  ProducerRecord<>("first", "atguigu " + i));
            }
            // int i = 1 / 0;
            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
}

生产经验—数据有序

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

仅能保证单分区内有序,如果想保证全局有序,只能把所有分区的消息都拉到消费者端,进行一个全排序,再进行消费。

但需要等所有数据到齐了再进行排序,效率可能还不如单分区。

生产经验—数据乱序

一个 broker 可以有一个 broker 缓存队列,队列中存放的是还未收到 ack 的请求,最多能存放 5 个。

比如发送 Request1 后,对方没有应答,此时还可以发送 Request2、Request3、Request4、Request5,最多能发送 5 次请求。

假设在一个分区中,生产者发送了 Request1、Request2 请求都成功了,但 Request3 请求发送失败了,进行重试,但此时 Request4 请求发送成功了,然后 Request3 请求才发送成功,此时到达 kafka 的顺序就为 1 2 4 3,是乱序的。

  • kafka在1.x版本之前保证数据单分区有序,条件如下:
    • max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
      • 也就是 broker 的缓存队列只允许有 1 个请求,这个请求收到 ack 后才能发送下一个。
  • kafka在1.x及以后版本保证数据单分区有序,条件如下:
    • 开启幂等性
      • max.in.flight.requests.per.connection 需要设置小于等于 5
    • 未开启幂等性
      • max.in.flight.requests.per.connection 需要设置为 1(和kafka在1.x版本之前一样)。

原因说明:因为在 kafka1.x 以后,启用幂等后,kafka 服务端最多会缓存 producer 发来的最近 5request 的元数据。

故无论如何,都可以保证最近 5request 的数据都是有序的。

「Kafka」生产者篇,Kafka,kafka,java,后端,分布式

  1. 比如先来的 Request1、Request2,服务端根据 SeqNumber 判断数据是否是单调递增的,如果符合则直接进行落盘;
  2. 但下一个请求是 Request4,正常应该是 Request3,所以 Request4 这个请求只能在内存中放着,不能进行落盘;
  3. 再下一个是 Request5,同样不能进行落盘;
  4. 直到 Request3 来了,然后对他们进行排序,然后再依次落盘 Request3、Request4、Request5。

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)文章来源地址https://www.toymoban.com/news/detail-804783.html

到了这里,关于「Kafka」生产者篇的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(54)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(47)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(48)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(62)
  • Apache Kafka - 重识Kafka生产者

    Kafka 生产者是 Apache Kafka 中的一个重要组件,它负责将数据发送到 Kafka 集群中。在实时数据处理和流式处理应用程序中,Kafka 生产者扮演着非常重要的角色。 这里我们将介绍 Kafka 生产者的概念、工作原理以及如何使用 Kafka 生产者。 Kafka 生产者是一种用于将数据发送到 Kafk

    2024年02月05日
    浏览(42)
  • [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题

    [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题 Kafka是一个高度可扩展的分布式流平台,用于构建实时数据管道和流处理应用程序。作为一个广泛使用的消息代理系统,Kafka在数据传输方面表现出色,但是在极端情况下,它可能会出现生产者阻塞的问题。这可能会导致

    2024年02月11日
    浏览(49)
  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(41)
  • 三、Kafka生产者

    1 发送原理 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 【RecordAccumulator缓冲的结构: 每一个分区对应一

    2024年02月12日
    浏览(39)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(41)
  • (三)Kafka 生产者

    创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基

    2024年02月09日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包