Flink 学习六 Flink 窗口计算API

这篇具有很好参考价值的文章主要介绍了Flink 学习六 Flink 窗口计算API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink 学习六 Flink 窗口计算API

1.窗口 (window)概念和分类

窗口 window 是处理无限流的核心就是把无界的数据流,按照一定的规则划分成一段一段的有界的数据流(桶),然后再这个有界的数据流里面去做计算;

Flink 学习六 Flink 窗口计算API

2.分类体系

2.1 滚动窗口

相邻窗口之间是没有数据重合

  • window 大小可以是时间,可以是数据长度
  • 按照数据流是否可以是 keyed , 在分类,nonkey window 也叫做global window (并行度为1)

Flink 学习六 Flink 窗口计算API

2.2 滑动窗口

滑动步长小于窗口大小,数据会有重合;正常情况下是有数据重合的

  • window 大小可以是时间,可以是数据长度
  • 按照数据流是否可以是 keyed , 在分类,nonkey window 也叫做global window (并行度为1)

Flink 学习六 Flink 窗口计算API

2.3 会话窗口

按照时间分类的一个类别,一段时间没有数据,就重新开一个窗口;

  • window 大小可以是时间;
  • 按照数据流是否可以是 keyed , 在分类,nonkey window 也叫做global window (并行度为1)

Flink 学习六 Flink 窗口计算API

3.窗口计算API

计算的API 主要是根据最后window的类型是否是 Keyed ,

3.1 KeyedWindow

上游数据按照hash key 后计算,并行度可控

stream
       .keyBy(...)               <-  keyed versus non-keyed windows      //按照上面key 分组
       .window(...)              <-  required: "assigner" //window 类别 是 滚动/滑动/会话
      [.trigger(...)]            <-  optional: "trigger" (else default trigger) //数据进入桶触发
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)  // 桶数据擦除策略
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)  //是否允许数据迟到
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data) //测流输出处理迟到数据,补救策略
       .reduce/aggregate/apply()      <-  required: "function" //处理函数,一般是聚合计算
      [.getSideOutput(...)]      <-  optional: "output tag" //
        //滚动窗口  处理时间语义
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        //滑动窗口  处理时间语义
        dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(1)));

        //滚动窗口  事件时间语义
        dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
        //滑动窗口  事件时间语义
        dataStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)));
        
        //计数滚动窗口
        dataStream.countWindowAll(100);
        //计数滑动窗口
        dataStream.countWindowAll(100,20);

3.2 NonKeyedWindow

上游数据聚合到一起计算 ,并行度是1

stream 
       .windowAll(...)           <-  required: "assigner"  //窗口类型是全局窗口 ,这个不一样  //window 类别 是 滚动/滑动/会话
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
        //滚动窗口  处理时间语义
        keyedStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        //滑动窗口  处理时间语义
        keyedStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(1)));

        //滚动窗口  事件时间语义
        keyedStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
        //滑动窗口  事件时间语义
        keyedStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)));

        //计数滚动窗口
        keyedStream.countWindow(100);
        //计数滑动窗口
        keyedStream.countWindow(100,20);

3.3窗口聚合算子

3.3.1 增量窗口聚合算子

数据来一条出库一条,比如累加, min,max,minBy,maxBy,sum,reduce,aggregate

min,max,minBy,maxBy,sum

        KeyedStream<EventLog, Long> keyedStream = streamOperator.keyBy(EventLog::getGuid);
        SingleOutputStreamOperator<EventLog> serviceTime = keyedStream
                .countWindow(10,2) // 创建滑动 count 窗口 
                .min("serviceTime") ; //serviceTime 是符合逻辑的,其他字段是窗口的第一条数据
				.max("serviceTime") ; //serviceTime 是符合逻辑的,其他字段是窗口的第一条数据
				.minBy("serviceTime") ; //serviceTime 是符合逻辑的,其他字段是 最小 serviceTime  所在哪一行
				.maxBy("serviceTime") ; //serviceTime 是符合逻辑的,其他字段是 最大 serviceTime  所在哪一行
				.sum("serviceTime") ; // serviceTime 是符合逻辑的(serviceTime 字段之和),其他字段是窗口的第一条数据

