Spark读写MySQL数据库

这篇具有很好参考价值的文章主要介绍了Spark读写MySQL数据库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark读写MySQL数据库

一、读取数据库

(一)通过RDD的方式读取MySQL数据库

四要素:驱动、连接地址、账号密码

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD读取MySQL数据库
 */
object spark_read_mysql {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_read_mysql") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    val url = "jdbc:mysql://192.168.80.145:3306/test"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //具体的SQL查询语句
    val sql = "select * from t_user where id>=? and id<=?"

    //查询
    val rsRDD = new JdbcRDD(
      sc,
      ()=>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL数据库的连接
        DriverManager.getConnection(url,username,password)
      },
      //需要执行的SQL语句
      sql,
      //查询的开始行
      1,
      //查询的结束行
      20,
      //运行几个分区执行
      2,
      //返回值的处理(将返回值变为RDD的元素),数字从1开始,表示字段的编号
      rs => (rs.getInt(1),rs.getString(2),rs.getInt(3))
    )

    //将RDD的元素打印在终端
    rsRDD.collect().foreach(println)

    sc.stop()
  }
}
(二)通过DataFrame的方式读取MySQL数据库
import org.apache.spark.sql.SparkSession

/**
 * 使用DataFrame读取MySQL数据库
 */
object spark_read_mysql2 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]")//指定运行的方式
      .appName("spark_read_mysql2")//程序的名字
      .getOrCreate()

    //创建DataFrame
    val jdbcDF = spark.read.format("jdbc")
      .option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接
      .option("driver","com.mysql.cj.jdbc.Driver")//指定驱动
      .option("user","root")//指定连接的用户
      .option("password","123456")//指定连接的用户的密码
      .option("dbtable","t_user")//查询的表
      .load()//加载数据库表

    //在终端显示DataFrame的内容
    jdbcDF.show()
  }
}

二、添加数据到MySQL

(一)通过RDD的方式插入数据到MySQL

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD插入数据到MySQL,RDD的每个元素都会执行一次创建连接和关闭连接
 */
object spark_write_mysql {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码
    val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //创建RDD
    val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))

    //打印RDD的元素
    //rdd.collect().foreach(println)

    //通过循环的方式读取RDD的每条元素,将元素插入MySQL;一个元素执行一次创建连接和插入和关闭连接
    rdd.foreach {
      case (name,age) =>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL的链接
        val conn = DriverManager.getConnection(url,username,password)
        //添加的SQL语句
        val sql = "insert into t_user(name,age) values(?,?)"
        //给SQL语句配置参数
        val ps = conn.prepareStatement(sql)
        //根据参数的类型配置参数
        ps.setString(1,name)
        ps.setInt(2,age)
        //执行SQL语句
        ps.executeUpdate()
        //关闭连接
        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
(二)通过RDD的方式插入数据到MySQL 2

每个分区执行一次创建连接和关闭连接文章来源地址https://www.toymoban.com/news/detail-856004.html

import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD插入数据到MySQL,RDD的每个分区执行一次创建连接和关闭连接;推荐
 */
object spark_write_mysql2 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql2") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码
    val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //创建RDD
    val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))

    //打印RDD的元素
    //rdd.collect().foreach(println)

    //通过循环的方式读取RDD的每个分区,将元素插入MySQL;一个分区执行一次创建连接和关闭连接
    rdd.foreachPartition {
      datas =>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL的链接
        val conn = DriverManager.getConnection(url,username,password)
        //添加的SQL语句
        val sql = "insert into t_user(name,age) values(?,?)"
        //给SQL语句配置参数
        val ps = conn.prepareStatement(sql)
        //根据参数的类型配置参数
        datas.foreach{
          case (name,age)=>{
            ps.setString(1,name)
            ps.setInt(2,age)

            //执行SQL语句
            ps.executeUpdate()
          }
        }
        //关闭连接
        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
(三)使用DataFrame插入数据到MySQL
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * 使用DataFrame插入数据到MySQL
 */

object spark_write_mysql3 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql3") //程序的名字
      .getOrCreate()

    //1.创建DataFrame
    //1.1 schema
    val schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true)))
    //1.2 行rows
    //1.2.1 创建RDD
    val dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20)))
    //1.2.2 创建rows
    val rows = dataRDD.map(x=>Row(x(0),x(1)))
    //1.3 拼接表头(schema)和行内容(rows)
    val df = spark.createDataFrame(rows,schema)

    //2.通过DataFrame插入数据到MySQL
    //如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表,需要注意表不能存在
    //df.write.mode("append"),是以追加的方式将数据写入到已经存在的表中
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8") //指定连接
      .option("driver", "com.mysql.cj.jdbc.Driver") //指定驱动
      .option("user", "root") //指定连接的用户
      .option("password", "123456") //指定连接的用户的密码
      .option("dbtable", "t_user2") //查询的表
      .save()//保存数据
  }
}

