Spark编程实验五:Spark Structured Streaming编程

这篇具有很好参考价值的文章主要介绍了Spark编程实验五:Spark Structured Streaming编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Syslog介绍

2、通过Socket传送Syslog到Spark

3、Syslog日志拆分为DateFrame

4、对Syslog进行查询

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Structured Streaming的基本编程方法;
2、掌握日志分析的常规操作,包括拆分日志方法和分析场景。

二、实验内容

1、通过Socket传送Syslog到Spark

        日志分析是一个大数据分析中较为常见的场景。在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内,也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。

        日志一般会通过Kafka等有容错保障的源发送,本实验为了简化,直接将Syslog通过Socket源发送。新建一个终端,执行如下命令:

$ tail -n+1 -f /var/log/syslog | nc -lk 9988

        “tail -n+1 -f /var/log/syslog”表示从第一行开始打印文件syslog的内容。“-f”表示如果文件有增加则持续输出最新的内容。然后,通过管道把文件内容发送到nc程序(nc程序可以进一步把数据发送给Spark)。

        如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端(计作“手动发送日志终端”),手动在终端输入如下内容来增加日志信息到/var/log/syslog内:

$ logger ‘I am a test error log message.’

2、对Syslog进行查询

由Spark接收nc程序发送过来的日志信息,然后完成以下任务:

(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
(2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
(3)输出所有日志内容带error的日志。

三、实验步骤

1、Syslog介绍

        分析日志是一个大数据分析中较为常见的场景。在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内,也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。

2、通过Socket传送Syslog到Spark

        日志一般会通过kafka等有容错保障的源发送,本实验为了简化,直接将syslog通过Socket源发送。新开一个终端,命令为“tail终端”,输入

tail -n+1 -f /var/log/syslog | nc -lk 9988

        tail命令加-n+1代表从第一行开始打印文件内容。-f代表如果文件有增加则持续输出最新的内容。通过管道发送到nc命令起的在本地9988上的服务上。
        如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端,命名为“手动发送log终端”,手动在终端输入

logger ‘I am a test error log message.’

来增加日志信息到/var/log/syslog内。

3、Syslog日志拆分为DateFrame

        Syslog每行的数据类似以下:

Nov 24 13:17:01 spark CRON[18455]: (root) CMD (cd / && run-parts --report /etc/cron.hourly)

        最前面为时间,接着是主机名,进程名,可选的进程ID,冒号后是日志内容。在Spark内,可以使用正则表达式对syslog进行拆分成结构化字段,以下是示例代码:

 # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
    fields = partial(
        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
    )

    words = lines.select(
        to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
        fields(idx=2).alias("hostname"),
        fields(idx=3).alias("tag"),
        fields(idx=4).alias("content"),
    )

        to_timestamp(format_string('2018 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),这句是对Syslog格式的一个修正,因为系统默认的Syslog日期是没有年的字段,所以使用format_string函数强制把拆分出来的第一个字段前面加上2019年,再根据to_timestamp格式转换成timestamp字段。在接下来的查询应当以这个timestamp作为事件时间。

4、对Syslog进行查询

由Spark接收nc程序发送过来的日志信息,然后完成以下任务。

(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。

        在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3

from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredSyslog") \
        .getOrCreate()

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9988) \
        .load()

    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)
    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
    fields = partial(
        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
    )

    words = lines.select(
        to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
        fields(idx=2).alias("hostname"),
        fields(idx=3).alias("tag"),
        fields(idx=4).alias("content"),
    )

    # (1).  统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
    windowedCounts1 = words \
        .filter("tag = 'CRON'") \
        .withWatermark("timestamp", "1 minutes") \
        .groupBy(window('timestamp', "1 hour")) \
        .count() \
        .sort(asc('window'))

    # 开始运行查询并在控制台输出
    query = windowedCounts1 \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false')\
        .trigger(processingTime="3 seconds") \
        .start()

    query.awaitTermination()

(2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。

        在新开的终端内输入 vi spark_exercise_testsyslog2.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3

from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredSyslog") \
        .getOrCreate()

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9988) \
        .load()

    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)
    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
    fields = partial(
        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
    )

    words = lines.select(
        to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
        fields(idx=2).alias("hostname"),
        fields(idx=3).alias("tag"),
        fields(idx=4).alias("content"),
    )

    # (2).  统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
    windowedCounts2 = words \
        .withWatermark("timestamp", "1 minutes") \
        .groupBy('tag', window('timestamp', "1 hour")) \
        .count() \
        .sort(asc('window'))

    # 开始运行查询并在控制台输出
    query = windowedCounts2 \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false')\
        .trigger(processingTime="3 seconds") \
        .start()

    query.awaitTermination()

(3)输出所有日志内容带error的日志。

        在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3

from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredSyslog") \
        .getOrCreate()

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9988) \
        .load()

    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)
    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
    fields = partial(
        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
    )

    words = lines.select(
        to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
        fields(idx=2).alias("hostname"),
        fields(idx=3).alias("tag"),
        fields(idx=4).alias("content"),
    )

    # (3).  输出所有日志内容带error的日志。
    windowedCounts3 = words \
        .filter("content like '%error%'")

    # 开始运行查询并在控制台输出
    query = windowedCounts3 \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option('truncate', 'false')\
        .trigger(processingTime="3 seconds") \
        .start()

    query.awaitTermination()

