Multi-Object Tracking (MOT) in Practice: DeepOCSORT + YOLOv8 + OSNet (Code Walkthrough)


1. Download the official code

mikel-brostrom/yolov8_tracking: Real-time multi-object tracking and segmentation using YOLOv8 with DeepOCSORT and OSNet — https://github.com/mikel-brostrom/yolov8_tracking

1. Download the tracking code from the GitHub repository above. It uses YOLOv8 as the detection network, but the YOLOv8 files themselves have to be downloaded from the link below and used to replace the empty YOLOv8 placeholder files inside the tracking code.

ultralytics/ultralytics at commit 15b3b0365ab2f12993a58985f3cb7f2137409a0c — YOLOv8 🚀 in PyTorch > ONNX > CoreML > TFLite — https://github.com/ultralytics/ultralytics/tree/15b3b0365ab2f12993a58985f3cb7f2137409a0c

2. The tracker lets you choose among BoT-SORT, ByteTrack, DeepOCSORT, OCSORT and StrongSORT as the tracking algorithm. In my experience, DeepOCSORT and ByteTrack give the best results.

3. There is also a re-identification (ReID) network, used to match the appearance features of objects between tracks and detection boxes. Its pretrained weights can be downloaded from the link below. market1501 is a pedestrian dataset from Tsinghua University; in my tests it did not work well (possibly because the dataset is rather old). The msmt17 pretrained models work very well and are the ones I recommend. Larger models extract better features; OSNet is usually good enough, and if you want better results use ResNet50, or pretrain a stronger image classification backbone on msmt17 yourself. Model Zoo — torchreid 1.4.0 documentation: https://kaiyangzhou.github.io/deep-person-reid/MODEL_ZOO
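
As a quick sanity check that a downloaded ReID checkpoint loads correctly, you can run it through torchreid's FeatureExtractor. This is only an illustrative sketch: the tracking repo itself loads the weights through its own ReIDDetectMultiBackend class, and the checkpoint/image file names below are assumptions.

from torchreid.utils import FeatureExtractor

# Illustrative only: use whatever msmt17 checkpoint you downloaded from the model zoo.
extractor = FeatureExtractor(
    model_name="osnet_x1_0",
    model_path="osnet_x1_0_msmt17.pth",
    device="cuda",
)
features = extractor(["person_crop_1.jpg", "person_crop_2.jpg"])
print(features.shape)  # (2, 512) for osnet_x1_0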

2. Running the code

The GitHub README already explains how to use the code, so I won't repeat that here; instead I'll focus on the points that need attention.

1. Install the required packages

pip install -r requirements.txt

2. Install one additional package separately, otherwise you will hit an error later

pip install lap

3. Change verify to False on line 156 of download.py

Otherwise the code will automatically try to reach Google's servers to download the weight files.

4. Modify the weight file path on line 483 of yolov8_tracking-master/trackers/strongsort/deep/models/osnet.py

Download the ReID network weights first, then change that line to point to the downloaded file.
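
The exact code on that line depends on your copy of osnet.py, but the idea looks roughly like the sketch below (the paths, num_classes value and key filtering are illustrative assumptions, not the actual contents of line 483):

import torch
from trackers.strongsort.deep.models import osnet

# Hypothetical sketch: build an OSNet backbone and load the locally downloaded
# ReID checkpoint into it, skipping the classification head.
model = osnet.osnet_x1_0(num_classes=1000, pretrained=False)
ckpt = torch.load("/path/to/osnet_x1_0_msmt17.pth", map_location="cpu")
state_dict = ckpt.get("state_dict", ckpt)  # some checkpoints wrap the weights
state_dict = {k: v for k, v in state_dict.items() if not k.startswith("classifier")}
model.load_state_dict(state_dict, strict=False)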

5. Stitch the MOT dataset images into a video, then run tracking on it

The MOT dataset only provides individual frame images, so we first need to combine them into a video. The code is:

import os

import cv2

# The frame size must match the images in the sequence (most MOT17 sequences are
# 1920x1080); adjust the size and the FPS for your data.
video_writer = cv2.VideoWriter("result.mp4", cv2.VideoWriter_fourcc(*"mp4v"), 30, (1920, 1080))
images_path = "..."
images_list = os.listdir(images_path)
images_list.sort()  # MOT frame names are zero-padded, so lexicographic order is frame order

for image_name in images_list:
    image = cv2.imread(os.path.join(images_path, image_name))
    video_writer.write(image)
    show = cv2.resize(image, (1280, 720))
    cv2.imshow("test", show)
    if cv2.waitKey(10) == ord('q'):  # press q to stop early
        break

video_writer.release()
cv2.destroyAllWindows()

6. Finally, run the tracking command in the terminal and everything should start up.
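
A typical invocation looks roughly like the one below. The exact flag names depend on the version of the repo you cloned, so check python track.py --help; the weight file names are only examples.

python track.py --source result.mp4 --yolo-weights yolov8n.pt --reid-weights osnet_x1_0_msmt17.pt --tracking-method deepocsort --show-vid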

7. DeepOCSORT code walkthrough

When I started, I could not understand the code at all, and working it out on my own was exhausting, so I hope this walkthrough helps others get up to speed quickly. I have annotated the important parts; if anything is still unclear, feel free to ask in the comments.

"""
    This script is adopted from the SORT script by Alex Bewley alex@bewley.ai
"""
from __future__ import print_function

import pdb
import pickle

import cv2
import torch
import torchvision

import numpy as np
from .association import *
from .embedding import EmbeddingComputer
from .cmc import CMCComputer
from reid_multibackend import ReIDDetectMultiBackend


def k_previous_obs(observations, cur_age, k):
    if len(observations) == 0:
        # the track has no observations at all
        return [-1, -1, -1, -1, -1]
    for i in range(k):
        dt = k - i
        if cur_age - dt in observations:
            return observations[cur_age - dt]
    max_age = max(observations.keys())
    return observations[max_age]


def convert_bbox_to_z(bbox):
    """
    Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form
      [x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is
      the aspect ratio
    """
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.0
    y = bbox[1] + h / 2.0
    s = w * h  # scale is just area
    r = w / float(h + 1e-6)
    return np.array([x, y, s, r]).reshape((4, 1))


def convert_bbox_to_z_new(bbox):
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.0
    y = bbox[1] + h / 2.0
    return np.array([x, y, w, h]).reshape((4, 1))


def convert_x_to_bbox_new(x):
    x, y, w, h = x.reshape(-1)[:4]
    return np.array([x - w / 2, y - h / 2, x + w / 2, y + h / 2]).reshape(1, 4)


def convert_x_to_bbox(x, score=None):
    """
    Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
      [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right
    """
    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2.0, x[1] - h / 2.0, x[0] + w / 2.0, x[1] + h / 2.0]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2.0, x[1] - h / 2.0, x[0] + w / 2.0, x[1] + h / 2.0, score]).reshape((1, 5))
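
# Worked example (added for illustration, not part of the original file):
#   convert_bbox_to_z([100, 200, 150, 300]) -> [125, 250, 5000, 0.5]
#     (center x=125, y=250, area s=50*100=5000, aspect ratio r=50/100≈0.5)
#   convert_x_to_bbox([125, 250, 5000, 0.5]) -> [[100, 200, 150, 300]]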


def speed_direction(bbox1, bbox2):
    cx1, cy1 = (bbox1[0] + bbox1[2]) / 2.0, (bbox1[1] + bbox1[3]) / 2.0
    cx2, cy2 = (bbox2[0] + bbox2[2]) / 2.0, (bbox2[1] + bbox2[3]) / 2.0
    speed = np.array([cy2 - cy1, cx2 - cx1])
    norm = np.sqrt((cy2 - cy1) ** 2 + (cx2 - cx1) ** 2) + 1e-6
    return speed / norm
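
# Note (added comment): speed_direction returns the unit direction vector from
# the center of bbox1 to the center of bbox2, in (dy, dx) order.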


def new_kf_process_noise(w, h, p=1 / 20, v=1 / 160):
    Q = np.diag(
        ((p * w) ** 2, (p * h) ** 2, (p * w) ** 2, (p * h) ** 2, (v * w) ** 2, (v * h) ** 2, (v * w) ** 2, (v * h) ** 2)
    )
    return Q
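
# Note (added comment): the process noise scales with the box size; by default the
# x, y, w, h terms get a standard deviation of w/20 (or h/20) and the corresponding
# velocity terms w/160 (or h/160), squared into the variances on the diagonal of Q.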


def new_kf_measurement_noise(w, h, m=1 / 20):
    w_var = (m * w) ** 2
    h_var = (m * h) ** 2
    R = np.diag((w_var, h_var, w_var, h_var))
    return R


class KalmanBoxTracker(object):
    """
    This class represents the internal state of individual tracked objects observed as bbox.
    """

    count = 0

    def __init__(self, bbox, cls, delta_t=3, orig=False, emb=None, alpha=0, new_kf=False):
        """
        Initialises a tracker using initial bounding box.

        """
        # define constant velocity model
        if not orig:
            from .kalmanfilter import KalmanFilterNew as KalmanFilter
        else:
            from filterpy.kalman import KalmanFilter
        self.cls = cls
        self.conf = bbox[-1]
        self.new_kf = new_kf
        if new_kf:
            self.kf = KalmanFilter(dim_x=8, dim_z=4)
            self.kf.F = np.array(
                [
                    # x y w h x' y' w' h'
                    [1, 0, 0, 0, 1, 0, 0, 0],
                    [0, 1, 0, 0, 0, 1, 0, 0],
                    [0, 0, 1, 0, 0, 0, 1, 0],
                    [0, 0, 0, 1, 0, 0, 0, 1],
                    [0, 0, 0, 0, 1, 0, 0, 0],
                    [0, 0, 0, 0, 0, 1, 0, 0],
                    [0, 0, 0, 0, 0, 0, 1, 0],
                    [0, 0, 0, 0, 0, 0, 0, 1],
                ]
            )
            self.kf.H = np.array(
                [
                    [1, 0, 0, 0, 0, 0, 0, 0],
                    [0, 1, 0, 0, 0, 0, 0, 0],
                    [0, 0, 1, 0, 0, 0, 0, 0],
                    [0, 0, 0, 1, 0, 0, 0, 0],
                ]
            )
            _, _, w, h = convert_bbox_to_z_new(bbox).reshape(-1)
            self.kf.P = new_kf_process_noise(w, h)
            self.kf.P[:4, :4] *= 4
            self.kf.P[4:, 4:] *= 100
            # Process and measurement uncertainty happen in functions
            self.bbox_to_z_func = convert_bbox_to_z_new
            self.x_to_bbox_func = convert_x_to_bbox_new
        else:
            self.kf = KalmanFilter(dim_x=7, dim_z=4)
            self.kf.F = np.array(
                [
                    # x  y  s  r  x' y' s'
                    [1, 0, 0, 0, 1, 0, 0],
                    [0, 1, 0, 0, 0, 1, 0],
                    [0, 0, 1, 0, 0, 0, 1],
                    [0, 0, 0, 1, 0, 0, 0],
                    [0, 0, 0, 0, 1, 0, 0],
                    [0, 0, 0, 0, 0, 1, 0],
                    [0, 0, 0, 0, 0, 0, 1],
                ]
            )
            self.kf.H = np.array(
                [
                    [1, 0, 0, 0, 0, 0, 0],
                    [0, 1, 0, 0, 0, 0, 0],
                    [0, 0, 1, 0, 0, 0, 0],
                    [0, 0, 0, 1, 0, 0, 0],
                ]
            )
            self.kf.R[2:, 2:] *= 10.0
            self.kf.P[4:, 4:] *= 1000.0  # give high uncertainty to the unobservable initial velocities
            self.kf.P *= 10.0
            self.kf.Q[-1, -1] *= 0.01
            self.kf.Q[4:, 4:] *= 0.01
            self.bbox_to_z_func = convert_bbox_to_z
            self.x_to_bbox_func = convert_x_to_bbox

        self.kf.x[:4] = self.bbox_to_z_func(bbox)

        self.time_since_update = 0
        # every new Kalman track gets the next consecutive id
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0
        """
        NOTE: [-1,-1,-1,-1,-1] is a compromise placeholder for the non-observation status; the same is
        returned by k_previous_obs. It is ugly, but it lets us build the observation array in a fast and
        unified way (see k_observations = np.array([k_previous_obs(...)]) below), so let's bear with it for now.
        """
        # Used for OCR
        self.last_observation = np.array([-1, -1, -1, -1, -1])  # placeholder
        # Used to output track after min_hits reached
        self.history_observations = []
        # Used for velocity
        self.observations = dict()
        self.velocity = None
        self.delta_t = delta_t

        self.emb = emb

        self.frozen = False

    def update(self, bbox, cls):
        """
        Updates the state vector with observed bbox.
        """
        if bbox is not None:
            self.frozen = False
            self.cls = cls
            if self.last_observation.sum() >= 0:  # a previous observation exists (the placeholder sums to -5)
                previous_box = None
                for dt in range(self.delta_t, 0, -1):
                    if self.age - dt in self.observations:
                        previous_box = self.observations[self.age - dt]
                        break
                if previous_box is None:
                    previous_box = self.last_observation
                """
                  Estimate the track speed direction with observations \Delta t steps away
                """
                self.velocity = speed_direction(previous_box, bbox)
            """
              Insert new observations. This is an ugly way to maintain both self.observations
              and self.history_observations. Bear it for the moment.
            """
            self.last_observation = bbox
            self.observations[self.age] = bbox
            self.history_observations.append(bbox)

            self.time_since_update = 0
            self.history = []
            self.hits += 1
            self.hit_streak += 1
            if self.new_kf:
                R = new_kf_measurement_noise(self.kf.x[2, 0], self.kf.x[3, 0])
                self.kf.update(self.bbox_to_z_func(bbox), R=R)
            else:
                self.kf.update(self.bbox_to_z_func(bbox))
        else:
            self.kf.update(bbox)
            self.frozen = True

    def update_emb(self, emb, alpha=0.9):
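        # (added comment) exponential moving average of the appearance embedding,
        # re-normalized to unit length afterwards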
        self.emb = alpha * self.emb + (1 - alpha) * emb
        self.emb /= np.linalg.norm(self.emb)

    def get_emb(self):
        return self.emb.cpu()

    def apply_affine_correction(self, affine):
        m = affine[:, :2]
        t = affine[:, 2].reshape(2, 1)
        # For OCR
        if self.last_observation.sum() > 0:
            ps = self.last_observation[:4].reshape(2, 2).T
            ps = m @ ps + t
            self.last_observation[:4] = ps.T.reshape(-1)

        # Apply to each box in the range of velocity computation
        for dt in range(self.delta_t, -1, -1):
            if self.age - dt in self.observations:
                ps = self.observations[self.age - dt][:4].reshape(2, 2).T
                ps = m @ ps + t
                self.observations[self.age - dt][:4] = ps.T.reshape(-1)

        # Also need to change kf state, but might be frozen
        self.kf.apply_affine_correction(m, t, self.new_kf)

    def predict(self):
        """
        Advances the state vector and returns the predicted bounding box estimate.
        """
        # Don't allow negative bounding boxes
        if self.new_kf:
            if self.kf.x[2] + self.kf.x[6] <= 0:
                self.kf.x[6] = 0
            if self.kf.x[3] + self.kf.x[7] <= 0:
                self.kf.x[7] = 0

            # Stop velocity, will update in kf during OOS
            if self.frozen:
                self.kf.x[6] = self.kf.x[7] = 0
            Q = new_kf_process_noise(self.kf.x[2, 0], self.kf.x[3, 0])
        else:
            if (self.kf.x[6] + self.kf.x[2]) <= 0:
                self.kf.x[6] *= 0.0
            Q = None

        self.kf.predict(Q=Q)
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(self.x_to_bbox_func(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """
        Returns the current bounding box estimate.
        """
        return self.x_to_bbox_func(self.kf.x)

    def mahalanobis(self, bbox):
        """Should be run after a predict() call for accuracy."""
        return self.kf.md_for_measurement(self.bbox_to_z_func(bbox))


"""
    We support multiple ways for association cost calculation, by default
    we use IoU. GIoU may have better performance in some situations. We note 
    that we hardly normalize the cost by all methods to (0,1) which may not be 
    the best practice.
"""
ASSO_FUNCS = {
    "iou": iou_batch,
    "giou": giou_batch,
    "ciou": ciou_batch,
    "diou": diou_batch,
    "ct_dist": ct_dist,
}


class OCSort(object):
    def __init__(
        self,
        # weight file of the ReID model
        model_weights,
        device,
        # whether to run the ReID model in half precision (FP16)
        fp16,
        # confidence threshold for detections
        det_thresh,
        # maximum track age: every prediction without a matched detection increments
        # trk.time_since_update; once it exceeds max_age the track is deleted
        max_age=30,
        # minimum hit streak before a track is reported; hit_streak increments on every Kalman update
        min_hits=3,
        # minimum IoU threshold between detections and tracks
        iou_threshold=0.3,
        delta_t=3,
        # IoU variant used for the detection-track association cost
        asso_func="iou",
        # inertia (vdc_weight) of the velocity direction consistency term
        inertia=0.2,
        w_association_emb=0.75,
        alpha_fixed_emb=0.95,
        aw_param=0.5,
        embedding_off=False,
        cmc_off=False,
        aw_off=False,
        new_kf_off=True,
        **kwargs
    ):
        """
        Sets key parameters for SORT
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0
        # detection confidence threshold
        self.det_thresh = det_thresh
        self.delta_t = delta_t
        self.asso_func = ASSO_FUNCS[asso_func]
        self.inertia = inertia
        self.w_association_emb = w_association_emb
        self.alpha_fixed_emb = alpha_fixed_emb
        self.aw_param = aw_param
        # reset the track id counter to 0
        KalmanBoxTracker.count = 0
        # embedder: the ReID network used to extract appearance features
        self.embedder = ReIDDetectMultiBackend(weights=model_weights, device=device, fp16=fp16)
        # cmc: camera motion compensation (estimates an affine transform between consecutive frames)
        self.cmc = CMCComputer()
        self.embedding_off = embedding_off
        self.cmc_off = cmc_off
        self.aw_off = aw_off
        self.new_kf_off = new_kf_off

        self.s = 0

    def update(self, dets, img_numpy, tag='blub'):
        """
        Params:
          dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
        Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections).
        Returns a similar array, where each row is [x1, y1, x2, y2, track_id, class, confidence].
        NOTE: The number of objects returned may differ from the number of detections provided.
        """
        xyxys = dets[:, 0:4]
        scores = dets[:, 4]
        clss = dets[:, 5]
        
        classes = clss.numpy()
        xyxys = xyxys.numpy()
        scores = scores.numpy()
        
        dets = dets[:, 0:6].numpy()
        # mask of detections whose confidence exceeds the threshold
        remain_inds = scores > self.det_thresh
        # keep only those detections
        dets = dets[remain_inds]
        self.height, self.width = img_numpy.shape[:2]

        # Rescale
        #scale = min(img_tensor.shape[2] / img_numpy.shape[0], img_tensor.shape[3] / img_numpy.shape[1])
        #dets[:, :4] /= scale

        # Embedding
        if self.embedding_off or dets.shape[0] == 0:
            # embeddings are disabled, or there are no detections above the threshold
            dets_embs = np.ones((dets.shape[0], 1))
        else:
            # (Ndets x X) [512, 1024, 2048]
            #dets_embs = self.embedder.compute_embedding(img_numpy, dets[:, :4], tag)
            # input: the detection boxes and the original image (numpy array)
            # output: one appearance feature vector per detection box
            dets_embs = self._get_features(dets[:, :4], img_numpy)

        # CMC
        # camera motion compensation: estimate an affine transform and warp the existing tracks
        if not self.cmc_off:
            transform = self.cmc.compute_affine(img_numpy, dets[:, :4], tag)
            for trk in self.trackers:
                trk.apply_affine_correction(transform)
        # (detection score - det_thresh) / (1 - det_thresh)
        trust = (dets[:, 4] - self.det_thresh) / (1 - self.det_thresh)
        af = self.alpha_fixed_emb
        # From [self.alpha_fixed_emb, 1], goes to 1 as detector is less confident
        dets_alpha = af + (1 - af) * (1 - trust)

        # get predicted locations from existing trackers.
        # predict the new locations of the existing tracks
        # allocate an array with one row per track and 5 columns
        trks = np.zeros((len(self.trackers), 5))
        trk_embs = []
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            # predict this track's position
            pos = self.trackers[t].predict()[0]
            # store the predicted position in the preallocated array
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            # np.isnan marks NaN entries; np.any returns True if any entry is NaN
            if np.any(np.isnan(pos)):
                # if the predicted position is invalid (NaN), mark this track for deletion
                to_del.append(t)
            else:
                # otherwise keep this track's appearance embedding
                trk_embs.append(self.trackers[t].get_emb())
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))

        if len(trk_embs) > 0:
            # np.vstack stacks the per-track embeddings vertically into a single array
            trk_embs = np.vstack(trk_embs)
        else:
            trk_embs = np.array(trk_embs)

        for t in reversed(to_del):
            # drop tracks whose predicted position was invalid
            self.trackers.pop(t)

        # velocity of each track (zero vector if not yet estimated)
        velocities = np.array([trk.velocity if trk.velocity is not None else np.array((0, 0)) for trk in self.trackers])
        # latest observed box of each track
        last_boxes = np.array([trk.last_observation for trk in self.trackers])
        k_observations = np.array([k_previous_obs(trk.observations, trk.age, self.delta_t) for trk in self.trackers])

        """
            First round of association
        """
        # (M detections X N tracks, final score)
        if self.embedding_off or dets.shape[0] == 0 or trk_embs.shape[0] == 0:
            stage1_emb_cost = None
        else:
            # @ is matrix multiplication: detection embeddings times track embeddings
            # gives the appearance similarity (cost) matrix
            stage1_emb_cost = dets_embs @ trk_embs.T
        # first round of association
        matched, unmatched_dets, unmatched_trks = associate(
            dets,
            trks,
            self.iou_threshold,
            velocities,
            k_observations,
            self.inertia,
            stage1_emb_cost,
            self.w_association_emb,
            self.aw_off,
            self.aw_param,
        )
        # m[0] is the detection index, m[1] is the track index
        for m in matched:
            # update the track's Kalman state with the matched detection
            self.trackers[m[1]].update(dets[m[0], :5], dets[m[0], 5])
            # update the track's appearance embedding
            self.trackers[m[1]].update_emb(dets_embs[m[0]], alpha=dets_alpha[m[0]])

        """
            Second round of association by OCR
        """
        # second round of association (OCR)
        # only runs if there are unmatched detections and unmatched tracks left
        if unmatched_dets.shape[0] > 0 and unmatched_trks.shape[0] > 0:
            left_dets = dets[unmatched_dets]
            left_dets_embs = dets_embs[unmatched_dets]
            left_trks = last_boxes[unmatched_trks]
            left_trks_embs = trk_embs[unmatched_trks]

            # print(dets.shape) # (9, 6)
            # print(left_dets.shape) # (1, 6)
            # print(left_trks.shape) # (1, 5)
            iou_left = self.asso_func(left_dets, left_trks)
            # TODO: is better without this
            emb_cost_left = left_dets_embs @ left_trks_embs.T
            if self.embedding_off:
                emb_cost_left = np.zeros_like(emb_cost_left)
            iou_left = np.array(iou_left)
            # only attempt re-matching if the best remaining IoU exceeds the threshold
            if iou_left.max() > self.iou_threshold:
                """
                NOTE: by using a lower threshold, e.g., self.iou_threshold - 0.1, you may
                get a higher performance especially on MOT17/MOT20 datasets. But we keep it
                uniform here for simplicity
                """
                rematched_indices = linear_assignment(-iou_left)
                to_remove_det_indices = []
                to_remove_trk_indices = []
                for m in rematched_indices:
                    det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]]
                    if iou_left[m[0], m[1]] < self.iou_threshold:
                        continue
                    # attach the re-matched detection to its previously unmatched track
                    self.trackers[trk_ind].update(dets[det_ind, :5], dets[det_ind, 5])
                    self.trackers[trk_ind].update_emb(dets_embs[det_ind], alpha=dets_alpha[det_ind])
                    to_remove_det_indices.append(det_ind)
                    to_remove_trk_indices.append(trk_ind)
                # remove the re-matched pairs from the unmatched lists
                unmatched_dets = np.setdiff1d(unmatched_dets, np.array(to_remove_det_indices))
                unmatched_trks = np.setdiff1d(unmatched_trks, np.array(to_remove_trk_indices))

        """
                    third round of associaton by OCR
                """


        # tracks that remain unmatched get update(None, None), which freezes them
        # (no observation this frame)
        for m in unmatched_trks:
            # self.s = self.s + 1
            # print("unmatched tracks: " + str(self.s))
            # self.s = self.s + 1
            # if self.s > 150:
            self.trackers[m].update(None, None)

        # create and initialise new trackers for unmatched detections
        # each unmatched detection starts a new track
        for i in unmatched_dets:
            trk = KalmanBoxTracker(
                dets[i, :5], dets[i, 5], delta_t=self.delta_t, emb=dets_embs[i], alpha=dets_alpha[i], new_kf=not self.new_kf_off
            )
            self.trackers.append(trk)
        i = len(self.trackers)
        # iterate over the tracks in reverse order so that pop(i) is safe
        for trk in reversed(self.trackers):
            if trk.last_observation.sum() < 0:
                d = trk.get_state()[0]
            else:
                """
                this is optional to use the recent observation or the kalman filter prediction,
                we didn't notice significant difference here
                """
                d = trk.last_observation[:4]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                # +1 as MOT benchmark requires positive
                ret.append(np.concatenate((d, [trk.id + 1], [trk.cls], [trk.conf])).reshape(1, -1))
            i -= 1
            # remove dead tracklets: delete a track once it has gone more than
            # max_age frames without an update
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))
    
    def _xywh_to_xyxy(self, bbox_xywh):
        x, y, w, h = bbox_xywh
        x1 = max(int(x - w / 2), 0)
        x2 = min(int(x + w / 2), self.width - 1)
        y1 = max(int(y - h / 2), 0)
        y2 = min(int(y + h / 2), self.height - 1)
        return x1, y1, x2, y2
    
    def _get_features(self, bbox_xywh, ori_img):
        im_crops = []
        for box in bbox_xywh:
            x1, y1, x2, y2 = self._xywh_to_xyxy(box)
            im = ori_img[y1:y2, x1:x2]
            im_crops.append(im)
        if im_crops:
            features = self.embedder(im_crops).cpu()
        else:
            features = np.array([])
        
        return features

    def update_public(self, dets, cates, scores):
        self.frame_count += 1

        det_scores = np.ones((dets.shape[0], 1))
        dets = np.concatenate((dets, det_scores), axis=1)

        remain_inds = scores > self.det_thresh

        cates = cates[remain_inds]
        dets = dets[remain_inds]

        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            pos = self.trackers[t].predict()[0]
            cat = self.trackers[t].cate
            trk[:] = [pos[0], pos[1], pos[2], pos[3], cat]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)

        velocities = np.array([trk.velocity if trk.velocity is not None else np.array((0, 0)) for trk in self.trackers])
        last_boxes = np.array([trk.last_observation for trk in self.trackers])
        k_observations = np.array([k_previous_obs(trk.observations, trk.age, self.delta_t) for trk in self.trackers])

        matched, unmatched_dets, unmatched_trks = associate_kitti(
            dets,
            trks,
            cates,
            self.iou_threshold,
            velocities,
            k_observations,
            self.inertia,
        )

        for m in matched:
            self.trackers[m[1]].update(dets[m[0], :])

        if unmatched_dets.shape[0] > 0 and unmatched_trks.shape[0] > 0:
            """
            The re-association stage by OCR.
            NOTE: at this stage, adding other strategy might be able to continue improve
            the performance, such as BYTE association by ByteTrack.
            """
            left_dets = dets[unmatched_dets]
            left_trks = last_boxes[unmatched_trks]
            left_dets_c = left_dets.copy()
            left_trks_c = left_trks.copy()

            iou_left = self.asso_func(left_dets_c, left_trks_c)
            iou_left = np.array(iou_left)
            det_cates_left = cates[unmatched_dets]
            trk_cates_left = trks[unmatched_trks][:, 4]
            num_dets = unmatched_dets.shape[0]
            num_trks = unmatched_trks.shape[0]
            cate_matrix = np.zeros((num_dets, num_trks))
            for i in range(num_dets):
                for j in range(num_trks):
                    if det_cates_left[i] != trk_cates_left[j]:
                        """
                        For some datasets, such as KITTI, there are different categories,
                        we have to avoid associate them together.
                        """
                        cate_matrix[i][j] = -1e6
            iou_left = iou_left + cate_matrix
            if iou_left.max() > self.iou_threshold - 0.1:
                rematched_indices = linear_assignment(-iou_left)
                to_remove_det_indices = []
                to_remove_trk_indices = []
                for m in rematched_indices:
                    det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]]
                    if iou_left[m[0], m[1]] < self.iou_threshold - 0.1:
                        continue
                    self.trackers[trk_ind].update(dets[det_ind, :])
                    to_remove_det_indices.append(det_ind)
                    to_remove_trk_indices.append(trk_ind)
                unmatched_dets = np.setdiff1d(unmatched_dets, np.array(to_remove_det_indices))
                unmatched_trks = np.setdiff1d(unmatched_trks, np.array(to_remove_trk_indices))

        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :])
            trk.cate = cates[i]
            self.trackers.append(trk)
        i = len(self.trackers)

        for trk in reversed(self.trackers):
            if trk.last_observation.sum() > 0:
                d = trk.last_observation[:4]
            else:
                d = trk.get_state()[0]
            if trk.time_since_update < 1:
                if (self.frame_count <= self.min_hits) or (trk.hit_streak >= self.min_hits):
                    # id+1 as MOT benchmark requires positive
                    ret.append(np.concatenate((d, [trk.id + 1], [trk.cls], [trk.conf])).reshape(1, -1))
                if trk.hit_streak == self.min_hits:
                    # Head Padding (HP): recover the lost steps during initializing the track
                    for prev_i in range(self.min_hits - 1):
                        prev_observation = trk.history_observations[-(prev_i + 2)]
                        ret.append(
                            (
                                np.concatenate(
                                    (
                                        prev_observation[:4],
                                        [trk.id + 1],
                                        [trk.cls],
                                        [trk.conf],
                                    )
                                )
                            ).reshape(1, -1)
                        )
            i -= 1
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)

        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 7))

    def dump_cache(self):
        self.cmc.dump_cache()
        self.embedder.dump_cache()
