大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)

这篇具有很好参考价值的文章主要介绍了大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

       编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此列,则使用create_time填充,允许数据延迟5s,订单状态分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。文章来源地址https://www.toymoban.com/news/detail-842946.html

  1. 使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  2. 在任务1进行的同时,使用侧边流,监控若发现order_status字段为退回完成, 将key设置成totalrefundordercount存入Redis中,value存放用户退款消费额。使用redis cli以get key方式获取totalrefundordercount值,将结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
  3. 在任务1进行的同时,使用侧边流,监控若发现order_status字段为取消订单,将数据存入MySQL数据库shtd_result的order_info表中,然后在Linux的MySQL命令行中根据id降序排序,查询列id、consignee、consignee_tel、final_total_amount、feight_fee,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。

使用Flink处理Kafka中的数据

package module_d

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.{DecimalFormat, SimpleDateFormat}
import java.time.Duration
import java.util.Properties

/**
 * 编写Scala代码,使用Flink消费Kafka中Topic为order的数据并进行相应的数据统计计算(订单信息对应表结构order_info,订单详细信息对应表结构order_detail(来源类型和来源编号这两个字段不考虑,所以在实时数据中不会出现),同时计算中使用order_info或order_detail表中create_time或operate_time取两者中值较大者作为EventTime,若operate_time为空值或无此属性,则使用create_time填充,允许数据延迟5S,订单状态分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。另外对于数据结果展示时,不要采用例如:1.9786518E7的科学计数法)。
 */
object task1 {
  /**
   * 一个流分成四个流
   */
  lazy val statusother: OutputTag[String] = new OutputTag[String]("other")
  lazy val status1003: OutputTag[String] = new OutputTag[String]("s1003")
  lazy val status1005: OutputTag[String] = new OutputTag[String]("s1005")
  lazy val status1006: OutputTag[String] = new OutputTag[String]("s1006")

  def main(args: Array[String]): Unit = {
    /**
     * 1、使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis cli以get key方式获取totalprice值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) //并行度


    //Kafka配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ngc:9092") //集群地址
    properties.setProperty("group.id", "g1") //消费者组

    //原始流
    val stream = env.addSource(new FlinkKafkaConsumer[String]("order1", new SimpleStringSchema(), properties).setStartFromLatest())
      .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5))//允许数据延迟5S
        .withTimestampAssigner(
          new SerializableTimestampAssigner[String] {
            override def extractTimestamp(t: String, l: Long): Long = {
              val sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
              if (t.split(",")(11).equals("")) { //如果operate_time为空
                sdf.parse(t.split(",")(10)).getTime
              } else {
                val create_time = sdf.parse(t.split(",")(10)).getTime
                val operate_time = sdf.parse(t.split(",")(11)).getTime
                math.max(create_time, operate_time)
              }
            }
          }
        ))
    //设置自定义侧边流
    val streamProcess = stream.process(new MdSplitProcessFunction)
    /**
     * 1、使用Flink消费Kafka中的数据,统计商城实时订单实收金额(需要考虑订单状态,若有取消订单、申请退回、
     * 退回完成则不计入订单实收金额,其他状态的则累加),将key设置成totalprice存入Redis中。使用redis
     * cli以get key方式获取totalprice值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图
     * 间隔1分钟以上,第一次截图放前面,第二次截图放后面;
     */
    val ds1 = streamProcess
      .getSideOutput(statusother)
      .map(line => line.split(",")(3).toDouble)
      .keyBy(_ => true) //聚合到一起
      .sum(0)
      .map(n=>new DecimalFormat("#.#").format(n))
    //redis配置
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("ngc")
      .setPort(6378)
      .setPassword("123456")
      .build()
    ds1.addSink(new RedisSink[String](conf, new MyRedisMapper("totalcount")))
    /**
     * 2、在任务1进行的同时,使用侧边流,监控若发现order_status字段为退回完成, 将key设置成totalrefundordercount存入Redis中,value存放用户退款消费额。使用redis cli以get key方式获取totalrefundordercount值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔1分钟以上,第一次截图放前面,第二次截图放后面;
     */
    val ds2 = streamProcess
      .getSideOutput(status1006)
      .map(line => line.split(",")(3).toDouble)
      .keyBy(_ => true) //聚合到一起
      .sum(0)
      .map(n=>new DecimalFormat("#.#").format(n))
    ds2.addSink(new RedisSink[String](conf, new MyRedisMapper("totalrefundordercount")))

    /**
     * 3、在任务1进行的同时,使用侧边流,监控若发现order_status字段为取消订单,将数据存入MySQL数据库shtd_result的order_info表中,然后在Linux的MySQL命令行中根据id降序排序,查询列id、consignee、consignee_tel、final_total_amount、feight_fee,查询出前5条,将SQL语句与执行结果截图粘贴至对应报告中。
     */
    val ds3 = streamProcess
      .getSideOutput(status1003)
 
    ds3.addSink(new RichSinkFunction[String] {
      var conn: Connection = _
      var insertStmt: PreparedStatement = _

      override def open(parameters: Configuration): Unit =  {
        conn = DriverManager.getConnection("jdbc:mysql://ngc:3307/shtd_result?useSSL=false", "root", "123456")
        insertStmt = conn.prepareStatement("insert into order_info (id,consignee,consignee_tel,final_total_amount,feight_fee) values (?,?,?,?,?)")
      }

      override def close(): Unit = {
        insertStmt.close()
        conn.close()
      }

      override def invoke(value: String, context: SinkFunction.Context): Unit = {
        val arr = value.split(",")
        insertStmt.setString(1, arr(0))
        insertStmt.setString(2, arr(1))
        insertStmt.setString(3, arr(2))
        insertStmt.setString(4, arr(3))
        insertStmt.setString(5, arr(19))
        insertStmt.execute()
      }
    })

    ds1.print()
    ds2.print()
    env.execute("kafka sink test")
  }


