Hive to StarRocks

这篇具有很好参考价值的文章主要介绍了Hive to StarRocks。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

将Hive 中的数据导入到 StarRocks 中的Broker Load的导入方式。在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(如HDFS, S3)上的数据,利用自身的计算资源对数据进行预处理和导入。这是一种异步的导入方式,用户需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。有关Broker Load 的详细解释你可以查阅官网的解说。

需求:

每天将数仓中跑完的Hive的相关表导入到StarRocks。

场景:

  • 不更新历史数据

    1. 如果是分区表,我们增量导入到 StarRocks 中即可
    2. 非分区表全量导入
  • 更新历史数据

    这种情况主要存在分区表中,往往会更改前几个月数据或者时间更久的数据,这种情况下,就不的不将该表重新同步一边,使StarRocks中的数据和hive中的数据保持一致。

  • hive中表的元数据发生变化,和StarRocks中的表结构不一致

    这种情况下,就需要我们删除重新建表,重新同步数据

实操

这里直接通过编写shell 脚本,来完成数据的导入

function common(){
        database_name=$1
        table_name=$2
         #判断数据文件格式
         hive_file_type=$(echo `impala-shell -i $impala_host -c -q "describe formatted ${database_name}.${table_name};"`)
         type=$(echo "${hive_file_type#*InputFormat}" | awk -F \| '{print $2}')
         result=$(echo "$type" | grep "parquet")
         #这里只判断了两种格式,如果你们hive中的文件类型由多种,你都需要判断
         if [[ "$result" != "" ]]
                then
                        hive_file_type="parquet"
                        echo "file type: $hive_file_type"
                else
                        hive_file_type="csv"
                        echo "file type: $hive_file_type"
                fi

                #查询表的字段
                select_col_name_sql="desc ${table_name};"
                select_col_name=`mysql -h${StarRocks_ip}  -P${StarRocks_port}  -u${StarRocks_user} -p${StarRocks_password} ${database_name} -Bse "${select_col_name_sql}"`
                # 获取要同步的表的字段
                if [[ -f $CURRENT_DIR/tmp.txt ]]
                then
                        rm -rf $CURRENT_DIR/tmp.txt
                fi
				#这里将表的字段写入到了文件中,你也可以存储在数组或者其他容器中
                echo "$select_col_name" | awk '{print $1}' > ${CURRENT_DIR}/col_tmp.txt
                cat ${CURRENT_DIR}/col_tmp.txt | while read line
                do
                        echo "\`$line\`" >> ${CURRENT_DIR}/tmp.txt
                done
				# 指定该表的数据文件
                table_data_file="${hdfs_url}$database_name.db/$table_name/dt=*"
}

function trun_table(){
        database_name=$1
        table_name=$2
        truncate_table_sql="truncate table $table_name;"
        truncate_table=`mysql -h${StarRocks_ip}  -P${StarRocks_port}  -u$StarRocks_user} -p${StarRocks_password} $1 -Bse "${truncate_table_sql}"`
}

不更新历史数据

