Flink实时计算引擎入门教程

这篇具有很好参考价值的文章主要介绍了Flink实时计算引擎入门教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink实时计算引擎入门教程

1.简介

Fink是一个开源的分布式,高性能,高可用,准确的实时数据计算框架,它主要优点如下:

流式计算: Fink可以连接处理流式(实时)数据。
容错: Fink提供了有状态的计算,会记录任务的中间状态,当执行失败时可以实现故障恢复。
可伸缩: Fink集群可以支持上千个节点。
高性能: Fink能提供高吞吐,低延迟的性能。
三大实时计算框架对比:

Spark Streaming: 可以处理秒级别延迟的实时数据计算,但是无法处理真正的实时数据计算,适合小型且独立的实时项目。
Storm: 可以处理真正的实时计算需求,但是它过于独立没有自己的生态圈,适合能够接受秒级别延迟不需要Hadoop生态圈的实时项目。
Fink: 新一代实时计算引擎,它包含了Strorm和Spark Streaming的优点,它即可以实现真正意义的实时计算需求,也融入了Hadoop生态圈,适合对性能要求高吞吐低延迟的实时项目。

2.执行流程

flink实时计算,大数据,flink,大数据

3.核心三大组件

flink实时计算,大数据,flink,大数据

DataSource: 数据源,主要用来接受数据。例如: readTextFile(),socketTextStream(),fromCollection(),以及一些第三方数据源组件。
Transformation: 计算逻辑,主要用于对数据进行计算。例如:map(),flatmap(),filter(),reduce()等类型的算子。
DataSink: 目的地,主要用来把计算的结果数据输出到其他存储介质。例如Kafka,Redis,Elasticsearch等。

4.应用场景

实时ETL: 集成实时数据计算系统现有的诸多数据通道和SQL灵活的加工能力,对实时数据进行清洗、归并和结构化处理。同时,为离线数仓进行有效的补充和优化,为数据实时传输计算通道。
实时报表: 实时采集、加工和存储,实时监控和展现业务指标数据,让数据化运营实时化。
监控预警: 对系统和用户行为进行实时检测和分析,实时检测和发现危险行为。
在线系统: 实时计算各类数据指标,并利用实时结果及时调整在线系统的相关策略。

5.架构原理

Fink常用的两种架构是: Standalone(独立集群)和ON YARN。

Standalone: 独立部署,不依赖Hadoop环境,但是需要使用Zookeeper实现服务的高可用。
ON YARN: 依赖Hadoop环境的YARN实现Flink任务的调度,需要Hadoop版本2.2以上。
Flink ON YARN架构图如下:

flink实时计算,大数据,flink,大数据

1.客户端将作业提交给 YARN 的资源管理器,这一步中会同时将 Flink 的 Jar 包和配置
上传到 HDFS,以便后续启动 Flink 相关组件的容器。
2.YARN 的资源管理器分配 Container 资源,启动 Flink JobManager,并将作业提交给
JobMaster。这里省略了 Dispatcher 组件。
3.JobMaster 向资源管理器请求资源(slots)。
4.资源管理器向 YARN 的资源管理器请求 container 资源。
5.YARN 启动新的 TaskManager 容器。
6.TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
7.资源管理器通知 TaskManager 为新的作业提供 slots。
8.TaskManager 连接到对应的 JobMaster,提供 slots。
9.JobMaster 将需要执行的任务分发给 TaskManager,执行任务。

Flink ON YARN 在运行的时候可以细分为两种模式。

Session模式: 可以称为会话模式或多任务模式。这种模式会在YARN中初始化一个Flink集群,以后提交的任务都会提交到这个集群中,这个Flink集群会在YARN集群中,除非手动停止。
Per-Job模式: 可以称为单任务模式,这种模式每次提交Flink任务时任务都会创建一个集群,Flink任务之间都是互相独立,互不影响,执行任务资源会释放掉。

6.常用的API

