springboot集成kafka消费数据

这篇具有很好参考价值的文章主要介绍了springboot集成kafka消费数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.kafka介绍

1.1.生产者介绍

1.1.1.生产者分区策略

  • 轮询策略:Round-robin 策略,即顺序分配,轮询策略有非常优秀的负载均衡表 现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用)
  • 随机策略:Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上。
  • 消息键保序策略:key-ordering策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的。()
    例如企业上传实时数据,将企业id作为key,可以确保每个企业的数据都在同一个分区,确保能按顺序消费。
    如果不设置key,可能会出现17:30分的数据在1分区,17:31分的数据在2分区,但是1分区数据较多,2分区数据比较少,结果就是先消费了17:31的数据,后消费17:30的数据。
    springboot集成kafka消费数据,java,spring boot,kafka,spring boot,kafka,java

2.springboot集成kafka代码

项目目录结构如下

springboot集成kafka消费数据,java,spring boot,kafka,spring boot,kafka,java

2.1.引入pom依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.2</version>
        </dependency>

2.2.添加KafkaConsumerConfig.java消费者配置类

package com.example.springbootkafka.config;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrapServers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.topics}")
    private List<String> topics;
    @Value("${spring.kafka.consumer.groupId}")
    private String groupId;
    @Value("${spring.kafka.consumer.sessionTimeOut}")
    private String sessionTimeOut;
    @Value("${spring.kafka.consumer.enableAutoCommit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.autoCommitInterval}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.maxPollRecords}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.maxPollInterval}")
    private String maxPollInterval;
    @Value("${spring.kafka.consumer.heartbeatInterval}")
    private String heartbeatInterval;
    @Value("${spring.kafka.consumer.keyDeserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.valueDeserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.autoOffsetReset}")
    private String autoOffsetReset;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 并发数 多个微服务实例会均分
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        // 是否设置手动提交
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("消费者的配置信息:{}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // 服务器地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 是否自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 自动提交间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        //会话时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
        //key序列化
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        //value序列化
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        // 心跳时间
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);

        // 分组id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //消费策略
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // poll记录数
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //poll时间
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        return propsMap;
    }

}

2.3.添加Consumer.java消费者实现代码

  • 如果在 @KafkaListener 注解中不设置 concurrency 属性,默认并发度是 1。这意味着每个消费者实例将在单个线程中运行,一次只能处理一个消息。设置 concurrency 参数为 “1” 可以确保消息的顺序性,但会限制消费者的吞吐量。
  • 如果将concurrency 设置为大于 “1” 的值,那么每个消费者实例将使用多个线程来并发处理消息。这样可以提高消息处理的吞吐量,但可能会导致消息的处理顺序性无法保证。
package com.example.springbootkafka.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class Consumer {

//    @KafkaListener(topics = {"${spring.kafka.consumer.topics}"},
//                    groupId = "${spring.kafka.consumer.groupId}",
//                    containerFactory = "kafkaListenerContainerFactory",
//                    properties = {"${spring.kafka.consumer.autoOffsetReset}"})
    @KafkaListener(topics = {"#{T(java.util.Arrays).asList('${spring.kafka.consumer.topics}'.split(','))}"},
                    groupId = "${spring.kafka.consumer.groupId}",
                    containerFactory = "kafkaListenerContainerFactory",
                    concurrency = "1",
                    properties = {"${spring.kafka.consumer.autoOffsetReset}"})
    public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test 消费了: Topic:" + record.topic() + ",Message:" + record.value());
            //手动提交偏移量
            ack.acknowledge();

        }
    }

}

2.4.添加接口ProducerService

package com.example.springbootkafka.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;

import java.util.concurrent.ExecutionException;

public interface ProducerService {



    /**
     * 发送同步消息
     *
     * @param topic
     * @param data
     * @throws ExecutionException
     * @throws InterruptedException
     */
    void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;

    /**
     * 发送普通消息
     *
     * @param topic
     * @param data
     */
    void sendMessage(String topic, String data);


    /**
     * 发送带附加信息的消息
     *
     * @param record
     */
    void sendMessage(ProducerRecord<String, String> record);


