Spring整合kafka

这篇具有很好参考价值的文章主要介绍了Spring整合kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

方式1

只用spring-kafka依赖就行

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

 注入KafkaTemplate模板

@Configuration
@EnableKafka
public class KafkaConfig {
 
    private final  static  String  CONSUMER_GROUP_ID="yd-group";
 
    public  final static   String TOPIC_NAME="yd-kf-topic";
 
    @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String>  kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
 
    /**
     * 消费工厂
     * @return
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://192.168.81.200:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    /**
     * 生产工厂
     * @return
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://192.168.81.200:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
 
    /**
     * kafka模板
     * @return
     */
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

消息发送

@Slf4j
@Service
public class KafkaProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    public  String  sendSyncMessage(String key,String  msg){
        String s;
        try {
            ListenableFuture<SendResult<String, String>> tagA = kafkaTemplate.send(KafkaConfig.TOPIC_NAME, key, msg);
            s = tagA.get().toString();
            log.info("生产kafka消息 {}",s);
            return  s;
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
            s=e.getMessage();
            log.error("sendSyncMessage-->发送消息异常{}",e.getMessage());
        }
        return s;
    }
}

 监听消息消费

@Slf4j
@Component
public class CustomKafkaListener  /**implements MessageListener<String,String>*/ {
 
    @KafkaListener(topics = {KafkaConfig.TOPIC_NAME},id = KafkaConfig.TOPIC_NAME)
    public   void   onMessage1(String  msg){
        log.info("onMessage1消费kafka消息 {} ",msg);
    }
    
}

测试发送

@RestController
public class KafkaSendController {
    
    @Autowired
    private KafkaProducer kafkaProducer;
 
    @GetMapping("/kafka/sendMsg")
    public  String  sendMsg(String  key,String msg){
        return kafkaProducer.sendSyncMessage(key,msg);
    }
 
}

方式2:

spring-kafka和kafka-clients结合使用(推荐)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.6.0</version>
</dependency>

 消费者组件

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Consumer config of SSL Kafka
 */
@Configuration
public class TestKafkaConsumerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumerConfig.class);

//读取kafka相关配置
    @Value("#{configproperties_leconf['outer.kafka.bootstrap.servers']}")
    private String bootStrapServers;
    @Value("#{configproperties_leconf['outer.kafka.bootstrap.bootStrapServersNoJks']}")
    private String bootStrapServersNoJks;
    @Value("#{configproperties_leconf['outer.kafka.enable.auto.commit']}")
    private String enableAutoCommit;
    @Value("#{configproperties_leconf['outer.kafka.auto.commit.interval']}")
    private String autoCommitInterval;
    @Value("#{configproperties_leconf['outer.kafka.session.timeout']}")
    private String sessionTimeout;
    @Value("#{configproperties_leconf['outer.kafka.auto.offset.reset']}")
    private String autoOffsetReset;

    @Value("#{configproperties_leconf['outer.kafka.sasl.mechanism']}")
    private String saslMechanism;
    @Value("#{configproperties_leconf['outer.kafka.sasl.jaas.config']}")
    private String saslJaasConfig;
    @Value("#{configproperties_leconf['outer.kafka.ssl.password']}")
    private String sslPassword;
    @Value("#{configproperties_leconf['outer.kafka.sasl.truststore']}")
    private String kafkaJks;

   
    @Value("#{configproperties_leconf['outer.kafka.kafkaUrlType']}")
    private Integer kafkaUrlType;

    @Value("#{configproperties_leconf['outer.kafka.consumer.service.product.consumer.group']}")
    private String groupId;
    @Value("#{configproperties_leconf['outer.kafka.consumer.service.product.topic']}")
    private String serviceProductTopic;

    //消费ssl kafka 消息给price kafka ,消费组sslkafkaToPriceKafkaGroup
    @Value("#{configproperties_leconf['sslkafkaToPriceKafka.group']}")
    private String sslkafkaToPriceKafkaGroup;
	
    //ssl kafka topic和price kafka topic映射关系
    @Value("#{configproperties_leconf['sslkafka2PriceKafkaTopics']}")
    private String sslkafka2PriceKafkaTopics;


//这里构造配置,根据自己的kafka是否使用ssl加密&是否使用jks,没有的去掉if-else即可
    public Map<String, Object> kafkaConfigs() {
	//使用不带有jks的kafka配置
        Map<String, Object> configMap = new HashMap<>();
        if(kafkaUrlType.equals(1)){
            configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServersNoJks);
            configMap.put("security.protocol", "SASL_PLAINTEXT");
        }else {
            //使用jks配置的kafka
            configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
            configMap.put("security.protocol", "SASL_SSL");
            configMap.put("ssl.endpoint.identification.algorithm", "");
			//jks文件是放在项目resource下
            configMap.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + kafkaJks);
            configMap.put("ssl.truststore.password", sslPassword);
        }
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configMap.put("sasl.mechanism", saslMechanism);
        configMap.put("sasl.jaas.config", saslJaasConfig);
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return configMap;
    }


    @Bean
    public KafkaConsumer<String, String> testKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfigs());
        consumer.subscribe(Arrays.asList(serviceProductTopic));
        return consumer;
    }
}

