"""Bridge a camera/inference Unix-domain-socket stream to WebSocket clients.

A reader thread (``uds_loop``) parses frame and detection messages from the
socket, runs the detections through per-feature OCSort trackers, pairs each
detection message with the JPEG frame closest in time and queues the result
for ``ws_frame_sender``. A second thread (``card_sender_loop``) periodically
sends cropped "ID card" thumbnails of tracked objects.
"""
import asyncio
import base64
import json
import socket
import struct
import sys
import threading
import time
from collections import deque
from threading import Lock

import cv2
import numpy as np
import websockets

from crowd_risk import distance_risk, motion_risk_and_path
from trackers.ocsort_tracker.ocsort import OCSort

# One tracker per feature so track ids of different features never mix.
_TRACKER_KWARGS = dict(det_thresh=0.3, iou_threshold=0.3, max_age=30,
                       min_hits=1, delta_t=1)
crowd_tracker = OCSort(**_TRACKER_KWARGS)
face_tracker = OCSort(**_TRACKER_KWARGS)
car_tracker = OCSort(**_TRACKER_KWARGS)
anomy_tracker = OCSort(**_TRACKER_KWARGS)
vip_tracker = OCSort(**_TRACKER_KWARGS)

# NOTE(review): nothing in this file consumes encode_queue; presumably a
# leftover of an earlier raw-frame encoding path - confirm before removing.
encode_queue = deque()
encode_lock = Lock()
MAX_ENC_QUEUE = 4

# ts_us -> JPEG bytes of recently received frames (keyed by timestamp only).
frames_buffer = {}
# Mixed-key dict: "frame" -> JPEG bytes, "feat_idx" -> feature index, and
# <int track id> -> {"bbox", "last_update", optional "face"/"car"}.
id_card_state = {}

CARD_SEND_INTERVAL = 1.0
# NOTE(review): with 0.0 every card is older than the timeout by the time
# card_sender_loop checks it, so cards are almost always dropped before they
# are sent. Looks like a debug value - confirm the intended timeout.
CARD_TIMEOUT = 0.0

SOCK_PATH = sys.argv[1] if len(sys.argv) > 1 else "/tmp/cam.sock"

# Upper bound for a single message payload read from the socket.
MAX_PAYLOAD_BYTES = 20 * 1024 * 1024
# A detection message is paired with a frame only if their timestamps differ
# by at most this many microseconds.
FRAME_MATCH_WINDOW_US = 200000
# Maximum number of (meta, jpeg) pairs waiting for ws_frame_sender.
FRAME_QUEUE_MAX = 5

# ----- UDS wire format -----
FRAME_MAGIC = b"FRA\x00"
FRAME_HDR_FMT = "<4s6I2Q"  # magic, ver, ch, w, h, stride, pixfmt, bytes, ts_us
FRAME_HDR_SIZE = struct.calcsize(FRAME_HDR_FMT)

DET_MAGIC = b"UDSD"
DET_HDR_FMT = "<4sIIIQI"  # magic, ver, ch, seq, ts_us, count
DET_HDR_SIZE = struct.calcsize(DET_HDR_FMT)

# ---------------------------------------------------------------------------
# TODO(review): RESTORE THIS SECTION FROM VERSION CONTROL.
# In the text under review everything between the opening quote of
# `ENTRY_FMT = "` and the return annotation of read_exact() is missing.
# The code below references the following names that were defined there;
# their values cannot be inferred from the remaining code, so they are
# deliberately NOT guessed here:
#   wire format : ENTRY_FMT (unpacked as 8 fields: prob, x, y, w, h, cls,
#                 tid, resv), ENTRY_SIZE, WS_PIXFMT_JPEG, W, H,
#                 UDS_TAG_DET, UDS_TAG_FACE, UDS_TAG_LPR, UDS_TAG_ABNORMAL,
#                 FEAT_CROWD, FEAT_FACEATTR, FEAT_LPR, FEAT_ABNORM,
#                 FEAT_VIPTRACK
#   runtime     : ws_clients (used like a set), loop_main (compared to None),
#                 frame_queue (used like a deque), pre_feat_idx,
#                 uds_cnt_lock, uds_frames_total, uds_frames_sec,
#                 uds_dets_total, uds_dets_sec,
#                 ws_frames_lock, ws_frames_total, ws_frames_sec
#   helper      : ws_add_bytes(n)
# The `def` line of read_exact() below was reconstructed from its body and
# its call sites (`read_exact(sock, n)`).
# ---------------------------------------------------------------------------


