Flink第一章:环境搭建

这篇具有很好参考价值的文章主要介绍了Flink第一章:环境搭建。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

Flink第一章:环境搭建



前言

Flink也是现在现在大数据技术中火爆的一门,反正大数据的热门技术学的也差不多了,啃完Flink基本的大数据技术就差不多哦学完了.


一、Idea项目

1.创建项目

2.pom.依赖

这里说明一下我选择的环境.
java8
scala2.12
flink采用最新的1.17
请大家根据自己的环境更换版本

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <flink.version>1.17.0</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>


    <dependencies>
        <!-- 引入 Flink 相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到 maven 的 compile 阶段 -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
</project>

3.DataSet

:这里使用DataSet对数据进行批处理,但是在新版本flink中DataStreaming已经做到了流批一体,未来会慢慢移除DataSet接口,所以这里只是做个示例.
Flink第一章:环境搭建
BatchWC.scala

package com.atguigu.chapter01

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, GroupedDataSet, createTypeInformation}


object BatchWC {
  def main(args: Array[String]): Unit = {
    //1.创建执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //2.读取文本数据
    val lineData: DataSet[String] = env.readTextFile("input/word.txt")

    //3.对数据进行处理
    val wordAneOne: DataSet[(String, Int)] = lineData.flatMap(_.split(" ")).map(word => (word, 1))

    val wordAndOneGroup: GroupedDataSet[(String, Int)] = wordAneOne.groupBy(0)

    val sum: AggregateDataSet[(String, Int)] = wordAndOneGroup.sum(1)

    sum.print()
  }
}

Flink第一章:环境搭建

4.DataStreaming

DataStreaming进行批处理

BoundedStreamingWordCount.scala

package com.atguigu.chapter01

import org.apache.flink.streaming.api.scala._


object BoundedStreamingWordCount{
  def main(args: Array[String]): Unit = {
    //1.创建一个流式执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.读取文本文件
    val lineDataStreaming: DataStream[String] = env.readTextFile("input/word.txt")

    //3.对数据进行处理
    val wordAneOne: DataStream[(String, Int)] = lineDataStreaming.flatMap(_.split(" ")).map(word => (word, 1))

    val wordAndOneGroup: KeyedStream[(String, Int), String] = wordAneOne.keyBy(_._1)

    val sum: DataStream[(String, Int)] = wordAndOneGroup.sum(1)

    sum.print()

    //4.执行方法
    env.execute()
  }
}

Flink第一章:环境搭建
DataStreaming进行流处理
StreamingWC.scala

package com.atguigu.chapter01

import org.apache.flink.streaming.api.scala._


object StreamingWC{
  def main(args: Array[String]): Unit = {
    //1.创建一个流式执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.读取文本文件
    val lineDataStreaming: DataStream[String] = env.socketTextStream("hadoop102",7777)


    //3.对数据进行处理
    val wordAneOne: DataStream[(String, Int)] = lineDataStreaming.flatMap(_.split(" ")).map(word => (word, 1))

    val wordAndOneGroup: KeyedStream[(String, Int), String] = wordAneOne.keyBy(_._1)

    val sum: DataStream[(String, Int)] = wordAndOneGroup.sum(1)

    sum.print()

    //4.执行方法
    env.execute()
  }
}

这里我们选择对hadoop102的7777端口进行监听,所以要提前打开虚拟机.
Flink第一章:环境搭建
输入数据查看结果
Flink第一章:环境搭建

二、环境搭建

Flink第一章:环境搭建
我们直接使用官方推荐最新版.
官方下载连接

1.Standalone

但节点模式,一般用于数据测试,我们在hadoop102上进行.
上传并解压文件

tar -xvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
cd ../module/
mv flink-1.17.0/ flink

修改配置
本来单节点是不需要修改配置的,但是咱们虚拟机没有桌面,需要从外部访问,所以还是需要修改一下.
Flink第一章:环境搭建
在203行修改,或者用vim的搜索功能.
Flink第一章:环境搭建
启动Flink

./bin/start-cluster.sh

Flink第一章:环境搭建
在Web UI界面查看一下
hadoop102:8081
Flink第一章:环境搭建
现在我们跑一下官方的测试案例进行测试.

 ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

Flink第一章:环境搭建
Flink第一章:环境搭建
然后直接停掉集群Flink就行了,命令行操作,咱们后边再说.

./bin/stop-cluster.sh

Flink第一章:环境搭建

