大数据编程实验四:SparkStreaming编程

这篇具有很好参考价值的文章主要介绍了大数据编程实验四:SparkStreaming编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大数据编程实验四:SparkStreaming编程

一、实验目的与要求

  1. 通过实验掌握Spark Streaming的基本编程方法
  2. 熟悉利用Spark Streaming处理来自不同数据源的数据
  3. 熟悉DStream的各种转换操作
  4. 熟悉把DStream的数据输出保存到文本文件或MySQL数据库中

二、实验内容

  1. 参照教材示例,利用Spark Streaming对不同类型数据源的数据进行处理
  2. 参照教材示例,完成DStream的两种有状态转换操作
  3. 参照教材示例,完成把DStream的数据输出保存到文本文件或MySQL数据库中

三、实验步骤

1、利用Spark Streaming对不同类型数据源的数据进行处理

  • 文件流

    首先在虚拟机中打开第一个终端作为数据流终端,创建一个logfile目录:

    cd /usr/local/spark/mycode
    mkdir streaming
    cd streaming
    mkdir logfile
    

    然后我们打开第二个终端作为流计算终端,在我们创建的目录下面新建一个py程序:

    vim FileStreaming.py
    

    然后输入如下代码:

    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    conf = SparkConf()
    conf.setAppName('TestDStream')
    conf.setMaster('local[2]')
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 10)
    lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
    words = lines.flatMap(lambda line: line.split(' '))
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
    wordCounts.pprint()
    ssc.start()
    ssc.awaitTermination()
    

    大数据编程实验四:SparkStreaming编程

    保存该文件并执行如下命令:

    /usr/local/spark/bin/spark-submit FileStreaming.py
    

    然后我们进入数据流终端,在logfile目录下新建一个log2.txt文件,然后往里面输入一些英文语句后保存退出,再次切换到流计算终端,就可以看见打印出单词统计信息了。

    大数据编程实验四:SparkStreaming编程

  • 套接字流

    我们继续在流计算端的streaming目录下创建一个socket目录,然后在该目录下创建一个DataSourceSocket.py程序:

    mkdir socket
    cd socket
    vim NetworkWordCount.py
    

    并在py程序中输入如下代码:

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonStreamingNetworkWordCount")
        ssc = StreamingContext(sc, 1)
        lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
        counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .reduceByKey(lambda a, b: a+b)
        counts.pprint()
        ssc.start()
        ssc.awaitTermination()
    

    大数据编程实验四:SparkStreaming编程

    我们再在数据流终端启动Socket服务器端:

    nc -lk 8888
    

    然后我们再进入流计算终端,执行如下代码启动流计算:

    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 8888
    

    然后我们在数据流终端内手动输入一行英文句子后回车,多输入几次,流计算终端就会不断执行词频统计并打印出信息。

    大数据编程实验四:SparkStreaming编程

  • RDD队列流

    我们继续在streaming目录下新建rddqueue目录并在该目录下创建py程序:

    mkdir rddqueue
    cd rddqueue/
    vim RDDQueueStreaming.py
    

    然后在py文件中输入如下代码:

    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        sc = SparkContext(appName="PythonStreamingQueueStream")
        ssc = StreamingContext(sc, 2)
        #创建一个队列,通过该队列可以把RDD推给一个RDD队列流
        rddQueue = []
        for i in range(5):
            rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
            time.sleep(1)
        #创建一个RDD队列流
        inputStream = ssc.queueStream(rddQueue)
        mappedStream = inputStream.map(lambda x: (x % 10, 1))
        reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
        reducedStream.pprint()
        ssc.start()
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    大数据编程实验四:SparkStreaming编程

    保存退出后再执行如下命令:

    /usr/local/spark/bin/spark-submit RDDQueueStreaming.py
    

    大数据编程实验四:SparkStreaming编程

2、完成DStream的两种有状态转换操作

  • DStream无状态转换操作

    上面的词频统计程序NetworkWordCount就采取了无状态转换操作。

  • DStream有状态转换操作

    我们在socket目录下创建WindowedNetworkWordCount.py程序并输入如下代码:

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
        ssc = StreamingContext(sc, 10)
        ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")
        lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
        counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      . reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
        counts.pprint()
        ssc.start()
        ssc.awaitTermination()
    

    大数据编程实验四:SparkStreaming编程

    然后我们在数据流终端执行如下命令启动服务器:

    cd /usr/local/spark/mycode/streaming/socket/
    nc -lk 6666
    

    然后再在流计算终端运行我们刚写的代码:

    /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 6666
    

    在数据流终端输入英文就可以看见统计结果了。

    大数据编程实验四:SparkStreaming编程

3、完成把DStream的数据输出保存到MySQL数据库中

我们首先启动MySQL数据库:

systemctl start mysqld.service
mysql -u root -p

然后创建spark数据库和wordcount表:

mysql> create database spark;
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));

然后再在终端安装python连接MySQL的模块:

pip3 install PyMySQL

然后我们在streaming目录下新建stateful目录并在该目录下创建py文件:

mkdir stateful
cd stateful/
vim NetworkWordCountStatefulDB.py

并在py文件中输入如下代码:

from __future__ import print_function 
import sys 
import pymysql 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCountStateful <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful") 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) 
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0) 
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD) 
    running_counts.pprint() 
    def dbfunc(records):
        db = pymysql.connect("localhost","root","123456","spark")
        cursor = db.cursor() 
        def doinsert(p):
            sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
        for item in records:
            doinsert(item) 
    def func(rdd):
        repartitionedRDD = rdd.repartition(3)
        repartitionedRDD.foreachPartition(dbfunc)
    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()

大数据编程实验四:SparkStreaming编程

然后我们新建一个数据源终端并执行如下命令:

cd /usr/local/spark/mycode/streaming/stateful/
nc -lk 5555

然后再在我们的流计算终端运行我们该编写的代码:

/usr/local/spark/bin/spark-submit NetworkWordCountStatefulDB.py localhost 5555

然后就可以把词频统计的结果写入MySQL中了。文章来源地址https://www.toymoban.com/news/detail-474031.html

到了这里,关于大数据编程实验四:SparkStreaming编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据基础编程、实验和教程案例(实验二)

    本实验对应第 4 章的内容。 14.2.1 实验目的 (1)理解 HDFS 在 Hadoop 体系结构中的角色; (2)熟练使用 HDFS 操作常用的 Shell 命令; (3)熟悉 HDFS 操作常用的 Java API。 14.2.2 实验平台 操作系统 Linux Hadoop版本 3.1.3 JDK版本 1.8 Java IDE Eclipse 14.2.3 实验步骤 (一)编程实现以下功能,

    2024年02月05日
    浏览(31)
  • 大数据基础编程、实验和教程案例(实验七)

    你好# 大数据基础编程、实验和教程案例(实验七) 本实验对应第 9 章的内容。 14.7.1 实验目的 (1)掌握使用 Spark 访问本地文件和 HDFS 文件的方法 (2)掌握 Spark 应用程序的编写、编译和运行方法 14.7.2 实验平台 操作系统 Linux Hadoop版本 3.1.3 Spark 版本 2.4.0 14.7.3 实验步骤 1.

    2024年02月01日
    浏览(26)
  • SparkStreaming学习——读取socket的数据和kafka生产者的消息

    目录 一、Spark Streaming概述 二、添加依赖 三、配置log4j 1.依赖下载好后打开IDEA最左侧的外部库 2.找到spark-core 3.找到apache.spark目录 4.找到log4j-defaults.properties文件 5.将该文件放在资源目录下,并修改文件名 6.修改log4j.properties第19行的内容 四、Spark Streaming读取Socket数据流 1.代码编

    2023年04月27日
    浏览(25)
  • 实验7 数据库编程

    第1关 定义一个名为PROC_COUNT的无参数存储过程 任务描述 定义一个名为 PROC_COUNT 的无参数存储过程,查询工程名称中含有“厂”字的工程数量,并调用该存储过程。 相关知识 1、工程项目表 J 由工程项目代码( JNO )、工程项目名( JNAME )、工程项目所在城市( CITY )组成。 J 表如下

    2024年02月08日
    浏览(16)
  • JAVA 实验三 数据库编程

    编程管理学生数据。要求: 1. 自选数据库管理系统创建数据库stu,按照下表的结构创建\\\"student\\\"表: 字段名 Java数据类型 宽度 SQL数据类型 id int 1 0 int Name String 20 Char(20) Sex String 2 Char(2) Age Int 3 Integer 假设表中已有3个学生的数据: id Name Sex Age 1 张小明 男 1 8 2 李雷 男 19 3 韩梅梅

    2024年02月05日
    浏览(28)
  • 大数据编程实验四:Spark Streaming

    一、目的与要求 1、通过实验掌握Spark Streaming的基本编程方法; 2、熟悉利用Spark Streaming处理来自不同数据源的数据。 3、熟悉DStream的各种转换操作。 4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。 二、实验内容 1.参照教材示例,利用Spark Streaming对三种类型的基

    2024年02月03日
    浏览(43)
  • 实验6 网络和JDBC数据库编程

    1 .实验目的 (1)掌握Socket通信 (2)掌握多线程的网络编程 (3)掌握使用JDBC连接数据库; (4)对mysql数据库实现增、删、改、查操作。 2.实验要求 在Eclipse下创建Practice6项目,对未有包名要求的题目统一按照实验题名建包,然后将本题源代码放在同一包下。对有包名要求的

    2024年02月03日
    浏览(44)
  • 大数据编程实验二:熟悉常用的HDFS操作

    实验目的 1、理解HDFS在Hadoop体系结构中的角色 2、熟悉使用HDFS操作常用的Shell命令 3、熟悉HDFS操作常用的Java API 实验平台 1、操作系统:Windows 2、Hadoop版本:3.1.3 3、JDK版本:1.8 4、Java IDE:IDEA 前期:一定要先启动hadoop   1、编程实现以下功能,并利用Hadoop提供的Shell命令完成相

    2024年02月08日
    浏览(33)
  • 大数据技术原理与应用实验指南——HBase编程实践

    一、 实验目的 (1) 熟练使用HBase操作常用的Shell命令。 (2) 熟悉HBase操作常用的Java API。 二、 实验内容 (1) 安装HBase软件。 (2) 编程实现指定功能,并利用Hadoop提供的Shell命令完成相同的任务(实现增、删、改、查基本操作,统计表的行数,打印表的记录等操作)。

    2024年02月21日
    浏览(31)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包