【电影推荐系统】实时推荐

这篇具有很好参考价值的文章主要介绍了【电影推荐系统】实时推荐。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概览

技术方案:

  • 日志采集服务:通过利用Flume-ng对业务平台中用户对于电影的一次评分行为进行采集,实时发送到Kafka集群。
  • 消息缓冲服务:项目采用Kafka作为流式数据的缓存组件,接受来自Flume的数据采集请求。并将数据推送到项目的实时推荐系统部分。
  • 实时推荐服务:项目采用Spark Streaming作为实时推荐系统,通过接收Kafka中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到MongoDB数据库。

1. 实现思路

我们应该如何实现?

  1. 首先应该redis安装,这里存储用户的第K次评分(用户评分存入redis中)
  2. 安装zookeeper,安装kafka,都是standlone模式
  3. 测试Kafka与Spark Streaming 联调。Kafka生产一条数据,Spark Streaming 可以消费成功,并根据redis中的数据和MongoDB数据进行推荐,存入MongoDB中
  4. 在业务系统写埋点信息,测试时写入本地文件,之后再远程测试写入云服务器log文件中
  5. flume配置文件书写,kafka创建两个topic,对整个过程进行测试

2 环境准备

1.1 redis 安装

  • redis安装redis安装
  • 密码:123456
  • 存入redis一些数据 lpush uid:1 mid:score
  • redis 教程:教程

1.2 zookeeper单机版安装

  • zookeeper安装:zookeeper安装
  • 版本:3.7.1
  • 遇到的坑:8080端口连接占用,我们需要在zoo.cpg文件中加上
    admin.serverPort=8001重新启动即可。

1.3 kafka单机安装

  • kafka安装:官网下载地址
  • 安装使用的为:127.0.0.1
  • 启动kafka:kafka教程
bin/kafka-server-start.sh config/server.properties
  • 创建一个topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic recommender
  • 生产一个消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic recommender
  • 消费一个消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic recommender --from-beginning

3 测试kafka与spark streaming联调

  • kafka版本:2.2.0
  • spark版本:2.3.0
  • 因此使用spark-streaming-kafka-0-10

【电影推荐系统】实时推荐,推荐系统,大数据,spark,推荐算法

  1. 启动kafka,生产一条信息
  2. 书写程序
// 定义kafka连接参数
    val kafkaParam = Map(
      "bootstrap.servers" -> "服务器IP:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"
    )
    // 通过kafka创建一个DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam )
    )

    // 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流
    // 1|31|4.5|
    val ratingStream = kafkaStream.map{
      msg =>
        val attr = msg.value().split("\\|")
        ( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
    }
  1. 若是kafka报错,如果你同样也是云服务器,请注意kafka的配置信息(很重要!)

(1)解决方法:修改kafka配置文件,设置为设置listeners为内网ip,设置外网ip

  • 解决方案修改内网ip

(2)重新启动,成功

  • 内网外网分流:内网外网分流
  • kafka入门教程:入门教程
  1. redis报错:开启保护模式了,需要修改conf文件

效果

在kafka生产一个数据,可以在MongoDB中得到推荐的电影结果

4 后端埋点

前端进行评分后,触发click事件,后端进行测试埋点,利用log4j写入本地文件中。

4.1 本地测试

  • log4j配置文件
log4j.rootLogger=INFO, file, stdout

# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n


# write to file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.Threshold=INFO
log4j.appender.file.File=F:/demoparent/business/src/main/log/agent.txt
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n
  • 埋点实现
//埋点日志
import org.apache.log4j.Logger;

// 关键代码
Logger log = Logger.getLogger(MovieController.class.getName());
log.info(MOVIE_RATING_PREFIX + ":" + uid +"|"+ mid +"|"+ score +"|"+ System.currentTimeMillis()/1000)

4.2 写入远程测试

  1. Linux安装syslog服务,进行测试
  2. 主机log4j配置文件设置服务器ip
  • log4j配置:写入远程服务器
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost= 服务器IP
log4j.appender.syslog.Threshold=INFO
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%20t]  %-130c:(line:%4L)  :   %m%n

5 flume配置

  1. flume对接kafka:flume对接文件
  2. flume设置source和sink,source为文件地址,sink为kafka的log
