JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

这篇具有很好参考价值的文章主要介绍了JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

KafkaStream

概述

  • Kafka Stream: 提供了对存储与Kafka内的数据进行流式处理和分析的功能
  • 特点:
    • Kafka Stream提供了一个非常简单而轻量的Library, 它可以非常方便地嵌入任意Java应用中, 也可以任意方式打包和部署
    • 除了Kafka外, 无任何外部依赖
    • 通过可容错地state, store实现高效地状态操作(如windowed join和aggregation)
    • 支持基于事件时间地窗口操作, 并且可处理晚到的数据(late arrival of records)
  • 关键概念:
    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器.
    • Sink处理器: sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
      JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数,JavaWeb开发,# JavaWeb开发-LeadNews,java,spring boot,spring cloud,kafka,redis,kafkastream,后端
  • KStream:
    • 数据结构类似于map, key-value键值对.
    • 一段顺序的, 无限长, 不断更新的数据集.

案例-统计单词个数

  • 依赖
    依赖中有排除部分依赖, 还是一整个放上来好了
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    
  • 流式处理
    public class KafkaStreamQuickStart {
        public static void main(String[] args) {
    
            // Kafka的配置信息
            Properties prop = new Properties();
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");
    
            // Stream 构建器
            StreamsBuilder streamsBuilder = new StreamsBuilder();
    
            // 流式计算
            streamProcessor(streamsBuilder);
    
            // 创建KafkaStream对象
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
    
            // 开启流式计算
            kafkaStreams.start();
        }
    
        /**
         * 流式计算
         * 消息的内容: hello kafka
         * @param streamsBuilder
         */
        private static void streamProcessor(StreamsBuilder streamsBuilder) {
            // 创建Kstream对象, 同时指定从哪个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            /**
             * 处理消息的value
             */
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                }
            })
                    // 按照value聚合处理
                    .groupBy((key, value)->value)
                    // 时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // 统计单词的个数
                    .count()
                    // 转换为KStream
                    .toStream()
                    .map((key, value)->{
                        System.out.println("key:"+key+",value:"+value);
                        return new KeyValue<>(key.key().toString(), value.toString());
                    })
                    // 发送消息
                    .to("itcast-topic-out");
        }
    }
    
  • 发送消息
    for (int i = 0; i < 5; i++) {
        ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("icast-topic-input", "hello kafka");
        producer.send(kvProducerRecord);
    }
    
  • 接收消息
    // 订阅主题
    consumer.subscribe(Collections.singleton("itcast-topic-out"));
    

SpringBoot集成

  • 配置
    config.java
    /**
     * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
     */
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {
        private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
        private String hosts;
        private String group;
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
            props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
            props.put(StreamsConfig.RETRIES_CONFIG, 10);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            return new KafkaStreamsConfiguration(props);
        }
    }
    
    application.yml
    kafka:
      hosts: 192.168.174.133:9092
      group: ${spring.application.name}
    
    BeanConfig.java
    @Slf4j
    @Configuration
    public class KafkaStreamHelloListener {
    
        @Bean
        public KStream<String, String> KStream(StreamsBuilder streamsBuilder)
        {
            // 创建Kstream对象, 同时指定从哪个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            /**
             * 处理消息的value
             */
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                        @Override
                        public Iterable<String> apply(String value) {
                            String[] valAry = value.split(" ");
                            return Arrays.asList(valAry);
                        }
                    })
                    // 按照value聚合处理
                    .groupBy((key, value)->value)
                    // 时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // 统计单词的个数
                    .count()
                    // 转换为KStream
                    .toStream()
                    .map((key, value)->{
                        System.out.println("key:"+key+",value:"+value);
                        return new KeyValue<>(key.key().toString(), value.toString());
                    })
                    // 发送消息
                    .to("itcast-topic-out");
            return stream;
        }
    }
    

