阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse

这篇具有很好参考价值的文章主要介绍了阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

01

背景信息

数据湖与传统的数据仓库相比,可以更灵活地处理各种类型的数据,并支持高度可扩展的存储,通常被用于大数据分析。为了支持准实时乃至实时的数据处理,数据湖需要能够快速地接收和存储数据(数据入湖),同时提供低延迟的查询性能以满足分析需求。

Apache Paimon 和 Apache Hudi 作为数据湖存储格式,有着高吞吐的写入和低延迟的查询性能,是构建数据湖的常用组件。本文将在阿里云EMR[1] 上,针对数据实时入湖场景,对 Paimon 和 Hudi 的性能进行比对,然后分别以 Paimon 和 Hudi 作为统一存储搭建准实时数仓。

02

集群环境

本文使用的集群环境是最新的阿里云 EMR 5.16.0(预计1月份正式发布),集群节点的属性如下:

  • master: 1 * ecs.g7.2xlarge 8 vCPU 32 GiB

  • core: 4 * ecs.g7.6xlarge 24 vCPU 96 GiB

使用的组件及版本如下:

  • Paimon: 0.7-SNAPSHOT(Paimon社区0.6 release版本)

  • Hudi: 0.14.0

  • Flink: 1.15

  • Spark: 3.3.1

  • OSS-HDFS: 1.0.0

本文主要由两部分组成,分别是 Paimon 和 Hudi 数据实时入湖性能测试(Flink),以及 Paimon 和 Hudi 准实时数仓全链路搭建(Flink + Spark),测试数据均存储在 EMR 的 OSS-HDFS 中。

03

数据实时入湖

数据实时入湖是数据湖格式的一个重要应用场景,也是构建实时湖仓的第一步。本节测试参考的是 paimon-cluster-benchmark[2] 。按照实际的业务情况,划分了两个具体场景:upsert 场景(数据存在更新与订正)和纯 append 场景,在两个场景上分别检验 Paimon 和 Hudi 的读写能力。

本节测试将使用 Flink 流式入湖,部署模式是 Flink Standalone 模式,Flink 配置如下,由于 TM 运行内存对测试结果影响较大,分别统计 8g/16g/20g 下的测试结果。并且由于本测试不需要用到 TM 的 managed 内存,将其设为1m。

parallelism.default: 16
jobmanager.memory.process.size: 4g
taskmanager.numberOfTaskSlots: 1
taskmanager.memory.process.size: 8g/16g/20g
execution.checkpointing.interval: 2min
execution.checkpointing.max-concurrent-checkpoints: 3
taskmanager.memory.managed.size: 1m
state.backend: rocksdb
state.backend.incremental: true
table.exec.sink.upsert-materialize: NONE


1. upsert 场景

数据湖 upsert 用于更新或插入新数据。在进行 upsert 时,会检查待写的数据是否已存在于数据湖中。如果数据已存在,则更新该数据;如果数据不存在,则插入新数据。upsert 通常是基于某种唯一标识符或主键来判断数据是否已存在。

本节测试数据源由 Flink datagen 产生,随机生成主键范围为 0~100,000,000 的数据,然后使用 Flink 将数据分别流式写入 Paimon 和 Hudi 表中,并统计写入 5 亿条数据(经统计,此时单个 bucket 内的 parquet 文件总大小在 2GB 内)的总耗时。同时,我们还使用 Flink 以批读的方式读取写入的 Paimon 和 Hudi 表,并统计总耗时。

对于 upsert 场景,Paimon 选择 primary-key 表,Hudi 选择 merge-on-read 表,由于它们都支持 compaction,所以测试将进一步划分为关闭和开启 compaction。

  • 关闭 compaction

Paimon 表的配置如下,bucket 个数与 Flink 的并行度一致,设置为 16。由于 Hudi 默认文件格式为 parquet 格式,为了与 Hudi 保持一致,后续均采用 parquet 作为文件输出格式,压缩方式统一设为 snappy。

