【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动

这篇具有很好参考价值的文章主要介绍了【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言


fly-iot飞凡物联专栏:
https://blog.csdn.net/freewebsys/category_12219758.html

1,视频演示地址


https://www.bilibili.com/video/BV19a4y127Gt/

【fly-iot】(7):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动

【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook

2,webhook直接写个服务端地址就行


【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook
【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook

3,在IOT平台创建设备

【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook

然后就可以在工具中进行测试了:

【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook

4,编写webhook 接口

直接设置接口:

...

async def backend_callback(request_dict):

    request_dict['callback_date'] = datetime.now()
    # 之前是 action 现在是 event
    callback_action = request_dict.get('event')
    print(f'######### event {callback_action} ############')

    if callback_action == 'client.connected':
        await client_connected_callback(request_dict)
        return {'status': 200}, 200
    elif callback_action == 'client.disconnected':
        await client_disconnected_callback(request_dict)
        return {'status': 200}, 200
    else:
        return {'status': 200}, 200
    


async def client_disconnected_callback(request_dict) -> None:

    print(f'######### client_disconnected_callback ############')

    device_info = await _query_device_info(
        request_dict.get('clientid'),
        request_dict.get('username'),
    )
    connect_dict = {
        'msgTime': request_dict['callback_date'],
        'deviceID': device_info['deviceID'],
        'tenantID': device_info['tenantID'],
        'connectStatus': 0,
        'IP': 'NULL'
    }

    print(f'######### device_info {device_info} ############')
    print(f'######### connect_dict {connect_dict} ############')

    update_device = {
        'deviceStatus': 0,
        'id': device_info['id']
    }
    await db.execute(insert_connect_logs_sql.format(**connect_dict))
    await db.execute(update_device_status_sql.format(**update_device))


async def client_connected_callback(request_dict) -> None:
    """ Device connected subscribe inbox topic """

    device_info = await _query_device_info(
        request_dict.get('clientid'),
        request_dict.get('username'),
    )
    if device_info['protocol'] == 'lwm2m':
        # if device protocol is lwm2m pass
        return
    
    print(f'######### device_info {device_info} ############')

    update_device = {
        'deviceStatus': 1,
        'id': device_info['id']
    }
    await db.execute(update_device_status_sql.format(**update_device))

    auto_sub_topic = (
        f"/{device_info['protocol']}/{device_info['tenantID']}"
        f"/{device_info['productID']}/{device_info['deviceID']}/inbox"
    )
    request_json = {
        'topic': auto_sub_topic,
        'qos': 1,
        'clientid': device_info['deviceID']
    }
    emqx_sub_url = f"{project_config['EMQX_API']}/mqtt/subscribe"
    async with AsyncHttp(auth=project_config['EMQX_AUTH']) as async_http:
        response = await async_http.post_url(
            url=emqx_sub_url, json=request_json
        )
        logger.info(response)


async def message_acked_callback(request_dict) -> None:
    """ Update the publish status when the device receives the publish message """

    device_id = request_dict.get('clientid')
    payload = request_dict.get('payload')
    if device_id == 'pulsario___emqx_all_0':
        # rule_engine filter
        return
    if not device_id or not payload:
        raise HttpException(code=404)
    try:
        load_payload = json.loads(payload)
    except Exception:
        raise HttpException(code=404)
    task_id = load_payload.get('task_id')
    if not task_id:
        raise HttpException(code=404)

    await db.execute(
        update_publish_logs_sql.format(publishStatus=2, taskID=task_id)
    )


async def _query_device_info(device_id, device_username):
    print(f'######### _query_device_info {device_id} {device_username} ############')


    if not device_id or not device_username:
        raise HttpException(code=404, field='devices')
    filter_username_sql = """ AND devices.`deviceUsername` = '{deviceUsername}' LIMIT 1 """
    device_query_sql = query_base_devices_sql + filter_username_sql

    query_sql = device_query_sql.format(deviceID=device_id, deviceUsername=device_username)

    print(f'######### query_sql {query_sql} ############')

    query_result = await db.fetch(query_sql, '')
    print(f'######### query_result len {len(query_result)} ############')


    if not query_result:
        raise HttpException(404, field='device')


    print(f'######### query_result {query_result[0]} ############')
    # device_info = dict(query_result[0])

    device_info = dict()
    device_info["id"] = query_result[0][0]
    device_info["authType"] = query_result[0][1]
    device_info["deviceID"] = query_result[0][2]
    device_info["deviceUsername"] = query_result[0][3]
    device_info["token"] = query_result[0][4]
    device_info["productID"] = query_result[0][5]
    device_info["tenantID"] = query_result[0][6]
    device_info["protocol"] = query_result[0][7]

    return device_info

然后就可以实现数据库的更新了:

【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook
【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动,iot-manager,fly-iot,Python,物联网,http,数据库,emqx,webhook

设备离线状态也可以更新

5,总结