2.Flink on Yarn

修改环境变量

vim /etc/profile.d/my_env.sh 

新增这一行是Flink文档中要求,我也不知道啥意思
Flink第一章:环境搭建
然后source环境.
修改配置文件

vim ./conf/masters

Flink第一章:环境搭建

vim ./conf/workers

Flink第一章:环境搭建
修改完之后,启动集群.
因为我们在Yarn上完成任务,所以我们要启动Hadoop集群.
Flink第一章:环境搭建
向Yar提交任务,有三证模式,其中包括.
会话模式,应用模式,单作业模式.

应用程序模式将在 YARN 上启动一个 Flink 集群,其中应用程序 jar 的 main() 方法在 YARN 中的 JobManager 上执行。 应用程序完成后,群集将立即关闭。您可以使用或通过取消 Flink 作业手动停止集群。

会话模式有两种操作模式:
附加模式(默认):客户端将 Flink 集群提交到 YARN,但客户端继续运行,跟踪集群的状态。如果群集失败,客户端将显示错误。如果客户端被终止,它也会向群集发出关闭信号。yarn-session.sh
分离模式(或):客户端将 Flink 集群提交到 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。-d–detachedyarn-session.sh

单作业模式
单作业集群模式将在 YARN 上启动一个 Flink 集群,然后在本地运行提供的应用程序 jar,最后将 JobGraph 提交给 YARN 上的 JobManager。如果传递参数,则客户端将在接受提交后停止。–detached

官方建议使用应用模式,并且单作业模式已经从1.15之后就被移除了,所以咱们只演示前两种.
如果日后工作有需要,自己看看文档就行了

应用模式
这里直接跑官方给的案例了.

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

Flink第一章:环境搭建
记住这里给的项目appID 后边要用

查看项目

./bin/flink list -t yarn-application -Dyarn.application.id=application_1682757957558_0001

Flink第一章:环境搭建
停止项目

./bin/flink cancel -t yarn-application -Dyarn.application.id=application_1682757957558_0001 ff18e3a66c94a581f8da0c027bbe4bc3

Flink第一章:环境搭建
当集群上没有项目时,项目就会停止,这时在查看项目,就会报错.
Flink第一章:环境搭建
由于当项目全部停止后,集群就会停止,所以当你的集群经常只跑单个项目时,就总会重启集群,所以生产中也不是最常用的.

生产中最常用的还是会话模式,它可以在没有项目运行的时候也使Flink集群处于启动状态.

会话模式
创建会话

 bin/yarn-session.sh -nm test -d

-nm 指定会话名称
-d 将当前会话挂载到后台
Flink第一章:环境搭建
启动成功后,有两条日志需要注意一下,一个是Web UI的网址,一个是关掉会话的方法.
我们先去Web查看一下
Flink第一章:环境搭建
因为他是动态分配,所以显示的可用资源永远都是0,当任务提交时,他会向Yarn申请资源,然后执行任务.
我们将之前写的代码打包然后将其提交到hadoop102Flink文件
Flink第一章:环境搭建
提交任务
记得要开启nc
Flink第一章:环境搭建

./bin/flink run -c com.atguigu.chapter01.StreamingWC ./FlinkTutorial-1.0-SNAPSHOT.jar 

Flink第一章:环境搭建
Web查看一下
Flink第一章:环境搭建
可用资源还是0,但是这个任务已经跑一起来了,现在查看一下效果.
Flink第一章:环境搭建
Flink第一章:环境搭建
关闭项目.
Flink第一章:环境搭建
Flink第一章:环境搭建
关闭会话

echo "stop" | ./bin/yarn-session.sh -id application_1682757957558_0002

Flink第一章:环境搭建
至此Flink环境搭建完成,建议保留快照


总结

Flink是做数据实时分析必不可少的技术,也要学习.文章来源地址https://www.toymoban.com/news/detail-440732.html