生产者组件

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.ClassUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * outer kafka config
 */
@Configuration
public class TestKafkaProducerConfig {

    @Value("#{configproperties_leconf['outer.kafka.bootstrap.servers']}")
    private String bootStrapServers;
    @Value("#{configproperties_leconf['outer.kafka.bootstrap.bootStrapServersNoJks']}")
    private String bootStrapServersNoJks;
    @Value("#{configproperties_leconf['outer.kafka.request.timeout']}")
    private String requestTimeout;
    @Value("${searchengine.MQ.kafka.key.serializer:org.apache.kafka.common.serialization.StringSerializer}")
    private String keySerializer;
    @Value("${searchengine.MQ.kafka.value.serializer:org.apache.kafka.common.serialization.StringSerializer}")
    private String valueSerializer;


    @Value("#{configproperties_leconf['outer.kafka.kafkaUrlType']}")
    private Integer kafkaUrlType;
    @Value("#{configproperties_leconf['outer.kafka.sasl.mechanism']}")
    private String saslMechanism;
    @Value("#{configproperties_leconf['outer.kafka.sasl.jaas.config']}")
    private String saslJaasConfig;
    @Value("#{configproperties_leconf['outer.kafka.ssl.password']}")
    private String sslPassword;
    @Value("#{configproperties_leconf['outer.kafka.sasl.truststore']}")
    private String kafkaJks;


    @Bean
    public ProducerFactory<String, String> sslProducerFactory() {
        Map<String, Object> properties = new HashMap<>();
        if(kafkaUrlType.equals(1)){
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServersNoJks);
            //使用不带有jks的kafka配置
            properties.put("security.protocol", "SASL_PLAINTEXT");
        }else {
            //使用jks配置的kafka
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("ssl.endpoint.identification.algorithm", "");
            properties.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + kafkaJks);
            properties.put("ssl.truststore.password", sslPassword);
        }

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        //生产者重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //指定ProducerBatch(消息累加器中BufferPool中的)可复用大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //生产者会在ProducerBatch被填满或者等待超过LINGER_MS_CONFIG时发送
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //消息缓存
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //数据压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        //设置每条消息的最大值10M
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        properties.put("sasl.mechanism", saslMechanism);
        properties.put("sasl.jaas.config", saslJaasConfig);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> testKafkaTemplate(ProducerFactory<String, String> sslProducerFactory) {
        return new KafkaTemplate<>(sslProducerFactory);
    }
}

生产消息和消费消息