Flink中提供了4种不同层次的API,每种都有对应的使用场景。

Sateful Stream Processing: 低级API,提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用一些复杂事件处理逻辑上。
DataStream/DataSet API: 核心API,提供了针对实时数据和离线数据的处理,是对低级API进行的封装,提供了filter(),sum(),max(),min()等高级函数,简单易用。
Table API: 对DataStream/DataSet API做了进一步封装,提供了基于Table对象的一些关系型API。
SQL: 高级语言,Flink的SQL是基于Apache Calcite开发的,实现了标准SQL(类似于Hive的SQL),使用起来比其他API更加方便。Table API和SQL可以很容易结合使用,它们都返回Table对象。
在工作中能用SQL解决的优先使用SQL,复杂一些的考虑DataStream/DataSet API。

DataStreamAPI中常用的Transformation函数。
flink实时计算,大数据,flink,大数据
)

7.java编写flink程序

引入依赖,此文用的flink版本是1.15.2。

    <properties>
        <flink.version>1.15.2</flink.version>
        <java.version>1.8</java.version>
        <slf4j.version>1.7.30</slf4j.version>
        <!--flink依赖的作用域 provided 表示表示该依赖包已经由目标容器提供,compile 标为默认值 -->
        <flink.scope>compile</flink.scope>
    </properties>

    <dependencies>
        <!-- core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <!-- test dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>flink</finalName>
        <plugins>
            <!-- scala 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ljm.hadoop.flink.Main</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

DataStream执行模式:
从1.12.0版本以后,flink实现了api的流批一体化处理。DataStream新增一个执行模式(execution mode),通过设置不同的执行模式,即可实现流处理与批处理之间的切换,这样一来,dataSet基本就被废弃了。

STREAMING: 流执行模式(默认)
BATCH: 批执行模式
AUTOMATIC: 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

flink实时计算,大数据,flink,大数据
以下为DataStream相关Api在Java中的简单应用

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置执行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        map(env);
//        flatMap(env);
//        union(env);
//        connect(env);
//        socketTextStream(env);
        env.execute("testJob");
    }

    /**
     * 对数据处理
     */
    private static void map(StreamExecutionEnvironment env) {
        //在测试阶段,可以使用fromElements构造数据流
        DataStreamSource<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 9, 11);
        //处理数据
        SingleOutputStreamOperator<Integer> numStream = data.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer num) throws Exception {
                return num - 1;
            }
        });
        //使用一个线程打印数据
        numStream.print().setParallelism(1);
        //多线程输出(最大值=cpu总核数)
        //numStream.print();
    }

    /**
     * 将数据中的每行数据根据符号拆分为单词
     */
    private static void flatMap(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.fromElements("hello,world", "hello,hadoop");
        //读取文件内容,文件内容格式  hello,world
        //DataStreamSource<String> data =  env.readTextFile("D:\\java\\hadoop\\text.txt");
        //处理数据
        SingleOutputStreamOperator<String> wordStream = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        wordStream.print().setParallelism(1);
    }

    /**
     * 过滤数据中的奇数
     */
    private static void filter(StreamExecutionEnvironment env) {
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5, 6, 7);
        //处理数据
        SingleOutputStreamOperator<Integer> numStream = data1.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer num) throws Exception {
                return num % 2 == 1;
            }
        });
        numStream.print().setParallelism(1);
    }

    /**
     * 将两个流中的数字合并
     */
    private static void union(StreamExecutionEnvironment env) {
        //第1份数据流
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4);
        //第2份数据流
        DataStreamSource<Integer> data2 = env.fromElements(3, 4, 5, 6);
        //合并流
        DataStream unionData = data1.union(data2);
        unionData.print().setParallelism(1);
    }

    /**
     * 将两个数据源中的数据关联到一起
     */
    private static void connect(StreamExecutionEnvironment env) {
        //第1份数据流
        DataStreamSource<String> data1 = env.fromElements("user:tom,age:18");
        //第2份数据流
        DataStreamSource<String> data2 = env.fromElements("user:jack_age:18");
        //连接两个流
        ConnectedStreams<String, String> connectedStreams = data1.connect(data2);
        //处理数据
        SingleOutputStreamOperator<String> resStream = connectedStreams.map(new CoMapFunction<String, String, String>() {
            @Override
            public String map1(String s) throws Exception {
                return s.replace(",", "-");
            }
            @Override
            public String map2(String s) throws Exception {
                return s.replace("_", "-");
            }
        });
        resStream.print().setParallelism(1);
    }

    /**
     * 每隔3秒重socket读取数据
     */
    private static void socketTextStream(StreamExecutionEnvironment env) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //加载数据源
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9001);
        //数据处理
        SingleOutputStreamOperator<String> wordStream = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = wordStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //根据Tuple2中的第1列分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = wordCountStream.keyBy(0);
        //窗口滑动设置,对指定时间窗口(例如3s内)内的数据聚合统计,并且把时间窗口内的结果打印出来
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowedStream = keyedStream.timeWindow(Time.seconds(3));
        //根据Tuple2中的第2列进行合并数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumRes = windowedStream.sum(1);
        //数据输出
        sumRes.print();
    }

}

上面示例的socketTextStream方法中用到了socketTextStream函数需要通过netnat工具发送数据

netnat工具下载
在netnat目录下执行 nc -L -p 9001 -v
flink实时计算,大数据,flink,大数据
运行socketTextStream方法,可以发现控制台打印了数据
flink实时计算,大数据,flink,大数据
上图中的3和5表示线程Id,如果只需要单线程打印则需要在print()后面追加setParallelism(1);

sumRes.print().setParallelism(1);

8.把flink程序部署到hadoop环境上面运行

8.1.安装flink程序

flink下载地址,下载1.15.2版本然后上传到服务器 /home/soft/目录下解压

tar -zxvf flink-1.15.2-bin-scala_2.12.tgz

flink客户端节点上需要设置HADOOP_HOME和HADOOP_CLASSPATH环境变量

vi /etc/profile
export HADOOP_HOME=/home/soft/hadoop-3.2.4
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
source /etc/profile

8.2.编译java开发的flink应用

使用socketTextStream接受socket传输的数据
flink实时计算,大数据,flink,大数据

修改socketTextStream方法里面的代码,把127.0.0.1改成netcat工具部署机器ip地址

DataStreamSource<String> data = env.socketTextStream("192.168.239.128", 9001);

需要把pom.xml文件中flink.scope属性值设置为provided,这些依赖不需要打进Jar包中。

<properties>
    <flink.scope>provided</flink.scope>
</properties>

执行命令打包

mvn clean package

flink实时计算,大数据,flink,大数据

把flink-1.0-SNAPSHOT.jar上传至/home/soft/flink-1.15.2目录下然后提交任务

8.3.提交Flink任务到YARN集群中

cd /home/soft/flink-1.15.2
bin/flink  run -m yarn-cluster -yjm 1024 -ytm 1024   flink-1.0-SNAPSHOT.jar

参数说明
bin/flink: 这个脚本启动的是Per-Job,bin/yarn-session.sh 则启动的是Session模式的
-m: 指定模式,yarn-cluster=集群模式,yarn-client=客户端模式
-yjm:每个JobManager内存 (default: MB)
-ytm:每个TaskManager内存 (default: MB)

flink实时计算,大数据,flink,大数据

8.4.测试任务并查看结果
在服务器上面安装netcat工具,然后发送数据,这台机器的ip必须和Java编写的Flink程序一致。

yum install nc
nc -l 9001

使用浏览器访问: http://hadoop集群主节点ip:8088/cluster可以看到已提交的Flink任务,然后下图的点击顺序可以看到任务的执行结果

flink实时计算,大数据,flink,大数据

