将Hive 中的数据导入到 StarRocks 中的Broker Load的导入方式。在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(如HDFS, S3)上的数据,利用自身的计算资源对数据进行预处理和导入。这是一种异步的导入方式,用户需要通过MySQL协议创建导入,并通过查看导入命令检查导入结果。有关Broker Load 的详细解释你可以查阅官网的解说。
需求:
每天将数仓中跑完的Hive的相关表导入到StarRocks。
场景:
-
不更新历史数据
- 如果是分区表,我们增量导入到 StarRocks 中即可
- 非分区表全量导入
-
更新历史数据
这种情况主要存在分区表中,往往会更改前几个月数据或者时间更久的数据,这种情况下,就不的不将该表重新同步一边,使StarRocks中的数据和hive中的数据保持一致。
-
hive中表的元数据发生变化,和StarRocks中的表结构不一致
这种情况下,就需要我们删除重新建表,重新同步数据
实操
这里直接通过编写shell 脚本,来完成数据的导入
function common(){
database_name=$1
table_name=$2
#判断数据文件格式
hive_file_type=$(echo `impala-shell -i $impala_host -c -q "describe formatted ${database_name}.${table_name};"`)
type=$(echo "${hive_file_type#*InputFormat}" | awk -F \| '{print $2}')
result=$(echo "$type" | grep "parquet")
#这里只判断了两种格式,如果你们hive中的文件类型由多种,你都需要判断
if [[ "$result" != "" ]]
then
hive_file_type="parquet"
echo "file type: $hive_file_type"
else
hive_file_type="csv"
echo "file type: $hive_file_type"
fi
#查询表的字段
select_col_name_sql="desc ${table_name};"
select_col_name=`mysql -h${StarRocks_ip} -P${StarRocks_port} -u${StarRocks_user} -p${StarRocks_password} ${database_name} -Bse "${select_col_name_sql}"`
# 获取要同步的表的字段
if [[ -f $CURRENT_DIR/tmp.txt ]]
then
rm -rf $CURRENT_DIR/tmp.txt
fi
#这里将表的字段写入到了文件中,你也可以存储在数组或者其他容器中
echo "$select_col_name" | awk '{print $1}' > ${CURRENT_DIR}/col_tmp.txt
cat ${CURRENT_DIR}/col_tmp.txt | while read line
do
echo "\`$line\`" >> ${CURRENT_DIR}/tmp.txt
done
# 指定该表的数据文件
table_data_file="${hdfs_url}$database_name.db/$table_name/dt=*"
}
function trun_table(){
database_name=$1
table_name=$2
truncate_table_sql="truncate table $table_name;"
truncate_table=`mysql -h${StarRocks_ip} -P${StarRocks_port} -u$StarRocks_user} -p${StarRocks_password} $1 -Bse "${truncate_table_sql}"`
}
不更新历史数据
function sync_ordinary(){
database_name=$1
table_name=$2
# 判断该文件夹是否存在,如果存在级即为分区表,否为非分区表
hadoop fs -test -d $table_data_file
if [ $? -eq 0 ] ;then
table_data_file="${hdfs_url}${database_name}.db/${table_name}/dt*/*"
# 获取要同步的表的字段
sed -i '1d' ${CURRENT_DIR}/tmp.txt
echo `cat "${CURRENT_DIR}/tmp.txt"` | tr ' ' ',' > ${CURRENT_DIR}/col_name.txt
col_name=`cat "${CURRENT_DIR}/col_name.txt"`
#load data
load_data_sql="LOAD LABEL ${database_name}.${table_name}_${sync_time} ( DATA INFILE ("\"${table_data_file}\"") INTO TABLE "\`${table_name}\`" COLUMNS TERMINATED BY \"\\\x01\" FORMAT AS \"${hive_file_type}\" (${col_name}) COLUMNS FROM PATH as (dt) ) WITH BROKER $broker_name ( \"hadoop.security.authentication\"="\"${authentication}\"", \"username\"="\"${hdfs_username}\"", \"password\"="\"${hdfs_password}\"", \"dfs.nameservices\"=\"ns\", \"dfs.ha.namenodes.ns\"=\"nn1,nn2\",\"dfs.namenode.rpc-address.ns.nn1\"="\"$hdfs_nn1\"", \"dfs.namenode.rpc-address.ns.nn2\"="\"$hdfs_nn2\"", \"dfs.client.failover.proxy.provider\"=\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\" ) PROPERTIES ( \"timeout\" = \"3600\", \"timezone\" = \"Asia/Shanghai\" );" else table_data_file="${hdfs_url}$database_name.db/$table_name/*"
# 获取要同步的表的字段
echo `cat "${CURRENT_DIR}/tmp.txt"` | tr ' ' ',' > ${CURRENT_DIR}/col_name.txt
col_name=`cat "${CURRENT_DIR}/col_name.txt"`
# truncate table
trun_table ${database_name} ${table_name}
load_data_sql="LOAD LABEL ${database_name}.${table_name}_${sync_time} (DATA INFILE ("\"${table_data_file}\"") INTO TABLE "\`${table_name}\`" COLUMNS TERMINATED BY \"\\\x01\" FORMAT AS \"parquet\" (${col_name}) ) WITH BROKER $broker_name ( \"hadoop.security.authentication\"="\"${authentication}\"", \"username\"="\"${hdfs_username}\"", \"password\"="\"${hdfs_password}\"", \"dfs.nameservices\"=\"ns\", \"dfs.ha.namenodes.ns\"=\"nn1,nn2\",\"dfs.namenode.rpc-address.ns.nn1\"="\"$hdfs_nn1\"", \"dfs.namenode.rpc-address.ns.nn2\"="\"$hdfs_nn2\"", \"dfs.client.failover.proxy.provider\"=\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\" ) PROPERTIES ( \"timeout\" = \"3600\", \"timezone\" = \"Asia/Shanghai\" );" fi #load data
echo "$load_data_sql"
load_data=`mysql -h${StarRocks_ip} -P${StarRocks_port} -u${StarRocks_user} -p${StarRocks_password} ${database_name} -Bse "${load_data_sql}"`
}
更新历史数据
这种情况下就比较麻烦了,因为有的表需要更新前几个月的数据,有的表需要更新前几天的数据,这种情况下我的处理方法是将这些要更新历史数据的表做了统计梳理,写入到一个文件中,每天全量同步这些数据即可,如果表的数据量太大,这种情况下就需要特殊处理,不建议全量同步。
脚本的编写基本和上面的脚本一致,只不过在同步之前首先要清空表,然后再重新全量同步即可。
元数据不一致
当hive中表的元数据发生变化,比如增加、删除、修改字段等时,和StarRocks中的表结构不一致,这种情况下,就需要我手动干预,需要删除重新建表,重新同步数据至StarRocks中。
关于hive中的数据同步至 StarRocks 中,基本上就着几类问题,当然也有一些细节问题,比如hive 表中有多个字段作为分区等一些情况,但是StarRocks中也给出了很好的解决方案,利用 COLUMNS FROM PATH as (dt) 这种形式去获得hive中的分区字段,在同步的过程中需要做一些数据类型的转换等,这些细节问题官网的文档写的也非常详细,这里不再赘述了。文章来源:https://www.toymoban.com/news/detail-509070.html
不足
总结目前线上hive中的数据同步至 StarRocks 中的问题:文章来源地址https://www.toymoban.com/news/detail-509070.html
- hive中的元数据发生改变需要人为手动干预
- string字符串的长度限制,导致部份表导入失败
到了这里,关于Hive to StarRocks的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!