Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录

这篇具有很好参考价值的文章主要介绍了Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

之前的博客写了关于Handsfree_ros_imu:ROS机器人IMU模块ARHS姿态传感器(A9)Liunx系统Ubuntu20.04学习启动和运行教程:

https://blog.csdn.net/qq_54900679/article/details/135539176?spm=1001.2014.3001.5502

与Handsfree_ros_imu:ROS机器人IMU模块的get_imu_rpy.py文件学习记录:

https://blog.csdn.net/qq_54900679/article/details/135550752?spm=1001.2014.3001.5502

 

这次带来hfi_a9.py文件的学习与数据记录改进:

hfi_a9.py文件位置如下:

Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录,Aloha,Python,ROS,机器人,学习,python

 对应的代码如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import serial
import struct
import platform
import serial.tools.list_ports
import math


# 查找 ttyUSB* 设备
def find_ttyUSB():
    print('imu 默认串口为 /dev/ttyUSB0, 若识别多个串口设备, 请在 launch 文件中修改 imu 对应的串口')
    posts = [port.device for port in serial.tools.list_ports.comports() if 'USB' in port.device]
    print('当前电脑所连接的 {} 串口设备共 {} 个: {}'.format('USB', len(posts), posts))


# crc 校验
def checkSum(list_data, check_data):
    data = bytearray(list_data)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if (crc & 1) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return hex(((crc & 0xff) << 8) + (crc >> 8)) == hex(check_data[0] << 8 | check_data[1])


# 16 进制转 ieee 浮点数
def hex_to_ieee(raw_data):
    ieee_data = []
    raw_data.reverse()
    for i in range(0, len(raw_data), 4):
        data2str =hex(raw_data[i] | 0xff00)[4:6] + hex(raw_data[i + 1] | 0xff00)[4:6] + hex(raw_data[i + 2] | 0xff00)[4:6] + hex(raw_data[i + 3] | 0xff00)[4:6]
        if python_version == '2':
            ieee_data.append(struct.unpack('>f', data2str.decode('hex'))[0])
        if python_version == '3':
            ieee_data.append(struct.unpack('>f', bytes.fromhex(data2str))[0])
    ieee_data.reverse()
    return ieee_data


# 处理串口数据
def handleSerialData(raw_data):
    global buff, key, angle_degree, magnetometer, acceleration, angularVelocity, pub_flag
    if python_version == '2':
        buff[key] = ord(raw_data)
    if python_version == '3':
        buff[key] = raw_data

    key += 1
    if buff[0] != 0xaa:
        key = 0
        return
    if key < 3:
        return
    if buff[1] != 0x55:
        key = 0
        return
    if key < buff[2] + 5:  # 根据数据长度位的判断, 来获取对应长度数据
        return

    else:
        data_buff = list(buff.values())  # 获取字典所以 value

        if buff[2] == 0x2c and pub_flag[0]:
            if checkSum(data_buff[2:47], data_buff[47:49]):
                data = hex_to_ieee(data_buff[7:47])
                angularVelocity = data[1:4]
                acceleration = data[4:7]
                magnetometer = data[7:10]
            else:
                print('校验失败')
            pub_flag[0] = False
        elif buff[2] == 0x14 and pub_flag[1]:
            if checkSum(data_buff[2:23], data_buff[23:25]):
                data = hex_to_ieee(data_buff[7:23])
                angle_degree = data[1:4]
            else:
                print('校验失败')
            pub_flag[1] = False
        else:
            print("该数据处理类没有提供该 " + str(buff[2]) + " 的解析")
            print("或数据错误")
            buff = {}
            key = 0

        buff = {}
        key = 0
        if pub_flag[0] == True or pub_flag[1] == True:
            return
        pub_flag[0] = pub_flag[1] = True
        acc_k = math.sqrt(acceleration[0] ** 2 + acceleration[1] ** 2 + acceleration[2] ** 2)

        print('''
加速度(m/s²):
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f

角速度(rad/s):
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f

欧拉角(°):
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f

磁场:
    x轴:%.2f
    y轴:%.2f
    z轴:%.2f
''' % (acceleration[0] * -9.8 / acc_k, acceleration[1] * -9.8 / acc_k, acceleration[2] * -9.8 / acc_k,
       angularVelocity[0], angularVelocity[1], angularVelocity[2],
       angle_degree[0], angle_degree[1], angle_degree[2],
       magnetometer[0], magnetometer[1], magnetometer[2]
      ))
       