reduce

reduce 函数的返回值写死了,和数据流中一样

        keyedStream.countWindow(10,2).reduce(new ReduceFunction<EventLog>() {
            @Override
            public EventLog reduce(EventLog value1, EventLog value2) throws Exception {
                //
                return null;
            }
        })

aggregate

aggregate 函数的返回值可以自定义: 详细见下面 3.4.1

    keyedStream.countWindow(10,2).aggregate(new AggregateFunction<EventLog, Object, Object>() {
            ......
        })
3.3.2 全量聚合算子

process

详细见下面 3.4.2

相比于apply 更加的强大,可以拿到上下文

apply

        keyedStream.countWindow(10,2).apply(new WindowFunction<EventLog, Object, Long, GlobalWindow>() {
            /**
             * Evaluates the window and outputs none or several elements.
             *
             * @param key  key
             * @param window  窗口,可以拿到开始和结束
             * @param input The elements in the window being evaluated. //窗口所有数据
             * @param out //收集器
             */
            @Override
            public void apply(Long aLong, GlobalWindow window, Iterable<EventLog> input, Collector<Object> out) throws Exception {
            }
        })
3.3.3 示例

演示 KeyedWindow 使用的较多,和NonKeyedWindow 区别也不大;

原始需求 :

计算时间窗口内 EventLog 中根据guid 分组的个数

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EventLog {

    private Long guid; //用户id
    private String event;//用户事件
    private String timeStamp; //事件发生时间
    private Integer serviceTime; //接口时间

}

自定义source 生成数据使用

public class CustomEventLogSourceFunction implements SourceFunction<EventLog> {

	volatile boolean runFlag = true;
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");


	@Override
	public void run(SourceContext<EventLog> sourceContext) throws Exception {
        Random random = new Random();
        while (runFlag) {
            EventLog eventLog = new EventLog(Long.valueOf(random.nextInt(5)),"xw"+random.nextInt(3),simpleDateFormat.format(new Date()));
            Thread.sleep(random.nextInt(5)*1000);
            sourceContext.collect(eventLog);
		}

	}

	@Override
	public void cancel() {
		runFlag = false;
	}
}

aggregate

聚合:窗口中**,拿到每一条数据**,去做聚合计算


public class _03_Window {

	public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
		// 获取环境
		Configuration configuration = new Configuration();
		configuration.setInteger("rest.port", 8822);
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(1); //并行度设置为1 好观察
		//DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStreamSource<EventLog> dataStream = env.addSource(new CustomEventLogSourceFunction());

		// 创建 Watermark 策略 事件时间推进策略
		WatermarkStrategy<EventLog> watermarkStrategy = WatermarkStrategy
				.<EventLog>forBoundedOutOfOrderness(Duration.ofMillis(0))
				.withTimestampAssigner(new SerializableTimestampAssigner<EventLog>() {
					@Override
					public long extractTimestamp(EventLog element, long recordTimestamp) {
                        try {
                            return sdf.parse(element.getTimeStamp()).getTime();
                        } catch (Exception e) {
                            return 0;
                        }
                    }
				});
		// 分配wm , 使用事件时间
		DataStream<EventLog> streamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy).disableChaining();

		// 需求 10 s 统计30s 数据 用户行为个数
		streamOperator.keyBy(EventLog::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 创建滑动窗口
				// .apply() // 全部窗口数据给你,自定义全窗口运算逻辑,返回计算结果,灵活度更大
                //.process() //全部窗口数据给你,自定义全窗口运算逻辑,返回计算结果,灵活度更大 可以获取上下文,数据更多
				.aggregate(new AggregateFunction<EventLog, Tuple2<Long,Integer>, Tuple2<Long,Integer>>() { // api 聚合
					// 初始化累加器
					@Override
					public Tuple2<Long,Integer> createAccumulator() {
						return Tuple2.of(null,0);
					}

					// 累加逻辑
					@Override
					public Tuple2<Long,Integer> add(EventLog value, Tuple2<Long,Integer> accumulator) {
						return Tuple2.of(value.getGuid(),accumulator.f1+1);
					}

					// 获取结果
					@Override
					public Tuple2<Long,Integer> getResult(Tuple2<Long,Integer> accumulator) {
						return accumulator;
					}

					// 分区合并 方法
					@Override
					public Tuple2<Long,Integer> merge(Tuple2<Long,Integer> a, Tuple2<Long,Integer> b) {
						return Tuple2.of(a.f0,a.f1+b.f1);
					}
				}).print();


		env.execute();

	}
}

