基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统

这篇具有很好参考价值的文章主要介绍了基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

完整项目地址:https://download.csdn.net/download/lijunhcn/88463174

基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统

简介

LogVision是一个整合了web日志聚合、分发、实时分析、入侵检测、数据存储与可视化的日志分析解决方案。聚合采用Apache Flume,分发采用Apache Kafka,实时处理采用Spark Streaming,入侵检测采用Spark MLlib,数据存储使用HDFS与Redis,可视化采用Flask、SocketIO、Echarts、Bootstrap。

本文下述的使用方法均面向单机伪分布式环境,你可以根据需求进行配置上的调整以适应分布式部署。

项目结构

  • flask:Flask Web后端
  • spark:日志分析与入侵检测的实现
  • flume:Flume配置文件
  • log_gen:模拟日志生成器
  • datasets:测试日志数据集
  • images:README的图片

依赖与版本

  • 编译与Web端需要用到的:
    • Java 8, Scala 2.11.12, Python 3.8 (包依赖见requirements), sbt 1.3.8
  • 计算环境中需要用到的:
    • Java 8, Apache Flume 1.9.0, Kafka 2.4, Spark 2.4.5, ZooKeeper 3.5.7, Hadoop 2.9.2, Redis 5.0.8

使用说明

在开始之前,你需要修改源码或配置文件中的IP为你自己的地址。具体涉及到flume配置文件、Spark主程序、Flask Web后端。

编译Spark应用

在安装好Java8与Scala11的前提下,在spark目录下,初始化sbt

sbt

退出sbt shell并使用sbt-assembly对Spark项目进行编译打包:

sbt assembly

然后将生成的jar包重命名为logvision.jar

环境准备

你需要一个伪分布式环境(测试环境为CentOS 7),并完成了所有对应版本组件依赖的配置与运行。
使用flume目录下的standalone.conf启动一个Flume Agent。
datasets文件夹中的learning-datasets提交如下路径:

/home/logv/learning-datasets

datasets文件夹中的access_log提交如下路径:

/home/logv/access_log
入侵检测模型训练与测试

提交jar包至Spark集群并执行入侵检测模型的生成与测试:

spark-submit --class learning logvision.jar

你将可以看到如下结果:

两个表格分别代表正常与异常数据集的入侵检测结果,下面四个表格可用于判断识别准确率。如图中所示250条正常测试数据被检测为250条正常,识别率100%;250条异常测试数据被检测为240条异常,10条正常,准确率96%。

启动可视化后端

flask目录下执行如下命令,下载依赖包:

pip3 install -r requirements.txt

启动Flask Web:

python3 app.py
启动实时日志生成器

log_gen中的实时日志生成器可根据传入参数(每次写入行数、写入间隔时间)将样本日志中的特定行块追加至目标日志中,以模拟实时日志的生成过程,供后续实时处理。

java log_gen [日志源] [目标文件] [每次追加的行数] [时间间隔(秒)]

提交至环境,编译并运行,每2秒将/home/logv/access_log文件中的5行追加至/home/logSrc中:

javac log_gen.java
java log_gen /home/logv/access_log /home/logSrc 5 2
启动分析任务

提交jar包至Spark集群并执行实时分析任务:

spark-submit --class streaming logvision.jar
查看可视化结果

至此你已经完成了后端组件的配置,通过浏览器访问Web端主机的5000端口可以查看到实时日志分析的可视化结果:
欢迎界面:

部分源码:文章来源地址https://www.toymoban.com/news/detail-795624.html

# coding=utf-8
import ast
import time
from kafka import KafkaConsumer
import redis
import requests

from threading import Lock, Thread
from flask import Flask, render_template, session, request
from flask_socketio import SocketIO, emit

async_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)

thread = None
thread_lock = Lock()

# 配置项目
time_interval = 1
kafka_bootstrap_servers = "10.0.0.222:9092"
redis_con_pool = redis.ConnectionPool(host='10.0.0.222', port=6379, decode_responses=True)


