Flink WordCount实践

这篇具有很好参考价值的文章主要介绍了Flink WordCount实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

	<properties>
        <flink.version>1.17.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Flink WordCount实践,flink,IDEA,flink,大数据

刷新依赖

Flink WordCount实践,flink,IDEA,flink,大数据

刷新依赖后,能看到相关依赖如下

Flink WordCount实践,flink,IDEA,flink,大数据

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

Flink WordCount实践,flink,IDEA,flink,大数据

</mirrors>上面一行添加阿里云仓库镜像

	<mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>        
    </mirror>

Flink WordCount实践,flink,IDEA,flink,大数据

2.IDEA设置maven

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

数据准备

在工程的根目录下,新建一个data文件夹

Flink WordCount实践,flink,IDEA,flink,大数据

并在data文件夹下创建文本文件words.txt

Flink WordCount实践,flink,IDEA,flink,大数据

内容如下

hello world
hello java
hello flink

Flink WordCount实践,flink,IDEA,flink,大数据

新建包

右键src/main下的java,新建Package

Flink WordCount实践,flink,IDEA,flink,大数据

填写包名org.example,包名与groupId的内容一致。

Flink WordCount实践,flink,IDEA,flink,大数据

批处理API实现WordCount

org.exmaple下新建wc包及BatchWordCount

Flink WordCount实践,flink,IDEA,flink,大数据

填写wc.BatchWordCount

Flink WordCount实践,flink,IDEA,flink,大数据

效果如下

Flink WordCount实践,flink,IDEA,flink,大数据

BatchWordCount.java代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件读取数据 按行读取
        DataSource<String> lineDS = env.readTextFile("data/words.txt");

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word,1L));
                }
            }
        });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

运行程序,查看结果

Flink WordCount实践,flink,IDEA,flink,大数据

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                        String[] words = line.split(" ");

                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }
}

运行结果

Flink WordCount实践,flink,IDEA,flink,大数据

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
        DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");

                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }
}

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

Flink WordCount实践,flink,IDEA,flink,大数据

控制台输出

Flink WordCount实践,flink,IDEA,flink,大数据

继续往7777端口发送数据,例如发送hello flink

Flink WordCount实践,flink,IDEA,flink,大数据

控制台输出

Flink WordCount实践,flink,IDEA,flink,大数据

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

Flink WordCount实践,flink,IDEA,flink,大数据

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

Flink WordCount实践,flink,IDEA,flink,大数据

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

Flink WordCount实践,flink,IDEA,flink,大数据

启动flink集群
[hadoop@node2 ~]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
​
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
​
命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

Flink WordCount实践,flink,IDEA,flink,大数据

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

Flink WordCount实践,flink,IDEA,flink,大数据

查看结果
Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

Flink WordCount实践,flink,IDEA,flink,大数据

查看结果

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

继续发送测试数据

在nc终端继续发送数据

Flink WordCount实践,flink,IDEA,flink,大数据

Web UI刷新结果

Flink WordCount实践,flink,IDEA,flink,大数据

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log
[hadoop@node2 log]$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
(hello,1)
(flink,1)
(hello,2)
(world,1)
​

Flink WordCount实践,flink,IDEA,flink,大数据

取消flink作业

Flink WordCount实践,flink,IDEA,flink,大数据

点击Cancel Job取消作业 

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777
Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

Flink WordCount实践,flink,IDEA,flink,大数据

点击Add New

Flink WordCount实践,flink,IDEA,flink,大数据

选择flink作业jar包所在路径

Flink WordCount实践,flink,IDEA,flink,大数据

点击jar包名称

Flink WordCount实践,flink,IDEA,flink,大数据

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

Flink WordCount实践,flink,IDEA,flink,大数据

提交后,在Running Jobs里看到运行的作业

Flink WordCount实践,flink,IDEA,flink,大数据

发送测试数据

往7777端口发送数据

Flink WordCount实践,flink,IDEA,flink,大数据

查看结果

Flink WordCount实践,flink,IDEA,flink,大数据

继续发送测试数据

Flink WordCount实践,flink,IDEA,flink,大数据

刷新结果

Flink WordCount实践,flink,IDEA,flink,大数据

取消作业

Flink WordCount实践,flink,IDEA,flink,大数据

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

提交代码

使用IDEA提交代码

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

提示有警告,忽略警告,继续提交

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

Flink WordCount实践,flink,IDEA,flink,大数据