'bucket' = '16',
'file.format' = 'parquet',
'file.compression' = 'snappy',
'write-only' = 'true'

Hudi 表的配置如下,采用 BUCKET index,桶个数为 16,与 Flink 并行度一致。由于 Hudi MOR 表的读取会受到参数 compaction.max_memory 的影响,将其配置为 taskmanager.memory.process.size 的一半。

'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '16',
'write.operation' = 'upsert',
'write.tasks' = '16',
'hoodie.parquet.compression.codec' = 'snappy',
'read.tasks' = '16',
'compaction.schedule.enabled' = 'false',
'compaction.async.enabled' = 'false',
'compaction.max_memory' = '4096/8192/10240' -- TM process memory的一半

测试结果如下:

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

可以发现在 upsert 场景,关闭 compaction 时,Paimon 读写性能均优于 Hudi,且 Hudi 对 TM 的内存要求更高。

  • 开启 compaction

Paimon 配置:

'bucket' = '16',
'file.format' = 'parquet',
'file.compression' = 'snappy',
'num-sorted-run.compaction-trigger' = '5' -- 默认配置

Hudi 配置:

由于测试所需的总耗时不多(checkpoint 个数也相应较少),并且随着未 compaction 的 log 文件增加,Hudi 需要的 compaction 内存将变得更大,因此配置 compaction.delta_commits 为 2 来保证在写入期间有 compaction 执行完成。

'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '16',
'write.operation' = 'upsert',
'write.tasks' = '16',
'hoodie.parquet.compression.codec' = 'snappy',
'read.tasks' = '16',
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'true',
'compaction.tasks' = '16',
'compaction.delta_commits' = '2'
'compaction.max_memory' = '4096/8192/10240' -- TM process memory的一半

测试结果如下:

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

在 upsert 场景,开启 compaction 时,Paimon 读写性能均优于 Hudi。对比前面的关闭 compaction 测试,Paimon 和 Hudi 的写性能均有所下降,但读性能得到提升。

Hudi 的 compaction 比较消耗内存,运行时间较长,并且它是异步执行,当写入任务完成时,未完成的 compaction 是不会继续执行的。观察发现,当 TM 内存给到20G时,Hudi 仍有 4 个 delta commits 未被 compaction(即使配置了compaction.delta_commits=2)。并且,Paimon 的 compaction 默认也不是 full compaction。因此,我们还做了以下补充测试,手动对 Paimon 和 Hudi 做一次 full compaction,然后对比读取数据的时间,结果如下:

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

2. append 场景

数据入湖的另一种场景是数据 append 写,比如日志入湖。

本节测试数据源同样由 Flink datagen 产生,然后使用 Flink 写入 Paimon 和 Hudi 表中,同样统计使用 Flink 写入5亿条数据(在 append 场景 Paimon 和 Hudi 均不需要 bucket)的总耗时;以及使用 Flink 批读已写入的 Paimon 和 Hudi 表的总耗时。

Paimon 表的配置:

'bucket' = '-1',
'file.format' = 'parquet',
'file.compression' = 'snappy'

Hudi 表的配置:

由于单个批次数据量足够大,不存在小文件问题,因此关闭 clustering:

'table.type' = 'COPY_ON_WRITE',
'metadata.enabled' = 'false',
'write.operation' = 'insert',
'write.tasks' = '16',
'hoodie.parquet.compression.codec' = 'snappy',
'read.tasks' = '16',
'write.insert.cluster' = 'false',
'clustering.schedule.enabled' = 'false',
'clustering.async.enabled' = 'false'

测试结果如下:

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

在 append 场景,Paimon 读写性能均优于 Hudi,且二者都对 TM 内存要求均不高。

04

准实时数仓

在数据入湖之后,基于数据湖格式+流式引擎的强大能力,可以进一步构建一体化实时数仓。本节将分别以 Paimon 和 Hudi 为统一存储,在经典的电商场景下搭建一套准实时数仓,数仓具体有以下几层:

1. ODS 层:通过 Flink 的 datagen connector 产生 orders(订单表,包含原始订单信息),再通过 Flink 实时写入,作为 ODS 层。

2. DWM 层:通过 Spark streaming 实时消费 ODS 层,产出 DWM 层 dwm_shop_users(用户-商户聚合中间表,包含中间聚合指标)。

3. DWS 层:通过 Spark streaming 实时消费 DWM 层的 changelog 数据,构建 DWS 层 dws_users(用户聚合指标表)以及 dws_shops(商户聚合指标表)。

  • datagen -> ODS

该层使用 Flink 实时入湖,为了更贴近生产环境,Flink 以 Yarn Session 模式启动,同时由于数据链路的增加,为了合理分配资源,对内存和并行度做出以下调整:

yarn-session.sh -Dparallelism.default=8 \
                -Djobmanager.memory.process.size=2g \
                -Dtaskmanager.numberOfTaskSlots=2 \
                -Dtaskmanager.memory.process.size=8g \
                -Dtaskmanager.memory.managed.size=1m \
                -Dexecution.checkpointing.interval=2min \
                -Dexecution.checkpointing.max-concurrent-checkpoints=3 \
                -Dstate.backend=rocksdb \
                -Dstate.backend.incremental=true \
                -Dtable.exec.sink.upsert-materialize=NONE \
                --detached

datagen 建表语句如下,rows-per-second 调整为 10000

CREATE TEMPORARY TABLE datagen_orders
(
  order_name         STRING
  ,order_user_id     BIGINT
  ,order_shop_id     BIGINT
  ,order_product_id  BIGINT
  ,order_fee         DECIMAL(20, 2)
  ,order_state       INT
)
WITH (
  'connector' = 'datagen'
  ,'rows-per-second' = '10000'
  ,'fields.order_user_id.kind' = 'random'
  ,'fields.order_user_id.min' = '1'
  ,'fields.order_user_id.max' = '10000'
  ,'fields.order_shop_id.kind' = 'random'
  ,'fields.order_shop_id.min' = '1'
  ,'fields.order_shop_id.max' = '10000'
  ,'fields.order_product_id.kind' = 'random'
  ,'fields.order_product_id.min' = '1'
  ,'fields.order_product_id.max' = '1000'
  ,'fields.order_fee.kind' = 'random'
  ,'fields.order_fee.min' = '0.1'
  ,'fields.order_fee.max' = '10.0'
  ,'fields.order_state.kind' = 'random'
  ,'fields.order_state.min' = '1'
  ,'fields.order_state.max' = '5'
);

Paimon 建表和写入语句如下:

CREATE TABLE IF NOT EXISTS paimon_catalog.order_dw.ods_orders
(
  order_id           STRING
  ,order_name        STRING
  ,order_user_id     BIGINT
  ,order_shop_id     BIGINT
  ,order_product_id  BIGINT
  ,order_fee         DECIMAL(20, 2)
  ,order_create_time TIMESTAMP(3)
  ,order_update_time TIMESTAMP(3)
  ,order_state       INT
)
WITH (
  'bucket' = '-1',
  'file.format' = 'parquet',
  'file.compression' = 'snappy'
);


INSERT INTO paimon_catalog.order_dw.ods_orders
SELECT
  UUID() AS order_id
  ,order_name
  ,order_user_id
  ,order_shop_id
  ,order_product_id
  ,order_fee
  ,NOW() AS order_create_time
  ,NOW() AS order_update_time
  ,order_state
FROM datagen_orders;

Hudi 建表和写入语句如下:

create TEMPORARY table ods_orders
(
  order_id           STRING
  ,order_name        STRING
  ,order_user_id     BIGINT
  ,order_shop_id     BIGINT
  ,order_product_id  BIGINT
  ,order_fee         DECIMAL(20, 2)
  ,order_create_time TIMESTAMP(3)
  ,order_update_time TIMESTAMP(3)
  ,order_state       INT
)
WITH (
    'connector' = 'hudi'
    ,'path' = '/xxx/hudi/order_dw.db/ods_orders'
    ,'precombine.field' = 'order_update_time'
    ,'table.type' = 'COPY_ON_WRITE'
    ,'hoodie.database.name' = 'order_dw'
    ,'hoodie.table.name' = 'ods_orders'
    ,'hoodie.datasource.write.recordkey.field' = 'order_id'
    ,'metadata.enabled' = 'false'
    ,'write.operation' = 'insert'
    ,'write.tasks' = '8'
    ,'hoodie.parquet.compression.codec' = 'snappy'
    ,'write.insert.cluster' = 'false'
    ,'clustering.schedule.enabled' = 'false'
    ,'clustering.async.enabled' = 'false'
)
;


INSERT INTO ods_orders
SELECT
  UUID() AS order_id
  ,order_name
  ,order_user_id
  ,order_shop_id
  ,order_product_id
  ,order_fee
  ,NOW() AS order_create_time
  ,NOW() AS order_update_time
  ,order_state
FROM datagen_orders;
  • ODS -> DWM

对于 Paimon 表,依靠其本身的聚合引擎能力,通过简单的配置(merge-engine)即可方便地聚合消费 pv 和总金额,从而构建用户-商户聚合中间表。同时由于下游需要读取 changelog,配置 changelog-producer为lookup。

CREATE TABLE paimon_catalog.order_dw.dwm_shop_users
(
  shop_id  BIGINT
  ,user_id BIGINT
  ,ds      STRING COMMENT '小时'
  ,pv      BIGINT COMMENT '该小时内,该用户在该商户的消费次数'
  ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该用户在该商户的消费总金额'
)
tblproperties (
  'primary-key' = 'shop_id, user_id, ds'
  ,'bucket' = '8'
  ,'changelog-producer' = 'lookup'
  ,'file.format' = 'parquet'
  ,'file.compression' = 'snappy'
  ,'merge-engine' = 'aggregation'
  ,'fields.pv.aggregate-function' = 'sum'
  ,'fields.fee_sum.aggregate-function' = 'sum'
  ,'metadata.stats-mode' = 'none'
);

Paimon Spark Streaming 作业示例代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, lit}


object PaimonOds2DwmJob {


  def main(args: Array[String]): Unit = {


    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation = "/xxx/paimon/order_dw.db/ods_orders"
    val targetLocation = "/xxx/paimon/order_dw.db/dwm_shop_users"
    val checkpointDir = "/xxx/paimon/order_dw.db/dwm_shop_users_checkpoint"
    import spark.implicits._


    spark.readStream
      .format("paimon")
      .load(sourceLocation)
      .select(
        $"order_shop_id",
        $"order_user_id",
        date_format($"order_create_time", "yyyyMMddHH").alias("ds"),
        lit(1L),
        $"order_fee"
      )
      .writeStream
      .format("paimon")
      .option("checkpointLocation", checkpointDir)
      .start(targetLocation)
    
    spark.streams.awaitAnyTermination()
  }
}

对于 Hudi 表,想要实现类似的聚合操作,则需要依赖于自定义 Payload 或者 Merger 来实现,本文采用自定义 merger 实现,对 key 相同的记录的 uv、pv、fee_sum 字段进行聚合,核心逻辑如下: 

public class OrdersLakeHouseMerger extends HoodieAvroRecordMerger {
  @Override
  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
    // ...
    Object oldData = older.getData();
    GenericData.Record oldRecord = (oldData instanceof HoodieRecordPayload)
        ? (GenericData.Record) ((HoodieRecordPayload) older.getData()).getInsertValue(oldSchema).get()
        : (GenericData.Record) oldData;


    Object newData = newer.getData();
    GenericData.Record newRecord = (newData instanceof HoodieRecordPayload)
        ? (GenericData.Record) ((HoodieRecordPayload) newer.getData()).getInsertValue(newSchema).get()
        : (GenericData.Record) newData;


