Springboot 集成kafka

这篇具有很好参考价值的文章主要介绍了Springboot 集成kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、创建项目并导入pom依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

二、修改application.yml配置

1. producer 生产端的配置

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092

2. consumer 消费端的配置,需要给consumer配置一个group-id

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      group-id: auto-dev #消费者组

三、生产者生产消息,消费者消费消息

1. 简单消费

producer生产者中使用自动注入的方式创建KafkaTemplate 对象

@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;

@Test
void sendMessage(){
    // 第一个参数为topic,第二个为消息体
	kafkaTemplate.send("ifun","hello");
}

consumer消费消息,使用@KafkaListener注解监听topic为ifun中的消息,可以监听多个topic

@Component
@Slf4j
public class ConsumerListener {
    // 消费监听
    @KafkaListener(topics = {"ifun"})
    public void onMessage(ConsumerRecord<String, String> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    }
}

2. 带回调的生产者,两种方式

@Test
void sendCallBackMessageOne(){
	kafkaTemplate.send("ifun","hello callback one").addCallback(success -> {
		// 消息发送到的topic
		String topic = success.getRecordMetadata().topic();
		// 消息发送到的分区
		int partition = success.getRecordMetadata().partition();
		// 消息在分区内的offset
		long offset = success.getRecordMetadata().offset();
		log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
	}, failure -> {
		log.info("send fail:message:{} ", failure.getMessage());
	});
}

@Test
void sendCallBackMessageTwo(){
	kafkaTemplate.send("ifun", "hello callback two").addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
		@Override
		public void onFailure(Throwable ex) {
			log.info("send fail:message:{} ", ex.getMessage());
		}
		@Override
		public void onSuccess(SendResult<String, Object> result) {
			String topic = result.getRecordMetadata().topic();
			int partition = result.getRecordMetadata().partition();
			long offset = result.getRecordMetadata().offset();
			log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
		}
	});
}

回调补充,全局回调,需要继承ProducerListener,并重写onSuccess和onError方法

@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {

    @Override
    public void onSuccess(ProducerRecord producerRecord,
                          RecordMetadata recordMetadata) {
        String topic = recordMetadata.topic();
        int partition = recordMetadata.partition();
        long offset = recordMetadata.offset();
        log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
    }

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.info("send fail : {}", exception.getMessage());
    }
}

3. 配置自定义分区策略

application.yml中需要指定分区策略的class

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      properties:
        partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner

分区类的实现

@Component
@Slf4j
public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区规则(这里假设全部发到0号分区)
        log.info("自定义分区策略 topic:{} key:{} value:{}",topic,key,value.toString());
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4. kafka事务提交

如果在发送消息的时候需要创建事务,可以使用KafkaTemplate的executeInTransaction方法来声明事务。

application.yml增加transaction配置

java.lang.IllegalStateException: Producer factory does not support transactions

org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

第一个异常是你没有配置transactions
第二个异常是因为你配置的acks不为all
第三个是正常的send方法,但是抛异常了,需要加@Transactional 注解

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      properties:
        partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner
      acks: all
      transaction-id-prefix: "IFUN_TX"

发送消息代码

@Test
@Transactional
void sendWithException(){
	kafkaTemplate.send("ifun","不带事务提交!");
	kafkaTemplate.executeInTransaction(oper->{
		oper.send("ifun","带事务的提交");
		throw new RuntimeException("fail 1");
	});
	throw new RuntimeException("fail 2");
}

带事务的提交消息发送失败

Springboot 集成kafka

 不带事务的消息被成功消费Springboot 集成kafka

 5. 消费者配置更详细的配置

@KafkaListener注解说明:

  1. id:唯一标识。如果没有配置,取application.yml中的 consumer.groupId
  2. idIsGroup :默认true,true的话代表该consumer分组group!
  3. groupId:消费者分组。如果不填,取id (idIsGroup=true)作为分组。否则取application.yml中的 consumer.groupId
  4. topic 与 topicPartitions 不能共用。
  5. topic:类似于subscripe订阅模式。
  6. topicPartitions类似于assign手动分配模式。