到了这里,关于Flink第一章:环境搭建的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第一章 Linux基础及Linux环境搭建(保姆级别)

    第一章 Linux基础及Linux环境的搭建(保姆级别) 一、Linux简介 1、什么是Linux? ​ 一款 免费开源流行 的操作系统。 2、Linux为什么流行? 1)Windows以用户的体验很好而流行 2)Linux流行主要是因为稳定而流行 Linux一般用于企业中中的服务器 Linux用来做服务器操作系统使用 3、L

    2023年04月16日
    浏览(30)
  • 数据库安全-第一章 Mysql 安全基础-【web 环境搭建——LAMP-2】(LAMP——apache2环境搭建)

    Apache HTTP 服务器项目致力于为现代操作系统(包括 UNIX 和 Windows)开发和维护开源 HTTP 服务器。该项目的目标是提供一个安全、高效且可扩展的服务器,该服务器提供与当前 HTTP 标准同步的 HTTP 服务。 Apache 软件基金会和 Apache HTTP 服务器项目在2021年6月1日宣布发布 Apache HTTP 服

    2024年02月20日
    浏览(36)
  • 我的世界Bukkit插件开发-第一章-初始环境搭建-搭建基于spigot核心的服务器-并连接客户端......

    基于Spigot核心的插件开发 本章实现本地成功搭建私服并连接客户端 前置开发工具:IDEA JDK环境-JKD-17 构建工具:maven 必备idea插件:Minecraft Development 服务器核心: Spigot-1.20.jar mc客户端 小部分内容来自AI大模型,如需深入,请联系博主或自行了解 手工不易,且看且珍惜 首次开始

    2024年03月21日
    浏览(44)
  • 数据库安全-第一章 Mysql 安全基础-【web 环境搭建——LAMP-1】-LAMP & LNMP 简介

    WEB 的本意是蜘蛛网和网,在网页设计中称为网页。现广泛译作网络、互联网等技术领域。日常生活中用户使用浏览器请求一个 URL ,这个 URL 标识着某个特定的服务器的特定资源,服务器接收到请求之后,服务器就把生成的 HTML 网页通过 HTTP 协议发送给浏览器。 早期的 WEB 页

    2024年01月18日
    浏览(34)
  • Flutter系列文章-Flutter环境搭建和Dart基础

    Flutter是Google推出的一个开源的、高性能的移动应用开发框架,可以用一套代码库开发Android和iOS应用。Dart则是Flutter所使用的编程语言。让我们来看看如何搭建Flutter开发环境,并了解Dart语言的基础知识。 1. 安装Flutter SDK 首先,访问Flutter官网下载Flutter SDK。选择适合你操作系统

    2024年02月15日
    浏览(35)
  • 【极数系列】Flink环境搭建(02)

    tips:下载地址https://repo.huaweicloud.com/java/jdk/ 双击解压后jdk的exe运行文件 tips:一般不安装在C盘,修改路径,接着直接next 此电脑–属性–高级系统设置–环境变量–找到系统变量path–选择新建–把jdk与jre目录均加上 进入oracle目录,如我的是C:ProgramDataOracleJavajavapath,删除该

    2024年01月24日
    浏览(29)
  • 《Flink学习笔记》——第一章 概念及背景

    ​ 什么是批处理和流处理,然后由传统数据处理架构为背景引出什么是有状态的流处理,为什么需要流处理,而什么又是有状态的流处理。进而再讲解流处理的发展和演变。而Flink作为新一代的流处理器,它有什么优势?它的相关背景及概念和特性又是什么?有哪些应用场景

    2024年02月11日
    浏览(36)
  • 基于linux下的高并发服务器开发(第一章)- 目录操作函数

     (1)int mkdir(const char* pathname,mode_t mode); #include sys/stat.h #include sys/types.h int mkdir(const char *pathname, mode_t mode);     作用:创建一个目录     参数:          pathname: 创建的目录的路径         mode: 权限,八进制的数     返回值:          成功返回0, 失败返回-1  (

    2024年02月16日
    浏览(33)
  • 第一章 Jenkins搭建

    7年java程序员,准备认真总结点自己这些年学习到的技术知识、业务知识。本文先从服务器整体搭建,开始总结。 Jenkins是一个开源软件项目,是基于java开发的一种持续集成工具,用于监控持续重复的工作,旨在提供一个开放易用的软件平台,使软件的持续集成变成可能。 本

    2024年01月24日
    浏览(29)
  • 《操作系统真象还原》第一章 部署工作环境

    配合视频阅读体验更佳!https://www.bilibili.com/video/BV1kg4y1V7TV/?pop_share=1vd_source=701807c4f8684b13e922d0a8b116af31 环境vmware + deepin-desktop-community-20.8-amd64 (已在纯净ubuntu 22.04上验证过此教程,完全可行!只是需要安装vim 命令: sudo apt install vim ) 先安装其他需要的东西: sudo apt install bui

    2024年02月08日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包