Flink 系列三 Flink 实战

这篇具有很好参考价值的文章主要介绍了Flink 系列三 Flink 实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

​编辑

前言

1、安装flink环境

2、在idea中创建flink的第一个demo

2.1、执行如下maven命令

2.2、填写'groupId'、'artifactId'、'version'、'package'

2.3、选择Yes即可生成创建好的工程

3、开发第一个flink程序

3.1、开发一个简单的统计程序

3.2、直接编译得到jar包

4、启动环境

4.1、启动已经下载好的flink环境

4.2、创建一个服务端的Tcp 监听

4.3、打开计算日志

4.4、在建立nc监听端口中输入text

4.5、在输出日志中就有统计


Flink 系列三 Flink 实战,flink,flink,大数据

前言

        Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去。详细的介绍我本篇就不做阐述了,感兴趣的同学可以回复往期的文章:Flink 系列二 Flink 状态化流处理概述,Flink 系列一 开发机 安装。本篇作为 Flink 系列的第三篇,咱们尝试在本地安装和实操 Flink。

1、安装flink环境

        首先需要在你的本地环境安装apache-flink,执行如下命令即可,若采用docker安装更加方便。

brew install apache-flink

2、在idea中创建flink的第一个demo

2.1、执行如下maven命令

        执行如下命令创建工程。这个命令的作用是使用Maven构建一个基于Apache Flink的Java快速启动项目模板。执行完后会下载对应的依赖包。

mvn archetype:generate                               \
-DarchetypeGroupId=org.apache.flink              \
-DarchetypeArtifactId=flink-quickstart-java      \
-DarchetypeVersion=1.8.0     \
-DarchetypeCatalog=local

解释一下具体含义:

  • mvn是Maven的命令行工具。
  • archetype:generate表示使用原型机模板生成一个新项目。
  • -DarchetypeGroupId指定了项目模板的组ID,即Apache Flink团队为项目提供的默认模板组ID。
  • -DarchetypeArtifactId指定了项目模板的Artifact ID,即Apache Flink团队为项目提供的默认模板Artifact ID。
  • -DarchetypeVersion指定了项目模板的版本号。
  • -DarchetypeCatalog指定了本地的模板目录。
  • 反斜杠(\)是命令的折行符,它表示这个命令是连续的,但是出于格式上的考虑需要分成多行。

Flink 系列三 Flink 实战,flink,flink,大数据

2.2、填写'groupId'、'artifactId'、'version'、'package'

Define value for property 'groupId': com.lly.flink.java
Define value for property 'artifactId': flink-traning
Define value for property 'version' 1.0-SNAPSHOT: : 1.0.0
Define value for property 'package' com.lly.flink.java: : 
Confirm properties configuration:
groupId: com.lly.flink.java
artifactId: flink-traning
version: 1.0.0
package: com.lly.flink.java