四、结果分析与实验体会

        Spark Structured Streaming 是 Spark 提供的用于实时流处理的 API,它提供了一种统一的编程模型,使得批处理和流处理可以共享相同的代码逻辑,让开发者更容易地实现复杂的实时流处理任务。通过对 Structured Streaming 的实验,有以下体会:

  1. 简单易用: Structured Streaming 提供了高级抽象的 DataFrame 和 Dataset API,使得流处理变得类似于静态数据处理,降低了学习成本和编程复杂度。

  2. 容错性强大: Structured Streaming 内置了端到端的 Exactly-Once 语义,能够保证在发生故障时数据处理的准确性,给开发者提供了更可靠的数据处理保障。

  3. 灵活性和扩展性: Structured Streaming 支持丰富的数据源和数据接收器,可以方便地与其他数据存储和处理系统集成,同时也支持自定义数据源和输出操作,满足各种不同场景的需求。

  4. 优化性能: Structured Streaming 内置了优化器和调度器,能够根据任务的特性自动优化执行计划,提升处理性能,同时还可以通过调整配置参数和优化代码来进一步提高性能。

  5. 监控和调试: Structured Streaming 提供了丰富的监控指标和集成的调试工具,帮助开发者实时监控作业运行状态、诊断问题,并进行性能调优。

        通过实验和实践,更深入地理解 Structured Streaming 的特性和工作原理,掌握实时流处理的开发技巧和最佳实践,为构建稳健可靠的实时流处理应用打下坚实基础。

        Syslog 是一种常用的日志标准,它定义了一个网络协议,用于在计算机系统和网络设备之间传递事件消息和警报。通过对 Syslog 的实验,有以下体会:

  1. 灵活性: Syslog 可以用于收集各种类型的事件和日志信息,包括系统日志、安全事件、应用程序消息等等,具有很高的灵活性和可扩展性。

  2. 可靠性: Syslog 提供了可靠的传输和存储机制,确保事件和日志信息不会丢失或损坏,在故障恢复和安全审计方面非常重要。

  3. 标准化: Syslog 是一种通用的日志标准,已经被广泛采用和支持,可以与各种操作系统、应用程序、设备和服务集成,提供了统一的数据格式和接口。

  4. 安全性: Syslog 支持基于 TLS 和 SSL 的加密和身份认证机制,确保传输的信息不会被窃听或篡改,保证了日志传输的安全性。

  5. 可视化: 通过将 Syslog 收集到集中式的日志管理系统中,可以方便地进行搜索、分析和可视化,使日志信息变得更加易于理解和利用。

        通过实验和实践,更深入地了解 Syslog 的工作原理和应用场景,学会如何配置和使用 Syslog,掌握日志收集、存储、分析和可视化的技巧和最佳实践,为构建高效、可靠、安全的日志管理系统打下坚实基础。文章来源地址https://www.toymoban.com/news/detail-829263.html

