linux环境kafka安装及配置

这篇具有很好参考价值的文章主要介绍了linux环境kafka安装及配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

下载资源

linux环境安装kafka,需要预先准备相关资源,我使用的是kafka_2.12-2.5.1版本,下载路径为:http://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz,也可以通过命令wget http://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz进行资源获取;
2、获取并安装zookeeper:(以apache-zookeeper-3.6.1-bin.tar.gz为例),官网:https://zookeeper.apache.org/。
3、将下载好的kafka及zookeeper压缩包上传到虚拟机服务器,放置到/usr/local/目录中:
linux安装kafka,kafka,linux

安装zookeeper

1、解压apache-zookeeper-3.6.1-bin.tar.gz压缩包,并重命名

#进入到压缩包存放路径
cd /usr/local/
#解压
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#重命名
mv apache-zookeeper-3.6.1-bin zookeeper-3.6.1

2、配置启动

#进入配置目录
cd zookeeper-3.6.1/conf/
#复制配置文件,不直接修改源文件用于备份使用
cp zoo_sample.cfg zoo.cfg
#编辑zoo.cfg文件
vi zoo.cfg

修改zoo.cfg中的内容为,主要修改dataDir路径以及端口号:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper-3.6.1/data/
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

3、启动zookeeper

sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh start 

linux安装kafka,kafka,linux
4、查看zookeeper状态

sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh status

linux安装kafka,kafka,linux
另:停止zookeeper:

sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh stop

linux安装kafka,kafka,linux

以上为单体zookeeper安装及启动过程,以下为zookeeper集群的搭建方式:
与单体的搭建方式类似,只需要重复类似单体的部署模式,集群最少使用3台,以下大概介绍相应的部署方式:
1、首先将解压好的文件夹复制并重命名

#创建集群目录
mkdir zookeeper-cluster
#将解压好的zookeeper目录复制到集群目录中
cp -r zookeeper-3.6.1 zookeeper-cluster/
#进入集群目录中
cd zookeeper-cluster/
#重命名复制过来的目录为节点1目录,同理复制出节点2和节点3的目录
mv zookeeper-3.6.1 zookeeper-1
cp -r zookeeper-1 ./zookeeper-2
cp -r zookeeper-1 ./zookeeper-3

以下为复制完成后的目录结构:
linux安装kafka,kafka,linux

2、分别配置相关的配置文件,注意节点名称及dataDir路径不要冲突

vi zookeeper-1/conf/zoo.cfg
vi zookeeper-2/conf/zoo.cfg
vi zookeeper-3/conf/zoo.cfg

3、分别创建myid文件到data目录中

echo "1" > zookeeper-1/data/myid
echo "2" > zookeeper-2/data/myid
echo "3" > zookeeper-3/data/myid

修改每个节点中的dataDir,clientPort的值,并增加节点之间的关联属性,以下是节点1的示例,其他节点以此类推:
linux安装kafka,kafka,linux
配置完成后分别启动三个节点:

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start

集群启动成功
linux安装kafka,kafka,linux
连接测试,zkCli.sh脚本可以连接集群测试是否启动成功:
linux安装kafka,kafka,linux
如果(kerberos服务已经安装并配置完成)开启Kerberos认证需要进行以下操作(以单体zookeeper为例):
1、生成keytab文件:

#登录kerberos的命令行界面
kadmin.local
#生成随机密码
addprinc -randkey zookeeper/hadoop.test.com@TEST.COM
#生成keytab文件
 ktadd -k /etc/security/keytabs/zookeeper.keytab zookeeper/hadoop.test.com@TEST.COM
 #退出命令行
 exit
 #查看生成的keytab文件的用户
 klist -ket /etc/security/keytabs/zookeeper.keytab

以下是执行过程示例:
linux安装kafka,kafka,linux
2、生成jaas文件:

vi /usr/local/zookeeper-3.6.1/conf/jaas.conf

jaas.conf文件内容,注意keyTab属性位置及principal用户名配置正确

Server{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/zookeeper.keytab"
principal="zookeeper/hadoop.test.com@TEST.COM"
useTicketCache=false;
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/zookeeper.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/hadoop.test.com@TEST.COM";
};

3、修改配置文件zoo.cfg添加配置,在结尾加入以下配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

4、添加java.env文件,并写入相关的内容,注意-Djava.security.auth.login.config为生成的jaas.conf文件的路径:

echo 'export JVMFLAGS=" -Dsun.security.krb5.debug=true -Djava.security.auth.login.config=/usr/local/zookeeper-3.6.1/conf/jaas.conf"' > /usr/local/zookeeper-3.6.1/conf/java.env

