基于华为MRS实时消费Kafka通过Flink落盘至HDFS的Hive外部表的调度方案

这篇具有很好参考价值的文章主要介绍了基于华为MRS实时消费Kafka通过Flink落盘至HDFS的Hive外部表的调度方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


该需求为实时接收对手Topic,并进行消费落盘至Hive。

在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。
基于华为MRS实时消费Kafka通过Flink落盘至HDFS的Hive外部表的调度方案,Hadoop生态,Flink,华为,kafka,flink,fusioninsight,hdfs,hive

本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。

华为官方文档:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html

1 Kafka

1.1 Kerberos安全模式的认证与环境准备

着手开发前,需要将FushionInsight租户加入kafkaadmin组,保证有创建主题和消费主题的权限,在得到此权限时,切勿对集群中的主题进行危险操作。

保证租户权限后,开始准备开发环境。该步骤需要安装Idea客户端在windows本地,同时安装兼容的maven版本,华为MRS需要安装至少OpenJDK 1.8.0_332的版本。

运行环境的配置则需要在FushionInsight的web管理界面下载kafka的完整客户端,包括config配置文件也需要下载。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手动添加,同时应保证本地和集群能ping通。

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html

1.2 创建一个测试主题

在Linux环境中执行:

bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic

创建一个测试testTopic,创建成功后,FushionInsight的web界面会报topic只有一个分区副本的警告,请忽略它。

同时也可以开启两个新的终端窗口用于测试生产者和消费者:

  1. bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
  2. bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html

1.3 消费主题的接收测试

通过以下网站下载华为MRS所需的样例代码:

https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0

下载样例代码之后需要在华为镜像站下载代码所需依赖,华为MRS所需的组件依赖不同于apache的开源版本,需要单独配置maven的setting文件华为中央仓库进行下载,在开发时,组件相关的依赖都需要用下载华为的。

镜像地址:

https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/

华为开源镜像站:

https://mirrors.huaweicloud.com/home

完成依赖和样例代码项目创建即可开发,在开发程序时,需要将用于安全认证的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在样例工程的“kafka-examples\src\main\resources”目录下。

在开发时,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。华为提供了LoginUtil相关接口来完成这些配置,样例代码中只需要配置用户自己租户名称和对应的keytab文件名称即可。

创建生产测试时,首先需要修改KafkaProperties类中的生产主题名,接下来在com.huawei.bigdata.kafka.example.Producer类中修改租户账号,keytab位置即可,运行成功后,会向主题推送100条测试数据,此时,我们在1.2小节中开启的消费者窗口就能接受到生产的数据。

在具体的测试中,需要控制消息发送的间隔和消息次数,方便后续开发Flink。一般来说,每秒发送一条,一直发送即可。

至此,Kafka的主题消费测试完成,接下来需要用Flink将主题落盘到HDFS。

如果运行代码时报和clock相关的错误,是因为本地时间和FushionInsight集群时间不一致所致,请将本地时间和服务器时间差控制在5分钟内。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html

2 Flink

1.1 Kerberos安全模式的认证与环境准备

用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。
基于华为MRS实时消费Kafka通过Flink落盘至HDFS的Hive外部表的调度方案,Hadoop生态,Flink,华为,kafka,flink,fusioninsight,hdfs,hive
图为Flink在华为MRS安全模式的认证体系。

对于Kafka的权限在章节1.1已经获取,另外要保证有yarn资源的使用权限,还需要对HDFS的/flink/flink-checkpoint目录获取权限,保证执行。有了相关权限之后,再下载kerberos认证凭据文件,keytab和conf。准备运行环境同Kafka类似,需要对Flink客户端进行配置,注意config文件应该在权限修改之后获取。

Flink整个系统存在三种认证方式,使用kerberos认证、使用security cookie进行认证、使用YARN内部的认证机制。在进行安全认证时,可以用flink自带的wordcount样例程序进行提交测试,根据提交结果反馈再进行适配,直到提交成功。如果报auth相关的错误,可能还是权限问题,可以尝试先将租户权限给到最大,谨慎操作,先保证代码能通。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html

1.2 Flink任务的开发

最终在yarn队列运行的flink程序是从本地idea打包,通过flink run提交的。前面安全模式已经打通,在开发时仍然是使用华为官方的flink样例代码进行修改调试。