process

聚合:窗口结束后,拿到窗口中每一条数据,也可以拿到上下文,去做自定义的逻辑处理数据,比 aggregate 更加的自由

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                .process(new ProcessWindowFunction<EventLog, Tuple2<Long, Integer>, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong,
                                        ProcessWindowFunction<EventLog, Tuple2<Long, Integer>, Long, TimeWindow>.Context context,
                                        Iterable<EventLog> elements, Collector<Tuple2<Long, Integer>> out) throws Exception {
                        TimeWindow window = context.window();
                        String start = sdf.format(new Date(window.getStart()));
                        String end = sdf.format(new Date(window.getEnd()));
                        Iterator<EventLog> iterator = elements.iterator();
                        int count = 0;
                        while (iterator.hasNext()) {
                            iterator.next();
                            count++;
                        }
                        System.out.println("start:" + start + ";end:" + end + ";count:" + count);
                        out.collect(Tuple2.of(aLong, count));
                    }
                }).print();

3.4 延迟数据处理

3.4.1 allowdLateness

允许数据迟到 3.5 的图可以参考下


public class _05_Window_allowedLateness {

    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        // 获取环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(1);
        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        // 字符串流==> 对象流

        SingleOutputStreamOperator<EventLog> outputStreamOperator = dataStreamSource.map(new MapFunction<String, EventLog>() {
            @Override
            public EventLog map(String value) throws Exception {
                String[] split = value.split(",");
                return new EventLog(Long.valueOf(split[0]), split[1], split[2]);
            }
        });

        // 创建 Watermark 策略 事件时间推进策略
        WatermarkStrategy<EventLog> watermarkStrategy = WatermarkStrategy
                .<EventLog>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<EventLog>() {
                    @Override
                    public long extractTimestamp(EventLog element, long recordTimestamp) {
                         return  Long.valueOf(element.getTimeStamp());
                    }
                });
        // 分配wm , 使用事件时间
        DataStream<EventLog> streamOperator = outputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy)
                .disableChaining();
        OutputTag<EventLog> latedata = new OutputTag<EventLog>("latedata", TypeInformation.of(EventLog.class));
        KeyedStream<EventLog, Long> keyedStream = streamOperator.keyBy(EventLog::getGuid);

        SingleOutputStreamOperator<EventLog> guid = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .allowedLateness(Time.seconds(3))  //允许迟到5s
                .sideOutputLateData(latedata)  //超过允许迟到的值后 ,输出到测流
                .apply(new WindowFunction<EventLog, EventLog, Long, TimeWindow>() {
                    long count = 0;
                    @Override
                    public void apply(Long aLong, TimeWindow window, Iterable<EventLog> input, Collector<EventLog> out) throws Exception {
                        for (EventLog eventLog : input) {
                            count+=eventLog.getGuid();
                        }
                        EventLog eventLog = new EventLog();
                        eventLog.setGuid(count);
                        out.collect(eventLog);
                        String start = sdf.format(new Date(window.getStart()));
                        String end = sdf.format(new Date(window.getEnd()));
                        System.out.println("==> start:" + start + ";end:" + end + ";count:" + count);
                    }
                });

        guid.print("主流输出");
        DataStream<EventLog> sideOutput = guid.getSideOutput(latedata);
        sideOutput.print("测流输出");
        env.execute();

    }
}
3.4.2 window时间&watermark