5、启动验证,启动成功,并连接成功
linux安装kafka,kafka,linux

kafka安装及配置

kafka安装(单体)

1、解压安装包

#进入压缩包存放路径
cd /usr/local/
#解压压缩包
tar -zxvf kafka_2.12-2.5.1.tgz
#进入解压后目录中
cd kafka_2.12-2.5.1

linux安装kafka,kafka,linux
2、修改配置

#修改服务配置文件
vi /usr/local/kafka_2.12-2.5.1/config/server.properties

配置中的内容需要配置:

listeners=PLAINTEXT://192.168.4.130:9092
zookeeper.connect=192.168.4.130:2181

配置完成后(前提:zookeeper已正常启动),即可启动
服务端启动命令(不加-daemon前台启动,关闭即停止服务,加了-daemon后台启动):

#启动命令
/usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server.properties
#停止命令
/usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-stop.sh

查看服务日志命令:

tail -f /usr/local/kafka_2.12-2.5.1/logs/server.log

linux安装kafka,kafka,linux
以上是kafka单体的配置及启动方式。

kafka集群配置方式

本次主要记录同一台服务器上搭建集群,类似于单体的搭建方式,我们只需要配置每个节点的配置文件,然后分别启动即可,如果是分不同的服务器搭建,类似于每台服务器上搭建单体,然后在配置文件中增加相应集群相关的配置项即可使用了,以下默认已经将压缩包解压好后的操作。
1、复制server.properties文件并重命名,复制出3(集群最好大于等于三个节点)份来

#复制配置文件
cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-1.properties
cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-2.properties
cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-3.properties
#编辑配置文件
vi /usr/local/kafka_2.12-2.5.1/config/server-1.properties
vi /usr/local/kafka_2.12-2.5.1/config/server-2.properties
vi /usr/local/kafka_2.12-2.5.1/config/server-3.properties

之后分别配置复制出来的server-1.properties,server-2.properties,server-3.properties三个配置文件,分别配置以下配置

#节点id,不同的节点用不同的数字表示
broker.id=1
#对外的ip及端口,端口号每个文件不要用同一个,我使用的分别是9091,9092,9093
listeners=PLAINTEXT://192.168.4.130:9091
#数据存放位置,每个节点一个如/kafka-logs-1,/kafka-logs-2,/kafka-logs-3等,不同节点使用文件不可重复,如果重复了容易启动失败
log.dirs=/usr/local/kafka_2.12-2.5.1/data/cluster/kafka-logs-1
#填zookeeper的地址,多个用,隔开
zookeeper.connect=192.168.4.130:2181

分别配置好之后可以使用启动命令分别启动相关节点

#启动节点1
/usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-1.properties
#启动节点2
/usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-2.properties
#启动节点3
/usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-3.properties
#查看服务日志
tail -f /usr/local/kafka_2.12-2.5.1/logs/server.log

启动成功日志:
linux安装kafka,kafka,linux

注:有时存在kafka启动失败报zookeeper连接超时拒绝连接时,可能引起的原因是防火墙没有关闭,关闭防火墙的命令,也有可能是/etc/hosts文件配置不对引起的,遇到该问题可以多向考虑。

#方式一
#停止防火墙
service firewalld stop
#禁用防火墙
systemctl disable firewalld
#方式二:
chkconfig iptables off

另:如果之前启动过zookeeper或者kafka,但是数据目录没有清除过的话也会影响我们的启动,一定要仔细核对好。

kafka开启kerberos认证

如果我们搭建的kafka(单体或集群)需要开启kerberos认证,可以在安装的时候这样配置:
1、生成keytab文件:

#登录kerberos的命令行界面
kadmin.local
#生成随机密码
addprinc -randkey kafka/hadoop.test.com@TEST.COM
#生成keytab文件
 ktadd -k /etc/security/keytabs/kafka.keytab kafka/hadoop.test.com@TEST.COM
 #退出命令行
 exit
 #查看生成的keytab文件的用户
 klist -ket /etc/security/keytabs/kafka.keytab

以下是生成过程示例:
linux安装kafka,kafka,linux
2、生成jaas文件:

vi /usr/local/kafka_2.12-2.5.1/config/jaas.conf

jaas.conf文件内容,注意keyTab属性位置及principal用户名配置正确

KafkaServer{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/etc/security/keytabs/kafka.keytab"
principal="kafka/hadoop.test.com@TEST.COM";
};
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/etc/security/keytabs/kafka.keytab"
principal="kafka/hadoop.test.com@TEST.COM"
userTicketCache=true;
};
Client{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/etc/security/keytabs/kafka.keytab"
principal="kafka/hadoop.test.com@TEST.COM"
userTicketCache=true;
};

