From aa853c4f26e5678db46d09014f2600d4e152d8fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=86=A1=ED=98=84=EC=88=98?= Date: Mon, 15 Dec 2025 15:57:19 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EA=B5=B0=EC=A4=91=20=EC=9C=84=ED=97=98?= =?UTF-8?q?=20=EA=B0=90=EC=A7=80=20=EB=B0=8F=20=EA=B0=9D=EC=B2=B4=20?= =?UTF-8?q?=EC=B6=94=EC=A0=81=C2=B7=EB=A9=94=ED=83=80=EB=8D=B0=EC=9D=B4?= =?UTF-8?q?=ED=84=B0=20=EC=B9=B4=EB=93=9C=20=EA=B8=B0=EB=8A=A5=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 상세 설명: - 군중 위험 감지 로직 추가 (거리 기반·속도 기반 위험도 산출 및 Optical Flow 적용) - OC-SORT 트래킹 알고리즘 연동 - 얼굴 인식, 이상 행동 감지, 차량 번호판 인식 결과를 card 형태의 메타데이터로 구성 - 테스트 사항: - Node.js 연동을 통한 실시간 시각화 테스트 완료 --- README.md | 2 - crowd_risk.py | 71 + feat_control/zoom_control.sh | 28 + test.ini | 20 + trackers/.DS_Store | Bin 0 -> 6148 bytes trackers/ocsort_tracker/association.py | 379 ++++++ trackers/ocsort_tracker/kalmanfilter.py | 1581 +++++++++++++++++++++++ trackers/ocsort_tracker/ocsort.py | 431 ++++++ uds_postprocessing.py | 494 ++++++- 9 files changed, 2973 insertions(+), 33 deletions(-) create mode 100644 crowd_risk.py create mode 100644 feat_control/zoom_control.sh create mode 100644 test.ini create mode 100644 trackers/.DS_Store create mode 100644 trackers/ocsort_tracker/association.py create mode 100644 trackers/ocsort_tracker/kalmanfilter.py create mode 100644 trackers/ocsort_tracker/ocsort.py diff --git a/README.md b/README.md index 56a7a51..cee318c 100644 --- a/README.md +++ b/README.md @@ -108,8 +108,6 @@ C 스트리머 기능(OBJDET, FIRE, FACE 등)을 ON/OFF 할 수 있습니다. feat_control/ ├── ctrl_cli.py # 기능 제어 메시지를 UDS로 전송하는 Python CLI └── feat_on.sh # 여러 기능을 한 번에 ON/OFF 제어하는 스크립트 - - ``` --- diff --git a/crowd_risk.py b/crowd_risk.py new file mode 100644 index 0000000..4130ad0 --- /dev/null +++ b/crowd_risk.py @@ -0,0 +1,71 @@ +from collections import defaultdict, deque + + +def distance_risk(items, thr=80): + centers = [((it["x1"]+it["x2"])/2, (it["y1"]+it["y2"])/2) for it in items] + n = len(centers) + if n < 2: + return 0.0 + + close_pairs = 0 + for i in range(n): + cx1, cy1 = centers[i] + for j in range(i+1, n): + cx2, cy2 = centers[j] + dist = ((cx1-cx2)**2 + (cy1-cy2)**2)**0.5 + if dist < thr: + close_pairs += 1 + + return min(close_pairs / 30.0, 1.0) + + +prev_centers = {} +trajectories = defaultdict(lambda: deque(maxlen=10)) +last_seen = {} +KEEP_FRAMES = 15 +frame_counter = 0 + + +def motion_risk_and_path(items): + global prev_centers, trajectories, last_seen, frame_counter + + frame_counter += 1 + fid = frame_counter + + curr = {} + speeds = [] + + for it in items: + + if it["tid"] == 65535: + continue + + tid = it["tid"] + + cx = (it["x1"] + it["x2"]) / 2 + cy = (it["y1"] + it["y2"]) / 2 + + curr[tid] = (cx, cy) + trajectories[tid].append((cx, cy)) + + last_seen[tid] = fid + + if tid in prev_centers: + px, py = prev_centers[tid] + spd = ((cx - px)**2 + (cy - py)**2)**0.5 + speeds.append(spd) + + prev_centers = curr + + for tid in list(trajectories.keys()): + if tid not in curr: + if fid - last_seen.get(tid, fid) > KEEP_FRAMES: + del trajectories[tid] + del last_seen[tid] + + if not speeds: + risk = 0.0 + else: + risk = min((sum(speeds) / len(speeds)) / 20.0, 1.0) + + return risk, trajectories diff --git a/feat_control/zoom_control.sh b/feat_control/zoom_control.sh new file mode 100644 index 0000000..9169ea8 --- /dev/null +++ b/feat_control/zoom_control.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +BASE_DIR="$(cd "$(dirname "$0")" && pwd)" +CLI="python3 $BASE_DIR/ctrl_cli.py" + +if [ $# -lt 1 ]; then + echo "Usage: $0 {IN|OUT|SET value|STATUS}" >&2 + echo " 예) $0 IN" + echo " 예) $0 OUT" + echo " 예) $0 SET 3" + echo " 예) $0 STATUS" + exit 1 +fi + +CMD="$1" + +case "$CMD" in + IN) + $CLI "ZOOM_IN" + ;; + OUT) + $CLI "ZOOM_OUT" + ;; + *) + echo "ERR: unknown command '$CMD'" >&2 + exit 1 + ;; +esac diff --git a/test.ini b/test.ini new file mode 100644 index 0000000..6a3c39b --- /dev/null +++ b/test.ini @@ -0,0 +1,20 @@ +; config file for yolov8s_coco_640x384_apache6sr250_aiw4939.aiwbin +; author: +; date: +; network input size: + +[detection] ; network + +class_num = 12 ; number of class +class_name = shirt, t-shirt, sweater, cardigan, jacket, vest, pants, shorts, skirt, coat, dress, bag + +class_color = {255, 255, 255},{0, 0, 255},{255, 0, 0},{0, 255, 255},{55, 255, 0},{0, 255, 0},{255, 255, 255},{0, 0, 255},{255, 0, 0},{0, 255, 255},{55, 255, 0},{0, 255, 0} + +CONFIDENCE_DETECTION_THRESHOLD = 0.4 ; detection threshold +CONFIDENCE_NMS_THRESHOLD = 0.3 ; nms threshold + +; caution +; 1. Write the value of each key on one line +; 2. class_num = number of class_name = number of class_color +; 3. range of threshold -> 0 < threshold < 1 +; 4. Class IDs are determined in the order of class_name. (0,1,2,3 ...) diff --git a/trackers/.DS_Store b/trackers/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..4869712d98ae4063f9d44aff4e624bebc00e8480 GIT binary patch literal 6148 zcmeHK%}T>S5T0$TO(;SS3Oz1(Em&JE7B8XJ7cim+mD-S^!I&*gY7eE5v%Zi|;`2DO zyA?|nJc-zuGW*TWPnP{scCr853GrhfwHe^dSWWW5|N( zN;W(GA_Mg9TnJ%^zY%3hSPp^`$Q$#AWDW~T@ZysOu4>{l2A>%YLbMhuJuj8aUHi`t<7f5 zPP;B!M~CyeoOPOwx;$vL=X2ND+1)!n>p#SgiF!743jF73*|NBR7c{=u*^@s`VwK!s zbWypeU}OfE0cK!b7_jG`Q(2b{^FEmYX5hyR(D@)y2|bIsL49=KK%b8kFA 0).all() and (hc > 0).all()) + area_enclose = wc * hc + giou = iou - (area_enclose - union) / area_enclose + giou = (giou + 1.)/2.0 # resize from (-1,1) to (0,1) + return giou + + +def diou_batch(bboxes1, bboxes2): + """ + :param bbox_p: predict of bbox(N,4)(x1,y1,x2,y2) + :param bbox_g: groundtruth of bbox(N,4)(x1,y1,x2,y2) + :return: + """ + # for details should go to https://arxiv.org/pdf/1902.09630.pdf + # ensure predict's bbox form + bboxes2 = np.expand_dims(bboxes2, 0) + bboxes1 = np.expand_dims(bboxes1, 1) + + # calculate the intersection box + xx1 = np.maximum(bboxes1[..., 0], bboxes2[..., 0]) + yy1 = np.maximum(bboxes1[..., 1], bboxes2[..., 1]) + xx2 = np.minimum(bboxes1[..., 2], bboxes2[..., 2]) + yy2 = np.minimum(bboxes1[..., 3], bboxes2[..., 3]) + w = np.maximum(0., xx2 - xx1) + h = np.maximum(0., yy2 - yy1) + wh = w * h + union = ((bboxes1[..., 2] - bboxes1[..., 0]) * (bboxes1[..., 3] - bboxes1[..., 1]) + + (bboxes2[..., 2] - bboxes2[..., 0]) * (bboxes2[..., 3] - bboxes2[..., 1]) - wh) + iou = wh / union + centerx1 = (bboxes1[..., 0] + bboxes1[..., 2]) / 2.0 + centery1 = (bboxes1[..., 1] + bboxes1[..., 3]) / 2.0 + centerx2 = (bboxes2[..., 0] + bboxes2[..., 2]) / 2.0 + centery2 = (bboxes2[..., 1] + bboxes2[..., 3]) / 2.0 + + inner_diag = (centerx1 - centerx2) ** 2 + (centery1 - centery2) ** 2 + + xxc1 = np.minimum(bboxes1[..., 0], bboxes2[..., 0]) + yyc1 = np.minimum(bboxes1[..., 1], bboxes2[..., 1]) + xxc2 = np.maximum(bboxes1[..., 2], bboxes2[..., 2]) + yyc2 = np.maximum(bboxes1[..., 3], bboxes2[..., 3]) + + outer_diag = (xxc2 - xxc1) ** 2 + (yyc2 - yyc1) ** 2 + diou = iou - inner_diag / outer_diag + + return (diou + 1) / 2.0 # resize from (-1,1) to (0,1) + +def ciou_batch(bboxes1, bboxes2): + """ + :param bbox_p: predict of bbox(N,4)(x1,y1,x2,y2) + :param bbox_g: groundtruth of bbox(N,4)(x1,y1,x2,y2) + :return: + """ + # for details should go to https://arxiv.org/pdf/1902.09630.pdf + # ensure predict's bbox form + bboxes2 = np.expand_dims(bboxes2, 0) + bboxes1 = np.expand_dims(bboxes1, 1) + + # calculate the intersection box + xx1 = np.maximum(bboxes1[..., 0], bboxes2[..., 0]) + yy1 = np.maximum(bboxes1[..., 1], bboxes2[..., 1]) + xx2 = np.minimum(bboxes1[..., 2], bboxes2[..., 2]) + yy2 = np.minimum(bboxes1[..., 3], bboxes2[..., 3]) + w = np.maximum(0., xx2 - xx1) + h = np.maximum(0., yy2 - yy1) + wh = w * h + union = ((bboxes1[..., 2] - bboxes1[..., 0]) * (bboxes1[..., 3] - bboxes1[..., 1]) + + (bboxes2[..., 2] - bboxes2[..., 0]) * (bboxes2[..., 3] - bboxes2[..., 1]) - wh) + iou = wh / union + + centerx1 = (bboxes1[..., 0] + bboxes1[..., 2]) / 2.0 + centery1 = (bboxes1[..., 1] + bboxes1[..., 3]) / 2.0 + centerx2 = (bboxes2[..., 0] + bboxes2[..., 2]) / 2.0 + centery2 = (bboxes2[..., 1] + bboxes2[..., 3]) / 2.0 + + inner_diag = (centerx1 - centerx2) ** 2 + (centery1 - centery2) ** 2 + + xxc1 = np.minimum(bboxes1[..., 0], bboxes2[..., 0]) + yyc1 = np.minimum(bboxes1[..., 1], bboxes2[..., 1]) + xxc2 = np.maximum(bboxes1[..., 2], bboxes2[..., 2]) + yyc2 = np.maximum(bboxes1[..., 3], bboxes2[..., 3]) + + outer_diag = (xxc2 - xxc1) ** 2 + (yyc2 - yyc1) ** 2 + + w1 = bboxes1[..., 2] - bboxes1[..., 0] + h1 = bboxes1[..., 3] - bboxes1[..., 1] + w2 = bboxes2[..., 2] - bboxes2[..., 0] + h2 = bboxes2[..., 3] - bboxes2[..., 1] + + # prevent dividing over zero. add one pixel shift + h2 = h2 + 1. + h1 = h1 + 1. + arctan = np.arctan(w2/h2) - np.arctan(w1/h1) + v = (4 / (np.pi ** 2)) * (arctan ** 2) + S = 1 - iou + alpha = v / (S+v) + ciou = iou - inner_diag / outer_diag - alpha * v + + return (ciou + 1) / 2.0 # resize from (-1,1) to (0,1) + + +def ct_dist(bboxes1, bboxes2): + """ + Measure the center distance between two sets of bounding boxes, + this is a coarse implementation, we don't recommend using it only + for association, which can be unstable and sensitive to frame rate + and object speed. + """ + bboxes2 = np.expand_dims(bboxes2, 0) + bboxes1 = np.expand_dims(bboxes1, 1) + + centerx1 = (bboxes1[..., 0] + bboxes1[..., 2]) / 2.0 + centery1 = (bboxes1[..., 1] + bboxes1[..., 3]) / 2.0 + centerx2 = (bboxes2[..., 0] + bboxes2[..., 2]) / 2.0 + centery2 = (bboxes2[..., 1] + bboxes2[..., 3]) / 2.0 + + ct_dist2 = (centerx1 - centerx2) ** 2 + (centery1 - centery2) ** 2 + + ct_dist = np.sqrt(ct_dist2) + + # The linear rescaling is a naive version and needs more study + ct_dist = ct_dist / ct_dist.max() + return ct_dist.max() - ct_dist # resize to (0,1) + + + +def speed_direction_batch(dets, tracks): + tracks = tracks[..., np.newaxis] + CX1, CY1 = (dets[:,0] + dets[:,2])/2.0, (dets[:,1]+dets[:,3])/2.0 + CX2, CY2 = (tracks[:,0] + tracks[:,2]) /2.0, (tracks[:,1]+tracks[:,3])/2.0 + dx = CX1 - CX2 + dy = CY1 - CY2 + norm = np.sqrt(dx**2 + dy**2) + 1e-6 + dx = dx / norm + dy = dy / norm + return dy, dx # size: num_track x num_det + + +def linear_assignment(cost_matrix): + try: + import lap + _, x, y = lap.lapjv(cost_matrix, extend_cost=True) + return np.array([[y[i],i] for i in x if i >= 0]) # + except ImportError: + from scipy.optimize import linear_sum_assignment + x, y = linear_sum_assignment(cost_matrix) + return np.array(list(zip(x, y))) + + +def associate_detections_to_trackers(detections,trackers,iou_threshold = 0.3): + """ + Assigns detections to tracked object (both represented as bounding boxes) + Returns 3 lists of matches, unmatched_detections and unmatched_trackers + """ + if(len(trackers)==0): + return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int) + + iou_matrix = iou_batch(detections, trackers) + + if min(iou_matrix.shape) > 0: + a = (iou_matrix > iou_threshold).astype(np.int32) + if a.sum(1).max() == 1 and a.sum(0).max() == 1: + matched_indices = np.stack(np.where(a), axis=1) + else: + matched_indices = linear_assignment(-iou_matrix) + else: + matched_indices = np.empty(shape=(0,2)) + + unmatched_detections = [] + for d, det in enumerate(detections): + if(d not in matched_indices[:,0]): + unmatched_detections.append(d) + unmatched_trackers = [] + for t, trk in enumerate(trackers): + if(t not in matched_indices[:,1]): + unmatched_trackers.append(t) + + #filter out matched with low IOU + matches = [] + for m in matched_indices: + if(iou_matrix[m[0], m[1]] 0: + a = (iou_matrix > iou_threshold).astype(np.int32) + if a.sum(1).max() == 1 and a.sum(0).max() == 1: + matched_indices = np.stack(np.where(a), axis=1) + else: + matched_indices = linear_assignment(-(iou_matrix+angle_diff_cost)) + else: + matched_indices = np.empty(shape=(0,2)) + + unmatched_detections = [] + for d, det in enumerate(detections): + if(d not in matched_indices[:,0]): + unmatched_detections.append(d) + unmatched_trackers = [] + for t, trk in enumerate(trackers): + if(t not in matched_indices[:,1]): + unmatched_trackers.append(t) + + # filter out matched with low IOU + matches = [] + for m in matched_indices: + if(iou_matrix[m[0], m[1]] 0: + a = (iou_matrix > iou_threshold).astype(np.int32) + if a.sum(1).max() == 1 and a.sum(0).max() == 1: + matched_indices = np.stack(np.where(a), axis=1) + else: + matched_indices = linear_assignment(cost_matrix) + else: + matched_indices = np.empty(shape=(0,2)) + + unmatched_detections = [] + for d, det in enumerate(detections): + if(d not in matched_indices[:,0]): + unmatched_detections.append(d) + unmatched_trackers = [] + for t, trk in enumerate(trackers): + if(t not in matched_indices[:,1]): + unmatched_trackers.append(t) + + #filter out matched with low IOU + matches = [] + for m in matched_indices: + if(iou_matrix[m[0], m[1]]update cycle. The +predict step, implemented with the method or function predict(), +uses the state transition matrix F to predict the state in the next +time period (epoch). The state is stored as a gaussian (x, P), where +x is the state (column) vector, and P is its covariance. Covariance +matrix Q specifies the process covariance. In Bayesian terms, this +prediction is called the *prior*, which you can think of colloquially +as the estimate prior to incorporating the measurement. +The update step, implemented with the method or function `update()`, +incorporates the measurement z with covariance R, into the state +estimate (x, P). The class stores the system uncertainty in S, +the innovation (residual between prediction and measurement in +measurement space) in y, and the Kalman gain in k. The procedural +form returns these variables to you. In Bayesian terms this computes +the *posterior* - the estimate after the information from the +measurement is incorporated. +Whether you use the OO form or procedural form is up to you. If +matrices such as H, R, and F are changing each epoch, you'll probably +opt to use the procedural form. If they are unchanging, the OO +form is perhaps easier to use since you won't need to keep track +of these matrices. This is especially useful if you are implementing +banks of filters or comparing various KF designs for performance; +a trivial coding bug could lead to using the wrong sets of matrices. +This module also offers an implementation of the RTS smoother, and +other helper functions, such as log likelihood computations. +The Saver class allows you to easily save the state of the +KalmanFilter class after every update +This module expects NumPy arrays for all values that expect +arrays, although in a few cases, particularly method parameters, +it will accept types that convert to NumPy arrays, such as lists +of lists. These exceptions are documented in the method or function. +Examples +-------- +The following example constructs a constant velocity kinematic +filter, filters noisy data, and plots the results. It also demonstrates +using the Saver class to save the state of the filter at each epoch. +.. code-block:: Python + import matplotlib.pyplot as plt + import numpy as np + from filterpy.kalman import KalmanFilter + from filterpy.common import Q_discrete_white_noise, Saver + r_std, q_std = 2., 0.003 + cv = KalmanFilter(dim_x=2, dim_z=1) + cv.x = np.array([[0., 1.]]) # position, velocity + cv.F = np.array([[1, dt],[ [0, 1]]) + cv.R = np.array([[r_std^^2]]) + f.H = np.array([[1., 0.]]) + f.P = np.diag([.1^^2, .03^^2) + f.Q = Q_discrete_white_noise(2, dt, q_std**2) + saver = Saver(cv) + for z in range(100): + cv.predict() + cv.update([z + randn() * r_std]) + saver.save() # save the filter's state + saver.to_array() + plt.plot(saver.x[:, 0]) + # plot all of the priors + plt.plot(saver.x_prior[:, 0]) + # plot mahalanobis distance + plt.figure() + plt.plot(saver.mahalanobis) +This code implements the same filter using the procedural form + x = np.array([[0., 1.]]) # position, velocity + F = np.array([[1, dt],[ [0, 1]]) + R = np.array([[r_std^^2]]) + H = np.array([[1., 0.]]) + P = np.diag([.1^^2, .03^^2) + Q = Q_discrete_white_noise(2, dt, q_std**2) + for z in range(100): + x, P = predict(x, P, F=F, Q=Q) + x, P = update(x, P, z=[z + randn() * r_std], R=R, H=H) + xs.append(x[0, 0]) + plt.plot(xs) +For more examples see the test subdirectory, or refer to the +book cited below. In it I both teach Kalman filtering from basic +principles, and teach the use of this library in great detail. +FilterPy library. +http://github.com/rlabbe/filterpy +Documentation at: +https://filterpy.readthedocs.org +Supporting book at: +https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python +This is licensed under an MIT license. See the readme.MD file +for more information. +Copyright 2014-2018 Roger R Labbe Jr. +""" + +from __future__ import absolute_import, division + +from copy import deepcopy +from math import log, exp, sqrt +import sys +import numpy as np +from numpy import dot, zeros, eye, isscalar, shape +import numpy.linalg as linalg +from filterpy.stats import logpdf +from filterpy.common import pretty_str, reshape_z + + +class KalmanFilterNew(object): + """ Implements a Kalman filter. You are responsible for setting the + various state variables to reasonable values; the defaults will + not give you a functional filter. + For now the best documentation is my free book Kalman and Bayesian + Filters in Python [2]_. The test files in this directory also give you a + basic idea of use, albeit without much description. + In brief, you will first construct this object, specifying the size of + the state vector with dim_x and the size of the measurement vector that + you will be using with dim_z. These are mostly used to perform size checks + when you assign values to the various matrices. For example, if you + specified dim_z=2 and then try to assign a 3x3 matrix to R (the + measurement noise matrix you will get an assert exception because R + should be 2x2. (If for whatever reason you need to alter the size of + things midstream just use the underscore version of the matrices to + assign directly: your_filter._R = a_3x3_matrix.) + After construction the filter will have default matrices created for you, + but you must specify the values for each. It’s usually easiest to just + overwrite them rather than assign to each element yourself. This will be + clearer in the example below. All are of type numpy.array. + Examples + -------- + Here is a filter that tracks position and velocity using a sensor that only + reads position. + First construct the object with the required dimensionality. Here the state + (`dim_x`) has 2 coefficients (position and velocity), and the measurement + (`dim_z`) has one. In FilterPy `x` is the state, `z` is the measurement. + .. code:: + from filterpy.kalman import KalmanFilter + f = KalmanFilter (dim_x=2, dim_z=1) + Assign the initial value for the state (position and velocity). You can do this + with a two dimensional array like so: + .. code:: + f.x = np.array([[2.], # position + [0.]]) # velocity + or just use a one dimensional array, which I prefer doing. + .. code:: + f.x = np.array([2., 0.]) + Define the state transition matrix: + .. code:: + f.F = np.array([[1.,1.], + [0.,1.]]) + Define the measurement function. Here we need to convert a position-velocity + vector into just a position vector, so we use: + .. code:: + f.H = np.array([[1., 0.]]) + Define the state's covariance matrix P. + .. code:: + f.P = np.array([[1000., 0.], + [ 0., 1000.] ]) + Now assign the measurement noise. Here the dimension is 1x1, so I can + use a scalar + .. code:: + f.R = 5 + I could have done this instead: + .. code:: + f.R = np.array([[5.]]) + Note that this must be a 2 dimensional array. + Finally, I will assign the process noise. Here I will take advantage of + another FilterPy library function: + .. code:: + from filterpy.common import Q_discrete_white_noise + f.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.13) + Now just perform the standard predict/update loop: + .. code:: + while some_condition_is_true: + z = get_sensor_reading() + f.predict() + f.update(z) + do_something_with_estimate (f.x) + **Procedural Form** + This module also contains stand alone functions to perform Kalman filtering. + Use these if you are not a fan of objects. + **Example** + .. code:: + while True: + z, R = read_sensor() + x, P = predict(x, P, F, Q) + x, P = update(x, P, z, R, H) + See my book Kalman and Bayesian Filters in Python [2]_. + You will have to set the following attributes after constructing this + object for the filter to perform properly. Please note that there are + various checks in place to ensure that you have made everything the + 'correct' size. However, it is possible to provide incorrectly sized + arrays such that the linear algebra can not perform an operation. + It can also fail silently - you can end up with matrices of a size that + allows the linear algebra to work, but are the wrong shape for the problem + you are trying to solve. + Parameters + ---------- + dim_x : int + Number of state variables for the Kalman filter. For example, if + you are tracking the position and velocity of an object in two + dimensions, dim_x would be 4. + This is used to set the default size of P, Q, and u + dim_z : int + Number of of measurement inputs. For example, if the sensor + provides you with position in (x,y), dim_z would be 2. + dim_u : int (optional) + size of the control input, if it is being used. + Default value of 0 indicates it is not used. + compute_log_likelihood : bool (default = True) + Computes log likelihood by default, but this can be a slow + computation, so if you never use it you can turn this computation + off. + Attributes + ---------- + x : numpy.array(dim_x, 1) + Current state estimate. Any call to update() or predict() updates + this variable. + P : numpy.array(dim_x, dim_x) + Current state covariance matrix. Any call to update() or predict() + updates this variable. + x_prior : numpy.array(dim_x, 1) + Prior (predicted) state estimate. The *_prior and *_post attributes + are for convenience; they store the prior and posterior of the + current epoch. Read Only. + P_prior : numpy.array(dim_x, dim_x) + Prior (predicted) state covariance matrix. Read Only. + x_post : numpy.array(dim_x, 1) + Posterior (updated) state estimate. Read Only. + P_post : numpy.array(dim_x, dim_x) + Posterior (updated) state covariance matrix. Read Only. + z : numpy.array + Last measurement used in update(). Read only. + R : numpy.array(dim_z, dim_z) + Measurement noise covariance matrix. Also known as the + observation covariance. + Q : numpy.array(dim_x, dim_x) + Process noise covariance matrix. Also known as the transition + covariance. + F : numpy.array() + State Transition matrix. Also known as `A` in some formulation. + H : numpy.array(dim_z, dim_x) + Measurement function. Also known as the observation matrix, or as `C`. + y : numpy.array + Residual of the update step. Read only. + K : numpy.array(dim_x, dim_z) + Kalman gain of the update step. Read only. + S : numpy.array + System uncertainty (P projected to measurement space). Read only. + SI : numpy.array + Inverse system uncertainty. Read only. + log_likelihood : float + log-likelihood of the last measurement. Read only. + likelihood : float + likelihood of last measurement. Read only. + Computed from the log-likelihood. The log-likelihood can be very + small, meaning a large negative value such as -28000. Taking the + exp() of that results in 0.0, which can break typical algorithms + which multiply by this value, so by default we always return a + number >= sys.float_info.min. + mahalanobis : float + mahalanobis distance of the innovation. Read only. + inv : function, default numpy.linalg.inv + If you prefer another inverse function, such as the Moore-Penrose + pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv + This is only used to invert self.S. If you know it is diagonal, you + might choose to set it to filterpy.common.inv_diagonal, which is + several times faster than numpy.linalg.inv for diagonal matrices. + alpha : float + Fading memory setting. 1.0 gives the normal Kalman filter, and + values slightly larger than 1.0 (such as 1.02) give a fading + memory effect - previous measurements have less influence on the + filter's estimates. This formulation of the Fading memory filter + (there are many) is due to Dan Simon [1]_. + References + ---------- + .. [1] Dan Simon. "Optimal State Estimation." John Wiley & Sons. + p. 208-212. (2006) + .. [2] Roger Labbe. "Kalman and Bayesian Filters in Python" + https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python + """ + + def __init__(self, dim_x, dim_z, dim_u=0): + if dim_x < 1: + raise ValueError('dim_x must be 1 or greater') + if dim_z < 1: + raise ValueError('dim_z must be 1 or greater') + if dim_u < 0: + raise ValueError('dim_u must be 0 or greater') + + self.dim_x = dim_x + self.dim_z = dim_z + self.dim_u = dim_u + + self.x = zeros((dim_x, 1)) # state + self.P = eye(dim_x) # uncertainty covariance + self.Q = eye(dim_x) # process uncertainty + self.B = None # control transition matrix + self.F = eye(dim_x) # state transition matrix + self.H = zeros((dim_z, dim_x)) # measurement function + self.R = eye(dim_z) # measurement uncertainty + self._alpha_sq = 1. # fading memory control + self.M = np.zeros((dim_x, dim_z)) # process-measurement cross correlation + self.z = np.array([[None]*self.dim_z]).T + + # gain and residual are computed during the innovation step. We + # save them so that in case you want to inspect them for various + # purposes + self.K = np.zeros((dim_x, dim_z)) # kalman gain + self.y = zeros((dim_z, 1)) + self.S = np.zeros((dim_z, dim_z)) # system uncertainty + self.SI = np.zeros((dim_z, dim_z)) # inverse system uncertainty + + # identity matrix. Do not alter this. + self._I = np.eye(dim_x) + + # these will always be a copy of x,P after predict() is called + self.x_prior = self.x.copy() + self.P_prior = self.P.copy() + + # these will always be a copy of x,P after update() is called + self.x_post = self.x.copy() + self.P_post = self.P.copy() + + # Only computed only if requested via property + self._log_likelihood = log(sys.float_info.min) + self._likelihood = sys.float_info.min + self._mahalanobis = None + + # keep all observations + self.history_obs = [] + + self.inv = np.linalg.inv + + self.attr_saved = None + self.observed = False + + + def predict(self, u=None, B=None, F=None, Q=None): + """ + Predict next state (prior) using the Kalman filter state propagation + equations. + Parameters + ---------- + u : np.array, default 0 + Optional control vector. + B : np.array(dim_x, dim_u), or None + Optional control transition matrix; a value of None + will cause the filter to use `self.B`. + F : np.array(dim_x, dim_x), or None + Optional state transition matrix; a value of None + will cause the filter to use `self.F`. + Q : np.array(dim_x, dim_x), scalar, or None + Optional process noise matrix; a value of None will cause the + filter to use `self.Q`. + """ + + if B is None: + B = self.B + if F is None: + F = self.F + if Q is None: + Q = self.Q + elif isscalar(Q): + Q = eye(self.dim_x) * Q + + + # x = Fx + Bu + if B is not None and u is not None: + self.x = dot(F, self.x) + dot(B, u) + else: + self.x = dot(F, self.x) + + # P = FPF' + Q + self.P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q + + # save prior + self.x_prior = self.x.copy() + self.P_prior = self.P.copy() + + + + def freeze(self): + """ + Save the parameters before non-observation forward + """ + self.attr_saved = deepcopy(self.__dict__) + + + def unfreeze(self): + if self.attr_saved is not None: + new_history = deepcopy(self.history_obs) + self.__dict__ = self.attr_saved + # self.history_obs = new_history + self.history_obs = self.history_obs[:-1] + occur = [int(d is None) for d in new_history] + indices = np.where(np.array(occur)==0)[0] + index1 = indices[-2] + index2 = indices[-1] + box1 = new_history[index1] + x1, y1, s1, r1 = box1 + w1 = np.sqrt(s1 * r1) + h1 = np.sqrt(s1 / r1) + box2 = new_history[index2] + x2, y2, s2, r2 = box2 + w2 = np.sqrt(s2 * r2) + h2 = np.sqrt(s2 / r2) + time_gap = index2 - index1 + dx = (x2-x1)/time_gap + dy = (y2-y1)/time_gap + dw = (w2-w1)/time_gap + dh = (h2-h1)/time_gap + for i in range(index2 - index1): + """ + The default virtual trajectory generation is by linear + motion (constant speed hypothesis), you could modify this + part to implement your own. + """ + x = x1 + (i+1) * dx + y = y1 + (i+1) * dy + w = w1 + (i+1) * dw + h = h1 + (i+1) * dh + s = w * h + r = w / float(h) + new_box = np.array([x, y, s, r]).reshape((4, 1)) + """ + I still use predict-update loop here to refresh the parameters, + but this can be faster by directly modifying the internal parameters + as suggested in the paper. I keep this naive but slow way for + easy read and understanding + """ + self.update(new_box) + if not i == (index2-index1-1): + self.predict() + + + def update(self, z, R=None, H=None): + """ + Add a new measurement (z) to the Kalman filter. + If z is None, nothing is computed. However, x_post and P_post are + updated with the prior (x_prior, P_prior), and self.z is set to None. + Parameters + ---------- + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + If you pass in a value of H, z must be a column vector the + of the correct size. + R : np.array, scalar, or None + Optionally provide R to override the measurement noise for this + one call, otherwise self.R will be used. + H : np.array, or None + Optionally provide H to override the measurement function for this + one call, otherwise self.H will be used. + """ + + # set to None to force recompute + self._log_likelihood = None + self._likelihood = None + self._mahalanobis = None + + # append the observation + self.history_obs.append(z) + + if z is None: + if self.observed: + """ + Got no observation so freeze the current parameters for future + potential online smoothing. + """ + self.freeze() + self.observed = False + self.z = np.array([[None]*self.dim_z]).T + self.x_post = self.x.copy() + self.P_post = self.P.copy() + self.y = zeros((self.dim_z, 1)) + return + + # self.observed = True + if not self.observed: + """ + Get observation, use online smoothing to re-update parameters + """ + self.unfreeze() + self.observed = True + + if R is None: + R = self.R + elif isscalar(R): + R = eye(self.dim_z) * R + + if H is None: + z = reshape_z(z, self.dim_z, self.x.ndim) + H = self.H + + # y = z - Hx + # error (residual) between measurement and prediction + self.y = z - dot(H, self.x) + + # common subexpression for speed + PHT = dot(self.P, H.T) + + # S = HPH' + R + # project system uncertainty into measurement space + self.S = dot(H, PHT) + R + self.SI = self.inv(self.S) + # K = PH'inv(S) + # map system uncertainty into kalman gain + self.K = dot(PHT, self.SI) + + # x = x + Ky + # predict new x with residual scaled by the kalman gain + self.x = self.x + dot(self.K, self.y) + + # P = (I-KH)P(I-KH)' + KRK' + # This is more numerically stable + # and works for non-optimal K vs the equation + # P = (I-KH)P usually seen in the literature. + + I_KH = self._I - dot(self.K, H) + self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(self.K, R), self.K.T) + + # save measurement and posterior state + self.z = deepcopy(z) + self.x_post = self.x.copy() + self.P_post = self.P.copy() + + def predict_steadystate(self, u=0, B=None): + """ + Predict state (prior) using the Kalman filter state propagation + equations. Only x is updated, P is left unchanged. See + update_steadstate() for a longer explanation of when to use this + method. + Parameters + ---------- + u : np.array + Optional control vector. If non-zero, it is multiplied by B + to create the control input into the system. + B : np.array(dim_x, dim_u), or None + Optional control transition matrix; a value of None + will cause the filter to use `self.B`. + """ + + if B is None: + B = self.B + + # x = Fx + Bu + if B is not None: + self.x = dot(self.F, self.x) + dot(B, u) + else: + self.x = dot(self.F, self.x) + + # save prior + self.x_prior = self.x.copy() + self.P_prior = self.P.copy() + + def update_steadystate(self, z): + """ + Add a new measurement (z) to the Kalman filter without recomputing + the Kalman gain K, the state covariance P, or the system + uncertainty S. + You can use this for LTI systems since the Kalman gain and covariance + converge to a fixed value. Precompute these and assign them explicitly, + or run the Kalman filter using the normal predict()/update(0 cycle + until they converge. + The main advantage of this call is speed. We do significantly less + computation, notably avoiding a costly matrix inversion. + Use in conjunction with predict_steadystate(), otherwise P will grow + without bound. + Parameters + ---------- + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + Examples + -------- + >>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter + >>> # let filter converge on representative data, then save k and P + >>> for i in range(100): + >>> cv.predict() + >>> cv.update([i, i, i]) + >>> saved_k = np.copy(cv.K) + >>> saved_P = np.copy(cv.P) + later on: + >>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter + >>> cv.K = np.copy(saved_K) + >>> cv.P = np.copy(saved_P) + >>> for i in range(100): + >>> cv.predict_steadystate() + >>> cv.update_steadystate([i, i, i]) + """ + + # set to None to force recompute + self._log_likelihood = None + self._likelihood = None + self._mahalanobis = None + + if z is None: + self.z = np.array([[None]*self.dim_z]).T + self.x_post = self.x.copy() + self.P_post = self.P.copy() + self.y = zeros((self.dim_z, 1)) + return + + z = reshape_z(z, self.dim_z, self.x.ndim) + + # y = z - Hx + # error (residual) between measurement and prediction + self.y = z - dot(self.H, self.x) + + # x = x + Ky + # predict new x with residual scaled by the kalman gain + self.x = self.x + dot(self.K, self.y) + + self.z = deepcopy(z) + self.x_post = self.x.copy() + self.P_post = self.P.copy() + + # set to None to force recompute + self._log_likelihood = None + self._likelihood = None + self._mahalanobis = None + + def update_correlated(self, z, R=None, H=None): + """ Add a new measurement (z) to the Kalman filter assuming that + process noise and measurement noise are correlated as defined in + the `self.M` matrix. + A partial derivation can be found in [1] + If z is None, nothing is changed. + Parameters + ---------- + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + R : np.array, scalar, or None + Optionally provide R to override the measurement noise for this + one call, otherwise self.R will be used. + H : np.array, or None + Optionally provide H to override the measurement function for this + one call, otherwise self.H will be used. + References + ---------- + .. [1] Bulut, Y. (2011). Applied Kalman filter theory (Doctoral dissertation, Northeastern University). + http://people.duke.edu/~hpgavin/SystemID/References/Balut-KalmanFilter-PhD-NEU-2011.pdf + """ + + # set to None to force recompute + self._log_likelihood = None + self._likelihood = None + self._mahalanobis = None + + if z is None: + self.z = np.array([[None]*self.dim_z]).T + self.x_post = self.x.copy() + self.P_post = self.P.copy() + self.y = zeros((self.dim_z, 1)) + return + + if R is None: + R = self.R + elif isscalar(R): + R = eye(self.dim_z) * R + + # rename for readability and a tiny extra bit of speed + if H is None: + z = reshape_z(z, self.dim_z, self.x.ndim) + H = self.H + + # handle special case: if z is in form [[z]] but x is not a column + # vector dimensions will not match + if self.x.ndim == 1 and shape(z) == (1, 1): + z = z[0] + + if shape(z) == (): # is it scalar, e.g. z=3 or z=np.array(3) + z = np.asarray([z]) + + # y = z - Hx + # error (residual) between measurement and prediction + self.y = z - dot(H, self.x) + + # common subexpression for speed + PHT = dot(self.P, H.T) + + # project system uncertainty into measurement space + self.S = dot(H, PHT) + dot(H, self.M) + dot(self.M.T, H.T) + R + self.SI = self.inv(self.S) + + # K = PH'inv(S) + # map system uncertainty into kalman gain + self.K = dot(PHT + self.M, self.SI) + + # x = x + Ky + # predict new x with residual scaled by the kalman gain + self.x = self.x + dot(self.K, self.y) + self.P = self.P - dot(self.K, dot(H, self.P) + self.M.T) + + self.z = deepcopy(z) + self.x_post = self.x.copy() + self.P_post = self.P.copy() + + def batch_filter(self, zs, Fs=None, Qs=None, Hs=None, + Rs=None, Bs=None, us=None, update_first=False, + saver=None): + """ Batch processes a sequences of measurements. + Parameters + ---------- + zs : list-like + list of measurements at each time step `self.dt`. Missing + measurements must be represented by `None`. + Fs : None, list-like, default=None + optional value or list of values to use for the state transition + matrix F. + If Fs is None then self.F is used for all epochs. + Otherwise it must contain a list-like list of F's, one for + each epoch. This allows you to have varying F per epoch. + Qs : None, np.array or list-like, default=None + optional value or list of values to use for the process error + covariance Q. + If Qs is None then self.Q is used for all epochs. + Otherwise it must contain a list-like list of Q's, one for + each epoch. This allows you to have varying Q per epoch. + Hs : None, np.array or list-like, default=None + optional list of values to use for the measurement matrix H. + If Hs is None then self.H is used for all epochs. + If Hs contains a single matrix, then it is used as H for all + epochs. + Otherwise it must contain a list-like list of H's, one for + each epoch. This allows you to have varying H per epoch. + Rs : None, np.array or list-like, default=None + optional list of values to use for the measurement error + covariance R. + If Rs is None then self.R is used for all epochs. + Otherwise it must contain a list-like list of R's, one for + each epoch. This allows you to have varying R per epoch. + Bs : None, np.array or list-like, default=None + optional list of values to use for the control transition matrix B. + If Bs is None then self.B is used for all epochs. + Otherwise it must contain a list-like list of B's, one for + each epoch. This allows you to have varying B per epoch. + us : None, np.array or list-like, default=None + optional list of values to use for the control input vector; + If us is None then None is used for all epochs (equivalent to 0, + or no control input). + Otherwise it must contain a list-like list of u's, one for + each epoch. + update_first : bool, optional, default=False + controls whether the order of operations is update followed by + predict, or predict followed by update. Default is predict->update. + saver : filterpy.common.Saver, optional + filterpy.common.Saver object. If provided, saver.save() will be + called after every epoch + Returns + ------- + means : np.array((n,dim_x,1)) + array of the state for each time step after the update. Each entry + is an np.array. In other words `means[k,:]` is the state at step + `k`. + covariance : np.array((n,dim_x,dim_x)) + array of the covariances for each time step after the update. + In other words `covariance[k,:,:]` is the covariance at step `k`. + means_predictions : np.array((n,dim_x,1)) + array of the state for each time step after the predictions. Each + entry is an np.array. In other words `means[k,:]` is the state at + step `k`. + covariance_predictions : np.array((n,dim_x,dim_x)) + array of the covariances for each time step after the prediction. + In other words `covariance[k,:,:]` is the covariance at step `k`. + Examples + -------- + .. code-block:: Python + # this example demonstrates tracking a measurement where the time + # between measurement varies, as stored in dts. This requires + # that F be recomputed for each epoch. The output is then smoothed + # with an RTS smoother. + zs = [t + random.randn()*4 for t in range (40)] + Fs = [np.array([[1., dt], [0, 1]] for dt in dts] + (mu, cov, _, _) = kf.batch_filter(zs, Fs=Fs) + (xs, Ps, Ks, Pps) = kf.rts_smoother(mu, cov, Fs=Fs) + """ + + #pylint: disable=too-many-statements + n = np.size(zs, 0) + if Fs is None: + Fs = [self.F] * n + if Qs is None: + Qs = [self.Q] * n + if Hs is None: + Hs = [self.H] * n + if Rs is None: + Rs = [self.R] * n + if Bs is None: + Bs = [self.B] * n + if us is None: + us = [0] * n + + # mean estimates from Kalman Filter + if self.x.ndim == 1: + means = zeros((n, self.dim_x)) + means_p = zeros((n, self.dim_x)) + else: + means = zeros((n, self.dim_x, 1)) + means_p = zeros((n, self.dim_x, 1)) + + # state covariances from Kalman Filter + covariances = zeros((n, self.dim_x, self.dim_x)) + covariances_p = zeros((n, self.dim_x, self.dim_x)) + + if update_first: + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): + + self.update(z, R=R, H=H) + means[i, :] = self.x + covariances[i, :, :] = self.P + + self.predict(u=u, B=B, F=F, Q=Q) + means_p[i, :] = self.x + covariances_p[i, :, :] = self.P + + if saver is not None: + saver.save() + else: + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): + + self.predict(u=u, B=B, F=F, Q=Q) + means_p[i, :] = self.x + covariances_p[i, :, :] = self.P + + self.update(z, R=R, H=H) + means[i, :] = self.x + covariances[i, :, :] = self.P + + if saver is not None: + saver.save() + + return (means, covariances, means_p, covariances_p) + + def rts_smoother(self, Xs, Ps, Fs=None, Qs=None, inv=np.linalg.inv): + """ + Runs the Rauch-Tung-Striebel Kalman smoother on a set of + means and covariances computed by a Kalman filter. The usual input + would come from the output of `KalmanFilter.batch_filter()`. + Parameters + ---------- + Xs : numpy.array + array of the means (state variable x) of the output of a Kalman + filter. + Ps : numpy.array + array of the covariances of the output of a kalman filter. + Fs : list-like collection of numpy.array, optional + State transition matrix of the Kalman filter at each time step. + Optional, if not provided the filter's self.F will be used + Qs : list-like collection of numpy.array, optional + Process noise of the Kalman filter at each time step. Optional, + if not provided the filter's self.Q will be used + inv : function, default numpy.linalg.inv + If you prefer another inverse function, such as the Moore-Penrose + pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv + Returns + ------- + x : numpy.ndarray + smoothed means + P : numpy.ndarray + smoothed state covariances + K : numpy.ndarray + smoother gain at each step + Pp : numpy.ndarray + Predicted state covariances + Examples + -------- + .. code-block:: Python + zs = [t + random.randn()*4 for t in range (40)] + (mu, cov, _, _) = kalman.batch_filter(zs) + (x, P, K, Pp) = rts_smoother(mu, cov, kf.F, kf.Q) + """ + + if len(Xs) != len(Ps): + raise ValueError('length of Xs and Ps must be the same') + + n = Xs.shape[0] + dim_x = Xs.shape[1] + + if Fs is None: + Fs = [self.F] * n + if Qs is None: + Qs = [self.Q] * n + + # smoother gain + K = zeros((n, dim_x, dim_x)) + + x, P, Pp = Xs.copy(), Ps.copy(), Ps.copy() + for k in range(n-2, -1, -1): + Pp[k] = dot(dot(Fs[k+1], P[k]), Fs[k+1].T) + Qs[k+1] + + #pylint: disable=bad-whitespace + K[k] = dot(dot(P[k], Fs[k+1].T), inv(Pp[k])) + x[k] += dot(K[k], x[k+1] - dot(Fs[k+1], x[k])) + P[k] += dot(dot(K[k], P[k+1] - Pp[k]), K[k].T) + + return (x, P, K, Pp) + + def get_prediction(self, u=None, B=None, F=None, Q=None): + """ + Predict next state (prior) using the Kalman filter state propagation + equations and returns it without modifying the object. + Parameters + ---------- + u : np.array, default 0 + Optional control vector. + B : np.array(dim_x, dim_u), or None + Optional control transition matrix; a value of None + will cause the filter to use `self.B`. + F : np.array(dim_x, dim_x), or None + Optional state transition matrix; a value of None + will cause the filter to use `self.F`. + Q : np.array(dim_x, dim_x), scalar, or None + Optional process noise matrix; a value of None will cause the + filter to use `self.Q`. + Returns + ------- + (x, P) : tuple + State vector and covariance array of the prediction. + """ + + if B is None: + B = self.B + if F is None: + F = self.F + if Q is None: + Q = self.Q + elif isscalar(Q): + Q = eye(self.dim_x) * Q + + # x = Fx + Bu + if B is not None and u is not None: + x = dot(F, self.x) + dot(B, u) + else: + x = dot(F, self.x) + + # P = FPF' + Q + P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q + + return x, P + + def get_update(self, z=None): + """ + Computes the new estimate based on measurement `z` and returns it + without altering the state of the filter. + Parameters + ---------- + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + Returns + ------- + (x, P) : tuple + State vector and covariance array of the update. + """ + + if z is None: + return self.x, self.P + z = reshape_z(z, self.dim_z, self.x.ndim) + + R = self.R + H = self.H + P = self.P + x = self.x + + # error (residual) between measurement and prediction + y = z - dot(H, x) + + # common subexpression for speed + PHT = dot(P, H.T) + + # project system uncertainty into measurement space + S = dot(H, PHT) + R + + # map system uncertainty into kalman gain + K = dot(PHT, self.inv(S)) + + # predict new x with residual scaled by the kalman gain + x = x + dot(K, y) + + # P = (I-KH)P(I-KH)' + KRK' + I_KH = self._I - dot(K, H) + P = dot(dot(I_KH, P), I_KH.T) + dot(dot(K, R), K.T) + + return x, P + + def residual_of(self, z): + """ + Returns the residual for the given measurement (z). Does not alter + the state of the filter. + """ + z = reshape_z(z, self.dim_z, self.x.ndim) + return z - dot(self.H, self.x_prior) + + def measurement_of_state(self, x): + """ + Helper function that converts a state into a measurement. + Parameters + ---------- + x : np.array + kalman state vector + Returns + ------- + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + """ + + return dot(self.H, x) + + @property + def log_likelihood(self): + """ + log-likelihood of the last measurement. + """ + if self._log_likelihood is None: + self._log_likelihood = logpdf(x=self.y, cov=self.S) + return self._log_likelihood + + @property + def likelihood(self): + """ + Computed from the log-likelihood. The log-likelihood can be very + small, meaning a large negative value such as -28000. Taking the + exp() of that results in 0.0, which can break typical algorithms + which multiply by this value, so by default we always return a + number >= sys.float_info.min. + """ + if self._likelihood is None: + self._likelihood = exp(self.log_likelihood) + if self._likelihood == 0: + self._likelihood = sys.float_info.min + return self._likelihood + + @property + def mahalanobis(self): + """" + Mahalanobis distance of measurement. E.g. 3 means measurement + was 3 standard deviations away from the predicted value. + Returns + ------- + mahalanobis : float + """ + if self._mahalanobis is None: + self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) + return self._mahalanobis + + @property + def alpha(self): + """ + Fading memory setting. 1.0 gives the normal Kalman filter, and + values slightly larger than 1.0 (such as 1.02) give a fading + memory effect - previous measurements have less influence on the + filter's estimates. This formulation of the Fading memory filter + (there are many) is due to Dan Simon [1]_. + """ + return self._alpha_sq**.5 + + def log_likelihood_of(self, z): + """ + log likelihood of the measurement `z`. This should only be called + after a call to update(). Calling after predict() will yield an + incorrect result.""" + + if z is None: + return log(sys.float_info.min) + return logpdf(z, dot(self.H, self.x), self.S) + + @alpha.setter + def alpha(self, value): + if not np.isscalar(value) or value < 1: + raise ValueError('alpha must be a float greater than 1') + + self._alpha_sq = value**2 + + def __repr__(self): + return '\n'.join([ + 'KalmanFilter object', + pretty_str('dim_x', self.dim_x), + pretty_str('dim_z', self.dim_z), + pretty_str('dim_u', self.dim_u), + pretty_str('x', self.x), + pretty_str('P', self.P), + pretty_str('x_prior', self.x_prior), + pretty_str('P_prior', self.P_prior), + pretty_str('x_post', self.x_post), + pretty_str('P_post', self.P_post), + pretty_str('F', self.F), + pretty_str('Q', self.Q), + pretty_str('R', self.R), + pretty_str('H', self.H), + pretty_str('K', self.K), + pretty_str('y', self.y), + pretty_str('S', self.S), + pretty_str('SI', self.SI), + pretty_str('M', self.M), + pretty_str('B', self.B), + pretty_str('z', self.z), + pretty_str('log-likelihood', self.log_likelihood), + pretty_str('likelihood', self.likelihood), + pretty_str('mahalanobis', self.mahalanobis), + pretty_str('alpha', self.alpha), + pretty_str('inv', self.inv) + ]) + + def test_matrix_dimensions(self, z=None, H=None, R=None, F=None, Q=None): + """ + Performs a series of asserts to check that the size of everything + is what it should be. This can help you debug problems in your design. + If you pass in H, R, F, Q those will be used instead of this object's + value for those matrices. + Testing `z` (the measurement) is problamatic. x is a vector, and can be + implemented as either a 1D array or as a nx1 column vector. Thus Hx + can be of different shapes. Then, if Hx is a single value, it can + be either a 1D array or 2D vector. If either is true, z can reasonably + be a scalar (either '3' or np.array('3') are scalars under this + definition), a 1D, 1 element array, or a 2D, 1 element array. You are + allowed to pass in any combination that works. + """ + + if H is None: + H = self.H + if R is None: + R = self.R + if F is None: + F = self.F + if Q is None: + Q = self.Q + x = self.x + P = self.P + + assert x.ndim == 1 or x.ndim == 2, \ + "x must have one or two dimensions, but has {}".format(x.ndim) + + if x.ndim == 1: + assert x.shape[0] == self.dim_x, \ + "Shape of x must be ({},{}), but is {}".format( + self.dim_x, 1, x.shape) + else: + assert x.shape == (self.dim_x, 1), \ + "Shape of x must be ({},{}), but is {}".format( + self.dim_x, 1, x.shape) + + assert P.shape == (self.dim_x, self.dim_x), \ + "Shape of P must be ({},{}), but is {}".format( + self.dim_x, self.dim_x, P.shape) + + assert Q.shape == (self.dim_x, self.dim_x), \ + "Shape of Q must be ({},{}), but is {}".format( + self.dim_x, self.dim_x, P.shape) + + assert F.shape == (self.dim_x, self.dim_x), \ + "Shape of F must be ({},{}), but is {}".format( + self.dim_x, self.dim_x, F.shape) + + assert np.ndim(H) == 2, \ + "Shape of H must be (dim_z, {}), but is {}".format( + P.shape[0], shape(H)) + + assert H.shape[1] == P.shape[0], \ + "Shape of H must be (dim_z, {}), but is {}".format( + P.shape[0], H.shape) + + # shape of R must be the same as HPH' + hph_shape = (H.shape[0], H.shape[0]) + r_shape = shape(R) + + if H.shape[0] == 1: + # r can be scalar, 1D, or 2D in this case + assert r_shape in [(), (1,), (1, 1)], \ + "R must be scalar or one element array, but is shaped {}".format( + r_shape) + else: + assert r_shape == hph_shape, \ + "shape of R should be {} but it is {}".format(hph_shape, r_shape) + + + if z is not None: + z_shape = shape(z) + else: + z_shape = (self.dim_z, 1) + + # H@x must have shape of z + Hx = dot(H, x) + + if z_shape == (): # scalar or np.array(scalar) + assert Hx.ndim == 1 or shape(Hx) == (1, 1), \ + "shape of z should be {}, not {} for the given H".format( + shape(Hx), z_shape) + + elif shape(Hx) == (1,): + assert z_shape[0] == 1, 'Shape of z must be {} for the given H'.format(shape(Hx)) + + else: + assert (z_shape == shape(Hx) or + (len(z_shape) == 1 and shape(Hx) == (z_shape[0], 1))), \ + "shape of z should be {}, not {} for the given H".format( + shape(Hx), z_shape) + + if np.ndim(Hx) > 1 and shape(Hx) != (1, 1): + assert shape(Hx) == z_shape, \ + 'shape of z should be {} for the given H, but it is {}'.format( + shape(Hx), z_shape) + + +def update(x, P, z, R, H=None, return_all=False): + """ + Add a new measurement (z) to the Kalman filter. If z is None, nothing + is changed. + This can handle either the multidimensional or unidimensional case. If + all parameters are floats instead of arrays the filter will still work, + and return floats for x, P as the result. + update(1, 2, 1, 1, 1) # univariate + update(x, P, 1 + Parameters + ---------- + x : numpy.array(dim_x, 1), or float + State estimate vector + P : numpy.array(dim_x, dim_x), or float + Covariance matrix + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + R : numpy.array(dim_z, dim_z), or float + Measurement noise matrix + H : numpy.array(dim_x, dim_x), or float, optional + Measurement function. If not provided, a value of 1 is assumed. + return_all : bool, default False + If true, y, K, S, and log_likelihood are returned, otherwise + only x and P are returned. + Returns + ------- + x : numpy.array + Posterior state estimate vector + P : numpy.array + Posterior covariance matrix + y : numpy.array or scalar + Residua. Difference between measurement and state in measurement space + K : numpy.array + Kalman gain + S : numpy.array + System uncertainty in measurement space + log_likelihood : float + log likelihood of the measurement + """ + + #pylint: disable=bare-except + + if z is None: + if return_all: + return x, P, None, None, None, None + return x, P + + if H is None: + H = np.array([1]) + + if np.isscalar(H): + H = np.array([H]) + + Hx = np.atleast_1d(dot(H, x)) + z = reshape_z(z, Hx.shape[0], x.ndim) + + # error (residual) between measurement and prediction + y = z - Hx + + # project system uncertainty into measurement space + S = dot(dot(H, P), H.T) + R + + + # map system uncertainty into kalman gain + try: + K = dot(dot(P, H.T), linalg.inv(S)) + except: + # can't invert a 1D array, annoyingly + K = dot(dot(P, H.T), 1./S) + + + # predict new x with residual scaled by the kalman gain + x = x + dot(K, y) + + # P = (I-KH)P(I-KH)' + KRK' + KH = dot(K, H) + + try: + I_KH = np.eye(KH.shape[0]) - KH + except: + I_KH = np.array([1 - KH]) + P = dot(dot(I_KH, P), I_KH.T) + dot(dot(K, R), K.T) + + + if return_all: + # compute log likelihood + log_likelihood = logpdf(z, dot(H, x), S) + return x, P, y, K, S, log_likelihood + return x, P + + +def update_steadystate(x, z, K, H=None): + """ + Add a new measurement (z) to the Kalman filter. If z is None, nothing + is changed. + Parameters + ---------- + x : numpy.array(dim_x, 1), or float + State estimate vector + z : (dim_z, 1): array_like + measurement for this update. z can be a scalar if dim_z is 1, + otherwise it must be convertible to a column vector. + K : numpy.array, or float + Kalman gain matrix + H : numpy.array(dim_x, dim_x), or float, optional + Measurement function. If not provided, a value of 1 is assumed. + Returns + ------- + x : numpy.array + Posterior state estimate vector + Examples + -------- + This can handle either the multidimensional or unidimensional case. If + all parameters are floats instead of arrays the filter will still work, + and return floats for x, P as the result. + >>> update_steadystate(1, 2, 1) # univariate + >>> update_steadystate(x, P, z, H) + """ + + + if z is None: + return x + + if H is None: + H = np.array([1]) + + if np.isscalar(H): + H = np.array([H]) + + Hx = np.atleast_1d(dot(H, x)) + z = reshape_z(z, Hx.shape[0], x.ndim) + + # error (residual) between measurement and prediction + y = z - Hx + + # estimate new x with residual scaled by the kalman gain + return x + dot(K, y) + + +def predict(x, P, F=1, Q=0, u=0, B=1, alpha=1.): + """ + Predict next state (prior) using the Kalman filter state propagation + equations. + Parameters + ---------- + x : numpy.array + State estimate vector + P : numpy.array + Covariance matrix + F : numpy.array() + State Transition matrix + Q : numpy.array, Optional + Process noise matrix + u : numpy.array, Optional, default 0. + Control vector. If non-zero, it is multiplied by B + to create the control input into the system. + B : numpy.array, optional, default 0. + Control transition matrix. + alpha : float, Optional, default=1.0 + Fading memory setting. 1.0 gives the normal Kalman filter, and + values slightly larger than 1.0 (such as 1.02) give a fading + memory effect - previous measurements have less influence on the + filter's estimates. This formulation of the Fading memory filter + (there are many) is due to Dan Simon + Returns + ------- + x : numpy.array + Prior state estimate vector + P : numpy.array + Prior covariance matrix + """ + + if np.isscalar(F): + F = np.array(F) + x = dot(F, x) + dot(B, u) + P = (alpha * alpha) * dot(dot(F, P), F.T) + Q + + return x, P + + +def predict_steadystate(x, F=1, u=0, B=1): + """ + Predict next state (prior) using the Kalman filter state propagation + equations. This steady state form only computes x, assuming that the + covariance is constant. + Parameters + ---------- + x : numpy.array + State estimate vector + P : numpy.array + Covariance matrix + F : numpy.array() + State Transition matrix + u : numpy.array, Optional, default 0. + Control vector. If non-zero, it is multiplied by B + to create the control input into the system. + B : numpy.array, optional, default 0. + Control transition matrix. + Returns + ------- + x : numpy.array + Prior state estimate vector + """ + + if np.isscalar(F): + F = np.array(F) + x = dot(F, x) + dot(B, u) + + return x + + + +def batch_filter(x, P, zs, Fs, Qs, Hs, Rs, Bs=None, us=None, + update_first=False, saver=None): + """ + Batch processes a sequences of measurements. + Parameters + ---------- + zs : list-like + list of measurements at each time step. Missing measurements must be + represented by None. + Fs : list-like + list of values to use for the state transition matrix matrix. + Qs : list-like + list of values to use for the process error + covariance. + Hs : list-like + list of values to use for the measurement matrix. + Rs : list-like + list of values to use for the measurement error + covariance. + Bs : list-like, optional + list of values to use for the control transition matrix; + a value of None in any position will cause the filter + to use `self.B` for that time step. + us : list-like, optional + list of values to use for the control input vector; + a value of None in any position will cause the filter to use + 0 for that time step. + update_first : bool, optional + controls whether the order of operations is update followed by + predict, or predict followed by update. Default is predict->update. + saver : filterpy.common.Saver, optional + filterpy.common.Saver object. If provided, saver.save() will be + called after every epoch + Returns + ------- + means : np.array((n,dim_x,1)) + array of the state for each time step after the update. Each entry + is an np.array. In other words `means[k,:]` is the state at step + `k`. + covariance : np.array((n,dim_x,dim_x)) + array of the covariances for each time step after the update. + In other words `covariance[k,:,:]` is the covariance at step `k`. + means_predictions : np.array((n,dim_x,1)) + array of the state for each time step after the predictions. Each + entry is an np.array. In other words `means[k,:]` is the state at + step `k`. + covariance_predictions : np.array((n,dim_x,dim_x)) + array of the covariances for each time step after the prediction. + In other words `covariance[k,:,:]` is the covariance at step `k`. + Examples + -------- + .. code-block:: Python + zs = [t + random.randn()*4 for t in range (40)] + Fs = [kf.F for t in range (40)] + Hs = [kf.H for t in range (40)] + (mu, cov, _, _) = kf.batch_filter(zs, Rs=R_list, Fs=Fs, Hs=Hs, Qs=None, + Bs=None, us=None, update_first=False) + (xs, Ps, Ks, Pps) = kf.rts_smoother(mu, cov, Fs=Fs, Qs=None) + """ + + n = np.size(zs, 0) + dim_x = x.shape[0] + + # mean estimates from Kalman Filter + if x.ndim == 1: + means = zeros((n, dim_x)) + means_p = zeros((n, dim_x)) + else: + means = zeros((n, dim_x, 1)) + means_p = zeros((n, dim_x, 1)) + + # state covariances from Kalman Filter + covariances = zeros((n, dim_x, dim_x)) + covariances_p = zeros((n, dim_x, dim_x)) + + if us is None: + us = [0.] * n + Bs = [0.] * n + + if update_first: + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): + + x, P = update(x, P, z, R=R, H=H) + means[i, :] = x + covariances[i, :, :] = P + + x, P = predict(x, P, u=u, B=B, F=F, Q=Q) + means_p[i, :] = x + covariances_p[i, :, :] = P + if saver is not None: + saver.save() + else: + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): + + x, P = predict(x, P, u=u, B=B, F=F, Q=Q) + means_p[i, :] = x + covariances_p[i, :, :] = P + + x, P = update(x, P, z, R=R, H=H) + means[i, :] = x + covariances[i, :, :] = P + if saver is not None: + saver.save() + + return (means, covariances, means_p, covariances_p) + + + +def rts_smoother(Xs, Ps, Fs, Qs): + """ + Runs the Rauch-Tung-Striebel Kalman smoother on a set of + means and covariances computed by a Kalman filter. The usual input + would come from the output of `KalmanFilter.batch_filter()`. + Parameters + ---------- + Xs : numpy.array + array of the means (state variable x) of the output of a Kalman + filter. + Ps : numpy.array + array of the covariances of the output of a kalman filter. + Fs : list-like collection of numpy.array + State transition matrix of the Kalman filter at each time step. + Qs : list-like collection of numpy.array, optional + Process noise of the Kalman filter at each time step. + Returns + ------- + x : numpy.ndarray + smoothed means + P : numpy.ndarray + smoothed state covariances + K : numpy.ndarray + smoother gain at each step + pP : numpy.ndarray + predicted state covariances + Examples + -------- + .. code-block:: Python + zs = [t + random.randn()*4 for t in range (40)] + (mu, cov, _, _) = kalman.batch_filter(zs) + (x, P, K, pP) = rts_smoother(mu, cov, kf.F, kf.Q) + """ + + if len(Xs) != len(Ps): + raise ValueError('length of Xs and Ps must be the same') + + n = Xs.shape[0] + dim_x = Xs.shape[1] + + # smoother gain + K = zeros((n, dim_x, dim_x)) + x, P, pP = Xs.copy(), Ps.copy(), Ps.copy() + + for k in range(n-2, -1, -1): + pP[k] = dot(dot(Fs[k], P[k]), Fs[k].T) + Qs[k] + + #pylint: disable=bad-whitespace + K[k] = dot(dot(P[k], Fs[k].T), linalg.inv(pP[k])) + x[k] += dot(K[k], x[k+1] - dot(Fs[k], x[k])) + P[k] += dot(dot(K[k], P[k+1] - pP[k]), K[k].T) + + return (x, P, K, pP) \ No newline at end of file diff --git a/trackers/ocsort_tracker/ocsort.py b/trackers/ocsort_tracker/ocsort.py new file mode 100644 index 0000000..7437f17 --- /dev/null +++ b/trackers/ocsort_tracker/ocsort.py @@ -0,0 +1,431 @@ +""" + This script is adopted from the SORT script by Alex Bewley alex@bewley.ai +""" +from __future__ import print_function + +import numpy as np +from .association import * + + +def k_previous_obs(observations, cur_age, k): + if len(observations) == 0: + return [-1, -1, -1, -1, -1] + for i in range(k): + dt = k - i + if cur_age - dt in observations: + return observations[cur_age-dt] + max_age = max(observations.keys()) + return observations[max_age] + + +def convert_bbox_to_z(bbox): + """ + Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form + [x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is + the aspect ratio + """ + w = bbox[2] - bbox[0] + h = bbox[3] - bbox[1] + x = bbox[0] + w/2. + y = bbox[1] + h/2. + s = w * h # scale is just area + r = w / float(h+1e-6) + return np.array([x, y, s, r]).reshape((4, 1)) + + +def convert_x_to_bbox(x, score=None): + """ + Takes a bounding box in the centre form [x,y,s,r] and returns it in the form + [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right + """ + w = np.sqrt(x[2] * x[3]) + h = x[2] / w + if(score == None): + return np.array([x[0]-w/2., x[1]-h/2., x[0]+w/2., x[1]+h/2.]).reshape((1, 4)) + else: + return np.array([x[0]-w/2., x[1]-h/2., x[0]+w/2., x[1]+h/2., score]).reshape((1, 5)) + + +def speed_direction(bbox1, bbox2): + cx1, cy1 = (bbox1[0]+bbox1[2]) / 2.0, (bbox1[1]+bbox1[3])/2.0 + cx2, cy2 = (bbox2[0]+bbox2[2]) / 2.0, (bbox2[1]+bbox2[3])/2.0 + speed = np.array([cy2-cy1, cx2-cx1]) + norm = np.sqrt((cy2-cy1)**2 + (cx2-cx1)**2) + 1e-6 + return speed / norm + + +class KalmanBoxTracker(object): + """ + This class represents the internal state of individual tracked objects observed as bbox. + """ + count = 0 + + def __init__(self, bbox, delta_t=3, orig=False): + """ + Initialises a tracker using initial bounding box. + + """ + # define constant velocity model + if not orig: + from .kalmanfilter import KalmanFilterNew as KalmanFilter + self.kf = KalmanFilter(dim_x=7, dim_z=4) + else: + from filterpy.kalman import KalmanFilter + self.kf = KalmanFilter(dim_x=7, dim_z=4) + self.kf.F = np.array([[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1], [ + 0, 0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 1]]) + self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0], + [0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0]]) + + self.kf.R[2:, 2:] *= 10. + self.kf.P[4:, 4:] *= 1000. # give high uncertainty to the unobservable initial velocities + self.kf.P *= 10. + self.kf.Q[-1, -1] *= 0.01 + self.kf.Q[4:, 4:] *= 0.01 + + self.kf.x[:4] = convert_bbox_to_z(bbox) + self.time_since_update = 0 + self.id = KalmanBoxTracker.count + KalmanBoxTracker.count += 1 + self.history = [] + self.hits = 0 + self.hit_streak = 0 + self.age = 0 + """ + NOTE: [-1,-1,-1,-1,-1] is a compromising placeholder for non-observation status, the same for the return of + function k_previous_obs. It is ugly and I do not like it. But to support generate observation array in a + fast and unified way, which you would see below k_observations = np.array([k_previous_obs(...]]), let's bear it for now. + """ + self.last_observation = np.array([-1, -1, -1, -1, -1]) # placeholder + self.observations = dict() + self.history_observations = [] + self.velocity = None + self.delta_t = delta_t + + def update(self, bbox): + """ + Updates the state vector with observed bbox. + """ + if bbox is not None: + if self.last_observation.sum() >= 0: # no previous observation + previous_box = None + for i in range(self.delta_t): + dt = self.delta_t - i + if self.age - dt in self.observations: + previous_box = self.observations[self.age-dt] + break + if previous_box is None: + previous_box = self.last_observation + """ + Estimate the track speed direction with observations \Delta t steps away + """ + self.velocity = speed_direction(previous_box, bbox) + + """ + Insert new observations. This is a ugly way to maintain both self.observations + and self.history_observations. Bear it for the moment. + """ + self.last_observation = bbox + self.observations[self.age] = bbox + self.history_observations.append(bbox) + + self.time_since_update = 0 + self.history = [] + self.hits += 1 + self.hit_streak += 1 + self.kf.update(convert_bbox_to_z(bbox)) + else: + self.kf.update(bbox) + + def predict(self): + """ + Advances the state vector and returns the predicted bounding box estimate. + """ + if((self.kf.x[6]+self.kf.x[2]) <= 0): + self.kf.x[6] *= 0.0 + + self.kf.predict() + self.age += 1 + if(self.time_since_update > 0): + self.hit_streak = 0 + self.time_since_update += 1 + self.history.append(convert_x_to_bbox(self.kf.x)) + return self.history[-1] + + def get_state(self): + """ + Returns the current bounding box estimate. + """ + return convert_x_to_bbox(self.kf.x) + + +""" + We support multiple ways for association cost calculation, by default + we use IoU. GIoU may have better performance in some situations. We note + that we hardly normalize the cost by all methods to (0,1) which may not be + the best practice. +""" +ASSO_FUNCS = { "iou": iou_batch, + "giou": giou_batch, + "ciou": ciou_batch, + "diou": diou_batch, + "ct_dist": ct_dist} + + +class OCSort(object): + def __init__(self, det_thresh, max_age=30, min_hits=3, + iou_threshold=0.3, delta_t=3, asso_func="iou", inertia=0.2, use_byte=False): + """ + Sets key parameters for SORT + """ + self.max_age = max_age + self.min_hits = min_hits + self.iou_threshold = iou_threshold + self.trackers = [] + self.frame_count = 0 + self.det_thresh = det_thresh + self.delta_t = delta_t + self.asso_func = ASSO_FUNCS[asso_func] + self.inertia = inertia + self.use_byte = use_byte + KalmanBoxTracker.count = 0 + + def update(self, output_results, img_info, img_size): + """ + Params: + dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...] + Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections). + Returns the a similar array, where the last column is the object ID. + NOTE: The number of objects returned may differ from the number of detections provided. + """ + if output_results is None: + return np.empty((0, 5)) + + self.frame_count += 1 + # post_process detections + if output_results.shape[1] == 5: + scores = output_results[:, 4] + bboxes = output_results[:, :4] + else: + output_results = output_results.cpu().numpy() + scores = output_results[:, 4] * output_results[:, 5] + bboxes = output_results[:, :4] # x1y1x2y2 + img_h, img_w = img_info[0], img_info[1] + scale = min(img_size[0] / float(img_h), img_size[1] / float(img_w)) + bboxes /= scale + dets = np.concatenate((bboxes, np.expand_dims(scores, axis=-1)), axis=1) + inds_low = scores > 0.1 + inds_high = scores < self.det_thresh + inds_second = np.logical_and(inds_low, inds_high) # self.det_thresh > score > 0.1, for second matching + dets_second = dets[inds_second] # detections for second matching + remain_inds = scores > self.det_thresh + dets = dets[remain_inds] + + # get predicted locations from existing trackers. + trks = np.zeros((len(self.trackers), 5)) + to_del = [] + ret = [] + for t, trk in enumerate(trks): + pos = self.trackers[t].predict()[0] + trk[:] = [pos[0], pos[1], pos[2], pos[3], 0] + if np.any(np.isnan(pos)): + to_del.append(t) + trks = np.ma.compress_rows(np.ma.masked_invalid(trks)) + for t in reversed(to_del): + self.trackers.pop(t) + + velocities = np.array( + [trk.velocity if trk.velocity is not None else np.array((0, 0)) for trk in self.trackers]) + last_boxes = np.array([trk.last_observation for trk in self.trackers]) + k_observations = np.array( + [k_previous_obs(trk.observations, trk.age, self.delta_t) for trk in self.trackers]) + + """ + First round of association + """ + matched, unmatched_dets, unmatched_trks = associate( + dets, trks, self.iou_threshold, velocities, k_observations, self.inertia) + for m in matched: + self.trackers[m[1]].update(dets[m[0], :]) + + """ + Second round of associaton by OCR + """ + # BYTE association + if self.use_byte and len(dets_second) > 0 and unmatched_trks.shape[0] > 0: + u_trks = trks[unmatched_trks] + iou_left = self.asso_func(dets_second, u_trks) # iou between low score detections and unmatched tracks + iou_left = np.array(iou_left) + if iou_left.max() > self.iou_threshold: + """ + NOTE: by using a lower threshold, e.g., self.iou_threshold - 0.1, you may + get a higher performance especially on MOT17/MOT20 datasets. But we keep it + uniform here for simplicity + """ + matched_indices = linear_assignment(-iou_left) + to_remove_trk_indices = [] + for m in matched_indices: + det_ind, trk_ind = m[0], unmatched_trks[m[1]] + if iou_left[m[0], m[1]] < self.iou_threshold: + continue + self.trackers[trk_ind].update(dets_second[det_ind, :]) + to_remove_trk_indices.append(trk_ind) + unmatched_trks = np.setdiff1d(unmatched_trks, np.array(to_remove_trk_indices)) + + if unmatched_dets.shape[0] > 0 and unmatched_trks.shape[0] > 0: + left_dets = dets[unmatched_dets] + left_trks = last_boxes[unmatched_trks] + iou_left = self.asso_func(left_dets, left_trks) + iou_left = np.array(iou_left) + if iou_left.max() > self.iou_threshold: + """ + NOTE: by using a lower threshold, e.g., self.iou_threshold - 0.1, you may + get a higher performance especially on MOT17/MOT20 datasets. But we keep it + uniform here for simplicity + """ + rematched_indices = linear_assignment(-iou_left) + to_remove_det_indices = [] + to_remove_trk_indices = [] + for m in rematched_indices: + det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]] + if iou_left[m[0], m[1]] < self.iou_threshold: + continue + self.trackers[trk_ind].update(dets[det_ind, :]) + to_remove_det_indices.append(det_ind) + to_remove_trk_indices.append(trk_ind) + unmatched_dets = np.setdiff1d(unmatched_dets, np.array(to_remove_det_indices)) + unmatched_trks = np.setdiff1d(unmatched_trks, np.array(to_remove_trk_indices)) + + for m in unmatched_trks: + self.trackers[m].update(None) + + # create and initialise new trackers for unmatched detections + for i in unmatched_dets: + trk = KalmanBoxTracker(dets[i, :], delta_t=self.delta_t) + self.trackers.append(trk) + i = len(self.trackers) + for trk in reversed(self.trackers): + if trk.last_observation.sum() < 0: + d = trk.get_state()[0] + else: + """ + this is optional to use the recent observation or the kalman filter prediction, + we didn't notice significant difference here + """ + d = trk.last_observation[:4] + if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits): + # +1 as MOT benchmark requires positive + ret.append(np.concatenate((d, [trk.id+1])).reshape(1, -1)) + i -= 1 + # remove dead tracklet + if(trk.time_since_update > self.max_age): + self.trackers.pop(i) + if(len(ret) > 0): + return np.concatenate(ret) + return np.empty((0, 5)) + + def update_public(self, dets, cates, scores): + self.frame_count += 1 + + det_scores = np.ones((dets.shape[0], 1)) + dets = np.concatenate((dets, det_scores), axis=1) + + remain_inds = scores > self.det_thresh + + cates = cates[remain_inds] + dets = dets[remain_inds] + + trks = np.zeros((len(self.trackers), 5)) + to_del = [] + ret = [] + for t, trk in enumerate(trks): + pos = self.trackers[t].predict()[0] + cat = self.trackers[t].cate + trk[:] = [pos[0], pos[1], pos[2], pos[3], cat] + if np.any(np.isnan(pos)): + to_del.append(t) + trks = np.ma.compress_rows(np.ma.masked_invalid(trks)) + for t in reversed(to_del): + self.trackers.pop(t) + + velocities = np.array([trk.velocity if trk.velocity is not None else np.array((0,0)) for trk in self.trackers]) + last_boxes = np.array([trk.last_observation for trk in self.trackers]) + k_observations = np.array([k_previous_obs(trk.observations, trk.age, self.delta_t) for trk in self.trackers]) + + matched, unmatched_dets, unmatched_trks = associate_kitti\ + (dets, trks, cates, self.iou_threshold, velocities, k_observations, self.inertia) + + for m in matched: + self.trackers[m[1]].update(dets[m[0], :]) + + if unmatched_dets.shape[0] > 0 and unmatched_trks.shape[0] > 0: + """ + The re-association stage by OCR. + NOTE: at this stage, adding other strategy might be able to continue improve + the performance, such as BYTE association by ByteTrack. + """ + left_dets = dets[unmatched_dets] + left_trks = last_boxes[unmatched_trks] + left_dets_c = left_dets.copy() + left_trks_c = left_trks.copy() + + iou_left = self.asso_func(left_dets_c, left_trks_c) + iou_left = np.array(iou_left) + det_cates_left = cates[unmatched_dets] + trk_cates_left = trks[unmatched_trks][:,4] + num_dets = unmatched_dets.shape[0] + num_trks = unmatched_trks.shape[0] + cate_matrix = np.zeros((num_dets, num_trks)) + for i in range(num_dets): + for j in range(num_trks): + if det_cates_left[i] != trk_cates_left[j]: + """ + For some datasets, such as KITTI, there are different categories, + we have to avoid associate them together. + """ + cate_matrix[i][j] = -1e6 + iou_left = iou_left + cate_matrix + if iou_left.max() > self.iou_threshold - 0.1: + rematched_indices = linear_assignment(-iou_left) + to_remove_det_indices = [] + to_remove_trk_indices = [] + for m in rematched_indices: + det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]] + if iou_left[m[0], m[1]] < self.iou_threshold - 0.1: + continue + self.trackers[trk_ind].update(dets[det_ind, :]) + to_remove_det_indices.append(det_ind) + to_remove_trk_indices.append(trk_ind) + unmatched_dets = np.setdiff1d(unmatched_dets, np.array(to_remove_det_indices)) + unmatched_trks = np.setdiff1d(unmatched_trks, np.array(to_remove_trk_indices)) + + for i in unmatched_dets: + trk = KalmanBoxTracker(dets[i,:]) + trk.cate = cates[i] + self.trackers.append(trk) + i = len(self.trackers) + + for trk in reversed(self.trackers): + if trk.last_observation.sum() > 0: + d = trk.last_observation[:4] + else: + d = trk.get_state()[0] + if (trk.time_since_update < 1): + if (self.frame_count <= self.min_hits) or (trk.hit_streak >= self.min_hits): + # id+1 as MOT benchmark requires positive + ret.append(np.concatenate((d, [trk.id+1], [trk.cate], [0])).reshape(1,-1)) + if trk.hit_streak == self.min_hits: + # Head Padding (HP): recover the lost steps during initializing the track + for prev_i in range(self.min_hits - 1): + prev_observation = trk.history_observations[-(prev_i+2)] + ret.append((np.concatenate((prev_observation[:4], [trk.id+1], [trk.cate], + [-(prev_i+1)]))).reshape(1,-1)) + i -= 1 + if (trk.time_since_update > self.max_age): + self.trackers.pop(i) + + if(len(ret)>0): + return np.concatenate(ret) + return np.empty((0, 7)) + + diff --git a/uds_postprocessing.py b/uds_postprocessing.py index eca70e3..95b2e2e 100644 --- a/uds_postprocessing.py +++ b/uds_postprocessing.py @@ -1,9 +1,20 @@ import socket, struct, sys, json, asyncio, threading import websockets import time +import numpy as np from threading import Lock from collections import deque +from crowd_risk import distance_risk, motion_risk_and_path +from trackers.ocsort_tracker.ocsort import OCSort +import time +import cv2, base64 + +crowd_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1) +face_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1) +car_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1) +anomy_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1) +vip_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1) encode_queue = deque() encode_lock = Lock() @@ -11,6 +22,10 @@ encode_lock = Lock() MAX_ENC_QUEUE = 4 frames_buffer = {} +id_card_state = {} + +CARD_SEND_INTERVAL = 1.0 +CARD_TIMEOUT = 0.0 SOCK_PATH = sys.argv[1] if len(sys.argv) > 1 else "/tmp/cam.sock" @@ -27,25 +42,23 @@ ENTRY_FMT = " clients={len(ws_clients)}") + except Exception: + pass + targets = list(ws_clients) coros = [ws.send(text) for ws in targets] results = await asyncio.gather(*coros, return_exceptions=True) @@ -184,6 +204,75 @@ def decode_reserved(resv): return fm8, md8 +def fm8_to_feature_index(fm8): + if fm8 is None: + return None + if fm8 == 0: + return None + return (fm8 & -fm8).bit_length() - 1 + + +def iou(a, b): + ax1, ay1, ax2, ay2 = a + bx1, by1, bx2, by2 = b + + inter_x1 = max(ax1, bx1) + inter_y1 = max(ay1, by1) + inter_x2 = min(ax2, bx2) + inter_y2 = min(ay2, by2) + + inter_w = max(0, inter_x2 - inter_x1) + inter_h = max(0, inter_y2 - inter_y1) + inter_area = inter_w * inter_h + + area_a = (ax2 - ax1) * (ay2 - ay1) + area_b = (bx2 - bx1) * (by2 - by1) + + union_area = area_a + area_b - inter_area + if union_area <= 0: + return 0.0 + + return inter_area / union_area + + +def sanitize_dets_xyxy_score( + dets_xyxy_score: np.ndarray, W: int, H: int, min_wh: float = 1.0 +) -> np.ndarray: + """ + dets_xyxy_score: (N,5) = [x1,y1,x2,y2,score] + """ + if dets_xyxy_score is None or len(dets_xyxy_score) == 0: + return np.empty((0, 5), dtype=np.float32) + + d = dets_xyxy_score.astype(np.float32, copy=True) + + finite_mask = np.all(np.isfinite(d[:, :5]), axis=1) + d = d[finite_mask] + if d.shape[0] == 0: + return np.empty((0, 5), dtype=np.float32) + + d[:, [0, 2]] = np.clip(d[:, [0, 2]], 0, W - 1) + d[:, [1, 3]] = np.clip(d[:, [1, 3]], 0, H - 1) + + x1, y1, x2, y2 = d[:, 0], d[:, 1], d[:, 2], d[:, 3] + d[:, 0], d[:, 1], d[:, 2], d[:, 3] = ( + np.minimum(x1, x2), + np.minimum(y1, y2), + np.maximum(x1, x2), + np.maximum(y1, y2), + ) + + w = d[:, 2] - d[:, 0] + h = d[:, 3] - d[:, 1] + size_mask = (w >= min_wh) & (h >= min_wh) + d = d[size_mask] + + if d.shape[0] > 0: + d = d[d[:, 4] > 0.0] + + return d + + def handle_frame(hdr, sock): magic, ver, ch, w, h, stride, pixfmt, bytes_len, ts_us = hdr @@ -215,8 +304,21 @@ def handle_det(hdr, sock): entries_raw = read_exact(sock, need_bytes) if count > 0 else b"" items = [] + det_rows = [] + det_face = [] + det_car = [] off = 0 + if count > 0: + _, _, _, _, _, _, _, first_resv = struct.unpack_from( + ENTRY_FMT, entries_raw, 0) + fm8, md8 = decode_reserved(first_resv) + feat_idx = fm8_to_feature_index(fm8) + + else: + fm8, md8 = None, None + feat_idx = None + for _ in range(count): prob, x, y, w, h, cls, tid, resv = struct.unpack_from( ENTRY_FMT, entries_raw, off) @@ -227,16 +329,71 @@ def handle_det(hdr, sock): tag = uds_dec_tag(cls) local_cls = uds_dec_local(cls) - items.append({ - "x1": float(x1), - "y1": float(y1), - "x2": float(x2), - "y2": float(y2), - "score": float(prob), - "cls": int(local_cls), - "tag": int(tag), - "tid": int(tid) - }) + if tag == UDS_TAG_DET and local_cls == 0 and feat_idx == FEAT_CROWD: + det_rows.append([x1, y1, x2, y2, prob, int(local_cls)]) + elif tag == UDS_TAG_DET and local_cls == 0 and feat_idx == FEAT_FACEATTR: + det_rows.append([x1, y1, x2, y2, prob, int(local_cls)]) + elif tag == UDS_TAG_LPR and local_cls == 0 and feat_idx == FEAT_LPR: + det_rows.append([x1, y1, x2, y2, prob, int(local_cls)]) + elif tag == UDS_TAG_ABNORMAL and local_cls == 0 and feat_idx == FEAT_ABNORM: + det_rows.append([x1, y1, x2, y2, prob, int(local_cls)]) + elif tag == UDS_TAG_DET and local_cls == 0 and feat_idx == FEAT_VIPTRACK: + det_rows.append([x1, y1, x2, y2, prob, int(local_cls)]) + else: + if feat_idx == FEAT_FACEATTR and tag == UDS_TAG_FACE: + det_face.append([x1, y1, x2, y2]) + + if feat_idx == FEAT_LPR and tag == UDS_TAG_DET and local_cls == 2: + det_car.append([x1, y1, x2, y2]) + + items.append({ + "x1": float(x1), + "y1": float(y1), + "x2": float(x2), + "y2": float(y2), + "cls": int(local_cls), + "tag": int(tag), + "tid": int(tid) + }) + + if det_rows: + arr = np.asarray(det_rows, dtype=np.float32) + dets_xyxy_score = arr[:, :5] + track_cls = 0 + tag = int(UDS_TAG_DET) + + dets_for_tracker = sanitize_dets_xyxy_score(dets_xyxy_score, W, H, min_wh=1.0) + + if feat_idx == FEAT_CROWD: + tracks = crowd_tracker.update(dets_for_tracker, img_info=(H, W), img_size=(H, W)) + + if feat_idx == FEAT_FACEATTR: + tracks = face_tracker.update(dets_for_tracker, img_info=(H, W), img_size=(H, W)) + + if feat_idx == FEAT_LPR: + tracks = car_tracker.update(dets_for_tracker, img_info=(H, W), img_size=(H, W)) + tag = UDS_TAG_LPR + + if feat_idx == FEAT_ABNORM: + tracks = anomy_tracker.update(dets_for_tracker, img_info=(H, W), img_size=(H, W)) + tag = UDS_TAG_ABNORMAL + + if feat_idx == FEAT_VIPTRACK: + tracks = vip_tracker.update(dets_for_tracker, img_info=(H, W), img_size=(H, W)) + + if tracks is not None and len(tracks) > 0: + for t in tracks: + if t.shape[0] < 5: + continue + tx1, ty1, tx2, ty2, tid = float(t[0]), float(t[1]), float(t[2]), float(t[3]), int(t[4]) + + items.append({ + "x1": tx1, "y1": ty1, + "x2": tx2, "y2": ty2, + "tid": tid, + "cls": track_cls, + "tag": tag, + }) if not frames_buffer: return @@ -244,12 +401,146 @@ def handle_det(hdr, sock): candidate_ts = min(frames_buffer.keys(), key=lambda t: abs(t - ts_us)) if abs(candidate_ts - ts_us) > 200000: + now_fail = time.time() + for it in items: + tid2 = it["tid"] + st = id_card_state.get(tid2) + if not st: + continue + + if now_fail - st["last_update"] > CARD_TIMEOUT: + id_card_state.pop(tid2, None) + return jpeg_bytes = frames_buffer.pop(candidate_ts, None) if jpeg_bytes is None: return + now = time.time() + + if feat_idx == FEAT_FACEATTR: + for it in items: + if it["tag"] != UDS_TAG_DET: + continue + + if it["cls"] != 0: + continue + + tid2 = it["tid"] + x1 = it["x1"] + y1 = it["y1"] + x2 = it["x2"] + y2 = it["y2"] + + id_card_state["frame"] = jpeg_bytes + id_card_state["feat_idx"] = feat_idx + + id_card_state[tid2] = { + "bbox": [x1, y1, x2, y2], + "last_update": now, + } + + best_face = None + best_iou = 0.0 + person_bbox = [x1, y1, x2, y2] + + for fx1, fy1, fx2, fy2 in det_face: + face_bbox = [fx1, fy1, fx2, fy2] + i = iou(person_bbox, face_bbox) + if i > best_iou: + best_iou = i + best_face = face_bbox + + if best_face is not None: + id_card_state[tid2]["face"] = best_face + + if feat_idx == FEAT_ABNORM: + for it in items: + if it["tag"] != UDS_TAG_ABNORMAL: + continue + + if it["cls"] != 0: + continue + + tid2 = it["tid"] + x1 = it["x1"] + y1 = it["y1"] + x2 = it["x2"] + y2 = it["y2"] + + id_card_state["frame"] = jpeg_bytes + id_card_state["feat_idx"] = feat_idx + + id_card_state[tid2] = { + "bbox": [x1, y1, x2, y2], + "last_update": now, + } + + if feat_idx == FEAT_LPR: + for it in items: + if it["tag"] != UDS_TAG_LPR: + continue + + if it["cls"] != 0: + continue + + tid2 = it["tid"] + x1 = it["x1"] + y1 = it["y1"] + x2 = it["x2"] + y2 = it["y2"] + + id_card_state["frame"] = jpeg_bytes + id_card_state["feat_idx"] = feat_idx + + id_card_state[tid2] = { + "bbox": [x1, y1, x2, y2], + "last_update": now, + } + + best_car = None + best_iou = 0.0 + lp_bbox = [x1, y1, x2, y2] + + for fx1, fy1, fx2, fy2 in det_car: + print("det_car:", fx1, fy1, fx2, fy2) + car_bbox = [fx1, fy1, fx2, fy2] + i = iou(lp_bbox, car_bbox) + if i > best_iou: + best_iou = i + best_car = car_bbox + + if best_car is not None: + print("best_car:", best_car) + id_card_state[tid2]["car"] = best_car + + if feat_idx == FEAT_VIPTRACK: + persons = [] + for it in items: + if it.get("tag") == UDS_TAG_DET and it.get("cls") == 0: + persons.append(it) + + if persons: + for it in items: + if it.get("tag") != UDS_TAG_FACE: + continue + + face_bbox = [it["x1"], it["y1"], it["x2"], it["y2"]] + + best_iou = 0.0 + best_person_tid = None + + for p in persons: + person_bbox = [p["x1"], p["y1"], p["x2"], p["y2"]] + i = iou(person_bbox, face_bbox) + if i > best_iou: + best_iou = i + best_person_tid = p["tid"] + + if best_person_tid is not None: + it["tid"] = int(best_person_tid) + meta = { "type": "frame", "ch": int(ch), @@ -259,6 +550,24 @@ def handle_det(hdr, sock): "items": items, } + if feat_idx == FEAT_CROWD: + dist_val = distance_risk(items) + motion_val, traj = motion_risk_and_path(items) + + risk_info = { + "dist_risk": dist_val, + "motion_risk": motion_val, + "risk_total": max(dist_val, motion_val) + } + + trajectory = traj + + if risk_info is not None: + meta["risk"] = risk_info + + if trajectory is not None: + meta["trajectory"] = {tid: list(path) for tid, path in trajectory.items()} + def push(): if len(frame_queue) >= 5: frame_queue.popleft() @@ -336,6 +645,8 @@ def uds_loop(): time.sleep(1) +import asyncio + async def ws_frame_sender(): global ws_frames_total, ws_frames_sec @@ -345,28 +656,146 @@ async def ws_frame_sender(): await asyncio.sleep(0.005) continue + # deque.pop() 빈 경우는 IndexError가 일반적 try: meta, jpeg_bytes = frame_queue.pop() - except ValueError: + except IndexError: await asyncio.sleep(0.002) continue text = json.dumps(meta) - await ws_broadcast_text(text) - await ws_broadcast_binary(jpeg_bytes) + + await asyncio.wait_for(ws_broadcast_text(text), timeout=0.3) + await asyncio.wait_for(ws_broadcast_binary(jpeg_bytes), timeout=0.3) with ws_frames_lock: ws_frames_total += 1 - ws_frames_sec += 1 + ws_frames_sec += 1 + + except asyncio.TimeoutError: + print("[WS][FRAME_SENDER][TIMEOUT] broadcast stalled (slow client?)") + await asyncio.sleep(0.05) except Exception as e: - print(f"[WS][FRAME_SENDER][ERR] {e}") + print(f"[WS][FRAME_SENDER][ERR] {type(e).__name__}: {e}") await asyncio.sleep(0.05) - await asyncio.sleep(0.001) + await asyncio.sleep(0.001) + + + +def card_sender_loop(): + while True: + + global pre_feat_idx + + now = time.time() + send_list = [] + + remove = [] + frame_bytes = id_card_state.get("frame") + feat_idx = id_card_state.get("feat_idx") + + if pre_feat_idx != feat_idx: + pre_feat_idx = feat_idx + continue + + for tid, info in list(id_card_state.items()): + if not isinstance(tid, int): + continue + if now - info["last_update"] > CARD_TIMEOUT: + remove.append(tid) + + for tid in remove: + id_card_state.pop(tid, None) + + if frame_bytes is None: + time.sleep(0.1) + continue + + try: + np_frame = np.frombuffer(frame_bytes, dtype=np.uint8) + frame = cv2.imdecode(np_frame, cv2.IMREAD_COLOR) + except: + time.sleep(0.1) + continue + + for tid, info in list(id_card_state.items()): + if not isinstance(tid, int): + continue + face_b64 = None + car_b64 = None + + x1, y1, x2, y2 = map(int, info["bbox"]) + crop = frame[y1:y2, x1:x2] + if crop.size == 0: + continue + + ok, buf = cv2.imencode(".jpg", crop, [cv2.IMWRITE_JPEG_QUALITY, 80]) + if not ok: + continue + + b64 = base64.b64encode(buf).decode("utf-8") + + if feat_idx == FEAT_FACEATTR: + if "face" in info: + fx1, fy1, fx2, fy2 = map(int, info["face"]) + face_crop = frame[fy1:fy2, fx1:fx2] + if face_crop.size != 0: + ok2, buf2 = cv2.imencode(".jpg", face_crop, [cv2.IMWRITE_JPEG_QUALITY, 80]) + if ok2: + face_b64 = base64.b64encode(buf2).decode("utf-8") + entry = { + "tid": tid, + "person": b64, + "appear": "jacket, pants, bag" + } + + if face_b64 is not None: + entry["face"] = face_b64 + + if feat_idx == FEAT_LPR: + if "car" in info: + fx1, fy1, fx2, fy2 = map(int, info["car"]) + lp_crop = frame[fy1:fy2, fx1:fx2] + if lp_crop.size != 0: + ok2, buf2 = cv2.imencode(".jpg", lp_crop, [cv2.IMWRITE_JPEG_QUALITY, 80]) + if ok2: + car_b64 = base64.b64encode(buf2).decode("utf-8") + entry = { + "tid": tid, + "lp": b64, + "ocr": "12머 3532" + } + + if car_b64 is not None: + entry["car"] = car_b64 + + if feat_idx == FEAT_ABNORM: + entry = { + "tid": tid, + "fallen": b64 + } + + send_list.append(entry) + + if send_list: + meta = { + "type": "card", + "timestamp": int(now * 1000), + "items": send_list, + } + if loop_main: + asyncio.run_coroutine_threadsafe( + ws_broadcast_text(json.dumps(meta)), + loop_main + ) + + time.sleep(1.0) + async def main_async(): - + global loop_main loop_main = asyncio.get_running_loop() @@ -387,10 +816,13 @@ async def main_async(): t_uds = threading.Thread(target=uds_loop, daemon=True) t_uds.start() + t_card = threading.Thread(target=card_sender_loop, daemon=True) + t_card.start() + t2 = threading.Thread(target=uds_stats_printer_loop, daemon=True) t2.start() await server.wait_closed() if __name__ == "__main__": - asyncio.run(main_async()) \ No newline at end of file + asyncio.run(main_async())