function sync_ordinary(){
        database_name=$1
        table_name=$2
        # 判断该文件夹是否存在,如果存在级即为分区表,否为非分区表
        hadoop fs -test -d $table_data_file
        if [ $? -eq 0 ] ;then
                table_data_file="${hdfs_url}${database_name}.db/${table_name}/dt*/*"
                # 获取要同步的表的字段
                sed -i '1d' ${CURRENT_DIR}/tmp.txt
                echo `cat "${CURRENT_DIR}/tmp.txt"` |  tr ' ' ',' > ${CURRENT_DIR}/col_name.txt
                col_name=`cat "${CURRENT_DIR}/col_name.txt"`
                #load data 
                load_data_sql="LOAD LABEL ${database_name}.${table_name}_${sync_time} ( DATA INFILE ("\"${table_data_file}\"")  INTO TABLE "\`${table_name}\`" COLUMNS TERMINATED BY \"\\\x01\" FORMAT AS \"${hive_file_type}\"  (${col_name}) COLUMNS FROM PATH as (dt) ) WITH BROKER $broker_name ( \"hadoop.security.authentication\"="\"${authentication}\"", \"username\"="\"${hdfs_username}\"", \"password\"="\"${hdfs_password}\"", \"dfs.nameservices\"=\"ns\", \"dfs.ha.namenodes.ns\"=\"nn1,nn2\",\"dfs.namenode.rpc-address.ns.nn1\"="\"$hdfs_nn1\"", \"dfs.namenode.rpc-address.ns.nn2\"="\"$hdfs_nn2\"", \"dfs.client.failover.proxy.provider\"=\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\" ) PROPERTIES ( \"timeout\" = \"3600\", \"timezone\" = \"Asia/Shanghai\" );"        else                table_data_file="${hdfs_url}$database_name.db/$table_name/*"
                # 获取要同步的表的字段
                echo `cat "${CURRENT_DIR}/tmp.txt"` |  tr ' ' ',' > ${CURRENT_DIR}/col_name.txt
                col_name=`cat "${CURRENT_DIR}/col_name.txt"`
                # truncate table
                trun_table ${database_name} ${table_name}
                load_data_sql="LOAD LABEL ${database_name}.${table_name}_${sync_time} (DATA INFILE ("\"${table_data_file}\"") INTO TABLE "\`${table_name}\`"  COLUMNS TERMINATED BY \"\\\x01\" FORMAT AS \"parquet\" (${col_name}) ) WITH BROKER $broker_name ( \"hadoop.security.authentication\"="\"${authentication}\"", \"username\"="\"${hdfs_username}\"", \"password\"="\"${hdfs_password}\"", \"dfs.nameservices\"=\"ns\", \"dfs.ha.namenodes.ns\"=\"nn1,nn2\",\"dfs.namenode.rpc-address.ns.nn1\"="\"$hdfs_nn1\"", \"dfs.namenode.rpc-address.ns.nn2\"="\"$hdfs_nn2\"", \"dfs.client.failover.proxy.provider\"=\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\" ) PROPERTIES ( \"timeout\" = \"3600\", \"timezone\" = \"Asia/Shanghai\" );"        fi        #load data
        echo "$load_data_sql"
        load_data=`mysql -h${StarRocks_ip}  -P${StarRocks_port}  -u${StarRocks_user} -p${StarRocks_password} ${database_name} -Bse "${load_data_sql}"`
        
 }

更新历史数据

这种情况下就比较麻烦了,因为有的表需要更新前几个月的数据,有的表需要更新前几天的数据,这种情况下我的处理方法是将这些要更新历史数据的表做了统计梳理,写入到一个文件中,每天全量同步这些数据即可,如果表的数据量太大,这种情况下就需要特殊处理,不建议全量同步。

脚本的编写基本和上面的脚本一致,只不过在同步之前首先要清空表,然后再重新全量同步即可。

元数据不一致

当hive中表的元数据发生变化,比如增加、删除、修改字段等时,和StarRocks中的表结构不一致,这种情况下,就需要我手动干预,需要删除重新建表,重新同步数据至StarRocks中。

关于hive中的数据同步至 StarRocks 中,基本上就着几类问题,当然也有一些细节问题,比如hive 表中有多个字段作为分区等一些情况,但是StarRocks中也给出了很好的解决方案,利用 COLUMNS FROM PATH as (dt) 这种形式去获得hive中的分区字段,在同步的过程中需要做一些数据类型的转换等,这些细节问题官网的文档写的也非常详细,这里不再赘述了。

不足

总结目前线上hive中的数据同步至 StarRocks 中的问题:文章来源地址https://www.toymoban.com/news/detail-509070.html

  • hive中的元数据发生改变需要人为手动干预
  • string字符串的长度限制,导致部份表导入失败

