Flink电商实时数仓(四)

这篇具有很好参考价值的文章主要介绍了Flink电商实时数仓(四)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

日志数据结构

业务数据:数据都是MySQL中的表格数据, 使用Flink SQL 处理
日志数据:分为page页面日志(页面信息,曝光信息,动作信息,报错信息)和启动日志(启动信息,报错信息),使用Flink Stream API处理

五种日志数据:文章来源地址https://www.toymoban.com/news/detail-797826.html

  • “start”; 启动信息
  • “err”; 错误信息
  • “display”; 曝光信息
  • “action”; 动作信息
  • “page”; 页面信息

"actions": [
        {
            "action_id": "cart_add",
            "item": "3",
            "item_type": "sku_id",
            "ts": 1645926900000
        }
    ],
    "common": {
        "ar": "4",
        "ba": "xiaomi",
        "ch": "xiaomi",
        "is_new": "0",
        "md": "xiaomi 12 ultra ",
        "mid": "mid_409",
        "os": "Android 13.0",
        "sid": "1879f0a8-2218-48ce-aa2d-efb32f2f6b7a",
        "uid": "57",
        "vc": "v2.1.134"
    },
    "page": {
        "during_time": 14499,
        "from_pos_id": 10,
        "from_pos_seq": 11,
        "item": "3",
        "item_type": "sku_id",
        "last_page_id": "good_list",
        "page_id": "good_detail"
    },
    "ts": 1645926900000

    "displays": [
        {
            "item": "31",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 0
        },
        {
            "item": "1",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 1
        },
        {
            "item": "3",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 2
        },
        {
            "item": "30",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 3
        },
        {
            "item": "33",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 4
        },
        {
            "item": "30",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 5
        },
        {
            "item": "15",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 6
        },
        {
            "item": "11",
            "item_type": "sku_id",
            "pos_id": 4,
            "pos_seq": 7
        }
    ],

"start": {
        "entry": "icon",
        "loading_time": 1234,
        "open_ad_id": 10,
        "open_ad_ms": 6169,
        "open_ad_skip_ms": 56163
    },

日志数据处理主要逻辑

  1. 数据清洗
  2. 新老访客状态标记修复
    • 埋点策略
      • 网页端:cookie中设置首日访问标记
      • 小程序端:小程序的缓存里面创建首日标记
      • APP端:手机的本地缓存
    • 如果客户删除本地缓存或cookie时,导致老客户可以这样变更为新客户。
    • 因此需要将访客是否新客户的标记存储到服务端,记录下首日访问的日志,当客户访问时就只需查询该客户对应的状态即可。
      • 如果is_new的值为1
        • 状态为null, 正常
        • 状态不为null,但是第一次登录日期不等于今天,修复为老客户
      • 如果is_new的值为0
        • 状态为null, 将状态的日期设置为昨天
        • 状态不为null, 正常的
//1. 获取当前数据的is_new字段
   JSONObject common = value.getJSONObject("common");
      String isNew = common.getString("is_new");
      String firstLoginDt = firstLoginDtState.value();
      Long ts = value.getLong("ts");
      String curDt = DateFormatUtil.tsToDate(ts);
      if("1".equals(isNew)){
          //判断当前状态
          if(firstLoginDt !=null && !firstLoginDt.equals(curDt)){

          }else if(firstLoginDt == null){
              //状态为空,确实为新用户,更新登录时间状态
              firstLoginDtState.update(curDt);
          }else{//留空,同一天新访客重复登录

          }
      }else if("0".equals(isNew)){
          //老用户,flink实时数仓没有记录相应的登录时间
          if(firstLoginDt == null){
              //把访客登录日期补充一个值,使用昨天的日期
              firstLoginDtState.update(DateFormatUtil.tsToDate(ts - 24*60*60*1000L));
          }else{
              //留空
              //正常情况,不需要修复

          }
      }else{
          //当前数据is_new不为0也不为1,错误数据
      }
  1. 分流拆分数据,按照信息类型进行拆分。使用process的侧输出流来对不同数据打上对应的标签。主流中存储最常见的页面信息,其余分别是曝光信息,动作信息,启动信息,报错信息都使用侧输出流。
  2. 写出到不同的kafka主题中
  3. 测试消费不同主题的数据,查看是否写入成功和检查数据格式

