Spark+Kafka构建实时分析Dashboard

这篇具有很好参考价值的文章主要介绍了Spark+Kafka构建实时分析Dashboard。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

说明

Spark+Kafka构建实时分析Dashboard【林子雨】
官方实验步骤:https://dblab.xmu.edu.cn/post/spark-kafka-dashboard/
前几天刚做完这个实验,学了不少知识,也遇到了不少问题,在这里记录一下自己的实验过程,与小伙伴们一起探讨下。

一、案例介绍

案例概述(详情见官网)

二、实验环境准备

1、实验系统和软件要求

Ubuntu: 16.04
Spark: 3.2.0  (注意Spark版本,官网教程安装的Spark 2.1.0版本亲测不行,后面实验莫名报错)
kafka: 2.8.0
Python: 3.7.13
Flask: 1.1.2
Flask-SocketIO: 5.1.0 (这个版本也不行的,不过后面有解决方法)
kafka-python: 2.0.2

2、系统和软件的安装

(1)安装Spark

详细步骤见官网教程:Spark的安装和使用
安装Hadoop
①、创建hadoop用户

# a、创建 hadoop 用户,并使用 /bin/bash 作为 shell
sudo useradd -m hadoop -s /bin/bash
# b、设置用户密码
sudo passwd hadoop
# c、为 hadoop 用户增加管理员权限
sudo adduser hadoop sudo

spark实时数据分析,spark,kafka,大数据
②、更新apt

sudo apt-get update

spark实时数据分析,spark,kafka,大数据
③、安装vi/vim

sudo apt-get install vim

spark实时数据分析,spark,kafka,大数据
④、安装Java环境

cd /usr/lib
sudo mkdir jvm #创建/usr/lib/jvm目录用来存放JDK文件
cd ~ #进入hadoop用户的主目录
cd Downloads  #注意区分大小写字母,刚才已经通过FTP软件把JDK安装包jdk-8u162-linux-x64.tar.gz上传到该目录下
sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm  #把JDK文件解压到/usr/lib/jvm目录下

spark实时数据分析,spark,kafka,大数据
设置环境变量

cd ~
vim ~/.bashrc

spark实时数据分析,spark,kafka,大数据
spark实时数据分析,spark,kafka,大数据
查看是否安装成功

source ~/.bashrc  # 让配置文件生效
java -version

spark实时数据分析,spark,kafka,大数据
这里官网还有其他两种方法安装JDK,其中第二种方法我试过了,不太行。
⑤、安装 Hadoop
这里我没使用 md5 等检测工具校验文件完整性,因为我比较懒。

# 解压到/usr/local中
sudo tar -zxf ~/下载/hadoop-2.6.0.tar.gz -C /usr/local    
cd /usr/local/
# 将文件夹名改为hadoop
sudo mv ./hadoop-2.6.0/ ./hadoop
# 修改文件权限          
sudo chown -R hadoop ./hadoop      

spark实时数据分析,spark,kafka,大数据
输入如下命令来检查 Hadoop 是否可用,成功则会显示 Hadoop 版本信息:

cd /usr/local/hadoop
./bin/hadoop version

spark实时数据分析,spark,kafka,大数据
Hadoop伪分布式配置
修改2个配置文件 core-site.xml 和 hdfs-site.xml。

vim ./etc/hadoop/core-site.xml

spark实时数据分析,spark,kafka,大数据

vim ./etc/hadoop/hdfs-site.xml

spark实时数据分析,spark,kafka,大数据
配置完成后,执行 NameNode 的格式化:

cd /usr/local/hadoop
./bin/hdfs namenode -format

spark实时数据分析,spark,kafka,大数据
开启 NameNode 和 DataNode 守护进程

cd /usr/local/hadoop
./sbin/start-dfs.sh  

spark实时数据分析,spark,kafka,大数据
通过命令 jps 来判断是否成功启动
spark实时数据分析,spark,kafka,大数据
安装 Spark
这里建议安装Spark3.2.0以上版本,我之前安装Spark2.4.0,后面执行程序时还是报错。