到了这里,关于Hive to StarRocks的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hive 表 DML 操作 第1关:将文件中的数据导入(Load)到 Hive 表中

    相关知识 之前系列实训中我们接触过导入本地文件到 Hive 表中,本关就进行导入的详细讲解。 为了完成本关任务,你需要掌握:1.导入命令语法,2.如何将本地 txt 文件导入到分区表中。 导入命令语法 Load 操作执行 copy/move 命令把数据文件 copy/move 到 Hive 表位于 HDFS 上的目录位

    2024年02月01日
    浏览(43)
  • MySQL中的业务数据该如何正确导入到Hive中 - Sqoop

    水善利万物而不争,处众人之所恶,故几于道💦   1. 使用Sqoop脚本将MySQL中的数据迁移到HDFS   2. 在Hive中建立与之对应的表   3. 将HDFS中的数据load到 Hive 数仓的ODS层的表中 1 . 使用Sqoop 将 MySQL中的数据导入到HDFS上 使用示例: mysql_to_hdfs.sh all 2021-02-01 导出的数据用lzo压缩,并

    2024年02月11日
    浏览(46)
  • 第3.2章:StarRocks数据导入--Stream Load

    Stream Load可以说是StarRocks最为核心的导入方式,StarRocks的主要导入方式例如Routine Load、Flink Connector、DataX StarRocksWriter等,底层实现都是基于Stream Load的思想,所以我们着重介绍。 Stream Load是由用户发送HTTP请求将本地文件或数据流导入至StarRocks中的导入方式,其本身不依赖其他

    2024年02月08日
    浏览(37)
  • 第3.1章:StarRocks数据导入--Insert into

    Insert Into是我们在MySQL中常用的导入方式,StarRocks同样也支持使用Insert into的方式进行数据导入,并且每次insert into操作都是一次完整的导入事务。 在StarRocks中,Insert的语法和MySQL等数据库的语法类似,具体可以参考官网文档: Insert Into 导入 @ InsertInto @ StarRocks Docs https://docs.s

    2024年02月08日
    浏览(48)
  • 第3.3章:StarRocks数据导入--Stream Load

         Stream Load是StarRocks常见的数据导入方式,用户通过发送HTTP请求将本地文件或数据流导入至StarRocks中,该导入方式不依赖其他组件。     Stream Load作是一种同步导入方式,可以直接通过请求的返回值判断导入是否成功,无法手动取消Stream Load任务,在超时或者导入错误后会

    2024年02月21日
    浏览(36)
  • 第3.1章:StarRocks数据导入——Insert into 同步模式

       在StarRocks中,insert的语法和mysql等数据库的语法类似,并且每次insert into操作都是一次完整的导入事务。  主要的 insertInto 命令包含以下两种: insert into tbl select ... insert into tbl (col1, col2, ...) values (1, 2, ...), (1,3, ...);   其中第二种命令仅用于demo,不要使用在测试或生产环境

    2024年02月21日
    浏览(49)
  • 第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步

    Flink作为当前流行的流式计算框架,在对接StarRocks时,若直接使用JDBC的方式“流式”写入数据,对StarRocks是不友好的,StarRocks作为一款MVCC的数据库,其导入的核心思想还是“攒微批+降频率”。为此,StarRocks单独开发了flink-connector-starrocks,其内部实现仍是通过对数据缓存攒批

    2023年04月15日
    浏览(76)
  • Apache Doris 数据导入:Insert Into语句;Binlog Load;Broker Load;HDFS Load;Spark Load;例行导入(Routine Load)

    Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。Doris支持各种各样的数据导入方式:Insert Into、json格式数据导入、Binlog Load、Broker Load、Routine Load、Spark Load、Stream Load、S3 Load,下面分别进行介绍。 注意: Doris 中的所有导入操作都有原子性保

    2024年02月21日
    浏览(56)
  • StarRocks 中的数据模型和索引使用

    StarRocks 支持四种数据模型,分别是明细模型 ( Duplicate Key Model )、聚合模型 ( Aggregate Key Model )、更新模型 ( Unique Key Model ) 和主键模型 ( Primary Key Model )。 1.1 明细模型 明细模型是默认的建表模型。如果在建表时未指定任何模型,默认创建的是明细类型的表。排序列使用稀疏索引

    2024年02月07日
    浏览(38)
  • 备份StarRocks数据到对象存储minio中/外表查minio中的数据

    1.部署minio环境 宿主机与容器挂在映射 宿主机位置 容器位置 /data/minio/config /data /data/minio/data /root/.minio 拉起环境: 2.准备starrocks环境 参考docker部署starrocks 使用 Docker 部署 StarRocks @ deploy_with_docker @ StarRocks Docs 3.minio文件查询/全库备份·实操 借助python生成parquet文件  3.1 去查存在

    2024年02月10日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包