@KafkaListener(id = "ifun-001",groupId = "ifun-01", topicPartitions={
		@TopicPartition(topic = "ifun1",partitions = {"0"}),
		@TopicPartition(topic = "ifun2",
				partitions = {"0"},
				partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
		}
)
public void onTopicsMessage(ConsumerRecord<String, String> record){
	log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

解释:这里定义了消费者id为ifun-001,消费者组id为ifun-01,同时监听两个topic,ifun1和ifun2,其中监听ifun1的0号分区,ifun2的0号和1号分区,其中1号分区开始的offset为8,也就是说如果next-offset大于8就会消费,小于8不会消费。

6. 消费者批量消费

需要在application.yml中开启批量消费,sping.kafka.listener.type: batch 监听类型为batch,spring.kafka.consumer.max-poll-records 批量消费每次最多消费多少条消息,接收消息的时候需要使用List来接收。

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    listener:
      #batch single
      type: batch
    consumer:
      group-id: auto-dev #消费者组
      max-poll-records: 3
@KafkaListener(topics = {"ifun"})
public void onBatchMessage(List<ConsumerRecord<String, String>> records){
	log.info("批量消费");
	for (ConsumerRecord<String, String> record : records) {
		log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
	}
}

Springboot 集成kafka

 7. 消费端手动ack

设置spring.kafka.consumer.enable-auto-commit 为false 的时候 spring.kafka.listener.ack-mode 才会生效,设置为手动的manual表示手动

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    listener:
      #batch single
      type: batch
      # 手动确认模式 RECORD,  BATCH,  TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE;
      ack-mode: manual
    consumer:
      #消费者组 id
      group-id: auto-dev 
      max-poll-records: 3
      #是否自动提交偏移量offset
      enable-auto-commit: false 

消费代码

@KafkaListener(topics = {"ifun"})
public void onBatchMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
	try{
		log.info("批量消费");
		for (ConsumerRecord<String, String> record : records) {
			log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
		}
	}finally {
		ack.acknowledge();
	}
}

如果没有ack,那么会出现如下情况:

  1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
  2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。
  3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
     

9. 消费异常捕获

配置ConsumerAwareListenerErrorHandler 处理类,在listener上设置errorHandler属性为ConsumerAwareListenerErrorHandler的BeanName

@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
	return (message, exception, consumer) -> {
		System.out.println("消费异常:"+message.getPayload());
		return null;
	};
}
@KafkaListener(topics = {"ifun"}, errorHandler = "consumerAwareErrorHandler")
public void onBatchMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
	try{
		log.info("批量消费");
		for (ConsumerRecord<String, String> record : records) {
			log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
			throw new RuntimeException("消费异常");
		}
	}finally {
		ack.acknowledge();
	}
}

显示结果如下:

Springboot 集成kafka

10. 配置消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

需要为监听器工厂配置一个RecordFilterStrategy,返回true的时候消息将被抛弃,返回false会正常抵达监听器。

然后在监听器上设置containerFactory属性为配置的过滤器工厂类