实时计算文章分值

  1. nacos: leadnews-behavior配置kafka生产者
    kafka:
      bootstrap-servers: 192.168.174.133:9092
      producer:
        retries: 10
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
  2. 修改leadnews-behavior
    like
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {
    
        ...
    
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
    
        if(dto.getOperation() == 0){
            ...
            mess.setAdd(1);
        }else{
            ...
            mess.setAdd(-1);
        }
    
        // kafka: 发送消息, 数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
    
        ...
    }
    
    view
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
    
        ...
    
        // kafka: 发送消息, 数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
    
        ...
    }
    
  3. leadnews-article中添加流式聚合处理
    @Slf4j
    @Configuration
    public class HotArticleStreamHandler {
    
        @Bean
        public KStream<String, String> KStream(StreamsBuilder streamsBuilder)
        {
            // 接收消息
            KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
    
            // 聚合流式处理
            stream.map((key, value)->{
                UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                // 重置消息的key和value
                return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name()+":"+mess.getAdd());
            })
                    // 根据文章id进行聚合
                    .groupBy((key, value)->key)
                    // 时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // 自行实现聚合计算
                    .aggregate(
                            // 初始方法, 返回值是消息的value
                            new Initializer<String>() {
                                @Override
                                public String apply() {
                                    return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                                }
                            },
                            // 真正的聚合操作, 返回值是消息的value
                            new Aggregator<String, String, String>() {
                                @Override
                                public String apply(String key, String value, String aggValue) {
                                    if(StringUtils.isBlank(value)){
                                        return aggValue;
                                    }
                                    String[] aggAry = aggValue.split(",");
                                    int col=0, com=0, lik=0, vie=0;
                                    for (String agg : aggAry) {
                                        String[] split = agg.split(":");
                                        /**
                                        * 获得初始值, 也是时间窗口内计算之后的值
                                        */
                                        switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0]))
                                        {
                                            case COLLECTION:
                                                col = Integer.parseInt(split[1]);
                                                break;
                                            case COMMENT:
                                                com = Integer.parseInt(split[1]);
                                                break;
                                            case LIKES:
                                                lik = Integer.parseInt(split[1]);
                                                break;
                                            case VIEWS:
                                                vie = Integer.parseInt(split[1]);
                                                break;
                                        }
                                    }
                                    // 累加操作
                                    String[] valAry = value.split(":");
                                    switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0]))
                                    {
                                        case COLLECTION:
                                            col += Integer.parseInt(valAry[1]);
                                            break;
                                        case COMMENT:
                                            com += Integer.parseInt(valAry[1]);
                                            break;
                                        case LIKES:
                                            lik += Integer.parseInt(valAry[1]);
                                            break;
                                        case VIEWS:
                                            vie += Integer.parseInt(valAry[1]);
                                            break;
                                    }
                                    String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                                    System.out.println("文章的id "+key);
                                    System.out.println("当前时间窗口内的消息处理结果: "+formatStr);
                                    return formatStr;
                                }
                            }, Materialized.as("hot-article-stream-count-001")
                    )
                    .toStream()
                    .map((key, value)->{
                        return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));
                    })
                    // 发送消息
                    .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
            return stream;
        }
    
        /**
        * 格式化消息的value数据
        * @param articleId
        * @param value
        * @return
        */
        private String formatObj(String articleId, String value) {
            ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
            mess.setArticleId(Long.valueOf(articleId));
    
            // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
            String[] valAry = value.split(",");
            for (String val : valAry) {
                String[] split = val.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0]))
                {
                    case COLLECTION:
                        mess.setCollect(Integer.parseInt(split[1]));
                        break;
                    case COMMENT:
                        mess.setComment(Integer.parseInt(split[1]));
                        break;
                    case LIKES:
                        mess.setLike(Integer.parseInt(split[1]));
                        break;
                    case VIEWS:
                        mess.setView(Integer.parseInt(split[1]));
                        break;
                }
            }
            
            log.info("聚合消息处理之后的结果为: {}", JSON.toJSONString(mess));
            return JSON.toJSONString(mess);
        }
    
    }
    
    
  4. leadnews-article添加聚合数据监听器
    @Slf4j
    @Component
    public class ArticleIncrHandlerListener {
    
        @Autowired
        private ApArticleService apArticleService;
        
        @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
        public void onMessage(String mess)
        {
            if(StringUtils.isNotBlank(mess)){
                ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
                apArticleService.updateScore(articleVisitStreamMess);
                System.out.println(mess);
            }
        }
    
    }
    
  5. 替换redis中的热点数据
    /**
     * 更新 文章分值, 缓存中的热点文章数据
     * @param mess
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {
        // 1. 更新文章的阅读, 点赞, 收藏, 评论的数量
        ApArticle apArticle = updateArticleBehavior(mess);
    
        // 2. 计算文章的分值
        Integer score = hotArticleService.computeArticleScore(apArticle);
        score *= 3;
    
        // 3. 替换当前文章对应频道的热点数据
        replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);
    
        // 4. 替换推荐对应的热点数据
        replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
    }
    
    /**
     * 替换数据并且存入到redis中
     * @param HOT_ARTICLE_FIRST_PAGE
     * @param apArticle
     * @param score
     */
    private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {
        String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);
        if(StringUtils.isNotBlank(articleList)){
            List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);
            boolean flag = true;
            // 3.1 文章已是热点文章, 则更新文章分数
            for (HotArticleVo hotArticleVo : hotArticleVoList) {
                if(hotArticleVo.getId().equals(apArticle.getId())){
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                }
            }
            // 3.2 文章还不是热点文章, 则替换分数最小的文章
            if(flag){
                if(hotArticleVoList.size() >= 30){
                    // 热点文章超过30条
                    hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                    HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                    if(lastHot.getScore() < score){
                        hotArticleVoList.remove(lastHot);
                        HotArticleVo hotArticleVo = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle, hotArticleVo);
                        hotArticleVo.setScore(score);
                        hotArticleVoList.add(hotArticleVo);
                    }
                }else{
                    // 热点文章没超过30条
                    HotArticleVo hotArticleVo = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hotArticleVo);
                    hotArticleVo.setScore(score);
                    hotArticleVoList.add(hotArticleVo);
                }
            }
            // 3.3 缓存到redis
            hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
            cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));
        }
    }
    
    /**
     * 更新文章行为数据
     * @param mess
     */
    private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {
        ApArticle apArticle = getById(mess.getArticleId());
        apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
        apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
        apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
        apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    
        updateById(apArticle);
    
        return apArticle;
    }
    

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews文章来源地址https://www.toymoban.com/news/detail-701684.html