def read_exact(sock, n) -> bytearray:
    """Read exactly *n* bytes from *sock*.

    Returns:
        A ``bytearray`` of length *n*.

    Raises:
        ConnectionError: the peer closed the connection before *n* bytes
            arrived.
    """
    buf = bytearray(n)
    view = memoryview(buf)
    got = 0
    while got < n:
        received = sock.recv_into(view[got:], n - got)
        if received == 0:
            raise ConnectionError("peer closed")
        got += received
    return buf


def uds_dec_tag(c: int) -> int:
    """Return bits 12-15 of the packed class value (the tag)."""
    return (c >> 12) & 0xF  # upper 4 bits


def uds_dec_local(c: int) -> int:
    """Return bits 0-11 of the packed class value (the local class id)."""
    return c & 0x0FFF  # lower 12 bits


async def ws_handler(ws):
    """Register a WebSocket client until it disconnects."""
    ws_clients.add(ws)
    print(f"[WS] client connected ({len(ws_clients)})")
    try:
        async for _ in ws:
            # Client -> server messages are currently unused.
            pass
    finally:
        ws_clients.discard(ws)
        print(f"[WS] client disconnected ({len(ws_clients)})")


async def _ws_send_to_all(payload) -> int:
    """Send *payload* to every client; drop clients whose send failed.

    Returns:
        The number of clients the payload was sent to successfully.
    """
    targets = list(ws_clients)
    results = await asyncio.gather(
        *(ws.send(payload) for ws in targets), return_exceptions=True
    )
    ok_count = 0
    for ws, result in zip(targets, results):
        # BaseException so a returned CancelledError is not counted as a
        # successful send.
        if isinstance(result, BaseException):
            ws_clients.discard(ws)
        else:
            ok_count += 1
    return ok_count


async def ws_broadcast_text(text: str):
    """Broadcast a text message and account the bytes actually sent."""
    if not ws_clients:
        return

    # Only parse the JSON when it can possibly be a card message; parsing
    # every per-frame metadata document just for this log line is wasteful.
    if '"card"' in text:
        try:
            meta = json.loads(text)
        except ValueError:
            meta = None
        if isinstance(meta, dict) and meta.get("type") == "card":
            print(f"[WS][TEXT] send card -> clients={len(ws_clients)}")

    ok_count = await _ws_send_to_all(text)
    if ok_count > 0:
        ws_add_bytes(len(text.encode("utf-8")) * ok_count)


async def ws_broadcast_binary(data: bytes):
    """Broadcast a binary message and account the bytes actually sent."""
    if not ws_clients:
        return
    ok_count = await _ws_send_to_all(data)
    if ok_count > 0:
        ws_add_bytes(len(data) * ok_count)


def ws_broadcast_frame(ch, ts_us, w, h, rgb_bytes, stride):
    """Queue a raw frame in ``encode_queue``, dropping the oldest when full.

    Does nothing while the event loop is not running or no client is
    connected.
    """
    if loop_main is None or not ws_clients:
        return
    with encode_lock:
        while len(encode_queue) >= MAX_ENC_QUEUE:
            encode_queue.popleft()
        encode_queue.append((ch, ts_us, w, h, bytes(rgb_bytes), int(stride)))


def ws_broadcast_det(ch, seq, ts_us, items, crowd_count=None):
    """Schedule a detection-metadata JSON text message on the event loop.

    Meant to be called from the UDS thread. Does nothing while the event
    loop is not running or no client is connected.
    """
    if loop_main is None or not ws_clients:
        return
    meta = {
        "type": "det",
        "ch": int(ch),
        "seq": int(seq),
        "ts_us": int(ts_us),
        "items": items,
    }
    if crowd_count is not None:
        meta["crowd_count"] = int(crowd_count)
    asyncio.run_coroutine_threadsafe(
        ws_broadcast_text(json.dumps(meta)), loop_main
    )