到了这里,关于Spark读写MySQL数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MySQL-数据库读写分离(上)

    ♥️ 作者:小刘在C站 ♥️ 个人主页:  小刘主页  ♥️ 努力不一定有回报,但一定会有收获加油!一起努力,共赴美好人生! ♥️ 学习两年总结出的运维经验,以及思科模拟器全套网络实验教程。专栏: 云计算技术 ♥️小刘私信可以随便问,只要会绝不吝啬,感谢CSD

    2024年02月16日
    浏览(53)
  • MySQL-数据库读写分离(中)

    ♥️ 作者:小刘在C站 ♥️ 个人主页:  小刘主页  ♥️ 努力不一定有回报,但一定会有收获加油!一起努力,共赴美好人生! ♥️ 学习两年总结出的运维经验,以及思科模拟器全套网络实验教程。专栏: 云计算技术 ♥️小刘私信可以随便问,只要会绝不吝啬,感谢CSD

    2024年02月16日
    浏览(53)
  • MySQL-数据库读写分离(下)

    ♥️ 作者:小刘在C站 ♥️ 个人主页:  小刘主页  ♥️ 努力不一定有回报,但一定会有收获加油!一起努力,共赴美好人生! ♥️ 学习两年总结出的运维经验,以及思科模拟器全套网络实验教程。专栏: 云计算技术 ♥️小刘私信可以随便问,只要会绝不吝啬,感谢CSD

    2024年02月15日
    浏览(38)
  • 利用python读写mysql数据库数据

    方法一: 1. python连接mysql数据库:需要用到 pymysql 库和 sqlalchemy库: 创建连接 create_engine: 2. 提供 sql 语句 3. 执行查询,使用 read_sql_query 得到pandas的dataframe数据 4. 写数据到 mysql 数据库 方法二:使用 read_sql 方法三:直接使用pymysql进行查询,插入数据、更新数据、删除数据

    2024年02月09日
    浏览(58)
  • MySQL数据库 主从复制与读写分离

    读写分离,基本的原理是让主数据库处理事务性增、改、删操作(INSERT、UPDATE、DELETE),而从数据库处理SELECT查询操作。数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库。 因为数据库的“写”(写10000条数据可能要3分钟)操作是比较耗时的。 但是数据库

    2024年02月10日
    浏览(48)
  • 【MySQL数据库】主从复制与读写分离

    读写分离,基本的原理是让主数据库处理事务性增、改、删操作(INSERT、UPDATE、DELETE),而从数据库处理SELECT查询操作。数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库。 因为数据库的“写”(写10000条数据可能要3分钟)操作是比较耗时的。 但是数据库

    2024年02月11日
    浏览(59)
  • 【数据库】mysql主从复制与读写分离

      读写分离,基本的原理是让主数据库处理事务性增、改、删操作(INSERT、UPDATE、DELETE ),而从数据库处理SELECT查询操作。数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库。   因为数据库的\\\"写”(写10000条数据可能要3分钟)操作是比较耗时的。   

    2024年02月11日
    浏览(72)
  • 【数据库七】MySQL主从复制与读写分离

    读写分离,基本的原理是让主数据库处理事务性增、改、删操作 (insert、update、delete),而 从数据库处理select查询操作 。 数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库 。 因为数据库的“写”(写10000条数据可能要3分钟)操作是比较耗时的。 但是数据

    2024年02月11日
    浏览(59)
  • 看!MySQL 8.2 数据库支持读写分离啦!

    MySQL 8.2.0创新版本已于2023-10-17发布,MySQL Router 8.2 支持数据库的读/写分离,这里将在InnoDB Cluster集群中演示数如何进行读写分离,本篇内容包括:MySQL Server数据库安装、MySQL Shell安装、MySQL Router安装、InnoDB Cluster安装与读写分离演示,若您只关注读写分离的演示,可直接跳至最

    2024年02月05日
    浏览(165)
  • Flink读取mysql数据库(java)

    代码如下: 运行结果如下:

    2024年02月12日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包