Flink第九章:Flink CEP

这篇具有很好参考价值的文章主要介绍了Flink第九章:Flink CEP。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数
Flink第六章:多流操作
Flink第七章:状态编程
Flink第八章:FlinkSQL
Flink第九章:Flink CEP



前言

这次是Flink的最后一次内容,终于还是在放假前啃完了.

FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。

这是官方的介绍,看看就行了.
先引入需要的依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

Flink第九章:Flink CEP


一、简单案例

1.LoginFailDetect.scala

检测连续三次登录失败的用户

package com.atguigu.chapter08


import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

import java.util

object LoginFailDetect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.数据源
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)

    // 2.定义Pattern,检测连续三次登录失败时间
    val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .next("thirdFail").where(_.eventType == "fail")

    // 3. 将Pattern 检测应用到事件流
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), pattern)

    // 4.定义处理规则,精检测到的匹配事件报警输出
    val resultStream: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        // 获取匹配到的复杂时间
        val firstFail: LoginEvent = map.get("firstFail").get(0)
        val secondFail: LoginEvent = map.get("secondFail").get(0)
        val thirdFail: LoginEvent = map.get("thirdFail").get(0)

        // 包装报警信息 输出
        s"${firstFail.userId} 连续三次登录失败! 登录时间:${firstFail.timestamp},${secondFail.timestamp},${thirdFail.timestamp}"
      }
    })
    resultStream.print()
    env.execute()
  }

}

case class LoginEvent(userId: String, ipAddr: String, eventType: String, timestamp: Long)

Flink第九章:Flink CEP
Flink第九章:Flink CEP

2.LoginFailDetectpro.scala

使用(Pattern API)中的量词改进代码

package com.atguigu.chapter08

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.util

object LoginFailDetectpro {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.数据源
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)

    // 2.定义Pattern,检测连续三次登录失败时间
    val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("Fail").where(_.eventType=="fail").times(3).consecutive()

    // 3. 将Pattern 检测应用到事件流
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), pattern)

    // 4.定义处理规则,精检测到的匹配事件报警输出
    val resultStream: DataStream[String] = patternStream.process(new PatternProcessFunction[LoginEvent,String] {
      override def processMatch(map: util.Map[String, util.List[LoginEvent]], context: PatternProcessFunction.Context, collector: Collector[String]): Unit = {
        val firstFail: LoginEvent = map.get("Fail").get(0)
        val secondFail: LoginEvent = map.get("Fail").get(1)
        val thirdFail: LoginEvent = map.get("Fail").get(2)

        // 包装报警信息 输出
        collector.collect(s"${firstFail.userId} 连续三次登录失败! 登录时间:${firstFail.timestamp},${secondFail.timestamp},${thirdFail.timestamp}")
      }
    })
    resultStream.print()
    env.execute()
  }


}

Flink第九章:Flink CEP

3.OrderTimeoutDetect.scala

处理超时事件

package com.atguigu.chapter08

import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import java.util

// 定义订单事件样例类
case class OrderEvent(userId: String, orderId: String, eventType: String, timestamp: Long)

object OrderTimeoutDetect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.读取源
    val orderEventStream: DataStream[OrderEvent] = env.fromElements(
      OrderEvent("user_1", "order_1", "create", 1000L),
      OrderEvent("user_2", "order_2", "create", 2000L),
      OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
      OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
      OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
      OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
    ).assignAscendingTimestamps(_.timestamp)
      .keyBy(_.orderId)

    // 2.定义检测模式
    val pattern: Pattern[OrderEvent, OrderEvent] = Pattern.begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 3.应用到事件流
    val patternStream: PatternStream[OrderEvent] = CEP.pattern(orderEventStream, pattern)

    // 4.检测匹配事件和部分匹配的超时事件
    val payedOrderStream: DataStream[String] = patternStream.process(new OrderPayDetect())

    payedOrderStream.getSideOutput(new OutputTag[String]("timeout")).print("timeout")
    payedOrderStream.print("payed")

    env.execute()
  }


  class OrderPayDetect() extends PatternProcessFunction[OrderEvent,String] with TimedOutPartialMatchHandler[OrderEvent] {
    override def processMatch(map: util.Map[String, util.List[OrderEvent]], context: PatternProcessFunction.Context, collector: Collector[String]): Unit = {
      // 正常支付事件
      val payEvent: OrderEvent = map.get("pay").get(0)
      collector.collect(s"订单${payEvent.orderId}已成功支付")
    }

    override def processTimedOutMatch(map: util.Map[String, util.List[OrderEvent]], context: PatternProcessFunction.Context): Unit = {
      // 超时事件
      val createEvent: OrderEvent = map.get("create").get(0)
      context.output(new OutputTag[String]("timeout"),s"订单${createEvent.orderId}超时未支付! 用户${createEvent.userId}")
    }
  }
}

Flink第九章:Flink CEP

3.状态机实现