  /**
   * 自定义侧边流配置
   */
  class MdSplitProcessFunction extends ProcessFunction[String, String] {
    override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
      val line = value.split(",")

      /**
       * 订单状态order_status分别为1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。
       */
      if (line(4).equals("1003")) {
        ctx.output(status1003, value)
      } else if (line(4).equals("1005")) {
        ctx.output(status1005, value)
      } else if (line(4).equals("1006")) {
        ctx.output(status1006, value)
      } else {
        ctx.output(statusother, value)
      }

    }


  }

  /**
   * Redis key——value存储 也可用RichSinkFunction建立Redis
   */
  class MyRedisMapper(key: String) extends RedisMapper[String] {

    override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)

    override def getValueFromData(data: String): String = data

    override def getKeyFromData(data: String): String = key
  }


}

到了这里,关于大数据职业技能大赛样题(数据采集与实时计算:使用Flink处理Kafka中的数据)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 河北省2023年职业院校(中职组)技能大赛“网络搭建与应用”赛项竞赛样题

    2023 年河北省职业院校技能大赛 “网络搭建与应用”赛项 竞赛(样题) “网络搭建与应用”赛项竞赛共分三个部分,其中: 第一部分:网络搭建及安全部署项目(500 分) 第二部分:服务器配置及应用项目(480 分)第三部分:职业规范与素养(20 分) 1. 禁止携带和使用移动存

    2024年02月07日
    浏览(39)
  • 全国职业院校技能大赛-大数据 离线数据处理模块-数据清洗

    子任务2:数据清洗         编写Hive SQL代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。 抽取

    2024年02月02日
    浏览(38)
  • 全国职业院校技能大赛-大数据 离线数据处理模块-指标计算

    赛题来源2023年全国职业院校技能大赛赛题第1套任务B中指标计算模块 编写Scala代码,使用Spark计算相关指标。 注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表

    2024年02月01日
    浏览(38)
  • 2023年江西省职业院校技能竞赛“网络安全”赛项样题

    二、竞赛注意事项 1.竞赛期间禁止携带和使用移动存储设备、计算器、通信工具及 参考资料。 2.请根据大赛所提供的竞赛环境,检查所列的硬件设备、软件清 单、材料清单是否齐全,计算机设备是否能正常使用。 3.在进行任何操作之前,请阅读每个部分的所有任务。各任务

    2024年02月05日
    浏览(45)
  • 2023年全国职业院校技能大赛-大数据应用开发-数据可视化

            可视化题目与以往相同,做法类似,我这里展示得到语句后处理优化以后的代码,以函数式来写可视化,比以前400-500多行代码简洁到100多行。其他题目见本栏目,那里面的代码都是没有优化后的,这次主要以效率和精简给大家提供更多的思路。         我们得到

    2024年02月04日
    浏览(41)
  • 云计算职业技能大赛组件介绍(一)

    上文我们准备好了一个实验平台,我们了解了该如何搭建开源平台open stack,在此基础上,我们该理论的,系统的,详细的了解一下open stack的各个组件的作用和原理。 官方的解释是:OpenStack是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管

    2024年02月21日
    浏览(28)
  • 云计算职业技能大赛私有云搭建部分

    需要的可联系,可提供相关的软件包和平台供测试 数据库安装与调优 在 controller 节点上使用 iaas-install-mysql.sh 脚本安装 Mariadb 、 Memcached 、 RabbitMQ 等服务。安装服务完毕后, 修改 /etc/my.cnf 文件,完成下列要求: 1.设置数据库支持大小写; 2.设置数据库缓存 innodb 表的索引,数

    2024年04月27日
    浏览(20)
  • 23年云计算全国职业技能大赛-私有云

    1.1.1 基础环境配置[0.2 分] 1.控制节点主机名为 controller,设置计算节点主机名为 compute; 2.hosts 文件将 IP 地址映射为主机名。 使用提供的用户名密码,登录提供的 OpenStack 私有云平台,在当前租户下, 使用 CentOS7.9镜像,创建两台云主机,云主机类型使用 4vCPU/12G/100G_50G 类型。

    2024年02月08日
    浏览(51)
  • 【全国职业院校技能大赛云计算赛项】

    题目: skywalking 服务部署与应用: 使用提供的 OpenStack 私有云平台,申请一台 centos7.9 系统的云主机,使用提供的软 件包安装 Elasticsearch 服务和 skywalking 服务,将 skywalking 的 UI 访问端口修改为 8888。 接下来再申请一台CentOS7.9的云主机,用于搭建gpmall商城应用,并配置SkyWalk

    2024年01月20日
    浏览(33)
  • 23云计算全国职业技能大赛容器云-容器编排

    编写 Dockerfile 文件构建 mysql 镜像,要求基于 centos 完成 MariaDB 数据库的安装和配置,并设置服务开机自启。编写 Dockerfile 构建镜像 erp-mysql:v1.0,要求使用 centos7.9.2009 镜像作为基镜像,完成 MariaDB 数据库的安装,设置 root 用户的密码为 tshoperp,新建数据库 jsh_erp 并导入数据库文

    2024年02月08日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包