KafkaStream
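Overview
- Kafka Streams: a library for stream processing and analysis of data stored in Kafka.
- Features:
  - Kafka Streams is a simple, lightweight library that can be embedded in any Java application and packaged and deployed in any way.
  - It has no external dependencies other than Kafka itself.
  - It supports efficient stateful operations (such as windowed joins and aggregations) through fault-tolerant state stores.
  - It supports windowing based on event time and can handle late-arriving records.
- Key concepts:
  - Source processor: a special stream processor with no upstream processors. It produces an input stream from one or more Kafka topics by consuming their messages and forwarding them to downstream processors.
  - Sink processor: a special stream processor with no downstream processors. It sends the messages it receives from upstream processors to a specified Kafka topic.
  - KStream:
    - Each record is a key-value pair, similar to a map entry.
    - An ordered, unbounded, continuously updated data set.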
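The event-time windowing listed among the features can be illustrated without Kafka. The following is a minimal sketch, assuming fixed-size, non-overlapping (tumbling) windows aligned to the epoch; `TumblingWindowSketch` and `windowStart` are hypothetical names and not part of the Kafka Streams API.

```java
import java.time.Duration;

public class TumblingWindowSketch {

    /** Start (inclusive, in ms) of the tumbling window that contains the event timestamp. */
    public static long windowStart(long eventTimeMs, Duration size) {
        long sizeMs = size.toMillis();
        return eventTimeMs - (eventTimeMs % sizeMs);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        // The third record arrives last but carries an older event time
        long[] eventTimes = {1_000L, 25_500L, 9_999L};
        for (long t : eventTimes) {
            long start = windowStart(t, size);
            System.out.println(t + " -> [" + start + ", " + (start + size.toMillis()) + ")");
        }
    }
}
```

Because the window is derived from the record's own timestamp rather than its arrival order, the late record with event time 9999 is still assigned to the window starting at 0.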
Example: word count
- Dependencies
Some transitive dependencies are excluded, so the whole dependencies block is shown here:
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
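- Stream processing
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.ValueMapper;

    public class KafkaStreamQuickStart {

        public static void main(String[] args) {
            // Kafka configuration
            Properties prop = new Properties();
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

            // Stream builder
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            // Define the stream computation
            streamProcessor(streamsBuilder);
            // Create the KafkaStreams object
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
            // Start the stream computation
            kafkaStreams.start();
        }

        /**
         * Stream computation.
         * Example message content: hello kafka
         *
         * @param streamsBuilder the builder the topology is defined on
         */
        private static void streamProcessor(StreamsBuilder streamsBuilder) {
            // Create the KStream and specify which topic to read from
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            // Process the message value: split it into words
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                        @Override
                        public Iterable<String> apply(String value) {
                            String[] valAry = value.split(" ");
                            return Arrays.asList(valAry);
                        }
                    })
                    // Group by value (the word)
                    .groupBy((key, value) -> value)
                    // Time window
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // Count the occurrences of each word
                    .count()
                    // Convert back to a KStream
                    .toStream()
                    .map((key, value) -> {
                        System.out.println("key:" + key + ",value:" + value);
                        return new KeyValue<>(key.key().toString(), value.toString());
                    })
                    // Send the result to the output topic
                    .to("itcast-topic-out");
        }
    }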
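To make the data flow of this topology concrete, here is a plain-Java sketch that mimics the same steps (split each value into words, regroup by word, assign to 10-second windows, count) on an in-memory list. It is not Kafka Streams code; `Event`, `countWords`, and the `word@windowStart` key format are hypothetical names chosen for illustration, and windows are assumed to be tumbling and aligned to the epoch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedWordCountSketch {

    /** A message value with its event timestamp (hypothetical stand-in for a Kafka record). */
    public static final class Event {
        final long timestampMs;
        final String value;

        public Event(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /** Returns counts keyed by "word@windowStart", sorted by key. */
    public static Map<String, Long> countWords(List<Event> events, long windowSizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            for (String word : e.value.split(" ")) {      // corresponds to flatMapValues
                String key = word + "@" + windowStart;    // corresponds to groupBy + windowedBy
                counts.merge(key, 1L, Long::sum);         // corresponds to count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000, "hello kafka"),
                new Event(2_000, "hello kafka"),
                new Event(12_000, "hello stream"));
        countWords(events, 10_000).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The first two messages fall into the same window, so their words are counted together; the third message starts a new count in the next window.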
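- Send messages
    // `producer` is an existing KafkaProducer<String, String>; the topic must match the one the stream reads from
    for (int i = 0; i < 5; i++) {
        ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("itcast-topic-input", "hello kafka");
        producer.send(kvProducerRecord);
    }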
- Receive messages
    // Subscribe to the output topic
    consumer.subscribe(Collections.singleton("itcast-topic-out"));
Spring Boot integration
- Configuration
config.java
    import java.util.HashMap;
    import java.util.Map;
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafkaStreams;
    import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
    import org.springframework.kafka.config.KafkaStreamsConfiguration;

    /**
     * Registers a custom KafkaStreamsConfiguration bean to set custom configuration parameters.
     */
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix = "kafka")
    public class KafkaStreamConfig {
        private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
        // Bound from kafka.hosts and kafka.group in application.yml
        private String hosts;
        private String group;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
            props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
            props.put(StreamsConfig.RETRIES_CONFIG, 10);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            return new KafkaStreamsConfiguration(props);
        }
    }
application.yml
    kafka:
      hosts: 192.168.174.133:9092
      group: ${spring.application.name}
BeanConfig.java
    @Slf4j
    @Configuration
    public class KafkaStreamHelloListener {

        @Bean
        public KStream<String, String> KStream(StreamsBuilder streamsBuilder) {
            // Create the KStream and specify which topic to read from
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            // Process the message value: split it into words
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                        @Override
                        public Iterable<String> apply(String value) {
                            String[] valAry = value.split(" ");
                            return Arrays.asList(valAry);
                        }
                    })
                    // Group by value (the word)
                    .groupBy((key, value) -> value)
                    // Time window
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // Count the occurrences of each word
                    .count()
                    // Convert back to a KStream
                    .toStream()
                    .map((key, value) -> {
                        System.out.println("key:" + key + ",value:" + value);
                        return new KeyValue<>(key.key().toString(), value.toString());
                    })
                    // Send the result to the output topic
                    .to("itcast-topic-out");
            return stream;
        }
    }
Computing article scores in real time
- nacos: configure the Kafka producer for leadnews-behavior
    kafka:
      bootstrap-servers: 192.168.174.133:9092
      producer:
        retries: 10
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
- Modify leadnews-behavior
like
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {
        ...
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
        if (dto.getOperation() == 0) {
            ...
            mess.setAdd(1);
        } else {
            ...
            mess.setAdd(-1);
        }
        // Kafka: send the message for aggregation
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
        ...
    }
view
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        ...
        // Kafka: send the message for aggregation
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
        ...
    }
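Both handlers rely on `UpdateArticleMess`, which is not shown in this article. Judging only from the setter calls above and the enum constants used in the stream handler, it might look roughly like the sketch below. The class name `UpdateArticleMessSketch`, the field types, and the `toStreamValue` helper are assumptions for illustration, not the project's actual class.

```java
public class UpdateArticleMessSketch {

    /** Behavior types; the constant names match those used in the stream handler. */
    public enum UpdateArticleType { COLLECTION, COMMENT, LIKES, VIEWS }

    private UpdateArticleType type;   // which counter changes
    private Long articleId;           // assumed type: Long
    private Integer add;              // delta: +1, or -1 when a like is cancelled

    public UpdateArticleType getType() { return type; }
    public void setType(UpdateArticleType type) { this.type = type; }
    public Long getArticleId() { return articleId; }
    public void setArticleId(Long articleId) { this.articleId = articleId; }
    public Integer getAdd() { return add; }
    public void setAdd(Integer add) { this.add = add; }

    /** Hypothetical helper: the "TYPE:delta" value the stream handler derives from each message. */
    public String toStreamValue() {
        return type.name() + ":" + add;
    }

    public static void main(String[] args) {
        UpdateArticleMessSketch mess = new UpdateArticleMessSketch();
        mess.setArticleId(1L);
        mess.setType(UpdateArticleType.LIKES);
        mess.setAdd(-1); // the user cancelled a like
        System.out.println(mess.getArticleId() + " -> " + mess.toStreamValue());
    }
}
```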
- Add streaming aggregation in leadnews-article
    @Slf4j
    @Configuration
    public class HotArticleStreamHandler {

        @Bean
        public KStream<String, String> KStream(StreamsBuilder streamsBuilder) {
            // Receive messages
            KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
            // Streaming aggregation
            stream.map((key, value) -> {
                        UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                        // Reset the key to the article id and the value to "TYPE:delta", e.g. "LIKES:1"
                        return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name() + ":" + mess.getAdd());
                    })
                    // Group by article id
                    .groupBy((key, value) -> key)
                    // Time window
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // Custom aggregation
                    .aggregate(
                            // Initializer: returns the starting aggregate value for a window
                            new Initializer<String>() {
                                @Override
                                public String apply() {
                                    return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                                }
                            },
                            // Aggregator: folds one message into the aggregate and returns the new aggregate value
                            new Aggregator<String, String, String>() {
                                @Override
                                public String apply(String key, String value, String aggValue) {
                                    if (StringUtils.isBlank(value)) {
                                        return aggValue;
                                    }
                                    String[] aggAry = aggValue.split(",");
                                    int col = 0, com = 0, lik = 0, vie = 0;
                                    for (String agg : aggAry) {
                                        String[] split = agg.split(":");
                                        // Read the current totals: the initial value, or the result accumulated so far in this window
                                        switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                                            case COLLECTION:
                                                col = Integer.parseInt(split[1]);
                                                break;
                                            case COMMENT:
                                                com = Integer.parseInt(split[1]);
                                                break;
                                            case LIKES:
                                                lik = Integer.parseInt(split[1]);
                                                break;
                                            case VIEWS:
                                                vie = Integer.parseInt(split[1]);
                                                break;
                                        }
                                    }
                                    // Add the delta carried by the current message
                                    String[] valAry = value.split(":");
                                    switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                                        case COLLECTION:
                                            col += Integer.parseInt(valAry[1]);
                                            break;
                                        case COMMENT:
                                            com += Integer.parseInt(valAry[1]);
                                            break;
                                        case LIKES:
                                            lik += Integer.parseInt(valAry[1]);
                                            break;
                                        case VIEWS:
                                            vie += Integer.parseInt(valAry[1]);
                                            break;
                                    }
                                    String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                                    System.out.println("Article id: " + key);
                                    System.out.println("Aggregation result for the current time window: " + formatStr);
                                    return formatStr;
                                }
                            },
                            Materialized.as("hot-article-stream-count-001")
                    )
                    .toStream()
                    .map((key, value) -> {
                        return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));
                    })
                    // Send the aggregated message
                    .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
            return stream;
        }

        /**
         * Formats the aggregated value as a JSON message.
         *
         * @param articleId the article id
         * @param value     the aggregate, e.g. COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
         * @return the JSON string of an ArticleVisitStreamMess
         */
        private String formatObj(String articleId, String value) {
            ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
            mess.setArticleId(Long.valueOf(articleId));
            String[] valAry = value.split(",");
            for (String val : valAry) {
                String[] split = val.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                    case COLLECTION:
                        mess.setCollect(Integer.parseInt(split[1]));
                        break;
                    case COMMENT:
                        mess.setComment(Integer.parseInt(split[1]));
                        break;
                    case LIKES:
                        mess.setLike(Integer.parseInt(split[1]));
                        break;
                    case VIEWS:
                        mess.setView(Integer.parseInt(split[1]));
                        break;
                }
            }
            log.info("Result after aggregation: {}", JSON.toJSONString(mess));
            return JSON.toJSONString(mess);
        }
    }
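The string handling inside the aggregator can be exercised outside Kafka. Below is a minimal sketch of the same fold written with an `EnumMap` instead of four local variables. `ArticleAggregateSketch`, `Type`, `INITIAL`, and `aggregate` are hypothetical names that mirror, but are not, the project's `UpdateArticleMess.UpdateArticleType` and the `Aggregator` above.

```java
import java.util.EnumMap;
import java.util.Map;

public class ArticleAggregateSketch {

    public enum Type { COLLECTION, COMMENT, LIKES, VIEWS }

    /** Same starting value as the Initializer in the handler. */
    public static final String INITIAL = "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";

    /** Folds one "TYPE:delta" message into an aggregate of the form "COLLECTION:n,COMMENT:n,LIKES:n,VIEWS:n". */
    public static String aggregate(String value, String aggValue) {
        if (value == null || value.trim().isEmpty()) {
            return aggValue; // nothing to add
        }
        // Parse the running totals
        Map<Type, Integer> totals = new EnumMap<>(Type.class);
        for (String part : aggValue.split(",")) {
            String[] kv = part.split(":");
            totals.put(Type.valueOf(kv[0]), Integer.parseInt(kv[1]));
        }
        // Apply the delta carried by the current message
        String[] msg = value.split(":");
        totals.merge(Type.valueOf(msg[0]), Integer.parseInt(msg[1]), Integer::sum);
        return String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d",
                totals.get(Type.COLLECTION), totals.get(Type.COMMENT),
                totals.get(Type.LIKES), totals.get(Type.VIEWS));
    }

    public static void main(String[] args) {
        String agg = INITIAL;
        // One like, two views, then the like is cancelled
        for (String msg : new String[] {"LIKES:1", "VIEWS:1", "VIEWS:1", "LIKES:-1"}) {
            agg = aggregate(msg, agg);
        }
        System.out.println(agg);
    }
}
```

Encoding the aggregate as a string keeps the default String serdes usable for the state store; a dedicated value type with its own serde would be an alternative design, at the cost of extra configuration.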
- Add a listener for the aggregated data in leadnews-article
    @Slf4j
    @Component
    public class ArticleIncrHandlerListener {

        @Autowired
        private ApArticleService apArticleService;

        @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
        public void onMessage(String mess) {
            if (StringUtils.isNotBlank(mess)) {
                ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
                apArticleService.updateScore(articleVisitStreamMess);
                System.out.println(mess);
            }
        }
    }
- Replace the hot-article data in Redis
    /**
     * Updates the article score and the hot-article data in the cache.
     *
     * @param mess the aggregated behavior increments for one article
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {
        // 1. Update the article's view, like, collection, and comment counts
        ApArticle apArticle = updateArticleBehavior(mess);
        // 2. Compute the article score
        Integer score = hotArticleService.computeArticleScore(apArticle);
        score *= 3;
        // 3. Replace the hot data of the article's channel
        replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);
        // 4. Replace the hot data of the recommendation list
        replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
    }

    /**
     * Replaces the data and stores it in Redis.
     *
     * @param HOT_ARTICLE_FIRST_PAGE the cache key of the hot-article list
     * @param apArticle              the updated article
     * @param score                  the new score
     */
    private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {
        String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);
        if (StringUtils.isNotBlank(articleList)) {
            List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);
            boolean flag = true;
            // 3.1 The article is already a hot article: update its score
            for (HotArticleVo hotArticleVo : hotArticleVoList) {
                if (hotArticleVo.getId().equals(apArticle.getId())) {
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                }
            }
            // 3.2 The article is not yet a hot article: replace the lowest-scoring one
            if (flag) {
                if (hotArticleVoList.size() >= 30) {
                    // The list already holds 30 or more hot articles
                    hotArticleVoList = hotArticleVoList.stream()
                            .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                            .collect(Collectors.toList());
                    HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                    if (lastHot.getScore() < score) {
                        hotArticleVoList.remove(lastHot);
                        HotArticleVo hotArticleVo = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle, hotArticleVo);
                        hotArticleVo.setScore(score);
                        hotArticleVoList.add(hotArticleVo);
                    }
                } else {
                    // The list holds fewer than 30 hot articles
                    HotArticleVo hotArticleVo = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hotArticleVo);
                    hotArticleVo.setScore(score);
                    hotArticleVoList.add(hotArticleVo);
                }
            }
            // 3.3 Sort by score (descending) and cache in Redis
            hotArticleVoList = hotArticleVoList.stream()
                    .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                    .collect(Collectors.toList());
            cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));
        }
    }

    /**
     * Updates the article's behavior counters.
     *
     * @param mess the aggregated behavior increments for one article
     */
    private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {
        ApArticle apArticle = getById(mess.getArticleId());
        // Treat a missing counter as 0 before applying the increment
        apArticle.setComment((apArticle.getComment() == null ? 0 : apArticle.getComment()) + mess.getComment());
        apArticle.setCollection((apArticle.getCollection() == null ? 0 : apArticle.getCollection()) + mess.getCollect());
        apArticle.setLikes((apArticle.getLikes() == null ? 0 : apArticle.getLikes()) + mess.getLike());
        apArticle.setViews((apArticle.getViews() == null ? 0 : apArticle.getViews()) + mess.getView());
        updateById(apArticle);
        return apArticle;
    }
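The replacement rule in `replaceDataToRedis` can be tried on an in-memory list. In the sketch below, `HotArticle` is a hypothetical stand-in for `HotArticleVo` that carries only an id and a score, `upsert` is an illustrative name, and Redis and JSON handling are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class HotListSketch {

    /** Hypothetical stand-in for HotArticleVo. */
    public static final class HotArticle {
        public final long id;
        public int score;

        public HotArticle(long id, int score) {
            this.id = id;
            this.score = score;
        }
    }

    /** Updates the list in place and leaves it sorted by score, highest first. */
    public static void upsert(List<HotArticle> hot, long id, int score, int capacity) {
        boolean found = false;
        for (HotArticle a : hot) {
            if (a.id == id) {          // already a hot article: only update the score
                a.score = score;
                found = true;
                break;
            }
        }
        if (!found) {
            if (hot.size() < capacity) {
                hot.add(new HotArticle(id, score));
            } else if (!hot.isEmpty()) {
                // List is full: replace the lowest-scoring article only if the new score is higher
                HotArticle lowest = hot.stream().min(Comparator.comparingInt(a -> a.score)).get();
                if (lowest.score < score) {
                    hot.remove(lowest);
                    hot.add(new HotArticle(id, score));
                }
            }
        }
        hot.sort(Comparator.comparingInt((HotArticle a) -> a.score).reversed());
    }

    public static void main(String[] args) {
        List<HotArticle> hot = new ArrayList<>();
        hot.add(new HotArticle(1, 50));
        hot.add(new HotArticle(2, 40));
        hot.add(new HotArticle(3, 30));
        upsert(hot, 4, 35, 3);   // replaces article 3
        upsert(hot, 2, 60, 3);   // article 2 moves to the top
        upsert(hot, 5, 10, 3);   // score too low, list unchanged
        for (HotArticle a : hot) {
            System.out.println(a.id + ": " + a.score);
        }
    }
}
```

The capacity is a parameter here so that the demo can use a small list; the service code above fixes it at 30.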
Sources
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews