如何基于Flink实现定制化功能的开发

这篇具有很好参考价值的文章主要介绍了如何基于Flink实现定制化功能的开发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

       技术为需求服务,通用需求由开源软件提供功能,一些特殊的需求,需要基于场景定制化开发功能。而对于自定义开发功能,Flink则提供了这样的SDK接口能力。

本文将从定制化功能需求分析如何基于Flink构建定制化功能两个方面讲述。

一、定制化功能开发的思考

2.1 为什么要学会定制化功能的开发?

       一些常规需求的应用能力已经被包装得很好,只需要关注包装在功能之上的交互逻辑就能满足业务需求。但有些需求依靠现成的技术无法完成,只能自定义任务逻辑,完成特定场景需求的功能包装;或者部分功能性能和可用性不佳,需要重构功能满足可用性和高性能需求。

2.2 有哪些需求属于定制化开发

大数据场景,对数据集成、加工与分析、写结果这三个过程,都可以做定制化开发:

从集成角度:公司采集各种数据源的数据:使用集成工具,如fileBeat、dataX、canal、sqoop、Flume等从各种源端获取数据,这属于对功能的应用

如果定制化开发任务:需要对SourceFunction函数的包装,实现对各种数据源的采集,这属于定制化需求开发;

如果定制化开发SDK:开发一个工具包,然后通过更改数据源配置驱动任务,这就属于定制化功能的开发,如:flink-sql-connector-mysql-cdc;

从加工和分析角度:

如果对数据的加工:实现类似ETL处理,数据扩维,打标,特征聚类,这属于定制化需求开发;

如果重写处理算子函数:对数据操作重写一个ProcessFunction、MapFunction、AggregateFunction函数,然后重载到DataStream对象里,这就是定制化功能开发;

从写结果过程:

如果自定义RichSinkFunction函数:实现对目标端接口的包装,这属于定制化需求开发;

如果包装一个SDK工具:然后将功能打包成SDK服务,嵌入Flink的sink算子内,这属于定制化功能的开发,如doris包装的:flink-doris-connector-1.14_2.12。

二、如何基于Flink构建定制化的功能

以下是基于Flink构建定制化需求和功能的一些思路:

2.1. 确定业务需求和计算模型

       首先,理清楚你的业务需求和所需的计算模型。明确需要处理的数据类型、处理逻辑、计算规则和数据流动方向等。

2.2 使用 Flink 提供的 API 进行开发

       利用 Flink 的 DataStream API 或 Table API,根据业务需求编写你的计算逻辑。这些 API 提供了丰富的操作符和方法,用于对数据进行转换、聚合、过滤等处理。你可以根据实际需求自定义操作符和函数,实现特定的计算逻辑。

2.3 实现自定义算子和函数

         如果标准操作符无法满足你的需求,可以实现自定义的算子(Operator)或函数(Function)。例如,可以扩展 Flink 的 RichFunction 接口,实现自定义的 MapFunction、FilterFunction、AggregateFunction、KeyedProcessFunction 等。

       这些算子的底层是通过processFucetion实现的,可以自定义processFucetion的value与context构建更细粒度的操作;

        对于一些有自己客户端和读写API的服务,可以将服务读写API,通过服务客户端接口自定义RichSourceFunction和RichSinkFunction,然后包装成SDK提供服务;

以下是自定义Redis的RichSinkFunction函数例子:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;


public class RedisSink extends RichSinkFunction<Tuple2<String, String>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSink.class);

    private JedisCluster cluster;

    private String key;
    private String value;


    public RedisSink(String key, String value) {
        this.key = key;
        this.value = value;

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Set<HostAndPort> hostAndPorts = new HashSet<>();
        hostAndPorts.add(new HostAndPort("ip", 1111));
        hostAndPorts.add(new HostAndPort("ip2", 1111));

        // Jedis连接池配置
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 最大空闲连接数, 默认8个
        jedisPoolConfig.setMaxIdle(100);
        // 最大连接数, 默认8个
        jedisPoolConfig.setMaxTotal(500);
        //最小空闲连接数, 默认0
        jedisPoolConfig.setMinIdle(0);
        // 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1
        jedisPoolConfig.setMaxWaitMillis(2000); // 设置2秒
        cluster = new JedisCluster(hostAndPorts, jedisPoolConfig);
    }

    @Override
    public void invoke(Tuple2<String,  String> tuple2, Context context) throws Exception {
                cluster.set(tuple2.f0, tuple2.f1);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (cluster != null) {
            cluster.close();
        }
    }
}

