PiflowX如何快速开发flink程序

这篇具有很好参考价值的文章主要介绍了PiflowX如何快速开发flink程序。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

PiflowX如何快速开发flink程序

参考资料

Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com)

Flink SQL 背景

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

  • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;
  • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
  • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;
  • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
  • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。
Flink SQL 常规实战应用

案例来自(Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com))!详细流程有兴趣可以参考原文示例。(如有侵犯,请请联系!)。

在此,简单总结一下flink sql的开发流程:

1.首先需要创建maven工程,确认需要的各种依赖,运气好的话,还需要花费大量的精力和时间去排查依赖冲突的问题(oh God bless me!);

2.开始balabala编写模板代码,如:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

3.数据准备和预处理;

 DataSet<String> input = env.readTextFile("score.csv");
        DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
            @Override
            public PlayerData map(String s) throws Exception {
                String[] split = s.split(",");
                return new PlayerData(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        String.valueOf(split[2]),
                        Integer.valueOf(split[3]),
                        Double.valueOf(split[4]),
                        Double.valueOf(split[5]),
                        Double.valueOf(split[6]),
                        Double.valueOf(split[7]),
                        Double.valueOf(split[8])
                );
            }
        });
其中的PlayerData类为自定义类:
public static class PlayerData {
        /**
         * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
         */
        public String season;
        public String player;
        public String play_num;
        public Integer first_court;
        public Double time;
        public Double assists;
        public Double steals;
        public Double blocks;
        public Double scores;

        public PlayerData() {
            super();
        }

        public PlayerData(String season,
                          String player,
                          String play_num,
                          Integer first_court,
                          Double time,
                          Double assists,
                          Double steals,
                          Double blocks,
                          Double scores
                          ) {
            this.season = season;
            this.player = player;
            this.play_num = play_num;
            this.first_court = first_court;
            this.time = time;
            this.assists = assists;
            this.steals = steals;
            this.blocks = blocks;
            this.scores = scores;
        }
    }

4.终于到了真正的业务处理了,有了flink sql的强大和方便,倒是省了不少代码;

Table queryResult = tableEnv.sqlQuery("
select player, 
       count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
");

5.ok,到此,数据处理和计算逻辑完毕,处理结果写入到sink,可以完结散花咯,哈哈;

DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();

6.哦!好像还需要调试运行,好吧,再辛苦一会,便可大功告成!
PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

7.完美,上线。。。。。。
PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

(以上,纯属娱乐,如有不当,敬请谅解!)

可见,在平日开发一个flink任务虽已尽可能简单,但开发周期也得1-2个工作日,甚至更长,有没有简单粗暴的,让我分分钟领盒饭,不,让我分分钟高效完成任务的!

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

当然有啦!!!接下来让我隆重的介绍一下今天的主角—PilfowX—大数据流水线系统。有兴趣可以查看之前的文章(StreamPark + PiflowX 打造新一代大数据计算处理平台-CSDN博客)。

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX是基于Piflow和StreamPark二开实现的,在其基础上,实现了图像化拖拉拽的方式开发spark或flink作业,这里我将介绍flink任务的开发流程,以及如何零代码实现flink sql的开发。

PiflowX的flink组件算子基本都是基于flink table和sql实现的,我们只需在UI界面填写组件相关参数,之后的工作交给底层框架即可。

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

我们回顾一下flink sql语法定义。

Flink SQL 的语法和算子

Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]
PiflowX组件flink table实现

在了解了flink sql的定义后,一切便简单多了,那么,我们只需要根据业务需要,设计出一个表单输入,填写我们的业务参数,然后,由框架自动生成sql不就可以了么。

以下介绍如何配置一个mysqlcdc组件:

1.首先从组件列表中拖入一个MysqlCdc组件到画布中,点击节点,右侧会显示出节点参数表单区域和参数说明和示例。参数解释可以查看之前的文章(PiflowX-MysqlCdc组件-CSDN博客)。

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

2.填写相关参数,其实就是在定义flink table中的with属性。

在属性输入框中,点击预览可以实时查看生成的flink sql。

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data
PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

生成的flink sql 语句仅供参考,最终执行的语句会在引擎执行侧生成。
PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

3.接下来我们可以根据需要来定义flink table结构,此步骤和其他步骤没有先后顺序。点击表单属性tableDefinition,在此表单中我们可以输入flink table中的结构属性定义。

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

可以看到,我们可以在此定义flink table中的表基本信息,物理列,元数据列,计算列,水印等,具体说明在此就不赘述了,以后会有具体文章来说明。看看最终的效果:

