Springboot 集成kafka

这篇具有很好参考价值的文章主要介绍了Springboot 集成kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、创建项目并导入pom依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

二、修改application.yml配置

1. producer 生产端的配置

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092

2. consumer 消费端的配置,需要给consumer配置一个group-id

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      group-id: auto-dev #消费者组

三、生产者生产消息,消费者消费消息

1. 简单消费

producer生产者中使用自动注入的方式创建KafkaTemplate 对象

@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;

@Test
void sendMessage(){
    // 第一个参数为topic,第二个为消息体
	kafkaTemplate.send("ifun","hello");
}

consumer消费消息,使用@KafkaListener注解监听topic为ifun中的消息,可以监听多个topic

@Component
@Slf4j
public class ConsumerListener {
    // 消费监听
    @KafkaListener(topics = {"ifun"})
    public void onMessage(ConsumerRecord<String, String> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    }
}

2. 带回调的生产者,两种方式

@Test
void sendCallBackMessageOne(){
	kafkaTemplate.send("ifun","hello callback one").addCallback(success -> {
		// 消息发送到的topic
		String topic = success.getRecordMetadata().topic();
		// 消息发送到的分区
		int partition = success.getRecordMetadata().partition();
		// 消息在分区内的offset
		long offset = success.getRecordMetadata().offset();
		log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
	}, failure -> {
		log.info("send fail:message:{} ", failure.getMessage());
	});
}

@Test
void sendCallBackMessageTwo(){
	kafkaTemplate.send("ifun", "hello callback two").addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
		@Override
		public void onFailure(Throwable ex) {
			log.info("send fail:message:{} ", ex.getMessage());
		}
		@Override
		public void onSuccess(SendResult<String, Object> result) {
			String topic = result.getRecordMetadata().topic();
			int partition = result.getRecordMetadata().partition();
			long offset = result.getRecordMetadata().offset();
			log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
		}
	});
}

回调补充,全局回调,需要继承ProducerListener,并重写onSuccess和onError方法

@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {

    @Override
    public void onSuccess(ProducerRecord producerRecord,
                          RecordMetadata recordMetadata) {
        String topic = recordMetadata.topic();
        int partition = recordMetadata.partition();
        long offset = recordMetadata.offset();
        log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
    }

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.info("send fail : {}", exception.getMessage());
    }
}

3. 配置自定义分区策略

application.yml中需要指定分区策略的class

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      properties:
        partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner

分区类的实现

@Component
@Slf4j
public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区规则(这里假设全部发到0号分区)
        log.info("自定义分区策略 topic:{} key:{} value:{}",topic,key,value.toString());
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4. kafka事务提交

如果在发送消息的时候需要创建事务,可以使用KafkaTemplate的executeInTransaction方法来声明事务。

application.yml增加transaction配置

java.lang.IllegalStateException: Producer factory does not support transactions

org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

第一个异常是你没有配置transactions
第二个异常是因为你配置的acks不为all
第三个是正常的send方法,但是抛异常了,需要加@Transactional 注解

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      properties:
        partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner
      acks: all
      transaction-id-prefix: "IFUN_TX"

发送消息代码

@Test
@Transactional
void sendWithException(){
	kafkaTemplate.send("ifun","不带事务提交!");
	kafkaTemplate.executeInTransaction(oper->{
		oper.send("ifun","带事务的提交");
		throw new RuntimeException("fail 1");
	});
	throw new RuntimeException("fail 2");
}

带事务的提交消息发送失败

Springboot 集成kafka

 不带事务的消息被成功消费Springboot 集成kafka

 5. 消费者配置更详细的配置

@KafkaListener注解说明:

  1. id:唯一标识。如果没有配置,取application.yml中的 consumer.groupId
  2. idIsGroup :默认true,true的话代表该consumer分组group!
  3. groupId:消费者分组。如果不填,取id (idIsGroup=true)作为分组。否则取application.yml中的 consumer.groupId
  4. topic 与 topicPartitions 不能共用。
  5. topic:类似于subscripe订阅模式。
  6. topicPartitions类似于assign手动分配模式。
