【大数据】Flink 测试利器:DataGen

这篇具有很好参考价值的文章主要介绍了【大数据】Flink 测试利器:DataGen。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.什么是 FlinkSQL ?

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持 ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如 过滤聚合连接转换 等。它还提供了 窗口操作时间处理复杂事件处理 等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。它是 Flink 最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

【大数据】Flink 测试利器:DataGen,# Flink,大数据,flink,测试,DataGen,测试数据,Connector,FlinkSQL
使用 Flink SQL 处理数据的基本步骤:

  • 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。
  • 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。
  • 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。
  • 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink 会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过 Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

2.什么是 Connector ?

Flink Connector 是指 用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  • JDBC:用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。
  • JDQ:用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。
  • Elasticsearch:用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。
  • File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。
  • ……

还有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或 将处理结果导出到外部系统

【大数据】Flink 测试利器:DataGen,# Flink,大数据,flink,测试,DataGen,测试数据,Connector,FlinkSQL

3.DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

3.1 Demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (
	order_number BIGINT,
	price DECIMAL(32,2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3)
) WITH (
	'connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 input_table 的输入表。该表包含了 order_numberpricebuyerorder_time 四个字段。默认是 Random 随机生成对应类型的数据,生产速率是 10000 10000 10000 条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{
    "order_number": -6353089831284155505,
    "price": 253422671148527900374700392448,
    "buyer": {
        "first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2",
        "last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"
    },
    "order_time": "2023-09-21 06:22:29.618"
}
{
    "order_number": 1102733628546646982,
    "price": 628524591222898424803263250432,
    "buyer": {
        "first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c",
        "last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"
    },
    "order_time": "2023-09-21 06:23:01.69"
}

3.2 支持的类型

字段类型 数据生成方式
BOOLEAN random
CHAR random / sequence
VARCHAR random / sequence
STRING random / sequence
DECIMAL random / sequence
TINYINT random / sequence
SMALLINT random / sequence
INT random / sequence
BIGINT random / sequence
FLOAT random / sequence
DOUBLE random / sequence
DATE random
TIME random
TIMESTAMP random
TIMESTAMP_LTZ random
INTERVAL YEAR TO MONTH random
INTERVAL DAY TO MONTH random
ROW random
ARRAY random
MAP random
MULTISET random

3.3 连接器属性

属性 是否必填 默认值 类型
描述
connector required (none) String ‘datagen’
rows-per-second optional 10000 10000 10000 Long 数据生产速率
number-of-rows optional (none) Long 指定生产的数据条数,默认是不限制
fields.#.kind optional random String 指定字段的生产数据的方式 random 还是 sequence
fields.#.min optional (Minimum value of type) (Type of field) random 生成器的指定字段 # 最小值,支持数字类型
fields.#.max optional (Maximum value of type) (Type of field) random 生成器的指定字段 # 最大值,支持数字类型
fields.#.length optional 100 100 100 Integer char / varchar / string / array / map / multiset 类型的长度
fields.#.start optional (none) (Type of field) sequence 生成器的开始值
fields.#.end optional (none) (Type of field) sequence 生成器的结束值

4.DataGen 使用案例

4.1 场景一:生成一亿条数据到 Hive 表

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) WITH ( 
	'connector'='datagen', 
	'number-of-rows'='100000000',
	'rows-per-second' = '100000'
);


CREATE CATALOG myhive
WITH (
	'type'='hive',
	'default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
	'partition.time-extractor.timestamp-pattern'='$dt',
	'sink.partition-commit.trigger'='partition-time',
	'sink.partition-commit.delay'='1 h',
	'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产 10 万条数据的时候,17 分钟左右就可以完成,当然我们可以通过增加 Flink 任务的计算节点、并行度、提高生产速率 rows-per-second 的值等来更快速的完成大数据量的生产。

4.2 场景二:持续每秒生产 10 万条数到消息队列

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price INT,
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_array ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH ( 
	'connector'='datagen', --连接器类型
	'rows-per-second'='100000', --生产速率
	'fields.order_number.kind'='random', --字段order_number的生产方式
	'fields.order_number.min'='1', --字段order_number最小值
	'fields.order_number.max'='1000', --字段order_number最大值
	'fields.price.kind'='sequence', --字段price的生产方式
	'fields.price.start'='1', --字段price开始值
	'fields.price.end'='1000', --字段price最大值
	'fields.col_array.element.length'='5', --每个元素的长度
	'fields.col_map.key.length'='5', --map key的长度
	'fields.col_map.value.length'='5' --map value的长度
);

CREATE TABLE jdqsink1 (
	order_number BIGINT,
	price DECIMAL(32, 2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_ARRAY ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH (
	'connector'='jdq',
	'topic'='jrdw-fk-area_info__1',
	'jdq.client.id'='xxxxx',
	'jdq.password'='xxxxxxx',
	'jdq.domain'='db.test.group.com',
	'format'='json'
);

INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;

5.思考

通过以上案例可以看到,通过 Datagen 结合其他连接器可以模拟各种场景的数据。

  • 性能测试:我们可以利用 Flink 的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应。
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性。
  • 数据完整性测试:我们通过 Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink 任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。文章来源地址https://www.toymoban.com/news/detail-805609.html

到了这里,关于【大数据】Flink 测试利器:DataGen的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步

    Flink作为当前流行的流式计算框架,在对接StarRocks时,若直接使用JDBC的方式“流式”写入数据,对StarRocks是不友好的,StarRocks作为一款MVCC的数据库,其导入的核心思想还是“攒微批+降频率”。为此,StarRocks单独开发了flink-connector-starrocks,其内部实现仍是通过对数据缓存攒批

    2023年04月15日
    浏览(78)
  • Flink 学习十 FlinkSQL

    flink sql 基于flink core ,使用sql 语义方便快捷的进行结构化数据处理的上层库; 类似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整体架构和工作流程 数据流,绑定元数据 schema ,注册成catalog 中的表 table / view 用户使用table Api / table sql 来表达计算逻辑 table-planner利用 apache calci

    2024年02月10日
    浏览(46)
  • Flink 优化(六) --------- FlinkSQL 调优

    FlinkSQL 官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景: ➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不

    2024年02月14日
    浏览(42)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(56)
  • Flink:FlinkSql解析嵌套Json

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下 1,数据是网上摘抄,但包含里常用的大部分格式 {     \\\"afterColumns\\\": {         \\\"created\\\": \\\"1589186680\\\",         \\\"extra\\\": {             \\\"

    2023年04月09日
    浏览(36)
  • flink学习35:flinkSQL查询mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable {   def main(args: Array[String]): Unit = {     //create env     val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    浏览(46)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(56)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(39)
  • Flink Connector 开发

    Flink 是新一代流 批统一的计算引擎 ,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。 Connector 的作用就相当于一个连接器 ,连接 Flink 计算引擎跟外界存储系统。 Flink 里有以下几种方式,当然也不限于这几种方式可以跟外界进行

    2024年02月03日
    浏览(40)
  • Flink RocketMQ Connector实现

    Flink内置了很多Connector,可以满足大部分场景。但是还是有一些场景无法满足,比如RocketMQ。需要消费RocketMQ的消息,需要自定时Source。 参考FlinkKafkaConsumer: 可以看到,自定义的Source,只需要实现SourceFunction。 创建FlinkRocketMQConsumer,实现SourceFunction,重写run()和cancel()方法 需要

    2024年02月11日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包