构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

这篇具有很好参考价值的文章主要介绍了构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合,工程师有话说,wpf,时序数据库,flink,kafka

当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个强大的实时数据处理流水线。

什么是 Flink、Kafka、CnosDB

  • Flink:是一个强大的流式处理引擎,它支持事件驱动、分布式、并且容错。Flink能够处理高吞吐量和低延迟的实时数据流,适用于多种应用场景,如数据分析、实时报表和推荐系统等。
  • Kafka:是一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka具有良好的持久性、可扩展性和容错性,适用于构建实时数据流的可靠管道。
  • CnosDB:是一个专为时序数据设计的开源时序数据库。它具有高性能、高可用性和易用性的特性,非常适合存储实时生成的时间序列数据,如传感器数据、日志和监控数据等。

场景描述

用例中假设有一个物联网设备网络,每个设备都定期生成传感器数据,包括温度、湿度和压力等。我们希望能够实时地收集、处理和存储这些数据,以便进行实时监控和分析。

数据流向架构图如下:

构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合,工程师有话说,wpf,时序数据库,flink,kafka

  1. 首先,我们需要设置一个数据收集器来获取传感器数据,并将数据发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。
  2. 使用 Flink来实时处理传感器数据。首先,需要编写一个Flink应用程序,该应用程序订阅 Kafka 主题中的数据流,并对数据进行实时处理和转换。例如,您可以计算温度的平均值、湿度的最大值等。
  3. 将处理后的数据存储到 CnosDB 中以供后续查询。为了实现这一步,需要配置一个CnosDB Sink,使得Flink应用程序可以将处理后的数据写入 CnosDB 中。

构建流水线

1.数据采集与传输

编写一个生产者应用程序,读取传感器数据并将其发送到 Kafka 主题。

public class SensorDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String, String> producer = new KafkaProducer<>(props);


        while (true) {
            SensorData data = generateSensorData(); // 生成传感器数据
            producer.send(new ProducerRecord<>("sensor-data-topic", data));
            Thread.sleep(1000); // 每秒发送一次数据
        }
    }
}

2.实时处理与转换

编写一个 Flink 应用程序,订阅 Kafka 主题中的数据流,实时处理并转换数据。

// Flink 应用程序示例
public class SensorDataProcessingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");


        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));


        DataStream<ProcessedData> processedData = sensorData
            .map(json -> parseJson(json)) // 解析JSON数据
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口
            .apply(new SensorDataProcessor()); // 自定义处理逻辑


        processedData.print(); // 打印处理后的数据,可以替换为写入 CnosDB 操作


        env.execute("SensorDataProcessingJob");
    }
}

3.数据写入与存储

配置CnosDB Sink,将 processedData.print() 替换为写入 CnosDB 的程序在 CnosDB 创建一个存储数据时长为 30 天的数据库:

| CnosDB 建库语法说明请查看:创建数据库[https://docs.cnosdb.com/zh/latest/reference/sql.html#创建数据库]

CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;

在 Maven [https://maven.apache.org/]中引入 CnosBD Sink [https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html]包:

<dependency>
    <groupId>com.cnosdb</groupId>
    <artifactId>flink-connector-cnosdb</artifactId>
    <version>1.0</version>
</dependency>

编写程序:

public class WriteToCnosDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");


        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));


        DataStream<ProcessedData> processedData = sensorData
            .map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // 解析JSON数据
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口
            .apply(new SensorDataProcessor()); // 自定义处理逻辑


        DataStream<CnosDBPoint> cnosDBDataStream = processedData.map(
                new RichMapFunction<ProcessedData, CnosDBPoint>() {
                    @Override
                    public CnosDBPoint map(String s) throws Exception {
                        return new CnosDBPoint("sensor_metric")
                                .time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)
                                .tag("device_id", value.getDeviceId())
                                .field("average_temperature", value.getAverageTemperature())
                                .field("max_humidity", value.getMaxHumidity());
                    }
                }
        );


        CnosDBConfig cnosDBConfig = CnosDBConfig.builder()
                .url("http://localhost:8902")
                .database("db_flink_test")
                .username("root")
                .password("")
                .build();


        cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig));
        env.execute("WriteToCnosDBJob");
    }
}

运行后查看结果:

db_flink_test ❯ select * from sensor_metric limit 10;
+---------------------+---------------+---------------------+--------------+
| time                | device_id     | average_temperature | max_humidity |
+---------------------+---------------+---------------------+--------------+
| 2023-01-14T17:00:00 | OceanSensor1  | 23.5                | 79.0         |
| 2023-01-14T17:05:00 | OceanSensor2  | 21.8                | 68.0         |
| 2023-01-14T17:10:00 | OceanSensor1  | 25.2                | 75.0         |
| 2023-01-14T17:15:00 | OceanSensor3  | 24.1                | 82.0         |
| 2023-01-14T17:20:00 | OceanSensor2  | 22.7                | 71.0         |
| 2023-01-14T17:25:00 | OceanSensor1  | 24.8                | 78.0         |
| 2023-01-14T17:30:00 | OceanSensor3  | 23.6                | 80.0         |
| 2023-01-14T17:35:00 | OceanSensor4  | 22.3                | 67.0         |
| 2023-01-14T17:40:00 | OceanSensor2  | 25.9                | 76.0         |
| 2023-01-14T17:45:00 | OceanSensor4  | 23.4                | 70.0         |
+---------------------+---------------+---------------------+--------------+

