Flink CEP(三)pattern动态更新

这篇具有很好参考价值的文章主要介绍了Flink CEP(三)pattern动态更新。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1.实现分析

2.代码实现

3.测试验证

4.源码地址


        

线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

import cep.pattern.Pattern;

/**
 * @author StephenYou
 * Created on 2023-07-23
 * Description: 动态Pattern接口(用户调用API)不区分key
 */
public interface DynamicPatternFunction<T> extends Function, Serializable {

    /***
     * 初始化
     * @throws Exception
     */
    public void init() throws Exception;

    /**
     * 注入新的pattern
     * @return
     */
    public Pattern<T,T> inject() throws Exception;

    /**
     * 一个扫描周期:ms
     * @return
     */
    public long getPeriod() throws Exception;

    /**
     * 规则是否发生变更
     * @return
     */
    public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用

CEP.pattern(dataStream,pattern);

//动态Pattern

CEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {
    /***
     * Dynamic injection pattern function 
     * @param input
     * @param dynamicPatternFunction
     * @return
     * @param <T>
     */
    public static <T> PatternStream<T> injectionPattern throws Exception (
            DataStream<T> input,
            DynamicPatternFunction<T> dynamicPatternFunction){
        return new PatternStream<>(input, dynamicPatternFunction); 
    }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {
    PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
    }
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;
        if (patternFunction == null ) {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
        } else {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    patternFunction,
                    comparator,
                    null,
                    processFunction,
                    lateDataOutputTag);
        }

        增加对应的CepOperator构造函数。

    public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final DynamicPatternFunction patternFunction,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.patternFunction = patternFunction;
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
        this.nfaFactory = null;
    }

        加载Pattern,构造NFA

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);

        //初始化
        if (patternFunction != null) {
            patternFunction.init();
            Pattern pattern = patternFunction.inject();
            afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period = patternFunction.getPeriod();
            // 注册定时器检测规则是否变更
            if (period > 0) {
                getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
            }
        }


        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        if (patternFunction != null) {
            // 规则版本更新
            if (needRefresh.value() < refreshVersion.get()) {
                //清除状态
                computationStates.clear();
                elementQueueState.clear();
                partialMatches.releaseCacheStatisticsTimer();
                //清除定时器
                Iterable<Long> registerTime = registerTimeState.get();
                if (registerTime != null) {
                    Iterator<Long> iterator = registerTime.iterator();
                    while (iterator.hasNext()) {
                        Long l = iterator.next();
                        //删除定时器
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                        //状态清理
                        iterator.remove();
                    }
                }
                //更新当前的版本
                needRefresh.update(refreshVersion.get());
            }
        }
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        //初始化状态
        if (patternFunction != null) {
            /**
             * 两个标识位状态
             */
            refreshFlagState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            if (context.isRestored()) {
                if (refreshFlagState.get().iterator().hasNext()) {
                    refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
                }
            } else {
                refreshVersion = new AtomicInteger(0);
            }
            needRefresh = context.getKeyedStateStore()
                    .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
        }
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
        patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
            @Override
            public Map select(Map map) throws Exception {
                map.put("processingTime", System.currentTimeMillis());
                return map;
            }
        }).print();
        env.execute("SyCep");

    }

    public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {

        public TestDynamicPatternFunction() {
            this.flag = true;
        }

        boolean flag;
        int time = 0;
        @Override
        public void init() throws Exception {
            flag = true;
        }

        @Override
        public Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()
                            throws Exception {

            // 2种pattern
            if (flag) {
                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success");
                            }
                        })
                        .times(1)
                        .followedBy("middle")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail");
                            }
                        })
                        .times(1)
                        .next("end");
                return pattern;
            } else {

                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success2");
                            }
                        })
                        .times(2)
                        .next("middle2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail2");
                            }
                        })
                        .times(2)
                        .next("end2");
                return pattern;
            }
        }

        @Override
        public long getPeriod() throws Exception {
            return 10000;
        }

        @Override
        public boolean isChanged() throws Exception {
            flag = !flag ;
            time += getPeriod();
            System.out.println("change pattern : " + time);
            return true;
        }
    }