3、修改kafka的配置文件,如果是单体仅需要修改一个,如果是集群,则需要修改每个节点对应的配置文件:

vi /usr/local/kafka_2.12-2.5.1/config/server.properties

配置文件中添加(配置)以下的属性(非kerberos配置时需要的配置默认需要配置好)

listeners=SASL_PLAINTEXT://172.168.4.130:9093
advertised.listeners=SASL_PLAINTEXT://172.168.4.130:9093
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
kafka.security.protocol=SASL_PLAINTEXT
super.users=User:kafka

4、修改kafka服务启动脚本,配置相关的jaas文件路径

vi /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh

添加以下的内容:

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.5.1/config/jaas.conf"

添加示例:
linux安装kafka,kafka,linux

注意:如果kafka连接时,生产者或消费者连接开启kerberos认证的kafka服务器时,需要在相应的脚本中也同样添加该配置

#生产者连接脚本配置
vi /usr/local/kafka_2.12-2.5.1/bin/kafka-console-producer.sh
#消费者连接脚本配置
vi /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh
#topic连接脚本配置
vi /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh

同时消费者或生产者的配置文件中需要增加以下配置:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

添加示例(以生产者为例):
linux安装kafka,kafka,linux
启动kafka服务,连接并使用,参考普通kafka启动及连接相关命令。

kafka自带zookeeper使用

注:kafka安装包中也自带了zookeeper,如果不想安装zookeeper,可以使用kafka安装包中自带的zookeeper。
如果使用kafka安装包中带的zookeeper,需要配置解压后目录中的zookeeper.properties

vi /usr/local/kafka_2.12-2.5.1/config/zookeeper.properties

如果不需要修改端口可以默认不修改,使用命令启动自带zookeeper:

#启动zookeeper
/usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/zookeeper.properties
#停止zookeeper
/usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-stop.sh

如果zookeeper需要开启kerberos认证需要给zookeeper.properties添加配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

在启动脚本zookeeper-server-start.sh中添加如下配置,注意jaas.conf文件的路径,jaas文件生成方式同上。

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/zookeeper-3.6.1/conf/jaas.conf"

启动命令如上。

kafka常用命令

#创建主题
/usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --create --zookeeper 192.168.4.130:2181 --replication-factor 1 --partitions 3 --topic test01
#查看主题列表
/usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --list --zookeeper 192.168.4.130:2181/kafka
#生产者连接并生产数据
/usr/local/kafka_2.12-2.5.1/bin/kafka-console-producer.sh --broker-list 192.168.4.130:9092  --topic test01 --producer.config /usr/local/kafka_2.12-2.5.1/config/producer.properties
#消费者连接并消费数据
/usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.130:9092 --topic test01 --from-beginning --consumer.config  /usr/local/kafka_2.12-2.5.1/config/consumer.properties
#kafka添加消息写入partition时间戳的方法
#Kafka消息的时间戳,在消息中增加了一个时间戳字段和时间戳类型。目前支持的时间戳类型有两种: CreateTime 和 LogAppendTime 前者表示producer创建这条消息的时间;后者表示broker接收到这条消息的时间(严格来说,是leader broker将这条消息写入到log的时间)
/usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --alter --topic test01 --zookeeper 192.168.4.130:2181 --config  message.timestamp.type=LogAppendTime
/usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.130:9092 --topic test01 --from-beginning --consumer.config  /usr/local/kafka_2.12-2.5.1/config/consumer.properties --property print.timestamp=true
#删除主题内的消息:kafka-delete-records --bootstrap-server <broker-host:port> --offset-json-file offsets.json
#–bootstrap-server:需要连接的 brokers 地址;
#–offset-json-file:包含删除配置的 Json 文件。
/usr/local/kafka_2.12-2.5.1/bin/kafka-delete-records.sh --bootstrap-server 192.168.4.130:9092 --offset-json-file /usr/local/kafka_2.12-2.5.1/remove.json
#删除附加:移除kerberos开启的server中的数据,同样需要在相关配置文件中配置kerberos相关的配置,以及脚本中增加相关的配置
/usr/local/kafka_2.12-2.5.1/bin/kafka-delete-records.sh --bootstrap-server 192.168.4.130:9092 --command-config /usr/local/kafka_2.12-2.5.1/config/delete-kerb.properties  --offset-json-file /usr/local/kafka_2.12-2.5.1/remove.json

关于移除数据的remove.json配置文件内容:

{
   "partitions": [
                  {"topic": "test01", "partition": 0, "offset": -1}
                 ],
                 "version":1
 }

