大数据之Spark案例实操完整使用(第六章)

这篇具有很好参考价值的文章主要介绍了大数据之Spark案例实操完整使用(第六章)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、案例一

1、准备数据

大数据之Spark案例实操完整使用(第六章)
上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:

➢ 数据文件中每行数据采用下划线分隔数据

➢ 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种

➢ 如果搜索关键字为 null,表示数据不是搜索数据

➢ 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据

➢ 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示

➢ 支付行为和下单行为类似

详细字段说明

编号 字段名称 字段类型 字段含义
1 date String 用户点击行为的日期
2 user_id Long 用户的 ID
3 session_id String Session 的 ID
4 page_id Long 某个页面的 ID
5 action_time String 动作的时间点
6 search_keyword String 用户搜索的关键词
7 click_category_id Long 某一个商品品类的 ID
8 click_product_id Long 某一个商品的 ID
9 order_category_ids String 一次订单中所有品类的 ID 集合
10 order_product_ids String 一次订单中所有商品的 ID 集合
11 pay_category_ids String 一次支付中所有品类的 ID 集合
12 pay_product_ids String 一次支付中所有商品的 ID 集合
13 city_id Long 城市 id
//用户访问动作表
case class UserVisitAction(
 date: String,//用户点击行为的日期
 user_id: Long,//用户的 ID
 session_id: String,//Session 的 ID
 page_id: Long,//某个页面的 ID
 action_time: String,//动作的时间点
 search_keyword: String,//用户搜索的关键词
 click_category_id: Long,//某一个商品品类的 ID
 click_product_id: Long,//某一个商品的 ID
 order_category_ids: String,//一次订单中所有品类的 ID 集合
 order_product_ids: String,//一次订单中所有商品的 ID 集合
 pay_category_ids: String,//一次支付中所有品类的 ID 集合
 pay_product_ids: String,//一次支付中所有商品的 ID 集合
 city_id: Long
)//城市 id

2、需求 1:Top10 热门品类

大数据之Spark案例实操完整使用(第六章)

3、需求说明

品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的
公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
例如,综合排名 = 点击数20%+下单数30%+支付数*50%
本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下 单数;下单数再相同,就比较支付数。

方案一、

分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_Req1_HotCategoryTop10Analysis {

  def main(args: Array[String]): Unit = {
    /**
     *
     * TODO 热门类品类
     */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)


    //TODO 1、读取原始日志数据
    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")

    //TODO 2、统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(6) != "-1"

      }
    )


    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas = action.split("_")
        //点击品类的ID就有了、数量就是1--(单独统计点击的品类)
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    //TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null

    val orderCountRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(8) != "null"

      }
    )

    val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))

      }
    ).reduceByKey(_ + _)

    //    value".collect().foreach(println)


    //TODO 4、统计品类的支付数量:(品类ID,支付数量)

    val payCountRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(10) != "null"

      }
    )

    val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)



    //TODO 5、将品类进行排序,并且提取前十名
    //点击数量排序、下单数量排序,支付数量排序
    //元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
    //(品类ID,(点击数量,下单数量,支付数量))

    //会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
    //cogroup = connect + group

    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCount, payCount)

    val analysisRDD = cogroupRDD.mapValues {
      case (clickIter, orderIter, payIter) => {
        var clickCnt = 0
        val iter1 = clickIter.iterator
        if (iter1.hasNext) {
          clickCnt = iter1.next()
        }

        var orderCnt = 0
        val iter2 = orderIter.iterator
        if (iter2.hasNext) {
          orderCnt = iter2.next()
        }

        var payCnt = 0
        val iter3 = payIter.iterator
        if (iter3.hasNext) {
          payCnt = iter3.next()
        }
        (clickCnt, orderCnt, payCnt)
      }
    }

    val tuples = analysisRDD.sortBy(_._2, false).take(10)
    tuples.foreach(println)
    //TODO 6、将结果采集到控制台打印出来

    sc.stop()

    //TODO 7、统计品类的点击数量:(品类ID,点击数量)

  }

}