# log-kafka.properties
agent.sources = exectail
agent.channels = memoryChannel 
agent.sinks = kafkasink 
agent.sources.exectail.type = exec 
agent.sources.exectail.command = tail -f /project/logs/agent.log agent.sources.exectail.interceptors=i1 agent.sources.exectail.interceptors.i1.type=regex_filter agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+ agent.sources.exectail.channels = memoryChannel


agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic = log agent.sinks.kafkasink.kafka.bootstrap.servers = 服务器地址:9092 agent.sinks.kafkasink.kafka.producer.acks = 1 agent.sinks.kafkasink.kafka.flumeBatchSize = 20 

agent.sinks.kafkasink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000

6 实时推荐

ratingStream.foreachRDD{
  rdds => rdds.foreach{
    case (uid, mid, score, timestamp) => {
      println("rating data coming! >>>>>>>>>>>>>>>>")
      println(uid+",mid:"+mid)
      // 1. 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]
      val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )
      println("用户最近的K次评分:"+userRecentlyRatings)
      // 2. 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid]
      val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )
      println("电影最相似的N个电影:"+candidateMovies)
      // 3. 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
      val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )
      println("当前用户的实时推荐列表:"+streamRecs)
      // 4. 把推荐数据保存到mongodb
      saveDataToMongoDB( uid, streamRecs )
    }
  }
}
def computeMovieScores(candidateMovies: Array[Int],
                       userRecentlyRatings: Array[(Int, Double)],
                       simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={
  // 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分
  val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
  // 定义一个HashMap,保存每一个备选电影的增强减弱因子
  val increMap = scala.collection.mutable.HashMap[Int, Int]()
  val decreMap = scala.collection.mutable.HashMap[Int, Int]()

  for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){
    // 拿到备选电影和最近评分电影的相似度
    val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )

    if(simScore > 0.7){
      // 计算备选电影的基础推荐得分
      scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
      if( userRecentlyRating._2 > 3 ){
        increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
      } else{
        decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
      }
    }
  }
  // 根据备选电影的mid做groupby,根据公式去求最后的推荐评分
  scores.groupBy(_._1).map{
    // groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] )
    case (mid, scoreList) =>
      ( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
  }.toArray.sortWith(_._2>_._2)
}

7 启动顺序

  1. 启动hadoop、spark的容器
  • cd /docker
  • docker-compose up -d
  • docker-compose ps
  1. 启动mongodb和redis服务
  • netstat -lanp | grep "27017"
  • bin/redis-server etc/redis.conf
  1. 启动zookeeper、kafka服务
  • ./zkServer.sh start
  • bin/kafka-server-start.sh config/server.properties
  1. 启动flume服务
  • bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent

实现效果

前端评分成功后写入日志文件,flume对接log日志文件无问题,kafka对接flume无问题,spark streaming处理收到的一条数据,进行推荐,存入MongoDB中。

【电影推荐系统】实时推荐,推荐系统,大数据,spark,推荐算法

总结

由于时间匆忙,写的有些匆忙,如果有需要前端设计代码和后端的代码可以评论我,我整理整理发到github上。

前端设计部分没有时间去详细做,后续再对前端页面进行美化。本科当时整合了一个管理系统,现在也没有时间做,总之,一周多时间把当时的系统快速复现了下,算是一个复习。

在进行开发时,遇到许多问题,版本问题、服务器内网外网问题、docker容器相关问题、协同过滤算法设计问题,但帮着自己复习了下Vue和SpringBoot。文章来源地址https://www.toymoban.com/news/detail-627320.html

遇到问题时
  • 遇到问题不应该盲目解决,应该静下心看看报错原因,想想为何报错
  • 版本尤其重要,因此最好在一个project的pom设定版本
  • 使用服务器搭建docker-compose,利用该方法来搭建集群,快速简单,但涉及的端口转发等一些网络知识需要耐下心来看
  • Vue-Cli+Element-ui搭配起来开发简单
  • 写程序时,我们应该提前约定好接口,否则后续会很混乱…
后续
  • 后续将优化下前端页面,设计更多功能
  • 改进推荐算法
  • 增加冷启动方案