@KafkaListener(id = "ifun-001",groupId = "ifun-01", topicPartitions={
		@TopicPartition(topic = "ifun1",partitions = {"0"}),
		@TopicPartition(topic = "ifun2",
				partitions = {"0"},
				partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
		}
)
public void onTopicsMessage(ConsumerRecord<String, String> record){
	log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

解释:这里定义了消费者id为ifun-001,消费者组id为ifun-01,同时监听两个topic,ifun1和ifun2,其中监听ifun1的0号分区,ifun2的0号和1号分区,其中1号分区开始的offset为8,也就是说如果next-offset大于8就会消费,小于8不会消费。

6. 消费者批量消费

需要在application.yml中开启批量消费,sping.kafka.listener.type: batch 监听类型为batch,spring.kafka.consumer.max-poll-records 批量消费每次最多消费多少条消息,接收消息的时候需要使用List来接收。

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    listener:
      #batch single
      type: batch
    consumer:
      group-id: auto-dev #消费者组
      max-poll-records: 3
@KafkaListener(topics = {"ifun"})
public void onBatchMessage(List<ConsumerRecord<String, String>> records){
	log.info("批量消费");
	for (ConsumerRecord<String, String> record : records) {
		log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
	}
}

Springboot 集成kafka

 7. 消费端手动ack

设置spring.kafka.consumer.enable-auto-commit 为false 的时候 spring.kafka.listener.ack-mode 才会生效,设置为手动的manual表示手动

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    listener:
      #batch single
      type: batch
      # 手动确认模式 RECORD,  BATCH,  TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE;
      ack-mode: manual
    consumer:
      #消费者组 id
      group-id: auto-dev 
      max-poll-records: 3
      #是否自动提交偏移量offset
      enable-auto-commit: false 

消费代码

@KafkaListener(topics = {"ifun"})
public void onBatchMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
	try{
		log.info("批量消费");
		for (ConsumerRecord<String, String> record : records) {
			log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
		}
	}finally {
		ack.acknowledge();
	}
}

如果没有ack,那么会出现如下情况:

  1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
  2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。
  3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
     

9. 消费异常捕获

配置ConsumerAwareListenerErrorHandler 处理类,在listener上设置errorHandler属性为ConsumerAwareListenerErrorHandler的BeanName

@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
	return (message, exception, consumer) -> {
		System.out.println("消费异常:"+message.getPayload());
		return null;
	};
}
@KafkaListener(topics = {"ifun"}, errorHandler = "consumerAwareErrorHandler")
public void onBatchMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
	try{
		log.info("批量消费");
		for (ConsumerRecord<String, String> record : records) {
			log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
			throw new RuntimeException("消费异常");
		}
	}finally {
		ack.acknowledge();
	}
}

显示结果如下:

Springboot 集成kafka

10. 配置消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

需要为监听器工厂配置一个RecordFilterStrategy,返回true的时候消息将被抛弃,返回false会正常抵达监听器。

然后在监听器上设置containerFactory属性为配置的过滤器工厂类

