猿创征文 | 项目整合KafkaStream实现文章热度实时计算

这篇具有很好参考价值的文章主要介绍了猿创征文 | 项目整合KafkaStream实现文章热度实时计算。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka streams 中,哪个方法用于创建一个 kstream 对象,# 微服务学习笔记,JAVA学习笔记,微服务,spring boot,java,kafka

 个人简介: 

> 📦个人主页:赵四司机
> 🏆学习方向:JAVA后端开发 
> ⏰往期文章:SpringBoot项目整合微信支付
> 🔔博主推荐网站:牛客网 刷题|面试|找工作神器
> 📣种一棵树最好的时间是十年前,其次是现在!
> 💖喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。

前言:

最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。

如果你想要一个可以系统学习的网站,那么我推荐的是牛客网,个人感觉用着还是不错的,页面很整洁,而且内容也很全面,语法练习,算法题练习,面试知识汇总等等都有,论坛也很活跃,传送门链接:牛客刷题神器

目录

一:Springboot集成Kafka Stream

1.设置配置类信息

2.修改application.yml文件

3.新增配置类,创建KStream对象,进行聚合

二:热点文章实时计算

1.实现思路

2.环境搭建

2.1:在文章微服务中集成Kafka生产者配置

2.2:记录用户行为

2.3:定义Stream实现消息接收并聚合

2.4:重新计算文章分值并更新Redis缓存数据

2.5:设置监听类

三:功能测试


一:Springboot集成Kafka Stream

1.设置配置类信息

package com.my.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

        可能你会有这样的疑问,前面介绍Kafka时候不是直接在yml文件里面设置参数就行了吗?为什么这里还要自己写配置类呢?是因为Spring对KafkaStream的集成并不是很好,所以我们才需要自己去写配置类信息。需要注意的一点是,配置类中必须添加@EnableKafkaStreams这一注解。

2.修改application.yml文件

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

3.新增配置类,创建KStream对象,进行聚合

package com.my.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Slf4j
@Configuration
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建KStream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

        这里实现的功能还是计算单词个数,假如你有其他计算需求你可以更改里面的逻辑代码以符合你的需求。该类可注入StreamBuilder,其返回值必须是KStream且放入Spring容器中(添加了@Bean注解)。

二:热点文章实时计算

1.实现思路

kafka streams 中,哪个方法用于创建一个 kstream 对象,# 微服务学习笔记,JAVA学习笔记,微服务,spring boot,java,kafka

        实现思路很简单,当用户有点赞、收藏、阅读等行为记录时候,就将消息发送给Kafka进行流式处理,随后Kafka再进行聚合并重新计算文章分值,除此之外还需要更新数据库中的数据。需要注意的是,按常理来说当天的文章热度权重是要比非当天的文章热度权重大的,因此当日文章的热度权重需要乘以3,随后查询Redis中的数据,假如该文章分数大于Redis中最低分文章,这时候就需要进行替换操作,更新Redis数据。 

2.环境搭建

2.1:在文章微服务中集成Kafka生产者配置

(1)修改nacos,增加内容:

kafka:
    bootstrap-servers: 49.234.52.192:9092
    producer:
        retries: 10
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    hosts: 49.234.52.192:9092
    group: ${spring.application.name}

(2)定义相关实体类、常量

package com.my.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

2.2:记录用户行为

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 读文章行为记录(阅读量+1)
 * @param map
 * @return
 */
