使用Flink进行WordCount计算

这篇具有很好参考价值的文章主要介绍了使用Flink进行WordCount计算。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink是一款应用非常广泛的流处理系统,目前有客户使用Flink进行数据同步,效率较差。

之前虽然使用过Spark Streaming,但是Flink和Spark Streaming在使用上,还是有一点差异。如Word Count计算,Spark中好像是一个reduceByKey,Flink中需要先进行GroupBy,然后再做一次sum。

程序代码

package cn.jihui.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object wc2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val file_name = "C:\\Users\\32985\\IdeaProjects\\flink_demo1\\resources\\wc.txt"
    val data = env.readTextFile(file_name)
        .flatMap(p => p.split(" "))
        .map(p => (p, 1))
        .groupBy(0)
        .sum(1)

    data.print
  }
}

测试文件

文件路径为:C:\\Users\\32985\\IdeaProjects\\flink_demo1\\resources\\wc.txt,文件内容如下:文章来源地址https://www.toymoban.com/news/detail-515929.html

hello world
how are you
I am fine
how old are you

程序输出:

"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=60578:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.wc2
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(I,1)
(am,1)
(are,2)
(fine,1)
(hello,1)
(how,2)
(old,1)
(world,1)
(you,2)
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

Process finished with exit code 0

到了这里,关于使用Flink进行WordCount计算的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink入门修炼】1-3 Flink WordCount 入门实现

    本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。 下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学

    2024年02月19日
    浏览(28)
  • Flink WordCount实践

    目录 前提条件 基本准备 批处理API实现WordCount 流处理API实现WordCount 数据源是文件 数据源是socket文本流 打包 提交到集群运行 命令行提交作业 Web UI提交作业 上传代码到gitee Windows安装好jdk8、Maven3、IDEA Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式   创建项目 使用

    2024年04月13日
    浏览(28)
  • 05 Flink 的 WordCount

    本文对应于 spark 系列的 Spark 的 WordCount 这里主要是 从宏观上面来看一下 flink 这边的几个角色, 以及其调度的整个流程  一个宏观 大局上的任务的处理, 执行  基于 一个本地的 flink 集群        Test01WordCount.txt 内容如下      Driver 提交 Job 到 JobManager, JobManager 分配任务到

    2024年02月22日
    浏览(27)
  • 使用Flink处理Kafka中的数据

    目录         使用Flink处理Kafka中的数据 前提:  一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据           具体代码(scala)                 具体执行代码①                 重要逻

    2024年01月23日
    浏览(51)
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月02日
    浏览(52)
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(1) - 介绍

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月01日
    浏览(54)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(55)
  • 大数据Flink(五十三):Flink流处理特性、发展历史以及Flink的优势

    文章目录 Flink流处理特性、发展历史以及Flink的优势 一、Flink流处理特性 二、发展历史

    2024年02月14日
    浏览(54)
  • flink 对每天的数据进行汇总

    使用flink 对kafka中每天的数据进行汇总,来一条数据统计一次结果,并将结果进行持久化 之前一直在 trigger中想用时间进行触发,后面发现当环境中使用的是 事件时间(EventTime) 事件时间(EventTime)语义,因此触发器应该使用 EventTimeTrigger ,而不是 ProcessingTimeTrigger 。 因此,

    2024年02月16日
    浏览(37)
  • Flink:处理大规模复杂数据集的最佳实践深入探究Flink的数据处理和性能优化技术

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型网络技术的不断发展,企业对海量数据的处理日益依赖,而大数据分析、决策支持、风险控制等领域都需要海量的数据处理能力。如何高效、快速地处理海量数据、提升处理效率、降低成本,是当下处理

    2024年02月13日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包