使用spark将MongoDB数据导入hive

这篇具有很好参考价值的文章主要介绍了使用spark将MongoDB数据导入hive。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用spark将MongoDB数据导入hive

一、pyspark
1.1 pymongo+spark
代码
import json,sys
import datetime, time
import pymongo
import urllib.parse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

'127.0.0.1 27017 mongo_db mongo_collection hive_db hive_table mongo_user mongo_password'
print('host:',sys.argv[1], 'port:',sys.argv[2], 'mongo_db:',sys.argv[3], 'mongo_collection:',sys.argv[4], 'hive_db:',sys.argv[5], 'hive_table:',sys.argv[6], 'mongo_user:',sys.argv[7], 'mongo_password:',sys.argv[8])
# MongoDB连接信息
mongo_username =sys.argv[7]
mongo_password = sys.argv[8]
mongo_host = sys.argv[1]
mongo_database = sys.argv[3]
mongo_port=sys.argv[2]

# 转义用户名和密码
if mongo_username and mongo_password:
    escaped_username = urllib.parse.quote_plus(mongo_username)
    escaped_password = urllib.parse.quote_plus(mongo_password)
    # 构建MongoDB连接URL
    mongo_connection_url = "mongodb://{0}:{1}@{2}:{3}/{4}".format(escaped_username, escaped_password, mongo_host, mongo_port, mongo_database)
else:
    mongo_connection_url = "mongodb://{0}:{1}/{2}".format( mongo_host, mongo_port, mongo_database)
# 连接 MongoDB
mongo_client = pymongo.MongoClient(mongo_connection_url)
mongo_db = mongo_client[sys.argv[3]]
mongo_collection = mongo_db[sys.argv[4]]
mongo_data = mongo_collection.find()
# 自定义JSON可序列化函数
def datetime_serialization_handler(obj):
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()
    elif isinstance(obj, datetime.date):
        return obj.isoformat()
    raise TypeError("Object of type '%s' is not JSON serializable" % type(obj).__name__)
# 从 MongoDB 读取数据
values = []
values_batch = []
flag=0
time_start = time.time()
for data in mongo_data:
    if flag==0:
        # 定义字段列表
        field_list = list(data.keys())
        flag=1
    res={}
    for key in field_list:
        try:
            res[key]=json.dumps( str(data[key]), ensure_ascii=False, default=datetime_serialization_handler)
        except:
            res[key]=None
    columns = list(res.values())
    values_batch.append(columns)

# 创建 SparkSession
spark = SparkSession.builder.appName("CreateStringDataFrame").master('local[*]').getOrCreate()
# 构建 StructType 对象
schema = StructType([StructField(field_name, StringType(), True) for field_name in field_list])
# 创建空的 DataFrame
df = spark.createDataFrame(values_batch, schema)
# 显示 DataFrame 结构
df.printSchema()
df.write.mode("overwrite").saveAsTable("{Hive_DB}.{Hive_Name}".format(Hive_DB=sys.argv[5], Hive_Name=sys.argv[6]))
spark.stop()
time_end = time.time()
spark-submit
spark-submit --num-executors 1 --executor-memory 512M --executor-cores 1 --deploy-mode client --queue root.users.root  ./mongo.py 127.0.0.1 27017 test_db test_table tmp_can_delete_database mongo_test test1 test1
1.2 mongo-spark-connector

生产环境不方便使用,亲测各种报错文章来源地址https://www.toymoban.com/news/detail-814908.html

from pyspark.sql import SparkSession
my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.2') \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.write.mode("overwrite").saveAsTable("{Hive_DB}.{Hive_Name}")
spark.stop()
二、Scala
2.1 pom.xml
<dependencies>
       	<dependency>
            <groupId>com.thoughtworks.paranamer</groupId>
            <artifactId>paranamer</artifactId>
            <version>2.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>

        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.12</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>
2.2 代码
package org.spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object Main {
  private def insert(host:String,port:String,db:String,collection:String,Hive_DB:String,Hive_Name:String,username:Object=None,password:Object=None): Unit = {
    var url=""
    //判断username 和密码是否为空
    if(username==None || password==None){
      url=s"mongodb://${host}:${port}/${db}.${collection}"
    }else{
      url=s"mongodb://${username}:${password}@${host}:${port}/${db}.${collection}"
    }
    val spark = SparkSession.builder()
      .appName("mongo")
      .config("spark.mongodb.input.uri", url)
      .getOrCreate()
    val df=spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.show(5)
    val string_df=df.select(df.columns.map(c => col(c).cast("string").alias(c)): _*)

    string_df.write.mode(SaveMode.Overwrite).saveAsTable(s"${Hive_DB}.${Hive_Name}")

    spark.stop()
  }


  def main(args: Array[String]): Unit = {
    val start_time = System.currentTimeMillis()
    println(args(0),args(1),args(2),args(3),args(4),args(5),args(6),args(7))
    insert(args(0),args(1),args(2),args(3),args(4),args(5),args(6),args(7))
//    insert("127.0.0.1","27017","mongo_db","mongo_collection","hive_db","hive_table","mongo_username","mongo_password")
    val end_time = System.currentTimeMillis()
    val duration = (end_time - start_time) / 1000.0
    println(s"程序运行时间为 $duration 秒")
  }
}