结果:
(15,(6120,1672,1259))
(2,(6119,1767,1196))
(20,(6098,1776,1244))
(12,(6095,1740,1218))
(11,(6093,1781,1202))
(17,(6079,1752,1231))
(7,(6074,1796,1252))
(9,(6045,1736,1230))
(19,(6044,1722,1158))
(13,(6036,1781,1161))

实现方案二

一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))
package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 * 第二种实现方式
 */
object Spark02_Req1_HotCategoryTop10Analysis {

  def main(args: Array[String]): Unit = {
    /**
     *
     * TODO 热门类品类
     */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)
    //Q: actionRdd重复使用 -使用缓存
    //Q: cogroup性能可能较低

    //TODO 1、读取原始日志数据
    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    actionRdd.cache()

    //TODO 2、统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(6) != "-1"
      }
    )

    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas = action.split("_")
        //点击品类的ID就有了、数量就是1--(单独统计点击的品类)
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    //TODO 3、统计品类的下单数量:(品类ID,下单数量) ----下单的话一定不为null

    val orderCountRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(8) != "null"

      }
    )

    val orderCount: RDD[(String, Int)] = orderCountRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))

      }
    ).reduceByKey(_ + _)

    //    value".collect().foreach(println)

    //TODO 4、统计品类的支付数量:(品类ID,支付数量)

    val payCountRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //获取索引6的、去除不是-1的数据
        datas(10) != "null"

      }
    )

    val payCount: RDD[(String, Int)] = payCountRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    //TODO 5、将品类进行排序,并且提取前十名
    //点击数量排序、下单数量排序,支付数量排序
    //元组排序:先比较第一个,在比较第二个,在比较第三个,以此类推退
    //(品类ID,(点击数量,下单数量,支付数量))

    //会在自己的数据源建立一个分组、跟另外一个数据源做一个链接
    //cogroup = connect + group

    val rdd = clickCountRDD.map {
      case (cid, cnt) => {
        (cid, (cnt, 0, 0))
      }
    }

    val rdd1 = orderCount.map {
      case (cid, cnt) => {
        (cid, (0, cnt, 0))
      }
    }
    val rdd2 = payCount.map {
      case (cid, cnt) => {
        (cid, (0, 0, cnt))
      }
    }

    //将三个数据源合并在一起、统一进行聚合计算
    val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd.union(rdd1).union(rdd2)
    val analysisRDD = sourceRDD.reduceByKey {
      (t1, t2) => {
        (t1._1+t2._1,t1._2+ t2._2,t1._3+ t2._3)
      }
    }
//        sourceRDD.collect().foreach(println)
        val tuples = analysisRDD.sortBy(_._2, false).take(10)

    tuples.foreach(println)
    //TODO 6、将结果采集到控制台打印出来

    sc.stop()
    //TODO 7、统计品类的点击数量:(品类ID,点击数量)

  }

}

结果:
(15,(6120,1672,1259))
(2,(6119,1767,1196))
(20,(6098,1776,1244))
(12,(6095,1740,1218))
(11,(6093,1781,1202))
(17,(6079,1752,1231))
(7,(6074,1796,1252))
(9,(6045,1736,1230))
(19,(6044,1722,1158))
(13,(6036,1781,1161))

Process finished with exit code 0

实现方案三

使用累加器的方式聚合数据

package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable


/**
 * 使用累加器的方式聚合数据
 *
 */
