第3章 创建项目并初始化业务数据(过程记录)

这篇具有很好参考价值的文章主要介绍了第3章 创建项目并初始化业务数据(过程记录)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目声明和依赖

ECommerceRecommendSystem [pom.xml]

  • 公用的声明、依赖、插件

properties 声明

  • log4g:处理日志的框架(日志的具体实现)
  • sel4g:简单日志门面(简单日志的接口)
  • mongodb-spark:MongoDB和Spark的接口
  • casbah:MongoDB在scala上的Driver(最新的有MongoScalaDriver)
  • redis、kafka、spark、scala
  • jblas:java线性代数库(矩阵运算)

dependences 依赖

  • dependencies:声明+引入
  • dependencyManagement:声明,不引入
<dependencies>
    <!-- 引入共同的日志管理工具 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

plugin 插件

  • plugin:声明+引入

  • pluginManagement:声明,不引入

  • scala引用可能有问题(recommender)
    第3章 创建项目并初始化业务数据(过程记录),# 【大数据开发】,大数据

  • 注意版本号需要文章来源地址https://www.toymoban.com/news/detail-539559.html


数据加载模块

  • Object:单例项目

package com.aguigu

import com.mongodb.MongoClientURI
import com.mongodb.casbah.Imports.MongoClientURI
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/** 样例类
 * Product数据集
 * 3982                           商品ID
 * Fuhlen 富勒 M8眩光舞者时尚节能     商品名称
 * 1057,439,736                   商品分类ID,不需要
 * B009EJN4T2                      亚马逊ID,不需要
 * https://images-cn-4.ssl-image   商品的图片URL
 * 外设产品|鼠标|电脑/办公           商品分类
 * 富勒|鼠标|电子产品|好用|外观漂亮   商品UGC标签
 */
case class Product( productId:Int, name:String, URL:String, categories:String, tags:String )

/**
 * Rating数据集
 * 4867        用户ID
 * 457976      商品ID
 * 5.0         评分
 * 1395676800  时间戳
 */
case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )

/**
 * MongoDB连接配置
 * @param uri    MongoDB的连接uri
 * @param db     要操作的db
 */
case class MongoConfig( uri: String, db: String )

object DataLoader {
  // 常量
  val PRODUCT_DATA_PATH = "/Users/liuhao/MyProject/ECommerceRecommendSystem/ECommerceRecommendSystem/recommender/DataLoader/src/main/resources/products.csv"
  val RATING_DATA_PATH = "/Users/liuhao/MyProject/ECommerceRecommendSystem/ECommerceRecommendSystem/recommender/DataLoader/src/main/resources/ratings.csv"
  // 定义mongodb中存储的表名
  val MONGODB_PRODUCT_COLLECTION = "Product"
  val MONGODB_RATING_COLLECTION = "Rating"

  def main(args: Array[String]): Unit = {
    /**
     * 配置项
     */
    val config = Map(
      "spark.cores" -> "local[*]", // 所有逻辑核占用
      "mongo.uri" -> "mongodb://localhost:27017/recommender", // MongoDB数据库连接
      "mongo.db" -> "recommender"
    )

    /**
     * 创建Spark相关:sparkConf, sparkSession
     */
    // 创建 spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")

    // 创建 spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    /**
     * 加载数据
     */
    import spark.implicits._

    val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
    // RDD => DataFrame
    val productDF = productRDD.map( item => {
      // product数据通过'^'分割
      val attr = item.split("\\^")
      // 转换成product类
      Product( attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim )
    }).toDF()

    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    val ratingDF = ratingRDD.map(item => {
      val attr = item.split(",")
      Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
    }).toDF()

    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db"))  // 隐式配置
    storeDataInMongoDB( productDF, ratingDF )

    spark.stop()
  }

  def storeDataInMongoDB( productDF:DataFrame, ratingDF:DataFrame)(implicit mongoConfig: MongoConfig): Unit ={
    // 新建mongodb连接(casbah),客户端
    val mongoClinet = MongoClient(MongoClientURI(mongoConfig.uri) )
    // 定义要操作的mongodb表,理解:db.product
    val productCollection = mongoClinet( mongoConfig.db )( MONGODB_PRODUCT_COLLECTION )
    val ratingCollection = mongoClinet( mongoConfig.db )( MONGODB_RATING_COLLECTION )

    // 方式1:如果表已存在,则删除
    productCollection.dropCollection()
    ratingCollection.dropCollection()

    // 方式2:将当前数据存入对应表
    productDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_PRODUCT_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    ratingDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 对表创建索引
    productCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
    ratingCollection.createIndex( MongoDBObject( "productId" -> 1 ) )
    ratingCollection.createIndex( MongoDBObject( "userId" -> 1 ) )

    mongoClinet.close()

  }
}