到了这里,关于JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • JavaWeb_LeadNews_Day4-阿里云内容安全, 雪花算法, app文章保存, 自媒体文章审核

    依赖 实现 总结 工具类太复杂, 云盾内容安全也没开通(要企业认证), 看看测试好了 测试图片审核会报错, 可能是没开通 背景 技术选型 方案 优势 劣势 redis (INCR)生成一个全局连续递增的数字类型主键 增加了一个外部组件的依赖, Redis不可以, 则整个数据库将无法再插入 UUID 全局

    2024年02月15日
    浏览(42)
  • 猿创征文 | 项目整合KafkaStream实现文章热度实时计算

     个人简介:  📦个人主页:赵四司机 🏆学习方向:JAVA后端开发  ⏰往期文章:SpringBoot项目整合微信支付 🔔博主推荐网站:牛客网 刷题|面试|找工作神器 📣种一棵树最好的时间是十年前,其次是现在! 💖喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。 前言:

    2024年02月03日
    浏览(30)
  • JavaWeb学习-Day10

    准备工作    开发流程:  开发接口步骤:  删除部门:  新增部门:    简化代码: limit:分页展示,公式:(页数-1)*页面总数,页面总数   目前出现的问题: 1.网页没有图形化界面,只有从数据库读取的信息,postman可以 正常运行,初步怀疑是前端和后端Java代码没有连

    2024年02月11日
    浏览(33)
  • 【JavaWeb】day01-HTML&CSS

    图片标签: img src :指定图像URL(绝对路径/相对路径) width :图像宽度(像素/相对于父元素的百分比) height :图像高度(像素/相对于父元素的百分比) 标题标签: h1 - h6 水平线标签: hr 超链接: a href :指定资源访问的url target :指定在何处打开资源链接 _self :默认值,

    2024年02月04日
    浏览(35)
  • 【JavaWeb】11—Axios Ajax

    ⭐⭐⭐⭐⭐⭐ Github主页👉https://github.com/A-BigTree 笔记链接👉https://github.com/A-BigTree/Code_Learning ⭐⭐⭐⭐⭐⭐ 如果可以,麻烦各位看官顺手点个star~😊 如果文章对你有所帮助,可以点赞👍收藏⭐支持一下博主~😆 12.1.1 服务器端渲染 12.1.2 Ajax渲染(局部更新) 12.1.3 前后端分离

    2023年04月10日
    浏览(42)
  • JavaWeb学习路线(11)—— Maven延伸

    一、分模块设计 (一)概念: 将项目按功能拆分出若干个子模块。 (二)作用: 方便项目管理维护、扩展,也方便模块间相互调用,资源共享。 (三)具体实现 1、抽取公共包作成模块(以pojo实体类为例) 2、向使用的项目添加依赖 3、向使用类中引用 (四)注意事项 分

    2024年02月12日
    浏览(41)
  • 【javaweb】学习日记Day4 - Maven 依赖管理 Web入门

    目录 一、Maven入门 - 管理和构建java项目的工具 1、IDEA如何构建Maven项目 2、Maven 坐标 (1)定义 (2)主要组成 3、IDEA如何导入和删除项目 二、Maven - 依赖管理 1、依赖配置 2、依赖传递 (1)查看依赖  (2)排除依赖 3、依赖范围 三、Web 入门 1、Springboot web入门体验 2、HTTP简述

    2024年02月11日
    浏览(53)
  • 【javaweb】学习日记Day3 - Ajax 前后端分离开发 入门

    目录 一、Ajax 1、简介 2、Axios (没懂 暂留) (1)请求方式别名 (2)发送get请求 (3)发送post请求 (4)案例 二、前端工程化 1、Vue项目-目录结构 2、Vue项目-启动 (1)vscode页面启动 (2)cmd命令框启动 3、配置Vue端口号 4、Vue项目开发流程 三、Vue组件库 - Element  1、快速入门

    2024年02月12日
    浏览(37)
  • JavaWeb_瑞吉外卖_项目优化Day10-Spring Cache

    提交步骤: 生成空仓库, 不要加任何文件 右键项目, add整个项目 commit和push, push时添加远程仓库地址 新建分支步骤: 在本地新建一个分支 push 注入redis对象 将验证码保存到redis, 有效期5分钟 获取验证码, 登录成功后删除验证码 获取缓存数据 删除缓存数据 Spring Cache是一个框架,

    2024年02月13日
    浏览(44)
  • KafkaStream:Springboot中集成

    1、在kafka-demo中创建配置类         配置kafka参数 2、在application.yml中配置上面配置类需要的参数 3、新增配置类,创建KStream对象,进行聚合 4、启动kafka-demo服务测试         使用生产者发送消息可以看到控制台接收成功  

    2024年02月12日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包