@Service
public class KafkaServiceImpl implements KafkaService, ApplicationListener<ContextRefreshedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);

    @Autowired
    @Qualifier("sslKafkaTemplate")
    private KafkaTemplate<String, String> sslKafkaTemplate;
    @Autowired
    @Qualifier("testKafkaConsumer")
    private KafkaConsumer<String, String> testKafkaConsumer;

    /**
     * 执行 原生 kafka consumer api 监听topic,并存储数据到数据库和redis。
     * Handle an application event.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(()->{
            //拉去消息
            while (true) {
                ConsumerRecords<String, String> records = null;
                try {
                    records = testKafkaConsumer.poll(5000);
                    testKafkaConsumer.commitSync();
                } catch (ConcurrentModificationException e) {
                    //e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                    LOGGER.error("消费ssl kafka 消息失败。");
                    LOGGER.error(e.getMessage());
                }
                if (Objects.isNull(records)) {
                    continue;
                }

                final ConsumerRecords<String, String> consumerRecords = records;
                //数据太多 这里用线程池处理
                threadPoolTaskExecutor.submit(() -> {
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        if (Objects.isNull(record)) {
                            continue;
                        }
                        try {
                            LOGGER.info("records.count={},record.length={} partition= {}, offset = {}", consumerRecords.count(), record.value().length(), record.partition(), record.offset());

                            //拿到消息 这里处理业务
                            System.out.println(record.value());
                            
                            //这里模拟业务场景1,直接将消息消息发到另一个topic
                            doSend(sslKafkaTemplate,"test_topic", null,record.value());

                            
                            //这里模拟业务场景2,用于设置自定义的消息头消息
                            List<Header> headers = new ArrayList<Header>();
                            if(record.headers() != null){
                                record.headers().forEach(header -> {
                                    if(header.key().equals("region")){
                                        headers.add(new RecordHeader("region",header.value()));
                                    }
                                });
                            }
                            doSend(sslKafkaTemplate, "test_topic2", record.value(),headers);
                            
                        } catch (Exception e) {
                            LOGGER.error("Kafka  error: ", e);
                        }
                    }
                });
            }
        },"ssl Kafka ").start();
    }

    private ListenableFuture<SendResult<String, String>> doSend(KafkaTemplate<String, String> kafkaTemplate, String topic, String key, String message) {
        try {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
            future.get(1000 * 15, TimeUnit.MILLISECONDS);
            //TODO 异常处理
            future.addCallback(result -> LOGGER.info("send success,topic:" + topic), throwable -> LOGGER.info("send error:\n" + throwable));
        } catch (Exception e) {
            LOGGER.error(ExceptionUtil.getExceptionStackTrace(e));
            LOGGER.error("+++++++++++++++++++++++: ", e);
            LOGGER.error("not_send_topic:={}", topic);
            LOGGER.error("not_send_message:={}", message);
        }
        return null;
    }
    private ListenableFuture<SendResult<String, String>> doSend(KafkaTemplate<String, String> kafkaTemplate, String topic, String message,List<Header> headers){
        try {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,null, null,message,headers);
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
            //future.addCallback(result -> LOGGER.info("send success,ssl topic:" + topic), throwable -> LOGGER.info("send ssl error:\n" + throwable));
        }catch (Exception e){
            LOGGER.error(ExceptionUtil.getExceptionStackTrace(e));
            LOGGER.error("ssl exception+++++++++++++++++++++++: ", e);
            LOGGER.error("not_send_ssl_topic:={}", topic);
            LOGGER.error("not_send_ssl_message:={}", message);
        }
        return null;
    }
}

 注:

这里记录一下生产发生的问题

关于max.poll.interval.ms配置的问题,根据自己的业务配置poll拉去间隔等待时间

kafka一次性拉多条数据,然后循环用线程池处理的,线程池数量有限,我们配置的线程池拒绝策略是CallerRunsPolicy,当线程池满的时候就会用当前线程处理请求,导致下次poll方法无法立即执行,当超过最大时间max.poll.interval.ms时候,服务端会认为当前消费者已经无效,就会踢掉消费者,导致后续不再消费了

关于kafka配置内容参考文章

https://blog.csdn.net/qq_34491508/article/details/126029810

springboot整合参考地址

https://blog.csdn.net/qq_34491508/article/details/120831627文章来源地址https://www.toymoban.com/news/detail-783577.html

到了这里,关于Spring整合kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot整合Kafka+SSE实现实时数据展示

    2024年3月10日 不使用Rabbitmq或者Rocketmq是因为Kafka是Hadoop集群下的组成部分,对于大数据的相关开发适应性好,且当前业务场景下不需要使用死信队列,不过要注意Kafka对于更新时间慢的数据拉取也较慢,因此对与实时性要求高可以选择其他MQ。 使用消息队列是因为该中间件具有

    2024年04月24日
    浏览(28)
  • 使用 Spring Boot 整合 Kafka:实现高效的消息传递

    Kafka 是一种流处理平台,用于在分布式系统中处理高吞吐量的数据流。它是一种基于发布订阅模式的消息系统,能够处理来自多个应用程序的数据流。Kafka 具有高度的可扩展性、可靠性和性能,使得它成为处理大数据的流行选择。 Spring Boot 是一种开源框架,用于简化 Java 应用

    2024年02月14日
    浏览(36)
  • 【消息中间件MQ系列】Spring整合kafka并设置多套kafka配置

            圣诞节的到来,程序员不会收到圣诞老人的🎁,但可以自己满足一下自己,所以,趁着有时间,就记录一下这会儿撸了些什么代码吧!!!         因为业务原因,需要在系统内新增其他的kakfa配置使用,所以今天研究的是怎么在系统内整合多套kafka配置使用。

    2024年02月01日
    浏览(82)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月08日
    浏览(32)
  • 第三章 Spring Boot 整合 Kafka消息队列 消息者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年02月22日
    浏览(34)
  • spring cloud steam 整合kafka 进行消息发送与接收

    spring cloud steam : Binder和Binding Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于

    2024年02月10日
    浏览(29)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(36)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(33)
  • 从零到Kafka:万字带你体验Spring Boot整合消息驱动的奇妙之旅

    主页传送门:📀 传送 Spring boot : | 基于Spring的开源框架,用于简化新Spring应用的初始搭建以及开发过程 特性: | 快速开发、轻量级、无代码生成和独立运行等特性 优势: | 简化配置,提供自动配置,减少开发时间 应用场景: | 适用于微服务架构、云原生应用等场景 环境

    2024年02月05日
    浏览(29)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包