object Spark04_Req1_HotCategoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    /**
     *
     * TODO 热门类品类
     */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)


    //TODO 1、读取原始日志数据
    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    val acc = new HotCategoryAccumulator
    sc.register(acc, "HotCategory")


    //将数据转换结构
    val flatMapRDD = actionRdd.foreach(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          //点击的场合
          acc.add(datas(6), "click")
        } else if (datas(8) != "null") {
          //下单的场合
          val ids = datas(8).split(",")
          ids.foreach(
            id => {
              acc.add(id, "order")
            }
          )

        } else if (datas(10) != "null") {
          //支付的场合
          val ids = datas(10).split(",")
          ids.foreach(
            id => {
              acc.add(id, "pay")
            }
          )
        }
      }
    )


    val accVal: mutable.Map[String, HotCategory] = acc.value
    val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)


    val sort = categories.toList.sortWith(
      (left, right) => {
        if (left.clickCnt > right.clickCnt) {
          true
        } else if (left.clickCnt == right.clickCnt) {
          if (left.orderCnt > right.orderCnt) {
            true
          } else if (left.orderCnt == right.orderCnt) {
            left.payCnt > right.payCnt
          } else {
            false
          }
        } else {
          false
        }
      }
    )


    sort.take(10).foreach(println)
    //TODO 6、将结果采集到控制台打印出来

    sc.stop()
    //TODO 7、统计品类的点击数量:(品类ID,点击数量)
  }


  case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

  /**
   * 自定义累加器
   * 1、继承AccumlatorV2,定义泛型
   * IN :(品类ID,行为类型)
   * OUT: mutable.Map[String,HotCategory]
   *
   * 2、重写方法(6)
   */
  class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {


    private val hcMap = mutable.Map[String, HotCategory]()

    //是不是当前的初始状态
    override def isZero: Boolean = {
      hcMap.isEmpty
    }


    override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
      new HotCategoryAccumulator()
    }

    override def reset(): Unit = {
      hcMap.clear()
    }

    override def add(v: (String, String)): Unit = {
      val cid = v._1
      val actionType = v._2
      val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
      if (actionType == "click") {
        category.clickCnt += 1
      } else if (actionType == "order") {
        category.orderCnt += 1
      } else if (actionType == "pay") {
        category.payCnt += 1
      }

      hcMap.update(cid, category)
    }

    override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
      val map1 = this.hcMap
      val map2 = other.value

      map2.foreach {
        case (cid, hc) => {
          val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
          category.clickCnt += hc.clickCnt
          category.orderCnt += hc.orderCnt
          category.payCnt += hc.payCnt
          map1.update(cid, category)
        }
      }

    }


    //返回结果
    override def value: mutable.Map[String, HotCategory] = hcMap
  }
}

二 、需求实现

1、需求 2:Top10 热门品类中每个品类的 Top10 活跃 Session 统计

需求说明:
在需求一的基础上,增加每个品类用户 session 的点击统计

package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable


/**
 * 第三种实现方式
 * 重新看- 没看懂
 *
 */
object Spark05_Req1_HotCategoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    /**
     *
     * TODO 热门类品类
     */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)

    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    actionRdd.cache()

    val top10Ids: Array[(String)] = top10Category(actionRdd)

    //1、过滤原始数据、保留点击和前10品类ID
    val filterActionRDD = actionRdd.filter(
      action => {
        val datas = action.split("_")
        //先满足这个条件
        if (datas(6) != "-1") {
          //判断datas(6) 在不在 top10Ids 里面
          top10Ids.contains(datas(6))
        } else {
          false
        }
      }
    )

    //根据品类ID和sessionID进行点击量的统计
    val reduceRDD = filterActionRDD.map(
      action => {
        val datas = action.split("_")
        ((datas(6), datas(2)), 1)
      }
    ).reduceByKey(_ + _)


    //3、将统计数据的结果进行结构的转换
    //((品类ID,sessionID),sum)=>(品类ID,(SessionID,sum))
    val mapRDD = reduceRDD.map {
      case ((cid, sid), sum) => {
        (cid, (sid, sum))
      }
    }

    //相同的品类进行分组
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()

    //5、将分组后的数据进行点击量的排序、取前10名
    val resultRDD = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
      }
    )


    resultRDD.collect().foreach(println)
    sc.stop()
  }


  def top10Category(actionRdd: RDD[String]): Array[(String)] = {
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRdd.flatMap(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          //点击的场合
          List((datas(6), (1, 0, 0)))

        } else if (datas(8) != "null") {
          //下单的场合
          val ids = datas(8).split(",")
          ids.map(id => (id, (0, 1, 0)))

        } else if (datas(10) != "null") {
          //支付的场合
          val ids = datas(10).split(",")
          ids.map(id => (id, (0, 0, 1)))

        } else {
          Nil
        }
      }
    )
    val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )
    analysisRDD.sortBy(_._2, false).take(10).map(_._1)
  }


}