# 页面路由与对应页面的ws接口
# 系统时间
@socketio.on('connect', namespace='/sys_time')
def sys_time():
    def loop():
        while True:
            socketio.sleep(time_interval)
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            socketio.emit('sys_time',
                          {'data': current_time},
                          namespace='/sys_time')

    socketio.start_background_task(target=loop)


# 欢迎页面
@app.route('/')
@app.route('/welcome')
def welcome():
    return render_template('index.html', async_mode=socketio.async_mode)


# 实时日志流
@socketio.on('connect', namespace='/log_stream')
def log_stream():
    def loop():
        socketio.sleep(time_interval)
        consumer = KafkaConsumer("raw_log", bootstrap_servers=kafka_bootstrap_servers)
        cache = ""
        for msg in consumer:
            cache += bytes.decode(msg.value) + "\n"
            if len(cache.split("\n")) == 25:
                socketio.emit('log_stream',
                              {'data': cache},
                              namespace='/log_stream')
                cache = ""

    socketio.start_background_task(target=loop)


# 实时日志分析页面
@app.route('/analysis')
def analysis():
    return render_template('analysis.html', async_mode=socketio.async_mode)


# 实时计数器
@socketio.on('connect', namespace='/count_board')
def count_board():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrange("statcode", 0, 40, withscores=True)

            # 总请求数(日志行数)
            host_count = redis_con.zscore("line", "count")

            # 成功请求数(状态码属于normal的个数)
            normal = ["200", "201", "202", "203", "204", "205", "206", "207"]
            success_count = 0
            for i in res:
                if i[0] in normal:
                    success_count += int(i[1])

            # 其他请求数(其他状态码个数)
            other_count = 0
            for i in res:
                other_count += int(i[1])
            other_count -= success_count

            # 访客数(不同的IP个数)
            visitor_count = redis_con.zcard("host")

            # 资源数(不同的url个数)
            url_count = redis_con.zcard("url")

            # 流量大小(bytes的和,MB)
            traffic_sum = int(redis_con.zscore("traffic", "sum"))

            # 日志大小(MB)
            log_size = int(redis_con.zscore("size", "sum"))

            socketio.emit('count_board',
                          {'host_count': host_count,
                           'success_count': success_count,
                           'other_count': other_count,
                           'visitor_count': visitor_count,
                           'url_count': url_count,
                           'traffic_sum': traffic_sum,
                           'log_size': log_size},
                          namespace='/count_board')

    socketio.start_background_task(target=loop)


# 实时热门位置
@socketio.on('connect', namespace='/hot_geo')
def hot_geo():
    def loop():
        while True:
            socketio.sleep(2)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            data = []

            for i in res:
                # 调用接口获取地理坐标
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = eval(req.text)

                # 仅显示境内定位
                if body['status'] == 0:
                    coor_x = body['content']['point']['x']
                    coor_y = body['content']['point']['y']

                    data.append({"name": i[0], "value": [coor_x, coor_y, i[1]]})

            socketio.emit('hot_geo',
                          {'data': data},
                          namespace='/hot_geo')

    socketio.start_background_task(target=loop)


# 实时热门资源排名
@socketio.on('connect', namespace='/hot_url')
def hot_url():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("url", 0, 9, withscores=True)
            data = []
            no = 1

            for i in res:
                data.append({"no": no, "url": i[0], "count": i[1]})
                no += 1

            socketio.emit('hot_url',
                          {'data': data},
                          namespace='/hot_url')

    socketio.start_background_task(target=loop)


# 实时热门IP排名
@socketio.on('connect', namespace='/hot_ip')
def hot_ip():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 13, withscores=True)
            data = []
            no = 1

            for i in res:
                # 调用接口获取地理坐标
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = eval(req.text)

                # 仅显示境内定位
                if body['status'] == 0:
                    address = body['content']['address']

                    data.append({"no": no, "ip": i[0], "address": address, "count": i[1]})
                    no += 1

            socketio.emit('hot_ip',
                          {'data': data},
                          namespace='/hot_ip')

    socketio.start_background_task(target=loop)


