springboot~kafka-stream实现实时统计

这篇具有很好参考价值的文章主要介绍了springboot~kafka-stream实现实时统计。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

实时统计,也可以理解为流式计算,一个输入流,一个输出流,源源不断。

Kafka Stream

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

相关术语

源处理器和Sink处理器是Kafka Streams中的两个重要组件,它们分别用于从输入流获取数据并将处理后的数据发送到输出流。以下是它们的工作流程的文字图示表达:

[Source Processor] -> [Processor Topology] -> [Sink Processor]
  1. 源处理器(Source Processor)

    • 源处理器负责从一个或多个输入主题(topics)中提取数据,并将数据转换为KStream或KTable对象。
    • 它通常是处理拓扑结构的起点,从一个或多个输入主题中读取数据,并将其发送到处理拓扑中的下一个处理器。
  2. Sink 处理器(Sink Processor)

    • Sink处理器负责将经过处理的数据发送到一个或多个输出主题,或者执行其他终端操作。
    • 它通常是处理拓扑结构的终点,在处理拓扑的最后阶段接收处理后的数据,并将其发送到输出主题,或者执行其他终端操作,如存储到数据库、发送到外部系统等。
  3. Processor Topology

    • 处理拓扑包含了源处理器、中间处理器和Sink处理器,它定义了数据流的处理逻辑。
    • 在处理拓扑中,数据流会通过一系列的处理器进行转换、聚合和处理,最终到达Sink处理器,完成整个处理流程。

通过这种处理流程,Kafka Streams可以实现对数据流的灵活处理和转换,使得你能够方便地构建实时流处理应用程序。

kafka stream demo

依赖

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
  <version>2.5.1</version>
</dependency>

环境准备

  • 安装kafka
  • 建立topic,我以keycloak为例,它有login_in这个主题,用来记录登录信息
  • 建立topic,如total_record,用来存储login_in的实时统计的结果
  • 可使用springboot继承的消费者,去消费total_record,如写入数据库进行持久化

业务代码

  • 配置类
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

	private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;

	@Value("${spring.kafka.bootstrap-servers}")
	private String hosts;

	@Value("${spring.kafka.consumer.group-id}")
	private String group;

	@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
	public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
		Map<String, Object> props = new HashMap<>();
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, group + "_stream_aid");
		props.put(StreamsConfig.CLIENT_ID_CONFIG, group + "_stream_cid");
		props.put(StreamsConfig.RETRIES_CONFIG, 3);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//从最近的消息开始消费
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		return new KafkaStreamsConfiguration(props);
	}

}
  • 消费类
@Configuration
@Slf4j
public class KafkaStreamListener {

	@Autowired
	ReportLoginTypeMapper reportLoginTypeMapper;
	@KafkaListener(topics = "total_record")
	public void listen(ConsumerRecord<String, String> record) {
		// 将时间戳转换为本地日期时间
		LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
		ReportLoginType reportLoginType=new ReportLoginType();
		reportLoginType.setLoginType(record.key());
		reportLoginType.setCreateAt(dateTime);
		reportLoginType.setCount(Integer.parseInt(record.value()));
		reportLoginTypeMapper.insert(reportLoginType);
	}

	@Bean
	public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<String, String> stream = streamsBuilder.stream("KC_LOGIN");
		KStream<String, String> serializedStream = stream.mapValues(jsonString -> {
			// 分组依据
			if (JSONUtil.parseObj(jsonString).containsKey("details")) {
				JSONObject details = JSONUtil.parseObj(jsonString).getJSONObject("details");
				if (details.containsKey("loginType")) {
					String loginType = details.getStr("loginType");
					return loginType;
				}
				return "";
			}
			else {
				return "";
			}
		});
		/**
		 * 处理消息的value
		 */
		serializedStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
			@Override
			public Iterable<String> apply(String value) {
				return Arrays.asList(value.split(" "));
			}
		}).filter((key, value) -> !value.equals(""))
				// 按照value进行聚合处理
				.groupBy((key, value) -> value)// 这进而的value是kafka的消息内容
				// 时间窗口
				.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
				// 统计单词的个数
				.count()
				// 转换为kStream
				.toStream().map((key, value) -> {
					// key是分组的key,它是一个window对象,它里面有分组key和时间窗口的开始时间和结束时间,方便后期我们统计,value是分组count的结果
					return new KeyValue<>(key.toString(), value.toString());
				})
				// 发送消息
				.to("topic-out");
		return stream;
	}

}