@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
	ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
	factory.setConsumerFactory(consumerFactory);
	// 被过滤的消息将被丢弃
	factory.setAckDiscarded(true);
	// 消息过滤策略
	factory.setRecordFilterStrategy(consumerRecord -> {
		if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
			return false;
		}
		//返回true消息则被过滤
		return true;
	});
	return factory;
}
@KafkaListener(topics = {"ifun"},containerFactory = "filterContainerFactory")
public void onMessage(ConsumerRecord<String, String> record){
	log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

过滤结果

Springboot 集成kafka

11. 消息转发

Topic A 收到消息后将消息转发给Topic B,使用@SendTo注解即可

@KafkaListener(topics = {"ifun"})
@SendTo("ifun1")
public String onMessage(ConsumerRecord<String, String> record){
	log.info("topic {} 收到需要转发的消息:{}",record.topic(), record.value());
	return record.value()+" 【forward message】";
}

@KafkaListener(topics = {"ifun1"})
public void onIFun1Message(ConsumerRecord<String, String> record){
	log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

 结果如下,可以看到消息先记过第一个topic,然后转发给了第二个topic

Springboot 集成kafkaSpringboot 集成kafka

 12. 设置json序列化方式生产和消费消息

消费端配置如下

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      #消费者组 id
      group-id: auto-dev
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # 配置json反序列化信任的包
              packages: '*'

生产端配置如下

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      #key的编解码方法
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value的编解码方法
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

编写消息实体类

@Data
public class UserInfo implements Serializable {
    private Long id;
    private String name;
    private Integer age;
}

发送消息

@Test
void sendMessage(){
	UserInfo userInfo = new UserInfo();
	userInfo.setAge(21);
	userInfo.setId(1L);
	userInfo.setName("Jack");
	kafkaTemplate.send("ifun",userInfo);
}

消费消息

@KafkaListener(topics = {"ifun"})
public void onMessage(UserInfo userInfo){
	log.info("消息:{}",userInfo);
}

消费结果如下:

Springboot 集成kafka

 注意:发送的类要和消费的类的全类名一致才行,不能是类名一样,字段一样,但是包名不一样,这样会抛异常。

四、kafka其他配置

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: xx.xx.xx.xx:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    #生产者配置
    producer:
      bootstrap-servers: xx.xx.xx.xx:9092
      #生产者发送消息失败重试次数
      retries: 1
      # 同一批次内存大小(默认16K)
      batch-size: 16384 
      #生产者内存缓存区大小(300M = 300*1024*1024)
      buffer-memory: 314572800
      #acks=0:无论成功还是失败,只发送一次。无需确认
      #acks=1:即只需要确认leader收到消息
      #acks=all或-1:ISR + Leader都确定收到
      acks: 1
      #key的编解码方法
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      #value的编解码方法
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      #开启事务,但是要求ack为all,否则无法保证幂等性
      #transaction-id-prefix: "IFUN_TX"
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
      properties:
        #自定义拦截器,注意,这里是classes(先于分区器)
        interceptor.classes: cn.com.controller.TimeInterceptor
        #自定义分区器
        #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
        #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
        linger.ms: 1000
        #最大请求大小,200M = 200*1024*1024
        max.request.size: 209715200
        #Producer.send()方法的最大阻塞时间(115秒)
        max.block.ms: 115000
        #该配置控制客户端等待请求响应的最长时间。
        #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。 
        #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
        request.timeout.ms: 115000
        #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
        #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
        delivery.timeout.ms: 120000


    #https://kafka.apache.org/documentation/#consumerconfigs
    #消费者配置
    consumer:
      bootstrap-servers: xx.xx.xx.xx:9092
      #消费者组id
      group-id: default-group
      #消费方式: earliest:从头开始消费 latest:从最新的开始消费,默认latest
      auto-offset-reset: earliest 
      #是否自动提交偏移量offset
      enable-auto-commit: false
      #前提是 enable-auto-commit=true。自动提交的频率
      auto-commit-interval: 1s 
      #key 解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value 解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #最大消费记录数
      max-poll-records: 2
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        session.timeout.ms: 120000
        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        #配置控制客户端等待请求响应的最长时间。 
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        #服务器返回的最大数据量,不能超过admin的message.max.bytes单条数据最大大小
        max.partition.fetch.bytes: 1048576
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true
        # 如果设置的json解码器,需要配置所信任的包名
        spring:
          json:
            trusted:
              packages: '*'

    #监听器配置
    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      #如果至少有一个topic不存在,true启动失败。false忽略
      missing-topics-fatal: true 
      #单条消费 single 批量消费batch 
      #批量消费需要配合 consumer.max-poll-records
      type: batch
      #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
      concurrency: 2 

    template:
      default-topic: "default-topic"

五、总结

kafka的简单使用就到此结束了,和rabbitmq还是有挺大的区别的。大家快去试试吧。文章来源地址https://www.toymoban.com/news/detail-490012.html

到了这里,关于Springboot 集成kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • <Maven>项目依赖导入Maven本地仓库命令

    项目工程pom.xml文件打开:查看报错的依赖, 将jar包放在D盘(或者其它路径都可)根目录下,在windows黑窗口执行以下命令; 举例:jar包名称: 1.api-1.0-SNAPSHOT102.jar 2.coms-cache-1.0-SNAPSHOT.jar 命令:

    2024年02月14日
    浏览(30)
  • AutoMQ生态集成 - 将数据从 AutoMQ for Kafka 导入 Databend

    Databend是使用 Rust 研发、开源的、完全面向云架构、基于对象存储构建的新一代云原生数据仓库,为企业提供湖仓一体化、计算和存储分离的大数据分析平台。 本文将介绍如何通过 bend-ingest-kafka 将数据从 AutoMQ for Kafka 导入 Databend。 info 本文中提及的 AutoMQ Kafka 术语,均特指安

    2024年01月16日
    浏览(34)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(39)
  • idea中导入maven项目,但是却没有导入依赖的问题

    今天我遇到很奇葩的问题,用maven几年了,只是导入maven项目配置好maven,所有的依赖从仓库下载下来后就自动导入了,前几天电脑硬盘坏了,所以重新装的idea, 可以看到pom的素有依赖都没引入,我重新导入pom还是一样。 这是需要勾选上: 然后再重启idea,再reimport一下就好了

    2024年02月16日
    浏览(34)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(33)
  • 解决IDEA Maven依赖下载失败、Maven依赖包导入失败报红的问题(项目实操)

    在向IDEA导入一个maven项目的时候,有一些依赖就是无法下载下来,我想尽了办法,查尽了资料,还是无法解决,最后通过我的一个突发奇想解决了问题。 一、我的pom文件报红的依赖,在maven仓库官网上都可以搜索到,但是无论想什么办法,IDEA就是下载不下来依赖 二、首先列

    2024年02月16日
    浏览(32)
  • SpringBoot——集成Kafka详解

    1.1、引入依赖 1.2、application.yml配置 1.3、简单生产 1.4、简单消费 2.1、带回调的生产者 kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法, 2.2、监听器 Kafka提供了ProducerListener 监听器来异步监听生产者消息

    2024年02月08日
    浏览(33)
  • SpringBoot 集成 Kafka 配置

    自定义分区器 生产者 消费者 配置文件 自定义分区器 生产者 消费者配置 消费数据过滤 消费异常处理类 消费者配置  @KafkaListener

    2024年02月15日
    浏览(25)
  • Springboot 集成kafka

    一、创建项目并导入pom依赖 二、修改application.yml配置 1. producer 生产端的配置 2. consumer 消费端的配置,需要给consumer配置一个group-id 三、生产者生产消息,消费者消费消息 1. 简单消费 producer生产者中使用自动注入的方式创建KafkaTemplate 对象 consumer消费消息,使用@KafkaListener注解

    2024年02月09日
    浏览(22)
  • 八、SpringBoot集成Kafka

    1、修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息 2、创建 controller 从浏览器接收数据, 并写入指定的 topic 1、修改 SpringBoot 核心配置文件 application.propeti 2、创建类消费 Kafka 中指定 topic 的数据 一些配置总结

    2024年02月10日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包