@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
	ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
	factory.setConsumerFactory(consumerFactory);
	// 被过滤的消息将被丢弃
	factory.setAckDiscarded(true);
	// 消息过滤策略
	factory.setRecordFilterStrategy(consumerRecord -> {
		if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
			return false;
		}
		//返回true消息则被过滤
		return true;
	});
	return factory;
}
@KafkaListener(topics = {"ifun"},containerFactory = "filterContainerFactory")
public void onMessage(ConsumerRecord<String, String> record){
	log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

过滤结果

Springboot 集成kafka

11. 消息转发

Topic A 收到消息后将消息转发给Topic B,使用@SendTo注解即可

@KafkaListener(topics = {"ifun"})
@SendTo("ifun1")
public String onMessage(ConsumerRecord<String, String> record){
	log.info("topic {} 收到需要转发的消息:{}",record.topic(), record.value());
	return record.value()+" 【forward message】";
}

@KafkaListener(topics = {"ifun1"})
public void onIFun1Message(ConsumerRecord<String, String> record){
	log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

 结果如下,可以看到消息先记过第一个topic,然后转发给了第二个topic

Springboot 集成kafkaSpringboot 集成kafka

 12. 设置json序列化方式生产和消费消息

消费端配置如下

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      #消费者组 id
      group-id: auto-dev
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # 配置json反序列化信任的包
              packages: '*'

生产端配置如下

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      #key的编解码方法
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value的编解码方法
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

编写消息实体类

@Data
public class UserInfo implements Serializable {
    private Long id;
    private String name;
    private Integer age;
}

发送消息

@Test
void sendMessage(){
	UserInfo userInfo = new UserInfo();
	userInfo.setAge(21);
	userInfo.setId(1L);
	userInfo.setName("Jack");
	kafkaTemplate.send("ifun",userInfo);
}

消费消息

@KafkaListener(topics = {"ifun"})
public void onMessage(UserInfo userInfo){
	log.info("消息:{}",userInfo);
}

消费结果如下:

Springboot 集成kafka

 注意:发送的类要和消费的类的全类名一致才行,不能是类名一样,字段一样,但是包名不一样,这样会抛异常。

四、kafka其他配置

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: xx.xx.xx.xx:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    #生产者配置
    producer:
      bootstrap-servers: xx.xx.xx.xx:9092
      #生产者发送消息失败重试次数
      retries: 1
      # 同一批次内存大小(默认16K)
      batch-size: 16384 
      #生产者内存缓存区大小(300M = 300*1024*1024)
      buffer-memory: 314572800
      #acks=0:无论成功还是失败,只发送一次。无需确认
      #acks=1:即只需要确认leader收到消息
      #acks=all或-1:ISR + Leader都确定收到
      acks: 1
      #key的编解码方法
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      #value的编解码方法
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      #开启事务,但是要求ack为all,否则无法保证幂等性
      #transaction-id-prefix: "IFUN_TX"
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
      properties:
        #自定义拦截器,注意,这里是classes(先于分区器)
        interceptor.classes: cn.com.controller.TimeInterceptor
        #自定义分区器
        #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
        #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
        linger.ms: 1000
        #最大请求大小,200M = 200*1024*1024
        max.request.size: 209715200
        #Producer.send()方法的最大阻塞时间(115秒)
        max.block.ms: 115000
        #该配置控制客户端等待请求响应的最长时间。
        #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。 
        #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
        request.timeout.ms: 115000
        #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
        #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
        delivery.timeout.ms: 120000


    #https://kafka.apache.org/documentation/#consumerconfigs
    #消费者配置
    consumer:
      bootstrap-servers: xx.xx.xx.xx:9092
      #消费者组id
      group-id: default-group
      #消费方式: earliest:从头开始消费 latest:从最新的开始消费,默认latest
      auto-offset-reset: earliest 
      #是否自动提交偏移量offset
      enable-auto-commit: false
      #前提是 enable-auto-commit=true。自动提交的频率
      auto-commit-interval: 1s 
      #key 解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value 解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #最大消费记录数
      max-poll-records: 2
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        session.timeout.ms: 120000
        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        #配置控制客户端等待请求响应的最长时间。 
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        #服务器返回的最大数据量,不能超过admin的message.max.bytes单条数据最大大小
        max.partition.fetch.bytes: 1048576
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true
        # 如果设置的json解码器,需要配置所信任的包名
        spring:
          json:
            trusted:
              packages: '*'

    #监听器配置
    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      #如果至少有一个topic不存在,true启动失败。false忽略
      missing-topics-fatal: true 
      #单条消费 single 批量消费batch 
      #批量消费需要配合 consumer.max-poll-records
      type: batch
      #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
      concurrency: 2 

    template:
      default-topic: "default-topic"

五、总结

kafka的简单使用就到此结束了,和rabbitmq还是有挺大的区别的。大家快去试试吧。文章来源地址https://www.toymoban.com/news/detail-490012.html

到了这里,关于Springboot 集成kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • <Maven>项目依赖导入Maven本地仓库命令

    项目工程pom.xml文件打开:查看报错的依赖, 将jar包放在D盘(或者其它路径都可)根目录下,在windows黑窗口执行以下命令; 举例:jar包名称: 1.api-1.0-SNAPSHOT102.jar 2.coms-cache-1.0-SNAPSHOT.jar 命令:

    2024年02月14日
    浏览(39)
  • AutoMQ生态集成 - 将数据从 AutoMQ for Kafka 导入 Databend

    Databend是使用 Rust 研发、开源的、完全面向云架构、基于对象存储构建的新一代云原生数据仓库,为企业提供湖仓一体化、计算和存储分离的大数据分析平台。 本文将介绍如何通过 bend-ingest-kafka 将数据从 AutoMQ for Kafka 导入 Databend。 info 本文中提及的 AutoMQ Kafka 术语,均特指安

    2024年01月16日
    浏览(46)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(62)
  • idea中导入maven项目,但是却没有导入依赖的问题

    今天我遇到很奇葩的问题,用maven几年了,只是导入maven项目配置好maven,所有的依赖从仓库下载下来后就自动导入了,前几天电脑硬盘坏了,所以重新装的idea, 可以看到pom的素有依赖都没引入,我重新导入pom还是一样。 这是需要勾选上: 然后再重启idea,再reimport一下就好了

    2024年02月16日
    浏览(53)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(43)
  • 解决IDEA Maven依赖下载失败、Maven依赖包导入失败报红的问题(项目实操)

    在向IDEA导入一个maven项目的时候,有一些依赖就是无法下载下来,我想尽了办法,查尽了资料,还是无法解决,最后通过我的一个突发奇想解决了问题。 一、我的pom文件报红的依赖,在maven仓库官网上都可以搜索到,但是无论想什么办法,IDEA就是下载不下来依赖 二、首先列

    2024年02月16日
    浏览(56)
  • 八、SpringBoot集成Kafka

    1、修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息 2、创建 controller 从浏览器接收数据, 并写入指定的 topic 1、修改 SpringBoot 核心配置文件 application.propeti 2、创建类消费 Kafka 中指定 topic 的数据 一些配置总结

    2024年02月10日
    浏览(38)
  • SpringBoot——集成Kafka详解

    1.1、引入依赖 1.2、application.yml配置 1.3、简单生产 1.4、简单消费 2.1、带回调的生产者 kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法, 2.2、监听器 Kafka提供了ProducerListener 监听器来异步监听生产者消息

    2024年02月08日
    浏览(45)
  • Springboot Kafka 集成配置

    Springboot 配置使用 Kafka 前言 一、Linux 安装 Kafka 二、构建项目 三、引入依赖 四、配置文件 生产者 yml 方式 Config 方式 消费者 yml 方式 Config 方式 五、开始写代码 生产者 发送 成功回调和异常处理 消费者 接收 异常处理 七、开始测试 测试普通单条消息 测试消费者异常处理 测试

    2024年02月08日
    浏览(47)
  • Springboot 集成kafka

    一、创建项目并导入pom依赖 二、修改application.yml配置 1. producer 生产端的配置 2. consumer 消费端的配置,需要给consumer配置一个group-id 三、生产者生产消息,消费者消费消息 1. 简单消费 producer生产者中使用自动注入的方式创建KafkaTemplate 对象 consumer消费消息,使用@KafkaListener注解

    2024年02月09日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包