elasticsearch数据同步到hive

这篇具有很好参考价值的文章主要介绍了elasticsearch数据同步到hive。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

来自用户的需求: 用户有一部分数据来自 elasticsearch ,我们已经支持了通过 presto 查询 es 数据。但是用户需要将 es 表 和 hive 表做关联查询,而 presto 是不能跨数据源进行 join 查询的。所以需要先把 es 数据导入到 hive 中

用户对数据同步周期的要求并不高 一天1-2次就可以了,所以继续使用我们emr集群中已有的 azkaban 服务进行调度,把 es 数据同步到 hive 的过程写到 azkaban 中,实现了7张表的定期同步

hive 创建 es 外表

参考教程-Elasticsearch-Hive

hive 引入 elasticsearch-hadoop 依赖包

hive 默认不支持创建 es 外表,需要引入 elasticsearch-hadoop 依赖包

修改 hive.aux.jars.path 配置, 多个可以用逗号分隔,如下:

hive.aux.jars.path=file:///opt/modules/hive/auxlib/elasticsearch-hadoop-hive-8.8.0.jar

创建 hive 外表

sql 示例:

CREATE EXTERNAL TABLE temp.es_external_table ( fieldNameA STRING, fieldNameB STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='es索引名',
    'es.nodes'='es_host',
    'es.port'='es_port',
    'es.mapping.names'='fieldNameA:fieldNameA,fieldNameB:fieldNameB'
);

这里踩了一个坑: 由于 hive 创建表会忽略大小写,不管 sql 中定义的字段是什么样子,都会统一转成小写。所以导致es 中的驼峰名称字段 会映射失败,最后查出的数据都是 null

类似的坑-创建mongodb 外表时遇到的

因此需要显式地通过 es.mapping.names 配置指定字段名称的关联关系,像示例那样

同步脚本

从 es 表到 hive 表,大致步骤为: 创建 hive 外表,关联 es 数据 => 创建 hive 内表 => 同步外表数据到内表

过程写到脚本中如下: (create_hive_to_es_table.sh)

## 获取指定索引的所有 es 表字段
get_index_field_ret=`curl http://${es_address}/${index_name}?pretty=true`
field_arr=`echo ${get_index_field_ret} | jq -r ".${index_name}.mappings.properties | keys | join(\" \")"`

## 创建 hive 外表
temp_table_name="temp.es_${index_name}"
temp_rename_table_name="${hive_db}.es_${index_name}_bak"
actual_table_name="${hive_db}.es_${index_name}"

create_external_table_sql="CREATE EXTERNAL TABLE ${temp_table_name} ("
for current_field in ${field_arr[@]}
do
    create_external_table_sql="${create_external_table_sql} ${current_field} STRING,"
done
create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g'`

### 组装 es.mapping.names
create_external_table_sql="${create_external_table_sql}) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource'='${index_name}','es.nodes'='${es_host}','es.port'='${es_port}','es.mapping.names'='"
for current_field in ${field_arr[@]}
do
    create_external_table_sql="${create_external_table_sql}${current_field}:${current_field},"
done
create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g'`

create_external_table_sql="${create_external_table_sql}')"
drop_external_table_sql="drop table if exists ${temp_table_name}"
echo "create external sql: ${create_external_table_sql}"

beeline -n ${hive_user} -u ${hive_server} -e "${drop_external_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_external_table_sql}"

## 创建 hive 临时内表
create_temp_table_sql="CREATE TABLE ${temp_rename_table_name} AS SELECT * FROM ${temp_table_name}"
drop_temp_table_sql="drop table if exists ${temp_rename_table_name}"

echo "create temp table sql: ${create_temp_table_sql}"

beeline -n ${hive_user} -u ${hive_server} -e "${drop_temp_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_temp_table_sql}"

## 重命名表(用于快速重建用户直接用的表)

create_actual_table_sql="ALTER TABLE ${temp_rename_table_name} RENAME TO ${actual_table_name}"
drop_actual_table_sql="drop table if exists ${actual_table_name}"

echo "create actual table sql: ${create_actual_table_sql}"

beeline -n ${hive_user} -u ${hive_server} -e "${drop_actual_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_actual_table_sql}"

azkaban 任务

定义任务流程

需要重建7张表,因此定义成 父任务 -> 7个子任务

# es_to_hive_parent.job
type=command

command=echo "es to hive success!"

dependencies=table1,table2,table3,table4,table5,table6,table7

因为前面具体外表的创建流程 已经写在脚本中了,所以子任务这里直接调用 create_hive_to_es_table.sh 就行

# table1.job
type=flow

job.name=table1
flow.name=ES_TO_HIVE

index.name=es索引名
hive_db=目标 hive 库名

# ES_TO_HIVE.job
type=command