    // merge uv
    if (HoodieAvroUtils.getFieldVal(newRecord, "uv") != null && HoodieAvroUtils.getFieldVal(oldRecord, "uv") != null) {
      newRecord.put("uv", (Long) oldRecord.get("uv") + (Long) newRecord.get("uv"));
    }


    // merge pv
    if (HoodieAvroUtils.getFieldVal(newRecord, "pv") != null && HoodieAvroUtils.getFieldVal(oldRecord, "pv") != null) {
      newRecord.put("pv", (Long) oldRecord.get("pv") + (Long) newRecord.get("pv"));
    }


    // merge fee_sum
    if (HoodieAvroUtils.getFieldVal(newRecord, "fee_sum") != null && HoodieAvroUtils.getFieldVal(oldRecord, "fee_sum") != null) {
      BigDecimal l = new BigDecimal(new BigInteger(((GenericData.Fixed) oldRecord.get("fee_sum")).bytes()), 2);
      BigDecimal r = new BigDecimal(new BigInteger(((GenericData.Fixed) newRecord.get("fee_sum")).bytes()), 2);
      byte[] bytes = l.add(r).unscaledValue().toByteArray();
      byte[] paddedBytes = new byte[9];
      System.arraycopy(bytes, 0, paddedBytes, 9 - bytes.length, bytes.length);
      newRecord.put("fee_sum", new GenericData.Fixed(((GenericData.Fixed) newRecord.get("fee_sum")).getSchema(), paddedBytes));
    }
    HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(newRecord);
    return Option.of(Pair.of(hoodieAvroIndexedRecord, newSchema));
  }
}

Hudi Spark Streaming 作业示例代码如下,由于下游需要读取 changelog,配置 hoodie.table.cdc.enabled为true。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, lit}


object Ods2DwmJob {


  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation ="/xxx/hudi/order_dw.db/ods_orders"
    val targetLocation = "/xxx/hudi/order_dw.db/dwm_shop_users"
    val checkpointDir = "/xxx/hudi/order_dw.db/dwm_shop_users_checkpoint"


    import spark.implicits._


    spark.readStream
      .format("hudi")
      .load(sourceLocation)
      .select(
        $"order_shop_id".alias("shop_id"),
        $"order_user_id".alias("user_id"),
        date_format($"order_create_time", "yyyyMMddHH").alias("ds"),
        lit(1L).alias("pv"),
        $"order_fee".alias("fee_sum")
      )
      .writeStream
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.recordkey.field", "shop_id, user_id, ds")
      .option("hoodie.datasource.write.precombine.field", "ds")
      .option("hoodie.database.name", "order_dw")
      .option("hoodie.table.name", "dwm_shop_users")
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "8")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("hoodie.table.cdc.enabled", "true")
      .option("hoodie.table.cdc.supplemental.logging.mode", "data_before_after")
      .option("checkpointLocation", checkpointDir)
      .start(targetLocation)
    
    spark.streams.awaitAnyTermination()
  }
}

最后将作业分别提交任务到 yarn:

spark-submit --class Ods2DwmJob \
             --master yarn \
             --deploy-mode cluster \
             --name PaimonOds2DwmJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=16g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             ./paimon-spark-streaming-example.jar


spark-submit --class Ods2DwmJob \
             --master yarn \
             --deploy-mode cluster \
             --name HudiOds2DwmJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=16g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
             --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
             --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
             ./hudi-spark-streaming-example.jar

性能对比

在上述资源下,当作业稳定运行 100 个 batch(3小时左右)后的 Streaming 作业 UI 如下:

此时,Paimon 单个 batch 写入时间为 40s 左右。

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

Hudi 单个 batch 写入时间为 65s 左右。

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

  • DWM -> DWS

Paimon SparkSQL 建表语句如下,仍然配置聚合引擎对指定字段进行聚合:

CREATE TABLE paimon_catalog.order_dw.dws_users
(
  user_id  BIGINT
  ,ds      STRING COMMENT '小时'
  ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该用户的消费总金额'
)
tblproperties (
  'primary-key' = 'user_id, ds'
  ,'bucket' = '8'
  ,'merge-engine' = 'aggregation'
  ,'fields.fee_sum.aggregate-function' = 'sum'
);


CREATE TABLE paimon_catalog.order_dw.dws_shops
(
  shop_id  BIGINT
  ,ds      STRING COMMENT '小时'
  ,uv      BIGINT COMMENT '该小时内,该商户的消费总人数'
  ,pv      BIGINT COMMENT '该小时内,该商户的消费总次数'
  ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该商户的消费总金额'
)
tblproperties (
  'primary-key' = 'shop_id, ds'
  ,'bucket' = '8'
  ,'merge-engine' = 'aggregation'
  ,'fields.uv.aggregate-function' = 'sum'
  ,'fields.pv.aggregate-function' = 'sum'
  ,'fields.fee_sum.aggregate-function' = 'sum'
  );

Paimon Spark Streaming Dwm2DwsJob 如下,由于需要流读上游 changelog,配置 read.changelog 为 true。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, when}


object Dwm2DwsJob {
  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation = "/xxx/paimon/order_dw.db/dwm_shop_users"
    val targetLocation1 = "/xxx/paimon/order_dw.db/dws_users"
    val checkpointDir1 = "/xxx/paimon/order_dw.db/dws_users_checkpoint"
    val targetLocation2 = "/xxx/paimon/order_dw.db/dws_shops"
    val checkpointDir2 = "/xxx/paimon/order_dw.db/dws_shops_checkpoint"
    
    import spark.implicits._


    val df = spark.readStream
      .format("paimon")
      .option("read.changelog", "true")
      .load(sourceLocation)


    df.select(
      $"user_id",
      $"ds",
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"fee_sum")
        .otherwise($"fee_sum" * -1)
        .alias("fee_sum"))
      .writeStream
      .format("paimon")
      .option("checkpointLocation", checkpointDir1)
      .start(targetLocation1)


    df.select(
      $"shop_id",
      $"ds",
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", lit(1L)).otherwise(lit(-1L)).alias("uv"),
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"pv").otherwise($"pv" * -1).alias("pv"),
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"fee_sum")
        .otherwise($"fee_sum" * -1)
        .alias("fee_sum")
      .writeStream
      .format("paimon")
      .option("checkpointLocation", checkpointDir2)
      .start(targetLocation2)
    
    spark.streams.awaitAnyTermination()
  }
}

Hudi Spark Streaming Dwm2DwsJob 如下,可复用上一层定义的 Merger。由于 Hudi 也需要流读changelog,配置 hoodie.datasource.query.type 为 incremental 以及 hoodie.datasource.query.incremental.format 为 cdc。Hudi 的 changelog 格式和 Paimon 不同,数据处理逻辑和 Paimon 略有不同。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, get_json_object, lit, when}
import org.apache.spark.sql.types.{DecimalType, LongType}


object Dwm2DwsJob {


  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation ="/xxx/hudi/order_dw.db/dwm_shop_users"
    val targetLocation1 = "/xxx/hudi/order_dw.db/dws_users"
    val checkpointDir1 = "/xxx/hudi/order_dw.db/dws_users_checkpoint"
    val targetLocation2 = "/xxx/hudi/order_dw.db/dws_shops"
    val checkpointDir2 = "/xxx/hudi/order_dw.db/dws_shops_checkpoint"


    import spark.implicits._


    val df = spark.readStream
      .format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .load(sourceLocation)