def decode_reserved(resv):
    """Split the 16-bit reserved field into ``(fm8, md8)``.

    ``fm8`` is the high byte and ``md8`` the low byte.

    NOTE(review): the original inline comments said "low = feature" and
    "high = model", yet handle_det() feeds the HIGH byte (fm8) into
    fm8_to_feature_index(). Confirm which byte really carries the feature
    mask.
    """
    md8 = resv & 0xFF
    fm8 = (resv >> 8) & 0xFF
    return fm8, md8


def fm8_to_feature_index(fm8):
    """Return the index of the lowest set bit of *fm8*, or None if unset."""
    if fm8 is None or fm8 == 0:
        return None
    # fm8 & -fm8 isolates the lowest set bit.
    return (fm8 & -fm8).bit_length() - 1


def iou(a, b):
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Returns 0.0 when the union area is not positive.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b

    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0, min(ay2, by2) - max(ay1, by1))
    inter_area = inter_w * inter_h

    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    union_area = area_a + area_b - inter_area
    if union_area <= 0:
        return 0.0
    return inter_area / union_area


def sanitize_dets_xyxy_score(
    dets_xyxy_score: np.ndarray, W: int, H: int, min_wh: float = 1.0
) -> np.ndarray:
    """Clean detections before they are handed to a tracker.

    Args:
        dets_xyxy_score: (N, 5) array of ``[x1, y1, x2, y2, score]`` rows.
        W: image width; x coordinates are clipped to ``[0, W - 1]``.
        H: image height; y coordinates are clipped to ``[0, H - 1]``.
        min_wh: minimum box width and height to keep.

    Returns:
        A float32 (M, 5) array with non-finite rows removed, coordinates
        clipped and ordered so that x1 <= x2 and y1 <= y2, boxes smaller
        than *min_wh* removed and rows with a non-positive score removed.
    """
    if dets_xyxy_score is None or len(dets_xyxy_score) == 0:
        return np.empty((0, 5), dtype=np.float32)

    d = dets_xyxy_score.astype(np.float32, copy=True)

    d = d[np.all(np.isfinite(d[:, :5]), axis=1)]
    if d.shape[0] == 0:
        return np.empty((0, 5), dtype=np.float32)

    d[:, [0, 2]] = np.clip(d[:, [0, 2]], 0, W - 1)
    d[:, [1, 3]] = np.clip(d[:, [1, 3]], 0, H - 1)

    x1, y1, x2, y2 = d[:, 0], d[:, 1], d[:, 2], d[:, 3]
    # The right-hand side is fully evaluated into new arrays before any
    # column is overwritten, so swapping through views is safe here.
    d[:, 0], d[:, 1], d[:, 2], d[:, 3] = (
        np.minimum(x1, x2),
        np.minimum(y1, y2),
        np.maximum(x1, x2),
        np.maximum(y1, y2),
    )

    w = d[:, 2] - d[:, 0]
    h = d[:, 3] - d[:, 1]
    d = d[(w >= min_wh) & (h >= min_wh)]

    if d.shape[0] > 0:
        d = d[d[:, 4] > 0.0]
    return d


def handle_frame(hdr, sock):
    """Read one frame payload and remember it if it is a JPEG.

    Args:
        hdr: tuple unpacked with ``FRAME_HDR_FMT``.
        sock: socket positioned right after the frame header.

    Raises:
        ConnectionError: the announced payload size is not in
            ``(0, MAX_PAYLOAD_BYTES]`` or the peer closed the connection.
    """
    global uds_frames_total, uds_frames_sec

    _magic, _ver, _ch, _w, _h, _stride, pixfmt, bytes_len, ts_us = hdr
    if bytes_len <= 0 or bytes_len > MAX_PAYLOAD_BYTES:
        raise ConnectionError("invalid bytes_len")

    buf = read_exact(sock, bytes_len)

    with uds_cnt_lock:
        uds_frames_total += 1
        uds_frames_sec += 1

    if pixfmt != WS_PIXFMT_JPEG:
        return

    frames_buffer[ts_us] = bytes(buf)

    # Keep the buffer small: once it exceeds 6 entries keep the 3 newest.
    if len(frames_buffer) > 6:
        for stale_ts in sorted(frames_buffer)[:-3]:
            frames_buffer.pop(stale_ts, None)