key = 0
flag = 0
buff = {}
angularVelocity = [0, 0, 0]
acceleration = [0, 0, 0]
magnetometer = [0, 0, 0]
angle_degree = [0, 0, 0]
pub_flag = [True, True]


if __name__ == "__main__":
    python_version = platform.python_version()[0]

    
    find_ttyUSB()
    port = "/dev/ttyUSB0"
    baudrate = 921600

    try:
        hf_imu = serial.Serial(port=port, baudrate=baudrate, timeout=0.5)
        if hf_imu.isOpen():
            print("\033[32m串口打开成功...\033[0m")
        else:
            hf_imu.open()
            print("\033[32m打开串口成功...\033[0m")
    except Exception as e:
        print(e)
        print("\033[31m串口打开失败\033[0m")
        exit(0)
    else:
        while True:
            try:
                buff_count = hf_imu.inWaiting()
            except Exception as e:
                print("exception:" + str(e))
                print("imu 失去连接,接触不良,或断线")
                exit(0)
            else:
                if buff_count > 0:
                    buff_data = hf_imu.read(buff_count)
                    for i in range(0, buff_count):
                        handleSerialData(buff_data[i])

这段Python代码主要用于处理与IMU(惯性测量单元)相关的串口通信。以下是对代码的主要功能和组件的详细分析:

  1. 导入库

    • serial:用于串口通信。
    • struct:解析打包的二进制数据。
    • platform:检测操作系统信息,用于判断Python版本。
    • serial.tools.list_ports:列出计算机的串口设备。
    • math:提供数学函数,例如开方。
  2. 查找ttyUSB设备(find_ttyUSB函数)

    • 打印默认IMU串口设备(/dev/ttyUSB0),提示用户在识别多个设备时需要在配置文件中修改。
    • 列出连接到计算机的所有USB串口设备。
  3. CRC校验(checkSum函数)

    • 使用CRC16校验算法来验证数据的完整性。
  4. 16进制转IEEE浮点数(hex_to_ieee函数)

    • 将16进制数据转换为IEEE标准的浮点数。
    • 对Python 2和3使用不同的方法处理数据。
  5. 处理串口数据(handleSerialData函数)

    • 解析串口接收到的原始数据。
    • 检查数据的起始位和数据长度。
    • 根据数据类型(角速度、加速度、磁场、欧拉角)进行处理。
    • 使用CRC校验判断数据是否有效。
    • 计算并打印加速度、角速度、欧拉角和磁场的值。
  6. 全局变量

    • 定义用于存储数据和标记的全局变量。
  7. 主函数(if __name__ == "__main__"

    • 检测Python版本。
    • 查找并设置串口。
    • 循环读取串口数据并调用handleSerialData函数处理。

这段代码主要用于从IMU设备接收数据,并通过串口将其转换为可用的浮点数,用于进一步的数据分析或应用。代码在兼容性方面对Python 2和3都进行了考虑,并且具有较强的错误处理和数据校验功能。

改进部分

        要将角速度、加速度和磁场数据保存到文本文件或CSV文件中,可以在上述代码的基础上进行一些修改。除了保存到对应路径下的文件中,还要考虑实时地保存数据这个要求,从开始保存数据到结束保存数据的时间是可以自定义的,比如要实现保存3秒时间段内实时的数据。

改进后的代码(imu_data_a9_record.py)如下:

import serial
import struct
import time
import platform
import serial.tools.list_ports
import math

# 其他必要的导入模块和全局变量

def find_ttyUSB():
    print('imu 默认串口为 /dev/ttyUSB0, 若识别多个串口设备, 请在 launch 文件中修改 imu 对应的串口')
    posts = [port.device for port in serial.tools.list_ports.comports() if 'USB' in port.device]
    print('当前电脑所连接的 {} 串口设备共 {} 个: {}'.format('USB', len(posts), posts))

# crc 校验
def checkSum(list_data, check_data):
    data = bytearray(list_data)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if (crc & 1) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return hex(((crc & 0xff) << 8) + (crc >> 8)) == hex(check_data[0] << 8 | check_data[1])

# 16 进制转 ieee 浮点数
def hex_to_ieee(raw_data):
    ieee_data = []
    raw_data.reverse()
    for i in range(0, len(raw_data), 4):
        data2str = hex(raw_data[i] | 0xff00)[4:6] + hex(raw_data[i + 1] | 0xff00)[4:6] + hex(
            raw_data[i + 2] | 0xff00)[4:6] + hex(raw_data[i + 3] | 0xff00)[4:6]
        if python_version == '2':
            ieee_data.append(struct.unpack('>f', data2str.decode('hex'))[0])
        if python_version == '3':
            ieee_data.append(struct.unpack('>f', bytes.fromhex(data2str))[0])
    ieee_data.reverse()
    return ieee_data

# 处理串口数据
def handleSerialData(raw_data):
    global buff, key, angle_degree, magnetometer, acceleration, angularVelocity, pub_flag
    if python_version == '2':
        buff[key] = ord(raw_data)
    if python_version == '3':
        buff[key] = raw_data

    key += 1
    if buff[0] != 0xaa:
        key = 0
        return
    if key < 3:
        return
    if buff[1] != 0x55:
        key = 0
        return
    if key < buff[2] + 5:  # 根据数据长度位的判断, 来获取对应长度数据
        return
    else:
        data_buff = list(buff.values())  # 获取字典所有 value

        if buff[2] == 0x2c and pub_flag[0]:
            if checkSum(data_buff[2:47], data_buff[47:49]):
                data = hex_to_ieee(data_buff[7:47])
                angularVelocity = data[1:4]
                acceleration = data[4:7]
                magnetometer = data[7:10]
            else:
                print('校验失败')
            pub_flag[0] = False
        elif buff[2] == 0x14 and pub_flag[1]:
            if checkSum(data_buff[2:23], data_buff[23:25]):
                data = hex_to_ieee(data_buff[7:23])
                angle_degree = data[1:4]
            else:
                print('校验失败')
            pub_flag[1] = False
        else:
            print("该数据处理类没有提供该 " + str(buff[2]) + " 的解析")
            print("或数据错误")
            buff = {}
            key = 0

        buff = {}
        key = 0
        if pub_flag[0] == True or pub_flag[1] == True:
            return
        pub_flag[0] = pub_flag[1] = True
        acc_k = math.sqrt(acceleration[0] ** 2 + acceleration[1] ** 2 + acceleration[2] ** 2)

#         print('''
# 加速度(m/s²):
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
#
# 角速度(rad/s):
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
#
# 欧拉角(°):
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
#
# 磁场:
#     x轴:%.2f
#     y轴:%.2f
#     z轴:%.2f
# ''' % (acceleration[0] * -9.8 / acc_k, acceleration[1] * -9.8 / acc_k, acceleration[2] * -9.8 / acc_k,
#        angularVelocity[0], angularVelocity[1], angularVelocity[2],
#        angle_degree[0], angle_degree[1], angle_degree[2],
#        magnetometer[0], magnetometer[1], magnetometer[2]
#       ))

key = 0
flag = 0
buff = {}
angularVelocity = [0, 0, 0]
acceleration = [0, 0, 0]
magnetometer = [0, 0, 0]
angle_degree = [0, 0, 0]
pub_flag = [True, True]

if __name__ == "__main__":
    python_version = platform.python_version()[0]

    find_ttyUSB()
    port = "/dev/ttyUSB0"
    baudrate = 921600

    try:
        hf_imu = serial.Serial(port=port, baudrate=baudrate, timeout=0.5)
        if hf_imu.isOpen():
            print("\033[32m串口打开成功...\033[0m")
        else:
            hf_imu.open()
            print("\033[32m打开串口成功...\033[0m")
    except Exception as e:
        print(e)
        print("\033[31m串口打开失败\033[0m")
        exit(0)
    else:
        # 定义保存数据的文件名和持续时间
        file_name = '/home/hjx/handsfree/imu_data_record/hfi_a9_timer/imu_data_a9_timer.csv'  # 可以根据需要更改文件名和路径
        duration = 3  # 保存3秒数据,可根据需要调整

        start_time = time.time()
        end_time = start_time + duration

        # 在保存数据之前,添加一个标题行
        with open(file_name, 'w') as file:
            file.write(
                'Timestamp,X_Acceleration,Y_Acceleration,Z_Acceleration,X_AngularVelocity,Y_AngularVelocity,Z_AngularVelocity,Euler_X,Euler_Y,Euler_Z,X_Magnetometer,Y_Magnetometer,Z_Magnetometer\n')

        while True:
            try:
                buff_count = hf_imu.inWaiting()
            except Exception as e:
                print("exception:" + str(e))
                print("imu 失去连接,接触不良,或断线")
                exit(0)
            else:
                if buff_count > 0:
                    buff_data = hf_imu.read(buff_count)
                    for i in range(0, buff_count):
                        handleSerialData(buff_data[i])

                current_time = time.time()
                if current_time >= end_time:
                    # 保存数据并退出循环
                    with open(file_name, 'a') as file:
                        file.write(','.join(map(str, [current_time, acceleration[0], acceleration[1], acceleration[2],
                                                      angularVelocity[0], angularVelocity[1], angularVelocity[2],
                                                      angle_degree[0], angle_degree[1], angle_degree[2],
                                                      magnetometer[0], magnetometer[1], magnetometer[2]])) + '\n')
                    break
                else:
                    # 保存数据并继续接收和处理
                    with open(file_name, 'a') as file:
                        file.write(','.join(map(str, [current_time, acceleration[0], acceleration[1], acceleration[2],
                                                      angularVelocity[0], angularVelocity[1], angularVelocity[2],
                                                      angle_degree[0], angle_degree[1], angle_degree[2],
                                                      magnetometer[0], magnetometer[1], magnetometer[2]])) + '\n')

 下面给出以上修改后的代码解析:

  1. 导入模块

    • serial:用于处理串口通信。
    • struct:用于处理二进制数据。
    • time:用于处理时间相关的功能。
    • platform:用于获取操作系统信息。
    • serial.tools.list_ports:用于列出系统中的串口设备。
    • math:用于数学运算。
  2. 函数定义

    • find_ttyUSB:检测连接到电脑的USB串口设备,并打印设备列表。
    • checkSum:执行CRC校验,验证数据的完整性。
    • hex_to_ieee:将16进制数据转换为IEEE浮点数。
    • handleSerialData:处理从串口接收到的原始数据。
  3. 主要逻辑

    • 初始化一些全局变量。
    • __main__部分,脚本首先检测Python版本,然后搜索串口设备,并尝试打开特定的串口(默认为/dev/ttyUSB0)。
    • 如果串口打开成功,脚本会记录IMU数据到指定文件中(默认路径为/home/hjx/handsfree/imu_data_record/hfi_a9_timer/imu_data_a9_timer.csv)。
    • 数据记录包括加速度、角速度、欧拉角和磁场信息。
    • 脚本会在指定时间(默认3秒)后停止记录数据。
  4. 数据处理

    • handleSerialData函数接收串口数据,根据特定的格式解析数据,包括加速度、角速度、欧拉角和磁场等信息。
    • 进行CRC校验确保数据完整性,然后将数据转换为浮点数进行记录。
  5. 异常处理

    • 在尝试打开串口或处理数据时,脚本包含异常处理逻辑,以确保在出现错误时能够优雅地处理。

        总之,这个脚本主要用于通过串口从IMU设备收集数据,并将其转换为可用的格式进行分析和记录。

配置好conda的环境和ros包的路径后,开始在pycharm中运行:

其中  duration = 3  # 保存3秒数据,可根据需要调整

Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录,Aloha,Python,ROS,机器人,学习,python

查看经过3秒后保存好的csv文件:

Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录,Aloha,Python,ROS,机器人,学习,python

    

Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录,Aloha,Python,ROS,机器人,学习,python文章来源地址https://www.toymoban.com/news/detail-813506.html

到了这里,关于Handsfree_ros_imu:ROS机器人IMU模块的hfi_a9.py文件学习记录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【ROS2机器人入门到实战】2.ROS与ROS2对比

    当前平台文章汇总地址:ROS2机器人从入门到实战 获取完整教程及配套资料代码,请关注公众号鱼香ROS获取 教程配套机器人开发平台:两驱版| 四驱版 为方便交流,搭建了机器人技术问答社区:地址 fishros.org.cn 经过上一节的学习,相信你已经对ROS和ROS2的发展有了一定的了解

    2024年02月04日
    浏览(44)
  • ROS入门21讲 | ROS机器人入门教程 【简明笔记】

    古月·ROS入门21讲 | 一学就会的ROS机器人入门教程 ROS = 通信机制+开发工具+应用功能+生态系统 通信机制 : Node :完成具体功能的进程、独立运行的可执行文件。可用多种语言py、c++。节点在系统中的名称唯一。 ROS Master :为节点提供命名注册服务;跟踪和记录话题、服务通信

    2024年02月09日
    浏览(62)
  • ROS的机器人协议:实现机器人之间的有效通信

    作者:禅与计算机程序设计艺术 ROS(Robot Operating System)是一个开源的机器人操作系统,其功能主要包括以下几个方面: 消息传递:ROS通过消息传递的方式进行通信,各个节点之间可以通过发布、订阅等方式互相通讯。 资源管理:ROS可以对进程、线程、资源进行管理,使得不

    2024年02月06日
    浏览(42)
  • 【ROS 06】机器人系统仿真

    对于ROS新手而言,可能会有疑问:学习机器人操作系统,实体机器人是必须的吗?答案是否定的,机器人一般价格不菲,为了降低机器人学习、调试成本,在ROS中提供了系统的机器人仿真实现,通过仿真,可以实现大部分需求,本章主要就是围绕“仿真”展开的,比如,本章会

    2024年02月09日
    浏览(43)
  • ROS机器人入门-环境搭建

    ROS  是机器人操作系统(Robot Operating System)的英文缩写。  ROS  是用于编写机器人软件程序的一种具有高度灵活性的软件架构。 ROS 图标  : ROS  是开源的,是用于机器人控制的一种后操作系统,或者说次级操作系统。它提供类似操作系统所提供的功能,包含硬件抽象描述、

    2024年02月08日
    浏览(60)
  • ROS高效进阶第五章 -- 机器人语音交互之ros集成科大讯飞中文语音库,实现语音控制机器人小车

    从本文开始,我们将用两篇文章学习机器人语音交互。本文作为第一篇,将在ros上集成科大讯飞的中文语音库,实现语音控制机器人小车运动。至于语音识别和语音合成的原理,本文并不深究,读者可以自行搜索相关的文章介绍。这里提醒,本文的测试环境是ubuntu20.04 + ros

    2024年02月04日
    浏览(79)
  • 【ROS2机器人入门到实战】ROS2话题入门

    当前平台文章汇总地址:ROS2机器人从入门到实战 获取完整教程及配套资料代码,请关注公众号鱼香ROS获取 教程配套机器人开发平台:两驱版| 四驱版 为方便交流,搭建了机器人技术问答社区:地址 fishros.org.cn 话题是ROS2中最常用的通信方式之一,话题通信采取的是订阅发布

    2024年02月04日
    浏览(61)
  • 【ROS2机器人入门到实战】ROS2服务入门

    当前平台文章汇总地址:ROS2机器人从入门到实战 获取完整教程及配套资料代码,请关注公众号鱼香ROS获取 教程配套机器人开发平台:两驱版| 四驱版 为方便交流,搭建了机器人技术问答社区:地址 fishros.org.cn 大家好,帅鱼又蹬蹬蹬的游回来了。本节小鱼将要带大家一起了解

    2024年02月07日
    浏览(55)
  • 【ROS2机器人入门到实战】ROS2节点介绍

    当前平台文章汇总地址:ROS2机器人从入门到实战 获取完整教程及配套资料代码,请关注公众号鱼香ROS获取 教程配套机器人开发平台:两驱版| 四驱版 为方便交流,搭建了机器人技术问答社区:地址 fishros.org.cn ROS2中每一个节点也是只负责一个单独的模块化的功能(比如一个

    2024年02月06日
    浏览(49)
  • 【ROS2机器人入门到实战】ROS2接口介绍

    当前平台文章汇总地址:ROS2机器人从入门到实战 获取完整教程及配套资料代码,请关注公众号鱼香ROS获取 教程配套机器人开发平台:两驱版| 四驱版 为方便交流,搭建了机器人技术问答社区:地址 fishros.org.cn 本节小鱼将会带你学习认识一个新的概念,叫做interface,即接口。

    2024年02月05日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包