Flink 客户端操作命令及可视化工具

这篇具有很好参考价值的文章主要介绍了Flink 客户端操作命令及可视化工具。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、Scala ShellSQL ClientRestful APIWeb五个方面进行整理。

Flink安装目录的bin目录下可以看到flinkstart-scala-shell.shsql-client.sh等文件,这些都是客户端操作的入口。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

flink 常见操作:可以通过 -help 查看帮助

run 运行任务

-d:以分离模式运行作业
-c:如果没有在jar包中指定入口类,则需要在这里通过这个参数指定;
-m:指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager,可以说是yarn集群名称;
-p:指定程序的并行度。可以覆盖配置文件中的默认值;
-s:保存点savepoint的路径以还原作业来自(例如hdfs:///flink/savepoint-1537);

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f

就可以看到我们提交的JobManager,默认是一个并发。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

点进去就可以看到详细的信息
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

点击左侧TaskManager —Stdout能看到具体输出的日志信息。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

或者查看TaskManager节点的log目录下的*.out文件,也能看到具体的输出信息。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

list 查看任务列表

-mjobmanager<arg>作业管理器(主)的地址连接。

[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop 停止任务

需要指定jobmanagerip:protjobId。如下报错可知,一个job能够被stop要求所有的source都是可以stoppable的,即实现了 StoppableFunction接口。

[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)

StoppableFunction接口如下,属于优雅停止任务。

 /**
 * @Description 需要 stoppabel 的函数必须实现此接口,例如流式任务 source*
 *               stop() 方法在任务收到 stop信号的时候调用
 *               source 在接收到这个信号后,必须停止发送新的数据优雅的停止。
 * @Date 2020/7/9 17:26
 */
 @PublicEvolving
 public interface StoppableFunction {
     /**
     * 停止 source,与 cancel() 不同的是,这是一个让 source优雅停止的请求。
     * 等待中的数据可以继续发送出去,不需要立即停止
    */
    void stop();
}

Cancel 取消任务

如果在conf/flink-conf.yaml里面配置state.savepoints.dir,会保存savepoint,否则不会保存savepoint。(重启)

state.savepoints.dir: file:///tmp/savepoint

执行 Cancel命令 取消任务

[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.

也可以在停止的时候显示指定savepoint目录

1 [root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.

取消和停止(流作业)的区别如下:
cancel()调用, 立即调用作业算子的cancel()方法,以尽快取消它们。如果算子在接到cancel()调用后没有停止,Flink将开始定期中断算子线程的执行,直到所有算子停止为止。
stop()调用 ,是更优雅的停止正在运行流作业的方式。stop()仅适用于source实现了StoppableFunction接口的作业。当用户请求停止作业时,作业的所有source都将接收stop()方法调用。直到所有source正常关闭时,作业才会正常结束。这种方式,使 作业正常处理完所有作业。

触发 savepoint

当需要生成savepoint文件时,需要手动触发savepoint。如下,需要指定正在运行的 JobID 和生成文件的存放目录。同时,我们也可以看到它会返回给用户存放的savepoint的文件名称等信息。

 [root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
 Executing TopSpeedWindowing example with default input data set.
 Use --input to specify file input.
 Printing result to stdout. Use --output to specify output path.
 Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d
 [root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/
 Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.
 Waiting for response...
 Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfd
 You can resume your program from this savepoint with the run command.

savepointcheckpoint的区别:
checkpoint是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;savepoint是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。
checkpoint是作业failover的时候自动使用,不需要用户指定。savepoint一般用于程序的版本更新,bug修复,A/B Test等场景,需要用户指定。

从指定 savepoint 中启动

[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2

查看JobManager的日志能够看到Reset the checkpoint ID为我们指定的savepoint文件中的ID
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

modify 修改任务并行度

这里修改masterconf/flink-conf.yamltask slot数修改为4。并通过xsync分发到 两个slave节点上。

taskmanager.numberOfTaskSlots: 4

修改参数后需要重启集群生效:关闭/启动集群

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh 
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

启动任务

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a

从页面上能看到Task Slots总计变为了8,运行的Slot1,剩余Slot数量为7
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

这时候默认的并行度是1
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

Flink1.0版本命令行flink modify已经没有这个行为了,被移除了。。。Flink1.7上是可以运行的。

[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.

Info 显示程序的执行计划

[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar 
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷贝输出的json内容,粘贴到这个网站:http://flink.apache.org/visualizer/可以生成类似如下的执行图。

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

可以与实际运行的物理执行计划进行对比。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

SQL Client Beta

进入 Flink SQL

[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded

Select查询,按Q退出如下界面;

Flink SQL> select 'hello word';
                                                                                                        SQL Query Result (Table)
 Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:37:04.649

                    EXPR$0
                hello word




Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

打开http://hadoop1:8081能看到这条select语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的Custom Source,输出用的是Stream Collect Sink,且只输出一条结果。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

explain 查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==         //抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Optimized Logical Plan ==      //优化后的逻辑执行计划
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Physical Execution Plan ==    //物理执行计划
Stage 13 : Data Source
    content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

    Stage 15 : Operator
        content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
        ship_strategy : HASH

结果展示

SQL Client支持两种模式来维护并展示查询结果:

table mode

在内存中物化查询结果,并以分页table形式展示。用户可以通过以下命令启用table mode:例如如下案例;

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                                                                                                          SQL Query Result (Table)
 Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:55:08.589

                      name                       cnt
                     Alice                         1
                      Greg                         1
                       Bob                         2



Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

changelog mode

不会物化查询结果,而是直接对continuous query产生的添加和撤回retractions结果进行展示:如下案例中的-表示撤回消息

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                                                                                                        SQL Query Result (Changelog)
 Table program finished.                                                                                                                                                                                               Updated: 16:58:05.777

 +/-                      name                       cnt
   +                       Bob                         1
   +                     Alice                         1
   +                      Greg                         1
   -                       Bob                         1
   +                       Bob                         2



Q Quit                                                                        + Inc Refresh                                                                 O Open Row
R Refresh                                                                     - Dec Refresh

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

Environment Files

CREATE TABLE 创建表DDL语句:

Flink SQL> CREATE TABLE pvuv_sink (
>     dt VARCHAR,
>     pv BIGINT,
>     uv BIGINT
> ) ;
[INFO] Table has been created.

SHOW TABLES 查看所有表名

Flink SQL>  show tables;
pvuv_sink

DESCRIBE 表名 查看表的详细信息;

Flink SQL>  describe pvuv_sink;
root
 |-- dt: STRING
 |-- pv: BIGINT
 |-- uv: BIGINT

插入等操作均与关系型数据库操作语句一样,省略N个操作

Restful API

接下来我们演示如何通过rest api来提交jar包和执行任务。
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

通过Show Plan可以看到执行图
Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端

提交之后的操作,取消的话点击页面的Cancel Job

Flink 客户端操作命令及可视化工具,Flink,flink,python,大数据,java,面试,性能优化,后端文章来源地址https://www.toymoban.com/news/detail-761044.html

到了这里,关于Flink 客户端操作命令及可视化工具的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • NexNoSQL Client:Elasticsearch、Redis、MongoDB三合一的可视化客户端管理工具

    工作中我们使用了Elasticsearch作为存储,来支持内容的搜索,Elasticsearch这个软件大家都耳熟能详,它是一个分布式、高扩展、高实时的搜索与数据分析引擎,不仅仅支持文本索引,还支持聚合操作,使用它既可以做数据搜索,还可以做报表分析,非常的方便。 在使用过程中我

    2024年02月15日
    浏览(98)
  • mysql 命令行常用操作和客户端

            学习了安装mysql,学习下命令行常用的操作。   1、SHOW DATABASES 用SHOW语句找出在服务器上当前存在什么数据库: 2、创建数据库 CREATE DATABASE 创建一个数据库MYSQLDATA 3、选择数据库 USE MYSQLDATA 选择你所创建的数据库 按回车键出现Database changed 时说明操作成功! 4、查看现

    2024年01月19日
    浏览(34)
  • ZooKeeper基础命令和Java客户端操作

    (1)Help (2)ls 使用 ls 命令来查看当前znode中所包含的内容 (3)ls2查看当前节点数据并能看到更新次数等数据 (4)stat查看节点状态 (5)set 1)设置节点的具体值 2)set 节点 value值 set /test atguigu (6)get 1)获得节点的值 2)get 节点 (7)create 1)普通创建 create /test demo001

    2024年02月10日
    浏览(31)
  • flink客户端提交任务报错

    { “errors”: [ “org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.ntat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest KaTeX parse error: Undefined control sequence: n at position 26: …ndler.java:110)̲n̲tat java.util.… UniHandle.tryFire(CompletableFuture.java:797)ntat j

    2024年02月15日
    浏览(33)
  • Docker客户端命令

    使用  podman  模拟  Docker CLI  的功能,并创建  /etc/containers/nodocker  文件以静默消息。管理 pods、容器和镜像。 用法: podman [选项] [命令] 命令 : attach 附加到一个正在运行的容器 auto-update 根据容器的自动更新策略自动更新容器 build 使用 Containerfiles 中的指令构建镜像 commi

    2024年04月14日
    浏览(31)
  • ZooKeeper【客户端命令行】

    启动ZooKeeper集群 启动客户端           我们发现启动客户端时它会默认连接本地的服务器,这是因为zookeeper客户端启动时默认连接的是本地模式。 指定连接集群中的服务器  甚至连接别的服务器节点:从hadoop102上连接hadoop103。      czxid : 每次修改 ZooKeeper 状态都会产生

    2024年02月11日
    浏览(32)
  • redis 登录客户端命令

    Redis 命令用于在 redis 服务上执行操作。 要在 redis 服务上执行命令需要一个 redis 客户端。Redis 客户端在我们之前下载的的 redis 的安装包中。 语法 Redis 客户端的基本语法为: $ redis-cli 实例 以下实例讲解了如何启动 redis 客户端: 启动 redis 客户端,打开终端并输入命令 redis

    2023年04月08日
    浏览(33)
  • ElasticSearch的客户端操作

    ElasticSearch的客户端操作 使用Kibana进行以下实验,进行Restful接口访问 索引库操作,完成对索引的增、删、查操作 “acknowledged” : true, 代表操作成功 “shards_acknowledged” : true, 代表分片操作成功 “index” : “shopping” 表示创建的索引库名称 注意:创建索引库的分片数默认5片,

    2024年02月12日
    浏览(33)
  • java 客户端操作HDFS

    部署包win版本 源码包zip包 lib整合:共121个jar包 $HADOOP_PREFIX/share/hadoop/{common,hdfs,mapreduce,yarn,tools}/{lib,.}*.jar  将windows版本hadoop/bin/hadoop.dll 放到c:/windows/system32下 hadoop的bin和sbin目录放PATH中+HADOOP_HOME+HADOOP_USER_NAME=root 安装插件 配置 重启电脑!!!!!!!加载hadoop.dll 创建java p

    2024年02月10日
    浏览(33)
  • ClickHouse(五):Clickhouse客户端命令行参数

      进入正文前,感谢宝子们订阅专题、点赞、评论、收藏!关注IT贫道,获取高质量博客内容! 🏡个人主页:含各种IT体系技术,IT贫道_Apache Doris,Kerberos安全认证,大数据OLAP体系技术栈-CSDN博客 📌订阅:拥抱独家专题,你的订阅将点燃我的创作热情! 👍点赞:赞同优秀创作,

    2024年02月15日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包