def _tracker_routes():
    """Map feature index -> (tag of the detections to track, tracker).

    Built on demand so the module can be imported before the FEAT_* and
    UDS_TAG_* constants are consulted.
    """
    return {
        FEAT_CROWD: (UDS_TAG_DET, crowd_tracker),
        FEAT_FACEATTR: (UDS_TAG_DET, face_tracker),
        FEAT_LPR: (UDS_TAG_LPR, car_tracker),
        FEAT_ABNORM: (UDS_TAG_ABNORMAL, anomy_tracker),
        FEAT_VIPTRACK: (UDS_TAG_DET, vip_tracker),
    }


def _best_overlap(box, candidates):
    """Return the candidate box with the highest positive IoU, else None."""
    best_box = None
    best_iou = 0.0
    for candidate in candidates:
        score = iou(box, candidate)
        if score > best_iou:
            best_iou = score
            best_box = list(candidate)
    return best_box


def _remember_cards(items, feat_idx, wanted_tag, jpeg_bytes, now,
                    partner_boxes=(), partner_key=None):
    """Record card state for every class-0 item carrying *wanted_tag*.

    The frame and feature index are stored only when at least one item
    matches. When *partner_key* is given, the best-overlapping box from
    *partner_boxes* is stored under that key.
    """
    for item in items:
        if item["tag"] != wanted_tag or item["cls"] != 0:
            continue
        bbox = [item["x1"], item["y1"], item["x2"], item["y2"]]
        state = {"bbox": bbox, "last_update": now}
        if partner_key is not None:
            partner = _best_overlap(bbox, partner_boxes)
            if partner is not None:
                state[partner_key] = partner
        id_card_state["frame"] = jpeg_bytes
        id_card_state["feat_idx"] = feat_idx
        # Publish the fully built entry in one step so the card thread never
        # sees it half-initialised.
        id_card_state[item["tid"]] = state


