使用 LF Edge eKuiper 将物联网流处理数据写入 Databend

这篇具有很好参考价值的文章主要介绍了使用 LF Edge eKuiper 将物联网流处理数据写入 Databend。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

LF Edge eKuiper

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架(类似于 Apache Flink (opens new window))。eKuiper 的规则引擎允许用户提供基于 SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。具体介绍可以参考 [LF Edge eKuiper - 超轻量物联网边缘流处理软件(https://ekuiper.org/docs/zh/latest/)。 

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

Databend Sql Sink

eKuiper 支持通过 Golang 或者 Python 在源 (Source)SQL 函数目标 (Sink) 三个方面的扩展,通过支持不同的 Sink,允许用户将分析结果发送到不同的扩展系统中。Databend 作为 Sink 也被集成到了 eKuiper plugin 当中,下面通过一个案例来展示如何使用 eKuiper 将物联网流处理数据写入 Databend。

编译 eKuiper 和 Databend Sql Plugin

eKuiper

git clone https://github.com/lf-edge/ekuiper & cd ekuiper
make

Databend Sql Plugin

go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go

编译后的 sink plugin 拷贝到 build 目录:

cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks

Databend 建表

在 Databend 中先创建目标表 ekuiper_test:

create table ekuiper_test (name string,size bigint,id bigint);

启动 eKuiperd

cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64 
./bin/kuiperd

服务正常启动:

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

创建流(stream) 和 规则 (rule)

eKuiper 提供了两种管理各种流、规则,目标端的方式,一种是通过 ekuiper-manager 的 [docker image](https://hub.docker.com/r/lfedge/ekuiper) 启动可视化管理界面,一种是通过 CLI 工具来管理。这里我们使用 CLI。

创建 stream

流是 eKuiper 中数据源连接器的运行形式。它必须指定一个源类型来定义如何连接到外部资源。这里我们创建一个流,从 json 文件数据源中获取数据,并发送到 eKuiper 中。

首先配置文件数据源,连接器的配置文件位于 /etc/sources/file.yaml

default:
  # 文件的类型,支持 json, csv 和 lines
  fileType: json
  # 文件以 eKuiper 为根目录的目录或文件的绝对路径。
  # 请勿在此处包含文件名。文件名应在流数据源中定义
  path: data
  # 读取文件的时间间隔,单位为ms。如果只读取一次,则将其设置为 0
  interval: 0
  # 读取后,两条数据发送的间隔时间
  sendInterval: 0
  # 是否并行读取目录中的文件
  parallel: false
  # 文件读取后的操作
  # 0: 文件保持不变
  # 1: 删除文件
  # 2: 移动文件到 moveTo 定义的位置
  actionAfterRead: 0
  # 移动文件的位置, 仅用于 actionAfterRead 为 2 的情况
  moveTo: /tmp/kuiper/moved
  # 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。
  hasHeader: false
  # 定义文件的列。如果定义了文件头,该选项将被覆盖。
  # columns: [id, name]
  # 忽略开头多少行的内容。
  ignoreStartLines: 0
  # 忽略结尾多少行的内容。最后的空行不计算在内。
  ignoreEndLines: 0
  # 使用指定的压缩方法解压缩文件。现在支持`gzip`、`zstd` 方法。
  decompression: ""

使用 CLI 创建 steam 名为 stream1:

./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

Json 文件的内容为:

[
  {"id": 1,"size":100, "name": "John Doe"},
  {"id": 2,"size":200, "name": "Jane Smith"},
  {"id": 3,"size":300, "name": "Kobe Brant"},
  {"id": 4,"size":400, "name": "Alen Iverson"}
]

创建 Databend Sink Rule

一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。eKuiper 有两种方法来定义规则的业务逻辑。要么使用 SQL / 动作组合,要么使用新增加的图 API。

这里我们通过指定 sql 和 actions 属性,以声明的方式定义规则的业务逻辑。其中,sql 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action 路由到多个位置。

规则由 JSON 定义,下面是准备创建的规则 myRule.json:

{
  "id": "myRule",
  "sql": "SELECT id, name from stream1",
  "actions": [
    {
      "log": {
      },
      "sql": {
        "url": "databend://databend:databend@localhost:8000/default?sslmode=disable",
        "table": "ekuiper_test",
        "fields": ["id","name"]
      }
    }
  ]
}

执行 CLI 创建规则:

./bin/kuiper create rule myRule -f myRule.json

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

可以查看所创建规则的运行状态:

./bin/kuiper getstatus rule myRule

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

规则创建后,会立即将符合规则条件的数据发送到目标端,此时我们查看 Databend 的 ekuiper_test 表,可以看到文件数据源中的数据已经被写入到 Databend:

使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,edge,物联网,前端

可以看到由于我们的规则 SQL 中只指定了 idname 字段,所以这里只有这两个字段被写入。

结论

eKuiper 是 EMQ 旗下的一款流处理软件,其体积小、功能强大,在工业物联网、车辆网、公共数据分析等很多场景中得到广泛使用。本文介绍如何使用 eKuiper 将物联网流处理数据写入 Databend。文章来源地址https://www.toymoban.com/news/detail-721672.html

到了这里,关于使用 LF Edge eKuiper 将物联网流处理数据写入 Databend的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python点云处理(一)点云数据读取与写入

    当处理点云数据时,我们通常需要读取各种不同格式的点云文件。Python作为一种强大的编程语言,在点云处理领域提供了许多库和工具,可以帮助我们读取和处理各种格式的点云文件。本文将介绍如何使用Python读取和写入各种格式的点云文件。 LAS(Lidar Data Exchange)和LAZ(L

    2024年02月08日
    浏览(38)
  • 物联网开发终端管理篇-java从MQTT获取设备数据,并通过Druid连接池把数据写入MySQL数据库(Windows系统)

    下面来给大家做个简单的数据对接,也就是通过写JAVA代码实现MQTT协议 首页我们得搭建一个简单的IDEA项目,这个我就不做演示了 搭建完项目,我们需要准备一些jar包,jar包名如下: org.eclipse.paho.client.mqttv3-1.1.0.jar mysql-connector-java-5.1.34.jar jackson-databind-2.10.0.jar jackson-core-2.10.0

    2024年02月11日
    浏览(48)
  • Python处理xlsx文件(读取、转为列表、新建、写入数据、保存)

    xlsxwriter**库对于xslx表的列数不做限制, xlrd 库不能写入超过65535行,256列的数据。 由于需要处理的数据行列数较多,遇到报错才发现库的限制问题,记录一下。

    2024年02月12日
    浏览(66)
  • NCDC气象数据的提取与处理(四):python批量读取、写入nc数据经纬度格点数值

    1.问题描述: 2.思路: 3.实现过程: 3.1格点位置匹配 3.2写入表格 4.运行效果 4.1打包站点信息 4.2读取nc文件列表 4.3提取对应格点的nc数据 4.4数据写入 NCDC的站点数据处理在之前三节里已经介绍过了,但是NCDC的就那么几种数据可能不能满足日常使用,比如说辐射数据他就没有。

    2024年02月05日
    浏览(73)
  • Edge 中的msedgewebview2总想联网

    使用Edge浏览器的时候,右下角火绒总会弹出“msedgewebview2”想要联网的弹窗,如下 点击发起程序,找到路径如下: 从路径层次来看,属于【EdgeWebView】。 据2021年,IT之家一篇文章显示,安装 WebView2 主要是为了改善 Web 应用在使用某些服务时的体验,如 Office 应用、Outlook 和

    2024年02月04日
    浏览(26)
  • 电脑无法联网或者edge浏览器无法上网的解决办法

    本人有幸经历了三次电脑无法上网,只能说修的时候长路漫漫,于是痛定思痛,亡羊补牢,现在决心把它们都记录下来。一般情况:电脑无法联网的常见情况有以下几种: 网络故障:包括路由器、调制解调器或网络设备故障,网络线路断开或者是网络服务商的问题。 解决方

    2023年04月13日
    浏览(106)
  • Elasticsearch 基本使用(一)写入数据

    my_index :索引名 _doc :文档类型,现在只有一个类型,都是_doc 1 :文档id(PUT时,必须,POST时不是必须) PUT 为修改操作; 必须指定文档id POST 为保存操作; 指定id时,以该id保存;id存在则修改,不存在则新增。 未指定id时,直接新增,并随机生成一个id。 写入数据时,如果

    2024年02月10日
    浏览(49)
  • 基于边缘计算的物联网数据处理与分析

    边缘计算面临着数据安全与隐私保护、网络稳定性等挑战,但同时也带来了更强的实时性和本地处理能力,为企业降低了成本和压力,提高了数据处理效率。因此,边缘计算既带来了挑战也带来了机遇,需要我们不断地研究和创新,以应对日益复杂的应用场景和技术需求  

    2024年01月18日
    浏览(40)
  • 使用 Python 数据写入 Excel 工作表

    在数据处理和报告生成等工作中,Excel 表格是一种常见且广泛使用的工具。然而,手动将大量数据输入到 Excel 表格中既费时又容易出错。为了提高效率并减少错误,使用 Python 编程语言来自动化数据写入 Excel 表格是一个明智的选择。Python 作为一种简单易学且功能强大的编程

    2024年01月18日
    浏览(33)
  • 如何使用Python给Excel写入数据

    openpyxl三步走 获取work book 获取 work sheet 再然后 获取单元格 进行操作 保存文件 安装OpenpyXl 导包方式以下两种都可以 from openpyxl import Workbook from openpyxl import load_workbook 向工作表中写入数据 保存至文件 最保险的保存方式是调用 save 方法保存到指定文件: 结果如下: 向工作表中

    2024年02月14日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包