基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

这篇具有很好参考价值的文章主要介绍了基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:为实现基于数据湖的流批一体,采用业内主流技术栈hudi、flink、CDH(hive、spark)。flink使用sql client与hive的catalog打通,可以与hive共享元数据,使用sql client可操作hive中的表,实现批流一体;flink与hudi集成可以实现数据实时入湖;hudi与hive集成可以实现湖仓一体,用flink实时入湖,用spark跑批处理。由于方案中中采用的CDH6.3.2是官方最后的开源版本,而flink与hudi是社区近期发布的开源版,网上几乎没有关于它们集成的资料,近期为完成它们集成费了不少神,特写出来分享给大家,有问题可一起交流。

以下为实现hudi、flink、CDH(hive、spark)集成的过程:

1、由于cdh的Hadoop版本是3.0.0-cdh6.3.2,所以需要重新编译flink-sql-connector-hive-2.2.0,修改内容如下:

1)修改flink-1.15.3/pom.xml的Hadoop.version为3.0.0-cdh6.3.2

flink hive hudi,hive,hadoop,大数据

2)新增repository

<repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

flink hive hudi,hive,hadoop,大数据

3)修改flink-1.15.3/flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml

flink hive hudi,hive,hadoop,大数据

2、编译flink-sql-connector-hive-2.2.0,在flink-1.15.3目录执行以下命令:

mvn clean install -DskipTests -Dfast -Dhadoop.version=3.0.0-cdh6.3.2 -Dskip.npm idea:idea -pl flink-connectors/flink-sql-connector-hive-2.2.0

3、部署flink,flink15.3支支持0.12.1版本的hudi

flink hive hudi,hive,hadoop,大数据

4、flink安装包下载路径https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz,将下载的安装包,上传到服务目录/opt/modules,执行以下命令:

tar -zcxf flink-1.15.3-bin-scala_2.12.tgz
mv flink-1.15.3-bin-scala_2.1 flink

 5、上传依赖包到flink的lib中

cp /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/client/guava-11.0.2.jar /opt/modules/flink/lib/

cp /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/lib/libfb303-0.9.3.jar /opt/modules/flink/lib/

cp /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/client/hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar /opt/modules/flink/lib/

#上传mysql-connector-java-5.1.47.jar到/opt/modules/flink/lib/中

#上传flink-sql-connector-hive-2.2.0_2.12-1.15.3.jar到/opt/modules/flink/lib/中

#上传hudi-flink1.15-bundle-0.12.1.jar到/opt/modules/flink/lib/中

6、配置环境变量

sudo vim /etc/profile
#新增以下环境变量
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export HADOOP_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_CONF_DIR=/etc/hive/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive

source /etc/profile

7、修改flink-conf.yaml配置

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop01:8020/ckps
state.backend.incremental: true

#为解决异常:flink you can disable this check with the configuration ‘classloader.check-leaked-classloader‘,增加以下配置
classloader.check-leaked-classloader: false

8、创建sql-client初始化脚本

vim /opt/modules/flink/conf/hive_catalog.sql

CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/etc/hive/conf'
);

USE CATALOG hive_catalog;

9、启动Flink的sql-client

chmod 777 /opt/modules/flink/log/flink-hive-sql-client-hadoop01.log
chmod 777 /opt/modules/flink/log

/opt/modules/flink/bin/sql-client.sh embedded
/opt/modules/flink/bin/sql-client.sh embedded -i /opt/modules/flink/conf/hive_catalog.sql -s yarn-session

10、Flink的sql-clinet的使用文章来源地址https://www.toymoban.com/news/detail-664192.html

#查看数据库
show databases;
#切换数据库
use hudi;
#查看表
show tables;
#创建mor表并关闭compaction,因为每次compaction需要消耗大量内存,干扰写流程,采用离线compaction任务更稳定
CREATE TABLE ods_hudi_flink (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price int,
ts int,
dt VARCHAR(10))
PARTITIONED BY (dt)
WITH ('connector' = 'hudi',
'path' = 'hdfs://hadoop01:8020/user/hive/hudi/ods_hudi_flink',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.parquet.compression.codec'= 'snappy',
'write.operation' = 'upsert',
'compaction.async.enabled' = 'false',
'compaction.schedule.enabled' = 'true',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hadoop01:9083',
'hive_sync.conf.dir'='/etc/hive/conf',
'hive_sync.db' = 'hudi',
'hive_sync.table' = 'ods_hudi_flink',
'hive_sync.partition_fields' = 'dt',
'hive_sync.assume_date_partitioning' = 'false',
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
'hive_sync.support_timestamp'= 'true'
);

#插入数据
insert into ods_hudi_flink values (1,'hudi1',5,10,'2023-1-31'),(2,'hudi2',10,10,'2023-1-31');
#在flink中查询
set sql-client.execution.result-mode=tableau;
select * from ods_hudi_flink;
#在hive中查询,由于该表配置了离线compaction,所以需要做compaction以后才能查询
/opt/modules/flink/bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor /opt/modules/flink/lib/hudi-flink1.15-bundle-0.12.1.jar --path hdfs://hadoop01:8020/user/hive/hudi/ods_hudi_flink
#注意:接入无界流可以在表中配置自动compaction