command=sh create_hive_to_es_table.sh ${es.address} ${index.name} ${hive.server} ${hive.user} ${hive.db}

总结

基于目前的资料搜索 这种方案应该是 es数据同步到 hive 比较通用的。但是确实不适合大批量数据同步的场景,也不能直接同步增量数据

想同步增量数据的话 应该需要从数据源头入手了,比如 es 数据是来自 kafka 的,那么需要通过类似 canal 的服务来同步增量数据,架构和这里说到的远远不同文章来源地址https://www.toymoban.com/news/detail-487245.html

到了这里,关于elasticsearch数据同步到hive的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloud:ElasticSearch之数据同步

    elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时, elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的 数据同步 。 常见的数据同步方案有三种: 同步调用 异步通知 监听 binlog 1.1.同步调用 方案一:同步调用 基本步骤如下: hotel-demo 对外提

    2024年02月04日
    浏览(27)
  • elasticsearch数据同步到hive

    来自用户的需求: 用户有一部分数据来自 elasticsearch ,我们已经支持了通过 presto 查询 es 数据。但是用户需要将 es 表 和 hive 表做关联查询,而 presto 是不能跨数据源进行 join 查询的。所以需要先把 es 数据导入到 hive 中 用户对数据同步周期的要求并不高 一天1-2次就可以了,所

    2024年02月09日
    浏览(27)
  • elasticsearch与mysql数据同步

    elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的 数据同步 。 常见的数据同步方案有三种: 同步调用 异步通知 监听binlog 1.同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用

    2024年02月01日
    浏览(40)
  • elasticsearch实现mysql数据同步

    当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。 常见的数据同步方案有三种: 同步调用 异步通知 监听binlog 以下使用异步通知同步elasticsearch的数据  constatnts 包下新建一个类 MqConstants,存储交换机和队列的名称 发送MQ消息   在增、删、改业务中分

    2024年02月09日
    浏览(25)
  • 4种数据同步到Elasticsearch方案

    上周听到公司同事分享 MySQL 同步数据到 ES 的方案,发现很有意思,感觉有必要将这块知识点再总结提炼一下,就有了这篇文章。 本文会先讲述数据同步的 4 种方案,并给出常用数据迁移工具 ,干货满满! 不 BB,上文章目录: 在实际项目开发中,我们经常将 MySQL 作为业务数

    2024年02月07日
    浏览(27)
  • elasticsearch 实现与mysql 数据同步

    mysql8相关的安装可以看下另一篇博客 https://editor.csdn.net/md/?articleId=135905811 1.下载安装logstash 2.logstash 配置 logstash.yml 3.pipelines.yml 配置 同步方式: 1.logstash 2.go-mysql-elasticsearch 3.canal(阿里云) 一.logstash 1.安装mysql-connector-java 插件(需与mysql 版本一致) 2.配置logstash.conf 3.启动logsta

    2024年04月12日
    浏览(29)
  • ElasticSearch集成SpringBoot实践及数据同步

    ES 全称 Elasticsearch 是一款分布式的全文搜索引擎,在互联网公司中,这款搜索引擎一直被程序员们所推崇。常见的使用场景如ELK日志分析,电商APP的商品推荐,社交APP的同城用户推荐等等。今天结合自己平时的一些学习对它与SpringBoot的基础集成以及一些实际项目中的使用做

    2024年02月07日
    浏览(40)
  • Logstash同步MySQL数据到ElasticSearch

    当MySQL数据到一定的数量级,而且索引不能实现时,查询就会变得非常缓慢,所以使用ElasticSearch来查询数据。本篇博客介绍使用Logstash同步MySQL数据到ElasticSearch,再进行查询。 测试环境 Windows系统 MySQL 5.7 Logstash 7.0.1 ElasticSearch 7.0.1 Kibana 7.0.1 ELK工具下载可访问:https://www.elastic

    2024年02月01日
    浏览(34)
  • 微服务学习|elasticsearch:数据聚合、自动补全、数据同步

    聚合 (aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类: 桶(Bucket)聚合:用来对文档做分组 TermAggregation:按照文档字段值分组 Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组 度量(Metric)聚合:用以计算一些值,比如: 最大值、最小值、平均

    2024年02月04日
    浏览(42)
  • 【ElasticSearch】深入探索 ElasticSearch 对数据的聚合、查询自动补全、与数据库间的同步问题以及使用 RabbitMQ 实现与数据库间的同步

    在本文中,我们将深入探讨 ElasticSearch 在数据处理中的关键功能,包括数据聚合、查询自动补全以及与数据库的同步问题。 首先,我们将聚焦于 ElasticSearch 强大的聚合功能,解释什么是聚合以及如何通过 DSL 语句和 RestClient 实现各种聚合操作。这一功能能够让我们更深入地了

    2024年02月08日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包