测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size。最后一个窗口没有闭窗计算,数据并没及时输出告警
经过调试发现,watermark没有向后继续推进,导致无法闭窗, watermark的时间取的是数据中的业务时间,create_time。
因为没有后续数据进来, 所以watermark一直停在收到的最后一条数据的时间,,
按照官网的watermark的实现:
inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new EventWaterMark(window))
.withTimestampAssigner((SerializableTimestampAssigner<JSONObject>) (jsonObject, l) -> jsonObject.getLong(window.getAttrField()))).name("Watermark|"+rule.getName());
public class EventWaterMark implements WatermarkGeneratorSupplier<JSONObject> {
private static final long serialVersionUID = -2338922000184097299L;
private final String eventTimeFieldNameJSONObject;
private long currentMaxTimestamp;
private String eventTimeFieldName;
private long maxOutOfOrderness;
public EventWaterMark(Window window) {
this.maxOutOfOrderness = window.getMaxOutOfOrderness();
this.eventTimeFieldNameJSONObject = window.getAttrField();
this.eventTimeFieldName = window.getAttrField();
}
@Override
public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
return new WatermarkGenerator<JSONObject>() {
@Override
public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 无后续数据,窗口不会关闭计算?
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
# 使用当前时间,无法处理历史的数据
// watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - maxOutOfOrderness));
}
};
}
}
解决思路: 记录最后一条数据进入的时间, 多少秒后无数据进来,就主动向后推进watermark多少秒,保持watermark向后推进
更新后自定义的watermark实现:
public class EventWaterMark implements WatermarkGeneratorSupplier<JSONObject> {
private static final long serialVersionUID = -2338922000184097299L;
private final String eventTimeFieldNameJSONObject;
private long currentMaxTimestamp;
private String eventTimeFieldName;
private long maxOutOfOrderness;
// 当前数据进入的时间
private long currentDateTimeMillis;
public EventWaterMark(Window window) {
this.maxOutOfOrderness = window.getMaxOutOfOrderness();
this.eventTimeFieldNameJSONObject = window.getAttrField();
this.eventTimeFieldName = window.getAttrField();
}
@Override
public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
return new WatermarkGenerator<JSONObject>() {
@Override
public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));
// 记录 当前来的数据来到 时间戳
currentDateTimeMillis = System.currentTimeMillis();
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 无后续数据,最后一个窗口不会关闭计算,
// watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
// 使用System.currentTimeMillis(),如果第一条数据时间很早, 窗口不会进行关闭计算
// watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - maxOutOfOrderness));
// 无初始数据进入 watermark不用推进, 多并行度时候,会取最小的watermark,通过withIdleness 将分区标记为空闲,这样watermark就可以往下走了。
if (currentMaxTimestamp - maxOutOfOrderness <= 0) {
} else {
// 5秒内无数据进入,watermark保持向后推进, 确保无数据进入,窗口能闭窗计算
if (System.currentTimeMillis() - currentDateTimeMillis >= 5000) {
// 无数据进入,保持watermark递增
currentMaxTimestamp = currentMaxTimestamp + System.currentTimeMillis() - currentDateTimeMillis;
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
// 模拟数据进入 设置数据进入时间
currentDateTimeMillis = System.currentTimeMillis();
} else {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
}
}
}
};
}
}
上面虽然可以解决最后一个窗口闭合问题,但是还有个问题,当执行繁忙的时候,出现反压,导致source消费为0, 此时持续高反压时间大于代码中配置的5秒后, 此时自定义的watermark由于没收到上游source算子的数据,就会自动的向后推进watermark,这样就会导致,窗口会及时闭合计算,当反压结束后, source再来的数据 已经超过 窗口时间和延迟时间,, 直接作为迟到数据不参与窗口计算了。 文章来源:https://www.toymoban.com/news/detail-549147.html
此处暂未想到更好的解决方法文章来源地址https://www.toymoban.com/news/detail-549147.html
到了这里,关于flink时间窗口无新的数据进来最后一个窗口不关闭的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!