sudo tar -zxf ~/下载/spark-3.2.0-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-3.2.0-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark          

spark实时数据分析,spark,kafka,大数据
修改Spark的配置文件spark-env.sh。

cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
vim ./conf/spark-env.sh

spark实时数据分析,spark,kafka,大数据
spark实时数据分析,spark,kafka,大数据
通过运行Spark自带的示例,验证Spark是否安装成功。

cd /usr/local/spark
bin/run-example SparkPi 2>&1 | grep "Pi is"

spark实时数据分析,spark,kafka,大数据

(2)安装Kafka

详细步骤见官网教程:Kafka的安装和简单实例测试

cd ~/下载
sudo tar -zxf kafka_2.12-2.8.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.8.0/ ./kafka
sudo chown -R hadoop ./kafka

spark实时数据分析,spark,kafka,大数据
测试简单实例

# 进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

spark实时数据分析,spark,kafka,大数据
启动新的终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

spark实时数据分析,spark,kafka,大数据
启动另外一个终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
bin/kafka-topics.sh --list --zookeeper localhost:2181  

spark实时数据分析,spark,kafka,大数据
接下来用producer生产点数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

spark实时数据分析,spark,kafka,大数据
使用consumer来接收数据,输入如下命令:

cd /usr/local/kafka
# 官网那个命令失效了,用下面这个命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning  

spark实时数据分析,spark,kafka,大数据

(3)安装Python

详细步骤见官网教程:Anaconda的下载和使用方法
安装Anaconda方便切换python版本,而且也方便后面安装依赖。如果自己的python版本匹配的话,不安装其实也没啥问题。
spark实时数据分析,spark,kafka,大数据
安装过程有是否运行conda初始化,这里一定要输入yes,然后回车。教程是这样说的,但是当时实验的时候,我没留意它的安装进度,啪的一下,它就给我自动选择了no,然后刷新环境变量,(base)也不显示。如果你也遇到这个问题,莫慌,用下面命令初始化即可。

cd ~/anaconda3/bin/
/home/hadoop/anaconda3/bin/conda init

spark实时数据分析,spark,kafka,大数据
使用如下命令刷新环境变量

source ~/.bashrc

之后会发现命令行前面出现(base)的字样,就代表已经安装成功,并且环境变量和默认python都已经装配好。

我安装的python版本是3.7.13,用conda指令安装和切换即可:
spark实时数据分析,spark,kafka,大数据
切换版本:
spark实时数据分析,spark,kafka,大数据

(4)安装Python依赖库

这里一定要先切换到你要使用的python版本。

conda install flask
conda install flask-socketio
conda install kafka-python # 这个命令已经失效了

运行上面第三条指令时候寄了,提示我到下面这个网站下载。
所以咱可以用新的命令下载kafka-python依赖库。

conda install -c conda-forge kafka-python

spark实时数据分析,spark,kafka,大数据

(5)安装PyCharm

cd ~  #进入当前hadoop用户的主目录
sudo tar -zxvf ~/下载/pycharm-community-2016.3.5.tar.gz -C /usr/local  #把pycharm解压缩到/usr/local目录下
cd /usr/local
sudo mv pycharm-community-2016.3.5 pycharm  #重命名
sudo chown -R hadoop ./pycharm  #把pycharm目录权限赋予给当前登录Ubuntu系统的hadoop用户

spark实时数据分析,spark,kafka,大数据
执行如下命令启动PyCharm

cd /usr/local/pycharm
./bin/pycharm.sh  #启动PyCharm

spark实时数据分析,spark,kafka,大数据

三、数据处理和Python操作Kafka

详情见官网教程:数据处理和Python操作Kafka
写如下Python代码,文件名为producer.py:
spark实时数据分析,spark,kafka,大数据
写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py:
spark实时数据分析,spark,kafka,大数据
在开启KafkaProducer和KafkaConsumer之前,需要先开启Kafka。
点击Run 'producer’以及Run ‘consumer’,来运行生产者和消费者。如果生产者和消费者运行成功,则在consumer窗口会输出如下信息:
spark实时数据分析,spark,kafka,大数据

