【入门Flink】- 02Flink经典案例-WordCount

这篇具有很好参考价值的文章主要介绍了【入门Flink】- 02Flink经典案例-WordCount。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
        String filePath = Objects.requireNonNull(
               BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataSource<String> lineDS = env.readTextFile(filePath);

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

打印结果如下:(结果正确)

【入门Flink】- 02Flink经典案例-WordCount,flink,flink,大数据

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取文件
        String filePath = Objects.requireNonNull(
                StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataStreamSource<String> lineStream = env.readTextFile(filePath);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取文件
        DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

【入门Flink】- 02Flink经典案例-WordCount,flink,flink,大数据

2、再输入“hello world”,输出如下内容

【入门Flink】- 02Flink经典案例-WordCount,flink,flink,大数据

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据文章来源地址https://www.toymoban.com/news/detail-741672.html

到了这里,关于【入门Flink】- 02Flink经典案例-WordCount的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【数据结构与算法】三个经典案例带你了解动态规划

    从表中我们可以看到,最大的公共子串长度为2,一共有两个长度为2的公共子串,分别是第一个字符串的第2个字符到第3个字符和第一个字符串的第3个字符到第4个字符,即 ba 和 ac 根据上面的方法,我们来用代码封装一下求取最大公共子串的函数 function publicStr(s1, s2) { // 创建

    2024年04月09日
    浏览(95)
  • 第七篇【传奇开心果系列】Python微项目技术点案例示例:数据可视化界面图形化经典案例

    在学校或培训班,教学管理头绪繁杂,分析报告枯燥乏味。如果能编写一个程序实现数据可视化,界面图形化,那就可以让数据形象直观生动起来,变得有趣生动,而且有灵魂。于是我灵感顿悟就有了写一个数据可视化界面图形化示例的想法。我打算使用Python的nicegui库创建界

    2024年02月20日
    浏览(60)
  • 【机器学习】最经典案例:房价预测(完整流程:数据分析及处理、模型选择及微调)

    环境:anaconda+jupyter notebook 首先要明白一点: 数据决定模型的上限!数据决定模型的上限!数据决定模型的上限! (重要的事情说三遍。)对于数据的处理在一个完整案例中花费精力的比重应该占到一半以上。 以下分为:数据分析、数据清洗两部分。 数据分析主要包括:查

    2024年02月05日
    浏览(45)
  • 第六篇【传奇开心果系列】Python的自动化办公库技术点案例示例:大学生数据全方位分析挖掘经典案例

    Pandas在大学生数据的分析和挖掘中发挥着重要作用,帮助研究人员和教育工作者更好地理解大学生群体、优化教学管理和提升教育质量。 Pandas库可以用来分析挖掘大学生数据的各各方面,包括但不限于: 学生成绩数据:可以通过Pandas对大学生的成绩数据进行统计分析、可视

    2024年03月15日
    浏览(92)
  • Hadoop系统应用之MapReduce相关操作【IDEA版】---经典案例“倒排索引、数据去重、TopN”

      倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词(或词组)在一组文档中的存储位置的映射,提供了可以根据内容来查找文档的方式,而不是根据文档来确定内容,因此称为倒排索引(Inverted Index)。带有倒排索引

    2024年02月07日
    浏览(49)
  • 多线程四大经典案例

    本节内容很重要, 希 望 大 家 可 以 好 好 看 看 , 一 起 加 油~ 1.单线模式 1.1饿汉模式 1.2懒汉模式 2.阻塞式队列 2.1阻塞队列是什么 2.2生产者消费者模型 2.3标准库中的阻塞队列 2.4阻塞队列的实现 3.定时器 3.1定时器是什么 3.2标准库中的定时器 3.3实现定时器 4.线程池 4.1什么

    2023年04月27日
    浏览(49)
  • solidity经典案例-----智能投票

    角色分析:包括主持人、选民 功能分析: 仅主持人能授权给每个选民1票,即每个参与投票的选民拥有1票投票权。 选民可以选择将票数委托给其它选民,当然,收委托的选民仍然可以将票数继续委托给其它选民,即存在a—b–c–d,但是,一旦将票数委托给其它选民后,自己

    2024年01月16日
    浏览(52)
  • 路由器故障排错三大经典案例

    对于网络管理员来说,熟悉与掌握路由排错的思路和技巧是非常必要的。小编将通过三例典型的路由故障排错案例进行分析。 案例1 不堪重负,路由器外网口关闭 1、网络环境 某单位使用的是Cisco路由器,租用电信30MB做本地接入和l0MB教育网双线路上网,两年来网络运行稳定,

    2024年02月05日
    浏览(48)
  • 经典智能合约案例之发红包

    角色分析:发红包的人和抢红包的人 功能分析: 发红包:发红包的功能,可以借助构造函数实现,核心是将ether打入合约; 抢红包:抢红包的功能,抢成功需要一些断言判断,核心操作是合约转账给抢红包的人; 退还:当红包有剩余的时候,允许发红包的人收回余额,可以

    2024年02月07日
    浏览(46)
  • Python递归的几个经典案例

    当我们碰到诸如需要求阶乘或斐波那契数列的问题时,使用普通的循环往往比较麻烦,但如果我们使用递归时,会简单许多,起到事半功倍的效果。这篇文章主要和大家分享一些和递归有关的经典案例,结合一些资料谈一下个人的理解,也借此加深自己对递归的理解和掌握一

    2024年02月05日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包