在具体的flink程序开发中,由于是开启了kerberos认证的安全模式,需要加入判断安全模式登录的代码段在main方法,以下代码来自华为官方样例:

 if (LoginUtil.isSecurityModel()) {
            try {
                LOG.info("Securitymode start.");
                //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.");
                LOG.error("The IOException occured : {}.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }

对于具体需求的开发参照开源Flink的apache官方文档即可,只需要保证依赖是华为官方镜像站的。

在该需求中,是将消费的数据落盘到HDFS中。开发中要用到FlinkKafkaConsumer方法创建kafka消费者,拿到流数据。该方法在Flink1.17版本被弃用,但是Flink1.15仍然可以用,具体开发方法可参考Flink1.13的官方文档Apache Kafka 连接器。

FlinkKafkaConsumer方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

接收的Kafka数据,我们不需要处理,测试时直接测试主题的数据写入HDFS即可,需要用StreamingFileSink方法。该方法可以设置按照日期分桶,我们设置.withBucketAssigner为每天一个桶,保证每天消费的数据在一个文件中,同时用该方法传入日期格式参数yyyy-MM-dd,这样便于使用shell调度每日增量数据时日期变量的传递。

FileSink方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/

另外,关于Sink到HDFS的数据文件(part file) 生命周期有几种状态,其中当文件名为in-progress表示当前文件正在写入中,此时的文件是不能被Hive读到的,我们需要将该文件的状态通过checkpoint机制转变为Finished。需要配置env.enableCheckpointing(60000)开启checkpoint,该参数是60秒开启一次。

完成代码开发后无法在本地测试,只能通过maven打包到华为服务器,通过flink run提交到yarn,此时可以指定并行度及其他配置。

通过以上方法即可实现将我们测试主题中的数据存储在按照每天一个yyyy-mm-dd命名的文件夹中。

3 HDFS与Hive

HDFS与Hive的交互也可以使用FlinkSQL,但是考虑到未来对数据的加工过滤,在此需求中选择将数据落盘HDFS再通过Shell命令调度至Hive。

3.1 Shell脚本的编写思路

  1. source华为的环境,认证状态成功;

  2. 创建日期变量:c_date=$(date '+%Y-%m-%d')

  3. 在beeline -u中执行HiveSQL代码:

    • 使用beeline的变量函数--hivevar将在外部注册的c_date变量注册为hive beeline的变量;

    • 创建临时外部表,映射字段一行数据,建表语句中指定位置为Flink写入的当日日期变量的HDFS数据文件夹;

    • 将临时表中的数据解析,一般是json数据,可通过get_json_object函数解析为字段,insert into table存入贴源层正式表;

    • 删除临时表;

  4. 有需要的话,也可以添加日志路径,将执行结果追加至日志。

3.2 脚本测试方法

该脚本的执行原理是首先在刷新华为租户环境,然后创建时间变量,并且是yyyy-mm-dd格式,与flink写入在HDFS中的每日增量文件夹名相同;

然后在beeline客户端中注册beeline的变量,将linux的时间变量传入beeline;

解下来是建临时表,将HDFS中的增量数据先写入,再解析字段到下一层标准表,同时删除临时表,通过此方法即完成每天新增数据的导入。

需要注意的是,hive -e命令似乎由于认证安全设置,无法在华为集群节点机使用。

4 DolphinScheduler

通过将脚本文件挂在DS调度中,每天在Flink完成消费落盘后,即可执行该shell。DS的部署不在华为MRS集群,在客户端节点中,使用开源版本即可,DS更方便查看每天的调度执行日志。

需要注意的是,目前我的需求中每天的新增数据大约2000-10000条,可以在短时间内完成调度执行。文章来源地址https://www.toymoban.com/news/detail-800289.html

到了这里,关于基于华为MRS实时消费Kafka通过Flink落盘至HDFS的Hive外部表的调度方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 基于Flink+kafka实时告警

    项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据。这种方式存在告警的延时实在是太高了。数据从产生到保存,从保存到判断都会存在时间差,按照保存数据定时5分钟一次,定时任务5分钟一次。最高会产生10分钟的误差,这

    2024年02月16日
    浏览(39)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

    目标 : 了解数据源的格式及实现模拟数据的生成 路径 step1:数据格式 step2:数据生成 实施 数据格式 消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号

    2024年02月04日
    浏览(43)
  • 在hadoop或docker环境下基于kafka和flink的实时计算大屏展示

    第一章 总体需求 1.1.课题背景 某股票交易机构已上线一个在线交易平台,平台注册用户量近千万,每日均 接受来自全国各地的分支机构用户提交的交易请求。鉴于公司发展及平台管理要 求,拟委托开发一个在线实时大数据系统,可实时观测股票交易大数据信息,展 示部分重

    2024年02月03日
    浏览(42)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

    目标 : 实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows版本安装手册.docx》安装FineBI 配置连接 数据准备 小结 实现FineBI访问MySQL结果数据集的配置 目标 : 实现FineBI实时报表构建 路径 step1:实时报表构建 step2:实时报表配置 step3:实时刷新测试 实施 实

    2024年02月04日
    浏览(43)
  • python 实时获取kafka消费队列信息

    安装 pykafka

    2024年02月16日
    浏览(51)
  • flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

    1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行

    2024年02月13日
    浏览(45)
  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(37)
  • 关于flink重新提交任务,重复消费kafka的坑

    按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据 我以为checkpoint和fsbackend要同时设置,其实,1.14.3版本,setCheckpointStorage和stateBackend改成了分着设置 我上边代码这样设置,相当于首先指定了以下checkpoint按照默认的backend存储,然后又指定了按

    2024年02月03日
    浏览(55)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包