【Flink入门修炼】1-3 Flink WordCount 入门实现

这篇具有很好参考价值的文章主要介绍了【Flink入门修炼】1-3 Flink WordCount 入门实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目
【Flink入门修炼】1-3 Flink WordCount 入门实现

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之后部分类和函数有变化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //参数检查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 创建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:

nc -l 9000

然后启动程序,能看到控制台一些输出:
【Flink入门修炼】1-3 Flink WordCount 入门实现

接下来,在 nc 中输入:

$ nc -l 9000
hello world
flink flink flink

回到我们的程序,能看到统计的输出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

【Flink入门修炼】1-3 Flink WordCount 入门实现

三)如果有报错

如果出现执行报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with "Provided" scope to classpath」勾选上:
【Flink入门修炼】1-3 Flink WordCount 入门实现

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。

其中,WordCount 表数据如下:

word frequency
hello 1
world 1
flink 1
flink 1
flink 1

那么接下来我们看,如何写一个 FlinkSQL 的程序。

二)环境和程序

首先,添加 FlinkSQL 需要的依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程序如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 创建上下文环境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 读取一行模拟数据作为输入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 转 SQL,指定字段名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 注册为一个表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

执行,结果输出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

【Flink入门修炼】1-3 Flink WordCount 入门实现

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。文章来源地址https://www.toymoban.com/news/detail-825439.html

到了这里,关于【Flink入门修炼】1-3 Flink WordCount 入门实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink WordCount实践

    目录 前提条件 基本准备 批处理API实现WordCount 流处理API实现WordCount 数据源是文件 数据源是socket文本流 打包 提交到集群运行 命令行提交作业 Web UI提交作业 上传代码到gitee Windows安装好jdk8、Maven3、IDEA Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式   创建项目 使用

    2024年04月13日
    浏览(28)
  • 05 Flink 的 WordCount

    本文对应于 spark 系列的 Spark 的 WordCount 这里主要是 从宏观上面来看一下 flink 这边的几个角色, 以及其调度的整个流程  一个宏观 大局上的任务的处理, 执行  基于 一个本地的 flink 集群        Test01WordCount.txt 内容如下      Driver 提交 Job 到 JobManager, JobManager 分配任务到

    2024年02月22日
    浏览(27)
  • 使用Flink进行WordCount计算

    Flink是一款应用非常广泛的流处理系统,目前有客户使用Flink进行数据同步,效率较差。 之前虽然使用过Spark Streaming,但是Flink和Spark Streaming在使用上,还是有一点差异。如Word Count计算,Spark中好像是一个reduceByKey,Flink中需要先进行GroupBy,然后再做一次sum。 文件路径为: C

    2024年02月11日
    浏览(30)
  • Iceberg从入门到精通系列之十八:一篇文章深入了解Flink对Iceberg的支持

    Apache Iceberg 支持 Apache Flink 的 DataStream API 和 Table API。 功能支持 Flink 注意事项 SQL create catalog ✔️ SQL create database ✔️ SQL create table ✔️ SQL create table like ✔️ SQL alter table ✔️ 仅支持更改表属性,不支持列和分区更改 SQL drop_table ✔️ SQL select ✔️ 支持流式和批处理模式 SQ

    2024年02月16日
    浏览(48)
  • 修炼k8s+flink+hdfs+dlink(二:安装flink)

    创建目录,上传安装包。 配置参数。 在flink-conf.yaml中添加zookeeper配置 上传俩个包,在lib文件夹下面。 传送文件夹。 启动。 在node01,node02节点 ./bin/jobmanager.sh start 在node01,node02节点 ./bin/taskmanager.sh start jps 查看界面。 http://node01:8081/#/overview 5. 配置环境变量

    2024年02月07日
    浏览(28)
  • 修炼k8s+flink+hdfs+dlink(一:安装hdfs)

    在对应的所有的节点上进行安装。 https://blog.csdn.net/weixin_43446246/article/details/123327143 下载安装包 所有创建文件夹 修改配置文件 copy到其他节点。 所有机器全部配置环境变量 启动JournalNode 所有机器执行命令。 格式化NameNode 访问地址 http://node01:9870 http://node02:9870

    2024年02月07日
    浏览(34)
  • 我用GPT写了一个关于GPT的文章,大家看看写的如何

    目录 I. 引言 1.1 研究背景和意义 1.2 现有研究综述 II. ChatGPT技术介绍 2.1 ChatGPT技术原理 2.2 ChatGPT技术优势 III. ChatGPT技术在智能客服中的应用和挑战 3.1 ChatGPT技术在智能客服中的应用 3.2 ChatGPT技术在智能客服中面临的挑战 3.3 优化用户体验提升ChatGPT技术在智能客服中的作用 IV

    2024年02月07日
    浏览(122)
  • 修炼k8s+flink+hdfs+dlink(四:k8s(二)组件)

    控制平面组件会为集群做出全局决策,比如资源的调度。 以及检测和响应集群事件,例如当不满足部署的 replicas 字段时, 要启动新的 pod)。 该组件负责公开了 Kubernetes API,负责处理接受请求的工作。 API 服务器是 Kubernetes 控制平面的前端。 负责运行控制器进程。从逻辑上

    2024年02月07日
    浏览(38)
  • MapReduce入门(一)—— MapReduce概述 + WordCount案例实操

    MapReduce知识点总览图 MapReduce 是 一个分布式运算程序的编程框架 ,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。 MapReduce 核心功能是 将用户编写的业务逻辑代码 和 自带默认组件 整合成一个 完整的分布式运算程序 ,并发运行在一个 Hadoop 集群上。 1.2.1 优点 1 )M

    2023年04月21日
    浏览(43)
  • 修炼k8s+flink+hdfs+dlink(六:学习k8s-pod)

    直接进行创建。 使用yaml清单方式进行创建。 直接创建方式,并建立pod。 先创建employment,不会自动建立pod。 第一步: 创建文件 第二步:运行 kubectl delete pods/nginx kubectl delete kubectl exec -it my-nginx – /bin/bash 对象资源的扩容和缩容,内容修改。 edit – 直接在现有的资源对象,修

    2024年02月05日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包