Flink 学习六 Flink 窗口计算API

3.5 窗口的触发机制

3.5.1 trigger 窗口计算触发

窗口计算的触发,是trigger 类决定,不同的 WindowAssigner 对应不同的trigger

Trigger (org.apache.flink.streaming.api.windowing.triggers)
    ProcessingTimeoutTrigger (org.apache.flink.streaming.api.windowing.triggers)
    EventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
    CountTrigger (org.apache.flink.streaming.api.windowing.triggers)
    DeltaTrigger (org.apache.flink.streaming.api.windowing.triggers)
    NeverTrigger in GlobalWindows (org.apache.flink.streaming.api.windowing.assigners)
    ContinuousEventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
    PurgingTrigger (org.apache.flink.streaming.api.windowing.triggers)
    ContinuousProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
    ProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)

EventTimeTrigger

事件时间触发器

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    /**
    * 数据来的时候触发
    */
    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        //窗口的最大时间小于当前的Watermark() 开启新窗口
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else { //否则继续
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    //当事件时间推进的时候 也判断是否开启新窗口
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

     //该类是处理事件时间,不处理ProcessingTime
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

CountTrigger

数据窗口触发器

public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    // 数量超过最大 触发
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
// 不做 onEventTime 的逻辑潘墩
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
// 不做 onProcessingTime 的逻辑潘墩
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

结合上面的例子 写一个自定义触发器,当数据流中出现特定值,立刻触发窗口的计算,而不需要等到窗口结束

  class IEventTimeTrigger extends Trigger<EventLog, TimeWindow> {
    private static final long serialVersionUID = 1L;


      /**
       * watermark 是不是大于窗口结束点 触发新的窗口
       */
    @Override
    public TriggerResult onElement(
            EventLog element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            if(element.getGuid().equals(1111111)){   //如果数据的GUID 是 1111111 提前触发窗口计算0
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

}

3.5.2 evictor窗口计算移出数据

窗口触发前/后 数据的移除机制

Evictor (org.apache.flink.streaming.api.windowing.evictors)
	CountEvictor (org.apache.flink.streaming.api.windowing.evictors)  //数量窗口清除
	DeltaEvictor (org.apache.flink.streaming.api.windowing.evictors)  //滑动窗口清除
	TimeEvictor (org.apache.flink.streaming.api.windowing.evictors)   //时间窗口清除
public interface Evictor<T, W extends Window> extends Serializable {

//计算之前
    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);

//计算之后调用
    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);
...............
}

重写一个ITimeEvictor 相对于 TimeEvictor 的改动

class ITimeEvictor<W extends Window> implements Evictor<EventLog, W> {
......................

	private void evict(Iterable<TimestampedValue<EventLog>> elements, int size, EvictorContext ctx) {
		if (!hasTimestamp(elements)) {
			return;
		}

		long currentTime = getMaxTimestamp(elements);
		long evictCutoff = currentTime - windowSize;

		for (Iterator<TimestampedValue<EventLog>> iterator = elements.iterator(); iterator.hasNext();) {
			TimestampedValue<EventLog> record = iterator.next();
			// if (record.getTimestamp() <= evictCutoff)
			// 添加条件 ,窗口计算移除 Guid().equals(222222)
			if (record.getTimestamp() <= evictCutoff || record.getValue().getGuid().equals(222222)) {
				iterator.remove();
			}
		}
	}
 .................................
}
3.5.3 调用示例
   keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)))
				.trigger(new IEventTimeTrigger()) // 如果数据的GUID 是 1111111 提前触发窗口计算
				.evictor(ITimeEvictor.of(Time.seconds(30), false)) // 计算前移除GUID 是 222222的数据
				.apply(.....)
3.5.4 调用时机

Flink 学习六 Flink 窗口计算API

  • 数据到达窗口后,调用 Trigger 的 onElement() 方法

  • 根据Trigger 的 onElement() 方法 返回值 判断是否要触发窗口计算

  • 若触发窗口计算,在计算前调用Evictor(after/before) 来移除某些数据文章来源地址https://www.toymoban.com/news/detail-493711.html