常见报错及解决思路

  1. 出现空指针异常,并且报错为Could not forward element to next operator,主要是以下两个错误原因:
    • 数据流中没有ts时间戳字段java.lang.NullPointerException
    • 数据流中没有添加水位线Assigned key must not be null!,用来进行keyby的字段不能为空

到了这里,关于Flink电商实时数仓(四)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 实时数仓 (一) --------- 数据采集层

    1. 普通实时计算与实时数仓比较 普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升

    2024年02月06日
    浏览(47)
  • Flink+Doris 实时数仓

    Doris基本原理 Doris基本架构非常简单,只有FE(Frontend)、BE(Backend)两种角色,不依赖任何外部组件,对部署和运维非常友好。架构图如下 可以 看到Doris 的数仓架构十分简洁,不依赖 Hadoop 生态组件,构建及运维成本较低。 FE(Frontend)以 Java 语言为主,主要功能职责: 接收用户

    2024年02月07日
    浏览(51)
  • Flink CDC和Flink SQL构建实时数仓Flink写入Doris

    软件环境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 开启binlog日志、创建用户 1.开启bin log MySQL 8.0默认开启了binlog,可以通过代码show variables like \\\"%log_bin%\\\";查询是否开启了,show variables like \\\"%server_id%\\\";查询服务器ID。 上图分别显示了bin long是否开启以及bin log所在的位置。 2.创建用户 C

    2024年02月02日
    浏览(78)
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于 Flink

    2024年02月16日
    浏览(48)
  • Flink实时数仓同步:拉链表实战详解

    在大数据领域,业务数据通常最初存储在关系型数据库,例如MySQL。然而,为了满足日常分析和报表等需求,大数据平台会采用多种不同的存储方式来容纳这些业务数据。这些存储方式包括离线仓库、实时仓库等,根据不同的业务需求和数据特性进行选择。 举例来说,假设业

    2024年01月20日
    浏览(56)
  • flink 实时数仓构建与开发[记录一些坑]

    1、业务库使用pg数据库, 业务数据可以改动任意时间段数据 2、监听采集业务库数据,实时捕捉业务库数据变更,同时实时变更目标表和报表数据 实时数据流图与分层设计说明 1、debezium采集pg库表数据同步到kafka 【kafka模式】 2、flink 消费kafka写入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    浏览(40)
  • Flink实时数仓之用户埋点系统(一)

    用户行为采集 行为数据:页面浏览、点击、在线日志等数据 活跃数据:用户注册、卸载安装、活跃等数据 App性能日志:卡顿、异常等数据 业务数据采集 业务数据:支付等 维度表:渠道、商品等 用户行为日志 日志结构大致可分为两类,一是页面日志,二是启动日志和在线

    2024年04月11日
    浏览(44)
  • GaussDB(DWS)基于Flink的实时数仓构建

    本文分享自华为云社区《GaussDB(DWS)基于Flink的实时数仓构建》,作者:胡辣汤。 大数据时代,厂商对实时数据分析的诉求越来越强烈,数据分析时效从T+1时效趋向于T+0时效,为了给客户提供极速分析查询能力,华为云数仓GaussDB(DWS)基于流处理框架Flink实现了实时数仓构建。在

    2024年04月22日
    浏览(43)
  • 美团买菜基于 Flink 的实时数仓建设

    美团买菜是美团自营生鲜零售平台,上面所有的商品都由美团亲自采购,并通过供应链物流体系,运输到距离用户 3km 范围内的服务站。用户从美团买菜平台下单后,商品会从服务站送到用户手中,最快 30 分钟内。 上图中,左侧的时间轴展示了美团买菜的发展历程,右侧展示

    2024年02月09日
    浏览(45)
  • 曹操出行基于 Hologres+Flink 的实时数仓建设

    曹操出行 创立于2015年5月21日,是吉利控股集团布局“新能源汽车共享生态”的战略性投资业务,以“科技重塑绿色共享出行”为使命,将全球领先的互联网、车联网、自动驾驶技术以及新能源科技,创新应用于共享出行领域,以“用心服务国民出行”为品牌主张,致力于打

    2024年01月20日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包