package com.atguigu.chapter08

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object NFAExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.数据源
    val loginEventStream: DataStream[LoginEvent] = env.fromElements(
      LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
      LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
      LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
      LoginEvent("user_2", "192.168.1.29", "success", 6000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
      LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
    ).assignAscendingTimestamps(_.timestamp)

    val resultStream: DataStream[String] = loginEventStream.keyBy(_.userId).flatMap(new StateMachineMapper())

    resultStream.print()
    env.execute()
  }

  // 实现自定义的RichFlatmapFunction
  class StateMachineMapper() extends RichFlatMapFunction[LoginEvent, String] {
    lazy val currentState: ValueState[State] = getRuntimeContext.getState(new ValueStateDescriptor[State]("state", classOf[State]))

    override def flatMap(in: LoginEvent, collector: Collector[String]): Unit = {
      // 定义一个状态机的状态
      if (currentState.value() == null) {
        currentState.update(Initial)
      }

      val nextState: State = transition(currentState.value(), in.eventType)

      nextState match {
        case Matched => collector.collect(s"${in.userId}连续三次登录失败")
        case Terminal => currentState.update(Initial)
        case _ => currentState.update(nextState)
      }

    }
  }

  // 将状态state定义为封闭的特征
  sealed trait State

  case object Initial extends State

  case object Terminal extends State

  case object Matched extends State

  case object S1 extends State

  case object S2 extends State

  // 定义状态转移函数
  def transition(state: State, eventType: String): State = {
    (state, eventType) match {
      case (Initial, "success") => Terminal
      case (Initial, "fail") => S1
      case (S1, "success") => Terminal
      case (S1, "fail") => S2
      case (S2, "success") => Terminal
      case (S2, "fail") => Matched
    }
  }
}

Flink第九章:Flink CEP


总结

最后的CEP有点抽象,我也没完全理解,有机会在巩固巩固吧.文章来源地址https://www.toymoban.com/news/detail-475514.html

到了这里,关于Flink第九章:Flink CEP的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《Flink学习笔记》——第一章 概念及背景

    ​ 什么是批处理和流处理,然后由传统数据处理架构为背景引出什么是有状态的流处理,为什么需要流处理,而什么又是有状态的流处理。进而再讲解流处理的发展和演变。而Flink作为新一代的流处理器,它有什么优势?它的相关背景及概念和特性又是什么?有哪些应用场景

    2024年02月11日
    浏览(44)
  • 基于linux下的高并发服务器开发(第一章)- 目录操作函数

     (1)int mkdir(const char* pathname,mode_t mode); #include sys/stat.h #include sys/types.h int mkdir(const char *pathname, mode_t mode);     作用:创建一个目录     参数:          pathname: 创建的目录的路径         mode: 权限,八进制的数     返回值:          成功返回0, 失败返回-1  (

    2024年02月16日
    浏览(45)
  • RCP系列-第一章 环境安装

    第一章 Matlab安装 提示:这里可以添加本文要记录的大概内容: 例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。 提示:以下是本篇文章正文内容,下面案例可供参考 名称 Value Matlab云盘链

    2024年02月09日
    浏览(39)
  • 【HTML5系列】第一章 · HTML5新增语义化标签

            Hello大家好, 我是【 麟-小白 】,一位 软件工程 专业的学生,喜好 计算机知识 。希望大家能够一起 学习进步 呀!本人是一名 在读大学生 ,专业水平有限,如发现 错误 或 不足之处 ,请多多指正!谢谢大家!!!         如果 小哥哥小姐姐们 对我的文章感兴趣

    2024年02月04日
    浏览(59)
  • Flink 系列文章汇总索引

    本专栏系统介绍某一知识点,并辅以具体的示例进行说明。 本专栏的文章编号可能不是顺序的,主要是因为写的时候顺序没统一,但相关的文章又引入了,所以后面就没有调整了,按照写文章的顺序进行编号。但一个专题的顺序号是统一的,不存在编号跳跃情况。 本部分介

    2024年02月03日
    浏览(43)
  • 第九章 shell 编程

    第九章 shell 编程 一、 编写与执行 Shell 脚本 • shell 脚本的作用类似于 DOS 的批处理文件,但无文件名规定。 • shell 脚本是文本文件,可用 vi、gedit 或其它文本编辑去创建。 • 脚本的首行应是#!/bin/sh,指明该用什么程序来解释该脚本。 • 使用 chmod +x 命令为脚本文件增加可

    2024年02月06日
    浏览(61)
  • 第九章 支持、解释

    真题(2010-40)-加强支持-分类1-反面支持-否命题加强-支持力度:反面>正面-多项递推,选支持最终项 40.鸽子走路时,头部并不是有规律地前后移动,而是一直在往前伸。行走时,鸽子脖子往前一探,然后头部保持静止,等待着身体和爪子跟进。有学者曾就鸽子走路时伸脖子

    2024年02月08日
    浏览(44)
  • 第九章 更复杂的光照

    Unity的渲染路径 渲染路径决定了光照是如何应用到Unity Shader中的, 需要为每个Pass指定它的渲染路径 。 完成上面的设置后,我们可以在每个Pass中使用标签来指定该Pass使用的渲染路径。 指定渲染路径是我们和Unity的底层渲染引擎的一次重要的沟通。 前向渲染路径 前向渲染路

    2024年04月27日
    浏览(34)
  • 第九章 Gitlab使用

    微服务项目,常常需要多人协作完成工作,本章教程是介绍Gitlab使用,使多人协作告别低端的手动拷贝,也告别传统的SVN。 https://git-scm.com/download/win ssh-keygen -t rsa -C “zhangsan@163.com” -b 4096 cat ~/.ssh/id_rsa.pub | clip 打开git bash窗口 定位到要上传的目录 初始化 将当前目录添加到

    2024年01月18日
    浏览(52)
  • 第九章 排序

    1.插入类排序:是在一个已排好序的记录子集的基础上,每一步将下一个待排序的记录有序插入已排好序的记录子集,直到将所有待排记录全部插入为止 a.直接插入排序(稳定) b.折半插入排序(稳定) c.希尔排序(不稳定) 2.交换类排序:通过一系列交换逆序元素进行排序

    2024年02月02日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包