topic:待删除数据主题
partition:待删除的分区
offset:删除起始偏移量,设置为 -1,表示将删除主题中所有数据。
linux安装kafka,kafka,linux文章来源地址https://www.toymoban.com/news/detail-779881.html

到了这里,关于linux环境kafka安装及配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Windows下安装单机Kafka环境及配置SASL身份认证

    zookeeper和kafka都是java开发的,所以安装前先安装1.8版本以上的jdk,并设置环境变量 JAVA_HOME=d:envJavajdk1.8.0_14 1.1 Apache ZooKeeper点击下载地址 Apache ZooKeeper,下载最新版本zookeeper压缩包,解压到本地 1.2 来到 conf文件夹下,复制一份 zoo_sample.cfg ,改名为 zoo.cfg 1.3 在安装目录下新

    2024年02月01日
    浏览(40)
  • kafka各种环境安装(window,linux,docker,k8s),包含KRaft模式

    1、 本次实验,采用kafka版本为 3.4.0 2、我们首先需要了解一下,一个 Kafka 集群是由下列几种类型的节点构成的,它们充当着不同的作用: Broker 节点 :即 代理节点 ,是 Kafka 中的工作节点,充当消息队列的角色, 负责储存和处理消息 ,每个 Broker 都是一个独立的 Kafka 服务器

    2024年02月10日
    浏览(48)
  • 【Linux】Linux环境配置安装

    目录 一、双系统(特别不推荐) 安装双系统的缺点: 安装双系统优点(仅限老手): 二、虚拟机+centos7镜像(较为推荐推荐) 虚拟机的优点: 虚拟机的缺点: ​ 下载centos 7的镜像文件  下载Ubuntu镜像文件Ubuntu 镜像文件下载地址  三、云服务器 Xshell云服务器共享 Xshell删除

    2024年02月07日
    浏览(35)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(45)
  • JDK下载、安装和环境配置教程(Linux环境)

    一、下载JDK 下载JDK官网地址:进入官网—点击Java archiveJava archiveJava archive Java Downloads | Oracle https://www.oracle.com/java/technologies/downloads/  往下翻可以看到所有的版本—版本根据自己需求选择  选择自己要下载的版本——下翻——选择tar包或rpm包(这里选择tar包)——点击下载(官

    2024年02月06日
    浏览(46)
  • Linux 安装 Anaconda 并配置环境

    本文主要用于记录自己在 飞腾匠牛开发板(ARM架构) 中利用 Anaconda/Miniconda 创建 python 虚拟环境中遇到的一些问题。 1. 官网下载 Free Download | Anaconda 2. 清华镜像下载 Index of /anaconda/archive/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 下载 Linux 64-Bit (x86) installer 注: 如果是在

    2024年02月07日
    浏览(66)
  • Linux安装Flink及其环境配置

    Linux服务器环境部署专栏目录(点击进入…) 使用StandAlone模式,需要启动Flink的主节点JobManager以及从节点TaskManager 服务 node1 node2 node3 JobManager 是 否 否 TaskManager 是 是 是 Flink 的部署模式分为3种: (1)Application模式 (2)Per-Job模式 (3)Session模式 每个 JobManager 的可用内存值(

    2024年02月10日
    浏览(28)
  • Linux环境下配置安装RocketMQ

    官网下载:下载链接 根据需要下载自己需要的版本、本文使用下载的是:4.7.0版本 创建目录,使用ftp工具上传下载的包到上面创建的目录下。 注意 :rocketmq 需要 Linux 上安装JDK,版本 1.8 以上,如果你 Linux 上已经配置了 Java 环境可跳过,未安装请自行安装。 上传之后进行解压

    2024年02月12日
    浏览(56)
  • 【配置环境】Linux下安装MySQL

    目录 一,环境 二,安装步骤 1.使用包管理器安装MySQL 2.配置MySQL的安全选项 3.设置root用户使用密码进行身份验证(可选) 三,拓展知识 1.如何修改MySQL的密码策略? 2.实现连接MySQL数据库的测试代码 VMware® Workstation 16 Pro (版本:16.1.2 build-17966106) ubuntu-22.04.2-desktop-amd64 对于

    2024年02月13日
    浏览(42)
  • Linux环境中Grafana安装与配置

    https://grafana.com/grafana/download?edition=oss 下载安装包上传到需要安装服务器,在安装包所在的文件夹执行解压命令。 解压后会有一个grafana的文件夹。 解压后需要进入Grafana文件夹启动。 浏览器输入 http:ip:3000 ,默认端口3000 出现登录界面代表启动成功。帐号密码默认 admin 1.登录后

    2024年02月08日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包