Spark 读写 es 数据(scala 版)

这篇具有很好参考价值的文章主要介绍了Spark 读写 es 数据(scala 版)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. spark 读取 ES

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

object esReadToHdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es_read").getOrCreate()
    val sc = spark.sparkContext

    val options = Map(
      "es.index.auto.create" -> "true",
      "es.nodes.wan.only" -> "true",
      "es.nodes" -> "29.29.29.29:9200,29.29.29.29:9200",
      "es.port" -> "9200",
      "es.mapping.id" -> "id"
    )

    // 返回 RDD[(String, String]]
    // 元组:第一个:esmapping.id、第二个 json 字符串
    val resultRDD = EsSpark.esJsonRDD(sc, options).map(x => x._2)

    //    // 返回 RDD[(String, Map[String, AnyDef]]
    //    val resultRDD = EsSpark.esRDD(sc, options)

  }
}

读取 hdfs 文件

[hadoop@hadoop1 apps]$ hadoop fs -cat hdfs://hadoop1:9000/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

解析采用 fast-json

1、pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

2、main 文件

package top.midworld.spark1031.create_df

import org.apache.spark.sql.SparkSession
import java.security.MessageDigest
import com.alibaba.fastjson.{JSON, JSONException, JSONObject}

object SaveToEs {
  def main(args: Array[String]): Unit = {
      // 提交到集群,去掉 master,不能采用 local[2]
    val spark = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = spark.sparkContext


    val rdd = sc.textFile("hdfs://hadoop1:9000/people.json").map {
      //      x => JSON.parseObject(x).get("name")
      x =>
        val data = JSON.parseObject(x)
        val name = data.get("name").toString
        val md5 = hashMD5(name)	// name md5 
        data.put("@id", md5)	// 添加新的 key、value
        data
    }

    rdd.collect().foreach(println)
    sc.stop()
    spark.stop()
  }

  def hashMD5(url: String): String = {
    val md5 = MessageDigest.getInstance("MD5")
    val encoded = md5.digest((url).getBytes())
    encoded.map("%02x".format(_)).mkString
  }
}

运行结果:

{"name":"aaa","@id":"3e06fa3927cbdf4e9d93ba4541acce86"}
{"name":"aaa","@id":"0d2366f384b6c702db8e9dd8b74534db","age":30}
{"name":"aaa","@id":"06475174d922e7dcbb3ed34c0236dbdf","age":19}

2. spark 写入 ES

1、pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.0.0</version>
</dependency>

2、main 文件

package top.midworld.spark1031.create_df

import org.apache.spark.sql.SparkSession

import java.security.MessageDigest
import com.alibaba.fastjson.{JSON, JSONException, JSONObject}
import org.apache.spark.SparkConf
import org.elasticsearch.spark._

case class People(name: String, age: Int)

object SaveToEs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    val conf = new SparkConf().setAppName("save_to_es")
    conf.set("es.nodes", "hadoop1:9200,hadoop2:9200,hadoop3:9200")
    conf.set("es.port", "9200")
    conf.set("es.index.auto.create", "true")

    // 将 Map 对象写入 es
    val aa = Map("one" -> 1, "two" -> 2, "three" -> 3, "id" -> 11111)
    val bb = Map("OTP" -> "Otopeni", "SFO" -> "San Fran", "id" -> 2222)
    sc.makeRDD(Seq(aa, bb)).saveToEs("index_name/docs", Map("es.mapping.id" -> "id")) // docs 是 doc_type

    // 将 case class对象写入ElasticSearch
    val p1 = People("rose", 18)
    val p2 = People("lila", 19)
    sc.makeRDD(Seq(p1, p2)).saveToEs("index_name/docs")

    // 以上都是通过隐士转换才有 saveToEs 方法插入 es,也可以采用显示方法
    import org.elasticsearch.spark.rdd.EsSpark

    val rdd_case_class = sc.makeRDD(Seq(p1, p2))
    EsSpark.saveJsonToEs(rdd_case_class, "index_name/docs")

    // 将Json字符串写入ElasticSearch
    val json1 = """{"id" : 1, "name" : "rose", "age" : "18"}"""
    val json2 = """{"id" : 2, "name" : "lila", "age" : "19"}"""
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("index_name/docs")

    // 自定义 es.mapping.id,不指定 es 也会生成唯一的 20 字符长度的 id
    // 第三个参数指定 es.mapping.id 为数据中的 id 字段
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("index_name/docs", Map("es.mapping.id" -> "id"))

    sc.stop()
    spark.stop()
  }
}