PiflowX如何快速开发flink程序,PiflowX,flink,大数据,spark,hadoop,big data

至此,我们通过简单的表单填写,便可开发一个flink任务,最后,点击运行,系统便可自动提交到flink环境,并可实时查看运行日志,是不是很方便快捷!

当然,目前系统处于初期研发阶段,还有很多不完善的地方,敬请谅解。最后,我们来看一个简单的实例,如果通过PiflowX开发一个mysql cdc实时同步和flink读取doris的任务。

PiflowX-Droris读写组件

PiflowX-MysqlCdc组件文章来源地址https://www.toymoban.com/news/detail-795462.html

到了这里,关于PiflowX如何快速开发flink程序的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 处理大数据的基础架构,OLTP和OLAP的区别,数据库与Hadoop、Spark、Hive和Flink大数据技术

    2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人,可能很多算法学生都得去找开发,测开 测开的话,你就得学数据库,sql,oracle,尤其sql要学,当然,像很多金融企业、安全机构啥的,他们必须要用oracle数据库 这oracle比sql安全,强大多了,所以你需要学

    2024年02月08日
    浏览(57)
  • Hadoop、Spark、Storm、Flink区别及选择

    hadoop和spark是更偏向于对大量离线数据进行批量计算,提高计算速度 storm和flink适用于实时在线数据,即针对源源不断产生的数据进行实时处理。至于storm和flink之间的区别在于flink的实时性和吞吐量等要比storm高。 上述四个组件的实时性高低顺序如下: hadoop spark storm flink hdf

    2024年02月08日
    浏览(42)
  • Hadoop、Spark与Flink的基础架构及其关系和优异

    Hadoop、Spark和Flink是目前重要的三大分布式计算系统。它们都可以用于大数据处理,但在处理方式和应用场景上有所不同。 Hadoop专为批处理而生,一次将大量数据集输入到输入中,进行处理并产生结果。它用于离线复杂的大数据处理。 Spark定义是一个批处理系统,但也支持流

    2024年02月11日
    浏览(47)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建 下载 https://archive.apache.org/dist/  Mysql下载地址 Index of /MySQL/Downloads/ 我最终选择 Zookeeper3.7.1 +Hadoop3.3.5 + Spark-3.2.4 + Flink-1.16.1 + Kafka2.12-3.4.0 + HBase2.4.17 + Hive3.1.3  +JDK1.8.0_391  IP规划 IP hostname 192.168.1.5 node1 192.168.1.6 node

    2024年01月23日
    浏览(49)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(60)
  • 利用Docker快速部署hadoop、hive和spark

    2024年02月13日
    浏览(49)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆级超详细含图文)

    说明: 本篇将详细介绍用二进制安装包部署hadoop等组件,注意事项,各组件的使用,常用的一些命令,以及在部署中遇到的问题解决思路等等,都将详细介绍。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系统版本 1.2.2内存建议最少4g、2cpu、50G以上的磁盘容量 本次

    2024年02月12日
    浏览(50)
  • 林子雨 VirtualBox + Ubuntu[linux] 配置 java、hadoop、Spark[python]、pyspark快速配置流程

    按照步骤快速执行shell,最快速配置。 读者可以根据该篇随记快速回顾流程,以及用到的shell指令和相关配置文件。 是林老师教程的精简版,初次配置者只能作为流程参考,主要和林子雨Spark[python]版课程配套。  林老师厦大实验指南链接如下: Spark编程基础(Python版)教材官

    2024年04月12日
    浏览(43)
  • Linux多虚拟机集群化配置详解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    前面安装的软件,都是以单机模式运行的,学习大数据相关的软件部署,后续安装软件服务,大多数都是以集群化(多台服务器共同工作)模式运行的。所以,需要完成集群化环境的前置准备,包括创建多台虚拟机,配置主机名映射,SSH免密登录等等。 我们可以使用VMware提供

    2024年02月04日
    浏览(51)
  • Hadoop+Hive+Spark+Hbase开发环境练习

    1.练习一 1. 数据准备 在hdfs上创建文件夹,上传csv文件 [root@kb129 ~]# hdfs dfs -mkdir -p /app/data/exam 查看csv文件行数 [root@kb129 ~]# hdfs dfs -cat /app/data/exam/meituan_waimai_meishi.csv | wc -l 2. 分别使用 RDD和 Spark SQL 完成以下分析(不用考虑数据去重) 开启spark shell [root@kb129 ~]# spark-shell (1)加载

    2024年02月03日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包