大数据-玩转数据-Flink恶意登录监控

这篇具有很好参考价值的文章主要介绍了大数据-玩转数据-Flink恶意登录监控。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、恶意登录

对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。
因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。

二、数据源格式

937166,1715,beijing,beijing,1511661606
937166,1715,beijing,beijing,1511661607
937166,1715,beijing,beijing,1511661608
161501,36156,jiangsu,nanjing,1511661608
937166,1715,beijing,beijing,1511661609
937166,1715,beijing,beijing,1511661610
937166,1715,beijing,beijing,1511661611
937166,1715,beijing,beijing,1511661612

三、封装数据

package com.lyh.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginEvent {
        private Long userId;
        private String ip;
        private String eventType;
        private Long eventTime;
}

四、代码实现逻辑

实现逻辑:
统计连续失败的次数:

  1. 把失败的时间戳放入到List中,
  2. 当List中的长度到达2的时候, 判断这个两个时间戳的差是否小于等于2s
  3. 如果是, 则这个用户在恶意登录
  4. 否则不是, 然后删除List的第一个元素
  5. 用于保持List的长度为2
  6. 如果出现成功, 则需要清空List集合

五、代码实现

package com.lyh.flink11;

import com.lyh.bean.LoginEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;


import java.time.Duration;
import java.util.List;
import java.util.Map;

public class Login_ey {
    public static void main(String[] args) throws Exception {
        //创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        //创建水印策略
        WatermarkStrategy<LoginEvent> wms = WatermarkStrategy.
                <LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
                    @Override
                    public long extractTimestamp(LoginEvent element, long recordTimestamp) {
                        return element.getEventTime();
                    }
                });
        //读入数据
        KeyedStream<LoginEvent, Long> watersencerStream = env.readTextFile("input/LoginLog.csv")
                .map(line -> {
                    String[] datas = line.split(",");
                    return new LoginEvent(Long.valueOf(datas[0]),
                            datas[1],
                            datas[2],
                            Long.valueOf(datas[3]));
                    // 指定水印和时间戳
                }).assignTimestampsAndWatermarks(wms)
                // 按照用户ID分组
                .keyBy(LoginEvent::getUserId);
        // Flink CEP 也叫做Flink复杂事件处理,
               // 可以在无穷无界的事件流中检测事件规则,通过模式规则匹配的方式对重要信息进行跟踪和分析,从而在实时数据中发掘出有价值的信息
        //定义模式
        Pattern<LoginEvent, LoginEvent> fail = Pattern.
                <LoginEvent>begin("fail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getEventType());
                    }
                }).timesOrMore(2).consecutive()
                .until(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "success".equals(value.getEventType());
                    }
                }).within(Time.seconds(2));
        // 把模式用在流上
        PatternStream<LoginEvent> ps = CEP.pattern(watersencerStream, fail);
        //获取匹配到的结果
        ps.select(new PatternSelectFunction<LoginEvent, String>() {
            @Override
            public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
                return pattern.get("fail").toString();
            }
        }).print();
        env.execute();


    }
}

六、测试结果

大数据-玩转数据-Flink恶意登录监控,大数据,flink文章来源地址https://www.toymoban.com/news/detail-733158.html

到了这里,关于大数据-玩转数据-Flink恶意登录监控的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据-玩转数据-Flink定时器

    基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Long): Unit 会注

    2024年02月10日
    浏览(38)
  • 大数据-玩转数据-Flink状态编程(上)

    有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。 Flink的状态管理是它的优

    2024年02月09日
    浏览(47)
  • 大数据-玩转数据-Flink状态后端(下)

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(

    2024年02月09日
    浏览(46)
  • 大数据-玩转数据-Flink时间滚动动窗口

    在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集

    2024年02月11日
    浏览(48)
  • 大数据-玩转数据-Flink-Transform

    转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 2.1、map(映射) 将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素 2.2、filter(过滤) 根据指定的规则将满足条件(true)的数据保留,不

    2024年02月13日
    浏览(32)
  • 大数据-玩转数据-Flink 网站UV统计

    在实际应用中,我们往往会关注,到底有多少不同的用户访问了网站,所以另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。 对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户。 将userid放到SET集合里面,统计集合长度,便可以统计到网

    2024年02月11日
    浏览(48)
  • 大数据-玩转数据-FLINK-从kafka消费数据

    大数据-玩转数据-Kafka安装 运行本段代码,等待kafka产生数据进行消费。

    2024年02月14日
    浏览(38)
  • 大数据-玩转数据-Flink 海量数据实时去重

    大数据|阿里实时计算|Flink 借助redis的Set,需要频繁连接Redis,如果数据量过大, 对redis的内存也是一种压力;使用Flink的MapState,如果数据量过大, 状态后端最好选择 RocksDBStateBackend; 使用布隆过滤器,布隆过滤器可以大大减少存储的数据的数据量。 如果想判断一个元素是不

    2024年02月07日
    浏览(37)
  • 大数据-玩转数据-Flink-Transform(上)

    转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 2.1、map(映射) 将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素 2.2、filter(过滤) 根据指定的规则将满足条件(true)的数据保留,不

    2024年02月14日
    浏览(32)
  • 玩转数据-大数据-Flink SQL 中的时间属性

    时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。 2.1、准备WaterSensor类,方便使用 2.2、DataStream 到

    2024年02月07日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包