flink实时计算,大数据,flink,大数据

flink实时计算,大数据,flink,大数据

flink实时计算,大数据,flink,大数据
flink实时计算,大数据,flink,大数据

8.5.停止任务
通过YARN命令停止

yarn application -kill  application_1665651925022_0008  

或通过Flink命令停止文章来源地址https://www.toymoban.com/news/detail-657122.html

bin/flink cancel -yid application_1665651925022_0008  a39f8b9258c9b9d0c17eca768c5b54c3

到了这里,关于Flink实时计算引擎入门教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink实时计算框架简介

    Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。 分布式:表示flink程序可以运行在很多台机器上, 高性能:表示Flink处理性能比较高 高可用:表示flink支持程序的自动重启机制。 准确的:表示flink可以保证处理数据的准确性。 Flink支持流处理和批处理,

    2024年02月12日
    浏览(49)
  • Flink实时计算资源如何优化

    flink实时计算任务可以从以下四个方面进行优化 内存优化:Flink任务需要大量的内存来存储数据和状态信息。因此,我们需要尽可能地减少内存的使用量。可以通过以下几种方式来实现: 使用更小的窗口大小:窗口大小越大,需要使用的内存就越多。因此,我们可以使用更小

    2024年02月10日
    浏览(38)
  • 大数据Flink实时计算技术

    1、架构 2、应用场景 Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核

    2024年02月10日
    浏览(53)
  • Flink实时计算中台Kubernates功能改造点

    平台为数据开发人员提供基本的实时作业的管理功能,其中包括jar、sql等作业的在线开发;因此中台需要提供一个统一的SDK支持平台能够实现flink jar作业的发布;绝大多数情况下企业可能会考虑Flink On Yarn的这个发布模式,但是伴随云原生的呼声越来越大,一些企业不希望部署

    2024年02月09日
    浏览(41)
  • 小米基于 Flink 的实时计算资源治理实践

    摘要:本文整理自小米高级软件工程师张蛟,在 Flink Forward Asia 2022 生产实践专场的分享。本篇内容主要分为四个部分: 发展现状与规模 框架层治理实践 平台层治理实践 未来规划与展望 点击查看原文视频 演讲PPT 如上图所示,下层是基础服务,包括:统一元数据服务、统一

    2024年02月13日
    浏览(38)
  • 联通 Flink 实时计算平台化运维实践

    摘要:本文整理自联通数科实时计算团队负责人、Apache StreamPark Committer 穆纯进在 Flink Forward Asia 2022 平台建设专场的分享,本篇内容主要分为四个部分: 实时计算平台背景介绍 Flink 实时作业运维挑战 基于 StreamPark 一体化管理 未来规划与演进 点击查看原文视频 演讲PPT 上图是

    2024年02月16日
    浏览(41)
  • 【大数据-实时流计算】图文详解 Apache Flink 架构原理

    目录 Apache Flink架构介绍 一、Flink组件栈 二、Flink运行时架构 在Flink的整个

    2024年02月02日
    浏览(41)
  • YOLOv8入门教程:实现实时目标检测

    摘要:在本教程中,我们将介绍YOLOv8的基本概念和原理,然后用Python实现一个简单的实时目标检测应用。 正文: 车牌识别视频 车辆识别视频 一、YOLOv8简介 YOLOv8(You Only Look Once v8)是一种快速、准确的实时目标检测算法。相较于前代YOLO版本,YOLOv8在检测速度和准确率上取得

    2024年02月04日
    浏览(38)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(38)
  • WPF真入门教程27--项目案例--设备数据实时监测

    今天要做的一个案例是这样的效果,它能实时监测车间设备有关数据,并以表格和图形显示在界面上,这个比上个案例要复杂些,颜值也高些,通过这个来巩固wpf的技能,用到了命令绑定,样式资源,表格数据,图形控件livechart。将前面25的内容熟悉起来,就可以自己动手做

    2024年02月01日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包