Flink Connector 开发

这篇具有很好参考价值的文章主要介绍了Flink Connector 开发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink Streaming Connector

Flink是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector的作用就相当于一个连接器,连接Flink计算引擎跟外界存储系统。Flink里有以下几种方式,当然也不限于这几种方式可以跟外界进行数据交换:
【1】Flink里面预定义了一些sourcesink
【2】Flink内部也提供了一些Boundled connectors
【3】可以使用第三方Apache Bahir项目中提供的连接器;
【4】是通过异步IO方式;

预定义的 source 和 sink

Flink里预定义了一部分sourcesink。在这里分了几类。
Flink Connector 开发,Flink,flink,大数据,java,面试,elasticsearch,后端,性能优化

基于文件的 source 和 sink

如果要从文本文件中读取数据,可以直接使用:

env.readTextFile(path)

就可以以文本的形式读取该文件中的内容。当然也可以使用:根据指定的fileInputFormat格式读取文件中的内容。

env.readFile(fileInputFormat, path)

如果数据在Flink内进行了一系列的计算,想把结果写出到文件里,也可以直接使用内部预定义的一些sink,比如将结果已文本或csv格式写出到文件中,可以使用DataStreamwriteAsText(path)DataSetwriteAsCsv(path)

基于 Socket 的 Source 和 Sink

提供 Sockethost nameport,可以直接用StreamExecutionEnvironment预定的接口socketTextStream创建基于Socketsource,从该 socket中以文本的形式读取数据。当然如果想把结果写出到另外一个Socket,也可以直接调用DataStream writeToSocket

//从 socket 中读取数据流
env.socketTextStream("localhost",777);
//输出至 socket 
resultDataStream.writeToSocket("hadoop1",6666,new SimpleStringSchema())

基于内存 Collections、Iterators 的 Source

可以直接基于内存中的集合或者迭代器,调用StreamExecutionEnvironment fromCollectionfromElements构建相应的source。结果数据也可以直接printprintToError的方式写出到标准输出或标准错误。详细也可以参考Flink源码中提供的一些相对应的Examples来查看异常预定义 sourcesink的使用方法,例如WordCountSocketWindowWordCount

//从Java.util.Collection集合中读取数据作为数据源
ArrayList<String> list = new ArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();

//从Java.util.Collection集合中读取数据作为数据源
 env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();

Bundled Connectors

Flink里已经提供了一些绑定的Connector,例如kafka sourcesinkEs sink等。读写kafkaesrabbitMQ时可以直接使用相应 connectorapi即可。

虽然该部分是Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job时候需要注意,job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
Flink Connector 开发,Flink,flink,大数据,java,面试,elasticsearch,后端,性能优化

Apache Bahir 中的连接器

Apache Bahir最初是从Apache Spark中独立出来项目提供,以提供不限于Spark相关的扩展 / 插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器streaming connectorsSQL数据源扩展分析平台的覆盖面。如有需要写到flumeredis的需求的话,可以使用该项目提供的connector
Flink Connector 开发,Flink,flink,大数据,java,面试,elasticsearch,后端,性能优化

Async I/O

流计算中经常需要与外部存储系统交互,比如需要关联MySQL中的某个表。一般来说,如果用同步I/O的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步I/O可以并发处理多个请求,提高吞吐,减少延迟。Async的原理可参考官方文档
Flink Connector 开发,Flink,flink,大数据,java,面试,elasticsearch,后端,性能优化文章来源地址https://www.toymoban.com/news/detail-777462.html

到了这里,关于Flink Connector 开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Kafka[输入/输出] Connector

    本章重点介绍生产环境中最常用到的 Flink kafka connector 。使用 Flink 的同学,一定会很熟悉 kafka ,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟 kafka 进行一些数据的交换,比如利用 kafka consumer 读取数据,然后进行一系

    2024年02月04日
    浏览(40)
  • 生态扩展:Flink Doris Connector

    官网地址: https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector flink的安装: flink环境配置:vim /etc/profile 复制到flink的lib目录 doris官网:https://doris.apache.org/docs/ecosystem/flink-doris-connector

    2024年02月06日
    浏览(40)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(54)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(42)
  • Flink Oracle CDC Connector源码解读

    flink cdc是在flink的基础上对oracle的数据进行实时采集,底层使用的是debezium框架来实现,debezium使用oracle自带的logminer技术来实现。logminer的采集需要对数据库和采集表添加补充日志,由于oracle18c不支持对数据添加补充日志,所以目前支持的oracle11、12、19三个版本。 flink oracle

    2024年02月02日
    浏览(42)
  • Flink Upsert Kafka SQL Connector 介绍

    在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的结果值。这个时候,须要将 Kafka 记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 项目提供的 changelog-json format 来实现该性能。 在

    2024年02月20日
    浏览(44)
  • Flink CDC系列之:Oracle CDC Connector

    2023年08月23日
    浏览(49)
  • 使用 SPL 高效实现 Flink SLS Connector 下推

    作者:潘伟龙(豁朗) 日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数

    2024年03月19日
    浏览(35)
  • 关于flink-sql-connector-phoenix的重写逻辑

    目录 重写意义 代码结构  调用链路 POM文件配置 代码解析 一、PhoenixJdbcD

    2024年02月12日
    浏览(36)
  • Apache Doris (六十四): Flink Doris Connector - (1)-源码编译

     🏡 个人主页:IT贫道-CSDN博客   🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. Flink与Doris版本兼容

    2024年01月18日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包