2.4 优化和调优

       在构建定制化计算引擎的过程中,持续进行优化和调优是很重要的。考虑到数据处理性能、吞吐量和延迟等方面的问题,对代码进行优化和调整。

2.5 测试和部署

        完成代码编写后,进行测试和验证。确保计算引擎在不同场景和数据量下能够正常运行。之后,根据需求选择合适的部署方式,可以在集群环境中部署你的定制化计算引擎。

2.6 监控和维护

        一旦部署完成,建立相应的监控机制,监控计算引擎的运行状态和性能指标。持续地进行维护和优化,确保计算引擎的稳定性和性能。

三 、总结

       总的来说,构建定制化的计算引擎需要深入了解业务需求,善用 Flink 提供的 API,并根据实际情况进行优化和调整。 Flink 提供了强大的功能和灵活性,可以帮助你构建符合特定业务场景需求的定制化计算引擎,完成定制化功能的开发。文章来源地址https://www.toymoban.com/news/detail-806648.html

到了这里,关于如何基于Flink实现定制化功能的开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能

    * 设备ID */ private Integer deviceId; * 监控的变量名称 */ private String varName; * 最小值 */ private Double min; * 最大值 */ private Double max; } /** * 报警消息 */ @Data public class AlarmMessage { * 设备 */ private Integer deviceId; * 报警时间 */ private Long timestamp; /** * 触发报警的采集变量名称 */ private String ala

    2024年04月11日
    浏览(41)
  • 基于 Flink & Paimon 实现 Streaming Warehouse 数据一致性管理

    摘要:本文整理自字节跳动基础架构工程师李明,在 Apache Paimon Meetup 的分享。本篇内容主要分为四个部分: 背景 方案设计 当前进展 未来规划 点击查看原文视频 演讲PPT ​ 早期的数仓生产体系主要以离线数仓为主,业务按照自己的业务需求将数仓分为不同的层次,例如 DW

    2024年02月14日
    浏览(40)
  • 基于大数据平台(XSailboat)的计算管道实现MySQL数据源的CDC同步--flink CDC

    笔者在先前的一篇文档《数据标签设计 – 大数据平台(XSailboat)的数据标签模块》 提到了关于数据标签的模块,现已实现并应用于项目中。在项目中遇到这样一种情形: 如果打标信息和业务数据是在一个数据库实例中,那么只需要连接两张表进行查询即可。但是数据标签作为

    2024年01月17日
    浏览(63)
  • Flink Dashboard的数据监控功能

    1.1 数据反压是啥 数据反压是在实时数据处理中,数据处理流的某个节点上游产生数据的速度大于该节点处理数据速度,导致数据堆积,从该节点向上游传递,一直到数据源,并降低数据源的摄入速度。导致数据反压出现的常见场景,比如, GC导致短时间数据积压,数据的波动

    2024年02月13日
    浏览(37)
  • 基于Canal与Flink实现数据实时增量同步(一),计算机毕设源码要提交吗

    配置修改 修改conf/example/instance.properties,修改内容如下: canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = kms-1.apache.com:3306 #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.mq.topic

    2024年04月12日
    浏览(53)
  • 如何在Flink SQL中轻松实现高效数据处理:最佳实践揭秘Protobuf自定义格式

    目录 Flink SQL Protobuf Format设计要点 1. 引言 2. 为什么需要自定义Protobuf格式  3. 自定义Protobuf格式的

    2024年02月19日
    浏览(41)
  • PiflowX如何快速开发flink程序

    参考资料 Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com) Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计

    2024年01月16日
    浏览(28)
  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(37)
  • flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

    前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的! Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王

    2024年02月21日
    浏览(42)
  • 数睿通2.0功能更新:支持多版本 Flink 切换,新增数据标签模块

    小伙伴们,大家好,数睿通 2.0 数据中台迎来了 12 月份的更新,由于年底工作繁忙,所以本次更新内容稍微少了点,还望理解,本次更新内容主要包括: 数据开发多版本 Flink 支持,执行任务的时候可以动态切换 Flink 版本,目前支持的版本有 1.14 和 1.16 新增数据标签模块,包

    2024年02月01日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包