2、页面单跳转换率统计

需求说明
1)页面单跳转化率
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中
访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,
那么单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV) 为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,
那么 B/A 就是 3-5 的页面单跳转化率。

大数据之Spark案例实操完整使用(第六章)

2)统计页面单跳转化率意义
产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的
表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
数据分析师,可以此数据做更深一步的计算和分析。
企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当
调整公司的经营战略或策略。

package com.spack.bigdata.core.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 * 第三种实现方式
 * 重新看- 没看懂
 *
 */
object Spark06_Req3_HotCategoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    /**
     *
     * TODO 热门类品类
     */

    val operator = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(operator)

    val actionRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")

    val actionDataRdd: RDD[UserVisitAction] = actionRdd.map(
      action => {
        val datas = action.split("_")
        UserVisitAction(
          datas(0),
          datas(1).toLong,
          datas(2),
          datas(3).toLong,
          datas(4),
          datas(5),
          datas(6).toLong,
          datas(7).toLong,
          datas(8),
          datas(9),
          datas(10),
          datas(11),
          datas(12).toLong,
        )
      }
    )


    //TODO 对指定的页面连续跳转进行统计
    //1-2,2-3,3-4,4-5,5-6,6-7
    val ids = List[Long](1, 2, 3, 4, 5, 6, 7)
    val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)


    //TODO 计算分母 --求首页
    val pageidToCountMap: Map[Long, Long] = actionDataRdd.filter(
      //先过滤提高  ---init 不包含最后一个
      action => {
        ids.init.contains(action.page_id)
      }
    ).map(
      action => {
        (action.page_id, 1L)
      }
    ).reduceByKey(_ + _).collect().toMap

    actionDataRdd.cache()
    //TODO 计算分子
    val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRdd.groupBy(_.session_id)

    //分组后、根据访问时间进行排序(升序)
    val mvRdd: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
      iter => {
        val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time) //默认升序

        //[1,2,3,4]
        //[1,2] ,[2,3],[3,4]
        //[1-2,2-3,3-4]

        //Sliding:滑窗
        //【1,2,3,4】 flowIds就是【1,2,3,4】
        //【2,3,4】 就是 List[Long] 尾部信息
        //        zip 拉链
        val flowIds: List[Long] = sortList.map(_.page_id)

        val pageFlowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)

        //将不合法的页面跳转进行过滤
        pageFlowIds.filter(
          t => {
            okflowIds.contains(t)
          }
        ).map(
          t => {
            (t, 1)
          }
        )

      }
    )

    //((1,2),1) 拆开
    val flatRDD: RDD[((Long, Long), Int)] = mvRdd.map(_._2).flatMap(list => list)
    //((1,2),1)=>((1,2),SUM)
    val dataRDD = flatRDD.reduceByKey(_ + _)

    //TODO 计算单挑转换率
    //分子除以分母
    dataRDD.foreach {
      case ((pageid1, pageid2), sum) => {
        val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)
        println(s"页面${pageid1}挑转到页面${pageid2}单挑转化率为" + (sum.toDouble / lon))
      }
    }

    sc.stop()
  }

  //用户访问动作表
  case class UserVisitAction(
                              date: String, //用户点击行为的日期
                              user_id: Long, //用户的 ID
                              session_id: String, //Session 的 ID
                              page_id: Long, //某个页面的 ID
                              action_time: String, //动作的时间点
                              search_keyword: String, //用户搜索的关键词
                              click_category_id: Long, //某一个商品品类的 ID
                              click_product_id: Long, //某一个商品的 ID
                              order_category_ids: String, //一次订单中所有品类的 ID 集合
                              order_product_ids: String, //一次订单中所有商品的 ID 集合
                              pay_category_ids: String, //一次支付中所有品类的 ID 集合
                              pay_product_ids: String, //一次支付中所有商品的 ID 集合
                              city_id: Long
                            ) //城市 id
}

三、工程代码三层架构

1、三层介绍

大数据之Spark案例实操完整使用(第六章)

2、结构图

大数据之Spark案例实操完整使用(第六章)

3、代码

