Flink实时数仓之用户埋点系统(一)

这篇具有很好参考价值的文章主要介绍了Flink实时数仓之用户埋点系统(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求分析

数据采集

用户行为采集

  1. 行为数据:页面浏览、点击、在线日志等数据
  2. 活跃数据:用户注册、卸载安装、活跃等数据
  3. App性能日志:卡顿、异常等数据

业务数据采集

  1. 业务数据:支付等
  2. 维度表:渠道、商品等

行为日志分析

用户行为日志

日志结构大致可分为两类,一是页面日志,二是启动日志和在线日志。

页面日志

页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录和多个用户在该页面所做的动作记录,以及若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
   "common": {                     -- 环境信息
      "imei":"xxx",
      "device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)
      "acc_id": "aad3",             -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)
      "app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)
      "qid": "kuaishou1",           -- 渠道
      "group_qid":"kuaishou",  -- 渠道分组
      "asc_qid":"xiaoh",                -- 归因渠道号 (可以为空,上报空值率)
      "app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)
      "os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )
      "os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)
      "device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )
      "device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)
      "pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)
      "network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)
      "is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录
      "obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  
      "ip":"127.0.0.1",            --ip
      "is_new": 1,                 -- 是否为新用户 0老用户,1新用户(安装后启动的第一天用户都为新用户,第一天之后都为老用户) 
      "code": "xxx",              -- 平台标识 
      "lab_code": "实验A",         -- 实验code 
      "lab_group_code": "note"     -- 实验分组code  
   },
   "actions": [{                   -- 页面动作信息
      "page_url": "/good_detail",  -- 页面url(取相对路径)
      "action_type": "show",       -- 动作类型:展现传“show”、点击传“click”、关闭传“close” (不可为空,非show、click、close报错 空值报错)
      "event": "Vip",              -- 事件类型
      "sub_event": "Me"            -- 事件子类型
    }],
   "pages": [{                        -- 页面信息
      "during_time": 7648,            -- 持续时间毫秒
      "page_url": "/good_detail",     -- 页面url(取相对路径)
      "last_page_url":"",             -- 上一个页面url(取相对路径,首次访问为空)
      "event": "Vip",                 -- 事件类型
      "sub_event": "Me",              -- 页面名称
      "last_sub_event": "login"       -- 上页的名称  
   }] 
    "ts": 1585744374423             --日志上报时间戳
}

启动日志

启动日志以启动为单位,一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
  "common": {
      "imei":"idfv",
      "device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)
      "acc_id": "aad3",            -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)
      "app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)
      "qid": "xxx",                -- 渠道
      "group_qid":"xxxx",    -- 渠道分组
      "asc_qid":"",                --归因渠道号 (可以为空,上报空值率)
      "app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)
      "os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )
      "os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)
      "device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )
      "device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)
      "pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)
      "network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)
      "is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录
      "obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  
      "ip":"127.0.0.1",            --ip
      "is_new": 1,                 -- 是否为新用户 0老用户,1新用户
      "code": "xxx",               -- 平台标识 
      "lab_code": "实验A",         -- 实验code 
      "lab_group_code": "note"     -- 实验分组code  
  },
  "start": {   
    "start_way": 0,          --启动方式。 0:热启动  1:代表首次安装首次启动  2:冷启动
    "entry": "icon",          --启动途径。icon:手机图标  notice:通知   install:安装后启动
    "loading_time": 18803    --启动加载时间
  },
  "ts": 1585744304000        --日志上报时间戳
}

APP在线日志

App在线日志以启动-关闭为单位,一次启动-关闭行为,生成一条启动-关闭日志。

{
  "common": {
      "imei":"idfv",
      "device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)
      "acc_id": "aad3",            -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)
      "app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)
      "qid": "xxx",           -- 渠道
      "group_qid":"xxxx",  -- 渠道分组
      "asc_qid":"",                -- 归因渠道号 (可以为空,上报空值率)
      "app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)
      "os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )
      "os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)
      "device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )
      "device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)
      "pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)
      "network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)
      "is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录
      "obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  
      "ip":"127.0.0.1",            --ip
      "is_new": 1,                 -- 是否为新用户 0老用户,1新用户 
      "code": "xxx",              -- 平台标识 
      "lab_code": "实验A",         -- 实验code 
      "lab_group_code": "note"     -- 实验分组code  
  },
  "online": {   
    "start_way": 0,            --启动方式。 0:热启动  1:代表首次安装首次启动  2:冷启动
    "start_time":  18803111 ,  --开始时间(毫秒)
    "end_time":  188033 ,      --退出时间(毫秒)
    "online_time": 18803      --在线时长(毫秒)
  },
  "ts": 1585744304000        --日志上报时间戳
}

