Flink编程——最小程序MiniProgram

这篇具有很好参考价值的文章主要介绍了Flink编程——最小程序MiniProgram。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

最小程序MiniProgram

前面我们已经搭建起了Flink 的基础环境,这一节我们就在上一节的基础上,进行编写我们的第一个Flink 程序,开始之前我们先看一下一个完整的Flink 程序是什么样的

Flink 程序结构

为了演示Flink 程序结构,我们下面写了一个程序,这个程序我称之为 MiniProgram,也就是流程序的最小结构

package com.kingcall.examples.stream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;


public class MiniProgram {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> arras = new ArrayList<>();
        arras.add("China");
        arras.add("Japan");
        arras.add("Russia");

        DataStream<String> country= env.fromCollection(arras);

        DataStream<String> filters = country.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String str) throws Exception {
                return !"Japan".equals(str);
            }
        });

        filters.print();

        env.execute();
    }
}

可以参考下面的注释,一个流程序就这样被创建出来了,一个标准的程序就是这样的结构

  1. 每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment
  2. DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
  3. 上述示例用 filters.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

Flink编程——最小程序MiniProgram,# Flink 编程,flink,大数据

需要特别注意的是,Flink 的流程序需要我们在最后调用env.execute() 才可以执行,在此之前都是在定义程序的执行逻辑,只有最后的这一步骤才会提交任务并执行

如果没有调用 execute(),应用就不会运行

下面就是程序的输出

Flink编程——最小程序MiniProgram,# Flink 编程,flink,大数据

1> 和 5> 指出输出来自哪个 sub-task(即 thread),我们可以对上面的程序进行改造一下,也就是设置并行度env.setParallelism(1); 这个时候输出就没有> 这样的提示了

Flink编程——最小程序MiniProgram,# Flink 编程,flink,大数据

基本的 stream source

上述示例用 env.fromElements(...) 方法构造 DataStream<Person> 。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。因此,你可以这样做:

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷方法是用 socket

DataStream<String> lines = env.socketTextStream("localhost", 9999)

或读取文件

DataStream<String> lines = env.readTextFile("file:///path");

在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。

基本的 stream sink

上述示例用 filters.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

输出看起来类似于

1> Fred: age 35
2> Wilma: age 35
  1. 1> 和 2> 指出输出来自哪个 sub-task(即 thread)在生产中
  2. 常用的 sink 包括各种数据库和几个 pub-sub 系统

什么是流

前面我们介绍了Flink程序的结构,我们是通过一个叫做最小程序的MiniProgram 进行说明的,我们之前一直说到Flink 程序的数据流,流说明数据是实时的流动的,那这里的数据有什么格式的要求吗,或者什么样的数据才能流动。

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。

Java tuples 和 POJOs

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs

Tuples

对于 Java,Flink 自带有 Tuple0Tuple25 类型。

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;
POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

示例:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {}
    public Person(String name, Integer age) {  
        . . .
    }
}  

Person person = new Person("Fred Flintstone", 35);
完整程序

下面的程序将关于人的记录流作为输入,并且过滤后只包含成年人。

package com.kingcall.examples.stream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Adults {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;

        public Person() {
        }

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

下面就是程序的输出

Flink编程——最小程序MiniProgram,# Flink 编程,flink,大数据

Stream 执行环境

每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink编程——最小程序MiniProgram,# Flink 编程,flink,大数据

调试

在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。

总结

Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:文章来源地址https://www.toymoban.com/news/detail-822057.html

  1. 获取一个执行环境(execution environment)
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行。

到了这里,关于Flink编程——最小程序MiniProgram的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据Flink(六十一):Flink流处理程序流程和项目准备

    文章目录 Flink流处理程序流程和项目准备 一、Flink流处理程序的一般流程

    2024年02月11日
    浏览(39)
  • Flink 学习二 Flink 编程基础API

    如果要使用Scala API ,需要替换 flink-java 为flink-scala_2.12 flink-streaming-java_2.12 为 flink-streaming-scala_2.12 DataStream 代表数据流,可以有界也可以无界 DataStream 类似于 java的集合 ,但是是不可变的immutable ,数据本身不可变 无法对一个 DataStream 进行添加或者删除数据 只可以通过算子对

    2024年02月10日
    浏览(49)
  • Flink编程——风险欺诈检测

    Apache Flink 提供了 DataStream API 来实现稳定可靠的、有状态的流处理应用程序。 Flink 支持对状态和时间的细粒度控制,以此来实现复杂的事件驱动数据处理系统。 这个入门指导手册讲述了如何通过 Flink DataStream API 来实现一个有状态流处理程序。 在当今数字时代,信用卡欺诈行

    2024年01月18日
    浏览(30)
  • Flink编程——基础环境搭建

    准备环境搭建 我们先把电脑的准备环境给安装好,这样后面才能顺利的学习和时间 因为后面可能用到的有:Kafka、MySQL、ElasticSearch 等,另外像 Flink 编写程序还需要依赖Java,还有就是我们项目是用 Maven来管理依赖的,所以需要把这些环境搭建起来,如果可以的话也可以把Ha

    2024年01月23日
    浏览(84)
  • Flink第七章:状态编程

    Flink第一章:环境搭建 Flink第二章:基本操作. Flink第三章:基本操作(二) Flink第四章:水位线和窗口 Flink第五章:处理函数 Flink第六章:多流操作 Flink第七章:状态编程 这次我们来学习Flink中的状态学习部分,创建以下scala文件 这个文件里有几个常用的状态创建 按键分区中值状态编程案

    2024年02月06日
    浏览(68)
  • 实验8 Flink初级编程实践

    由于CSDN上传md文件总是会使图片失效 完整的实验文档地址如下: https://download.csdn.net/download/qq_36428822/85814518 实验环境:本机:Windows 10 专业版 Intel® Core™ i7-4790 CPU @ 3.60GHz 8.00 GB RAM 64 位操作系统, 基于 x64 的处理器 Oracle VM VirtualBox 虚拟机:Linux Ubuntu 64-bit RAM 2048MB 处理器数量

    2024年02月09日
    浏览(32)
  • Flink状态编程之按键分区状态

    在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所 以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所 持有的状态,都是 Keyed State。 另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、

    2024年01月25日
    浏览(42)
  • Flink中的元编程与元学习

    作者:禅与计算机程序设计艺术 Flink 是 Apache 基金会开源的一款基于 Java 的分布式计算框架,它最初由 IBM 开发并于 2014 年宣布开源,目前已经成为 Apache Top-Level 项目,具有高吞吐量、低延迟等优点,被多家公司采用。 在实际应用中,许多数据处理任务都需要对数据进行增、

    2024年02月12日
    浏览(37)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(55)
  • Python 编写 Flink 应用程序经验记录(Flink1.17.1)

    目录 官方API文档 提交作业到集群运行 官方示例 环境 编写一个 Flink Python Table API 程序 执行一个 Flink Python Table API 程序 实例处理Kafka后入库到Mysql 下载依赖 flink-kafka jar 读取kafka数据 写入mysql数据 flink-mysql jar https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

    2024年02月08日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包