四、Structured Streaming实时处理数据

1、配置Spark开发Kafka环境

spark实时数据分析,spark,kafka,大数据
在/usr/local/spark/jars目录下新建kafka目录,把/usr/local/kafka/libs下所有函数库复制到/usr/local/spark/jars/kafka目录下,命令如下:

cd /usr/local/spark/jars
mkdir kafka
cd kafka
cp /usr/local/kafka/libs/* .

spark实时数据分析,spark,kafka,大数据
修改 Spark 配置文件,命令如下:
spark实时数据分析,spark,kafka,大数据
spark实时数据分析,spark,kafka,大数据

2、建立pySpark项目

在kafka这个目录下创建一个kafka_test.py文件
spark实时数据分析,spark,kafka,大数据

3、运行项目

编写好程序之后,接下来编写运行脚本,在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:
spark实时数据分析,spark,kafka,大数据
运行如下命令即可执行刚编写好的Structured Streaming程序:

sh startup.sh

spark实时数据分析,spark,kafka,大数据

4、测试程序

下面开启之前编写的KafkaProducer投递消息,然后将KafkaConsumer中接收的topic改为result,验证是否能接收topic为result的消息,更改之后的KafkaConsumer为:
spark实时数据分析,spark,kafka,大数据
在同时开启Structured Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出:
spark实时数据分析,spark,kafka,大数据

五、结果展示

1、Flask-SocketIO实时推送数据

创建如图中的app.py文件,app.py的功能就是作为一个简易的服务器,处理连接请求,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:
spark实时数据分析,spark,kafka,大数据

2、浏览器获取数据并展示

index.html文件负责获取数据并展示效果,该文件中的代码内容如下:
spark实时数据分析,spark,kafka,大数据

3、效果展示

经过以上步骤,一切准备就绪,我们就可以启动程序来看看最后的效果。启动步骤如下:
①、确保kafka开启。
②、开启producer.py模拟数据流。
③、启动Structured Streaming实时处理数据。
④、启动app.py。
启动后的效果如下图:
spark实时数据分析,spark,kafka,大数据
用浏览器访问上图中给出的网址 http://127.0.0.1:5000/ ,就可以看到最终效果图了。
spark实时数据分析,spark,kafka,大数据
spark实时数据分析,spark,kafka,大数据
spark实时数据分析,spark,kafka,大数据

4、相关问题的解决方法

①、运行app.py报错:WebSocket transport not available. Install simple-websocket for improved performance.
解决方法:在使用的python环境下执行以下命令

pip install simple-websocket

②、运行app.py报错:
spark实时数据分析,spark,kafka,大数据
解决方法:flask-socketio版本问题,将版本改为4.3.1版本。
a、删除原先版本

conda remove flask-socketio

spark实时数据分析,spark,kafka,大数据
b、重新下载新版本

conda install flask-socketio==4.3.1

spark实时数据分析,spark,kafka,大数据
③、运行app.py报错:ImportError:cannot import name 'run_with_reloader' from 'werkzeug.serving'
spark实时数据分析,spark,kafka,大数据
解决方法:在使用的python环境下执行以下命令。文章来源地址https://www.toymoban.com/news/detail-543427.html

pip3 install --upgrade flask==1.1.4
pip3 install --upgrade Werkzeug==1.0.1
pip3 install --upgrade itsdangerous==1.1.0
pip3 install --upgrade Jinja2==2.11.2
pip3 install --upgrade MarkupSafe==2.0.1

到了这里,关于Spark+Kafka构建实时分析Dashboard的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • vivo 基于 StarRocks 构建实时大数据分析平台,为业务搭建数据桥梁

    在大数据时代,数据分析和处理能力对于企业的决策和发展至关重要。 vivo 作为一家全球移动互联网智能终端公司,需要基于移动终端的制造、物流、销售等各个方面的数据进行分析以满足业务决策。 而随着公司数字化服务的演进,业务诉求和技术架构有了新的调整,已有的

    2024年02月22日
    浏览(51)
  • ClickHouse 与 Kafka 整合: 实时数据流处理与分析解决方案

    随着数据量的不断增长,实时数据处理和分析变得越来越重要。ClickHouse 和 Kafka 都是在现代数据技术中发挥着重要作用的工具。ClickHouse 是一个高性能的列式数据库,专为 OLAP 和实时数据分析而设计。Kafka 是一个分布式流处理平台,用于构建实时数据流管道和流处理应用程序

    2024年02月22日
    浏览(49)
  • 大数据毕设-基于hadoop+spark+大数据+机器学习+大屏的电商商品数据分析可视化系统设计实现 电商平台数据可视化实时监控系统 评论数据情感分析

    🔥作者:雨晨源码🔥 💖简介:java、微信小程序、安卓;定制开发,远程调试 代码讲解,文档指导,ppt制作💖 精彩专栏推荐订阅:在下方专栏👇🏻👇🏻👇🏻👇🏻 Java精彩实战毕设项目案例 小程序精彩项目案例 Python实战项目案例 ​💕💕 文末获取源码 本次文章主要是

    2024年02月03日
    浏览(110)
  • 大数据流处理与实时分析:Spark Streaming和Flink Stream SQL的对比与选择

    作者:禅与计算机程序设计艺术

    2024年02月07日
    浏览(41)
  • Flink CEP完全指南:捕获数据的灵魂,构建智慧监控与实时分析大师级工具

    Flink CEP(Complex Event Processing)是 Apache Flink 的一个库,用于实现复杂的事件流处理和模式匹配。它可以用来识别事件流中的复杂模式和序列,这对于需要在实时数据流中进行模式识别的应用场景非常有用,比如监控、异常检测、业务流程管理等。 在Flink CEP中,你可以定义复杂

    2024年02月03日
    浏览(57)
  • 推荐系统架构设计实践:Spark Streaming+Kafka构建实时推荐系统架构

    作者:禅与计算机程序设计艺术 推荐系统(Recommendation System)一直都是互联网领域一个非常火热的话题。其主要目标是在用户多样化的信息环境中,通过分析用户的偏好、消费习惯等数据,提供个性化的信息推送、商品推荐、购物指导等服务。如何设计一个推荐系统的架构及

    2024年02月08日
    浏览(48)
  • 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ 新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示

    2024年02月06日
    浏览(51)
  • 天猫数据分析工具(天猫实时数据)

    后疫情时代,聚会、聚餐与送礼热度上涨,酒类产品既作为送礼首选又作为佐餐饮品的热门选手也受此影响迎来消费小高峰。在此背景下,白酒市场也开始复苏并不断加快速度。 根据鲸参谋电商数据分析平台的相关数据显示,2023年1月份至4月份,天猫平台上白酒的销量超过

    2024年02月13日
    浏览(46)
  • 【spark大数据】spark大数据处理技术入门项目--购物信息分析

    购物信息分析基于spark 目录 本案例中三个文案例中需要处理的文件为 order_goods.txt、products.txt 以及 orders.txt 三个文件,三个文件的说明如下 一、本实训项目针对实验数据主要完成了哪些处理? 二、Hadoop+Spark集群环境的搭建步骤有哪些?(只介绍完全分布式集群环境的搭建)

    2023年04月08日
    浏览(64)
  • 基于Spark的气象数据分析

    研究背景与方案 1.1.研究背景 在大数据时代背景下,各行业数据的规模大幅度增加,数据类别日益复杂,给数据分析工作带来极大挑战。 气象行业和人们 的生活息息相关,随着信息时代的发展,大数据技术的出现为气象数据的发展带来机遇。基于此,本项目使用 Spark 等大

    2024年02月09日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包