使用webhook还是非常方便的。
可以快速的实现设备状态的更新。同时接搜到设备的在线消息。文章来源地址https://www.toymoban.com/news/detail-791879.html

到了这里,关于【fly-iot飞凡物联】(18):配置Emqx的webhook,编写http接口,完成设备状态的更新。显示在线/离线状态,异步插入数据库,使用supervisor启动的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Jenkins配置Gitee webhook 触发构建】

    目录 一、配置构建触发器 二、配置Gitee WebHooks 管理 三、测试构建 勾选“Gitee webhook 触发构建”。把后面的URL复制、保存、待在Gitee码云里配置Web Hook使用。(Gitee webhook 触发构建,需要在 Gitee webhook 中填写 URL: http://192.569.93.

    2024年02月12日
    浏览(50)
  • 宝塔webhook的使用及配置gitee完成代码自动更新教程

    1.服务端安装git 首先去宝塔终端查看是否有装git(一般默认是安装了的) 如果没有就自行安装一下,在终端输入安装指令 如果是Ubuntu系统使用 sudo apt-get install git 安装git 完成git账户信息写入 生成公钥,输入下面命令后回车三下 查看公钥,按顺序输入下面代码,最后一个是获

    2024年02月11日
    浏览(56)
  • gitlab配置webhook,commit message的时候校验提交的信息

    在 GitLab 中配置 Webhook 来调用 Java 接口以校验 commit 信息,是很多公司的一些要求,因为提交信息的规范化是必要的 GitLab中的拦截钩子(Interceptor Hook)是一个自定义的钩子(Hook),用于拦截和修改Git命令的输出。它的工作原理是在Git命令执行后,将命令的输出拦截下来,然后

    2024年02月09日
    浏览(42)
  • EMQX配置 用户名和密码认证方式连接

    1, 找到mqtt 的 /etc/plugins/ 文件夹下的emqx_auth_mnesia.conf 文件 2 ,vim打开编辑该文件 根据例子添加账号密码 并保存 3, 打开EMQX后台, 找到插件 ,启动该插件, 随后就可以了 4,修改etc/emqx.conf 配置文件 如下 下面是禁用匿名连接,禁用之后, 配置的 账号密码才能生效 vim 查找 /allow_anonymo

    2024年02月11日
    浏览(54)
  • emqx 配置ssl/tls 双向认证(亲自测试有效)

    bash脚本,生成自签名ca、服务端、客户端的key和证书 openssl.cnf配置文件 验证证书是否有效 将证书文件拷贝到emqxetccerts目录下(默认目录),并修改配置文件emqx.conf。SSL/TLS 双向连接的启用及验证 mqttx连接验证 出现连接成功,代表测试无问题  

    2024年03月11日
    浏览(41)
  • aliyun服务器(Linux)安装emqx,配置ssl证书

    EMQX版本:5.0.8 操作系统及版本:Ubuntu 20.04.1 云服务器:阿里云轻量应用服务器 所用软件:WinSCP、XShell、宝塔面板、MQTTX 其他 食用本文的前提:服务器已经购买,相关基础配置已经完备,域名已经备案,域名与IP已经绑定。 文章末尾会提供所用到的软件 小破站找到的emqx安装

    2024年02月09日
    浏览(65)
  • Docker安装emqx详解(配置SSL证书、开启WSS、鉴权)

    EMQX 是一款大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器。 1 端口介绍 1883:MQTT 协议端口 8084:MQTT/SSL 端口 8083:MQTT/WebSocket 端口 8080:HTTP API 端口 18083:Dashboard 管理控制台端口 2 拉取镜像 3 启动临时容器 其他小知识 选项 选项简写 说明 –detach -d 在后台运行容器,

    2023年04月16日
    浏览(45)
  • 【物联网那些事儿】18 大物联网操作系统,Raspbian Pi、Ubuntu Core、Windows 10 IoT、Micropython、OpenWrt ....你用过哪几个?

    如果您查看我们周围所有连接的嵌入式系统,您会意识到物联网操作系统是确保安全、连接、互操作性、网络和存储等一系列任务的关键。 Raspbian Pi、Ubuntu Core、FreeRTOS、RIOT、Tizen、Micropython 是一些最流行的物联网设备操作系统。下面让我们看看具体他们都有什么特点。 操作

    2024年01月20日
    浏览(46)
  • Flying HTML生成PDF添加水印

    HTML转PDF并添加水印

    2024年01月22日
    浏览(45)
  • IOT-Reaserch安装ghidra以及IDEA和ghidra的配置

    IOT自带的java是符合要求的,不需要额外下载 屁,当我没说,我做到后面现在又来补前面的,没有java17 要重新下载找到对应版本的java17  https://adoptium.net/zh-CN/temurin/releases/ 或者直接sudo  Ubuntu 22.04、20.04、18.04上安装OpenJDK 17 - 知乎 (zhihu.com) 如何切换Java版本 在主文件夹中的 .b

    2024年02月22日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包