一、说明
基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。文章来源地址https://www.toymoban.com/news/detail-689287.html
二、基于处理时间的定时器
package com.lyh.flink08;
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999)
.map(line -> {
String[] datas = line.split(",");
return new WaterSensor(datas[0],
Long.valueOf(datas[1]),
Integer.valueOf(datas[2]));
});
stream.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
out.collect(value.toString());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println(timestamp);
out.collect("wo be chu fa le ");
}
}).print();
env.execute();
}
}
三、基于事件时间的定时器
package com.lyh.flink08;
import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class EventTime_s {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999)
.map(line -> {
String[] datas = line.split(",");
return new WaterSensor(
datas[0],
Long.valueOf(datas[1]),
Integer.valueOf(datas[2]));
});
WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);
stream.assignTimestampsAndWatermarks(wms)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
System.out.println(ctx.timestamp());
ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);
out.collect(value.toString());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println("定时器被触发了");
}
}).print();
env.execute();
}
}
文章来源:https://www.toymoban.com/news/detail-689287.html
到了这里,关于大数据-玩转数据-Flink定时器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!