Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

概述

设置重启策略

什么是flink的重启策略(Restartstrategy)

flink的重启策略(Restartstrategy)实战

flink的4种重启策略

FixedDelayRestartstrategy(固定延时重启策略)

FailureRateRestartstrategy(故障率重启策略)

NoRestartstrategy(不重启策略)

配置State Backends 以及 Checkpointing

Checkpoint

启用和配置

选择 State backend

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

State backend比较

概述

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

get_config()返回 table config,可以通过 table config 来定义 Table API 的运行时行为。

t_env = TableEnvironment.create(Environmentsettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

设置重启策略

在TableConfig中,通过设置键值选项来配置它们。

什么是flink的重启策略(Restartstrategy)

Restartstrategy,重启策略,在遇到机器或者代码等不可预知的问题时导致 Job 或者 Task 挂掉的时候,它会根据配置的重启策略将 Job 或者受影响的 Task 拉起来重新执行,以使得作业恢复到之前正常执行状态。Flink 中的重启策略决定了是否要重启 Job 或者 Task,以及重启的次数和每次重启的时间间隔。

flink的重启策略(Restartstrategy)实战

flink的 Restartstrategy 作用是提升任务健壮性和容错性,保证任务可以实时产出数据。

设置重启策略和公司处理数据业务需求有很大的关系,根据不同的业务需求设置处理任务的不同策略。

其实遇到上面这种问题比较常见,比如有时候因为数据的问题(不合规范、为 null 等),这时在处理这些脏数据的时候可能就会遇到各种各样的异常错误,比如空指针、数组越界、数据类型转换错误等。

可能你会说只要过滤掉这种脏数据就行了,或者进行异常捕获就不会导致 Job 不断重启的问题了。

所以日常开发中我们要尽力的保证代码的健壮性,但是也要配置好 Flink Job 的 Restartstrategy(重启策略)。

flink的4种重启策略

默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。

如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。

配置参数 restart-strategy 定义了哪个策略被使用。

固定间隔 (Fixed delay)

失败率 (Failure rate)

无重启 (No restart)

FixedDelayRestartstrategy(固定延时重启策略)

FixedDelayRestartstrategy是固定延迟重启策略,程序按照集群配置文件中或者程序中额外设置的重启次数尝试重启作业,如果尝试次数超过了给定的最大次数,程序还没有起来,则停止作业,另外还可以配置连续两次重启之间的等待时间。

在 flink-conf.yaml 中可以像下面这样配置:

restart-strategy: fixed-delay

#表示作业重启的最大次数,启用 checkpoint 的话是 Integer.MAX_VALUE,否则是 1

restart-strategy.fixed-delay.attempts: 3

#如果设置分钟可以类似 1 min,该参数表示两次重启之间的时间间隔,当程序与外部系统有连接交互时延迟重启可能会有帮助,启用 checkpoint 的话,延迟重启的时间是 10 秒,否则使用 akka.ask.timeout 的值。

restart-strategy.fixed-delay.delay: 10 s

python程序设置重启策略为 "fixed-delay":

table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")
FailureRateRestartstrategy(故障率重启策略)

FailureRateRestartstrategy 是故障率重启策略,在发生故障之后重启作业,如果固定时间间隔之内发生故障的次数超过设置的值后,作业就会失败停止,该重启策略也支持设置连续两次重启之间的等待时间。

在 flink-conf.yaml 中可以像下面这样配置:

restart-strategy: failure-rate

restart-strategy.failure-rate.max-failures-per-interval:

#固定时间间隔内允许的最大重启次数,默认 1

restart-strategy.failure-rate.failure-rate-interval: 5 min 

#固定时间间隔,默认 1 分钟

restart-strategy.failure-rate.delay: 10 s

#连续两次重启尝试之间的延迟时间,默认是 akka.ask.timeout

python程序设置重启策略为 "fixed-delay":

table_env.get_config().get_configuration().set_string("restart-strategy", "failure-rate")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.delay", "1s")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.failure-rate-interval", "1 min")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.max-failures-per-interval", "1")
NoRestartstrategy(不重启策略)