打印结果:符合预期

Flink CEP(三)pattern动态更新,大数据学习之路,# Flink CEP,flink,大数据

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern文章来源地址https://www.toymoban.com/news/detail-636271.html

到了这里,关于Flink CEP(三)pattern动态更新的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink(九)CEP

    1.概述 所谓 CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者“订

    2024年01月16日
    浏览(46)
  • Flink-CEP 实战教程

      所谓CEP,其实就是“ 复杂事件处理(Complex Event Processing) ”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。   那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者

    2024年02月02日
    浏览(60)
  • Flink CEP (一)原理及概念

    目录 1.Flink CEP 原理 2.Flink API开发 2.1 模式 pattern 2.2 模式 pattern属性 2.3 模式间的关系   Flink CEP内部是用 NFA(非确定有限自动机) 来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为 起始状态 、 中间状态 、 最终状

    2024年02月16日
    浏览(52)
  • Flink CEP(Complex Event Processing)库

    复杂事件处理(Complex Event Processing,CEP)是一种用于在流式数据中识别和处理复杂事件模式的技术。Apache Flink 作为一个流式处理框架,也可以用于实现复杂事件处理。下面是 Flink 中实现复杂事件处理的一般原理: 事件流输入: 首先,Flink 接收外部的事件流作为输入。这些事

    2024年02月13日
    浏览(35)
  • Flink动态更新维表

         概念:Lookup join是针对于由作业流表触发,关联右侧维表来补全数据的场景 。 默认情况下,在流表有数据变更,都会触发维表查询(可以通过设置维表是否缓存,来减轻查询压力),由于不保存状态,因此对内存占用较小。(以上来自网络) 具体配置如下:        

    2024年02月09日
    浏览(40)
  • Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

    Flink中使用表和SQL基本上跟其他场景是一样的。不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有\\\"仅插入流\\\"(Insert-Only Streams)和\\\"更新日志流\\\"(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。 这种麻烦其实是不可避

    2024年02月03日
    浏览(73)
  • Flink高手之路:Flink的环境搭建

    本地单机模式,一般用于测试环境是否搭建成功,很少使用 flink 自带集群,开发测试使用 StandAloneHA :独立集群的高可用模式,也是 flink 自带,用于开发测试环境 计算资源统一由hadoop yarn管理,生产环境使用   上传到hadoop001 [root@hadoop001 software]# tar -xzvf flink-1.12.2-bin-scala_2.1

    2023年04月24日
    浏览(47)
  • 大数据Flink(七十):SQL 动态表 & 连续查询

    文章目录 SQL 动态表 连续查询 一、​​​​​​​SQL 应用于流处理的思路

    2024年02月10日
    浏览(37)
  • Flink高手之路2-Flink集群的搭建

    1.本地local模式 本地单机模式,一般用于测试环境是否搭建成功,很少使用 2.独立集群模式standalone Flink自带集群,开发测试使用 3.高可用的独立集群模式standalone HA Flink自带集群,用于开发测试 4.基于yarn模式Flink on yarn 计算资源统一交给hadoop的yarn进行管理,用于生产环境 虚拟

    2024年02月05日
    浏览(44)
  • 数睿通2.0功能更新:支持多版本 Flink 切换,新增数据标签模块

    小伙伴们,大家好,数睿通 2.0 数据中台迎来了 12 月份的更新,由于年底工作繁忙,所以本次更新内容稍微少了点,还望理解,本次更新内容主要包括: 数据开发多版本 Flink 支持,执行任务的时候可以动态切换 Flink 版本,目前支持的版本有 1.14 和 1.16 新增数据标签模块,包

    2024年02月01日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包