    /**
     * 发送Message消息
     *
     * @param message
     */
    void sendMessage(Message<String> message);


    /**
     * 发送带key的消息
     *
     * @param topic
     * @param key
     * @param data
     */
    void sendMessage(String topic, String key, String data);

    void sendMessage(String topic, Integer partition, String key, String data);


    void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}

2.5.添加ProducerServiceImpl.java生产者实现类

package com.example.springbootkafka.service.impl;

import com.example.springbootkafka.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;

@Component
@Slf4j
@EnableAsync
public class ProducerServiceImpl implements ProducerService {


    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    /**
     * 发送同步消息
     *
     * @param topic
     * @param data
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Override
    public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        log.debug("sendSyncMessage 发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
    }


    /**
     * 发送普通消息
     *
     * @param topic
     * @param data
     */
    @Override
    public void sendMessage(String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(
                success -> log.info("sendMessage topic={}发送消息成功!",topic),
                failure -> log.error("sendMessage 发送消息失败!失败原因是:{}", failure.getMessage())
        );

    }


    /**
     * 发送带附加信息的消息
     *
     * @param record
     */
    @Override
    public void sendMessage(ProducerRecord<String, String> record) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }


    /**
     * 发送Message消息
     *
     * @param message
     */
    @Override
    public void sendMessage(Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }


    /**
     * 发送带key的消息
     *
     * @param topic
     * @param key
     * @param data
     */
    @Override
    public void sendMessage(String topic, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
        log.info("发送到:{} ,消息体为:{}",topic,data);
        future.addCallback(
                success -> log.debug("发送消息成功!"),
                failure -> log.error("发送消息失败!失败原因是:{}", failure.getMessage())
        );
    }

    @Override
    public void sendMessage(String topic, Integer partition, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.debug("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }
}

2.6.添加application.yml配置

server:
  port: 8080


spring:
  kafka:
    bootstrap-servers: 192.168.80.251:9092
    producer:
      batch-size: 16384 #批次大小,默认16k
      acks: all #ACK应答级别,指定分区中必须要有多少个副本收到消息之后才会认为消息成功写入,默认为1只要分区的leader副本成功写入消息;0表示不需要等待任何服务端响应;-1或all需要等待ISR中所有副本都成功写入消息
      retries: 3 #重试次数
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 #缓冲区大小,默认32M
      client-id: abcdefg #客户端ID
      compression-type: none #消息压缩方式,默认为none,另外有gzip、snappy、lz4
      properties:
        retry.backoff.ms: 100 #重试时间间隔,默认100
        linger.ms: 0 #默认为0,表示批量发送消息之前等待更多消息加入batch的时间
        max.request.size: 1048576 #默认1MB,表示发送消息最大值
        connections.max.idle.ms: 540000 #默认9分钟,表示多久后关闭限制的连接
        receive.buffer.bytes: 32768 #默认32KB,表示socket接收消息缓冲区的大小,为-1时使用操作系统默认值
        send.buffer.bytes: 131072 #默认128KB,表示socket发送消息缓冲区大小,为-1时使用操作系统默认值
        request.timeout.ms: 30000 #默认30000ms,表示等待请求响应的最长时间
    consumer:
      bootstrapServers:  192.168.80.251:9092
      topics: testTopic1,testTopic2
      groupId: test
      #后台的心跳线程必须在30秒之内提交心跳,否则会reBalance
      sessionTimeOut: 30000
      #取消自动提交,即便如此 spring会帮助我们自动提交
      enableAutoCommit: false
      #自动提交间隔
      autoCommitInterval: 1000
      maxPollRecords: 50
      #300秒的提交间隔,如果程序大于300秒提交,会报错
      maxPollInterval: 300000
      #心跳间隔
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      autoOffsetReset: latest
      heartbeatInterval: 10000

2.7.添加Controller层代码

package com.example.springbootkafka.controller;

import com.example.springbootkafka.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/test")
public class Controller {


    @Autowired
    ProducerService producerService;


    @GetMapping("/sendMsg")
    public Integer sendMsg(@RequestParam("msg") String msg){
        producerService.sendMessage("testTopic1","key",msg);
        return Response.SC_OK;
    }



}

2.8.启动项目,测试功能

直接调用接口地址:127.0.0.1:8080/test/sendMsg?msg=11231231231
控制台打印消息如下,则表示发送、消费消息成功
springboot集成kafka消费数据,java,spring boot,kafka,spring boot,kafka,java

3.使用docker-compose启动zookeeper和kafka

docker-compose.yaml文件如下,创建文件后,将 192.168.80.251 改为自己的虚拟机的ip,然后使用 docker-compose up -d 命令启动,一般是先启动zookeeper,再启动kafka,如果kafka启动失败了,使用 docker restart kafka 命令重启试试。

version: "3.3"
services:
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    restart: always
    networks:
      - zkp-kafka
    ports:
      - "2181:2181"
    deploy:
      replicas: 1
      update_config:
        parallelism: 2
        delay: 10s
      restart_policy:
        condition: on-failure

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.80.251:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    ports:
      - 9092:9092
    networks:
      - zkp-kafka
    depends_on:
      - zookeeper
    deploy:
      replicas: 1
      update_config:
        parallelism: 2
        delay: 10s
      restart_policy:
        condition: on-failure


networks:
  zkp-kafka:

4.gitee仓库地址

地址:https://gitee.com/wangyunchao6/spring-boot-kafka.git文章来源地址https://www.toymoban.com/news/detail-803253.html

到了这里,关于springboot集成kafka消费数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka原理之springboot 集成批量消费

    由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。 手动提交非批量消费   String 类型接入 使用注解方式获取消息头、消息体   手动提交批量消费 想要批量消费,首先要开启批量

    2024年02月11日
    浏览(27)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月08日
    浏览(31)
  • 【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

    本文主要有以下内容: 简单消息的发送 顺序消息的发送 RocketMQTemplate的API介绍 环境搭建: RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示: 在 Spring boot 项目中引入 RocketMQ 依赖: 在application.yml增加相关配置: 在 Spring Boot 中使用RocketM

    2024年02月14日
    浏览(33)
  • kafka原理五之springboot 集成批量消费

    目录 前言 一、新建一个maven工程,添加kafka依赖 二、yaml配置文件 三、消息消费 手动提交非批量消费   String 类型接入 使用注解方式获取消息头、消息体 手动提交批量消费 ConsumerRecord类接收 String类接收 使用注解方式获取消息头、消息体,则也是使用 List 来接收: 并发消费

    2024年02月14日
    浏览(29)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(33)
  • spring boot学习第八篇:kafka监听消费

    为了实现监听器功能 pom.xml文件内容如下:  application.yml文件内容如下: logback.xml文件内容如下: BackendApplication.java文件内容如下: 然后添加了kafkaConsumerListenerExample.java文件 发到服务器上,启动hmblogs报错,截图如下: Caused by: java.lang.TypeNotPresentException: Type org.springframework.k

    2024年01月19日
    浏览(43)
  • SpringBoot3集成Kafka优雅实现信息消费发送

           首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。        这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据

    2024年02月02日
    浏览(34)
  • java 集成kafka(支持单条消费和批量消费)

    1、下载安装zk,kafka...(大把教程,不在这里过多阐述) 2、引入pom 3、Kafka配置 4、生产者配置 5、生产者发消息的工具类 6、消费着配置 7、消费者配置类(配置批量消费) 8、测试类分别测试单条消费以及批量消费 9、消费者消费 完结。。。

    2024年02月12日
    浏览(46)
  • Spring Boot中使用Kafka时遇到“构建Kafka消费者失败“的问题

    在使用Spring Boot开发应用程序时,集成Apache Kafka作为消息队列是一种常见的做法。然而,有时候在配置和使用Kafka时可能会遇到一些问题。本文将探讨在Spring Boot应用程序中使用Kafka时可能遇到的\\\"构建Kafka消费者失败\\\"错误,并提供解决方案。 错误描述: 当尝试构建Kafka消费者时

    2024年01月17日
    浏览(38)
  • Spring Boot集成Kafka详解

    Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。 1. 添加依赖 首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。 2. 配置Kafka 接下来,

    2024年02月09日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包