总结

通过结合Flink、Kafka 和 CnosDB,您可以构建一个强大的实时数据处理流水线,从数据采集到实时处理再到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的特性和操作。这种架构适用于各种实时数据应用,如物联网监控、实时报表和仪表板等。根据您的需求和情境,调整配置和代码,以构建适合您业务的实时数据处理解决方案。

CnosDB简介

CnosDB是一款高性能、高易用性的开源分布式时序数据库,现已正式发布及全部开源。

欢迎关注我们的社区网站:https://cn.cnosdb.com文章来源地址https://www.toymoban.com/news/detail-693844.html

到了这里,关于构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Jenkins】Jenkins构建后端流水线

    xxxx后端-后端cim服务测试环境-项目构建 创建人:创建者姓名 时间:创建时间 保持构建天数:2 保持最大构建个数:3 ①clean:清楚之前;package:打包 -Dmaven.test.skip=true:跳过测试;-U:强制让Maven检查所有SNAPSHOT依赖更新 这里的pom.xml文件注意: Branches to build:分支某个环境的

    2024年02月12日
    浏览(53)
  • jenkins通过流水线进行构建jar包

    最近项目上需要进行CICD,本篇博客主要分享各种骚操作 1.下载Jenkins.war包上传到服务器上面,然后在同级目录下面创建如下脚本:

    2024年02月14日
    浏览(44)
  • 37 | Kafka & ZMQ:自动化交易流水线

    在进行这节的学习前,我们先来回顾一下,前面三节,我们学了些什么。 第 34 讲,我们介绍了如何通过 RESTful API 在交易所下单;第 35 讲,我们讲解了如何通过 Websocket ,来获取交易所的 orderbook 数据;第 36 讲,我们介绍了如何实现一个策略,以及如何对策略进行历史回测。

    2024年01月16日
    浏览(52)
  • 实战:Docker+Jenkins+Gitee构建CICD流水线

    持续集成和持续交付一直是当下流行的开发运维方式,CICD省去了大量的运维时间,也能够提高开发者代码集成规范。开发者在开发完需求功能后可以直接提交到gitee,然后jenkins直接进行代码编译和一体化流水线部署。通过流水线部署可以极大的提高devops效率,也是企业信息自

    2024年02月14日
    浏览(53)
  • Docker+Jenkins(blueocean)+Gitee构建CICD流水线实战

    vim /etc/profile export JAVA_HOME=/home/jdk/jdk1.8.0_301 export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin source /etc/profile vim /etc/profile export MAVEN_HOME=/home/maven/apache-maven-3.8.6 export PATH=$PATH:$MAVEN_HOME/bin source /etc/profile docker pull jenkinsci/blue

    2024年02月11日
    浏览(76)
  • (十六)devops持续集成开发——jenkins流水线构建之邮件通知

    本节内容主要介绍jenkins在流水线任务构建完成后的通知操作,使用jenkins的邮件通知插件完成构建任务结束的通知。一般项目发布都会通知相关的责任人,这样项目发布在出现问题时能够及时的处理。 ①在插件中心安装Email Extension邮件通知插件 ②申请一个发送邮件的邮箱服务

    2024年02月21日
    浏览(63)
  • devops-5:从0开始构建一条完成的CI CD流水线

    前文中已经讲述了静态、动态增加agent节点,以动态的k8s cloud为例,下面就以Maven构建Java程序为例,开始构建出一条完整的CI CD流水线。 实现功能目标: 1.分别可以根据分支和tag从源码仓库clone代码 2.拿到源码后开始编译 3.构建image,并push到镜像仓库 4.部署到对应k8s集群 5.部署

    2023年04月20日
    浏览(59)
  • 如何使用CodeceptJS、Playwright和GitHub Actions构建端到端测试流水线

    介绍 端到端测试是软件开发的一个重要方面,因为它确保系统的所有组件都能正确运行。CodeceptJS是一个高效且强大的端到端自动化框架,与Playwright 结合使用时,它成为自动化Web、移动甚至桌面 (Electron.js) 应用程序比较好用的工具。 在本文中,作者探讨如何使用 CodeceptJS、

    2024年02月05日
    浏览(72)
  • (十五)devops持续集成开发——jenkins流水线构建策略配置及触发器的使用

    本节内容我们主要介绍在Jenkins流水线中,其构建过程中的一些构建策略的配置,例如通过远程http构建、定时任务构建、轮询SCM构建、参数化构建、Git hook钩子触发构建等,可根据不同的需求完成不同构建策略的配置。 - 构建策略说明: - 测试验证 - 构建说明 - 测试验证 - 配置

    2024年02月21日
    浏览(92)
  • 构建强大的产品级NLP系统:PaddleNLP Pipelines端到端流水线框架解析

    搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术细节以及项目实战(含码源) 专栏详细介绍:搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目

    2024年02月04日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包