Flink CEP(三)pattern动态更新(附源码)

这篇具有很好参考价值的文章主要介绍了Flink CEP(三)pattern动态更新(附源码)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1.实现分析

2.代码实现

3.测试验证

4.源码地址


        

线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

import cep.pattern.Pattern;

/**
 * @author StephenYou
 * Created on 2023-07-23
 * Description: 动态Pattern接口(用户调用API)不区分key
 */
public interface DynamicPatternFunction<T> extends Function, Serializable {

    /***
     * 初始化
     * @throws Exception
     */
    public void init() throws Exception;

    /**
     * 注入新的pattern
     * @return
     */
    public Pattern<T,T> inject() throws Exception;

    /**
     * 一个扫描周期:ms
     * @return
     */
    public long getPeriod() throws Exception;

    /**
     * 规则是否发生变更
     * @return
     */
    public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用

CEP.pattern(dataStream,pattern);

//动态Pattern

CEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {
    /***
     * Dynamic injection pattern function 
     * @param input
     * @param dynamicPatternFunction
     * @return
     * @param <T>
     */
    public static <T> PatternStream<T> injectionPattern throws Exception (
            DataStream<T> input,
            DynamicPatternFunction<T> dynamicPatternFunction){
        return new PatternStream<>(input, dynamicPatternFunction); 
    }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {
    PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
    }
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;
        if (patternFunction == null ) {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
        } else {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    patternFunction,
                    comparator,
                    null,
                    processFunction,
                    lateDataOutputTag);
        }

        增加对应的CepOperator构造函数。

    public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final DynamicPatternFunction patternFunction,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.patternFunction = patternFunction;
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
        this.nfaFactory = null;
    }

        加载Pattern,构造NFA

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);

        //初始化
        if (patternFunction != null) {
            patternFunction.init();
            Pattern pattern = patternFunction.inject();
            afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period = patternFunction.getPeriod();
            // 注册定时器检测规则是否变更
            if (period > 0) {
                getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
            }
        }


        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        if (patternFunction != null) {
            // 规则版本更新
            if (needRefresh.value() < refreshVersion.get()) {
                //清除状态
                computationStates.clear();
                elementQueueState.clear();
                partialMatches.releaseCacheStatisticsTimer();
                //清除定时器
                Iterable<Long> registerTime = registerTimeState.get();
                if (registerTime != null) {
                    Iterator<Long> iterator = registerTime.iterator();
                    while (iterator.hasNext()) {
                        Long l = iterator.next();
                        //删除定时器
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                        //状态清理
                        iterator.remove();
                    }
                }
                //更新当前的版本
                needRefresh.update(refreshVersion.get());
            }
        }
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        //初始化状态
        if (patternFunction != null) {
            /**
             * 两个标识位状态
             */
            refreshFlagState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            if (context.isRestored()) {
                if (refreshFlagState.get().iterator().hasNext()) {
                    refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
                }
            } else {
                refreshVersion = new AtomicInteger(0);
            }
            needRefresh = context.getKeyedStateStore()
                    .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
        }
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
        patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
            @Override
            public Map select(Map map) throws Exception {
                map.put("processingTime", System.currentTimeMillis());
                return map;
            }
        }).print();
        env.execute("SyCep");

    }

    public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {

        public TestDynamicPatternFunction() {
            this.flag = true;
        }

        boolean flag;
        int time = 0;
        @Override
        public void init() throws Exception {
            flag = true;
        }

        @Override
        public Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()
                            throws Exception {

            // 2种pattern
            if (flag) {
                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success");
                            }
                        })
                        .times(1)
                        .followedBy("middle")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail");
                            }
                        })
                        .times(1)
                        .next("end");
                return pattern;
            } else {

                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success2");
                            }
                        })
                        .times(2)
                        .next("middle2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail2");
                            }
                        })
                        .times(2)
                        .next("end2");
                return pattern;
            }
        }

        @Override
        public long getPeriod() throws Exception {
            return 10000;
        }

        @Override
        public boolean isChanged() throws Exception {
            flag = !flag ;
            time += getPeriod();
            System.out.println("change pattern : " + time);
            return true;
        }
    }

打印结果:符合预期

Flink CEP(三)pattern动态更新(附源码),大数据学习之路,# Flink CEP,flink,大数据

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern文章来源地址https://www.toymoban.com/news/detail-757182.html

到了这里,关于Flink CEP(三)pattern动态更新(附源码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink(九)CEP

    1.概述 所谓 CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者“订

    2024年01月16日
    浏览(45)
  • Flink CEP (一)原理及概念

    目录 1.Flink CEP 原理 2.Flink API开发 2.1 模式 pattern 2.2 模式 pattern属性 2.3 模式间的关系   Flink CEP内部是用 NFA(非确定有限自动机) 来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为 起始状态 、 中间状态 、 最终状

    2024年02月16日
    浏览(51)
  • Flink-CEP 实战教程

      所谓CEP,其实就是“ 复杂事件处理(Complex Event Processing) ”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。   那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者

    2024年02月02日
    浏览(60)
  • Flink CEP(Complex Event Processing)库

    复杂事件处理(Complex Event Processing,CEP)是一种用于在流式数据中识别和处理复杂事件模式的技术。Apache Flink 作为一个流式处理框架,也可以用于实现复杂事件处理。下面是 Flink 中实现复杂事件处理的一般原理: 事件流输入: 首先,Flink 接收外部的事件流作为输入。这些事

    2024年02月13日
    浏览(34)
  • Flink动态更新维表

         概念:Lookup join是针对于由作业流表触发,关联右侧维表来补全数据的场景 。 默认情况下,在流表有数据变更,都会触发维表查询(可以通过设置维表是否缓存,来减轻查询压力),由于不保存状态,因此对内存占用较小。(以上来自网络) 具体配置如下:        

    2024年02月09日
    浏览(40)
  • Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

    Flink中使用表和SQL基本上跟其他场景是一样的。不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有\\\"仅插入流\\\"(Insert-Only Streams)和\\\"更新日志流\\\"(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。 这种麻烦其实是不可避

    2024年02月03日
    浏览(73)
  • Flink 源码学习|Watermark 与 WatermarkGenerator

    上游文档: Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 Watermark》学习笔记 Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记 Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记 Watermark 是在各个算子生成的

    2024年02月19日
    浏览(31)
  • 动态规划例题(代码随想录学习)——持续更新

    dp[i][j]的含义是:从(0,0)到(i,j)的不同路径 当路线中有了障碍,此路不通,所以在不同路径的递推公式上需要增加条件 if(obs[i,j]==0)没有障碍,dp[i][j]= dp[i-1][j]+dp[i][j-1] if(obs[i][j]==1)有障碍,不进行推导 obs数组表示障碍 障碍的后面应该是0(原因:遇到障碍后,即

    2024年04月12日
    浏览(43)
  • Hudi Flink SQL源码调试学习(1)

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun 本着学习hudi-flink源码的目的,利用之前总结的文章Hudi Flink SQL代码示例及本地调试中的代码进行调试,记录调试学习过程中主要的步骤及对

    2024年02月12日
    浏览(31)
  • Hudi Flink SQL源码调试学习(一)

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun 本着学习hudi-flink源码的目的,利用之前总结的文章Hudi Flink SQL代码示例及本地调试中的代码进行调试,记录调试学习过程中主要的步骤及对

    2024年02月11日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包