Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析

这篇具有很好参考价值的文章主要介绍了Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions

学习笔记如下:


接口实现样例

用户可以通过实现接口来完成自定义 Functions。

实现接口并使用的样例:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());

使用匿名类实现的样例:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

使用 Lambda 表达式实现(Java 8)样例:

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

Rick Functions

所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获取运行状态的上下文,而且拥有一些生命周期方法,可以实现更复杂的功能。

所有需要用户自定义的 function 的转化操作都可以转化为 rich function,只需要继承 Rich 版本的 Function 即可。

例如:可以将以下 MapFunction 代码

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}

替换为

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}

在使用自定义的 rich function 时,与非 Rich 版本一样,传给 map 算子即可:

data.map(new MyMapFunction());

Rich Function 也可以使用匿名类的方式定义。例如:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

累加器使用样例

累加器可以使用 Accumulator.add(V value) 方法将其递增;在作业结束时,Flink 会汇总(merge)所有部分的结果并将其发送给客户端。

使用计数器的方法如下:

Step 1|在需要使用累加器的用户自定义 function 中创建一个累加器对象(此处以计数器为例)

private IntCounter numLines = new IntCounter();

Step 2|在 rich function 的 open() 方法中注册累加器对象,也可以在次数定义名称

getRuntimeContext().addAccumulator("num-lines", this.numLines);

Step 3|在 function 的任何地方(包括 open()close() 方法中)使用累加器

this.numLines.add(1);

Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同操作 function 里使用同一个累加器,Flink 会在内部将所有具有相同名称的累加器合并起来。

需要注意的是,当前累加器的结果只有在整个作业结束后才可用。

计数器和累加器源码

计数器 IntCounter 类的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java

@PublicEvolving
public class IntCounter implements SimpleAccumulator<Integer> {

    private static final long serialVersionUID = 1L;

    private int localValue = 0;

    public IntCounter() {}

    public IntCounter(int value) {
        this.localValue = value;
    }

    // ------------------------------------------------------------------------
    //  Accumulator
    // ------------------------------------------------------------------------

    /** Consider using {@link #add(int)} instead for primitive int values */
    @Override
    public void add(Integer value) {
        localValue += value;
    }

    @Override
    public Integer getLocalValue() {
        return localValue;
    }

    @Override
    public void merge(Accumulator<Integer, Integer> other) {
        this.localValue += other.getLocalValue();
    }

    @Override
    public void resetLocal() {
        this.localValue = 0;
    }

    @Override
    public IntCounter clone() {
        IntCounter result = new IntCounter();
        result.localValue = localValue;
        return result;
    }

    // ------------------------------------------------------------------------
    //  Primitive Specializations
    // ------------------------------------------------------------------------

    public void add(int value) {
        localValue += value;
    }

    public int getLocalValuePrimitive() {
        return this.localValue;
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "IntCounter " + this.localValue;
    }
}
  • 使用 localValue 属性来存储当前 function 中的累加器的值
  • add 方法:实现累加操作
  • merge 方法:实现多个累加器的合并
  • clone() 方法:实现累加器的复制

计数器 IntCounter 类实现了 SimpleAccumulator 接口,SimpleAccumulator 接口的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java

@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}

SimpleAccumulator 接口继承了 Accumulator 接口,并定义添加的值的类型与最终结果的类型相同;Accumulator 接口的源码如下:

@Public
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
    /** @param value The value to add to the accumulator object */
    void add(V value);

    /** @return local The local value from the current UDF context */
    R getLocalValue();

    /** Reset the local value. This only affects the current UDF context. */
    void resetLocal();

    /**
     * Used by system internally to merge the collected parts of an accumulator at the end of the
     * job.
     *
     * @param other Reference to accumulator to merge in.
     */
    void merge(Accumulator<V, R> other);

    /**
     * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
     * throw a {@link java.lang.CloneNotSupportedException}
     *
     * @return The duplicated accumulator.
     */
    Accumulator<V, R> clone();
}
  • add 方法:实现累加器的累加操作
  • getLocalValue() 方法:读取当前 UDF 中累加器的值
  • resetLocal() 方法:重置当前 UDF 中累加器的值
  • merge 方法:在 Flink 合并不同 UDF 的结果时使用
  • clone() 方法:复制累加器对象,实现了 Clonable 接口

Accumulator 接口中,定义了添加的值的类型 V 和最终结果的类型 R。适用于实现更灵活的操作,例如直方图,添加的值为数字,最终结果的直方图。

SimpleAccumulator 接口中,要求添加的值的类型和最终结果的类型相同。适用于相对简单的操作,例如计数器。

自定义累加器

如果需要自定义累加器,只需要实现 Accumulator 接口或 SimpleAccumulator 接口即可。文章来源地址https://www.toymoban.com/news/detail-802801.html

到了这里,关于Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(28)
  • 《Flink学习笔记》——第五章 DataStream API

    一个Flink程序,其实就是对DataStream的各种转换,代码基本可以由以下几部分构成: 获取执行环境 读取数据源 定义对DataStream的转换操作 输出 触发程序执行 获取执行环境和触发程序执行都属于对执行环境的操作,那么其构成可以用下图表示: 其核心部分就是Transform,对数据

    2024年02月10日
    浏览(33)
  • Flink|《Flink 官方文档 - 部署 - 概览》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 概览》 学习笔记如下: 上图展示了 Flink 集群的各个构建(building blocks)。通常来说: 客户端获取 Flink 应用程序代码,将其转换为 JobGraph,并提交给 JobManager JobManager 将工作分配给 TaskManager,并在那里执行实际的算子操作 在部署 Flink 时,

    2024年01月19日
    浏览(40)
  • Flink|《Flink 官方文档 - 内幕 - 文件系统》学习笔记

    学习文档:内幕 - 文件系统 学习笔记如下: Flink 通过 org.apache.flink.core.fs.FileSystem 实现了文件系统的抽象。这种抽象提供了一组通用的操作,以支持使用各类文件系统。 为了支持众多的文件系统, FileSystem 的可用操作集非常有限。例如,不支持对现有文件进行追加或修改。

    2024年02月03日
    浏览(27)
  • Flink|《Flink 官方文档 - 概念透析 - Flink 架构》学习笔记

    学习文档:概念透析 - Flink 架构 学习笔记如下: 客户端(Client):准备数据流程序并发送给 JobManager(不是 Flink 执行程序的进程) JobManager:协调 Flink 应用程序的分布式执行 ResourceManager:负责 Flink 集群中的资源提供、回收、分配 Dispatcher:提供了用来提交 Flink 应用程序执行

    2024年01月19日
    浏览(38)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(37)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》 学习笔记如下: Flink 中每条消息都会被放到网络缓冲(network buffer) 中,并以此为最小单位发送到下一个 subtask。 Flink 在传输过程的输入端和输出端使用了网络缓冲队列,即每个 subtask 都有一个输入队列来接收

    2024年01月21日
    浏览(44)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记

    学习文档:Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。其中,Flink 总内存(Total Flink Memory)包括 JV

    2024年03月15日
    浏览(34)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 Flink 进程的内存》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 内存配置 - 配置 Flink 进程的内存》 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)

    2024年01月21日
    浏览(37)
  • Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》 学习笔记如下: 当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过 flink run 提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Pyt

    2024年02月22日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包