袋鼠云的FlinkSQL插件开发

这篇具有很好参考价值的文章主要介绍了袋鼠云的FlinkSQL插件开发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

袋鼠云是什么

袋鼠云是一家大数据产品供应商。他开发了一个产品叫做 flinkStreamSQL。这东西是以 Flink 为基础开发的使用 SQL 来写流式计算逻辑的产品。

FlinkStreamSQL 的开源地址

什么是插件

这里所说的插件是可以理解为自定义的语法。例如下面的 SQL:

select fact.shop_id
      ,shop.shop_name
from fact_stream as fact
left join dim_shop as shop
on fact.shop_id = shop.shopid

dim_shop 可能是一个 redis 为实体的 Table ,这袋鼠已经为我们实现了,现在我们可能从 HTTP 的接口拿到数据,此时的话,我们可以自定义一个 HTTP Table ,然后上面的代码不用修改。

整体的流程

编写、执行 FlinkStreamSQL 的流程如下所示:


CREATE TABLE source(
    colName colType,
    ...
    function(colNameX) AS aliasName,
    WATERMARK FOR colName AS withOffset( colName , delayTime )
 )WITH(
    type ='kafka09',
    kafka.bootstrap.servers ='ip:port,ip:port...',
    kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',
    kafka.auto.offset.reset ='latest',
    kafka.topic ='topicName',
    parallelism ='parllNum',
    --timezone='America/Los_Angeles',
    timezone='Asia/Shanghai',
    sourcedatatype ='json' #可不设置
 );

CREATE TABLE sink(
    colName colType,
    ...
    function(colNameX) AS aliasName,
    WATERMARK FOR colName AS withOffset( colName , delayTime )
 )WITH(
    type ='kafka09',
    kafka.bootstrap.servers ='ip:port,ip:port...',
    kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',
    kafka.auto.offset.reset ='latest',
    kafka.topic ='topicName',
    parallelism ='parllNum',
    --timezone='America/Los_Angeles',
    timezone='Asia/Shanghai',
    sourcedatatype ='json' #可不设置
 );

 CREATE TABLE dim (
     columnFamily:columnName type as alias,
     ...
     PRIMARY KEY(keyInfo),
     PERIOD FOR SYSTEM_TIME
  )WITH(
     type ='hbase',
     zookeeperQuorum ='ip:port',
     zookeeperParent ='/hbase',
     tableName ='tableNamae',
     cache ='LRU',
     cacheSize ='10000',
     cacheTTLMs ='60000',
     parallelism ='1',
     partitionedJoin='false'
  );

insert into sink(
...
)
select source.*
      ,dim.*
from source 
left join dim 
where 

袋鼠云的FlinkSQL插件开发,Flink,flink,大数据
如上面画的,会将 DDL 变成 source、sink、side-put 的算子。简单的讲,执行的逻辑是
; 为分割符号,分割开 sql 语句,然后使用正则表达式识别 DDL 语句、DML 语句。
其中 DDL 语句中符合(?i)^PERIOD\s+FOR\s+SYSTEM_TIME$ 则认为是 side-input ,side-input 会别解析为异步I/O
算子。如果 DDL 语句中没有则解析为 source 算子,如果 DDL 表在 DML 中在 insert into 后面,则为 sink 表。

知道了这些之后,我们可以自己定义一种 DDL 语句,如下:

 CREATE TABLE dim (
     columnFamily:columnName type as alias,
     ...
     PRIMARY KEY(keyInfo),
     PERIOD FOR SYSTEM_TIME
  )WITH(
     type ='http',
     url ='http://....',
     ...
  );
  
insert into sink(
...
)
select source.*
      ,dim.*
from source 
left join dim 
where 

其他的都不变,我现在的认为是实现一些接口,让 FlinkStreamSql 能通过 SQL 找到对应算子的实现。

关键的接口

DDL 语句解析的相关接口

AbstractTableInfo
    |--AbstractSideTableInfo
    |--AbstractTargetTableInfo
    |--AbstractSourceTableInfo

AbstractTableParser
    |--AbstractSideTableParser
    |--ClickhouseSideParser
    |--AbstractSourceParser

袋鼠平台是如何找到对应的 AbstractSideTableInfo 的呢?其实靠 class 的命名规则,例如,hbase side table
AbstractSideTableInfo 的实现类是 HbaseSideParser。with 中的属性 type = hbase ,然后 table 的 DDL 中有 side table 的关键配置,然后所以贫出来的 class 文件文字是 HbaseSideParser ,namespace 是 com.dtstack.flink.sql.{类型}.{type}.table, 所以全程也出来了。

转化为算子的接口

BaseAsyncReqRow 此接口是继承了 RichAsyncFunction ,重要的方法有 handleAsyncInvoke 里面实现了异步调用外表接口的东西。

我实现的是的 Http side table ,使用的是 http 异步的请求接口:

<dependency>
	<groupId>org.asynchttpclient</groupId>
	<artifactId>async-http-client</artifactId>
	<version>2.12.3</version>
</dependency>

使用的接口是:文章来源地址https://www.toymoban.com/news/detail-742530.html

AsyncHttpClient client = Dsl.asyncHttpClient();
BoundRequestBuilder builder = client.preparePost(url)
.setHeader("Content-type","application/json")
.setBody(json.getBytes());

Request r = builder.build();
ListenableFuture whenResponse = client.executeRequest(build);
whenResponse.addListener(
  new Runnable(){
    public void run(){
      // 编写异步的回调函数
    }
  }
)

到了这里,关于袋鼠云的FlinkSQL插件开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink:FlinkSql解析嵌套Json

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下 1,数据是网上摘抄,但包含里常用的大部分格式 {     \\\"afterColumns\\\": {         \\\"created\\\": \\\"1589186680\\\",         \\\"extra\\\": {             \\\"

    2023年04月09日
    浏览(24)
  • flink学习35:flinkSQL查询mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable {   def main(args: Array[String]): Unit = {     //create env     val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    浏览(38)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(42)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(28)
  • 【Flink】FlinkSQL中执行计划以及如何用代码看执行计划

    FilnkSQL怎么查询优化 Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如: • 基于 Apache Calcite 的子查询解相关 • 投影剪裁 • 分区剪裁 • 过滤器下推 • 子计划消除重复数据以避免重复计算 • 特殊子查询重写,包括两部

    2023年04月11日
    浏览(43)
  • 【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

    问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    浏览(25)
  • FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能

    2023年04月27日
    浏览(31)
  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(30)
  • flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

    前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的! Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王

    2024年02月21日
    浏览(37)
  • flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作

    前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。 Tips:我觉得学习 Flink 还是挺有意思的

    2024年02月19日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包