个人简介:
> 📦个人主页:赵四司机
> 🏆学习方向:JAVA后端开发
> ⏰往期文章:SpringBoot项目整合微信支付
> 🔔博主推荐网站:牛客网 刷题|面试|找工作神器
> 📣种一棵树最好的时间是十年前,其次是现在!
> 💖喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。
前言:
最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。
如果你想要一个可以系统学习的网站,那么我推荐的是牛客网,个人感觉用着还是不错的,页面很整洁,而且内容也很全面,语法练习,算法题练习,面试知识汇总等等都有,论坛也很活跃,传送门链接:牛客刷题神器
目录
一:Springboot集成Kafka Stream
1.设置配置类信息
2.修改application.yml文件
3.新增配置类,创建KStream对象,进行聚合
二:热点文章实时计算
1.实现思路
2.环境搭建
2.1:在文章微服务中集成Kafka生产者配置
2.2:记录用户行为
2.3:定义Stream实现消息接收并聚合
2.4:重新计算文章分值并更新Redis缓存数据
2.5:设置监听类
三:功能测试
一:Springboot集成Kafka Stream
1.设置配置类信息
package com.my.kafka.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
可能你会有这样的疑问,前面介绍Kafka时候不是直接在yml文件里面设置参数就行了吗?为什么这里还要自己写配置类呢?是因为Spring对KafkaStream的集成并不是很好,所以我们才需要自己去写配置类信息。需要注意的一点是,配置类中必须添加@EnableKafkaStreams这一注解。
2.修改application.yml文件
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
3.新增配置类,创建KStream对象,进行聚合
package com.my.kafka.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Slf4j
@Configuration
public class KafkaStreamHelloListener {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//创建KStream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
//根据value进行聚合分组
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求单词的个数
.count()
.toStream()
//处理后的结果转换为string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
return stream;
}
}
这里实现的功能还是计算单词个数,假如你有其他计算需求你可以更改里面的逻辑代码以符合你的需求。该类可注入StreamBuilder,其返回值必须是KStream且放入Spring容器中(添加了@Bean注解)。
二:热点文章实时计算
1.实现思路
实现思路很简单,当用户有点赞、收藏、阅读等行为记录时候,就将消息发送给Kafka进行流式处理,随后Kafka再进行聚合并重新计算文章分值,除此之外还需要更新数据库中的数据。需要注意的是,按常理来说当天的文章热度权重是要比非当天的文章热度权重大的,因此当日文章的热度权重需要乘以3,随后查询Redis中的数据,假如该文章分数大于Redis中最低分文章,这时候就需要进行替换操作,更新Redis数据。
2.环境搭建
2.1:在文章微服务中集成Kafka生产者配置
(1)修改nacos,增加内容:
kafka:
bootstrap-servers: 49.234.52.192:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
hosts: 49.234.52.192:9092
group: ${spring.application.name}
(2)定义相关实体类、常量
package com.my.model.mess;
import lombok.Data;
@Data
public class UpdateArticleMess {
/**
* 修改文章的字段类型
*/
private UpdateArticleType type;
/**
* 文章ID
*/
private Long articleId;
/**
* 修改数据的增量,可为正负
*/
private Integer add;
public enum UpdateArticleType{
COLLECTION,COMMENT,LIKES,VIEWS;
}
}
2.2:记录用户行为
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 读文章行为记录(阅读量+1)
* @param map
* @return
*/
public ResponseResult readBehavior(Map map) {
if(map == null || map.get("articleId") == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
Long articleId = Long.parseLong((String) map.get("articleId"));
ApArticle apArticle = getById(articleId);
if(apArticle != null) {
//获取文章阅读数
Integer views = apArticle.getViews();
if(views == null) {
views = 0;
}
//调用Kafka发送消息
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(articleId);
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
//更新文章阅读数
LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
luw.eq(ApArticle::getId,articleId);
luw.set(ApArticle::getViews,views + 1);
update(luw);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
/**
* 用户点赞
* @param map
* @return
*/
@Override
public ResponseResult likesBehavior(Map map) {
if(map == null || map.get("articleId") == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
Long articleId = Long.parseLong((String) map.get("articleId"));
Integer operation = (Integer) map.get("operation");
ApArticle apArticle = getById(articleId);
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(articleId);
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
if(apArticle != null) {
//获取文章点赞数
Integer likes = apArticle.getLikes();
if(likes == null) {
likes = 0;
}
//更新文章点赞数
LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
luw.eq(ApArticle::getId,articleId);
if(operation == 0) {
//点赞
log.info("用户点赞文章...");
luw.set(ApArticle::getLikes,likes + 1);
//分值增加
mess.setAdd(1);
} else {
//取消点赞
log.info("用户取消点赞文章...");
luw.set(ApArticle::getLikes,likes - 1);
//分值减少
mess.setAdd(-1);
}
//调用Kafka发送消息
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
update(luw);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
/**
* 用户收藏
* @param map
* @return
*/
@Override
public ResponseResult collBehavior(Map map) {
if(map == null || map.get("entryId") == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
Long articleId = Long.parseLong((String) map.get("entryId"));
Integer operation = (Integer) map.get("operation");
ApArticle apArticle = getById(articleId);
//消息载体
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(articleId);
mess.setType(UpdateArticleMess.UpdateArticleType.COLLECTION);
if(apArticle != null) {
//获取文章收藏数
Integer collection = apArticle.getCollection();
if(collection == null) {
collection = 0;
}
//更新文章收藏数
LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
luw.eq(ApArticle::getId,articleId);
if(operation == 0) {
//收藏
log.info("用户收藏文章...");
luw.set(ApArticle::getCollection,collection + 1);
mess.setAdd(1);
} else {
//取消收藏
log.info("用户取消收藏文章...");
luw.set(ApArticle::getCollection,collection - 1);
mess.setAdd(-1);
}
//调用Kafka发送消息
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
update(luw);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
这一步主要是当用户对文章进行访问、点赞、评论或者收藏时候就会更新数据库中的记录,同时还要将该行为记录封装并发送至Kafka。
2.3:定义Stream实现消息接收并聚合
package com.my.article.stream;
import com.alibaba.fastjson.JSON;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import com.my.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
@Slf4j
public class HotArticleStreamHandler {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//接收消息
KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
//聚合流式处理
stream.map((key,value)->{
UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
//重置消息的key:1234343434 和 value: likes:1
return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
})
//按照文章id进行聚合
.groupBy((key,value)->key)
//时间窗口 每十秒聚合一次
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
/*
自行地完成聚合的计算
*/
.aggregate(new Initializer<String>() {
/**
* 初始方法,返回值是消息的value
*/
@Override
public String apply() {
return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
}
/*
真正的聚合操作,返回值是消息的value
*/
}, new Aggregator<String, String, String>() {
/**
* 聚合并返回
* @param key 文章id
* @param value 重置后的value ps:likes:1
* @param aggValue "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0"
* @return aggValue格式
*/
@Override
public String apply(String key, String value, String aggValue) {
//用户没有进行任何操作
if(StringUtils.isBlank(value)){
return aggValue;
}
String[] aggAry = aggValue.split(",");
//收藏、评论、点赞、阅读量初始值
int col = 0,com=0,lik=0,vie=0;
for (String agg : aggAry) {
//for --> COLLECTION:0
String[] split = agg.split(":");
//split[0]:COLLECTION,split[1]:0
/*
获得初始值,也是时间窗口内计算之后的值
第一次获取到的值为0
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
col = Integer.parseInt(split[1]);
break;
case COMMENT:
com = Integer.parseInt(split[1]);
break;
case LIKES:
lik = Integer.parseInt(split[1]);
break;
case VIEWS:
vie = Integer.parseInt(split[1]);
break;
}
}
/*
累加操作
*/
String[] valAry = value.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
case COLLECTION:
col += Integer.parseInt(valAry[1]);
break;
case COMMENT:
com += Integer.parseInt(valAry[1]);
break;
case LIKES:
lik += Integer.parseInt(valAry[1]);
break;
case VIEWS:
vie += Integer.parseInt(valAry[1]);
break;
}
String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
log.info("文章的id:{}",key);
log.info("当前时间窗口内的消息处理结果:{}",formatStr);
//必须返回和apply()的返回类型
return formatStr;
}
}, Materialized.as("hot-article-stream-count-001"))
.toStream()
.map((key,value)->{
return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
})
//发送消息
.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
return stream;
}
/**
* 格式化消息的value数据
* @param articleId 文章id
* @param value 聚合结果
* @return String
*/
public String formatObj(String articleId,String value){
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setArticleId(Long.valueOf(articleId));
//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
mess.setCollect(Integer.parseInt(split[1]));
break;
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKES:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);
}
}
这一步是最难但是也是最重要的,首先我们接收到消息之后需要先对其key和value进行重置,因为这时候接收到的数据是一个JSON字符串格式的UpdateArticleMess对象,我们需要将其重置为key value键值对的格式。也即将其格式转化成key为文章id,value为用户行为记录,如key:182738789987,value:LIKES:1,表示用户对该文章点赞一次。随后选择对文章id进行聚合,每10秒钟聚合一次,需要注意的是,apply()函数中返回结构必须是“COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0”格式。
2.4:重新计算文章分值并更新Redis缓存数据
@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl extends ServiceImpl<ApArticleMapper, ApArticle> implements ApArticleService {
/**
* 更新文章分值,同时更新redis中热点文章数据
* @param mess
*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {
//1.获取文章数据
ApArticle apArticle = getById(mess.getArticleId());
//2.计算文章分值
Integer score = computeScore(apArticle);
score = score * 3;
//3.替换当前文章对应频道热点数据
replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
//4.替换推荐频道文章热点数据
replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + ArticleConstas.DEFAULT_TAG);
}
/**
* 根据权重计算文章分值
* @param apArticle
* @return
*/
private Integer computeScore(ApArticle apArticle) {
Integer score = 0;
if(apArticle.getLikes() != null){
score += apArticle.getLikes() * ArticleConstas.HOT_ARTICLE_LIKE_WEIGHT;
}
if(apArticle.getViews() != null){
score += apArticle.getViews();
}
if(apArticle.getComment() != null){
score += apArticle.getComment() * ArticleConstas.HOT_ARTICLE_COMMENT_WEIGHT;
}
if(apArticle.getCollection() != null){
score += apArticle.getCollection() * ArticleConstas.HOT_ARTICLE_COLLECTION_WEIGHT;
}
return score;
}
/**
* 替换数据并存入到redis
* @param apArticle 文章信息
* @param score 文章新的得分
* @param key redis数据的key值
*/
private void replaceDataToRedis(ApArticle apArticle,Integer score, String key) {
String articleListStr = cacheService.get(key);
if(StringUtils.isNotBlank(articleListStr)) {
List<HotArticleVo> hotArticleVos = JSON.parseArray(articleListStr, HotArticleVo.class);
boolean flag = true;
//如果缓存中存在该文章,直接更新文章分值
for (HotArticleVo hotArticleVo : hotArticleVos) {
if(hotArticleVo.getId().equals(apArticle.getId())) {
if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
log.info("频道{}缓存中存在该文章,文章{}分值更新{}-->{}",apArticle.getChannelName(),apArticle.getId(),hotArticleVo.getScore(),score);
} else {
log.info("推荐频道缓存中存在该文章,文章{}分值更新{}-->{}",apArticle.getId(),hotArticleVo.getScore(),score);
}
hotArticleVo.setScore(score);
flag = false;
break;
}
}
//如果缓存中不存在该文章
if(flag) {
//缓存中热点文章数少于30,直接增加
if(hotArticleVos.size() < 30) {
log.info("该文章{}不在缓存,但是文章数少于30,直接添加",apArticle.getId());
HotArticleVo hotArticleVo = new HotArticleVo();
BeanUtils.copyProperties(apArticle,hotArticleVo);
hotArticleVo.setScore(score);
hotArticleVos.add(hotArticleVo);
} else {
//缓存中热点文章数大于或等于30
//1.排序
hotArticleVos = hotArticleVos.stream().sorted(Comparator.
comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
//2.获取最小得分值
HotArticleVo minScoreHotArticleVo = hotArticleVos.get(hotArticleVos.size() - 1);
if(minScoreHotArticleVo.getScore() <= score) {
//3.移除分值最小文章
log.info("替换分值最小的文章...");
hotArticleVos.remove(minScoreHotArticleVo);
HotArticleVo hotArticleVo = new HotArticleVo();
BeanUtils.copyProperties(apArticle,hotArticleVo);
hotArticleVo.setScore(score);
hotArticleVos.add(hotArticleVo);
}
}
}
//重新排序并缓存到redis
hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
.collect(Collectors.toList());
cacheService.set(key,JSON.toJSONString(hotArticleVos));
if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
log.info("成功刷新{}频道中热点文章缓存数据",apArticle.getChannelName());
} else {
log.info("成功刷新推荐频道中热点文章缓存数据");
}
}
}
}
这一步主要是逻辑处理部分,在这里我们需要完成对文章的得分进行重新计算并根据计算结果更新Redis中的缓存数据。计算到得分之后,我们需要分别对不同频道和推荐频道进行处理,但是处理流程相同。首先我们会先判断缓存中的数据有没有满30条,如果没满则直接该文章添加到缓存中作为热榜文章;如果缓存中已满30条数据,这时候就要分两种情况处理,如果缓存中存在该文章数据,则直接对其得分进行更新,如若不然则需要将该文章分值与缓存中的最低分进行比较,如果改文章得分比最低分高则直接进行替换,否则不做处理。最后还需要对缓存中的数据重新排序并再次发送到Reids中。
2.5:设置监听类
package com.my.article.listener;
import com.alibaba.fastjson.JSON;
import com.my.article.service.ApArticleService;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ArticleIncrHandleListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
public void onMessage(String mess){
if(StringUtils.isNotBlank(mess)){
ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
apArticleService.updateScore(articleVisitStreamMess);
}
}
}
三:功能测试
打开App端对一篇文章进行浏览并点赞收藏
到控制台查看日志信息
可以看到 成功记录用户行为并且将文章得分进行了更改,其处理流程是这样的,首先接收到的是用户的点赞数据,随后接收到用户的浏览记录,最后接收到的是用户的收藏记录,由于前面提到的消息处理是增加而不是更新,所以最后我们可以看到时间窗口处理结果为COLLECTION:1,COMMENT:0,LIKES:1,VIEWS:1,10秒钟之后就会对消息进行聚合,假如这10秒之内还有其他用户也进行了点赞阅读操作,这时候就会继续将消息增加在原来处理结果上面,过了10秒之后就会进行一次聚合处理,也即拿着这批数据进行数据更新操作。
至此该项目的开发就告一段落了,后续有什么优化我会再发文介绍。文章来源:https://www.toymoban.com/news/detail-780527.html
友情链接: 牛客网 刷题|面试|找工作神器文章来源地址https://www.toymoban.com/news/detail-780527.html
到了这里,关于猿创征文 | 项目整合KafkaStream实现文章热度实时计算的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!