到了这里,关于第3章 创建项目并初始化业务数据(过程记录)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Git】git初始化项目时 | git默认创建main分之 | 如何将git默认分支从main改为master

    在 Git 中,如果你在第一次提交后想要将默认分支名从 main 修改为 master,你可以按照以下步骤进行操作: 创建 master 分支: 首先,你需要在当前的 main 分支基础上创建一个新的 master 分支。使用以下命令: 删除 main 分支: 现在你已经创建了一个新的 master 分支,可以将 main

    2024年02月08日
    浏览(45)
  • SqlSession的初始化过程

    mybatis-config.xml db.properties 首先来看 SqlSessionFactory 的构建过程,调用重载方法解析配置文件内容,将配置信息装载到Configuration对象中。 得到最终的SqlSessionFactory 接下来我们继续分析通过 DefaultSqlSessionFactory 工厂方法获得 SqlSession 的逻辑 这之中做了几件事情: 创建事务工厂 创

    2024年02月03日
    浏览(23)
  • Java项目初始化ES、MYSQL表结构及表数据

    其中SystemMapper为  如果索引不存在就创建es索引    根据需要、添加es别名

    2024年02月12日
    浏览(29)
  • 常见问题03:SpringBoot项目启动初始化数据(执行sql文件)

    使用外部资源中定义的 SQL 脚本填充、初始化或清理数据库。 调用addScript(org.springframework.core.io.Resource)以添加单个 SQL 脚本位置。 调用addScripts(org.springframework.core.io.Resource…)以添加多个 SQL 脚本位置。 请参阅此类中的 setter 方法以获取更多配置选项。 调用populate(java.sql.Connect

    2024年02月13日
    浏览(43)
  • spring事物初始化过程分析

    1.注入4个bd 2.执行 org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#postProcessBeforeInstantiation 逻辑分析:遍历所有的bd: 获取beanname-判断beanname是否有长度并且没有被处理过-是否遍历放入advisedBeans- 是否是基础类-是否该跳过  4大基础类 3.执行org.springframework.aop.framework.autoprox

    2024年02月02日
    浏览(24)
  • Cisco交换机初始化过程跟踪

    公司有几台交换机需要初始化配置,这里帖出过程,给大家参考下。 此初始化过程只测试过Cisco 2960和3550系列交换机,方法可用。 感谢测试过程中对我提出帮助的。 一:知道交换机特权密码的情况下 如果你知道交换机的特权密码,那初始化就相当easy了.   二、不知道特权密

    2024年02月05日
    浏览(51)
  • VMWare 虚拟机创建 + 初始化

    目录 概述 1. VMware创建虚拟机 2. IP 配置 nmtui nmcli 3. Yum 源配置 光盘的Packages作为Yum源 配置开机自动挂载(光盘) 配置私有Yum仓库 跟新私有yum仓库  报错和修复 4. 文件共享系统配置 跟新配置文件/etc/hosts + /etc/yum.repo.d/ftp.repo  同步配置文件 测试yum私有仓库 5. Ansible 安装配置  s

    2024年02月05日
    浏览(29)
  • K8S集群重新初始化--详细过程

    在引导k8s集群的过程时可能因为这个或那个的原因导致需要重新引导集群 。 下面整理了我在实际工作中初始化k8s集群的详细过程。 k8s环境部署总览 ip地址 类型 操作系统 服务配置 192.168.162.31 Master01 Centos7.6 2核CPU 2G内存 20G硬盘 192.168.162.41 node1 Centos7.6 2核CPU 2G内存 20G硬盘 192

    2024年02月02日
    浏览(31)
  • Python如何创建二维数组和初始化

            严格意义上说,Python中并没有数组的概念,Python中表达一组数据有多种形式,例如list,tuple,set等数据结构都可以表达一组数,并且这组数也没有C和C++中数组的的同质限制,这些数可以是任何一种数据类型。         以list为例(list又叫列表),要想实现一个所

    2024年02月20日
    浏览(38)
  • ElasticSearch没有记录初始化用户名密码

    初次使用启动ES时会初始化用户名:elastic,密码是随机生成的,在初次启动时控制台会展示相关信息(包含初始化密码),此时要注意保存,否则之后启动不会再显示。 对于我而言,我初次启动时没有保存记录密码,导致开启安全策略后无法通过用户密码登陆,并且我找了

    2024年02月10日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包