    df.select(
      get_json_object($"after", "$.user_id").cast(LongType).alias("user_id"),
      get_json_object($"after", "$.ds").alias("ds"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)) - get_json_object($"before", "$.fee_sum").cast(DecimalType(20, 2)))
        .otherwise(get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)))
        .alias("fee_sum"))
      .writeStream
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.recordkey.field", "user_id, ds")
      .option("hoodie.datasource.write.precombine.field", "ds")
      .option("hoodie.database.name", "order_dw")
      .option("hoodie.table.name", "dws_users")
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "8")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("checkpointLocation", checkpointDir1)
      .start(targetLocation1)


    df.select(
      get_json_object($"after", "$.shop_id").cast(LongType).alias("shop_id"),
      get_json_object($"after", "$.ds").alias("ds"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, lit(0L)).otherwise(lit(1L)).alias("uv"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.pv").cast(LongType) - get_json_object($"before", "$.pv").cast(LongType))
        .otherwise(get_json_object($"after", "$.pv").cast(LongType))
        .alias("pv"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)) - get_json_object($"before", "$.fee_sum").cast(DecimalType(20, 2)))
        .otherwise(get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)))
        .alias("fee_sum"))
      .writeStream
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.recordkey.field", "shop_id, ds")
      .option("hoodie.datasource.write.precombine.field", "ds")
      .option("hoodie.database.name", "order_dw")
      .option("hoodie.table.name", "dws_shops")
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "8")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("checkpointLocation", checkpointDir2)
      .start(targetLocation2)
    
    spark.streams.awaitAnyTermination()
  }
}

最后将作业分别提交任务到 yarn:

spark-submit --class Dwm2DwsJob \
             --master yarn \
             --deploy-mode cluster \
             --name PaimonDwm2DwsJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=8g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             ./paimon-spark-streaming-example.jar


spark-submit --class Dwm2DwsJob \
             --master yarn \
             --deploy-mode cluster \
             --name HudiDwm2DwsJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=8g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
             --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
             --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
             ./hudi-spark-streaming-example.jar

性能对比

在上述资源下,当作业稳定运行 100 个 batch( 3 小时左右)后的 Streaming 作业 UI(以 dws_shops 表为例)如下:

此时,Paimon 单个 batch 写入时间为 10s 左右

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

Hudi 单个 batch 写入时间为 13s 左右

阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse,阿里云,云计算

  • SparkSQL 查询

在该场景下,我们可以查询 DWM 层的 dwm_shop_users 表作为其他业务场景的上游表,也可以查询 DWS 层数据直接用于应用分析或者报表展示,使用如下两个 SQL 查询:

-- SparkSQL 查询 ods_orders
select order_id, order_user_id, order_shop_id, order_fee, order_create_time
from order_dw.ods_orders 
order by order_create_time desc limit 10;


-- SparkSQL 查询 dws_shops
select shop_id, ds, uv, pv, fee_sum 
from order_dw.dws_shops 
where ds = '2023120100' order by ds, shop_id limit 10;

以上,我们分别以 Paimon 和 Hudi 完成了每小时增加 4 千万条记录(压缩后 10 GB )量级的实时 ETL 链路的搭建,均可以满足分钟级的生产场景的需求。

05

总结

1. 在实时入湖场景中,Paimon 具有比 Hudi 更强的读写性能,并且对内存的需求更小。

2. 在数仓 DWM、DWS 层构建过程中,由于 Paimon 内置了 mergeFunction 功能,可以通过配置参数直接构建聚合指标,而 Hudi 需要通过手动编写自定义 Payload 或者 Merger 来实现。

3. 在基于 Spark 构建的准实时数仓的各层链路中,Paimon 计算单个 batch 的耗时均比 Hudi 更短。


文章超链接:

[1]阿里云EMR
https://www.aliyun.com/product/bigdata/emapreduce

[2] paimon-cluster-benchmark

https://github.com/apache/incubator-paimon/tree/master/paimon-benchmark/paimon-cluster-benchmark


▼ 关注「Apache Spark 技术交流社区」,获取更多技术干货 ▼

点击阅读原文~文章来源地址https://www.toymoban.com/news/detail-808492.html

