FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

这篇具有很好参考价值的文章主要介绍了FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文介绍了  来源单表->目标源单表同步,多来源单表->目标源单表同步。

注:1.16版本、1.17版本都可以使用火焰图,生产上最好关闭,详情见文章末尾

Flink版本:1.16.2

环境:Linux CentOS 7.0、jdk1.8

基础文件:

flink-1.16.2-bin-scala_2.12.tgz、

flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven仓库目录:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安装Flink步骤详见文章第二篇

 

Flink版本:1.17.1

环境:Linux CentOS 7.0、jdk1.8

基础文件:

flink-1.17.1-bin-scala_2.12.tgz、

flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven仓库目录:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)

支持的mysql版本: 

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

本次测试使用版本flink1.16.2 

一、 来源单表->目标源单表同步:数据源ip为***.50的源表,同步数据到数据源ip为***.134的目标表中,需要以下几个步骤:

1. 启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

2. 停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

3. 启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 编写FlinkSql,创建临时表和job:

FlinkSql与mysql字段的类型映射

 把写好的Sql粘贴到FlinkSql客户端命令行中,分号'  ;  '是语句结束标识符,按回车创建:

 创建来源表结构:

来源表链接类型为'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '***',
>     'port' = '3306',
>     'username' = '***',
>     'password' = '***',
>     'database-name' = 'alarm',
>     'server-time-zone' = 'Asia/Shanghai',
>     'table-name' = 'alarminfo'
>  );

[INFO] Execute statement succeed.

 创建目标表结构(目标表结构可比来源表字段多,可使用视图指定字段默认值):

目标表链接类型为'connector' = 'jdbc',注意url需要跟后面以下属性值

Flink SQL> CREATE TABLE target_alarminfo134 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   sourceLine int,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
>     'username' = '***',
>     'password' = '****',
>     'table-name' = 'alarminfo',
>     'driver' = 'com.mysql.cj.jdbc.Driver',
>     'scan.fetch-size' = '1000'
>  );

[INFO] Execute statement succeed.

 'scan.fetch-size' = '1000'  的含义:

 在 Flink SQL 中,scan.fetch-size 属性用于配置批处理查询中的每次批量获取记录的大小。具体地说,它指定了每次从数据源读取的记录数。

如何设置:

以下是一些建议:
考虑数据源的吞吐量:如果你的数据源的吞吐量较高,网络延迟较低,可以适当增大 scan.fetch-size 的值,以减少网络往返次数和请求开销。
考虑网络环境和带宽限制:如果数据源位于远程服务器或网络环境较差,可以选择适当较小的 fetch size 值,以减少网络传输的负载,避免出现大量的网络超时和传输失败情况。
考虑内存开销:fetch size 值过大可能会占用较多的内存资源,特别是对于批处理查询。如果你的查询涉及大量的中间状态(intermediate state)或内存密集型操作,可以选择适当较小的 fetch size 值。
一般来说,可以先尝试将 scan.fetch-size 设置为一个较默认的值,例如 1000 或 5000。然后观察任务的性能和执行效果,根据实际情况进行微调。可以根据实际性能测试和系统资源情况,逐步调整 fetch size 值,以找到性能和资源利用的平衡点。
需要注意的是,scan.fetch-size 属性值是一个相对的配置,不同的数据源和查询场景可能有不同的最佳值。因此,针对具体的数据源和查询条件,最好进行一些实际的性能测试和调优,以获得最优的性能和资源使用。

 最后创建同步关系:

INSERT INTO target_alarminfo134 SELECT *,50 AS sourceLine FROM source_alarminfo50

 若目标表比源表结构少字段属性则执行完同步关系后如下:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

创建完表结构可使用下列语句查看和删除:

查看表:show tables;

删除表:drop table if exists  target_alarminfo; 

flink-UI页面效果:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

数据同步效果:

源表:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

目标表数据:首次数据全量,后面数据变更增量 

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

二、 多来源单表->目标源单表同步:数据源ip为***.50、***.51的两个源表,同步数据到数据源ip为***.134的目标表中,使用sourceLine 用于区分数据来源,需要以下几个步骤:

 1. 创建自定义初始化脚本文件 init.sql、flinkSqlInit.sql,flinkSqlInit.sql文件中包含了在FlinkSql中需要执行的语句,用于自动化创建临时表和视图,这两个放在flink的bin目录下:

init.sql内容如下:

SET execution.runtime-mode=streaming;
SET pipeline.name=my_flink_job;
SET parallism.default=4;

 SET execution.runtime-mode=streaming 设置了作业的运行模式为流处理模式。这表示作业将以流处理的方式运行,即实时处理每个输入事件,并根据输入数据的到达顺序进行处理。

SET pipeline.name=my_flink_job 设置了作业的流水线名称为 "my_flink_job"。流水线名称主要用于标识作业,以便在运行时进行管理和监控。

SET parallelism.default=4 设置了作业的默认并行度为 4。并行度表示同时执行作业任务的任务数量。通过设置并行度,可以控制作业在集群上使用的资源量和执行的并行度。默认并行度将应用于作业的所有算子,除非为某个算子单独指定了并行度。

这些设置属性可以在 Flink 的初始化脚本中使用,并在作业启动时生效。可以根据作业的需求和资源情况调整这些属性,以获得最佳的性能和资源利用率。

注:mysql-cdc和jdbc的区别:mysql-cdc 标注 数据来源的表,jdbc标注 同步到的目标表

flinkSqlInit.sql内容如下:

SET execution.checkpointing.interval = 60s;
drop table if exists  source_alarminfo50;
CREATE TABLE source_alarminfo50 (
  id STRING NOT NULL,
  AlarmTypeID STRING,
  `Time` timestamp,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '**',
    'port' = '3306',
    'username' = '**',
    'password' = '**',
    'database-name' = 'alarm',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'alarminfo'
 );
drop table if exists  source_alarminfo51;
CREATE TABLE source_alarminfo51 (
  id STRING NOT NULL,
  AlarmTypeID STRING,
  `Time` timestamp,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '**',
    'port' = '3306',
    'username' = '**',
    'password' = '**',
    'database-name' = 'alarm',
    'server-time-zone' = 'Asia/Shanghai',
    'table-name' = 'alarminfo'
 );
drop table if exists  target_alarminfo134;
CREATE TABLE target_alarminfo134 (
  id STRING NOT NULL,
  AlarmTypeID STRING,
  `Time` timestamp,
  sourceLine int,
  PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
    'username' = '**',
    'password' = '**',
    'table-name' = 'alarminfo',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'scan.fetch-size' = '200'
 );

BEGIN STATEMENT SET;

INSERT INTO target_alarminfo134
SELECT *,50 AS sourceLine FROM source_alarminfo50
UNION ALL
SELECT *,51 AS sourceLine FROM source_alarminfo51;

END;

其中涉及flinksql的语法:

BEGIN STATEMENT SET 是 Flink SQL 中的一个特殊语法,用于将一组 SQL 语句作为一个事务进行处理。它用于将多个 SQL 语句作为一个原子操作执行,要么全部成功提交,要么全部回滚。
在 Flink SQL 中,可以使用 BEGIN STATEMENT SET 将多个 SQL 语句组合成一个事务,以确保这些语句的原子性。
以下是 BEGIN STATEMENT SET 的使用示例:
BEGIN STATEMENT SET;
-- SQL 语句 1
-- SQL 语句 2
-- ...
COMMIT;
在上述示例中,BEGIN STATEMENT SET 表示事务的开始,COMMIT 表示事务的提交。你可以在 BEGIN STATEMENT SET 和 COMMIT 之间编写需要执行的多个 SQL 语句。
如果在 BEGIN STATEMENT SET 和 COMMIT 之间的任何一条语句执行失败,整个事务将回滚,即已经执行的语句会被撤销。
需要注意的是,BEGIN STATEMENT SET 和 COMMIT 语句是 Flink SQL 的扩展语法,它们可能在某些特定的 Flink 版本或环境中才可用。在使用时,请确保你的 Flink 版本和环境支持该语法。 

检查点间隔设置:
SET execution.checkpointing.interval = 60s;
通过设置适当的检查点间隔,可以在容忍一定故障的同时,控制检查点的频率和资源使用。较短的检查点间隔可以提供更高的容错性,但也会增加系统开销。
检查点是 Flink 中用于实现容错性的机制,它会定期将作业的状态保存到持久化存储中,以便在发生故障时进行恢复。检查点间隔定义了两个连续检查点之间的时间间隔。 

 2. 重启Flink服务:

停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

 3.1 在flink的bin目录下执行初始化文件flinkSqlInit.sql:

有两种方式:

方式一:可设置job名称及资源参数配置

[root@localhost bin]#  ./sql-client.sh -i init.sql -f flinkSqlInit.sql  

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

 使用这个语句的好处是可以根据作业的需求和资源情况调整这些属性,以获得最佳的性能和资源利用率。

flink-UI页面效果:

 FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

方式二:不可设置job名称及资源参数配置

[root@localhost bin]#  ./sql-client.sh -f  flinkSqlInit.sql  

 FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

 4. 数据同步效果:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

三、打开火焰图:

编辑flink-conf.yaml:最后面添加 

rest.flamegraph.enabled: true

配置后重启flink服务,重新创建任务。

火焰图效果:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink

注:

在分析火焰图时,可以关注以下几点:
函数的执行时间:纵向的轴显示了函数的嵌套层级,越往下表示越深层的函数调用。横向轴表示时间,通过不同颜色的方块来表示函数的执行时间。
热点函数:寻找占据执行时间大部分的函数,这些函数可能是需要优化的关键点。
函数之间的关系:观察函数之间的调用关系,查看是否有不必要的函数调用或循环。
I/O 操作:关注是否有大量的数据读取、写入或网络通信,这可能是性能瓶颈的来源。
根据火焰图的分析结果,您可以进一步定位和排查潜在的性能问题,并在代码、配置或资源分配方面进行优化。
请注意,为了准确分析火焰图,建议在负载较高的情况下生成火焰图,并保持足够的监视时间。此外,Flink 的火焰图功能在生产环境中可能会造成一定的开销,因此建议在测试或开发环境中使用。

四、源表、目标表结构with下的属性介绍:

源表with下的属性:

chunk-key.even-distribution.factor.lower-bound:块键(Chunk Key)的均匀分布因子下限。

chunk-key.even-distribution.factor.upper-bound:块键的均匀分布因子上限。

chunk-meta.group.size:块元数据的分组大小。

connect.max-retries:连接重试的最大次数。

connect.timeout:连接的超时时间。

connection.pool.size:连接池的大小。

connector:使用的连接器的名称。

database-name:数据库的名称。

heartbeat.interval:心跳间隔时间。

hostname:主机名或 IP 地址。

password:连接到数据库或其他系统所需的密码。

port:连接的端口号。

property-version:属性版本。

scan.incremental.snapshot.chunk.key-column:增量快照的块键列。

scan.incremental.snapshot.chunk.size:增量快照的块大小。

scan.incremental.snapshot.enabled:是否启用增量快照。

scan.newly-added-table.enabled:是否启用新加入表的扫描。

scan.snapshot.fetch.size:从状态快照中获取的每次批量记录数。

scan.startup.mode:扫描启动模式。

scan.startup.specific-offset.file:指定启动位置的文件名。

scan.startup.specific-offset.gtid-set:指定启动位置的 GTID 集合。

scan.startup.specific-offset.pos:指定启动位置的二进制日志位置。

scan.startup.specific-offset.skip-events:跳过的事件数量。

scan.startup.specific-offset.skip-rows:跳过的行数。

scan.startup.timestamp-millis:指定启动时间戳(毫秒)。

server-id:服务器 ID。

server-time-zone:服务器时区。

split-key.even-distribution.factor.lower-bound:切分键(Split Key)的均匀分布因子下限。

split-key.even-distribution.factor.upper-bound:切分键的均匀分布因子上限。

table-name:表名。

username:连接到数据库或其他系统所需的用户名。

Sink目标表with下的属性:

connection.max-retry-timeout:连接重试的最大超时时间。

connector:使用的连接器的名称。

driver:JDBC 连接器中使用的数据库驱动程序的类名。

lookup.cache:查找表的缓存配置。

lookup.cache.caching-missing-key:是否缓存查找表中的缺失键。

lookup.cache.max-rows:查找表缓存中允许的最大行数。

lookup.cache.ttl:查找表缓存中行的生存时间。

lookup.max-retries:查找操作的最大重试次数。

lookup.partial-cache.cache-missing-key:是否缓存查找表部分缺失的键。

lookup.partial-cache.expire-after-access:查找表部分缓存中行的访问到期时间。

lookup.partial-cache.expire-after-write:查找表部分缓存中行的写入到期时间。

lookup.partial-cache.max-rows:查找表部分缓存中允许的最大行数。

password:连接到数据库或其他系统所需的密码。

property-version:属性版本。

scan.auto-commit:是否自动提交扫描操作。

scan.fetch-size:每次批量获取记录的大小。

scan.partition.column:用于分区的列名。

scan.partition.lower-bound:分区的下限值。

scan.partition.num:要扫描的分区数量。

scan.partition.upper-bound:分区的上限值。

sink.buffer-flush.interval:将缓冲区的数据刷新到目标系统的时间间隔。

sink.buffer-flush.max-rows:缓冲区中的最大行数,达到此值时将刷新数据。

sink.max-retries:写入操作的最大重试次数。

sink.parallelism:写入任务的并行度。

table-name:表名。

url:连接到数据库或其他系统的 URL。

username:连接到数据库或其他系统所需的用户名。

 

最后FlinkCDC目前不支持整库同步:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),运行环境,mysql,数据库,flink文章来源地址https://www.toymoban.com/news/detail-667179.html

到了这里,关于FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink】FlinkCDC获取mysql数据时间类型差8小时时区解决方案

    1、背景: 在我们使用FlinkCDC采集mysql数据的时候,日期类型是我们很常见的类型,但是FlinkCDC读取出来会和数据库的日期时间不一致,情况如下 FlinkCDC获取的数据中create_time字段1694597238000转换为时间戳2023-09-13 17:27:18  而数据库中原始数据如下,并没有到下午5点,这就导致了

    2024年02月07日
    浏览(50)
  • MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)

    MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改) 把MySQL多库多表的数据通过FlinkCDC DataStream的方式实时同步到同一个Kafka的Topic中,然后下游再写Flink SQL拆分把数据写入到ClickHouse,FlinkCDC DataStream通过自定义Debezium格式的序列化器,除了增加,还能进行

    2024年02月15日
    浏览(41)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(51)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 连接mysql(1)

    下载 连接器 SQL jar (或 自行构建 )。 将下载的jar包放在FLINK_HOME/lib/. 重启Flink集群。 注意 :目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的 6.使用 Flink CDC 对 MySQL 进行流式 ETL 本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。 假设我们将产品数

    2024年04月26日
    浏览(46)
  • flinkcdc同步完全量数据就不同步增量数据了

    使用flinkcdc同步mysql数据,使用的是全量采集模型 startupOptions(StartupOptions.earliest()) 全量阶段同步完成之后,发现并不开始同步增量数据,原因有以下两个: 1.mysql中对应的数据库没有开启binlog 在/etc/my.cnf配置文件中,在[ mysqld ]添加以下内容 然后重启数据库 ,执行命令 和chec

    2024年02月11日
    浏览(33)
  • FlinkCDC 入门之数据同步和故障恢复

    FlinkCDC 是一款基于 Change Data Capture(CDC)技术的数据同步工具,可以用于将关系型数据库中的数据实时同步到 Flink 流处理中进行实时计算和分析,下图来自官网的介绍。 下图 1 是 FlinkCDC 与其它常见 开源 CDC 方案的对比: 可以看见的是相比于其它开源产品,FlinkCDC 不仅支持增

    2024年02月07日
    浏览(35)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(102)
  • 最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。 连接器 数据库 驱动 mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    浏览(64)
  • Flink实时同步MySQL与Doris数据

    技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入-阿里云开发者社区 1. Flink环境: https://flink.apache.org/zh/ 下载flink-1.15.1 解压,修改配置 修改配置 修改rest.bind-address为 0.0.0.0 下载依赖jar包 至 flink安装目录lib下 启动flink 访问WebUI http://192.168.0.158:8081 2、

    2024年02月13日
    浏览(44)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包