提交成功后,IDEA显示如下

Flink WordCount实践,flink,IDEA,flink,大数据

刷新浏览器查看gitee界面,看到代码已上传成功

Flink WordCount实践,flink,IDEA,flink,大数据

完成!enjoy it!文章来源地址https://www.toymoban.com/news/detail-850174.html

到了这里,关于Flink WordCount实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MapReduce WordCount程序实践(IDEA版)

    Linux:Hadoop2.x Windows:jdk1.8、Maven3、IDEA2021 编程分析 编程分析包括: 1.数据过程分析:数据从输入到输出的过程分析。 2.数据类型分析:Map的输入输出类型,Reduce的输入输出类型; 编程分析决定了我们该如何编写代码。 新建Maven工程 打开IDEA–点击File–New–Project 选择Maven–点

    2024年02月03日
    浏览(43)
  • 实时数据湖 Flink Hudi 实践探索

    导读: 首先做个自我介绍,我目前在阿里云云计算平台,从事研究 Flink 和 Hudi 结合方向的相关工作。 目前,Flink + Hudi 的方案推广大概已经有了一年半的时间,在国内流行度也已比较高,主流的公司也会尝试去迭代他们的数仓方案。所以,今天我介绍的主题是 Flink 和 Hudi 在

    2024年01月16日
    浏览(48)
  • 基于 Flink CDC 的现代数据栈实践

    摘要:本文整理自阿里云技术专家,Apache Flink PMC Member Committer, Flink CDC Maintainer 徐榜江和阿里云高级研发工程师,Apache Flink Contributor Flink CDC Maintainer 阮航,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分: 1.深入解读 Flink CDC 2.3 版本 2.基于 Flink CDC 构建

    2024年02月09日
    浏览(40)
  • 基于 Flink 构建实时数据湖的实践

    本文整理自火山引擎云原生计算研发工程师王正和闵中元在本次 CommunityOverCode Asia 2023 数据湖专场中的《基于 Flink 构建实时数据湖的实践》主题演讲。 实时数据湖是现代数据架构的核心组成部分,随着数据湖技术的发展,用户对其也有了更高的需求:需要从多种数据源中导入

    2024年02月04日
    浏览(38)
  • 使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

    在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍

    2024年04月15日
    浏览(50)
  • Flink 内容分享(十九):理想汽车基于Flink on K8s的数据集成实践

    目录 数据集成的发展与现状 数据集成的落地实践 1. 数据集成平台架构 2. 设计模型 3. 典型场景 4. 异构数据源 5. SQL 形式的过滤条件 数据集成云原生的落地实践 1. 方案选型 2. 状态判断及日志采集 3. 监控告警 4. 共享存储 未来规划 理想汽车数据集成的发展经历了四个阶段:

    2024年02月01日
    浏览(45)
  • Flink:处理大规模复杂数据集的最佳实践深入探究Flink的数据处理和性能优化技术

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型网络技术的不断发展,企业对海量数据的处理日益依赖,而大数据分析、决策支持、风险控制等领域都需要海量的数据处理能力。如何高效、快速地处理海量数据、提升处理效率、降低成本,是当下处理

    2024年02月13日
    浏览(56)
  • Flink + Paimon数据 CDC 入湖最佳实践

    Apache Paimon 最典型的场景是解决了 CDC (Change Data Capture)数据的入湖,看完这篇文章可以了解到: 1、为什么 CDC 入Hive迁移到 Paimon? 2、CDC 入 Paimon 怎么样做到成本最低? 3、Paimon 对比 Hudi有什么样的优势?  Paimon 从 CDC 入湖场景出发,希望提供给你 简单、低成本、低延时 的

    2024年01月16日
    浏览(44)
  • 【大数据】Flink 从入门到实践(一):初步介绍

    Apache Flink 是一个框架和分布式处理引擎,用于在 无边界 和 有边界 数据流上进行 有状态 的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 1.1 处理无界和有界数据 任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志

    2024年02月14日
    浏览(42)
  • Flink 数据集成服务在小红书的降本增效实践

    摘要:本文整理自实时引擎研发工程师袁奎,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分: 小红书实时服务降本增效背景 Flink 与在离线混部实践 实践过程中遇到的问题及解决方案 未来展望 点击查看原文视频 演讲PPT 1.1 小红书 Flink 使用场景特点

    2024年02月11日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包