NoRestartstrategy 作业不重启策略,直接失败停止,在 flink-conf.yaml 中配置如下:

restart-strategy: none

配置State Backends 以及 Checkpointing

Checkpoint

Flink为了使 State 容错,需要有 State checkpoint(状态检查点)。

Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。

Checkpoint 使用的先决条件:

一个持久化的,能够在一定时间范围内重放记录的数据源。

例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph...

State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...

启用和配置

Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment 对象调用 enableCheckpointing(n) 启用 Checkpoint,其中n是以毫秒为单位的 Checkpoint 间隔。

Checkpoint 的配置项包括:

#设置 checkpoint 模式为 EXACTLY_ONCE

#Checkpoint 支持这两种模式:恰好一次exactly-once或至少一次at-least-once

#对于大多数应用来说,EXACTLY_ONCE是优选的。at-least-once可能在某些要求超低延迟(几毫秒)的应用程序使用。

table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")

#Checkpoint最小间隔时间,设置为5000,表示在上一个 checkpoint 完成后的至少5秒后才会启动下一个 checkpoint

table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")

#Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则中止正在进行的 checkpoint

table_env.get_config().get_configuration().set_string("execution.checkpointing.timeout", "10min")

#Checkpoint并发数:最多可以同时运行checkpoint的数量,当达到最大值,必须其中一个完成,才能新启一个。

table_env.get_config().get_configuration().set_string("execution.checkpointing.max-concurrent-checkpoints", "2")
选择 State backend

Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。

默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。

Flink 自带了以下几种开箱即用的 state backend:

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

在没有配置的情况下,系统默认使用 MemoryStateBackend

MemoryStateBackend

使用 MemoryStateBackend,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。

FsStateBackend

FsStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

FsStateBackend 在 TaskManager 的内存中持有正在处理的数据。

Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

FsStateBackend 默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend 构造函数中设置

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

RocksDBStateBackend 在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。

Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

RocksDBStateBackend 通常也是异步的。目前唯一支持增量 checkpoint。

使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。

State backend比较

StateBackend

in-flight

checkpoint

吞吐

推荐使用场景

MemoryStateBackend 

TM Memory

JM Memory

调试、无状态或对数据丢失或重复无要求

FsStateBackend     

TM Memory

FS/HDFS

普通状态、窗口、KV 结构

RocksDBStateBackend

RocksDB on TM

FS/HDFS

超大状态、超长窗口、大型 KV 结构

# 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" "jobmanager"

# 你也可以将这个属性设置为 StateBackendFactory 的完整类名

# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")

# 设置 RocksDB statebackend 所需要的 checkpoint 目录文章来源地址https://www.toymoban.com/news/detail-539189.html

table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")

到了这里,关于Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(34)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(38)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(27)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(35)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(31)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(34)
  • Flink流批一体计算(1):流批一体和Flink概述

    Apache Flink应运而生 数字化经济革命的浪潮正在颠覆性地改变着人类的工作方式和生活方式,数字化经济在全球经济增长中扮演着越来越重要的角色,以互联网、云计算、大数据、物联网、人工智能为代表的数字技术近几年发展迅猛,数字技术与传统产业的深度融合释放出巨大

    2024年02月10日
    浏览(31)
  • flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)

    前言 :今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值

    2024年02月19日
    浏览(32)
  • flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

    前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇! Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油! 3. 物理分区 3.1 Global Partitioner 该分区器会将所有的数据都发送到下游的某个算子实例(subta

    2024年02月19日
    浏览(30)
  • Flink流批一体计算(7):Flink优化

    目录 配置内存 设置并行度 操作场景 具体设置 补充 配置进程参数 操作场景 具体配置 配置netty网络通信 操作场景 具体配置 配置内存 Flink 是依赖内存计算,计算过程中内存不够对 Flink 的执行效率影响很大。可以通过监控 GC ( Garbage Collection ),评估内存使用及剩余情况来判

    2024年02月12日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包