# 实时状态码比例
@socketio.on('connect', namespace='/status_code_pie')
def status_code_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("statcode", 0, 100, withscores=True)
            data = []
            legend = []

            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])

            socketio.emit('status_code_pie',
                          {'legend': legend, 'data': data},
                          namespace='/status_code_pie')

    socketio.start_background_task(target=loop)


# 实时请求方式比例
@socketio.on('connect', namespace='/req_method_pie')
def req_method_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("reqmt", 0, 100, withscores=True)
            data = []
            legend = []

            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])

            socketio.emit('req_method_pie',
                          {'legend': legend, 'data': data},
                          namespace='/req_method_pie')

    socketio.start_background_task(target=loop)


# 实时请求计数(按时间顺序)
@socketio.on('connect', namespace='/req_count_timeline')
def req_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = dict(redis_con.zrange("datetime", 0, 10000000, withscores=True))
            data = []
            date = []

            # 按时间排序
            for i in sorted(res):
                datetime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i) / 1000))
                data.append(res[i])
                date.append(datetime)

            socketio.emit('req_count_timeline',
                          {"data": data, "date": date},
                          namespace='/req_count_timeline')

    socketio.start_background_task(target=loop)


# IP请求数排序
@socketio.on('connect', namespace='/ip_ranking')
def timestamp_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            ip = []
            count = []

            for i in res:
                ip.append(i[0])
                count.append(i[1])

            socketio.emit('ip_ranking',
                          {"ip": ip, "count": count},
                          namespace='/ip_ranking')

    socketio.start_background_task(target=loop)


@app.route('/id')
def id():
    return render_template("id.html", async_mode=socketio.async_mode)


# 异常请求计数
@socketio.on('connect', namespace='/bad_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("bad", "bad"))

            socketio.emit('bad_count',
                          {"data": res},
                          namespace='/bad_count')

    socketio.start_background_task(target=loop)


# 正常请求计数
@socketio.on('connect', namespace='/good_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("good", "good"))

            socketio.emit('good_count',
                          {"data": res},
                          namespace='/good_count')

    socketio.start_background_task(target=loop)


# 正常请求地理标记
@socketio.on('connect', namespace='/good_geo')
def good_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 调用接口获取地理坐标
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 仅显示境内定位
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"name": record['host'], "value": [coor_x, coor_y,
                                                                           record['url'],
                                                                           datetime,
                                                                           record['req_method'],
                                                                           record['protocol'],
                                                                           record['status_code']]})
                            socketio.emit('good_geo',
                                          {"data": data},
                                          namespace='/good_geo')

    socketio.start_background_task(target=loop)


# 异常请求地理标记
@socketio.on('connect', namespace='/bad_geo')
def bad_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 调用接口获取地理坐标
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 仅显示境内定位
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"name": record['host'], "value": [coor_x, coor_y,
                                                                           record['url'],
                                                                           datetime,
                                                                           record['req_method'],
                                                                           record['protocol'],
                                                                           record['status_code']]})
                            socketio.emit('bad_geo',
                                          {"data": data},
                                          namespace='/bad_geo')

    socketio.start_background_task(target=loop)


# 实时入侵分类计数(按时间顺序)
@socketio.on('connect', namespace='/url_cate_count_timeline')
def url_cate_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            good_res = dict(redis_con.zrange("goodts", 0, 10000000, withscores=True))
            bad_res = dict(redis_con.zrange("badts", 0, 10000000, withscores=True))

            # 求正常和异常结果的时间戳的并集,并排序。再生成对应的正常和异常计数
            date = []
            date_ts = []
            good_date = []
            bad_date = []

            good_data = []
            bad_data = []
            # 求并集并排序
            for i in good_res:
                good_date.append(i)
            for j in bad_res:
                bad_date.append(j)
            for k in sorted(list(set(good_date) | set(bad_date))):
                date_ts.append(k)

            # 生成对应的计数
            for t in date_ts:
                if t in good_res:
                    good_data.append(good_res[t])
                else:
                    good_data.append(0)
                if t in bad_res:
                    bad_data.append(bad_res[t])
                else:
                    bad_data.append(0)
            # 时间戳转字符串
            for ts in date_ts:
                date.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(ts) / 1000)))

            socketio.emit('url_cate_count_timeline',
                          {"date": date, "good_data": good_data, "bad_data": bad_data},
                          namespace='/url_cate_count_timeline')

    socketio.start_background_task(target=loop)