application包

package com.spack.bigdata.core.framework.application

import com.spack.bigdata.core.framework.common.TApplication
import com.spack.bigdata.core.framework.controller.WordCountController

object WordCountApplication extends App with TApplication {
  //启动应用程序
  start() {
    val controller = new WordCountController()
    controller.dispatch()
  }

}

controller包

package com.spack.bigdata.core.framework.controller

import com.spack.bigdata.core.framework.common.TController
import com.spack.bigdata.core.framework.service.WordCountService

/**
 * 控制层
 */
class WordCountController extends  TController{

  private val wordCountService = new WordCountService()
  //调度
  def dispatch(): Unit = {
    //TODO 执行业务逻辑
    val array = wordCountService.dataAnalysis()
    array.foreach(println)


  }

}

dao包

package com.spack.bigdata.core.framework.dao


import com.spack.bigdata.core.framework.common.TDao

class WordCountDao extends TDao{
//
//
//  def readFile(path: String) = {
//     sc.textFile("datas")
//  }
}

service包

package com.spack.bigdata.core.framework.service

import com.spack.bigdata.core.framework.common.TService
import com.spack.bigdata.core.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD

class WordCountService extends TService {

  private val wordCountDao = new WordCountDao()

  //数据分析
  def dataAnalysis() = {

    val lines = wordCountDao.readFile("datas/word.txt")


    // hello world =>hello,word, hello,word
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne = words.map(
      word => (word, 1)
    )

    //3、将数据根据单词进行分组、便于统计
    //  (hello,hello,hello),(world, world)
    val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(
      t => t._1
    )

    //4、对分组后的数据进行转换
    val wordToCount = wordGroup.map {
      // word 是 单词(list[]) 格式
      case (word, list) => {
        list.reduce(
          (t1, t2) => {
            (t1._1, t1._2 + t2._2)
          }
        )


      }
    }
    val array: Array[(String, Int)] = wordToCount.collect()
    array
  }

}

util包

package com.spack.bigdata.core.framework.util

import org.apache.spark.SparkContext

/**
 * ThreadLocal
 */
object EnvUtil {

  private val scLocal = new ThreadLocal[SparkContext]
  def put(sc: SparkContext): Unit = {
    scLocal.set(sc)
  }


  def take(): SparkContext = {
    scLocal.get()
  }

  def clear(): Unit = {
    scLocal.remove()
  }
}

common包

TApplication

package com.spack.bigdata.core.framework.common

import com.spack.bigdata.core.framework.controller.WordCountController
import com.spack.bigdata.core.framework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {

  def start(master: String = "local[*]", app: String = "Application")(op: => Unit): Unit = {
    val conf = new SparkConf().setMaster(master).setAppName(app)
    val sc = new SparkContext(conf)
    EnvUtil.put(sc)

    try {
      op
    } catch {
      case ex => println(ex.getMessage)
    }

    //TODO 关闭连接
    sc.stop()
    EnvUtil.clear()
  }

}

TController

package com.spack.bigdata.core.framework.common

trait TController {
  def dispatch(): Unit
}

TDao文章来源地址https://www.toymoban.com/news/detail-500586.html

package com.spack.bigdata.core.framework.common

import com.spack.bigdata.core.framework.util.EnvUtil
import org.apache.spark.SparkContext

trait TDao {
  def readFile(path: String) = {
   EnvUtil.take().textFile(path)
  }
}

package com.spack.bigdata.core.framework.common

trait TService {
  def dataAnalysis(): Any
}

到了这里,关于大数据之Spark案例实操完整使用(第六章)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【JavaWeb后端开发-第六章(3)】SpringBootWeb案例—登录认证

        在前面的章节中,我们已经实现了 部门管理 、 员工管理 的基本功能,但是大家会发现,我们并没有登录,就直接访问到了Tlias智能学习辅助系统的后台。 这是不安全的,所以我们今天的主题就是 登录认证 。 最终我们要实现的效果就是用户必须登录之后,才可以访

    2024年01月19日
    浏览(52)
  • 【JavaWeb后端开发-第六章(4)】SpringBootWeb案例—事务&AOP

        在数据库阶段我们已学习过事务了,我们讲到:      事务 是一组操作的集合,它是一个不可分割的工作单位。事务会把所有的操作作为一个整体,一起向数据库提交或者是撤销操作请求。所以这组操作要么同时成功,要么同时失败。     怎么样来控制这组

    2024年01月19日
    浏览(55)
  • 学生管理系统-课后程序(JAVA基础案例教程-黑马程序员编著-第六章-课后作业)

    【案例6-2】 学生管理系统 【案例介绍】 1.任务描述 在一所学校中,对学生人员流动的管理是很麻烦的,本案例要求编写一个学生管理系统,实现对学生信息的添加、删除、修改和查询功能。每个功能的具体要求如下: 系统的首页:用于显示系统所有的操作,并根据用户在控

    2024年02月03日
    浏览(64)
  • python数据分析与应用:使用scikit-learn构建模型分析 第六章实训(1,2)

    有问题可以加我微信交流学习,bmt1014 (gcc的同学不要抄袭呀!) 一、实验目的 1、掌握skleam转换器的用法。 2、掌握训练集、测试集划分的方法。 3、掌握使用sklearm进行PCA降维的方法。 4、掌握 sklearn 估计器的用法。 5、掌握聚类模型的构建与评价方法。 6、掌握分类模型的构

    2024年02月09日
    浏览(51)
  • 数据结构:第六章 图

    ps:图不可以为空图。 对于图中的边,两头必须要有结点。 边集是可以没有的,如上图最右边。 关于无向图和有向图的应用如下 比如你微信里的好友关系,你要和一个人建立关系(也就是图的两个结点连上),你只需要加1次就可以了,也不需要你加我,我还要加你。 具体

    2024年02月14日
    浏览(46)
  • 《数据结构》第六章:二叉树

    二叉树是一种递归数据的数据结构。 二叉树(BT) 是含有n(n≥0)个结点的有限结合。当n=0时称为空二叉树。在非空二叉树中: 有且仅有一个称为 根 的结点: 其余结点划分为两个互不相交的子集L和R也是一棵二叉树,分别称为 左二叉树 和 右二叉树。 二叉树有五种基本形

    2024年01月17日
    浏览(46)
  • 《数据结构》王道 第六章 图

    2.1.1 邻接矩阵存储带权图(网) 2.1.2 邻接矩阵的性能分析 2.1.3 邻接矩阵的性质 以此类推,可以得到A 2 的矩阵。 A 3 也是同样的道理,则表示A[i][j] 由 i 到 j 路径长度为3的路径数目。 这种存储图的方法其实跟树的孩子表示法有点相似。 邻接矩阵存储无向图时,一条边会有两

    2024年02月01日
    浏览(45)
  • 第六章:进制转换与数据存储

    进制转换是程序员的基本功。 进制 组成 二进制 0-1 ,满2进1以0b或0B开头 十进制 0-9 ,满10进1 八进制 0-7,满8进1以数字0开头表示 十六进制 0-9及A(10)-F(15),满16进1以0x或0X开头表示,此处的 A-F 不区分大小写 进制转换 转换方式 二进制转十进制 从最低位(右边)开始, 将每个位上

    2024年02月05日
    浏览(35)
  • 数据结构 第六章 图——图的遍历

    在前面我们知道,树是一种非线性结构,为了方便它在计算机中的存储,对树进行遍历使它线性化。 而图同样也是一种非线性结构,但是图又是一种不同于树的多对多结构,所以在前面我们将其转换为了多个一对多的结构来描述它的存储结构。 图的遍历同树类似,也是从某

    2024年02月08日
    浏览(46)
  • 算法与数据结构 第六章 图(详解)

    目录 一、判断题 二、选择题  在开始之前,先为大家推荐四篇介绍该章四个主要算法的的文章,供大家参考。 Dijkstra算法求最短路径:Dijkstra算法原理_平凡的L同学的博客-CSDN博客_dijiesitela Floyd算法求最短路径:Floyd算法求最短路径 Prim算法求最小生成树:Prim算法求最小生成树

    2024年02月09日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包