【Spark实训】--竞赛网站访问日志分析

这篇具有很好参考价值的文章主要介绍了【Spark实训】--竞赛网站访问日志分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 目录

一. 训练要点

二.需求说明

三.关键实现思路及步骤

 四、LogCount.scala文件完整代码实现:

五、运行过程与结果截图:

​ 六、具体实现步骤

 七、相关知识点 

 1、过滤出访问次数在 50 次以上的用户记录

 2、统计访问 50 次以上的用户主要访问的前 5 类网页

 3. 合并部分网页

 4.根据访问时间加入对应时段:


实训题目:竞赛网站访问日志分析

一. 训练要点

(1)搭建Spurk工程环境。

(2) Spark编程。

(3)通过spark-submit提交应用。

二.需求说明

     某竞赛网站每年都会开展数据挖据的竞赛,在竞赛期间网站会有大量人群访问,生成了大量的用户访向记录。现在提供2016年10月到2017年6月的部分脱敏访问日志数据。日志数据的基本内容如图所示,仅提供以下6个字段。

属性名称

属性解析

Id

序号

Content_id

网页ID

Page_path

网址

Userid

用户ID

Sessionid

缓存生成ID

Date_time

访问时间

     要求根据提供的用户访问日志数据,利用Spark技术统计访向的用户数、被访问的不同网页个数以及每月的访问量,并将结果保存到HDFS上。

文章所用文档以及目录等等说明:

(点击可免费下载)访问日志数据:    jc_content_viewlog.txt

IDEA内实现代码存储路径与名字:LogCount.scala

【Spark实训】--竞赛网站访问日志分析

  jc_content_viewlog.txt   内部分数据如下图:

【Spark实训】--竞赛网站访问日志分析

三.关键实现思路及步骤

(1)配置好Spark的IntelliJ IDEA开发环境。

(2)启动IntelliJ IDEA,并进行Spark编程。

(3)对访向记录中的网页去重,统计本周期内被访问网页的个数。

val logs_all: RDD[Array[String]]  = sc.textFile(args(0)).map{_.split(",")}
val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))

 (4) userid为用户注册登录的标识,对userid去重,统计登录用户的数量。

val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))

 (5)按月统计访问记录数。

val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)

 (6)将结果保存到不同文件中。

wy_count.repartition(1).saveAsTextFile(args(1))
user_count.repartition(1).saveAsTextFile(args(2))
ny_count.repartition(1).saveAsTextFile(args(3))

 (7)打包Spark工程,在集群提交应用程序。

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

注:jc.jar是上面文件生成的jar包改名并上传而来;

hdfs://node1:8020/user/root/jc_content_viewlog.txt  是hdfs里面jc_content_viewlog.txt存储路径,也需要自己上传,目录自己决定;

hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3  是设置它的输出存储路径,因为会输出三个不同数据,需要三个目录,不然会报错。

 四、LogCount.scala文件完整代码实现:

package net

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LogCount {
  def main(args: Array[String]): Unit = {

    if(args.length < 2){
      println("请指定input和output")
      System.exit(1)//非0表示非正常退出程序
    }

    //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
    val conf: SparkConf = new SparkConf().setAppName("wc")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //TODO 2.source/读取数据
    //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
    //RDD[就是一行行的数据]
       val logs_all: RDD[Array[String]]  = sc.textFile(args(0)).map{_.split(",")}
    //TODO 3.transformation/数据操作/转换
    //对访问记录中的网页去重,统计本周期内被访问网页的个数
    val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
    val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
    //userid为用户注册登录的标识,对userid去重,统计登录用户的数量
    val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
    val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
    //按月统计访问记录数
    val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
    val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)

    //TODO 4.sink/输出
    //输出到指定path(可以是文件/夹)
    wy_count.repartition(1).saveAsTextFile(args(1))
    user_count.repartition(1).saveAsTextFile(args(2))
    ny_count.repartition(1).saveAsTextFile(args(3))
    //为了便于查看Web-UI可以让程序睡一会
    Thread.sleep(1000 * 60)

    //TODO 5.关闭资源
    sc.stop()
  }
  //获取年月,时间段作为输入参数
  def date_time(date:String):String={
    val nianye =date.trim.substring(0,7)
    nianye
  }

}

