大数据编程实验四:Spark Streaming

这篇具有很好参考价值的文章主要介绍了大数据编程实验四:Spark Streaming。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、目的与要求

1、通过实验掌握Spark Streaming的基本编程方法;

2、熟悉利用Spark Streaming处理来自不同数据源的数据。

3、熟悉DStream的各种转换操作。

4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。

二、实验内容

1.参照教材示例,利用Spark Streaming对三种类型的基本数据源的数据进行处理。

2.参照教材示例,完成kafka集群的配置,利用Spark Streaming对Kafka高级数据源的数据进行处理,注意topic为你的姓名全拼

3.参照教材示例,完成DStream的两种有状态转换操作。

4.参照教材示例,完成把DStream的数据输出保存到文本文件或MySQL数据库中。

三、实验步骤(实验过程)

1. 文件流:

>>> from pyspark import SparkContext

>>> from pyspark.streaming import StreamingContext

>>> ssc=StreamingContext(sc,10)

>>> lines=ssc.\

... textFileStream('file:///usr/local/spark/mycode/streaming/logfile')

>>> words=lines.flatMap(lambda line:line.split())

>>> wordCounts=words.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)

>>> wordCounts.pprint()

>>> ssc.start()

套接字流:

from __future__ import print_function

import sys

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    if len(sys.argv) != 3:

        print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)

        exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordCount")

    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    counts = lines.flatMap(lambda line: line.split(" "))\

                  .map(lambda word: (word, 1))\

                  .reduceByKey(lambda a, b: a+b)

    counts.pprint()

    ssc.start()

    ssc.awaitTermination()

RDD队列流:

import time

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingQueueStream")

    ssc = StreamingContext(sc, 2)

    #创建一个队列,通过该队列可以把RDD推给一个RDD队列流

    rddQueue = []

    for i in range(5):

        rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]

        time.sleep(1)

    #创建一个RDD队列流

    inputStream = ssc.queueStream(rddQueue)

    mappedStream = inputStream.map(lambda x: (x % 10, 1))

    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)

    reducedStream.pprint()

    ssc.start()

    ssc.stop(stopSparkContext=True, stopGraceFully=True)

2. from __future__ import print_function

import sys

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

if __name__=="__main__":

       if len(sys.argv)!= 3:

              print("Usage:KafkaWordCount.py <zk> <topic>",file=sys.stderr)

              exit(-1)

       sc=SparkContext(appName="PythonStreamingKafkaWordCount")

       ssc=StreamingContext(sc,1)

       zkQuorum,topic=sys.argv[1:]

       kvs=KafkaUtils.createStream(ssc,zkQuorum,"spark-streaming-consumer",{topic:1})

       lines=kvs.map(lambda x:x[1])

       counts=lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)

       counts.pprint()

       ssc.start()

       ssc.awaitTermination()

3.无状态转换:第一问套接字流词频统计程序NetworkWordCount程序

 有状态转换:#!/usr/bin/env python3

from __future__ import print_function

import sys

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

if __name__=="__main__":

        if len(sys.argv)!=3:

                print("Usage:WindowedNetworkWordCount.py <hostname><port>",file=sys.stderr)

                exit(-1)

        sc=SparkContext(appName="PythonStreamingWindowedNetworkWordCount")

        ssc=StreamingContext(sc,10)

        ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")

        lines=ssc.socketTextStream(sys.argv[1],int(sys.argv[2]))

        counts=lines.flatMap(lambda line:line.split(" "))\

                .map(lambda word:(word,1))\

                .reduceByKeyAndWindow(lambda x,y:x+y,lambda x,y:x-y,30,10)

        counts.pprint()

        ssc.start()

        ssc.awaitTermination()

4. from __future__ import print_function

import sys

import pymysql

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    if len(sys.argv) != 3:

        print("Usage: NetworkWordCountStateful <hostname> <port>", file=sys.stderr)

        exit(-1)

    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")

    ssc = StreamingContext(sc, 1)

    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")

    # RDD with initial state (key, value) pairs

    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):

        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    running_counts = lines.flatMap(lambda line: line.split(" "))\

                          .map(lambda word: (word, 1))\

                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

    running_counts.pprint()

    def dbfunc(records):

        db = pymysql.connect("localhost","root","123456","spark")

        cursor = db.cursor()

        def doinsert(p):

            sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))

            try:

                cursor.execute(sql)

                db.commit()

            except:

                db.rollback()

        for item in records:

            doinsert(item)

    def func(rdd):

        repartitionedRDD = rdd.repartition(3)

        repartitionedRDD.foreachPartition(dbfunc)

    running_counts.foreachRDD(func)

    ssc.start()

    ssc.awaitTermination()

四、实验结果

