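# NOTE: the next lines are leftover text from the repository web viewer, not program code.
# You can not select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 829 lines
# 23 KiB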

import socket, struct, sys, json, asyncio, threading
import websockets
import time
import numpy as np
from threading import Lock
from collections import deque
from crowd_risk import distance_risk, motion_risk_and_path
from trackers.ocsort_tracker.ocsort import OCSort
import time
import cv2, base64
# One OCSort tracker per tracked feature, all created with identical settings.
# handle_det picks the tracker that matches the feature index of the message.
crowd_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1)
face_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1)
car_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1)
anomy_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1)
vip_tracker = OCSort(det_thresh=0.3, iou_threshold=0.3, max_age=30, min_hits=1, delta_t=1)
# Raw-frame queue filled by ws_broadcast_frame; it is trimmed to MAX_ENC_QUEUE
# entries while encode_lock is held.
encode_queue = deque()
encode_lock = Lock()
MAX_ENC_QUEUE = 4
# ts_us -> JPEG bytes of recently received frames (written by handle_frame,
# looked up by timestamp in handle_det).
frames_buffer = {}
# Card state shared by handle_det and card_sender_loop. Integer keys are track
# ids mapping to {"bbox", "last_update", ...}; the string keys "frame" and
# "feat_idx" hold the latest JPEG and feature index.
id_card_state = {}
# presumably the number of seconds between card broadcasts — TODO confirm
CARD_SEND_INTERVAL = 1.0
# Card entries whose age exceeds this many seconds are dropped.
# NOTE(review): with 0.0 any entry older than "now" qualifies — confirm this is intended.
CARD_TIMEOUT = 0.0
# Unix-domain socket path; first command-line argument, else a default.
SOCK_PATH = sys.argv[1] if len(sys.argv) > 1 else "/tmp/cam.sock"
# ----- UDS wire format -----
# Every message starts with a 4-byte magic; all formats are little-endian
# with no padding ("<").
FRAME_MAGIC = b"FRA\x00"
FRAME_HDR_FMT = "<4s6I2Q" # magic, ver, ch, w, h, stride, pixfmt, bytes, ts_us
FRAME_HDR_SIZE = struct.calcsize(FRAME_HDR_FMT)
DET_MAGIC = b"UDSD"
DET_HDR_FMT = "<4sIIIQI" # magic, ver, ch, seq, ts_us, count
DET_HDR_SIZE = struct.calcsize(DET_HDR_FMT)
ENTRY_FMT = "<fffffHHH" # prob, x, y, w, h, cls, tid, resv
ENTRY_SIZE = struct.calcsize(ENTRY_FMT)
# Pixel-format codes carried in the frame header; handle_frame only keeps JPEG frames.
WS_PIXFMT_RGB24 = 1
WS_PIXFMT_JPEG = 3
# Tag values stored in the upper 4 bits of an entry's cls field (see uds_dec_tag).
UDS_TAG_DET = 1
UDS_TAG_FIRE = 2
UDS_TAG_FACE = 3
UDS_TAG_LPR = 4
UDS_TAG_ABNORMAL = 5
# Feature indices, compared against the value fm8_to_feature_index derives
# from the high byte of an entry's reserved field.
FEAT_OBJDET = 0
FEAT_ABNORM = 1
FEAT_CROWD = 2
FEAT_FIRE = 3
FEAT_LPR = 4
FEAT_FACEATTR = 5
FEAT_VIPTRACK = 6
# Feature index seen by card_sender_loop on its previous iteration.
pre_feat_idx = None
# Frame width/height used to clip detections, passed to the trackers and
# reported in frame metadata. NOTE(review): hard-coded, not taken from the
# frame header — confirm they match the camera output.
W = 1080
H = 720
# Counters for the UDS receive side, guarded by uds_cnt_lock; the *_sec values
# are reset every second by uds_stats_printer_loop.
uds_cnt_lock = Lock()
uds_frames_total = 0
uds_dets_total = 0
uds_frames_sec = 0
uds_dets_sec = 0
# ws debug stats
ws_frames_total = 0
ws_frames_sec = 0
ws_frames_lock = Lock()
# Currently connected WebSocket clients (maintained by ws_handler).
ws_clients = set()
last_frame_ts = {}
# asyncio event loop of the WebSocket server; set by main_async so worker
# threads can schedule work on it.
loop_main = None
# Total bytes successfully sent to WebSocket clients, guarded by ws_bytes_lock.
ws_bytes_total = 0
ws_bytes_lock = Lock()
# (meta, jpeg_bytes) pairs waiting to be broadcast by ws_frame_sender; bounded to 5.
frame_queue = deque(maxlen=5)
def ws_add_bytes(n: int):
    """Add ``n`` to the module-wide count of bytes sent over WebSocket.

    The update happens while ``ws_bytes_lock`` is held so that concurrent
    callers do not lose increments.
    """
    global ws_bytes_total
    with ws_bytes_lock:
        ws_bytes_total = ws_bytes_total + n
def read_exact(sock, n: int) -> bytes:
    """Read exactly ``n`` bytes from ``sock`` and return them.

    Data is received straight into a preallocated buffer via ``recv_into``
    until it is full. The returned object is that ``bytearray``.

    Raises:
        ConnectionError: if the peer closes before ``n`` bytes have arrived.
    """
    storage = bytearray(n)
    window = memoryview(storage)
    filled = 0
    while filled < n:
        received = sock.recv_into(window[filled:], n - filled)
        if received == 0:
            # A zero-length read on a stream socket means end of stream.
            raise ConnectionError("peer closed")
        filled += received
    return storage
def uds_dec_tag(c: int) -> int:
    """Return the tag stored in bits 12-15 of the packed class value ``c``."""
    tag_mask = 0xF000  # the four bits directly above the 12-bit local class
    return (c & tag_mask) >> 12
def uds_dec_local(c: int) -> int:
    """Return the local class id stored in the low 12 bits of ``c``."""
    local_mask = (1 << 12) - 1  # low 12 bits
    return c & local_mask
async def ws_handler(ws):
    """Register a WebSocket client for as long as its connection is open.

    The client is added to ``ws_clients`` on entry and removed again when
    the receive loop ends, whether normally or through an exception.
    """
    ws_clients.add(ws)
    print(f"[WS] client connected ({len(ws_clients)})")
    try:
        # Client -> server messages are currently unused; just drain them.
        async for _incoming in ws:
            continue
    finally:
        ws_clients.discard(ws)
        print(f"[WS] client disconnected ({len(ws_clients)})")
async def ws_broadcast_text(text: str):
    """Send ``text`` to every connected WebSocket client.

    Clients whose send raises are removed from ``ws_clients``. The UTF-8
    size of ``text`` is added to the byte counter once per successful
    delivery. A log line is printed when the payload is a ``"card"`` message.
    """
    if not ws_clients:
        return

    # Best-effort debug log; a payload that is not a JSON object is ignored.
    try:
        if json.loads(text).get("type") == "card":
            print(f"[WS][TEXT] send card -> clients={len(ws_clients)}")
    except Exception:
        pass

    # Snapshot the set so it can be mutated while results are processed.
    recipients = list(ws_clients)
    outcomes = await asyncio.gather(
        *(client.send(text) for client in recipients),
        return_exceptions=True,
    )

    delivered = 0
    for client, outcome in zip(recipients, outcomes):
        if isinstance(outcome, Exception):
            ws_clients.discard(client)
        else:
            delivered += 1

    if delivered > 0:
        ws_add_bytes(len(text.encode("utf-8")) * delivered)
async def ws_broadcast_binary(data: bytes):
    """Send the binary payload ``data`` to every connected WebSocket client.

    Clients whose send raises are removed from ``ws_clients``. ``len(data)``
    is added to the byte counter once per successful delivery.
    """
    if not ws_clients:
        return

    # Snapshot the set so it can be mutated while results are processed.
    recipients = list(ws_clients)
    outcomes = await asyncio.gather(
        *(client.send(data) for client in recipients),
        return_exceptions=True,
    )

    delivered = 0
    for client, outcome in zip(recipients, outcomes):
        if isinstance(outcome, Exception):
            ws_clients.discard(client)
        else:
            delivered += 1

    if delivered > 0:
        ws_add_bytes(len(data) * delivered)
def ws_broadcast_frame(ch, ts_us, w, h, rgb_bytes, stride):
    """Queue one raw frame in ``encode_queue``, dropping the oldest if full.

    Does nothing while the event loop is not set or no client is connected.
    The frame bytes are copied so the caller may reuse its buffer.
    """
    if loop_main is None or not ws_clients:
        return
    with encode_lock:
        # Make room first so the queue never exceeds MAX_ENC_QUEUE entries.
        while len(encode_queue) >= MAX_ENC_QUEUE:
            encode_queue.popleft()
        job = (ch, ts_us, w, h, bytes(rgb_bytes), int(stride))
        encode_queue.append(job)
def ws_broadcast_det(ch, seq, ts_us, items, crowd_count=None):
    """Schedule a detection-metadata JSON broadcast on the event loop.

    Meant to be called from a non-loop thread: the coroutine is handed to
    ``loop_main`` with ``run_coroutine_threadsafe``. Does nothing while the
    loop is not set or no client is connected. ``crowd_count`` is included
    in the message only when it is not ``None``.
    """
    if loop_main is None or not ws_clients:
        return

    payload = {
        "type": "det",
        "ch": int(ch),
        "seq": int(seq),
        "ts_us": int(ts_us),
        "items": items,
    }
    if crowd_count is not None:
        payload["crowd_count"] = int(crowd_count)

    asyncio.run_coroutine_threadsafe(
        ws_broadcast_text(json.dumps(payload)),
        loop_main,
    )
def decode_reserved(resv):
    """Split the 16-bit reserved field into ``(high_byte, low_byte)``.

    The first element (bits 8-15) is what callers pass on to
    ``fm8_to_feature_index``; the second element is bits 0-7.
    """
    high_byte, low_byte = divmod(resv & 0xFFFF, 0x100)
    return high_byte, low_byte
def fm8_to_feature_index(fm8):
    """Return the bit position of the lowest set bit in ``fm8``.

    Returns ``None`` when ``fm8`` is ``None`` or has no bit set.
    """
    if fm8 is None or fm8 == 0:
        return None
    lowest_bit = fm8 & -fm8  # isolates the least-significant set bit
    return lowest_bit.bit_length() - 1
def iou(a, b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Returns ``0.0`` when the union area is not positive.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b

    # Overlap extent along each axis, floored at zero for disjoint boxes.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    overlap = overlap_w * overlap_h

    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - overlap
    if union <= 0:
        return 0.0
    return overlap / union
def sanitize_dets_xyxy_score(
    dets_xyxy_score: np.ndarray, W: int, H: int, min_wh: float = 1.0
) -> np.ndarray:
    """Clean a detection array before it is handed to a tracker.

    Args:
        dets_xyxy_score: array of rows ``[x1, y1, x2, y2, score]``.
        W: image width; x coordinates are clipped to ``[0, W - 1]``.
        H: image height; y coordinates are clipped to ``[0, H - 1]``.
        min_wh: minimum box width and height to keep.

    Returns:
        A float32 copy with non-finite rows removed, coordinates clipped and
        ordered so that ``x1 <= x2`` and ``y1 <= y2``, boxes smaller than
        ``min_wh`` removed, and rows with a score ``<= 0`` removed. An empty
        ``(0, 5)`` array is returned for ``None`` or empty input.
    """
    if dets_xyxy_score is None or len(dets_xyxy_score) == 0:
        return np.empty((0, 5), dtype=np.float32)

    boxes = dets_xyxy_score.astype(np.float32, copy=True)
    boxes = boxes[np.isfinite(boxes[:, :5]).all(axis=1)]
    if boxes.shape[0] == 0:
        return np.empty((0, 5), dtype=np.float32)

    # Clip to the image, then order each coordinate pair as (low, high).
    xs = np.clip(boxes[:, [0, 2]], 0, W - 1)
    ys = np.clip(boxes[:, [1, 3]], 0, H - 1)
    boxes[:, 0] = xs.min(axis=1)
    boxes[:, 2] = xs.max(axis=1)
    boxes[:, 1] = ys.min(axis=1)
    boxes[:, 3] = ys.max(axis=1)

    widths = boxes[:, 2] - boxes[:, 0]
    heights = boxes[:, 3] - boxes[:, 1]
    boxes = boxes[(widths >= min_wh) & (heights >= min_wh)]

    if boxes.shape[0] > 0:
        boxes = boxes[boxes[:, 4] > 0.0]
    return boxes
def handle_frame(hdr, sock):
    """Consume one frame message from the UDS stream.

    ``hdr`` is the unpacked frame header; the payload that follows it is
    read from ``sock``. Every frame is counted, but only JPEG frames are
    kept, stored in ``frames_buffer`` under their ``ts_us``. Once the buffer
    holds more than 6 frames, all but the 3 newest timestamps are dropped.

    Raises:
        ConnectionError: if the advertised payload size is zero or above
            20 MiB, or if the peer closes mid-payload.
    """
    global uds_frames_total, uds_frames_sec
    _magic, _ver, _ch, _w, _h, _stride, pixfmt, bytes_len, ts_us = hdr

    if not 0 < bytes_len <= 20 * 1024 * 1024:
        raise ConnectionError("invalid bytes_len")
    # The payload must be read even for frames that end up discarded,
    # otherwise the stream would lose message alignment.
    payload = read_exact(sock, bytes_len)

    with uds_cnt_lock:
        uds_frames_total += 1
        uds_frames_sec += 1

    if pixfmt != WS_PIXFMT_JPEG:
        return

    frames_buffer[ts_us] = bytes(payload)
    if len(frames_buffer) > 6:
        for stale_ts in sorted(frames_buffer)[:-3]:
            frames_buffer.pop(stale_ts, None)
# Upper bound for a detection payload; same limit handle_frame applies to frames.
_DET_MAX_PAYLOAD_BYTES = 20 * 1024 * 1024
# Largest accepted gap (microseconds) between a detection and its frame.
_DET_FRAME_MAX_SKEW_US = 200000


def _tracking_spec(feat_idx):
    """Return ``(tracker, tag)`` for a feature whose class-0 boxes are tracked, else ``None``."""
    return {
        FEAT_CROWD: (crowd_tracker, UDS_TAG_DET),
        FEAT_FACEATTR: (face_tracker, UDS_TAG_DET),
        FEAT_LPR: (car_tracker, UDS_TAG_LPR),
        FEAT_ABNORM: (anomy_tracker, UDS_TAG_ABNORMAL),
        FEAT_VIPTRACK: (vip_tracker, UDS_TAG_DET),
    }.get(feat_idx)


def _item_bbox(it):
    """Return the ``[x1, y1, x2, y2]`` box of a detection item dict."""
    return [it["x1"], it["y1"], it["x2"], it["y2"]]


def _best_overlap(box, candidates):
    """Return the candidate with the highest strictly positive IoU against ``box``, or ``None``."""
    best = None
    best_iou = 0.0
    for cand in candidates:
        score = iou(box, cand)
        if score > best_iou:
            best_iou = score
            best = cand
    return best


def _parse_det_entries(entries_raw, feat_idx):
    """Decode raw entries into ``(items, det_rows, det_face, det_car)``.

    Class-0 entries carrying the tag tracked for ``feat_idx`` go to
    ``det_rows`` (tracker input). Everything else is passed through to
    ``items``; face and car boxes are additionally collected for matching.
    """
    items, det_rows, det_face, det_car = [], [], [], []
    spec = _tracking_spec(feat_idx)
    tracked_tag = spec[1] if spec is not None else None

    for prob, x, y, w, h, cls, tid, _resv in struct.iter_unpack(ENTRY_FMT, entries_raw):
        x1, y1, x2, y2 = x, y, x + w, y + h
        tag = uds_dec_tag(cls)
        local_cls = uds_dec_local(cls)

        if local_cls == 0 and tracked_tag is not None and tag == tracked_tag:
            det_rows.append([x1, y1, x2, y2, prob])
            continue

        if feat_idx == FEAT_FACEATTR and tag == UDS_TAG_FACE:
            det_face.append([x1, y1, x2, y2])
        if feat_idx == FEAT_LPR and tag == UDS_TAG_DET and local_cls == 2:
            det_car.append([x1, y1, x2, y2])
        items.append({
            "x1": float(x1),
            "y1": float(y1),
            "x2": float(x2),
            "y2": float(y2),
            "cls": int(local_cls),
            "tag": int(tag),
            "tid": int(tid),
        })
    return items, det_rows, det_face, det_car


def _track_detections(det_rows, feat_idx):
    """Run the feature's tracker on ``det_rows`` and return the tracks as item dicts."""
    spec = _tracking_spec(feat_idx)
    if not det_rows or spec is None:
        return []
    tracker, out_tag = spec

    dets = sanitize_dets_xyxy_score(
        np.asarray(det_rows, dtype=np.float32)[:, :5], W, H, min_wh=1.0
    )
    tracks = tracker.update(dets, img_info=(H, W), img_size=(H, W))
    if tracks is None or len(tracks) == 0:
        return []

    tracked = []
    for t in tracks:
        if t.shape[0] < 5:
            continue
        tracked.append({
            "x1": float(t[0]), "y1": float(t[1]),
            "x2": float(t[2]), "y2": float(t[3]),
            "tid": int(t[4]),
            "cls": 0,
            "tag": int(out_tag),
        })
    return tracked


def _expire_cards(items, now):
    """Drop card state for the ids in ``items`` that is older than ``CARD_TIMEOUT``."""
    for it in items:
        tid = it["tid"]
        state = id_card_state.get(tid)
        if state and now - state["last_update"] > CARD_TIMEOUT:
            id_card_state.pop(tid, None)


def _update_card_state(items, feat_idx, jpeg_bytes, now, det_face, det_car):
    """Refresh ``id_card_state`` for the tracked objects of a card-producing feature."""
    if feat_idx == FEAT_FACEATTR:
        wanted_tag, partner_key, partners = UDS_TAG_DET, "face", det_face
    elif feat_idx == FEAT_ABNORM:
        wanted_tag, partner_key, partners = UDS_TAG_ABNORMAL, None, ()
    elif feat_idx == FEAT_LPR:
        wanted_tag, partner_key, partners = UDS_TAG_LPR, "car", det_car
    else:
        return
    trace = feat_idx == FEAT_LPR  # keeps the existing LPR debug output

    for it in items:
        if it["tag"] != wanted_tag or it["cls"] != 0:
            continue
        bbox = _item_bbox(it)
        entry = {"bbox": bbox, "last_update": now}
        if partner_key is not None:
            if trace:
                for cand in partners:
                    print("det_car:", *cand)
            partner = _best_overlap(bbox, partners)
            if partner is not None:
                if trace:
                    print("best_car:", partner)
                entry[partner_key] = list(partner)
        id_card_state["frame"] = jpeg_bytes
        id_card_state["feat_idx"] = feat_idx
        # Publish the finished entry in one assignment so a reader never
        # sees it without its partner box.
        id_card_state[it["tid"]] = entry


def _adopt_person_tids(items):
    """Give each face item the track id of the person box it overlaps most."""
    persons = [
        it for it in items
        if it.get("tag") == UDS_TAG_DET and it.get("cls") == 0
    ]
    if not persons:
        return
    for it in items:
        if it.get("tag") != UDS_TAG_FACE:
            continue
        face_bbox = _item_bbox(it)
        best_iou = 0.0
        best_tid = None
        for person in persons:
            score = iou(_item_bbox(person), face_bbox)
            if score > best_iou:
                best_iou = score
                best_tid = person["tid"]
        if best_tid is not None:
            it["tid"] = int(best_tid)


def handle_det(hdr, sock):
    """Consume one detection message and queue the matching frame for broadcast.

    ``hdr`` is the unpacked detection header; ``count`` entries are then read
    from ``sock``. The feature index is taken from the first entry's reserved
    field. Trackable boxes are run through that feature's tracker, the result
    is paired with the buffered JPEG closest in time, card state is updated,
    and ``(meta, jpeg_bytes)`` is appended to ``frame_queue`` on the event loop.

    Nothing is queued when no frame is buffered or the closest frame is more
    than 200 ms away; in the latter case card state for the seen ids is expired.

    Raises:
        ConnectionError: if the advertised entry count implies a payload
            above 20 MiB, or if the peer closes mid-payload.
    """
    global uds_dets_total, uds_dets_sec
    _magic, _ver, ch, _seq, ts_us, count = hdr

    need_bytes = count * ENTRY_SIZE
    if need_bytes > _DET_MAX_PAYLOAD_BYTES:
        # A corrupt count would otherwise trigger a multi-gigabyte allocation.
        raise ConnectionError("invalid count")
    entries_raw = read_exact(sock, need_bytes) if count > 0 else b""

    # Count the message so the stats line reports detections, like frames.
    with uds_cnt_lock:
        uds_dets_total += 1
        uds_dets_sec += 1

    feat_idx = None
    if count > 0:
        first_resv = struct.unpack_from(ENTRY_FMT, entries_raw, 0)[-1]
        fm8, _md8 = decode_reserved(first_resv)
        feat_idx = fm8_to_feature_index(fm8)

    items, det_rows, det_face, det_car = _parse_det_entries(entries_raw, feat_idx)
    items.extend(_track_detections(det_rows, feat_idx))

    if not frames_buffer:
        return
    candidate_ts = min(frames_buffer, key=lambda t: abs(t - ts_us))
    if abs(candidate_ts - ts_us) > _DET_FRAME_MAX_SKEW_US:
        _expire_cards(items, time.time())
        return
    jpeg_bytes = frames_buffer.pop(candidate_ts, None)
    if jpeg_bytes is None:
        return

    _update_card_state(items, feat_idx, jpeg_bytes, time.time(), det_face, det_car)
    if feat_idx == FEAT_VIPTRACK:
        _adopt_person_tids(items)

    meta = {
        "type": "frame",
        "ch": int(ch),
        "ts_us": int(candidate_ts),
        "w": int(W),
        "h": int(H),
        "items": items,
    }
    if feat_idx == FEAT_CROWD:
        dist_val = distance_risk(items)
        motion_val, trajectory = motion_risk_and_path(items)
        meta["risk"] = {
            "dist_risk": dist_val,
            "motion_risk": motion_val,
            "risk_total": max(dist_val, motion_val),
        }
        if trajectory is not None:
            meta["trajectory"] = {tid: list(path) for tid, path in trajectory.items()}

    def push():
        # Runs on the event-loop thread, so frame_queue is only touched there.
        if len(frame_queue) >= 5:
            frame_queue.popleft()
        frame_queue.append((meta, jpeg_bytes))

    loop_main.call_soon_threadsafe(push)
def uds_stats_printer_loop():
    """Print UDS receive rates once per second, forever.

    Each tick snapshots the per-second and total counters under
    ``uds_cnt_lock``, resets the per-second counters, and prints one line.
    """
    global uds_frames_sec, uds_dets_sec, uds_frames_total, uds_dets_total
    while True:
        time.sleep(1)
        with uds_cnt_lock:
            per_sec = (uds_frames_sec, uds_dets_sec)
            totals = (uds_frames_total, uds_dets_total)
            uds_frames_sec = 0
            uds_dets_sec = 0
        print(f"[UDS][RATE] {per_sec[0]} fps, {per_sec[1]} dps | total frames={totals[0]}, dets={totals[1]}")
def uds_loop():
    """Connect to the camera's Unix socket and dispatch messages, forever.

    Each message starts with a 4-byte magic that selects the header layout
    and handler. A ``ConnectionError`` from a handler, any other handler
    exception, or an unknown magic ends the current connection; the loop
    then waits one second and reconnects.

    The socket is managed by a ``with`` block so it is closed even when
    ``setsockopt`` or ``connect`` fails, not only after the read loop.
    """
    while True:
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
                sock.connect(SOCK_PATH)
                print(f"[UDS] connected to {SOCK_PATH}")
                try:
                    while True:
                        magic = read_exact(sock, 4)
                        if magic == FRAME_MAGIC:
                            fmt, size = FRAME_HDR_FMT, FRAME_HDR_SIZE
                            handler, label = handle_frame, "handle_frame"
                        elif magic == DET_MAGIC:
                            fmt, size = DET_HDR_FMT, DET_HDR_SIZE
                            handler, label = handle_det, "handle_det"
                        else:
                            # The stream is out of sync; reconnect to realign.
                            print(f"[UDS][WARN] unknown magic={magic!r} hex={magic.hex()}")
                            break

                        rest = read_exact(sock, size - 4)
                        hdr = struct.unpack(fmt, magic + rest)
                        try:
                            handler(hdr, sock)
                        except ConnectionError as e:
                            print(f"[UDS][ERR] {label} ConnectionError: {e}")
                            raise
                        except Exception as e:
                            print(f"[UDS][ERR] {label} EXC: {e}")
                            break
                finally:
                    # Close before logging; the second close on leaving the
                    # with block is a harmless no-op.
                    sock.close()
                    print("[UDS] disconnected")
        except FileNotFoundError:
            print(f"[UDS][ERR] {SOCK_PATH} not found, retry...")
        except ConnectionError as e:
            print(f"[UDS][ERR] connection closed, retry... ({e})")
        except Exception as e:
            print(f"[UDS][ERR] uds_loop outer EXC: {e}, retry...")
        time.sleep(1)
import asyncio
async def ws_frame_sender():
    """Broadcast queued frames to WebSocket clients, forever.

    Each iteration takes the newest ``(meta, jpeg_bytes)`` pair from
    ``frame_queue``, sends the metadata as JSON text and then the JPEG as a
    binary message. Each send has a 0.3 s timeout. Successful pairs are
    counted in ``ws_frames_total`` / ``ws_frames_sec``.

    Any older pairs still queued are discarded once the newest is taken.
    Taking from the right end alone would send those stale frames after the
    newer one, delivering frames to clients out of order.
    """
    global ws_frames_total, ws_frames_sec
    while True:
        try:
            if not frame_queue or not ws_clients:
                await asyncio.sleep(0.005)
                continue

            try:
                meta, jpeg_bytes = frame_queue.pop()
            except IndexError:
                # The queue was emptied between the check and the pop.
                await asyncio.sleep(0.002)
                continue
            # Everything left is older than the frame just taken.
            frame_queue.clear()

            text = json.dumps(meta)
            await asyncio.wait_for(ws_broadcast_text(text), timeout=0.3)
            await asyncio.wait_for(ws_broadcast_binary(jpeg_bytes), timeout=0.3)

            with ws_frames_lock:
                ws_frames_total += 1
                ws_frames_sec += 1
        except asyncio.TimeoutError:
            print("[WS][FRAME_SENDER][TIMEOUT] broadcast stalled (slow client?)")
            await asyncio.sleep(0.05)
        except Exception as e:
            print(f"[WS][FRAME_SENDER][ERR] {type(e).__name__}: {e}")
            await asyncio.sleep(0.05)
        await asyncio.sleep(0.001)
def _crop_to_b64(frame, bbox):
    """Return the ``bbox`` region of ``frame`` as a base64 JPEG string, or ``None``.

    The box is clamped to the image first, because negative coordinates
    would otherwise wrap around in numpy slicing. ``None`` is returned for
    an empty region or a failed encode.
    """
    height, width = frame.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)
    x1, y1 = max(x1, 0), max(y1, 0)
    x2, y2 = min(x2, width), min(y2, height)
    if x2 <= x1 or y2 <= y1:
        return None
    ok, buf = cv2.imencode(".jpg", frame[y1:y2, x1:x2], [cv2.IMWRITE_JPEG_QUALITY, 80])
    if not ok:
        return None
    return base64.b64encode(buf).decode("utf-8")


def card_sender_loop():
    """Periodically broadcast "card" thumbnails for the tracked objects, forever.

    Each cycle reads the latest JPEG and feature index from
    ``id_card_state``, drops entries older than ``CARD_TIMEOUT``, crops every
    remaining object out of the decoded frame and sends the crops as one
    ``"card"`` JSON message. The card layout depends on the feature: person
    plus optional face, licence plate plus optional car, or fallen person.

    A frame that cannot be decoded is skipped instead of ending the thread,
    and objects are skipped when the feature has no card layout.
    """
    global pre_feat_idx
    while True:
        now = time.time()
        frame_bytes = id_card_state.get("frame")
        feat_idx = id_card_state.get("feat_idx")

        # Skip the first cycle after a feature switch.
        if pre_feat_idx != feat_idx:
            pre_feat_idx = feat_idx
            continue

        for tid, info in list(id_card_state.items()):
            if isinstance(tid, int) and now - info["last_update"] > CARD_TIMEOUT:
                id_card_state.pop(tid, None)

        if frame_bytes is None:
            time.sleep(0.1)
            continue

        try:
            frame = cv2.imdecode(np.frombuffer(frame_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
        except Exception:
            frame = None
        if frame is None:
            # imdecode reports undecodable data by returning None.
            time.sleep(0.1)
            continue

        send_list = []
        for tid, info in list(id_card_state.items()):
            if not isinstance(tid, int):
                continue
            b64 = _crop_to_b64(frame, info["bbox"])
            if b64 is None:
                continue

            if feat_idx == FEAT_FACEATTR:
                entry = {
                    "tid": tid,
                    "person": b64,
                    "appear": "jacket, pants, bag"
                }
                if "face" in info:
                    face_b64 = _crop_to_b64(frame, info["face"])
                    if face_b64 is not None:
                        entry["face"] = face_b64
            elif feat_idx == FEAT_LPR:
                entry = {
                    "tid": tid,
                    "lp": b64,
                    "ocr": "12머 3532"
                }
                if "car" in info:
                    car_b64 = _crop_to_b64(frame, info["car"])
                    if car_b64 is not None:
                        entry["car"] = car_b64
            elif feat_idx == FEAT_ABNORM:
                entry = {
                    "tid": tid,
                    "fallen": b64
                }
            else:
                # No card layout for this feature; nothing to send.
                continue
            send_list.append(entry)

        if send_list and loop_main is not None:
            meta = {
                "type": "card",
                "timestamp": int(now * 1000),
                "items": send_list,
            }
            asyncio.run_coroutine_threadsafe(
                ws_broadcast_text(json.dumps(meta)),
                loop_main
            )
        time.sleep(CARD_SEND_INTERVAL)
async def main_async():
    """Start the WebSocket server, the frame sender and the worker threads.

    Stores the running loop in ``loop_main`` so that threads can schedule
    work on it, listens on ``0.0.0.0:8765``, starts ``ws_frame_sender`` as a
    task and the UDS reader, card sender and stats printer as daemon threads,
    then waits until the server is closed.
    """
    global loop_main
    loop_main = asyncio.get_running_loop()

    server = await websockets.serve(
        ws_handler,
        "0.0.0.0",
        8765,
        max_queue=None,
        write_limit=2**20,
        ping_interval=30,
        ping_timeout=30,
        compression=None
    )
    print("[WS] listening on 0.0.0.0:8765")

    # Keep a reference: the event loop holds tasks only weakly, so an
    # unreferenced task may be garbage-collected before it finishes.
    sender_task = asyncio.create_task(ws_frame_sender())

    for target in (uds_loop, card_sender_loop, uds_stats_printer_loop):
        threading.Thread(target=target, daemon=True).start()

    try:
        await server.wait_closed()
    finally:
        sender_task.cancel()
# Script entry point: run main_async on a fresh event loop until it returns.
if __name__ == "__main__":
    asyncio.run(main_async())