五、运行过程与结果截图:

 


 六、具体实现步骤

1、修改打包好的jar名字,并把jar上传到node1结点

【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析

2、开启一系列集群:

start-dfs.sh   //一键开启
start-yarn.sh  //开启
cd /myserver/
 mr-jobhistory-daemon.sh start historyserver
 /myserver/spark301/sbin/start-history-server.sh
 jps  //查看

这里不再具体说明如何开启。

3、上传jc_content_viewlog.txt到node1节点,并上传到hdfs

​
[root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt  /user/root/

【Spark实训】--竞赛网站访问日志分析

【Spark实训】--竞赛网站访问日志分析

 4、在集群提交应用程序

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析【Spark实训】--竞赛网站访问日志分析

 七、相关知识点 

进入spark-shell

[root@node1 bin]# /myserver/spark301/bin/spark-shell

【Spark实训】--竞赛网站访问日志分析

 1、过滤出访问次数在 50 次以上的用户记录

(1)统计用户访问次数并筛选出访问次数在50次以上的用户ID

scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}

data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24

scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect

 (2)根据过滤后的用户ID,在原数据中筛选出这一部分用户的访问记录

scala> val valib_data=data.filter(x=>userid.contains(x(3)))

valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27

scala> valib_data.take(2)   //查看

res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))

 2、统计访问 50 次以上的用户主要访问的前 5 类网页

 scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)

web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25

scala> web.take(5)

res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))

 3. 合并部分网页

(URL 后面带有_1、_2 字样的翻页网址,统一为一个网址)通过字符串截取的方法,对网页网址字符串进行截取,只截取“_”前面的字符串

【Spark实训】--竞赛网站访问日志分析

 scala> val data2=data.filter(_.length>=6).map{

    x=>

      var page="";

      if(x(2).contains("_"))

        { page=x(2).substring(0,x(2).lastIndexOf("_")) }

      else

        { page=x(2) };

      (x(0),x(1),page,x(3),x(4),x(5))

      }

data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25

 4.根据访问时间加入对应时段:

6:30~11:30 为上午,11:30~14:00 为中午,14:00~17:30为下午,17:30~19:00 为傍晚,19:00~23:00 为晚上,23:00~6:30 为深夜,统计所有用户各时段访问情况

(1)首先定义一个函数,用于匹配时间段并返回相应的字段值

scala> def date_time(date:String):String={
           val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
           val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
           if(hour<6 && hour>=23) "深夜"
           else if(hour==6 && min<=30) "深夜"
           else if(hour<11 && hour>=6) "上午"
           else if(hour==11 && min<=30) "上午"
           else if(hour<14 && hour>=11) "中午"
           else if(hour>=14 && hour<17) "下午"
           else if(hour==17 && hour<=30) "下午"
           else if(hour>=17 && hour<19) "傍晚"
           else if(hour==19 && min<=30) "傍晚"
           else "晚上"
          }
date_time: (date: String)String

(2)通过map方法对每一条记录的时间进行匹配,增加一个时间段的值到记录中

 scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}

data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27

(3)将时段值作为键,值为1,利用reduceByKey的方法统计各时段访问情况 

scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)

date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25

scala> date_count.take(10)

res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))文章来源地址https://www.toymoban.com/news/detail-430718.html