1.文件流:在pyspark交互式环境中输入:

大数据编程实验四:Spark Streaming,大数据

/usr/local/spark/mycode/streaming/logfile目录下创建一个log1.txt文件,输入:I love spark  I love hadoop,则流计算终端显示:

大数据编程实验四:Spark Streaming,大数据

套接字流:创建NetworkWordCount.py

大数据编程实验四:Spark Streaming,大数据

在数据流终端启动服务器,端口号设置为9999,在里面输入一些单词,则在流计算终端显示:

大数据编程实验四:Spark Streaming,大数据

RDD队列流:

大数据编程实验四:Spark Streaming,大数据

大数据编程实验四:Spark Streaming,大数据

2.kafka配置:

启动zookeeper

大数据编程实验四:Spark Streaming,大数据

启动Kafka

大数据编程实验四:Spark Streaming,大数据

创建自己名字拼音的主题:

大数据编程实验四:Spark Streaming,大数据

通过spark streaming程序使用kafka数据源:

大数据编程实验四:Spark Streaming,大数据

大数据编程实验四:Spark Streaming,大数据

大数据编程实验四:Spark Streaming,大数据

3.无状态转换见套接字流NetworkWordCount词频统计程序

有状态转换:

大数据编程实验四:Spark Streaming,大数据大数据编程实验四:Spark Streaming,大数据

大数据编程实验四:Spark Streaming,大数据

4. 在终端安装python连接MySQL的模块:

大数据编程实验四:Spark Streaming,大数据

大数据编程实验四:Spark Streaming,大数据

大数据编程实验四:Spark Streaming,大数据

查看数据库:

大数据编程实验四:Spark Streaming,大数据文章来源地址https://www.toymoban.com/news/detail-774135.html

到了这里,关于大数据编程实验四:Spark Streaming的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spark编程基础】第7章 Structured Streaming

    7.1.1 基本概念 Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表 可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发

    2024年02月08日
    浏览(57)
  • 大数据——Spark Streaming

    Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。 之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个

    2024年02月07日
    浏览(45)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(54)
  • 大数据技术原理及应用课实验7 :Spark初级编程实践

    实验7  Spark初级编程实践 一、实验目的 1. 掌握使用Spark访问本地文件和HDFS文件的方法 2. 掌握Spark应用程序的编写、编译和运行方法 二、实验平台 1. 操作系统:Ubuntu18.04(或Ubuntu16.04); 2. Spark版本:2.4.0; 3. Hadoop版本:3.1.3。 三、实验步骤(每个步骤下均需有运行截图) 实

    2024年01月22日
    浏览(52)
  • Spark Streaming实时数据处理

    作者:禅与计算机程序设计艺术 Apache Spark™Streaming是一个构建在Apache Spark™之上的快速、微批次、容错的流式数据处理系统,它可以对实时数据进行高吞吐量、低延迟地处理。Spark Streaming既可用于流计算场景也可用于离线批处理场景,而且可以将结构化或无结构化数据源(如

    2024年02月06日
    浏览(54)
  • Spark Streaming实时流式数据处理

    作者:禅与计算机程序设计艺术 Apache Spark Streaming 是 Apache Spark 提供的一个用于高吞吐量、容错的流式数据处理引擎。它可以实时的接收数据并在系统内部以微批次的方式进行处理,并将结果输出到文件、数据库或实时消息系统中。Spark Streaming 支持 Java、Scala 和 Python 编程语言

    2024年02月08日
    浏览(50)
  • Spark Streaming + Kafka构建实时数据流

    1. 使用Apache Kafka构建实时数据流 参考文档链接:https://cloud.tencent.com/developer/article/1814030 2. 数据见UserBehavior.csv 数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集 根据这一csv文档运用Kafka模拟实时数据流,

    2024年02月12日
    浏览(44)
  • 实时大数据流处理技术:Spark Streaming与Flink的深度对比

    引言 在当前的大数据时代,企业和组织越来越多地依赖于实时数据流处理技术来洞察和响应业务事件。实时数据流处理不仅能够加快数据分析的速度,还能提高决策的效率和准确性。Apache Spark Streaming和Apache Flink是目前两个主要的实时数据流处理框架,它们各自拥有独特的特

    2024年03月10日
    浏览(62)
  • Spark编程实验三:Spark SQL编程

    目录 一、目的与要求 二、实验内容 三、实验步骤 1、Spark SQL基本操作 2、编程实现将RDD转换为DataFrame 3、编程实现利用DataFrame读写MySQL的数据 四、结果分析与实验体会 1、通过实验掌握Spark SQL的基本编程方法; 2、熟悉RDD到DataFrame的转化方法; 3、熟悉利用Spark SQL管理来自不同

    2024年02月03日
    浏览(39)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包