def handle_det(hdr, sock):
    """Read one detection message, track it and queue it with its frame.

    Class-0 detections whose tag matches the active feature are replaced by
    tracker output; all other entries are passed through unchanged. The
    result is paired with the buffered JPEG closest in time and handed to
    the event loop. Card state is updated for the features that produce ID
    cards.

    Args:
        hdr: tuple unpacked with ``DET_HDR_FMT``.
        sock: socket positioned right after the detection header.

    Raises:
        ConnectionError: the announced entry count is implausibly large or
            the peer closed the connection.
    """
    global uds_dets_total, uds_dets_sec

    _magic, _ver, ch, _seq, ts_us, count = hdr

    need_bytes = count * ENTRY_SIZE
    if need_bytes > MAX_PAYLOAD_BYTES:
        raise ConnectionError("invalid det count")
    entries_raw = read_exact(sock, need_bytes) if count > 0 else b""

    # Counted per detection message, mirroring the per-frame counters, so
    # the "dps" figure of the stats printer is no longer stuck at zero.
    with uds_cnt_lock:
        uds_dets_total += 1
        uds_dets_sec += 1

    # The feature of the whole message is taken from its first entry.
    if count > 0:
        _, _, _, _, _, _, _, first_resv = struct.unpack_from(
            ENTRY_FMT, entries_raw, 0)
        fm8, _md8 = decode_reserved(first_resv)
        feat_idx = fm8_to_feature_index(fm8)
    else:
        feat_idx = None

    route = _tracker_routes().get(feat_idx)
    tracked_tag, tracker = route if route is not None else (None, None)

    items = []
    det_rows = []
    det_face = []
    det_car = []

    for index in range(count):
        prob, x, y, w, h, cls, tid, _resv = struct.unpack_from(
            ENTRY_FMT, entries_raw, index * ENTRY_SIZE)
        x1, y1, x2, y2 = x, y, x + w, y + h
        tag = uds_dec_tag(cls)
        local_cls = uds_dec_local(cls)

        if tracker is not None and tag == tracked_tag and local_cls == 0:
            # Goes through the tracker; re-emitted below with a track id.
            det_rows.append([x1, y1, x2, y2, prob])
            continue

        if feat_idx == FEAT_FACEATTR and tag == UDS_TAG_FACE:
            det_face.append([x1, y1, x2, y2])
        if feat_idx == FEAT_LPR and tag == UDS_TAG_DET and local_cls == 2:
            det_car.append([x1, y1, x2, y2])

        items.append({
            "x1": float(x1), "y1": float(y1),
            "x2": float(x2), "y2": float(y2),
            "cls": int(local_cls),
            "tag": int(tag),
            "tid": int(tid),
        })

    if det_rows:
        dets_for_tracker = sanitize_dets_xyxy_score(
            np.asarray(det_rows, dtype=np.float32), W, H, min_wh=1.0)
        tracks = tracker.update(
            dets_for_tracker, img_info=(H, W), img_size=(H, W))
        if tracks is not None and len(tracks) > 0:
            for track in tracks:
                if track.shape[0] < 5:
                    continue
                items.append({
                    "x1": float(track[0]), "y1": float(track[1]),
                    "x2": float(track[2]), "y2": float(track[3]),
                    "tid": int(track[4]),
                    "cls": 0,
                    "tag": int(tracked_tag),
                })

    if not frames_buffer:
        return

    candidate_ts = min(frames_buffer, key=lambda t: abs(t - ts_us))
    if abs(candidate_ts - ts_us) > FRAME_MATCH_WINDOW_US:
        # No frame close enough: only expire stale cards of these items.
        now_fail = time.time()
        for item in items:
            state = id_card_state.get(item["tid"])
            if not state:
                continue
            if now_fail - state["last_update"] > CARD_TIMEOUT:
                id_card_state.pop(item["tid"], None)
        return

    jpeg_bytes = frames_buffer.pop(candidate_ts, None)
    if jpeg_bytes is None:
        return

    now = time.time()

    if feat_idx == FEAT_FACEATTR:
        _remember_cards(items, feat_idx, UDS_TAG_DET, jpeg_bytes, now,
                        partner_boxes=det_face, partner_key="face")
    if feat_idx == FEAT_ABNORM:
        _remember_cards(items, feat_idx, UDS_TAG_ABNORMAL, jpeg_bytes, now)
    if feat_idx == FEAT_LPR:
        _remember_cards(items, feat_idx, UDS_TAG_LPR, jpeg_bytes, now,
                        partner_boxes=det_car, partner_key="car")

    if feat_idx == FEAT_VIPTRACK:
        # Give every face the track id of the person box it overlaps most.
        persons = [
            it for it in items
            if it.get("tag") == UDS_TAG_DET and it.get("cls") == 0
        ]
        if persons:
            for item in items:
                if item.get("tag") != UDS_TAG_FACE:
                    continue
                face_bbox = [item["x1"], item["y1"], item["x2"], item["y2"]]
                best_iou = 0.0
                best_person_tid = None
                for person in persons:
                    person_bbox = [person["x1"], person["y1"],
                                   person["x2"], person["y2"]]
                    score = iou(person_bbox, face_bbox)
                    if score > best_iou:
                        best_iou = score
                        best_person_tid = person["tid"]
                if best_person_tid is not None:
                    item["tid"] = int(best_person_tid)

    meta = {
        "type": "frame",
        "ch": int(ch),
        "ts_us": int(candidate_ts),
        "w": int(W),
        "h": int(H),
        "items": items,
    }

    # Risk data exists only for the crowd feature; keeping it inside this
    # branch avoids reading unbound names for every other feature.
    if feat_idx == FEAT_CROWD:
        dist_val = distance_risk(items)
        motion_val, trajectory = motion_risk_and_path(items)
        meta["risk"] = {
            "dist_risk": dist_val,
            "motion_risk": motion_val,
            "risk_total": max(dist_val, motion_val),
        }
        if trajectory is not None:
            meta["trajectory"] = {
                tid: list(path) for tid, path in trajectory.items()
            }

    def push():
        # Runs on the event loop thread, the only place frame_queue is
        # modified.
        if len(frame_queue) >= FRAME_QUEUE_MAX:
            frame_queue.popleft()
        frame_queue.append((meta, jpeg_bytes))

    loop_main.call_soon_threadsafe(push)