到了这里,关于Flink 学习六 Flink 窗口计算API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 8 分钟看完这 7000+ 字,Flink 时间窗口和时间语义这对好朋友你一定搞得懂!外送窗口计算和水印一并搞懂!!!

    目录 一、时间语义 时间窗口 1. 前摘: 1.1 Flink的时间和窗口 1.2 什么是时间窗口和时间语义呢? 2. 时间窗口 2.1 举个例子: 2.2 3个实时数据计算场景 3. 时间语义 二、Flink上进行窗口计算: 1. 一个Flink窗口应用的大致骨架结构 2. Flink窗口的骨架结构中有两个必须的两个操作:

    2024年01月23日
    浏览(38)
  • 《十堂课学习 Flink》第七章:Flink 流计算保存结果env.sinkTo(以 Kafka / ES 为例)

    本章基于Elastic Search 以及 Kafka 用于介绍 Flink 的 sinkTo / addSink 的 API 的使用方法,此外我们还会实现几个通用的方法,在实际应用场景中,针对不同的实体类可以通过这个通用的方法来完成,而不需要一对一地实现。 flink 写数据到ES 此外,还将编写一个通用的工具类,用于

    2024年04月26日
    浏览(31)
  • 【Flink】Flink窗口触发器

           数据进入到窗口的时候,窗口是否触发后续的计算由窗口触发器决定,每种类型的窗口都有对应的窗口触发机制。WindowAssigner 默认的 Trigger通常可解决大多数的情况。我们通常使用方式如下,调用trigger()方法把我们想执行触发器传递进去:  SingleOutputStreamOperatorProduct

    2024年02月12日
    浏览(36)
  • flink 最后一个窗口一直没有新数据,窗口不关闭问题

    窗口类型:滚动窗口 代码: 代码部分逻辑说明 若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算) env.getConfig().setAutoWatermarkInterval(5000); 使用自定义的watermark: watermark 周期生成()的疑问: 1、默认200ms,会连续生成4次后,

    2024年01月18日
    浏览(39)
  • flink 窗口函数

    时间语义 事件像水流一样到来,经过pipline进行处理,为了划定窗口进行计算,需要以时间作为标准,也就是流中元素事件的先后以及间隔描述。 flink是一个分布式系统,如何让所有机器保证时间的完全同步呢。比如上游任务8点59分59秒发送了消息,到达下游时是9点零1秒,那

    2024年02月01日
    浏览(77)
  • Flink 窗口

    介绍:流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段,其分为两种类型:1、时间窗口,2:计数窗口 时间窗口根据窗口实现原理的不同分成三类:滚

    2024年02月09日
    浏览(25)
  • Flink中的窗口

      如下图所示,在Flink中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。   注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间

    2024年02月04日
    浏览(37)
  • Flink Windows(窗口)详解

    Windows是流计算的核心。Windows将流分成有限大小的“buckets”,我们可以在其上应用聚合计算( ProcessWindowFunction , ReduceFunction , AggregateFunction 或 FoldFunction )等。在Flink中编写一个窗口计算的基本结构如下: Keyed Windows Non-Keyed Windows In a nutshell, a window is created as soon as the first

    2024年02月10日
    浏览(55)
  • Flink之窗口聚合算子

    1.窗口聚合算子 在Flink中窗口聚合算子主要分类两类 滚动聚合算子(增量聚合) 全窗口聚合算子(全量聚合) 1.1 滚动聚合算子 滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下: aggregate max maxBy

    2024年02月07日
    浏览(43)
  • Flink窗口函数

    Flink窗口函数是指对数据流中的数据进行分组和聚合操作的函数。 FlinkSQL支持对一个特定的窗口的聚合。例如有用户想统计在过去的1分钟内有多少用户点击了某个的网页。在这种情况下,我们可以定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算

    2023年04月24日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包