高效数据传输:轻松上手将Kafka实时数据接入CnosDB

这篇具有很好参考价值的文章主要介绍了高效数据传输:轻松上手将Kafka实时数据接入CnosDB。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

高效数据传输:轻松上手将Kafka实时数据接入CnosDB,工程师有话说,kafka,分布式

本篇我们将主要介绍如何在 Ubuntu 22.04.2 LTS 环境下,实现一个Kafka+Telegraf+CnosDB 同步实时获取流数据并存储的方案。在本次操作中,CnosDB 版本是2.3.0,Kafka 版本是2.5.1,Telegraf 版本是1.27.1 

随着越来越多的应用程序架构转向微服务或无服务器结构,应用程序和服务的数量每天都在增加。用户既可以通过实时聚合,也可以通过输出为测量或指标的计算,来处理数量不断增加的时间序列数据。面对产生的海量数据,用户可以通过多种方式来捕获和观察系统中数据的变化,在云原生环境中,最流行的一种是使用事件。

Apache Kafka是一个耐用、高性能的消息系统,也被认为是分布式流处理平台。它可应用于许多用例,包括消息传递、数据集成、日志聚合和指标。而就指标而言,仅有消息主干或代理是不够的。虽然 Apache Kafka 很耐用,但它并不是为运行指标和监控查询而设计的。这恰恰正是 CnosDB 的长处。

架构方案

通过将这Kafka、Telegraf和CnosDB 三者结合起来,可以实现数据的完整流程:

  1. 数据生成:使用传感器、设备或其他数据源产生数据,并将其发送到Kafka主题。
  2. Kafka 消息队列:Kafka 接收并存储数据流,确保数据安全和可靠性。
  3. Telegraf 消费者:Telegraf 作为 Kafka 的消费者,订阅 Kafka 主题并获取数据流。
  4. CnosDB 数据存储:经过预处理的数据由 Telegraf 发送到 CnosDB 中进行时序数据的存储。

整体的应用程序架构如图所示:

高效数据传输:轻松上手将Kafka实时数据接入CnosDB,工程师有话说,kafka,分布式

Kafka

Apache Kafka 是一个开源分布式流处理平台,它被设计用于处理实时数据流,具有高可靠性、高吞吐量和低延迟的特点,目前已经被大多数公司使用。它的使用方式非常多样化,包括:

  • 流处理:它通过存储实时事件以进行聚合、丰富和处理来提供事件主干。
  • 指标:Apache Kafka 成为许多分布式组件或应用程序(例如微服务)的集中聚合点。这些应用程序可以发送实时指标以供其他平台使用,包括 CnosDB。
  • 数据集成:可以捕获数据和事件更改并将其发送到 Apache Kafka,任何需要对这些更改采取行动的应用程序都可以使用它们。
  • 日志聚合:Apache Kafka 可以充当日志流平台的消息主干,将日志块转换为数据流。

几个核心概念

  1. 实例(Broker):Kafka的Broker是Kafka集群中的服务器节点,负责存储和转发消息,提供高可用性、容错性和可靠性。
  2. 主题(Topic):Apache Kafka 中的 topic ,是逻辑存储单元,就像关系数据库的表一样。主题通过分区通过代理进行分发,提供可扩展性和弹性。
  3. 生产者(Producer):生产者将消息发布到Kafka的指定主题。生产者可以选择将消息发送到特定的分区,也可以让Kafka自动决定分配策略。
  4. 消费者(Consumer):消费者从指定主题的一个或多个分区中读取消息。消费者可以以不同的方式进行组织,如单播、多播、消费者组等。
  5. 发布-订阅模式:是指生产者将消息发布到一个或多个主题,而消费者可以订阅一个或多个主题,从中接收并处理消息。

简单来说就是,当客户端将数据发送到 Apache Kafka 集群实例时,它必须将其发送到某个主题。

此外,当客户端从 Apache Kafka 集群读取数据时,它必须从主题中读取。向 Apache Kafka 发送数据的客户端成为生产者,而从 Kafka 集群读取数据的客户端则成为消费者。数据流向示意图如下:

高效数据传输:轻松上手将Kafka实时数据接入CnosDB,工程师有话说,kafka,分布式

注:这里没有引入更复杂的概念,如topic分区、偏移量、消费者组等,用户可自行参考官方指导文档学习:

Kafka:【https://kafka.apache.org/documentation/#gettingStarted】

部署 Kafka

下载并安装Kafka【https://kafka.apache.org/】

1.前提:需确保有 JDK 环境和 Zookeeper 环境,如果没有可以使用下面的命令进行安装:

sudo apt install openjdk-8-jdk
sudo apt install zookeeper

2.下载 Kafka 安装包并解压

wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
tar -zxvf kafka_2.12-2.5.1.tgz

3.进入解压后的 Kafka 目录

cd  kafka_2.12-2.5.1

4.修改$KAFKA_HOME/config/server.properties的配置文件(可按需修改端口、日志路径等配置信息)

5.保存并关闭编辑器。运行下面的命令来启动Kafka:

bin/kafka-server-start.sh config/server.properties

Kafka 将在后台运行,并通过默认的 9092 端口监听连接。

Telegraf

Telegraf 是一个开源的服务器代理程序,用于收集、处理和传输系统和应用程序的指标数据。Telegraf 支持多种输入插件和输出插件,并且能够与各种不同类型的系统和服务进行集成。它可以从系统统计、日志文件、API 接口、消息队列等多个来源采集指标数据,并将其发送到各种目标,如 CnosDB 、Elasticsearch、Kafka、Prometheus 等。这使得 Telegraf 非常灵活,可适应不同的监控和数据处理场景。

  • 轻量级:Telegraf被设计为一个轻量级的代理程序,对系统资源的占用相对较小,可以高效运行在各种环境中。
  • 插件驱动:Telegraf使用插件来支持各种输入和输出功能。它提供了丰富的插件生态系统,涵盖了众多的系统和服务。用户可以根据自己的需求选择合适的插件来进行指标数据的采集和传输。
  • 数据处理和转换:Telegraf具有灵活的数据处理和转换功能,可以通过插件链(Plugin Chain)来对采集到的指标数据进行过滤、处理、转换和聚合,从而提供更加精确和高级的数据分析。

部署 Telegraf

1.安装 Telegraf

sudo apt-get update && sudo apt-get install telegraf

2.切换到 Telegraf 的默认配置文件所处目录 /etc/telegraf 下

3.在配置文件 telegraf.config 中添加目标 OUTPUT PLUGIN

[[outputs.http]]
  url = "http://127.0.0.1:8902/api/v1/write?db=telegraf"
  timeout = "5s"
  method = "POST"
  username = "root"
  password = ""
  data_format = "influx"
  use_batch_format = true
  content_encoding = "identity"
  idle_conn_timeout = 10

按需修改的参数:

url:CnosDB 地址和端口
username:连接 CnosDB 的用户名
password:连接 CnosDB 的用户名对应的密码

注:其余参数可与上述配置示例中保持一致

4.在配置文件中将下面的配置注释放开,可按需修改

[[inputs.kafka_consumer]]
brokers = ["127.0.0.1:9092"]
topics = ["oceanic"]
data_format = "json"

参数:

brokers:Kafka 的 broker list 
topics:指定写入 Kafka 目标的 topic
data_format:写入数据的格式

注:其余参数可与上述配置示例中保持一致

5.启动 Telegraf

telegraf -config /etc/telegraf/telegraf.conf

CnosDB

部署 CnosDB

详细操作请参考: CnosDB 安装

【https://docs.cnosdb.com/zh/latest/start/install.html】

整合

Kafka创建topic

1.进入 kafka 的 bin 文件夹下

2.执行命令,创建 topic

./kafka-topics.sh --create --topic oceanic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Python 模拟写入数据到Kakfa

1.编写代码:

import time
import json
import random


from kafka import KafkaProducer


def random_pressure():
    return round(random.uniform(0, 10), 1)


def random_tempreture():
    return round(random.uniform(0, 100), 1)


def random_visibility():
    return round(random.uniform(0, 100), 1)


def get_json_data():
    data = {}


    data["pressure"] = random_pressure()
    data["temperature"] = random_temp_cels()
    data["visibility"] = random_visibility()


    return json.dumps(data) 


def main():
    producer = KafkaProducer(bootstrap_servers=['ip:9092'])


    for _ in rang(2000):
        json_data = get_json_data()
        producer.send('oceanic', bytes(f'{json_data}','UTF-8'))
        print(f"Sensor data is sent: {json_data}")
        time.sleep(5)




if __name__ == "__main__":
    main()

2.运行Python脚本

python3 test.py

查看 kafka topic 中的数据

1.执行下面查看指定 topic 数据的命令

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oceanic --from-beginning

高效数据传输:轻松上手将Kafka实时数据接入CnosDB,工程师有话说,kafka,分布式

查看同步到 CnosDB 中的数据

1.使用工具连接到CnosDB

cnosdb-cli

2.切换到指定库

\c public

3.查看数据

select * from kafka_consumer;

高效数据传输:轻松上手将Kafka实时数据接入CnosDB,工程师有话说,kafka,分布式