def uds_stats_printer_loop():
    """Print per-second and total frame/detection counters once a second."""
    global uds_frames_sec, uds_dets_sec, uds_frames_total, uds_dets_total
    while True:
        time.sleep(1)
        with uds_cnt_lock:
            f_sec = uds_frames_sec
            d_sec = uds_dets_sec
            uds_frames_sec = 0
            uds_dets_sec = 0
            f_tot = uds_frames_total
            d_tot = uds_dets_total
        print(f"[UDS][RATE] {f_sec} fps, {d_sec} dps | total frames={f_tot}, dets={d_tot}")


def _pump_uds_messages(sock):
    """Dispatch messages from *sock* until the stream must be restarted.

    Returns after an unknown magic or a non-connection error in a handler;
    ConnectionError propagates to the caller.
    """
    routes = {
        bytes(FRAME_MAGIC): ("handle_frame", FRAME_HDR_FMT, FRAME_HDR_SIZE,
                             handle_frame),
        bytes(DET_MAGIC): ("handle_det", DET_HDR_FMT, DET_HDR_SIZE,
                           handle_det),
    }
    while True:
        magic = read_exact(sock, 4)
        route = routes.get(bytes(magic))
        if route is None:
            print(f"[UDS][WARN] unknown magic={magic!r} hex={magic.hex()}")
            return

        label, hdr_fmt, hdr_size, handler = route
        rest = read_exact(sock, hdr_size - 4)
        hdr = struct.unpack(hdr_fmt, magic + rest)
        try:
            handler(hdr, sock)
        except ConnectionError as e:
            print(f"[UDS][ERR] {label} ConnectionError: {e}")
            raise
        except Exception as e:
            # The stream position is unknown after a failure: reconnect.
            print(f"[UDS][ERR] {label} EXC: {e}")
            return


def uds_loop():
    """Connect to ``SOCK_PATH`` and process messages, retrying forever."""
    while True:
        try:
            # `with` closes the socket even when connect() fails, which
            # otherwise leaked one descriptor per retry.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.setsockopt(
                    socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
                sock.connect(SOCK_PATH)
                print(f"[UDS] connected to {SOCK_PATH}")
                try:
                    _pump_uds_messages(sock)
                finally:
                    print("[UDS] disconnected")
        except FileNotFoundError:
            print(f"[UDS][ERR] {SOCK_PATH} not found, retry...")
        except ConnectionError as e:
            print(f"[UDS][ERR] connection closed, retry... ({e})")
        except Exception as e:
            print(f"[UDS][ERR] uds_loop outer EXC: {e}, retry...")
        time.sleep(1)


async def ws_frame_sender():
    """Send the newest queued frame (metadata, then JPEG) to all clients."""
    global ws_frames_total, ws_frames_sec
    while True:
        try:
            if not frame_queue or not ws_clients:
                await asyncio.sleep(0.005)
                continue

            try:
                meta, jpeg_bytes = frame_queue.pop()
            except IndexError:
                await asyncio.sleep(0.002)
                continue
            # Everything still queued is older than the frame just taken;
            # sending it afterwards would deliver frames out of order.
            frame_queue.clear()

            text = json.dumps(meta)
            await asyncio.wait_for(ws_broadcast_text(text), timeout=0.3)
            await asyncio.wait_for(ws_broadcast_binary(jpeg_bytes), timeout=0.3)

            with ws_frames_lock:
                ws_frames_total += 1
                ws_frames_sec += 1

        except asyncio.TimeoutError:
            print("[WS][FRAME_SENDER][TIMEOUT] broadcast stalled (slow client?)")
            await asyncio.sleep(0.05)
        except Exception as e:
            print(f"[WS][FRAME_SENDER][ERR] {type(e).__name__}: {e}")
            await asyncio.sleep(0.05)

        await asyncio.sleep(0.001)


