将Parquet文件的数据导入Hive 、JSON文件导入ES

这篇具有很好参考价值的文章主要介绍了将Parquet文件的数据导入Hive 、JSON文件导入ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

将Parquet文件的数据导入Hive

查询parquet文件格式

主要利用社区工具 https://github.com/apache/parquet-mr/

编译cli工具

 cd parquet-cli;
 mvn clean install -DskipTests;

查看元数据信息

 java -cp parquet-cli-1.13.1.jar;dependency/* org.apache.parquet.cli.Main meta yellow_tripdata_2023-03.parquet

将Parquet文件的数据导入Hive 、JSON文件导入ES

查询抽样数据

 java -cp parquet-cli-1.13.1.jar;dependency/* org.apache.parquet.cli.Main head -n 2 yellow_tripdata_2023-03.parquet
{"VendorID": 2, "tpep_pickup_datetime": 1677629203000000, "tpep_dropoff_datetime": 1677629803000000, "passenger_count": 1, "trip_distance": 0.0, "RatecodeID": 1, "store_and_fwd_flag": "N", "PULocationID": 238, "DOLocationID": 42, "payment_type": 2, "fare_amount": 8.6, "extra": 1.0, "mta_tax": 0.5, "tip_amount": 0.0, "tolls_amount": 0.0, "improvement_surcharge": 1.0, "total_amount": 11.1, "congestion_surcharge": 0.0, "Airport_fee": 0.0}
{"VendorID": 2, "tpep_pickup_datetime": 1677629305000000, "tpep_dropoff_datetime": 1677631170000000, "passenger_count": 2, "trip_distance": 12.4, "RatecodeID": 1, "store_and_fwd_flag": "N", "PULocationID": 138, "DOLocationID": 231, "payment_type": 1, "fare_amount": 52.7, "extra": 6.0, "mta_tax": 0.5, "tip_amount": 12.54, "tolls_amount": 0.0, "improvement_surcharge": 1.0, "total_amount": 76.49, "congestion_surcharge": 2.5, "Airport_fee": 1.25}      

parquet 和 hive 的 field 类型映射关系

parquet 字段类型 hive 字段类型
BINARY STRING
BOOLEAN BOOLEAN
DOUBLE DOUBLE
FLOAT FLOAT
INT32 INT
INT64 BIGINT
INT96 TIMESTAMP
BINARY + OriginalType UTF8 STRING
BINARY + OriginalType DECIMAL DECIMAL

创建hive表 数据存储格式采用parquet

# 创建以parquet存储的表
  CREATE TABLE `test_trino.yellow_taxi_trip_records_tmp`
(
  `VendorID` int COMMENT '仪表供应商ID', 
  `tpep_pickup_datetime` TIMESTAMP COMMENT '仪表启动时间', 
  `tpep_dropoff_datetime` TIMESTAMP COMMENT '仪表关闭时间',
  `passenger_count` bigint COMMENT '乘客数量', 
  `trip_distance` double COMMENT '行程距离',
  `RateCodeID` bigint COMMENT '费率编码',
  `store_and_fwd_flag` string COMMENT '是否存储',
  `PULocationID` bigint COMMENT '上车区域坐标',
  `DOLocationID` bigint COMMENT '下场区域坐标',
  `payment_type` bigint COMMENT '付款方式',
  `fare_amount` double COMMENT '票价',
  `extra` double COMMENT '杂费附加费',
  `mta_tax` double COMMENT '税费',
  `tip_amount` double COMMENT '小费',
  `tolls_amount` double COMMENT '过路费',
  `improvement_surcharge` double COMMENT '改善附加费',
  `total_amount` double COMMENT '费用总计,不包含现金小费',
  `congestion_surcharge` double COMMENT '拥堵费',
  `airport_fee` double COMMENT '机房上下车费用'
)
COMMENT '黄色的出租车记录'
PARTITIONED BY ( 
  `ym` string COMMENT '分区字段,年月(yyyyMM)')
STORED AS PARQUET;

加载文件

  # 利用hive客户端load parquet数据
    LOAD DATA LOCAL INPATH '/opt/yellow_tripdata_2023-02.parquet' OVERWRITE INTO TABLE `test_trino.yellow_taxi_trip_records_tmp` PARTITION (ym=202302);

将json数据导入ES

ES批量导入api

批量写入es需要使用bulk api,这个API支持json文件的数据导入。文章来源地址https://www.toymoban.com/news/detail-476257.html

原始json文件内容

{"geonameid": 2986043, "name": "Pic de Font Blanca", "latitude": 42.64991, "longitude": 1.53335, "country_code": "AD", "population": 0}
{"geonameid": 2994701, "name": "Roc Mélé", "latitude": 42.58765, "longitude": 1.74028, "country_code": "AD", "population": 0}
{"geonameid": 3007683, "name": "Pic des Langounelles", "latitude": 42.61203, "longitude": 1.47364, "country_code": "AD", "population": 0}
{"geonameid": 3017832, "name": "Pic de les Abelletes", "latitude": 42.52535, "longitude": 1.73343, "country_code": "AD", "population": 0}
{"geonameid": 3017833, "name": "Estany de les Abelletes", "latitude": 42.52915, "longitude": 1.73362, "country_code": "AD", "population": 0}
{"geonameid": 3023203, "name": "Port Vieux de la Coume d’Ose", "latitude": 42.62568, "longitude": 1.61823, "country_code": "AD", "population": 0}
{"geonameid": 3029315, "name": "Port de la Cabanette", "latitude": 42.6, "longitude": 1.73333, "country_code": "AD", "population": 0}
{"geonameid": 3034945, "name": "Port Dret", "latitude": 42.60172, "longitude": 1.45562, "country_code": "AD", "population": 0}
{"geonameid": 3038814, "name": "Costa de Xurius", "latitude": 42.50692, "longitude": 1.47569, "country_code": "AD", "population": 0}
{"geonameid": 3038815, "name": "Font de la Xona", "latitude": 42.55003, "longitude": 1.44986, "country_code": "AD", "population": 0}
{"geonameid": 3038816, "name": "Xixerella", "latitude": 42.55327, "longitude": 1.48736, "country_code": "AD", "population": 0}
{"geonameid": 3038818, "name": "Riu Xic", "latitude": 42.57165, "longitude": 1.67554, "country_code": "AD", "population": 0}
{"geonameid": 3038819, "name": "Pas del Xic", "latitude": 42.49766, "longitude": 1.57597, "country_code": "AD", "population": 0}
{"geonameid": 3038820, "name": "Roc del Xeig", "latitude": 42.56068, "longitude": 1.4898, "country_code": "AD", "population": 0}

索引结构

PUT allcountries
{
  "settings": {
    "index.number_of_replicas": 0
  },
  "mappings": {
        "_doc":{
            "dynamic": "strict",
            "properties": {
              "geonameid": {
                "type": "long"
              },
              "name": {
                "type": "text"
              },
              "latitude": {
                "type": "double"
              },
              "longitude": {
                "type": "double"
              },
              "country_code": {
                "type": "text"
              },
              "population": {
                "type": "long"
              }
            }
        }
  }
}

重组json脚本

# coding=UTF-8
# 将原始josn重组出适合ES bulk API导入的JSON数据
import json
import os
import io
current_path = os.path.dirname(__file__)
#w打开一个文件只用于写入,r用于只读
#如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除
#如果该文件不存在,创建新文件
new_jsonfile = io.open(current_path+'/es-test-bulk.json','w',encoding='utf-8')

with io.open(current_path+'/es-test.json','r',encoding='utf-8')as fp:
    for line in fp.readlines():
        json_data=json.loads(line)
        #添加index行
        new_data={}
        new_data['index']={}
        new_data['index']['_index']="allCountries"
        temp=json.dumps(new_data).encode("utf-8").decode('unicode_escape')
        new_jsonfile.write(temp)
        new_jsonfile.write('\n'.decode('utf-8'))

        #原json对象处理为1行
        old_data={}
        old_data['geonameid']=json_data['geonameid']
        old_data['name']=json_data['name']
        old_data['latitude']=json_data['latitude']
        old_data['longitude']=json_data['longitude']
        old_data['country_code']=json_data['country_code']
        old_data['population']=json_data['population']
        temp=json.dumps(old_data).encode("utf-8").decode('unicode_escape')
        new_jsonfile.write(temp)
        new_jsonfile.write('\n'.decode('utf-8'))
        
new_jsonfile.close()

重组后的json文件

{"index": {"_index": "allcountries"}}
{"name": "El Barrerol", "geonameid": 3040809, "longitude": 1.45207, "country_code": "AD", "latitude": 42.439579999999999, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Camí d’Easagents", "geonameid": 3040810, "longitude": 1.61341, "country_code": "AD", "latitude": 42.53349, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Pleta de Duedra", "geonameid": 3040811, "longitude": 1.4949399999999999, "country_code": "AD", "latitude": 42.625540000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Pleta de Duedra", "geonameid": 3040812, "longitude": 1.5637000000000001, "country_code": "AD", "latitude": 42.61985, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Plana Duedra", "geonameid": 3040813, "longitude": 1.5228900000000001, "country_code": "AD", "latitude": 42.59393, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Planella del Duc", "geonameid": 3040814, "longitude": 1.4995700000000001, "country_code": "AD", "latitude": 42.456490000000002, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal del Duc", "geonameid": 3040815, "longitude": 1.6195600000000001, "country_code": "AD", "latitude": 42.576920000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal Dreta", "geonameid": 3040816, "longitude": 1.5381, "country_code": "AD", "latitude": 42.551319999999997, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Canal Dreta", "geonameid": 3040817, "longitude": 1.4865900000000001, "country_code": "AD", "latitude": 42.506630000000001, "population": 0}
{"index": {"_index": "allcountries"}}
{"name": "Port Dret", "geonameid": 3040818, "longitude": 1.7001299999999999, "country_code": "AD", "latitude": 42.573979999999999, "population": 0}

bulk api调用

curl -H "Content-Type: application/x-ndjson"  -XPOST "192.168.1.1:9600/allcountries/_doc/_bulk" --data-binary @"/opt/es-documents-bulk.json"

到了这里,关于将Parquet文件的数据导入Hive 、JSON文件导入ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hive数据存储格式有哪些?TextFile、SequenceFile、RCFile、ORCFile、Parquet有什么区别?为什么绝大多数都使用ORCFile、Parquet格式?

    Hive 的数据存储,是 Hive 操作数据的基础。 选择一个合适的底层数据存储文件格式,即使在不改变当前 Hive SQL 的情况下,性能也能得到数量级的提升 。 这种优化方式对 MySQL 等关系型数据库有些类似,选择不同的数据存储引擎,代表着不同的数据组织方式,对于数据库的表现

    2024年02月02日
    浏览(55)
  • 一百三十三、Hive——Hive外部表加载含有JSON格式字段的CSV文件数据

    在Hive的ODS层建外部表,然后加载HDFS中的CSV文件数据 注意 :CSV文件中含有未解析的JSON格式的字段数据,并且JSON字段中还有逗号 JSON数据的字段track_data只显示一部分数据,因为JSON格式数据里面也含有逗号 [{\\\"id\\\":\\\"14\\\",\\\"length\\\":5.0,\\\"height\\\":3.0,\\\"posX\\\":63.0,\\\"posY\\\":37.0,\\\"acs\\\":99.0,\\\"angle\\\":83.0,\\\"alti

    2024年02月16日
    浏览(45)
  • Hive 表 DML 操作 第1关:将文件中的数据导入(Load)到 Hive 表中

    相关知识 之前系列实训中我们接触过导入本地文件到 Hive 表中,本关就进行导入的详细讲解。 为了完成本关任务,你需要掌握:1.导入命令语法,2.如何将本地 txt 文件导入到分区表中。 导入命令语法 Load 操作执行 copy/move 命令把数据文件 copy/move 到 Hive 表位于 HDFS 上的目录位

    2024年02月01日
    浏览(43)
  • 二百一十、Hive——Flume采集的JSON数据文件写入Hive的ODS层表后字段的数据残缺

    在用Flume把Kafka的数据采集写入Hive的ODS层表的HDFS文件路径后,发现HDFS文件中没问题,但是ODS层表中字段的数据却有问题,字段中的JSON数据不全 1、ODS层建静态分区外部表,Flume直接写入ODS层表的HDFS路径下 2、用get_json_object进行解析 注意 :使用JsonSerDe时,每行必须是一个完整

    2024年02月03日
    浏览(44)
  • 1、通过亿级数据量在hive和impala中查询比较text、orc和parquet性能表现(二)

    本文通过在hdfs中三种不同数据格式文件存储相同数量的数据,通过hive和impala两种客户端查询进行比较。 本文前提:熟悉hadoop、hive和impala、kafka、flink等,并且其环境都可正常使用。(在后续的专栏中都会将对应的内容补全,目前已经完成了zookeeper和hadoop的部分。) 本文分为

    2024年02月12日
    浏览(49)
  • 使用Python创建faker实例生成csv大数据测试文件并导入Hive数仓

    这段Python代码用于生成模拟的个人信息数据,并将数据保存为CSV文件。 导入必要的模块: csv :用于处理CSV文件的模块。 random :用于生成随机数。 faker :用于生成模拟数据的库。 定义生成数据所需的基本信息: file_base_path :生成的CSV文件的基本路径。 rows_per_file :每个C

    2024年02月07日
    浏览(35)
  • Postman导出json v2/v2.1文件导入YAPI报错:解析数据为空

    实测总结:导入Postman Collection v1的JSON file能成功 版本环境 Postman 版本:8.11.1 YAPI 版本:1.12.0 问题解决 如果Postman能直接导出v1的JSON file就不需要继续以下步骤; 可惜Postman v8.11.1 当前版本不支持导出v1。 需要使用postman-collection-transformer工具进行转换v2 - v1 Postman官网版本转换介

    2024年02月07日
    浏览(43)
  • 利用python将json格式的文件导入neo4j图数据库

    笔者收到了朋友的求助,希望我写一段python代码将含对用关系的json文件导入neo4j图数据库。json的格式如下: 他说数据量有几十万条,而且对应关系不唯一:但可以保证每条都含有名称和治疗,但是可能有若干条其他的对用关系,例如上文的例子:其既包含之前的三个模块,

    2024年02月16日
    浏览(43)
  • Flink之FileSink将数据写入parquet文件

    在使用FileSink将数据写入列式存储文件中时必须使用 forBulkFormat ,列式存储文件如 ORCFile 、 ParquetFile ,这里就以 ParquetFile 为例结合代码进行说明. 在Flink 1.15.3 中是通过构造 ParquetWriterFactory 然后调用 forBulkFormat 方法将构造好的 ParquetWriterFactory 传入,这里先讲一下构造 ParquetWriterF

    2024年02月03日
    浏览(40)
  • 用sqoop导出hive parquet 分区表到mysql

    确保你已经安装并配置好了Sqoop工具,并且可以连接到Hadoop集群和MySQL数据库。 创建一个MySQL表来存储导出的数据。请确保MySQL表的结构与Hive Parquet分区表的结构匹配。 使用Sqoop的export命令来执行导出操作。以下是一个示例命令: 替换 mysql_host、database_name、mysql_username 和 mysq

    2024年02月14日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包