到了这里,关于【电影推荐系统】实时推荐的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于协同过滤算法的电影推荐系统(亮点:智能推荐、协同过滤算法、在线支付、视频观看)

    💗 博主介绍 :✌全网粉丝10W+,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 👇🏻 精彩专栏 推荐订阅 👇🏻 2023-2024年最值得选的微信小程序毕业设

    2024年02月08日
    浏览(47)
  • 基于springboot+vue协同过滤算法的电影推荐系统054

    大家好✌!我是CZ淡陌。一名专注以理论为基础实战为主的技术博主,将再这里为大家分享优质的实战项目,本人在Java毕业设计领域有多年的经验,陆续会更新更多优质的Java实战项目,希望你能有所收获,少走一些弯路,向着优秀程序员前行! 🍅更多优质项目👇🏻👇🏻可

    2024年02月07日
    浏览(32)
  • 深度学习推荐系统(三)NeuralCF及其在ml-1m电影数据集上的应用

    在2016年, 随着微软的Deep Crossing, 谷歌的WideDeep以及FNN、PNN等一大批优秀的深度学习模型被提出, 推荐系统全面进入了深度学习时代, 时至今日, 依然是主流。 推荐模型主要有下面两个进展: 与传统的机器学习模型相比, 深度学习模型的表达能力更强, 能够挖掘更多数据

    2024年02月10日
    浏览(39)
  • 计算机毕业设计springboot基于Hadoop平台的电影推荐系统541039【附源码+数据库+部署+LW】

    本项目包含程序+源码+数据库+LW+调试部署环境,文末可获取一份本项目的java源码和数据库参考。 系统的选题背景和意义 选题背景: 随着互联网的快速发展和大数据技术的成熟应用,电影推荐系统成为了电影行业中不可或缺的一部分。基于Hadoop平台的电影推荐系统应运而生,

    2024年02月06日
    浏览(46)
  • python协同过滤算法实现电影推荐(附源码)

    数据集请点赞收藏关注后评论区留言并且私信博主要  本例中使用得是著名得电影数据集MovieLens-100数据集 MoviesLens数据集是实现和测试电影推荐最常用得数据集之一,包含943个用户为精选得1682部电影给出得100000个电影评分 主要文件如下 1:u.data 2:u.item 3:u.user 1:查看用户/电影

    2024年02月11日
    浏览(33)
  • 机器学习30:《推荐系统-III》使用 TensorFlow 构建电影推荐系统

    本文将介绍基于 MovieLens 数据集创建一个电影推荐系统的方法。具体而言,包括探索电影数据,训练矩阵分解模型,检查嵌入,矩阵分解中的正则化,Softmax 模型训练等内容。 目录 1.准备工作 1.1 导入依赖模块 1.2 加载数据 1.3 探索电影镜头数据

    2024年02月16日
    浏览(32)
  • python+vue 基于推荐算法的在线电影视播放网站

    以广大影视剧迷们为研究对象,深入了解影视剧迷对在线视频观看视频的需求进行分析,形成系统需求分析设计一个符合影视剧迷们需求的在线视频网站。设计网站的前期工作包括对系统的各个功能进行详细分析,对数据库设计进行详细的描述,并画出各个模块的业务流程和

    2024年02月07日
    浏览(34)
  • Django个性化推荐系统,以电影为例

    随着科学技术发展,电脑已成为人们生活中必不可少的生活办公工具,在这样的背景下,网络技术被应用到各个方面,为了提高办公生活效率,网络信息技术飞速发展。在这样的背景下人类社会进入了全新的信息化的时代。电影个性化推荐信息管理一直是信息管理的一大难题

    2024年02月01日
    浏览(30)
  • 基于python大数据的电影可视化分析及电影推荐

    随着信息技术和互联网技术的快速发展,利用数据采集技术实现用户感兴趣的数据收集分析成为很多互联网公司研究讨论的热门话题。通过对基于Python的大数据的电影可视化分析与电影推荐,采集进行电影热度动态变化的需求进行调查分析,发现作为研究电影热度波动变化的

    2023年04月23日
    浏览(37)
  • 大数据毕设分享(含算法) 大数据电影数据分析与可视化系统

    今天学长向大家介绍一个机器视觉的毕设项目 🚩基于大数据的电影数据分析与可视化系统 项目运行效果(视频): 毕业设计 大数据电影评论情感分析 项目获取: https://gitee.com/assistant-a/project-sharing 研究中国用户电影数据,有助于窥探中国电影市场发展背后的规律,理解其来龙去

    2024年02月22日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包