创建第一个 Flink 项目

这篇具有很好参考价值的文章主要介绍了创建第一个 Flink 项目。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

<dependencies>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-scala_2.12</artifactId>
       <version>1.10.0</version>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-scala_2.12</artifactId>
       <version>1.10.0</version>
   </dependency>
</dependencies>

<build>
   <plugins>
       <!-- 该插件用于将Scala代码编译成class文件 -->
       <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
           <version>3.4.6</version>
           <executions>
               <execution>
                   <goals>
                       <!--声明绑定到 maven 的compile阶段-->
                       <goal>compile</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-assembly-plugin</artifactId>
           <version>3.0.0</version>
           <configuration>
               <descriptorRefs>
                   <descriptorRef>jar-with-dependencies</descriptorRef>
               </descriptorRefs>
           </configuration>
           <executions>
               <execution>
                   <id>make-assembly</id>
                   <phase>package</phase>
                   <goals>
                       <goal>single</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
   </plugins>
</build>

【2】resource目录中添加需要进行统计的文件文件及内容
创建第一个 Flink 项目,Flink,flink,大数据,算法,java,后端,数据结构,链表

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._

/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建一个批处理的执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据
    var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group by
    val resultDataSet: DataSet[(String,Int)] = inputDateSet
      .flatMap(_.split(" "))//分词得到所有 word构成的数据集
      .map((_,1))//_表示当前 word 转换成一个二元组(word,count)
      .groupBy(0)//以二元组中第一个元素作为key
      .sum(1) //1表示聚合二元组的第二个元素的值

    //打印输出
    resultDataSet.print()
  }
}

【4】统计结果展示:
创建第一个 Flink 项目,Flink,flink,大数据,算法,java,后端,数据结构,链表

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flink

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
 def main(args: Array[String]): Unit = {
   // 创建一个流处理执行环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流
   val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);
   //定义转换操作 word count
   val resultDataStream: DataStream[(String,Int)] = inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word
     .filter(_.nonEmpty)
     .map((_,1))//转换成 word count 二元组
     .keyBy(0)//按照第一个元素分组
     .sum(1)//按照第二个元素求和

   resultDataStream.print()

   //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")
 }
}

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
创建第一个 Flink 项目,Flink,flink,大数据,算法,java,后端,数据结构,链表

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
创建第一个 Flink 项目,Flink,flink,大数据,算法,java,后端,数据结构,链表

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:文章来源地址https://www.toymoban.com/news/detail-752960.html

package com.zzx.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
 def main(args: Array[String]): Unit = {
   // 创建一个流处理执行环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流  hostname:prot 从程序运行参数中读取
   val params: ParameterTool = ParameterTool.fromArgs(args);
   val hostname: String = params.get("host");
   val port: Int = params.getInt("port");
   val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);
   //定义转换操作 word count
   val resultDataStream: DataStream[(String,Int)] = inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word
     .filter(_.nonEmpty)
     .map((_,1))//转换成 word count 二元组
     .keyBy(0)//按照第一个元素分组
     .sum(1)//按照第二个元素求和

   resultDataStream.print()

   //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")
 }
}

到了这里,关于创建第一个 Flink 项目的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据Flink(六十一):Flink流处理程序流程和项目准备

    文章目录 Flink流处理程序流程和项目准备 一、Flink流处理程序的一般流程

    2024年02月11日
    浏览(38)
  • 手把手教你实现一个JavaWeb项目:创建一个自己的网页博客系统(前端+后端)(一)

    一篇博客带你实现一个真正的项目!  先来看看它是什么样式的: 目录: 1、大体步骤🦖:         1、创建Maven项目🦕         2、引入依赖🦕         3、创建必要的目录🦕         4、编写代码🦕         5、打包部署(基于SmartTomcat)🦕         

    2024年02月06日
    浏览(55)
  • flink 最后一个窗口一直没有新数据,窗口不关闭问题

    窗口类型:滚动窗口 代码: 代码部分逻辑说明 若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算) env.getConfig().setAutoWatermarkInterval(5000); 使用自定义的watermark: watermark 周期生成()的疑问: 1、默认200ms,会连续生成4次后,

    2024年01月18日
    浏览(39)
  • Flink 状态后端

    状态后端 (state backend) : 负责管理本地状态的存储方式, 位置 Flink 的状态后端有两类 : 哈希表状态后端 (HashMapStateBackend) : 状态放在内存 内嵌 RocksDB 状态后端 (EmbeddedRocksDBStateBackend) : 状态放在 RocksDB 数据库 哈希表状态后端 : 实现 : 将状态当作对象 (objects) , 保存在 Taskmanager 的

    2024年02月13日
    浏览(41)
  • 关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑

    目录 前言         滚动窗口(Tumbling Windows)         allowedLateness 场景描述 数据倾斜问题解决 输出结果偏差问题         思考 输出结果偏差解决 扩展         滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

    2024年02月21日
    浏览(42)
  • flink时间窗口无新的数据进来最后一个窗口不关闭

    测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size。最后一个窗口没有闭窗计算,数据并没及时输出告警 经过调试发现,watermark没有向后继续推进,导致无法闭窗, watermark的时间取的是数据中的业务时间,create_time。 因为没有后续数据进来,

    2024年02月13日
    浏览(43)
  • JAVA-服务器搭建-创建web后端项目

       

    2024年04月22日
    浏览(71)
  • Flink State backend状态后端

    Flink在v1.12到v1.14的改进当中,其状态后端也发生了变化。老版本的状态后端有三个,分别是MemoryStateBackend、FsStateBackend、RocksDBStateBackend,在flink1.14中,这些状态已经被废弃了,新版本的状态后端是 HashMapStateBackend、EmbeddedRocksDBStateBackend。 有状态流应用中的检查点(checkpoint),

    2024年01月25日
    浏览(41)
  • Flink读取mysql数据库(java)

    代码如下: 运行结果如下:

    2024年02月12日
    浏览(40)
  • Flink理论—容错之状态后端(State Backends)

    Flink 使用流重放 和 检查点 的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,同时保持一致性 容错机制不断地绘制分布式流数据流的快照。对于小状态的流式应用程

    2024年02月20日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包