到了这里,关于Spark编程实验五:Spark Structured Streaming编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark Structured Streaming使用教程

    Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。 Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表

    2024年02月03日
    浏览(45)
  • 实验四 Spark Streaming编程初级实践

    数据流  :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。 1.下载安装包 https://www.apache.org/dyn/closer.lua/flume/

    2024年04月26日
    浏览(35)
  • Structured Streaming: Apache Spark的流处理引擎

    欢迎来到我们的技术博客!今天,我们要探讨的主题是Apache Spark的一个核心组件——Structured Streaming。作为一个可扩展且容错的流处理引擎,Structured Streaming使得处理实时数据流变得更加高效和简便。 Structured Streaming是基于Apache Spark SQL引擎构建的高级流处理框架。它允许用户

    2024年01月25日
    浏览(42)
  • 实验三 Spark SQL基础编程

    1. 掌握 Spark SQL 的基本编程方法; 2. 熟悉 RDD 到 DataFrame 的转化方法; 3. 熟悉利用 Spark SQL 管理来自不同数据源的数据。 1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。 { \\\"id\\\":1, \\\"name\\\":\\\"Ella\\\", \\\"age\\\":36 } { \\\"id\\\":2, \\\"name\\\":\\\"Bob\\\", \\\"age\\\":29 } { \\\"id\\\":3, \\\"na

    2024年02月06日
    浏览(29)
  • Spark Streaming 编程权威使用指南

    注意:本文档为Spark的旧版本Streaming引擎。Spark Streaming 不再更新,是一个遗留项目。在Spark中有一种新的、更易用的流处理引擎,称为结构化流式处理。您应该使用Spark结构化流处理来开发流式应用和流水线。请参阅结构化流式处理编程指南。 Spark Streaming 是 Spark 核心 API 的扩

    2024年02月04日
    浏览(42)
  • 云计算技术 实验九 Spark的安装和基础编程

    1 . 实验学时 4学时 2 . 实验目的 熟悉Spark Shell。 编写Spark的独立的应用程序。 3 . 实验内容 (一)完成Spark的安装,熟悉Spark Shell。 首先安装spark: 将下好的压缩文件传入linux,然后进行压解: 之后移动文件,修改文件权限: 然后是配置相关的文件: Vim进入进行修改: 然后是

    2024年02月05日
    浏览(38)
  • 【Spark编程基础】实验三RDD 编程初级实践(附源代码)

    1、熟悉 Spark 的 RDD 基本操作及键值对操作; 2、熟悉使用 RDD 编程解决实际具体问题的方法 1、Scala 版本为 2.11.8。 2、操作系统:linux(推荐使用Ubuntu16.04)。 3、Jdk版本:1.7或以上版本。 请到本教程官网的“下载专区”的“数据集”中下载 chapter5-data1.txt,该数据集包含了某大

    2024年03月25日
    浏览(49)
  • 大数据——Spark Streaming

    Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。 之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个

    2024年02月07日
    浏览(37)
  • Spark Streaming实时数据处理

    作者:禅与计算机程序设计艺术 Apache Spark™Streaming是一个构建在Apache Spark™之上的快速、微批次、容错的流式数据处理系统,它可以对实时数据进行高吞吐量、低延迟地处理。Spark Streaming既可用于流计算场景也可用于离线批处理场景,而且可以将结构化或无结构化数据源(如

    2024年02月06日
    浏览(40)
  • 大数据技术原理及应用课实验7 :Spark初级编程实践

    实验7  Spark初级编程实践 一、实验目的 1. 掌握使用Spark访问本地文件和HDFS文件的方法 2. 掌握Spark应用程序的编写、编译和运行方法 二、实验平台 1. 操作系统:Ubuntu18.04(或Ubuntu16.04); 2. Spark版本:2.4.0; 3. Hadoop版本:3.1.3。 三、实验步骤(每个步骤下均需有运行截图) 实

    2024年01月22日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包