新老用户的判断规则
APP 端:用户安装 App 后,第一次打开 App 的当天,Android/iOS SDK 会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。
即:第一天触发的 APP 端所有事件中,is_new = 1。即第一天之后触发的 APP 端所有事件中,is_new = 0。
对于此类日志,如果首日之后用户清除了手机本地缓存中的标记,再次启动 APP 会重新设置一个首日为 true 的标记,导致本应为 0 的 is_new 字段被置为1
前端处理规则
is_new(1:新用户,0:老用户)用户安装 App 后,第一次打开 App 的当天,即第一天触发的 APP 端所有事件中,is_new = 1,第一天之后,该标记则为 false,即第一天之后触发的 APP 端所有事件中,is_new = 0。首日之后用户清除了手机本地缓存中的标记,is_new = 1此时由后端处理

业务数据分析

1)用户订单、支付、退款等业务的新增、修改、删除操作都会生成一个binlog日志,通过MaxWell采集这些日志到Kafka消息队列中

用户Insert数据

类型:“type”: “insert”

{
    "database":"databaseA",
    "table":"t_pay_order",
    "type":"insert",
    "ts":1686540443,
    "xid":16179,
    "commit":true,
    "data":{
        "uid":"1660557015483727879",
        "order_no":"P202305221603541978152962",
        "pay_order_id":"",
        "way_code":"APPLE_APP",
        "amount":1800,
        "currency":"cny",
        "state":1,
        "product_id": 1,
        "product_name":"商品1",
        "product_num":1,
        "body":"xxxxx",
        "user_id":"1660533343607898114",
        "refund_state":0,
        "refund_times":0,
        "refund_amount":0,
        "subscribed":1,
        "expired_time":"2023-06-21 16:03:39",
        "success_time":null,
        "create_time":"2023-05-22 08:03:38.805000",
        "update_time":"2023-06-12 02:01:26.996053",
        "err_code":"21011",
        "err_msg":"订单已退款或已订阅过期"
    }
}

用户Update数据

{
    "database":"note_data",
    "table":"t_pay_order",
    "type":"update",
    "ts":1686535286,
    "xid":4853,
    "commit":true,
    "data":{
        "uid":"1660557015483727876",
        "order_no":"P202305221603541978152961",
        "pay_order_id":"",
        "way_code":"APPLE_APP",
        "amount":1800,
        "currency":"cny",
        "state":3,
        "product_id":"VIP_Moth_18",
        "product_name":"月度VIP",
        "product_num":1,
        "body":"月度会员",
        "user_id":"1660533343607898114",
        "refund_state":0,
        "refund_times":0,
        "refund_amount":0,
        "subscribed":1,
        "expired_time":"2023-06-21 16:03:39",
        "success_time":null,
        "create_time":"2023-05-22 08:03:38.805000",
        "update_time":"2023-06-12 02:01:26.996053",
        "err_code":"21011",
        "err_msg":"订单已退款或已订阅过期"
    },
    "old":{
        "pay_order_id":"rfsddfx",
        "update_time":"2023-06-09 10:32:59.769593"
    }
}

技术选型

  1. 数据采集与传输:Nginx、Flume、Kafka、MaxWell
  2. 数据存储:HDFS、HBASE、Redis
  3. 计算引擎:Flink
  4. 数据存储:ClickHouse
  5. 任务调度:Flink On Yarn

Nginx配置

作用

  • 收集用户埋点日志:生成log_file文件。
  • 收集post请求中的request_body,在/data/logs/nginx/user_data/文件夹下生成log日志

配置

http {
    include       mime.types;
    default_type  application/octet-stream;
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ';
    log_format data_json escape=json ' $request_body ';
    access_log  logs/access.log  main;
    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    map $time_iso8601 $logdate {
        '~^(?<ymd>\d{4}-\d{2}-\d{2})' $ymd;
        default    'date-not-found';
    }

   server {
        listen      8090;
        server_name 127.0.0.1;

        access_log  /data/logs/nginx/user_data/user_big_data-$logdate.log  data_json;
        error_log /data/logs/nginx/user_data/user_big_data_error-$logdate.log  error;
        
        location / {
            proxy_pass  http://127.0.0.1:8090/api/log/;
        }

        location /api/log/ {
            return 200;
        }
   }
}

Flume配置

作用

  • 采集文件到kafka队列中,这里的source(数据源)是文件,channel(通道),sink(输出源)是kafka

关键配置

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/logs/nginx/user_data/user_data/.*log
a1.sources.r1.positionFile =  /opt/apache-flume-1.9.0-bin/opt/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.sinozo.data.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers =127.0.0.1:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

#组装 
a1.sources.r1.channels = c1

MaxWell

作用
实时收集mysql中的binlog数据,输出到kafka队列中

