自定义实现 WatermarkStrategy接口
窗口类型:滚动窗口
代码:
public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{
private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);
@Override
public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<JSONObject>() {
private long maxWatermark;
@Override
public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
state.f0 = System.currentTimeMillis();
System.out.println("maxWatermark is " + maxWatermark);
state.f1 = false;
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
//乱序时间
long outOfTime = 3000L;
if (maxWatermark - outOfTime <=0){
} else {
// 10s内没有数据则关闭当前窗口
System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));
System.out.println("state.f1:" + state.f1);
if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
watermarkOutput.emitWatermark(new Watermark(maxWatermark + 6000L));
state.f1 = true;
System.out.println("触发窗口,maxWatermark + 6000L:" + (maxWatermark + 6000L));
} else {
System.out.println("正常发送水印");
watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
}
}
}
};
}
}
代码部分逻辑说明
若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);
使用自定义的watermark:
watermark 周期生成()的疑问:
1、默认200ms,会连续生成4次后,不会继续生成了
2、设置了周期生成间隔,env.getConfig().setAutoWatermarkInterval(1000L); 只会周期生成一次文章来源:https://www.toymoban.com/news/detail-801089.html
参考:https://blog.csdn.net/lr131425/article/details/127422833文章来源地址https://www.toymoban.com/news/detail-801089.html
到了这里,关于flink 最后一个窗口一直没有新数据,窗口不关闭问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!