参考文章文章来源地址https://www.toymoban.com/news/detail-509870.html

  • Spark读写ES数据时遇到的问题总结
  • Spark读写ES
  • 使用Apache Spark将数据写入ElasticSearch

到了这里,关于Spark 读写 es 数据(scala 版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

    Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame: 利用反射机制推断 RDD 模式 使用编程方式定义 RDD 模式 下面使用到的数据 people.txt :         在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。 注意

    2024年02月09日
    浏览(54)
  • 大数据Spark SparkSession的3种创建方式 Scala语言实现

    SparkSession是Apache Spark 2.0版本引入的一个编程接口,用于与Spark进行交互。它是Spark应用程序的入口点,提供了一种方便的方式来创建DataFrame、DataSet和SQLContext等数据结构,并且可以配置各种Spark应用程序的选项。SparkSession还管理了Spark应用程序的运行环境,包括Spark集群的连接,

    2023年04月20日
    浏览(38)
  • Spark RDD编程 文件数据读写

    从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。 示例:读取一个本地文件word.txt val textFile中的textFile是变量名称,sc.t

    2024年02月05日
    浏览(41)
  • Spark读写MySQL数据库

    一、读取数据库 (一)通过RDD的方式读取MySQL数据库 四要素:驱动、连接地址、账号密码 (二)通过DataFrame的方式读取MySQL数据库 二、添加数据到MySQL (一)通过RDD的方式插入数据到MySQL 每个分区执行一次创建连接和关闭连接 (二)通过RDD的方式插入数据到MySQL 2 每个分区

    2024年04月23日
    浏览(29)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-2)

    Spark是专为大规模数据处理而设计的快速通用的计算引擎,它是由Scala语言开发实现的,关于大数据技术,本身就是计算数据,而Scala既有面向对象组织项目工程的能力,又具备计算数据的功能,同时Spark和Scala的紧密集成,本书将采用Scala语言开发Spark程序,所以学好Scala将有助

    2024年02月11日
    浏览(62)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    对于每一门编程语言来说,数组(Array)都是重要的数据结构之一,主要用来存储数据类型相同的元素。Scala中的数组分为定长数组和变长数组,定义定长数组,需要使用new,而定义变长数组时,则需要导包 import scala.collection.mutable.ArrayBuffer 。 数组(Array)主要用来存储

    2024年02月10日
    浏览(64)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-1)

    Spark是专为大规模数据处理而设计的快速通用的计算引擎,它是由Scala语言开发实现的,关于大数据技术,本身就是计算数据,而Scala既有面向对象组织项目工程的能力,又具备计算数据的功能,同时Spark和Scala的紧密集成,本书将采用Scala语言开发Spark程序,所以学好Scala将有助

    2024年02月11日
    浏览(67)
  • Apache Spark 练习六:使用Spark分析音乐专辑数据

    本章所分析的数据来自于Kaggle公开的、人工合成的音乐专辑发行数据(https://www.kaggle.com/datasets/revilrosa/music-label-dataset)。以下,我们只针对albums.csv文件进行分析。该数据具体包括以下字段: id: the album identifier; artist_id: the artist identifier; album_title: the title of the album; genre: the

    2024年02月15日
    浏览(63)
  • Linux CentOS下大数据环境搭建(zookeeper+hadoop+hbase+spark+scala)

    本篇文章是结合我个人学习经历所写,如果遇到什么问题或者我有什么错误,欢迎讨论。 百度网盘链接:https://pan.baidu.com/s/1DCkQQVYqYHYtPws9hWGpgw?pwd=zh1y 提取码:zh1y 软件在连接中VMwareWorkstation_V16.2.1_XiTongZhiJia的文件夹下。 双击运行安装包,这里下一步即可。 这里勾选我接受许可

    2024年04月15日
    浏览(69)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包