#在hive中生产两个表ods_hudi_flink_ro,ods_hudi_flink_rt,增量数据写入ods_hudi_flink_rt,执行compaction后数据会写入ods_hudi_flink_ro,执行compaction前数据记录在log中,执行compaction后记录写到parquet文件

#cow模式建表示例
CREATE TABLE test_user2 (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
city varchar(100),
dt VARCHAR(10))
PARTITIONED BY (dt)
WITH ('connector' = 'hudi',
'path' = 'hdfs://hadoop01:8020/user/hive/hudi/test_user2',
'table.type' = 'COPY_ON_WRITE',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.parquet.compression.codec'= 'snappy',
'write.operation' = 'upsert',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hadoop01:9083',
'hive_sync.conf.dir'='/etc/hive/conf',
'hive_sync.db' = 'hudi',
'hive_sync.table' = 'test_user2',
'hive_sync.partition_fields' = 'dt',
'hive_sync.assume_date_partitioning' = 'false',
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
'hive_sync.support_timestamp'= 'true'
);

到了这里,关于基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作

    前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。 Tips:我觉得学习 Flink 还是挺有意思的

    2024年02月19日
    浏览(40)
  • Flink流批一体计算(1):流批一体和Flink概述

    Apache Flink应运而生 数字化经济革命的浪潮正在颠覆性地改变着人类的工作方式和生活方式,数字化经济在全球经济增长中扮演着越来越重要的角色,以互联网、云计算、大数据、物联网、人工智能为代表的数字技术近几年发展迅猛,数字技术与传统产业的深度融合释放出巨大

    2024年02月10日
    浏览(40)
  • flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    ⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。 ⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。 经过测试 在fl

    2024年02月22日
    浏览(49)
  • Flink流批一体计算(7):Flink优化

    目录 配置内存 设置并行度 操作场景 具体设置 补充 配置进程参数 操作场景 具体配置 配置netty网络通信 操作场景 具体配置 配置内存 Flink 是依赖内存计算,计算过程中内存不够对 Flink 的执行效率影响很大。可以通过监控 GC ( Garbage Collection ),评估内存使用及剩余情况来判

    2024年02月12日
    浏览(45)
  • Flink流批一体计算(2):Flink关键特性

    目录 Flink关键特性 流式处理 丰富的状态管理 丰富的时间语义支持    Data pipeline 容错机制 Flink SQL CEP in SQL Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis )的实时数据,也可以从各种的数据源中消费有界的历史数据。同样, Fli

    2024年02月10日
    浏览(43)
  • Flink流批一体计算(9):Flink Python

    目录 使用Python依赖 使用自定义的Python虚拟环境 方式一:在集群中的某个节点创建Python虚拟环境 方式二:在本地开发机创建Python虚拟环境 使用JAR包 使用数据文件 使用Python依赖 通过以下场景为您介绍如何使用Python依赖: 使用自定义的Python虚拟环境 使用第三方Python包 使用J

    2024年02月12日
    浏览(37)
  • Flink流批一体计算(3):FLink作业调度

    架构 所有的分布式计算引擎都需要有集群的资源管理器,例如:可以把MapReduce、Spark程序运行在YARN集群中、或者是Mesos中。Flink也是一个分布式计算引擎,要运行Flink程序,也需要一个资源管理器。而学习每一种分布式计算引擎,首先需要搞清楚的就是:我们开发的分布式应用

    2024年02月10日
    浏览(45)
  • Flink流批一体计算(4):Flink功能模块

    目录 Flink功能架构 Flink输入输出 Flink功能架构 Flink是分层架构的分布式计算引擎,每层的实现依赖下层提供的服务,同时提供抽象的接口和服务供上层使用。 Flink 架构可以分为4层,包括Deploy部署层、Core核心层、API层和Library层 部署层:主要涉及Flink的部署模式。Flink支持多种

    2024年02月10日
    浏览(48)
  • Flink流批一体计算(5):部署运行模式

    目录 集群运行模式 1.local模式 2.standalone模式 3.Flink on YARN模式 本地模式 Standalone 模式 Flink on Yarn 模式 集群运行模式 类似于 Spark , Flink 也有各种运行模式,其中主要支持三种: local 模式、 standalone 模式以及 Flink on YARN 模式。 每种模式都有特定的使用场景,接下来一起了解一

    2024年02月10日
    浏览(41)
  • StreamX流批一体一站式大数据平台:大数据Flink可视化工具的革命性突破,让你的数据更高效、更直观!

    介绍:StreamX,开源的流批一体一站式大数据平台,致力于让Flink开发更简单。它极大地降低了学习成本和开发门槛,使开发者可以专注于最核心的业务。StreamX支持Flink多版本, 与Flink SQL WebIDE兼容,并可以进行Flink SQL校验。此外,StreamX还提供了一套标准化的配置、开发、测试

    2024年01月17日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包