使用事件时间时,需要通过 Flink API 的 WatermarkStrategy
接口配置 watermark 的生成策略。
我们将逐段来看这个 API 的各个部分。
继承关系
Flink 使用 WatermarkStrategy<T>
接口来构建 Watermark 策略,其中泛型 T
为输入数据流类型。
WatermarkStrategy
接口继承了 TimestampAssignerSupplier
和 WatermarkGeneratorSupplier
,即相当于包含了 TimestampAssigner
和 WatermarkGenerator
,具体地:
-
TimestampAssigner
用于从消息记录中提取事件时间戳 -
WatermarkGenerator
用于根据事件时间戳生成 watermark
源码:
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L19
【Github】package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import java.io.Serializable; import java.time.Duration; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @Public public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
TimestampAssignerSupplier
和 WatermarkGeneratorSupplier
均继承了 Serializable
,所以WatermarkStrategy
也继承了 Serializable
,是可序列化的。这是因为在分布式计算过程中,WatermarkStrategy
可能会在不同节点之间传输。
接口方法
需要实现或重写的方法
在 WatermarkStrategy
接口中,有如下 3 个方法是需要被实现的,这 3 个方法分别对应需要确定的 3 个问题:
-
createWatermarkGenerator()
:返回实现了WatermarkGenerator
接口的对象,该对象用于根据输入流记录的事件时间生成 watermark -
createTimestampAssigner()
:返回实现了TimestampAssigner
接口的对象,该对象用于从输入流中获取每条记录的事件时间 -
getAlignmentParameters()
:返回WatermarkAlignmentParams
类的实例,该实例用于控制是否需要对齐不同输入流的 watermark
源码|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy
(部分)
// ------------------------------------------------------------------------
// Methods that implementors need to implement.
// ------------------------------------------------------------------------
/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
*/
@Override
default TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
// By default, this is {@link RecordTimestampAssigner},
// for cases where records come out of a source with valid timestamps, for example from
// Kafka.
return new RecordTimestampAssigner<>();
}
/**
* Provides configuration for watermark alignment of a maximum watermark of multiple
* sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*/
@PublicEvolving
default WatermarkAlignmentParams getAlignmentParameters() {
return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
}
其中,只有 createWatermarkGenerator(WatermarkGeneratorSupplier.Context context)
方法是非默认的,这样就可以通过 Java 的 lambda 表达式语法来实现 WatermarkStrategy
接口。
指定 WatermarkGenerator
的方法
WatermarkStrategy
接口提供了如下 4 个直接指定 WatermarkGenerator
以实现 WatermarkStrategy
的默认方法。因为 WatermarkStrategy
接口只有 createWatermarkGenerator
这 1 个方法没有默认实现,所以在实现上均使用了 Java 的 lambda 表达式。
-
forMonotonousTimestamps
:使用AscendingTimestampsWatermarks
生成 watermark,适用于事件时间单调递增的场景 -
forBoundedOutOfOrderness
:使用BoundedOutOfOrdernessWatermarks
生成 watermark,适用于事件时间虽然不单调递增,但延迟时间有限的场景 -
forGenerator
:使用参数提供的WatermarkGeneratorSupplier
生成的 watermark 生成器 -
noWatermarks
:不使用 watermark
源码|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy.java:197
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
return generatorSupplier::createWatermarkGenerator;
}
static <T> WatermarkStrategy<T> noWatermarks() {
return (ctx) -> new NoWatermarksGenerator<>();
}
指定 TimestampAssigner
的方法
在 WatermarkStrategy
接口中,提供了如下 3 个指定 TimestampAssigner
的默认方法。这些方法会创建一个新的 WatermarkStrategy
类,并将调用该方法的基础 WatermarkStrategy
类封装起来;在调用新类的 createWatermarkGenerator
方法时,会使用新类中重写的方法;在调用新类的其他方法时,会返回被封装的基础类的方法。
-
withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner)
:使用参数指定的TimestampAssigner
的提供者类 -
withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner)
:使用参数指定的可序列化的TimestampAssigner
源码|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy
(部分)文章来源:https://www.toymoban.com/news/detail-846325.html
default WatermarkStrategy<T> withTimestampAssigner(
TimestampAssignerSupplier<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
}
default WatermarkStrategy<T> withTimestampAssigner(
SerializableTimestampAssigner<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(
this, TimestampAssignerSupplier.of(timestampAssigner));
}
制定处理空闲数据源的方法
在 WatermarkStrategy
接口中,提供了 withIdleness(Duration idleTimeout)
方法用于处理存在空闲数据源的情况,其参数为标记空闲数据源的时间阈值,即当某个数据源超过 idleTimeout
没有生产消息时则标记为空闲数据源。
源码|Github|org.apache.flink.api.common.eventtime.WatermarkStrategy#withIdleness
/**
* Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
* created {@link WatermarkGenerator}.
*
* <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
* stream for that amount of time, then that partition is considered "idle" and will not hold
* back the progress of watermarks in downstream operators.
*
* <p>Idleness can be important if some partitions have little data and might not have events
* during some periods. Without idleness, these streams can stall the overall event time
* progress of the application.
*/
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(
!(idleTimeout.isZero() || idleTimeout.isNegative()),
"idleTimeout must be greater than zero");
return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}
指定 AlignmentParameters
的方法
在 WatermarkStrategy
接口中,还提供了如下 2 个方法,用于指定 watermark 的对齐方法。在实现上,这 2 个方法也与指定 TimestampAssigner
的方法类似,在新类中重写 getAlignmentParameters()
方法。
-
withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift)
:指定 watermark 对齐策略;其中参数watermarkGroup
为 watermark 的组名,maxAllowedWatermarkDrift
为在暂停消费前允许超出 watermark 时间 -
withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval)
:指定 watermark 对齐策略;其中新增的参数updateInterval
为 task 上报 watermark 以及调度器制定对齐 watermark 的时间间隔
源码|Github|org/apache/flink/api/common/eventtime/WatermarkStrategy
(部分)
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift) {
return withWatermarkAlignment(
watermarkGroup,
maxAllowedWatermarkDrift,
WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
}
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
return new WatermarksWithWatermarkAlignment<T>(
this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
}
整体说明
通过上述设计,使 WatermarkStrategy
接口允许使用实现 WatermarkGenerator
方法的 Java Lambda 表达式实现;对于已实现的 WatermarkStrategy
匿名类,允许通过调用指定 TimestampAssigner
和 AlignmentParameters
的方法,来覆盖掉这两个方法的默认实现。文章来源地址https://www.toymoban.com/news/detail-846325.html
到了这里,关于Flink 源码学习|使用 Watermark 策略(WatermarkStrategy)【v2 修订版】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!