public ResponseResult readBehavior(Map map) {
    if(map == null || map.get("articleId") == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    Long articleId = Long.parseLong((String) map.get("articleId"));
    ApArticle apArticle = getById(articleId);

    if(apArticle != null) {
        //获取文章阅读数
        Integer views = apArticle.getViews();
        if(views == null) {
            views = 0;
        }

        //调用Kafka发送消息
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(articleId);
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        //更新文章阅读数
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        luw.set(ApArticle::getViews,views + 1);
        update(luw);
    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

/**
 * 用户点赞
 * @param map
 * @return
 */
@Override
public ResponseResult likesBehavior(Map map) {
    if(map == null || map.get("articleId") == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    Long articleId = Long.parseLong((String) map.get("articleId"));
    Integer operation = (Integer) map.get("operation");
    ApArticle apArticle = getById(articleId);

    UpdateArticleMess mess = new UpdateArticleMess();
    mess.setArticleId(articleId);
    mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

    if(apArticle != null) {
        //获取文章点赞数
        Integer likes = apArticle.getLikes();
        if(likes == null) {
            likes = 0;
        }

        //更新文章点赞数
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        if(operation == 0) {
            //点赞
            log.info("用户点赞文章...");
            luw.set(ApArticle::getLikes,likes + 1);
            //分值增加
            mess.setAdd(1);
        } else {
            //取消点赞
            log.info("用户取消点赞文章...");
            luw.set(ApArticle::getLikes,likes - 1);
            //分值减少
            mess.setAdd(-1);
        }

        //调用Kafka发送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        update(luw);
    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

/**
 * 用户收藏
 * @param map
 * @return
 */
@Override
public ResponseResult collBehavior(Map map) {
    if(map == null || map.get("entryId") == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    Long articleId = Long.parseLong((String) map.get("entryId"));
    Integer operation = (Integer) map.get("operation");
    ApArticle apArticle = getById(articleId);

    //消息载体
    UpdateArticleMess mess = new UpdateArticleMess();
    mess.setArticleId(articleId);
    mess.setType(UpdateArticleMess.UpdateArticleType.COLLECTION);

    if(apArticle != null) {
        //获取文章收藏数
        Integer collection = apArticle.getCollection();
        if(collection == null) {
            collection = 0;
        }

        //更新文章收藏数
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        if(operation == 0) {
            //收藏
            log.info("用户收藏文章...");
            luw.set(ApArticle::getCollection,collection + 1);
            mess.setAdd(1);
        } else {
            //取消收藏
            log.info("用户取消收藏文章...");
            luw.set(ApArticle::getCollection,collection - 1);
            mess.setAdd(-1);
        }

        //调用Kafka发送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        update(luw);
    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

        这一步主要是当用户对文章进行访问、点赞、评论或者收藏时候就会更新数据库中的记录,同时还要将该行为记录封装并发送至Kafka。

2.3:定义Stream实现消息接收并聚合

package com.my.article.stream;

import com.alibaba.fastjson.JSON;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import com.my.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口  每十秒聚合一次
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /*
                  自行地完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /*
                      真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    /**
                     * 聚合并返回
                     * @param key  文章id
                     * @param value  重置后的value  ps:likes:1
                     * @param aggValue "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0"
                     * @return  aggValue格式
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        //用户没有进行任何操作
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        //收藏、评论、点赞、阅读量初始值
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            //for --> COLLECTION:0
                            String[] split = agg.split(":");
                            //split[0]:COLLECTION,split[1]:0
                            /*
                              获得初始值,也是时间窗口内计算之后的值
                              第一次获取到的值为0
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /*
                          累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        log.info("文章的id:{}",key);
                        log.info("当前时间窗口内的消息处理结果:{}",formatStr);

                        //必须返回和apply()的返回类型
                        return formatStr;
                    }
                }, Materialized.as("hot-article-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value数据
     * @param articleId  文章id
     * @param value  聚合结果
     * @return  String
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}

        这一步是最难但是也是最重要的,首先我们接收到消息之后需要先对其key和value进行重置,因为这时候接收到的数据是一个JSON字符串格式的UpdateArticleMess对象,我们需要将其重置为key value键值对的格式。也即将其格式转化成key为文章id,value为用户行为记录,如key:182738789987,value:LIKES:1,表示用户对该文章点赞一次。随后选择对文章id进行聚合,每10秒钟聚合一次,需要注意的是,apply()函数中返回结构必须是“COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0”格式。

2.4:重新计算文章分值并更新Redis缓存数据

@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl extends ServiceImpl<ApArticleMapper, ApArticle> implements ApArticleService {
    /**
     * 更新文章分值,同时更新redis中热点文章数据
     * @param mess
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {
        //1.获取文章数据
        ApArticle apArticle = getById(mess.getArticleId());
        //2.计算文章分值
        Integer score = computeScore(apArticle);
        score = score * 3;

        //3.替换当前文章对应频道热点数据
        replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

        //4.替换推荐频道文章热点数据
        replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + ArticleConstas.DEFAULT_TAG);
    }

    /**
     * 根据权重计算文章分值
     * @param apArticle
     * @return
     */
    private Integer computeScore(ApArticle apArticle) {
        Integer score = 0;
        if(apArticle.getLikes() != null){
            score += apArticle.getLikes() * ArticleConstas.HOT_ARTICLE_LIKE_WEIGHT;
        }
        if(apArticle.getViews() != null){
            score += apArticle.getViews();
        }
        if(apArticle.getComment() != null){
            score += apArticle.getComment() * ArticleConstas.HOT_ARTICLE_COMMENT_WEIGHT;
        }
        if(apArticle.getCollection() != null){
            score += apArticle.getCollection() * ArticleConstas.HOT_ARTICLE_COLLECTION_WEIGHT;
        }

        return score;
    }

    /**
     * 替换数据并存入到redis
     * @param apArticle 文章信息
     * @param score 文章新的得分
     * @param key redis数据的key值
     */
    private void replaceDataToRedis(ApArticle apArticle,Integer score, String key) {
        String articleListStr = cacheService.get(key);
        if(StringUtils.isNotBlank(articleListStr)) {
            List<HotArticleVo> hotArticleVos = JSON.parseArray(articleListStr, HotArticleVo.class);

            boolean flag = true;

            //如果缓存中存在该文章,直接更新文章分值
            for (HotArticleVo hotArticleVo : hotArticleVos) {
                if(hotArticleVo.getId().equals(apArticle.getId())) {
                    if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
                        log.info("频道{}缓存中存在该文章,文章{}分值更新{}-->{}",apArticle.getChannelName(),apArticle.getId(),hotArticleVo.getScore(),score);
                    } else {
                        log.info("推荐频道缓存中存在该文章,文章{}分值更新{}-->{}",apArticle.getId(),hotArticleVo.getScore(),score);
                    }
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                }
            }

            //如果缓存中不存在该文章
            if(flag) {
                //缓存中热点文章数少于30,直接增加
                if(hotArticleVos.size() < 30) {
                    log.info("该文章{}不在缓存,但是文章数少于30,直接添加",apArticle.getId());
                    HotArticleVo hotArticleVo = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle,hotArticleVo);
                    hotArticleVo.setScore(score);
                    hotArticleVos.add(hotArticleVo);
                } else {
                    //缓存中热点文章数大于或等于30
                    //1.排序
                    hotArticleVos = hotArticleVos.stream().sorted(Comparator.
                            comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                    //2.获取最小得分值
                    HotArticleVo minScoreHotArticleVo = hotArticleVos.get(hotArticleVos.size() - 1);
                    if(minScoreHotArticleVo.getScore() <= score) {
                        //3.移除分值最小文章
                        log.info("替换分值最小的文章...");
                        hotArticleVos.remove(minScoreHotArticleVo);
                        HotArticleVo hotArticleVo = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle,hotArticleVo);
                        hotArticleVo.setScore(score);
                        hotArticleVos.add(hotArticleVo);
                    }
                }
            }

            //重新排序并缓存到redis
            hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                    .collect(Collectors.toList());
            cacheService.set(key,JSON.toJSONString(hotArticleVos));
            if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) {
                log.info("成功刷新{}频道中热点文章缓存数据",apArticle.getChannelName());
            } else {
                log.info("成功刷新推荐频道中热点文章缓存数据");
            }
        }
    }
}

        这一步主要是逻辑处理部分,在这里我们需要完成对文章的得分进行重新计算并根据计算结果更新Redis中的缓存数据。计算到得分之后,我们需要分别对不同频道和推荐频道进行处理,但是处理流程相同。首先我们会先判断缓存中的数据有没有满30条,如果没满则直接该文章添加到缓存中作为热榜文章;如果缓存中已满30条数据,这时候就要分两种情况处理,如果缓存中存在该文章数据,则直接对其得分进行更新,如若不然则需要将该文章分值与缓存中的最低分进行比较,如果改文章得分比最低分高则直接进行替换,否则不做处理。最后还需要对缓存中的数据重新排序并再次发送到Reids中。

2.5:设置监听类

package com.my.article.listener;

import com.alibaba.fastjson.JSON;
import com.my.article.service.ApArticleService;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}

三:功能测试

打开App端对一篇文章进行浏览并点赞收藏

kafka streams 中,哪个方法用于创建一个 kstream 对象,# 微服务学习笔记,JAVA学习笔记,微服务,spring boot,java,kafka

到控制台查看日志信息 

kafka streams 中,哪个方法用于创建一个 kstream 对象,# 微服务学习笔记,JAVA学习笔记,微服务,spring boot,java,kafka

        可以看到 成功记录用户行为并且将文章得分进行了更改,其处理流程是这样的,首先接收到的是用户的点赞数据,随后接收到用户的浏览记录,最后接收到的是用户的收藏记录,由于前面提到的消息处理是增加而不是更新,所以最后我们可以看到时间窗口处理结果为COLLECTION:1,COMMENT:0,LIKES:1,VIEWS:1,10秒钟之后就会对消息进行聚合,假如这10秒之内还有其他用户也进行了点赞阅读操作,这时候就会继续将消息增加在原来处理结果上面,过了10秒之后就会进行一次聚合处理,也即拿着这批数据进行数据更新操作。

至此该项目的开发就告一段落了,后续有什么优化我会再发文介绍。

友情链接: 牛客网  刷题|面试|找工作神器文章来源地址https://www.toymoban.com/news/detail-780527.html

到了这里,关于猿创征文 | 项目整合KafkaStream实现文章热度实时计算的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 猿创征文 | Shell编程【上篇】

    目录 1,Shell编程 1.1:简介 1.1.1:shell解释器 1.2:快速入门 1.2.1:编写脚本 1.2.2:执行shell脚本 1.3:shell变量 1.3.1:简介 1.3.2:使用变量 1.3.3:删除变量 1.3.4:只读变量  1.4:字符串 1.4.1:单引号 1.4.2:双引号  1.4.3:获取字符串长度   1.4.4:提取子字符串  1.5:传递参数 1

    2024年02月02日
    浏览(50)
  • 猿创征文| redis基本数据类型

    📃个人主页:不断前进的皮卡丘 🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记 🔥个人专栏:微服务专栏 ✔️redis常见的操作命令:http://www.redis.cn/commands.html 命令 功能 keys * 查看当前库的所有key exists key 判断

    2023年04月08日
    浏览(30)
  • 以太坊是什么?|猿创征文

    以太坊是一个可编程、可视化、更易用的区块链,它允许任何人编写智能合约和发行代币。 在以太坊(Ethereum)出现之前,各种区块链应用的功能非常有限,例如,比特币和其他加密货币都只是纯粹的数字货币。 以太坊(Ethereum)创始人Vitalik Buterin将以太坊(Ethereum)设想为开发人员

    2024年02月02日
    浏览(58)
  • 猿创征文|【HTML】标签学习之路

    💖 目录 一、HTML语法规范 1.基本语法概述 2.标签关系 二、HTML基本结构标签 1.第一个HTML页面 2.HTML基本结构标签总结 1.基本语法概述 html是由尖括号包围的,列如: html 。 html标签通常是成对出现的,列如:html和/html,我们称为 双标签 。标签对里的第一个标签是开始标

    2024年01月16日
    浏览(34)
  • 猿创征文|ZooKeeper(伪)集群搭建

    前言:zookeeper作为一款分布式协调中间件,其重要性不言而喻,因此需要保证其高可用性。所以一般都会搭建zookeeper集群,今天叶秋带领大家在一台服务器上搭建伪集群。 目录 1、 搭建要求 2、 准备工作 3、 配置集群  4 启动集群  5 模拟集群异常 1、 搭建要求 真实的集群是

    2024年02月01日
    浏览(63)
  • 猿创征文|【深度学习前沿应用】文本生成

    作者简介 :在校大学生一枚,C/C++领域新星创作者,华为云享专家,阿里云专家博主,腾云先锋(TDP)成员,云曦智划项目总负责人,全国高等学校计算机教学与产业实践资源建设专家委员会(TIPCC)志愿者,以及编程爱好者,期待和大家一起学习,一起进步~ . 博客主页 :

    2024年02月06日
    浏览(30)
  • 猿创征文|“云“创新展望:数据之浩瀚

    💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! AWS亚马逊云科技提供全球覆盖广泛、服务深入的云平台,全球数据中心提供超过 200 项功能齐全的服务 连续 11 年被 Gartner 评为\\\"全球云计算领导者\\\" ;2021 年全新 Gartner 魔力象限中被评为\\\"云基础设施与平台服务(Iaas Pa

    2023年04月24日
    浏览(69)
  • 猿创征文|Hadoop大数据技术综合实验

    当前互联网应用中,万维网(World Wide Web)应用占据了绝大部分的份额。万维网应用对外提供服务需要架设Web服务器软件。典型的Web服务器软件有Apache、Nginx等。Web服务器软件在运行过程中会写入各种日志到磁盘文件中。例如,Apache Web服务器软件运行过程中,会产生access.log文

    2024年02月03日
    浏览(36)
  • 猿创征文 | Solidity 智能合约技术成长之路

    Solidity 是链上智能合约的开发语言,链上智能合约相当于传统行业的后端,链上应用基本都是由合约 + 前端组成的,虽然不推荐,但部分链上应用也会加入后端进行数据存储,以降低用户的使用成本。 Solidity 这门开发语言并不复杂,只需要您稍微有一点儿编程基础,英文词汇

    2024年01月23日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包