到了这里,关于使用spark将MongoDB数据导入hive的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【工作笔记-0038】mongodb mongorestore 命令行导入 bson.gz数据

    1. 导出的集合文件格式如下( 也就是导出的表文件 ): 例如:    D:Filesxxxx集合名称.bson.gz 怎样导出,这里不做介绍,用 mongodb compass 或者 studio 3t 都可以 2. 下载命令行导入工具: 官方下载地址:Download MongoDB Command Line Database Tools | MongoDB 选择 zip 文件下载即可,解压就能用

    2024年02月11日
    浏览(32)
  • datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)

    1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控) 2.datax版本:自己编译的DataX-datax_v202210 3.hdfs版本:3.1.3 4.hive版本:3.1.2 1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个

    2023年04月23日
    浏览(33)
  • Apache Doris (三十一):Doris 数据导入(九)Spark Load 4- 导入Hive数据及注意事项

    目录 1. Spark Load导入Hive非分区表数据 2. Spark Load 导入Hive分区表数据 3. 注意事项 进入正文之前,欢迎订阅专题、对博文点赞、评论、收藏,关注IT贫道,获取高质量博客内容!

    2024年02月17日
    浏览(36)
  • Python使用MongoDB数据库

    MongoDB是一种流行的NoSQL数据库,可以用于存储和管理大量的非结构化或半结构化数据。Python是一种流行的编程语言,也可以使用MongoDB来存储和管理数据。在本文中,我们将介绍如何使用Python访问MongoDB数据库。 安装MongoDB和Python驱动程序 首先,您需要安装MongoDB数据库和Python的

    2024年02月10日
    浏览(34)
  • 使用 Docker 安装 MongoDB 数据库

    by emanjusaka from https://www.emanjusaka.top/2024/01/docker-create-mongo-db 彼岸花开可奈何 本文欢迎分享与聚合,全文转载请留下原文地址。 -d 后台运行 --restart=always 自动重新启动 --privileged 允许容器以特权模式运行 -v /opt/doc/data:/data/db 数据挂载 -p 27017:27017 端口挂载 --auth 启用了身份验证

    2024年01月18日
    浏览(36)
  • MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(3)-系统数据集合设计

    前几章教程我们把ToDoList系统的基本框架搭建好了,现在我们需要根据我们的需求把ToDoList系统所需要的系统集合(相当于关系型数据库中的数据库表)。接下来我们先简单概述一下这个系统主要需要实现的功能以及实现这些功能我们需要设计那些数据库集合。 MongoDB从入门到

    2024年02月21日
    浏览(31)
  • MongoDB使用GridFS存储大数据(Java)

    MongoDB 是一个灵活的 NoSQL 数据库,能够存储大量的数据。但是,当涉及到特别大的数据项,比如大文件、视频或大型图片时,MongoDB 提供了一个特殊的方法来存储这些数据:GridFS。 1. 什么是 GridFS? GridFS 是 MongoDB 提供的一个规范和工具集,用于将大文件切分成多个较小的数据

    2024年02月11日
    浏览(27)
  • NoSql数据库及使用Python连接MongoDB

    NoSQL 数据库是非关系数据库,不使用结构化查询语言 (SQL) 进行数据操作。相反,他们使用其他数据模型进行访问和数据存储。SQL 数据库通常用于处理结构化数据,但它们可能不是处理非结构化或半结构化数据的最佳选择。 NoSQL 数据库提供了快速高效地存储和检索大量数据的

    2024年02月09日
    浏览(89)
  • Hive/Spark 整库导出/导入脚本

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月12日
    浏览(33)
  • window10 使用docker 本地安装部署mongodb数据库

    一、window10 安装docker 可以参看笨鸟教程【Windows Docker 安装】 安装完后的Docker Desktop是这个样子: 原始的docker镜像拉取仓库速度较慢,为了方便docker拉取镜像,可以设置国内的加速镜像,如:阿里 云、有道等【镜像加速】: 二、docker下载运行mongodb镜像 1 直接在 windows powerShel

    2024年02月03日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包