到了这里,关于【Spark实训】--竞赛网站访问日志分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Spark的大规模日志分析

    摘要: 本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。 本文分享自华为云社区《【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】》,作者:上进小菜猪。 随着互联网的普及和应用范围的扩

    2024年02月09日
    浏览(49)
  • Spark安全日志分析与事件调查:实战指南

    摘要: 在当今数字化时代,安全日志分析和事件调查变得至关重要。本博客将介绍如何使用Spark进行安全日志分析和事件调查,展示了项目经验、详细的技术细节和提供了代码示例。通过深入理解和准备,您将能够展示您在Spark上的专业知识,为安全团队提供强大的分析和调

    2024年04月13日
    浏览(55)
  • Hadoop—20.网站日志分析项目案例(详解)

    目录 一、导入数据 1、启动hadoop 2、在hdfs下创建文件夹hadoop_class 3、查询文件夹是否创建成功  4、在hadoop_class下创建一个文件夹存放总数据  5、检查是否创建成功  6、在web_log中创建两个文件分别存放已处理的数据和未处理的数据 7、查看是否创建成功 8、因为日志数据共有两

    2024年02月05日
    浏览(46)
  • nginx上web服务的基本安全优化、服务性能优化、访问日志优化、目录资源优化和防盗链配置简介

    目录 一.基本安全优化 1.隐藏nginx软件版本信息 2.更改源码来隐藏软件名和版本 (1)修改第一个文件(核心头文件),在nginx安装目录下找到这个文件并修改 (2)第二个文件 (3)第三个文件,内置响应信息页面 (4)第四个文件 (5)重新编译安装并重启 3.更改nginx服务的默

    2024年02月13日
    浏览(45)
  • nginx访问日志分析

    1、根据访问IP统计UV awk \\\'{print $1}\\\' paycenteraccess.log | sort -n | uniq | wc -l 2、查询访问最频繁的IP(前10) awk \\\'{print $1}\\\' /www/server/nginx/logs/access.txt | sort -n |uniq -c | sort -rn | head -n 10 3、查看某一时间段的IP访问量(1-8点) awk \\\'$2 =\\\"[2023-01-29T11:00:00+08:00]\\\" $2 =\\\"[2023-01-29T11:30:00+08:00]\\\"\\\' /www/server/nginx

    2024年02月10日
    浏览(53)
  • 基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统

    完整项目地址:https://download.csdn.net/download/lijunhcn/88463174 简介 LogVision是一个整合了web日志聚合、分发、实时分析、入侵检测、数据存储与可视化的日志分析解决方案。聚合采用Apache Flume,分发采用Apache Kafka,实时处理采用Spark Streaming,入侵检测采用Spark MLlib,数据存储使用H

    2024年01月16日
    浏览(45)
  • shell脚本-Nginx访问日志分析

    可以通过/usr/local/nginx/logs/access.log 文件-查看nginx的日志 /usr/local/nginx/conf/nginx.conf 文件-定义了日志输出的格式 可以通过awk命令来取出所需要的数据 Nginx访问日志分析脚本

    2024年02月16日
    浏览(64)
  • 使用宝塔面板如何查看网站日志分析搜索引擎蜘蛛数据

    网站日志(确切的讲应该是服务器日志)是记录WEB服务器接收处理请求以及运行错误等各种原始信息的文件。通过查看网站日志分析数据我们可以获得很有有用的数据,例如蜘蛛访问、是否被恶意访问、网站访客来源等等网站访客在寻找什么?哪个页面最受欢迎?网站访客从

    2024年02月09日
    浏览(55)
  • 面试题分析:统计网站访问次数

    难度:较低 平台的访问量非常高,需要实时统计网站的访问次数,请设计一个计数器解决: 初级工程师,可能回答使用synchronized锁或重入锁,进一步探讨,synchronized锁太重,有没其他方式,可能回答atomic类,进一步问,atomic类原理,什么场景下适合用,什么场景下不适合用

    2024年02月12日
    浏览(82)
  • 大数据技术之Hadoop(十一)——网站流量日志数据分析系统

    目录 素材: 一、模块开发——数据预处理 1、分析预处理的数据 2、实现数据的预处理 (1)创建Maven项目,添加相关依赖 (2)创建JavaBean对象,封装日志记录 (3)创建MapReduce程序,执行数据预处理  二、模块开发——数据仓库开发 1、上传文件 2、实现数据仓库 三、模块开

    2023年04月08日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包