datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)

这篇具有很好参考价值的文章主要介绍了datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、同步环境

1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控)
2.datax版本:自己编译的DataX-datax_v202210
3.hdfs版本:3.1.3
4.hive版本:3.1.2

二、同步思路

1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个小时的数据依次循环调用datax同步至hdfs,利用shell脚本和调度器定时装载至hive中形成ods层,并和其他表关联处理形成dwd层,提供给需求方。
2.全量数据:历史数据才用datax编写脚本循环读取+调度+hive动态分区方式同步至hive。因为hive动态分区默认只支持100个分区,我是按小时进行分区的,因此我每次只拉取4天数据,拉取太多报错,编写脚本,需要多少天,拉取多少天。(比较笨的方法,有更好的方式欢迎评论区讨论)

三、datax配置

{
    "job": {
        "content": [
          {
              "reader": {
                  "name": "mongodbreader",
                  "parameter": {
                      "address": ["xxxxxxxx:27017"],
                      "authDb": "admin",
                      "userName": "xxxxx",
                      "userPassword": "xxxx",
                      "dbName": "xxxx",
                      "collectionName": "xxxx",
                      "column": [
                          {
                              "name": "_id",
                              "type": "string"
                          },
                          {
                              "name": "data",
                              "type": "string"
                          },
                          {
                              "name": "gid",
                              "type": "string"
                          },
                          {
                              "name": "text",
                              "type": "string"
                          },
                          {
                              "name": "time",
                              "type": "bigint"
                          },
                          {
                              "name": "uid",
                              "type": "string"
                          }
                      ],

                      "query":"{\"time\":{ \"$gte\": ${start_time}, \"$lt\": ${end_time}}}"

                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                             {
                              "name": "ask_id",
                              "type": "string"
                          },
                          {
                              "name": "data",
                              "type": "string"
                          },
                          {
                              "name": "gid",
                              "type": "string"
                          },
                          {
                              "name": "text",
                              "type": "string"
                          },
                          {
                              "name": "time",
                              "type": "string"
                          },
                          {
                              "name": "uid",
                              "type": "string"
                          }
                        ],
                        "compress": "gzip",
                        "defaultFS": "xxxx:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "xxxx",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

这里面有两个坑。
第一个:datax连接mongodb一定注意"authDb": “admin”,这个配置,要明确同步账号认证库的位置,账号在那个库里面认证的就写哪个库,由于mongodb每个库是单独认证的,一直报:

com.alibaba.datax.common.exception.DataXException: Code:[Framework-02], Description:[DataX引擎运行过程出错,具体原因请参看DataX运行结束时的错误诊断信息  .].  - com.mongodb.MongoCommandException: Command failed with error 13: 'command count requires authentication' on server xxx:27117. The full response is { "ok" : 0.0, "errmsg" : "command count requires authentication", "code" : 13, "codeName" : "Unauthorized" }

找过很多资料,两种方式解决账号认证问题。一种是,刚才提到的指明账号认证库;第二种,就是同步哪个库,单独给这个账号再授权一遍库的权限,代码如下:

db.createUser({user:"x x 
x x x",pwd:"xxxxxx",roles:[{"role":"read","db":"xxxx"}]})

查询同步不需要太高的权限,read即可
第二坑:mongodb的query查询,用的是json语句,网上有大神分享的源码分析,里面的查询条件是“and”语句,也就是说,用逗号分隔的查询条件是and,想用or要多次查询(但是我测试十几也不全是and,好像是同样的字段以最后一条为准,留着后面再研究班),哎,没办法,谁让我懒得自己写代码,凑合着用吧。分享query查询语句多个条件的用法:

                      "query":"{\"time\":{ \"$gte\": 1646064000, \"$lte\": 1648742399},\"time\":{ \"$gte\": 1654012800, \"$lte\": 1656604799},\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"

四、datax同步调度脚本

#!/bin/bash

# 定义变量方便修改
APP=xxx
TABLE=xxx
DATAX_HOME=xxxx
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一小时

    do_date=2022111416
   hr1=${do_date: 8: 2}
   date1=${do_date: 0: 8}
  

hdfs_path=xxx

#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
  hadoop fs -test -e $hdfs_path
  if [[ $? -eq 1 ]]; then
    echo "路径 $hdfs_path 不存在,正在创建......"
    hadoop fs -mkdir -p $hdfs_path
  else
    echo "路径 $hdfs_path 已经存在"
    fs_count=$(hadoop fs -count $hdfs_path)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$hdfs_path为空"
    else
      echo "路径$hdfs_path不为空,正在清空......"
      hadoop fs -rm -r -f $hdfs_path/*
    fi
  fi


#数据同步

for i in  xxx xxx xxx
do  
echo ================== $i 装载日期为 $do_date ==================
python  $DATAX_HOME/bin/datax.py -p"-Dcollection=$i -Dtargetdir=$hdfs_path"   $DATAX_HOME/xxx
done 

五、datax同步至es 配置

mongodb同步至es有一个专用的组件,monstache;知道,但还没用过,留白,由于时间紧张用的datax,此处三个注意点:
1.object格式可以datax读取的时候可用string,导入es再改回object
2.es重名没问题
3.想用es中文分词统计词频,除了要配置中文ik,也需要filedata=true;

{
    "job": {
        "content": [
          {
              "reader": {
                  "name": "mongodbreader",
                  "parameter": {
                      "address": ["xxxx:27017"],
                      "userName": "xxx",
                      "authDb": "xxx",
                      "userPassword": "xxxx",
                      "dbName": "xxxx",
                      "collectionName": "${collection}",
                      "column": [
                          {
                              "name": "_id",
                              "type": "string" #原有格式为objectid,用此处用string
                          },
                          {
                              "name": "data",
                              "type": "string" #原有格式为list(object),用string可以倒进去
                          },
                          {
                              "name": "gid",
                              "type": "string"
                          },
                          {
                              "name": "text",
                              "type": "string"
                          },
                          {
                              "name": "time",
                              "type": "bigint"
                          },
                          {
                              "name": "uid",
                              "type": "string"
                          },
                          {
                              "name": "deleted",
                              "type": "bigint"
                          }
                      ],

                      "query":"{\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
                  }
                },
                "writer": {
                "name": "elasticsearchwriter",
                "parameter": {
                  "endpoint": "xxxxxx:9200",
                  "index": "xxxx",
                  "type": "xxxx",
                  "cleanup": false,
                  "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
                  "discovery": false,
                  "batchSize": 2048,
                  "splitter": ",",
                  "column": [
                   {
                              "name": "_id",
                              "type": "id"
                          },
                          {
                              "name": "data",
                              "type": "object" #源数据为object,此处也为object
                          
                          },
                          {
                              "name": "gid",
                              "type": "keyword"
                          },
                          {
                              "name": "text",#即使和关键词重名也不影响,挺好
                              "type": "text","analyzer": "ik_smart"
                          },#此处想用es分词,来统计词频的小伙伴建议开启filedata:true,不知道能不能用哈,反正我知道不开启,不能用,有兴趣可以研究下,告诉我
                          {
                              "name": "time",
                              "type": "long"
                          },
                          {
                              "name": "uid",
                              "type": "keyword"
                          },
                          {
                              "name": "deleted",
                              "type": "long"
                          }
                  ]
                }
              }
            }
        ],
        "setting": {
            "speed": {
                "channel": 4
            }
        }
    }
}

六、其他问题

其他就比较简单了,懒得记了,后面有问题再补充文章来源地址https://www.toymoban.com/news/detail-421879.html

到了这里,关于datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • DataX将MySQL数据同步到HDFS中时,空值不处理可以吗

    DataX将MySQL数据同步到HDFS中时,空值(NULL)存到HDFS中时,默认是存储为空字符串(‘’)。 HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(‘’),而Hive默认的null值存储格式为N。所以

    2024年02月12日
    浏览(52)
  • 4、sybase相关同步-sybase通过datax同步到hdfs

    1、datax3.0部署与验证 2、mysql相关同步-mysql同步到mysql、mysql和hdfs相互同步 3、oracle相关同步-oracle到hdfs 4、sybase相关同步-sybase到hdfs 5、ETL工具的比较(DataPipeline,Kettle,Talend,Informatica,Datax ,Oracle Goldeng 本文介绍sybase的相关同步,sybase到hdfs同步。 本文分为三部分,。 本文的前

    2024年02月08日
    浏览(36)
  • HDFS 跨集群数据同步(hive,hadoop)

    两个不同的HDFS 集群数据迁移( A集群的数据 - B 集群) 采用的是 SHELL 脚本  按表进行; 日期分区进行; #!/bin/bash ##################### #创建人:DZH #创建日期: 2020-04 #内容: 数据迁移 ##################### ##################################### [ \\\"$#\\\" -ne 0 ] FILE=$1 path=$(cd `dirname $0`; pwd) ############## 获取执

    2024年04月27日
    浏览(58)
  • 【数据库开发】DataX开发环境的安装部署(Python、Java)

    DataX是阿里云DataWorks数据集成的开源版本。下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。 官网地址: https://github.com/alibaba/DataX DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括

    2024年02月02日
    浏览(50)
  • mongodb 数据库管理(数据库、集合、文档)

    目录 一、数据库操作 1、创建数据库 2、删除数据库 二、集合操作 1、创建集合 2、删除集合 三、文档操作 1、创建文档 2、 插入文档 3、查看文档 4、更新文档 1)update() 方法 2)replace() 方法 创建数据库的语法格式如下: 如果数据库不存在,则创建数据库,否则切换到该数据

    2024年02月12日
    浏览(52)
  • [虚幻引擎 MongoDB Client 插件说明] DTMongoDB MongoDB数据库连接插件,UE蓝图可以操作MongoDB数据库增删改查。

    本插件可以在UE里面使用蓝图操作MongoDB数据库, 对数据库进行查询,删除,插入,替换,更新操作。 插件下载地址在文章最后。 Create MongoDB Client - 创建客户端对象 创建一个 MongoDB 客户端对象。 Connect By Url - 连接到数据库 Url :MongoDB的连接地址。 如 mongoDB://account:password@ip:

    2024年02月14日
    浏览(96)
  • MongoDB数据库从入门到精通系列文章之:MongoDB数据库百篇技术文章汇总

    MongoDB数据库系列文章持续更新中: 更多数据库内容请阅读博主数据库专栏,数据库专栏涵盖了Mysql、SQLServer、PostgreSQL、MongoDB、Oracle、Cassandra等数据库 数据库专栏 文章名称 文章链接 数据库安装部署系列之:部署Mongodb5.0.6高可用集群详细步骤 数据库安装部署系列之:部署M

    2024年02月11日
    浏览(59)
  • redis数据库和MongoDB数据库基本操作

    (1) 设置键值 (2) 读取键值 (3) 数值类型自增1 (4) 数值类型自减1 (5) 查看值的长度 (1)对列表city插入元素:Shanghai Suzhou Hangzhou (2)将列表city里的头部的元素移除 (3) 对一个已存在的列表插入新元素 (4)查看list的值长度 (1) 设置一个hash表,order表里包括的

    2024年02月16日
    浏览(63)
  • MongoDb数据库

    1.显示所有数据库: show dbs 2.切换到指定数据库,如果没有则自动创建数据库 use databaseName 3.显示当前所在数据库 db 4.删除当前数据库 use 库名 db.dropDatabase() 1.创建集合 db.createCollection(\\\'集合名称\\\') 2.显示当前数据库中所有集合 show colletions  3.删除某个集合 db.xxx.drop(); 4.重命名集

    2024年02月04日
    浏览(56)
  • Mongodb连接数据库

    npm init   npm i mongoose  const mongoose=require(\\\"mongoose\\\") mongoose.connect(\\\"mongodb://127.0.0.1:27017/user\\\") 说明:mongodb是协议,user是数据库,如果没有会自动创建user数据库 。 node 文件名     mongoose.disconnect()

    2024年02月15日
    浏览(65)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包