上面代码在分组统计之后,给把数据发到topic-out的kafka主题里,需要注意kafka主题的key是一个代码分组key和窗口期的字符串,方便我们后期做数据统计,一般这些窗口期的数据和key一样,会写到数据表里,像我们查询数据表时,会根据它们选择最大的value值,因为同一窗口里的计数,我们取最大就可以,它已经包含了相同窗口期的其它值。

select login_type,window_start,window_end,max(count) FROM report_login_type
where login_type='password' and create_at>='2024-01-10 14:00:00' 
group by login_type,window_start,window_end

最后看一下total_record的内容

springboot~kafka-stream实现实时统计文章来源地址https://www.toymoban.com/news/detail-776943.html

到了这里,关于springboot~kafka-stream实现实时统计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 实时处理Stream与Batch的对比分析

    Apache Kafka是一个高吞吐量、分布式、基于发布/订阅模式的消息队列,被大部分公司用做实时数据处理平台。它主要有以下特点: 高性能:Kafka采用了Zero-Copy技术和PageCache机制,在保证数据可靠性的同时提高了性能表现; 可扩展性:Kafka可以很容易的在集群中添加或删除Broke

    2024年02月15日
    浏览(29)
  • 数据平台的实时处理:Streaming和Apache Kafka

    随着数据的增长和数据处理的复杂性,实时数据处理变得越来越重要。实时数据处理是指在数据产生时或者数据产生后的很短时间内对数据进行处理的技术。这种技术在各个领域都有广泛的应用,如实时推荐、实时监控、实时分析、实时语言翻译等。 在实时数据处理中,St

    2024年04月14日
    浏览(32)
  • 推荐系统架构设计实践:Spark Streaming+Kafka构建实时推荐系统架构

    作者:禅与计算机程序设计艺术 推荐系统(Recommendation System)一直都是互联网领域一个非常火热的话题。其主要目标是在用户多样化的信息环境中,通过分析用户的偏好、消费习惯等数据,提供个性化的信息推送、商品推荐、购物指导等服务。如何设计一个推荐系统的架构及

    2024年02月08日
    浏览(36)
  • Kafka数据流的实时采集与统计机制

    随着大数据时代的到来,实时数据处理成为了众多企业和组织的关注焦点。为了满足这一需求,Apache Kafka成为了一个广泛采用的分布式流处理平台。Kafka以其高吞吐量、可扩展性和容错性而闻名,被广泛应用于日志收集、事件驱动架构和实时分析等场景。 在本文中,我们将探

    2024年02月07日
    浏览(34)
  • 如何使用Apache Kafka和Storm实时处理大规模的Twitter数据集 ?4 Streaming Large Collections of Twitter Data in RealTime

    作者:禅与计算机程序设计艺术 Twitter是一个巨大的社交媒体网站,每天都有数以亿计的用户参与其中。许多企业利用其数据的价值已经成为众矢之的。比如,广告、营销、市场调研等方面都依赖于Twitter数据。 Streaming Large Collections of Twitter Data in Real-Time with Apache Kafka and Stor

    2024年02月07日
    浏览(37)
  • PyQt结合OpenCV实现实时人流量统计

    之前学的基本都是Web端的技术。前两天的面试,让我深入的去学习一下  Qt 技术,了解完概念之后,才知道我之前接触的类  TkInter  技术,有点安卓开发的味道。。。

    2024年02月12日
    浏览(30)
  • 153基于python和opencv实现实时统计米粒计数

    本期是一个小的demo,对米粒进行实时统计个数。效果图如下: 代码下载和视频演示地址: 153基于python和opencv实现实时统计米粒计数_哔哩哔哩_bilibili 其他代码可了解: 001手写汉字识别-单个汉字识别-pyqt可视化交互界面-python代码\\\') 002unet墙体瑕疵检测-python-pytorch\\\') 003水果识别

    2024年04月11日
    浏览(33)
  • Java8用Stream流一行代码实现数据分组统计,排序,最大值、最小值、平均值、总数、合计

    Java8对数据处理可谓十分流畅,既不改变数据,又能对数据进行很好的处理,今天给大家演示下,用Java8的Stream如何对数据进行分组统计,排序,求和等 汇总统计方法 找到汇总统计的方法。这些方法属于java 8的汇总统计类。 getAverage(): 它返回所有接受值的平均值。 getCount():

    2023年04月20日
    浏览(52)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(46)
  • SpringBoot+Redis stream实现消息队列

    目录 一、前言 二、下载Redis及引入Redis依赖 三、配置消费者及消费组 四,配置Redsi及初始化stream、消费组、消费者 相较于 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等重量级的消息队列中间件,Redis在需求量小的情况下,也可以作为消息中间件来使用。Redis作为消息队列使

    2024年02月16日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包