flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作

这篇具有很好参考价值的文章主要介绍了flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink学习笔记

前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。
Tips:我觉得学习 Flink 还是挺有意思的,虽然学习进度有点慢,但是数据源已经理解清楚了,我相信接下来一切会越来越好的!

二、Flink 流批一体 API 开发

1. 输入数据集 Data Source

1.1 预定义 Source
1.1.1 基于本地集合的 Source
  • (1) env.fromElements()
# 两种输入类型,一种是元素,一种是元组
DataStreamSource<Object> ds1 = env.fromElements("hadoop","spark", "spark", "flink");

List<Tuple2<String,Long>> tuple2List  = new ArrayList<>();
tuple2List.add(Tuple2.of("hadoop",1L));
tuple2List.add(Tuple2.of("spark", 2L));
tuple2List.add(Tuple2.of("flink", 3L));
DataStreamSource<List<Tuple2<String, Long>>> ds2 = env.fromElements(tuple2List);

# 输出-1
6> spark
4> hadoop
5> spark
7> flink

# 输出-2
6> [(hadoop,1), (spark,2), (flink,3)]
  • (2) env.fromCollection()
# 传入列表
DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));

# 输出-3
8> hadoop
6> spark
7> flink

# fromParallelCollection 并行度队列(0-10闭区间)
DataStreamSource<Long> parallelCollection  = env.fromParallelCollection(
                new NumberSequenceIterator(0L, 10L),
                TypeInformation.of(Long.TYPE)
        ).setParallelism(3);
# 乱序输出 -parallelCollection

8> 8
2> 10
8> 7
6> 3
6> 5
3> 0
7> 6
1> 9
5> 2
5> 4
4> 1
  • (3) env.generateSequence()
# 传入队列(左开右闭区间)
DataStreamSource<Long> ds4 = env.generateSequence(1, 10);

# 输出 -4
8> 8
3> 3
2> 2
5> 5
1> 1
1> 9
7> 7
6> 6
4> 4
2> 10
  • (4) env.fromSequence()
# 传入队列(左开右闭区间)
DataStreamSource<Long> ds5 = env.fromSequence(1, 10);

# 输出 -5
1> 8
7> 6
6> 10
2> 5
3> 1
3> 2
8> 7
4> 9
5> 3
5> 4
1.1.2 基于文件的 Source
  • (1) 批的方式读取文本文件:env.readTextFile(path)
package cn.itcast.day02.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-12 23:47:53
 * @description TODO:批的方式读取文件
 */
public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // 配置端口号信息
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        // 初始化 UI 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // 读取数据源
        String path = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt";
        DataStreamSource<String> lines = env.readTextFile(path);
        // 数据源并行度
        int parallelism = lines.getParallelism();
        System.out.println("ReadTextFileDemo创建的DataStream的并行度为:" + parallelism);
        lines.print();
        env.execute();
    }
}
  • (2) 流的方式读取文本文件:env.readFile()
    • 细节点:流式处理 PROCESS_CONTINUOUSLY 时,文件状态改变才能触发重新打印一次
package cn.itcast.day02.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * @author lql
 * @time 2024-02-13 15:34:11
 * @description TODO:流的方式读取数据源,无限流
 */
public class StreamFromFile {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        String path = "./data/input/wordcount.txt";

        // new TextInputFormat(null),文本输入编码格式,null表示默认为utf-8编码
        // FileProcessingMode.PROCESS_ONCE 只处理一次
        // 2000毫秒表示间隔处理时间
        DataStreamSource<String> lines1 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_ONCE, 2000
        );
        // FileProcessingMode.PROCESS_CONTINUOUSLY 永续处理,不会停止
        DataStreamSource<String> lines2 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000
        );

        // 查看并行度
        System.out.println("lines1的并行度:"+lines1.getParallelism());
        System.out.println("lines2的并行度:"+lines2.getParallelism());

        //lines1.print();
        lines2.print();
        env.execute();
    }
}
1.1.3 基于 Socket 的 Source
  • 现象:socket 的并行度是 1(单并行度数据源)
  • 细节:在虚拟机上用 nc -lk 8888 启动 socket 服务端
package cn.itcast.day02.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-13 16:00:47
 * @description TODO:基于socket的数据源
 */
public class StreamSocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
        int parallelism1 = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism1);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        int parallelism2 = words.getParallelism();
        System.out.println("调用完FlatMap后DataStream的并行度:" + parallelism2);
        words.print();
        env.execute();
    }
}
1.2 自定义 Source
1.2.1 基于随机生成DataSource
  • (1) 自定义实现 SourceFunction 接口
    • 例子:自定义数据源, 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
    • 要求: 随机生成订单ID(UUID),用户ID(0-2),订单金额(0-100),时间戳为当前系统时间
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * @author lql
 * @time 2024-02-13 16:21:31
 * @description TODO
 */
public class CustomerSourceWithoutParallelDemo {

    /**
     * 自定义 java Bean 类
     *     @Data:自动为类生成 getter、setter 方法、toString 方法、equals 方法和 hashCode 方法。
     *     @AllArgsConstructor:自动生成一个包含所有参数的构造函数。
     *     @NoArgsConstructor:自动生成一个无参构造函数。
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order{
        // 订单
        private String id;
        // 用户 ID
        private String userId;
        // 订单金额
        private int money;
        // 时间戳
        private Long timestamp;
    }

    /**
     * 主函数
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //todo 1)获取flink流处理的运行环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("初始化环境的并行度:"+ env.getParallelism());

        // todo 2) 接入自定义数据源
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("streamSource并行度: " + streamSource.getParallelism());
        
        // todo 3) 打印输出
        streamSource.printToErr();
        env.execute();
    }
    
    /**
     * 自定义数据源,每秒钟生成一个订单信息
     */
    private static class MySource implements SourceFunction<Order> {
        // 定义循环生成数据的标记
        private boolean isRunning = true;
        /**
         * 核心方法:生成数据
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning){
                // 订单ID
                String orderID = UUID.randomUUID().toString();
                // 用户 Id
                String userID = String.valueOf(random.nextInt(3));
                // 订单金额
                int money = random.nextInt(1000);
                // 时间
                long time = System.currentTimeMillis();
                // 返回数据
                sourceContext.collect(new Order(orderID, userID, money, time));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:默认运行环境的并行度:8, 自定义streamSource的并行度为:1

总结:文章来源地址https://www.toymoban.com/news/detail-826946.html

  • 1- env.addSource(new MySource()),自定义数据源 [私有静态方法]

    • new 一个 实现(implements) SourceFunction 接口,并重写核心方法
  • 2- 认识了 java bean 类,@Data,@NoArgsConstructor,@AllArgsConstructor 的作用

  • 3- UUID 这个工具类可以随机生成 id,随机数使用需要先 new 一个,random.nextInt() 是左闭右开

  • 4- String.valuesOf()是可以生成字符串类型,while 循环需要有 boolean 标记

  • 5- collect()可以返回对象数据

  • (2) 实现ParallelSourceFunction创建可并行Source

DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);

# 上述非rich的自定义mySource数据源不支持多个并行度
  • (3) 实现RichParallelSourceFunction:创建并行并带有Rich功能的Source
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * @author lql
 * @time 2024-02-13 16:58:49
 * @description TODO:多并行度的自定义数据源
 */
public class RichParallelismDemo {
    /**
     * 自定义 java Bean 类
     *
     * @Data:自动为类生成 getter、setter 方法、toString 方法、equals 方法和 hashCode 方法。
     * @AllArgsConstructor:自动生成一个包含所有参数的构造函数。
     * @NoArgsConstructor:自动生成一个无参构造函数。
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // 订单
        private String id;
        // 用户 ID
        private String userId;
        // 订单金额
        private int money;
        // 时间戳
        private Long timestamp;
    }

    /**
     * 主函数
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //todo 1)获取flink流处理的运行环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("初始化环境的并行度:" + env.getParallelism());

        // todo 2) 接入自定义数据源
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        streamSource = streamSource;
        System.out.println("streamSource并行度: " + streamSource.getParallelism());

        // todo 3) 打印输出
        streamSource.printToErr();
        env.execute();

    }

    /**
     * 自定义数据源,每秒钟生成一个订单信息
     */
    private static class MySource extends RichParallelSourceFunction<Order> {
        // 定义循环生成数据的标记
        private boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void cancel() {

        }

        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // 订单ID
                String orderID = UUID.randomUUID().toString();
                // 用户 Id
                String userID = String.valueOf(random.nextInt(3));
                // 订单金额
                int money = random.nextInt(1000);
                // 时间
                long time = System.currentTimeMillis();
                // 返回数据
                sourceContext.collect(new Order(orderID, userID, money, time));

            }
        }
    }
}

结果:自定义RichParallelSourceFunction支持多个并行度

总结:继承 RichParallelSourceFunction 方法,需要重写方法 open 和 close !

1.2.2 基于 MySQL 的 Source 操作
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

SET FOREIGN_KEY_CHECKS = 1;
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 17:14:06
 * @description TODO:自定义 mysql 数据源
 */
public class MysqlSource {
    public static void main(String[] args) throws Exception {
        // TODO 1: 获取 flink 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: 接入自定义数据源
        DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
        System.out.println("MysqlSourceFunction的并行度为:"+streamSource.getParallelism());

        // todo 3) 打印输出
        streamSource.print();

        // todo 4) 启动运行作业
        env.execute();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserInfo{
        private int id;
        private String username;
        private String password;
        private String name;
    }

    /**
     * 自定义数据源:获取 mysql 数据
     */
    private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {
        // 定义 mysql 的连接对象
        private Connection connection = null;

        // 定义 mysql statement 对象
        private PreparedStatement statement = null;

        /**
         * 实例化的时候会被执行一次,多个并行度会执行多次,因为有多个实例
         * 一般由于资源的初始化操作
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 注册驱动
            Class.forName("com.mysql.jdbc.Driver");
            // 实例化 mysql 的连接对象
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root");
            // 实例化 statement 对象
            statement = connection.prepareStatement("select * from test.user");
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void run(SourceContext<UserInfo> sourceContext) throws Exception {
            while(true){
                ResultSet resultSet = statement.executeQuery();
                while(resultSet.next()) {
                    int id = resultSet.getInt("id");
                    String username = resultSet.getString("username");
                    String password = resultSet.getString("password");
                    String name = resultSet.getString("name");

                    sourceContext.collect(new UserInfo(id,username,password,name));
                }
                resultSet.close();
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {

        }
    }
}

结果:mysql 的自定义 source,可以多并行度

总结:

  • 1- java Bean 类,给 mysql 字段名定义用
  • 2- 初始化 mysql 连接对象 connection 和 statement 记为null
  • 3- 重写 open 驱动方法:
    • 注册 mysql 驱动:Class.forName(“com.mysql.jdbc.Driver”)
    • 实例化连接对象 connection:DriverManager.getConnection()
    • 实例化 statement:connection.prepareStatement(),这里放置 sql 查询语句
  • 4- 重写 run 核心方法:
    • 双重循环,第一层:结果集关闭和停顿间隔,第二层:statement.executeQuery()获取结果集,字段类型和内容获取
    • 获取完字段后,需要collect(new 实体类(字段集))
  • 5- 睡眠时间:TimeUnit.SECONDS.sleep()

到了这里,关于flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 4、介绍Flink的流批一体、transformations的18种算子详细介绍、Flink与Kafka的source、sink介绍

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(43)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(41)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(44)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(43)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(37)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(42)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包