到了这里,关于阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 Flink CDC 构建 MySQL 的 Streaming ETL to MySQL

    CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛: • 数据同步:用于备份,容灾; • 数据分发:

    2024年02月03日
    浏览(45)
  • 【大数据】基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java / Scala 代码,也无需安装 IDE。 假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。

    2024年02月03日
    浏览(43)
  • 阿里云EMR集群搭建及使用

    目录 1.简介 1.什么是EMR 2.组成 3.与自建hadoop集群对比 4.产品架构 2.使用 1.创建EMR集群 1.登录EMR on ECS控制台 2.软件设置 3.硬件设置 3.基础配置 2.配置 1.组件配置 2.用户管理 3.安全组 4.Gateway 5.trino配置 6.ranger配置 7.LDAP认证 3.组件UI 4.监控告警 1.ECS磁盘内存等监控 2. EMR组件服务状

    2024年02月01日
    浏览(33)
  • 阿里云EMR2.0平台:让大数据更简单

    摘要:本文整理自阿里云资深技术专家李钰(绝顶)在 阿里云EMR2.0线上发布会 的分享。本篇内容主要分为三个部分: 1.EMR 平台概述 2.EMR2.0 新平台核心能力 3.总结 EMR 平台是开源大数据的云原生运行环境,阿里云EMR 根据云原生的特点,在弹性伸缩、稳定性、智能化和研发效能四

    2024年02月12日
    浏览(42)
  • 使用 Amazon EMR 构建您的数据分析平台

    众所周知,在现如今大数据时代,数据越来越重要。据Gartner最新趋势分析,数据分析将成为创新起源与企业核心能力。同时国际数据公司IDC和数据存储公司希捷的一份报告表示,我国产生的数据量将从2019年的约9.4ZB增至2025年的48.6ZB。 面对如此愈加繁杂和庞大的数据,很多公

    2023年04月08日
    浏览(41)
  • 阿里云EMR 2.0:定义下一代云原生智能数据湖

    摘要:本文整理自阿里云高级技术专家/数据湖存储负责人郑锴(铁杰);阿里云高级技术专家/开源大数据OLAP负责人范振(辰繁)在 阿里云EMR2.0线上发布会 的分享。 本篇内容主要介绍了阿里云云原生数据湖分析解决方案的三个核心要素: 1.全托管,湖存储; 2.一站式,湖管理;

    2024年02月05日
    浏览(55)
  • Spark Streaming + Kafka构建实时数据流

    1. 使用Apache Kafka构建实时数据流 参考文档链接:https://cloud.tencent.com/developer/article/1814030 2. 数据见UserBehavior.csv 数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集 根据这一csv文档运用Kafka模拟实时数据流,

    2024年02月12日
    浏览(44)
  • 推荐系统架构设计实践:Spark Streaming+Kafka构建实时推荐系统架构

    作者:禅与计算机程序设计艺术 推荐系统(Recommendation System)一直都是互联网领域一个非常火热的话题。其主要目标是在用户多样化的信息环境中,通过分析用户的偏好、消费习惯等数据,提供个性化的信息推送、商品推荐、购物指导等服务。如何设计一个推荐系统的架构及

    2024年02月08日
    浏览(53)
  • 基于 Alpine 环境源码构建 alibaba-tengine(阿里巴巴)的 Docker 镜像

    Alpine Linux 是一款极其轻量级的 Linux 发行版,基于 busybox ,多被当做 Docker 镜像的底包(基础镜像),在使用容器时或多或少都会接触到此系统,本篇文章我们以该镜像构建 tengine-alpine 镜像。 说明:此处以 alpine 3.18.3 tengine 3.0.0 为实验进行容器镜像构建。 小巧: 基于 Musl li

    2024年02月11日
    浏览(51)
  • 基于EMR的新一代数据湖存储加速技术详解

    摘要:本文整理自阿里云开源大数据平台数据湖存储团队孙大鹏在7月17日阿里云数据湖技术专场交流会的分享。本篇内容主要分为两个部分: 背景介绍 JindoData 数据湖存储解决方案 点击查看直播回放 大数据行业蓬勃发展,主要源自于通讯技术的发展,全球数据规模,预计2

    2024年02月02日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包