关键配置

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=localhost:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db

#配置只监听note_data库下t_pay_order表
exclude_dbs=*
include_dbs=note_data
include_tables=t_pay_order

#MySQL相关配置
host=localhosts
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

启动命令

#!/bin/bash
MAXWELL_HOME=/opt/maxwell-1.29.2

status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}

start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --filter="exclude: *.*, include: db.*, exclude: *.*, include: *.t_pay_order"  --daemon
    else
        echo "Maxwell正在运行"
    fi
}

stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}

case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac

Hadoop

作用
HDFS作为存储的基础组件,防止flink计算过程中的checkPoint检查点数据以及状态数据
Yarn作为调度组件,对flink的jobManager、taskManager内存等资源进行动态分配、并对taskManager进行监控

Flink

作用
作为实时计算引擎,对业务数据、用户埋点数据进行分组、统计等计算

架构图

flink 处理埋点数据,flink,大数据,埋点

代码地址:

基于Flink的用户埋点系统文章来源地址https://www.toymoban.com/news/detail-848100.html

到了这里,关于Flink实时数仓之用户埋点系统(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink实时电商数仓(十)

    app BaseApp: 作为其他子模块中使用Flink - StreamAPI的父类,实现了StreamAPI中的通用逻辑,在其他子模块中只需编写关于数据处理的核心逻辑。 BaseSQLApp: 作为其他子模块中使用Flink- SQLAPI的父类。在里面设置了使用SQL API的环境、并行度、检查点等固定逻辑。 bean:存放其他子模块中

    2024年02月03日
    浏览(43)
  • Flink实时电商数仓(八)

    主要任务:从kafka页面日志主题读取数据,统计 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。 读取kafka页面主题数据 转换数据结构: String - JSONObject 过滤数据,u

    2024年02月03日
    浏览(38)
  • Flink+Doris 实时数仓

    Doris基本原理 Doris基本架构非常简单,只有FE(Frontend)、BE(Backend)两种角色,不依赖任何外部组件,对部署和运维非常友好。架构图如下 可以 看到Doris 的数仓架构十分简洁,不依赖 Hadoop 生态组件,构建及运维成本较低。 FE(Frontend)以 Java 语言为主,主要功能职责: 接收用户

    2024年02月07日
    浏览(50)
  • Flink CDC和Flink SQL构建实时数仓Flink写入Doris

    软件环境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 开启binlog日志、创建用户 1.开启bin log MySQL 8.0默认开启了binlog,可以通过代码show variables like \\\"%log_bin%\\\";查询是否开启了,show variables like \\\"%server_id%\\\";查询服务器ID。 上图分别显示了bin long是否开启以及bin log所在的位置。 2.创建用户 C

    2024年02月02日
    浏览(78)
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于 Flink

    2024年02月16日
    浏览(48)
  • Flink 实时数仓 (一) --------- 数据采集层

    1. 普通实时计算与实时数仓比较 普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升

    2024年02月06日
    浏览(47)
  • Flink实时数仓同步:拉链表实战详解

    在大数据领域,业务数据通常最初存储在关系型数据库,例如MySQL。然而,为了满足日常分析和报表等需求,大数据平台会采用多种不同的存储方式来容纳这些业务数据。这些存储方式包括离线仓库、实时仓库等,根据不同的业务需求和数据特性进行选择。 举例来说,假设业

    2024年01月20日
    浏览(56)
  • flink 实时数仓构建与开发[记录一些坑]

    1、业务库使用pg数据库, 业务数据可以改动任意时间段数据 2、监听采集业务库数据,实时捕捉业务库数据变更,同时实时变更目标表和报表数据 实时数据流图与分层设计说明 1、debezium采集pg库表数据同步到kafka 【kafka模式】 2、flink 消费kafka写入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    浏览(39)
  • GaussDB(DWS)基于Flink的实时数仓构建

    本文分享自华为云社区《GaussDB(DWS)基于Flink的实时数仓构建》,作者:胡辣汤。 大数据时代,厂商对实时数据分析的诉求越来越强烈,数据分析时效从T+1时效趋向于T+0时效,为了给客户提供极速分析查询能力,华为云数仓GaussDB(DWS)基于流处理框架Flink实现了实时数仓构建。在

    2024年04月22日
    浏览(43)
  • 美团买菜基于 Flink 的实时数仓建设

    美团买菜是美团自营生鲜零售平台,上面所有的商品都由美团亲自采购,并通过供应链物流体系,运输到距离用户 3km 范围内的服务站。用户从美团买菜平台下单后,商品会从服务站送到用户手中,最快 30 分钟内。 上图中,左侧的时间轴展示了美团买菜的发展历程,右侧展示

    2024年02月09日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包