spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

这篇具有很好参考价值的文章主要介绍了spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
    val ssc = new StreamingContext(sparkconf,Seconds(3))

    // 创建一个队列对象,队列中存放的是RDD
    val queue = new mutable.Queue[RDD[String]]()
    // 通过队列创建DStream
    val queueDS: InputDStream[String] = ssc.queueStream(queue)

    queueDS.print()

    // 启动采集器
    ssc.start()
       //这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产
       for(i <- 1 to 5 ){
          // 循环创建rdd
          val rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))
          // 将RDD存放到队列中
          queue.enqueue(rdd)
          // 当前线程休眠1秒
          Thread.sleep(6000)         
       }
        // 等待采集器的结束
    ssc.awaitTermination()
    }

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //从文件中读取数据
    val textDS: DStream[String] = ssc.textFileStream("in")
    textDS.print()
  
    // 启动采集器
    ssc.start()

    // 等待采集器的结束
    ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 获取采集的流
    val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))
    ds.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
  class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    private var socket: Socket = _
    def receive = {
     // 获取输入流
      val reader = new BufferedReader(
        new InputStreamReader(
          socket.getInputStream,
          "UTF-8"
        )
      )
      // 设定一个间接变量
      var s: String = null
      while (true) {
        // 按行读取数据
        s = reader.readLine()
        if (s != null) {
      // 将数据进行封装
          store(s)
        }
      }

    }

    // 1. 启动采集器
    override def onStart(): Unit = {
      socket = new Socket(host, port)
      new Thread("Socket Receiver") {
        setDaemon(true)
        override def run() {
          receive
        }
      }.start()


    }

    // 2. 停止采集器
    override def onStop(): Unit = {
      socket.close()
      socket = null


    }
  }

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境
    // SparkStreaming使用核数最少是2个
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // TODO 使用SparkStreaming读取Kafka的数据

    // Kafka的配置信息
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )
    // 获取数据,key是null,value是真实的数据
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
   
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    // 等待采集器的结束
    ssc.awaitTermination()

 文章来源地址https://www.toymoban.com/news/detail-723305.html

到了这里,关于spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

      目录 零、本讲学习目标 一、基本操作 二、默认数据源 (一)默认数据源Parquet (二)案例演示读取Parquet文件 1、在Spark Shell中演示 2、通过Scala程序演示 三、手动指定数据源 (一)format()与option()方法概述 (二)案例演示读取不同数据源 1、读取房源csv文件 2、读取json,保

    2024年02月09日
    浏览(44)
  • Spark SQL数据源的基本操作

    Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入指定的数据源。 默认情况下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二进制方式存储数据的,因此

    2024年02月09日
    浏览(47)
  • 4.2 Spark SQL数据源 - 基本操作

    案例演示读取Parquet文件 查看Spark的样例数据文件users.parquet 1、在Spark Shell中演示 启动Spark Shell 查看数据帧内容 查看数据帧模式 对数据帧指定列进行查询,查询结果依然是数据帧,然后通过write成员的save()方法写入HDFS指定目录 查看HDFS上的输出结果 执行SQL查询 查看HDFS上的输

    2024年02月08日
    浏览(47)
  • spring boot下基于spring data jpa配置mysql+达梦多数据源(以不同包路径方式,mysql为主数据源)

    :mysql 达梦/dameng jpa 多数据源 spring boot:2.1.17.RELEASE mysql驱动:8.0.21(跟随boot版本) 达梦驱动:8.1.2.192 lombok:1.18.12(跟随boot版本) 以mysql为主数据源,达梦为第二数据源方式配置 适用于旧项目二次开发接入达梦数据库或基于通用二方/三方包做业务扩展等场景 将以不

    2024年02月05日
    浏览(64)
  • 一百八十二、大数据离线数仓完整流程——步骤一、用Kettle从Kafka、MySQL等数据源采集数据然后写入HDFS

    经过6个月的奋斗,项目的离线数仓部分终于可以上线了,因此整理一下离线数仓的整个流程,既是大家提供一个案例经验,也是对自己近半年的工作进行一个总结。 项目行业属于交通行业,因此数据具有很多交通行业的特征,比如转向比数据就是统计车辆左转、右转、直行

    2024年02月07日
    浏览(53)
  • springboot整合多数据源的配置以及动态切换数据源,注解切换数据源

    在许多应用程序中,可能需要使用多个数据库或数据源来处理不同的业务需求。Spring Boot提供了简便的方式来配置和使用多数据源,使开发人员能够轻松处理多个数据库连接。如果你的项目中可能需要随时切换数据源的话,那我这篇文章可能能帮助到你 ℹ️:这里对于pom文件

    2024年02月10日
    浏览(54)
  • NamedParameterJdbcTemplate多数据源指定数据源

    实战例子记录 pom config NamedParameterJdbcTemplate(动态sql调用)

    2024年02月08日
    浏览(52)
  • 数据源作用以及spring配置数据源

    数据源,简单理解为数据源头,提供了应用程序所需要数据的位置。数据源保证了应用程序与目标数据之间交互的规范和协议,它可以是数据库,文件系统等等。其中数据源定义了位置信息,用户验证信息和交互时所需的一些特性的配置,同时它封装了如何建立与数据源的连

    2024年02月07日
    浏览(54)
  • SpringBoot——动态数据源(多数据源自动切换)

    日常的业务开发项目中只会配置一套数据源,如果需要获取其他系统的数据往往是通过调用接口, 或者是通过第三方工具比如kettle将数据同步到自己的数据库中进行访问。 但是也会有需要在项目中引用多数据源的场景。比如如下场景: 自研数据迁移系统,至少需要新、老两

    2024年02月16日
    浏览(40)
  • SpringBoot从数据库读取数据数据源配置信息,动态切换数据源

            首先准备多个数据库,主库smiling-datasource,其它库test1、test2、test3         接下来,我们在主库smiling-datasource中,创建表databasesource,用于存储多数据源相关信息。表结构设计如下         创建好表之后,向表databasesource中存储test1、test2、test3三个数据库的相关配置

    2024年01月16日
    浏览(68)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包