# 实时异常请求概览
@socketio.on('connect', namespace='/bad_detail')
def bad_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 调用接口获取地理坐标
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 仅显示境内定位
                        if body['status'] == 0:
                            address = body['content']['address']

                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})

                            socketio.emit('bad_detail',
                                          {"data": data},
                                          namespace='/bad_detail')
    socketio.start_background_task(target=loop)


# 实时正常请求概览
@socketio.on('connect', namespace='/good_detail')
def good_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 调用接口获取地理坐标
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 仅显示境内定位
                        if body['status'] == 0:
                            address = body['content']['address']

                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})

                            socketio.emit('good_detail',
                                          {"data": data},
                                          namespace='/good_detail')
    socketio.start_background_task(target=loop)


@app.route('/about')
def about():
    return render_template("about.html", async_mode=socketio.async_mode)


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)

到了这里,关于基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据

    本文尝试基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据。 一、用文心一言AI大模型将需求转化为样例代码 1、第一次对话:“python3写一段从rest服务器获取数据的样例代码” 同时生成了以下注解  这段代码首先定义了一个函数  get_da

    2024年02月03日
    浏览(46)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(66)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(77)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(45)
  • 分布式内存计算Spark环境部署与分布式内存计算Flink环境部署

    目录 分布式内存计算Spark环境部署 1.  简介 2.  安装 2.1【node1执行】下载并解压 2.2【node1执行】修改配置文件名称 2.3【node1执行】修改配置文件,spark-env.sh 2.4 【node1执行】修改配置文件,slaves 2.5【node1执行】分发 2.6【node2、node3执行】设置软链接 2.7【node1执行】启动Spark集群

    2024年02月08日
    浏览(73)
  • Spark分布式内存计算框架

    目录 一、Spark简介 (一)定义 (二)Spark和MapReduce区别 (三)Spark历史 (四)Spark特点 二、Spark生态系统 三、Spark运行架构 (一)基本概念 (二)架构设计 (三)Spark运行基本流程 四、Spark编程模型 (一)核心数据结构RDD (二)RDD上的操作 (三)RDD的特性 (四)RDD 的持

    2024年02月04日
    浏览(61)
  • Spark弹性分布式数据集

    1. Spark RDD是什么 RDD(Resilient Distributed Dataset,弹性分布式数据集)是一个不可变的分布式对象集合,是Spark中最基本的数据抽象。在代码中RDD是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 每个RDD都被分为多个分区,这些分区运行在集群中

    2024年02月13日
    浏览(56)
  • 分布式计算MapReduce | Spark实验

    题目1 输入文件为学生成绩信息,包含了必修课与选修课成绩,格式如下: 班级1, 姓名1, 科目1, 必修, 成绩1 br (注: br 为换行符) 班级2, 姓名2, 科目1, 必修, 成绩2 br 班级1, 姓名1, 科目2, 选修, 成绩3 br ………., ………, ………, ………, ……… br 编写两个Hadoop平台上的MapRed

    2024年02月08日
    浏览(58)
  • 分布式搭建(hadoop+hive+spark)

    hadoop-master 192.168.43.141 hadoop-slave1 192.168.43.142 hadoop-slave2 192.168.43.143 链接:https://pan.baidu.com/s/1OwKLvZAaw8AtVaO_c6mvtw?pwd=1234 提取码:1234 MYSQL5.6:wget http://repo.mysql.com/mysql-community-release-el6-5.noarch.rpm Scale:wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.tgz

    2024年02月12日
    浏览(42)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(65)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包