2.3、选择Yes即可生成创建好的工程

         特别注意,这里一定要选择 “Y”,保证项目顺利生产。

 Y: : Y
 [INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: flink-quickstart-java:1.8.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.lly.flink.java
[INFO] Parameter: artifactId, Value: flink-traning
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: package, Value: com.lly.flink.java
[INFO] Parameter: packageInPathFormat, Value: com/lly/flink/java
[INFO] Parameter: package, Value: com.lly.flink.java
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: groupId, Value: com.lly.flink.java
[INFO] Parameter: artifactId, Value: flink-traning
[WARNING] CP Don't override file /Users/liluyang/flink-traning/src/main/resources
[INFO] Project created from Archetype in dir: /Users/liluyang/flink-traning
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:17 min
[INFO] Finished at: 2020-11-05T12:42:42+08:00
[INFO] ------------------------------------------------------------------------

3、开发第一个flink程序

3.1、开发一个简单的统计程序

package com.lly.flink.java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lly
 * @date 2020-11-05
 **/
public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //参数检查
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        //计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

        我这里简单解释一些这段代码,希望刚开始学习的同学可以理解的更深刻。这段代码使用 Flink 实现了通过网络流读取数据,统计单词出现次数的功能。具体实现细节如下:

  1. 声明一个 SocketTextStreamWordCount 类,定义该类的 main 方法作为程序的入口。

  2. main 方法中,首先对传入的命令行参数进行检查,如果参数个数不为 2,则输出使用说明后直接返回。

  3. 然后获取主机地址和端口号,用于后续建立套接字连接。

  4. 接着创建 Flink 流处理的环境对象 StreamExecutionEnvironment,用于设置执行环境和创建数据流。

  5. 调用 socketTextStream 方法,获取一个 DataStreamSource<String> 对象,用于从套接字连接中获取数据流。

  6. 对获取的数据流进行 flatMap 操作,使用 LineSplitter 类作为转换器将每行文本数据切分成单词,并将单词转化为 "单词,1" 的元组格式,用于后续统计。

  7. 对转换后的数据流使用 keyBy 方法,按照第一个字段(即单词)进行分组。

  8. 对分组后的数据使用 sum 方法,对第二个字段(即出现次数)进行求和,返回一个 SingleOutputStreamOperator<Tuple2<String, Integer>> 类型的结果流。

  9. 最后调用 print 方法,将结果打印输出到控制台。

  10. 最后调用 execute 方法,传入一个字符串 "Java WordCount from SocketTextStream Example" 作为任务名称,开始执行整个 Flink 应用程序。

  11. 声明了一个静态内部类 LineSplitter,实现了 Flink 的 FlatMapFunction 接口,并重写了 flatMap 方法。该方法将输入的文本行按照非单词字符(如空格、逗号等)进行切分,并将每个单词转化为一个元组,其中第一个字段为单词,第二个字段为 1,表示该单词出现了 1 次。 

3.2、直接编译得到jar包

Flink 系列三 Flink 实战,flink,flink,大数据

4、启动环境

4.1、启动已经下载好的flink环境

flink run -c 业务类包路径 jar包路径 IP 端口 示例

flink run -c 业务类包路径 jar包路径 IP 端口
示例:
flink run -c com.lly.flink.SocketTextStreamWordCount /Users/liluyang/flink-traning/target/original-flink-traning-1.0.0.jar 127.0.0.1 9000

启动成功之后会生成Job ID

Job has been submitted with JobID b04bad9f4c05efd67344179ee676b513

启动成功之后访问:http://localhost:8081/就可以直接当问flink的的操作后台,操作后台可以直观的看到Job的执行情况和基本的操作

Flink 系列三 Flink 实战,flink,flink,大数据

 4.2、创建一个服务端的Tcp 监听

创建一个server监听并接受链接

nc -l 9000

4.3、打开计算日志

cd /usr/local/Cellar/apache-flink/1.10.0/libexec/log

Flink 系列三 Flink 实战,flink,flink,大数据

 4.4、在建立nc监听端口中输入text

liluyang@liluyangdeMacBook-Pro ~ % nc -l 9000



cda
cda
dsas
assgasg
nihao 
nihao 
nihao
nihao
1
1
1
1
1
1
1

4.5、在输出日志中就有统计

liluyang@liluyangdeMacBook-Pro log % tail -100f flink-liluyang-taskexecutor-0-liluyangdeMacBook-Pro.local.out
(cda,1)
(cda,2)
(dsas,1)
(assgasg,1)
(nihao,1)
(nihao,2)
(nihao,3)
(nihao,4)
(1,1)
(1,2)
(1,3)
(1,4)
(1,5)
(1,6)

至此:Mac 电脑上安装 Flink,并且运行它。接着通过一个简单的 Flink 程序来介绍如何构建及运行Flink 程序。文章来源地址https://www.toymoban.com/news/detail-562447.html

到了这里,关于Flink 系列三 Flink 实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月11日
    浏览(38)
  • 【Flink实战系列】Hash collision on user-specified ID “Kafka Source”

    在使用 fromSource 构建 Kafka Source 的时候,遇到下面的报错,下面就走进源码,分析一下原因。

    2024年02月09日
    浏览(65)
  • 尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月16日
    浏览(43)
  • 大数据:【学习笔记系列】 Flink 学习路线

    Apache Flink 是一种高效、可扩展的 实时流处理框架 ,它允许开发者以 实时方式处理连续的数据流 。学习 Flink 要求你具备一定的编程基础(尤其是 Java 或 Scala),同时对大数据处理的基本概念有所了解。下面是一个详细的 Flink 学习路线,包括各阶段的学习目标和推荐资源。

    2024年04月23日
    浏览(30)
  • 大数据:【学习笔记系列】Flink基础架构

    Apache Flink 是一个开源的流处理框架,用于处理 有界 和 无界 的 数据流 。Flink 设计用于 运行在所有常见的集群环境 中,并且能够以 高性能 和 可扩展 的方式进行实时数据处理和分析。下面将详细介绍 Flink 的基础架构组件和其工作原理。 1. Flink 架构概览 Flink 的架构主要包括

    2024年04月23日
    浏览(35)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(73)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(51)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(43)
  • 【大数据】Flink 内存管理(四):TaskManager 内存分配(实战篇)

    《 Flink 内存管理 》系列(已完结),共包含以下 4 篇文章: Flink 内存管理(一):设置 Flink 进程内存 Flink 内存管理(二):JobManager 内存分配(含实际计算案例) Flink 内存管理(三):TaskManager 内存分配(理论篇) Flink 内存管理(四):TaskManager 内存分配(实战篇) 😊

    2024年03月13日
    浏览(52)
  • Flink-1.17.0(Standalone)集群安装-大数据学习系列(四)

    机器信息 Hostname k8s-master k8s-node1 k8s-node2 外网IP 106.15.186.55 139.196.15.28 47.101.63.122 内网IP 172.25.73.65 172.25.73.64 172.25.73.66 master slave1 slave2 slave3 安装Scala 从官网( The Scala Programming Language )下载 Scala版本 链接: https://pan.baidu.com/s/1-GAeyyDOPjhsWhIp_VV7yg?pwd=3fws 提取码: 3fws  2.1 在集群(

    2024年02月08日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包