def _crop_to_b64_jpeg(frame, box):
    """Return the base64 JPEG of *box* cut out of *frame*, or None.

    The box is clamped to the frame first; a negative coordinate would
    otherwise be interpreted by numpy as an index from the far edge.
    """
    height, width = frame.shape[:2]
    x1, y1, x2, y2 = map(int, box)
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(width, x2), min(height, y2)
    if x2 <= x1 or y2 <= y1:
        return None

    ok, buf = cv2.imencode(
        ".jpg", frame[y1:y2, x1:x2], [cv2.IMWRITE_JPEG_QUALITY, 80])
    if not ok:
        return None
    return base64.b64encode(buf).decode("utf-8")


def _build_card_entry(frame, feat_idx, tid, info):
    """Build the card dict for one tracked object, or None to skip it."""
    main_b64 = _crop_to_b64_jpeg(frame, info["bbox"])
    if main_b64 is None:
        return None

    if feat_idx == FEAT_FACEATTR:
        # NOTE(review): "appear" is a hard-coded placeholder value.
        entry = {"tid": tid, "person": main_b64, "appear": "jacket, pants, bag"}
        if "face" in info:
            face_b64 = _crop_to_b64_jpeg(frame, info["face"])
            if face_b64 is not None:
                entry["face"] = face_b64
        return entry

    if feat_idx == FEAT_LPR:
        # NOTE(review): "ocr" is a hard-coded placeholder value.
        entry = {"tid": tid, "lp": main_b64, "ocr": "12머 3532"}
        if "car" in info:
            car_b64 = _crop_to_b64_jpeg(frame, info["car"])
            if car_b64 is not None:
                entry["car"] = car_b64
        return entry

    if feat_idx == FEAT_ABNORM:
        return {"tid": tid, "fallen": main_b64}

    # Unknown feature: there is no card layout for it.
    return None


def card_sender_loop():
    """Periodically broadcast cropped thumbnails of the tracked objects."""
    global pre_feat_idx
    while True:
        now = time.time()
        frame_bytes = id_card_state.get("frame")
        feat_idx = id_card_state.get("feat_idx")

        # Skip one cycle right after the active feature changed.
        if pre_feat_idx != feat_idx:
            pre_feat_idx = feat_idx
            continue

        expired = [
            tid for tid, info in list(id_card_state.items())
            if isinstance(tid, int)
            and now - info["last_update"] > CARD_TIMEOUT
        ]
        for tid in expired:
            id_card_state.pop(tid, None)

        if frame_bytes is None:
            time.sleep(0.1)
            continue

        try:
            frame = cv2.imdecode(
                np.frombuffer(frame_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
        except Exception:
            frame = None
        # imdecode signals undecodable data by returning None, not raising.
        if frame is None:
            time.sleep(0.1)
            continue

        send_list = []
        for tid, info in list(id_card_state.items()):
            if not isinstance(tid, int):
                continue
            entry = _build_card_entry(frame, feat_idx, tid, info)
            if entry is not None:
                send_list.append(entry)

        if send_list:
            meta = {
                "type": "card",
                "timestamp": int(now * 1000),
                "items": send_list,
            }
            if loop_main:
                asyncio.run_coroutine_threadsafe(
                    ws_broadcast_text(json.dumps(meta)),
                    loop_main
                )

        time.sleep(CARD_SEND_INTERVAL)


async def main_async():
    """Start the WebSocket server, the frame sender and the worker threads."""
    global loop_main
    loop_main = asyncio.get_running_loop()

    server = await websockets.serve(
        ws_handler, "0.0.0.0", 8765,
        max_queue=None,
        write_limit=2**20,
        ping_interval=30,
        ping_timeout=30,
        compression=None
    )
    print("[WS] listening on 0.0.0.0:8765")

    # Keep a reference: the event loop only holds a weak reference to tasks,
    # so an unreferenced task may be garbage-collected while running.
    sender_task = asyncio.create_task(ws_frame_sender())

    for target in (uds_loop, card_sender_loop, uds_stats_printer_loop):
        threading.Thread(target=target, daemon=True).start()

    try:
        await server.wait_closed()
    finally:
        sender_task.cancel()


if __name__ == "__main__":
    asyncio.run(main_async())