flink cep数据源keyby union后 keybe失效

这篇具有很好参考价值的文章主要介绍了flink cep数据源keyby union后 keybe失效。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警

策略条件:

选择了两个kafka的的topic的数据作为数据源,

对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type  = 1

对B 数据源 test-topic2,进行条件过滤,过滤条件为:login_type =  2

分组条件 为   src_ip,hostname两个字段进行分组

进行followby 关联。时间关联的最大时间间隔为  60秒

运行并行度设置为3

通过SourceStream打印的原始数据:

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type ":"2"}

经过cep处理后,产了告警

产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}

经过src_ip,和hostname分组后, 理论上应该只分组后的相同的 scr_ip,hostname进行事件关联告警

结果其他的分组数据也参和进来关联告警了。 

期望的是  login_type = 1 出现至少两次, 接着login_type=2的至少出现1次,且相同的src_ip和hostname

然后结果是下面数据也产生了告警。

{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}

怀疑是分组没生效。

然后debug数据源那块的方法kafkaStreamSource() 里面有进行分组,debug后发现确实也进行了keyby

后来找不到其他问题,纠结了下, 怀疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一个KeyedSteam了。 所以

出现问题的原始代码数据源代码:

//程序具体执行流程
 DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);
            DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);
            SinkProcess.sink(resultStream, rule);



public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {
        DataStream<JSONObject> inputStream = null;
        List<Event> events = rule.getEvents();
        if (events.size() > SharingConstant.NUMBER_ZERO) {
            for (Event event : events) {
                FlinkKafkaConsumer<JSONObject> kafkaConsumer =
                        new KafkaSourceFunction(rule, event).init();
                if (inputStream != null) {
                    // 多条 stream 合成一条 stream
                    inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
                } else {
                    // 只有一条 stream
                    inputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);
                }
            }
        }
        return inputStream;
    }



private static DataStream<JSONObject> kafkaStreamSource(
            StreamExecutionEnvironment env,
            Event event,
            Rule rule,
            FlinkKafkaConsumer<JSONObject> kafkaConsumer) {
        DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);

        // 对多个黑白名单查询进行循环
        String conditions = event.getConditions();
        while (conditions.contains(SharingConstant.ARGS_NAME)) {
            // 使用新的redis 数据结构,进行 s.include 过滤
            inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);

            conditions = conditions.replace(s, "");
        }
        // 一般过滤处理
        inputStream = AsyncDataStream.orderedWait(inputStream,
                new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);

        // kafka source 进行 keyBy 处理
        return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
    }

public static DataStream<JSONObject> keyedBy(
            DataStream<JSONObject> input, Map<String, String> groupBy) {
        if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){
            return input;
        }
        return input.keyBy(
                new TwoEventKeySelector(
                        groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));
    }

public class TwoEventKeySelector implements KeySelector<JSONObject, String> {
    private static final long serialVersionUID = 8534968406068735616L;
    private final String groupBy;

    public TwoEventKeySelector(String groupBy) {
        this.groupBy = groupBy;
    }

    @Override
    public String getKey(JSONObject event) {
        StringBuilder keys = new StringBuilder();
        for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {
            keys.append(event.getString(key));
        }
        return keys.toString();
    }
}

问题出现在这里:

// 多条 stream 合成一条 stream
                    inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));

kafkaStreamSource()这个方法返回的是 KeyedStream ,

两个KeyedStream unio合并后,  本来以为返回时KeyedStream,结果确是DataStream类型,

结果导致cep分组不生效,一个告警中出现了其他分组的数据。

解决方法, 就是在cep pattern前 根据是否有分组条件再KeyedBy一次

  private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {
        PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());
        Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();

        if (!rule.getGroupBy().isEmpty()){
            inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
        }

        PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);
        return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));

输入数据:

 {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
 {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
 {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}

不产生告警,符合预期

再次输入同分组的数据:

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警输出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"无","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"资源开发","severity":"info","create_time":1666860369647,"strategy_category_name":"网络威胁分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"无","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}

产生告警符合预期文章来源地址https://www.toymoban.com/news/detail-427346.html

到了这里,关于flink cep数据源keyby union后 keybe失效的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink学习之旅:(三)Flink源算子(数据源)

            Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合 数据文件 Socket数据 kafka数据 自定义Source         创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用) 运行结果

    2024年02月06日
    浏览(44)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(55)
  • flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作

    前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。 Tips:我觉得学习 Flink 还是挺有意思的

    2024年02月19日
    浏览(41)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(48)
  • flink执行环境和读取kafka以及自定义数据源操作

    目录 创建执行环境 1. getExecutionEnvironment 2. createLocalEnvironment 3. createRemoteEnvironment  执行模式(Execution Mode) 1. BATCH 模式的配置方法 2. 什么时候选择 BATCH 模式 触发程序执行 数据源操作 读取kafka数据源操作  自定义Source           编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执

    2023年04月10日
    浏览(37)
  • Flink读取数据的5种方式(文件,Socket,Kafka,MySQL,自定义数据源)

    这是最简单的数据读取方式。当需要进行功能测试时,可以将数据保存在文件中,读取后验证流处理的逻辑是否符合预期。 程序代码: 输出结果 用于验证一些通过Socket传输数据的场景非常方便。 程序代码: 测试时,需要先在 172.16.3.6 的服务器上启动 nc ,然后再启动Flink读

    2024年02月16日
    浏览(41)
  • 基于大数据平台(XSailboat)的计算管道实现MySQL数据源的CDC同步--flink CDC

    笔者在先前的一篇文档《数据标签设计 – 大数据平台(XSailboat)的数据标签模块》 提到了关于数据标签的模块,现已实现并应用于项目中。在项目中遇到这样一种情形: 如果打标信息和业务数据是在一个数据库实例中,那么只需要连接两张表进行查询即可。但是数据标签作为

    2024年01月17日
    浏览(63)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

    目标 : 了解数据源的格式及实现模拟数据的生成 路径 step1:数据格式 step2:数据生成 实施 数据格式 消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号

    2024年02月04日
    浏览(38)
  • 【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月21日
    浏览(65)
  • Flink CDC 2.4 正式发布,新增 Vitess 数据源,更多连接器支持增量快照,升级 Debezium 版本

    Flink CDC [1] 是基于数据库的日志 CDC 技术,实现了全增量一体化读取的数据集成框架。配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。 作为新一代的实时数据集成框架,Flink CDC 具有全增量一体化、无锁读取、并行读取、表结构变更

    2024年02月12日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包