补充阅读

1.使用 Telegraf 采集数据并写入 CnosDB:

https://docs.cnosdb.com/zh/latest/versatility/collect/telegraf.html

2.Python 连接器:

https://docs.cnosdb.com/zh/latest/reference/connector/python.html

3.CnosDB 快速开始:

https://docs.cnosdb.com/zh/latest/start/quick_start.html文章来源地址https://www.toymoban.com/news/detail-654666.html

到了这里,关于高效数据传输:轻松上手将Kafka实时数据接入CnosDB的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 前端(二十一)——WebSocket:实现实时双向数据传输的Web通信协议

    🤨博主:小猫娃来啦 🤨文章核心: WebSocket:实现实时双向数据传输的Web通信协议 在当今互联网时代,实时通信已成为很多应用的需求。为了满足这种需求,WebSocket协议被设计出来。WebSocket是一种基于TCP议的全双工通信协议,通过WebSocket,Web应用程序可以与服务器建立持久

    2024年02月04日
    浏览(64)
  • chatgpt赋能python:Python的Pipe:快速高效的数据传输工具

    如果你是一名Python工程师,那么你一定会非常了解数据传输的重要性。Python的Pipe就是一种可以让你快速高效地传输数据的工具。在本文中,我们将对Python的Pipe进行详细介绍,探讨它的优点、如何使用以及应用场景。 Pipe是一种在Linux操作系统中非常常见的数据传输工具。Pyt

    2024年02月10日
    浏览(48)
  • IT行业软件数据文件传输安全与高效是如何保障的?

    在当今迅速发展的科技世界中,云计算、大数据、移动互联网等信息技术正迎来蓬勃发展,IT行业正置身于一个全新的世界。数据不仅是最重要的资产,也是企业竞争力的核心所在。然而,如何缩短信息共享时间、高速流转数据、跨部门/跨区域协作,以及有效整合IT基础设施

    2024年02月19日
    浏览(46)
  • 【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同

    Flink版本: 本文主要是基于Flink1.14.4 版本 导言: Apache Flink 作为流式处理领域的先锋,为实时数据处理提供了强大而灵活的解决方案。其中,KafkaSink 是 Flink 生态系统中的关键组件之一,扮演着将 Flink 处理的数据可靠地发送到 Kafka 主题的角色。本文将深入探讨 KafkaSink 的工作

    2024年02月20日
    浏览(63)
  • 电脑传输数据STM32模拟I2C显示实时画面到OLED

    写的不好,还望大家指正,有的地方引用了一下大佬的代码。 一、所需硬件: STM32F103C8T6 USB转串口模块 OLED 128*64显示屏 STLINK 二、代码部分 1.stm32串口部分代码 2.stm32OLED屏幕部分代码 3.主程序 4.电脑通过opencv库截取电脑当前1080p一帧画面,并对图片二值化处理,通过电脑端编写

    2024年02月13日
    浏览(47)
  • 使用 Web-Socket 客户端在两个 ESP32 之间进行实时数据传输

    近年来, 基于Arduino和ESP32 的网络服务器项目变得非常流行,并且对各种应用都很有用。但是这种 Web 服务器的主要缺点之一是它的静态特性。意味着通过 HTTP 更新网页,您需要更新整个网页,然后才能更新任何数据。这个问题有很多解决方法,比如几秒钟后自动刷新网页等等

    2024年02月07日
    浏览(54)
  • 日志采集传输框架之 Flume,将监听端口数据发送至Kafka

    1、简介                 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,主要有以下几个部分组成。  主要组件介绍: 1)、 Flume Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。Agent 主

    2024年01月22日
    浏览(57)
  • Linux版百度网盘丨直接在服务器SSH命令行中使用百度云,轻松解决数据传输和分享难题

    本文主要的目的就是在Linux环境下通过命令行来使用百度云盘!直接在服务器上将数据(比如基因组等大文件)传输到百度网盘,之后可以进行轻松分享,而且还可以支持备份,解决文件的传输和分享难题。 bypy是一个Python客户端,用于操作百度网盘,提供了丰富的命令行操作

    2024年02月09日
    浏览(56)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(55)
  • 【ESP32音视频传输】②通过I2S采集SPH0645麦克风音频数据并上传到服务端实时播放

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 本文章基于Arduino ESP32 2.07版本,因为2.04版本开始I2S驱动被更改了,所以相同代码可能效果不太同 本文主要参考了:https://atomic14.com/2020/09/12/esp32-audio-input.html ESP32有多种方式从外置麦克风中读取数据:

    2024年02月11日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包