diff --git a/README.md b/README.md index 4ef9875..175f784 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,132 @@ -# tta-uds-streamer +tta-uds-streamer +๐Ÿ“˜ TTA UDS Streamer +C ๋ชจ๋“ˆ์ด ์ „์†กํ•˜๋Š” UDS ๊ธฐ๋ฐ˜ ํ”„๋ ˆ์ž„/๋””ํ…์…˜ ์ŠคํŠธ๋ฆผ์„ ์ฒ˜๋ฆฌํ•˜๋Š” Python ๋ชจ๋“ˆ + +๐Ÿ“Œ ๊ฐœ์š” + +์ด ํ”„๋กœ์ ํŠธ๋Š” C ์ธก์—์„œ Unix Domain Socket(UDS) ์„ ํ†ตํ•ด ์ „์†กํ•˜๋Š” +JPEG ํ”„๋ ˆ์ž„(FRA) + ๋””ํ…์…˜(UDSD) ๋ฐ์ดํ„ฐ๋ฅผ Python์—์„œ ์ˆ˜์‹ ํ•˜์—ฌ + +ํ”„๋ ˆ์ž„ ๋ฒ„ํผ๋ง + +๋””ํ…์…˜ ํƒ€์ž„์Šคํƒฌํ”„ ๋งค์นญ + +WebSocket ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ + +๊นŒ์ง€ ์ฒ˜๋ฆฌํ•˜๋Š” ํŒŒ์ดํ”„๋ผ์ธ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. + +๋ณธ ์ €์žฅ์†Œ๋Š” Python ์ธก ๋ชจ๋“ˆ๋งŒ ํฌํ•จํ•˜๋ฉฐ, +C ์ธก NPU โ†’ Postprocess โ†’ UDS ๋ชจ๋“ˆ(cam_ws_app ๋“ฑ) ๊ณผ ์—ฐ๋™ํ•˜์—ฌ ๋™์ž‘ํ•ฉ๋‹ˆ๋‹ค. + +๐Ÿ“ ๋””๋ ‰ํ† ๋ฆฌ ๊ตฌ์กฐ +tta-uds-streamer/ +โ”‚ +โ”œโ”€โ”€ uds_postprocessing.py # ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ ์ฒ˜๋ฆฌ ๋ชจ๋“ˆ +โ”‚ +โ”œโ”€โ”€ feat_control/ # ๊ธฐ๋Šฅ ์ œ์–ด ๋ชจ๋“ˆ +โ”‚ โ””โ”€โ”€ ctrl_cli.py # ๊ธฐ๋Šฅ ON/OFF ์ œ์–ด CLI (UDS ์ œ์–ด) +โ”‚ +โ”œโ”€โ”€ ctrl_features.sh # ๊ธฐ๋Šฅ ON/OFF ์ œ์–ด ์Šคํฌ๋ฆฝํŠธ +โ”‚ +โ”œโ”€โ”€ requirements.txt # ํŒจํ‚ค์ง€ ๋ชฉ๋ก (๋ฒ„์ „ ์ œ๊ฑฐ๋จ) +โ”‚ +โ””โ”€โ”€ README.md + +๐Ÿ”ง ํŒจํ‚ค์ง€ ์„ค์น˜ +pip install -r requirements.txt + +๐Ÿš€ ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ ์‹คํ–‰ +python uds_postprocessing.py + + +C ํ”„๋กœ๊ทธ๋žจ์ด /tmp/cam.sock UDS ์†Œ์ผ“์— +์•„๋ž˜ ํฌ๋งท ๊ทธ๋Œ€๋กœ ์ „์†กํ•˜๋ฉด Python์ด ์ž๋™์œผ๋กœ ํ”„๋ ˆ์ž„ ๋ฐ ๋””ํ…์…˜์„ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. + +๐Ÿ“ก ๋ฐ์ดํ„ฐ ํฌ๋งท ์š”์•ฝ +FRA (Frame) +ํ•„๋“œ ์„ค๋ช… +magic "FRA\0" +width / height JPEG ์ด๋ฏธ์ง€ ํฌ๊ธฐ +stride JPEG ๋ฐ”์ดํŠธ ๊ธธ์ด +pixfmt WS_PIXFMT_JPEG +ts_us ํ”„๋ ˆ์ž„ ํƒ€์ž„์Šคํƒฌํ”„(ฮผs) +payload JPEG binary +UDSD (Detection) +ํ•„๋“œ ์„ค๋ช… +magic "UDSD" +count ๋””ํ…์…˜ ์ˆ˜ +ENTRY prob, x, y, w, h, cls, ... +ts_us ๋””ํ…์…˜ ํƒ€์ž„์Šคํƒฌํ”„(ฮผs) +๐Ÿงฉ ๊ธฐ๋Šฅ ์š”์•ฝ + +์ตœ๊ทผ ํ”„๋ ˆ์ž„ N์žฅ ๋ฒ„ํผ๋ง + +๋””ํ…์…˜ ํƒ€์ž„์Šคํƒฌํ”„์™€ ๊ฐ€์žฅ ๊ฐ€๊นŒ์šด ํ”„๋ ˆ์ž„ ์ž๋™ ๋งค์นญ + +C ์ธก bytesort ๊ธฐ๋ฐ˜ tracking ID ์ ์šฉ ๊ฐ€๋Šฅ + +WebSocket ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ์ง€์› + +C ๋ชจ๋“ˆ์˜ ๊ธฐ๋Šฅ ํ”Œ๋ž˜๊ทธ๋ฅผ UDS๋ฅผ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„ ์ œ์–ด ๊ฐ€๋Šฅ + +๐ŸŽฎ Feature Control ์‚ฌ์šฉ๋ฒ• + +Python์˜ ctrl_cli.py์™€ Shell ์Šคํฌ๋ฆฝํŠธ ctrl_features.sh๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ +C ์ŠคํŠธ๋ฆฌ๋จธ์˜ ๊ธฐ๋Šฅ(OBJDET, FIRE, FACE ๋“ฑ)์„ ON/OFFํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. + +๐Ÿ“ ๊ด€๋ จ ํŒŒ์ผ +feat_control/ +โ””โ”€โ”€ ctrl_cli.py # ๊ธฐ๋Šฅ ์ œ์–ด ๋ฉ”์‹œ์ง€๋ฅผ UDS๋กœ ์ „์†กํ•˜๋Š” Python CLI +ctrl_features.sh # ์—ฌ๋Ÿฌ ๊ธฐ๋Šฅ์„ ํ•œ ๋ฒˆ์— ON/OFFํ•˜๋Š” ์Šคํฌ๋ฆฝํŠธ + +๐Ÿ”ง ctrl_features.sh ์‚ฌ์šฉ๋ฒ• +๊ธฐ๋ณธ ๊ตฌ์กฐ +./ctrl_features.sh FEATURE [FEATURE ...] + +์˜ˆ์‹œ +๊ฐ์ฒด ํƒ์ง€๋งŒ ํ™œ์„ฑํ™” +./ctrl_features.sh OBJDET + +๊ฐ์ฒด ํƒ์ง€ + ํ™”์žฌ ํƒ์ง€ +./ctrl_features.sh OBJDET FIRE + +์ž˜๋ชป๋œ ์‚ฌ์šฉ(์ธ์ž ์—†์Œ) +./ctrl_features.sh +# Usage ์ถœ๋ ฅ + +๐Ÿ” ctrl_features.sh ๋™์ž‘ ๋ฐฉ์‹ +1) ๋ชจ๋“  ๊ธฐ๋Šฅ OFF +$CLI "ALL_OFF" + +2) ์ „๋‹ฌ๋ฐ›์€ ๊ธฐ๋Šฅ ๋ชฉ๋ก์„ ์ˆœ์„œ๋Œ€๋กœ ON +for f in "$@"; do + $CLI "ON $f" +done + + +์˜ˆ์‹œ: + +./ctrl_features.sh OBJDET FIRE + + +์ „์†ก๋˜๋Š” ์‹ค์ œ ๋ช…๋ น: + +ALL_OFF +ON OBJDET +ON FIRE + +๐Ÿงฉ ctrl_cli.py ๋™์ž‘ ์š”์•ฝ + +ctrl_cli.py๋Š” ๋‚ด๋ถ€์ ์œผ๋กœ: + +UDS ์ œ์–ด ์†Œ์ผ“(/tmp/ctrl_feat.sock) ์—ฐ๊ฒฐ + +๋ช…๋ น ๋ฌธ์ž์—ด ์ „์†ก + +OK / FAIL ์‘๋‹ต ์ˆ˜์‹  + +ํ•˜๋Š” ๋‹จ์ˆœํ•œ CLI๋กœ ๊ตฌ์„ฑ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค. + +C ์ธก์—์„œ๋Š” ์ด๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ +ctrl_flags.h ์— ์ •์˜๋œ ๊ธฐ๋Šฅ ํ”Œ๋ž˜๊ทธ๋ฅผ ์‹ค์‹œ๊ฐ„ ๋ณ€๊ฒฝํ•ฉ๋‹ˆ๋‹ค. \ No newline at end of file diff --git a/feat_control/ctrl_cli.py b/feat_control/ctrl_cli.py new file mode 100644 index 0000000..142a5b1 --- /dev/null +++ b/feat_control/ctrl_cli.py @@ -0,0 +1,23 @@ +import sys +import socket + +SOCK = "/tmp/ctrl_feat.sock" + + +def send(cmd: str): + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.connect(SOCK) + s.sendall((cmd.rstrip() + "\n").encode()) + while True: + data = s.recv(4096) + if not data: + break + sys.stdout.write(data.decode(errors="ignore")) + + +if __name__ == "__main__": + cmd = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else "STATUS" + try: + send(cmd) + except Exception as e: + print(f"ERR: {e}") diff --git a/feat_control/feat_on.sh b/feat_control/feat_on.sh new file mode 100644 index 0000000..b4877dd --- /dev/null +++ b/feat_control/feat_on.sh @@ -0,0 +1,17 @@ +BASE_DIR="$(cd "$(dirname "$0")" && pwd)" +CLI="python3 $BASE_DIR/ctrl_cli.py" + +if [ $# -lt 1 ]; then + echo "Usage: $0 FEATURE [FEATURE ...]" >&2 + echo " ์˜ˆ) $0 OBJDET" + echo " ์˜ˆ) $0 OBJDET FIRE" + exit 1 +fi + +# 1) ๋จผ์ € ์ „๋ถ€ ๋„๊ธฐ +$CLI "ALL_OFF" + +# 2) ๋„˜๊ฒจ์ค€ FEATURE ๋“ค์„ ์ˆœ์„œ๋Œ€๋กœ ON +for f in "$@"; do + $CLI "ON $f" +done \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dc3eb83 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +numpy +websockets diff --git a/uds_postprocessing.py b/uds_postprocessing.py new file mode 100644 index 0000000..eca70e3 --- /dev/null +++ b/uds_postprocessing.py @@ -0,0 +1,396 @@ +import socket, struct, sys, json, asyncio, threading +import websockets +import time +from threading import Lock +from collections import deque + + +encode_queue = deque() +encode_lock = Lock() + +MAX_ENC_QUEUE = 4 + +frames_buffer = {} + +SOCK_PATH = sys.argv[1] if len(sys.argv) > 1 else "/tmp/cam.sock" + +# ----- UDS ํฌ๋งท ----- +FRAME_MAGIC = b"FRA\x00" +FRAME_HDR_FMT = "<4s6I2Q" # magic, ver, ch, w, h, stride, pixfmt, bytes, ts_us +FRAME_HDR_SIZE = struct.calcsize(FRAME_HDR_FMT) + +DET_MAGIC = b"UDSD" +DET_HDR_FMT = "<4sIIIQI" # magic, ver, ch, seq, ts_us, count +DET_HDR_SIZE = struct.calcsize(DET_HDR_FMT) + +ENTRY_FMT = " bytes: + buf = bytearray(n) + mv = memoryview(buf) + got = 0 + while got < n: + r = sock.recv_into(mv[got:], n - got) + if r == 0: + raise ConnectionError("peer closed") + got += r + return buf + + +def uds_dec_tag(c: int) -> int: + return (c >> 12) & 0xF # ์ƒ์œ„ 4๋น„ํŠธ + + +def uds_dec_local(c: int) -> int: + return c & 0x0FFF # ํ•˜์œ„ 12๋น„ํŠธ + + +async def ws_handler(ws): + ws_clients.add(ws) + print(f"[WS] client connected ({len(ws_clients)})") + try: + async for _ in ws: + # ํด๋ผ์ด์–ธํŠธ โ†’ ์„œ๋ฒ„ ๋ฉ”์‹œ์ง€๋Š” ํ˜„์žฌ ์‚ฌ์šฉ ์•ˆ ํ•จ + pass + finally: + ws_clients.discard(ws) + print(f"[WS] client disconnected ({len(ws_clients)})") + + +async def ws_broadcast_text(text: str): + if not ws_clients: + return + + targets = list(ws_clients) + coros = [ws.send(text) for ws in targets] + results = await asyncio.gather(*coros, return_exceptions=True) + + ok_count = 0 + for ws, r in zip(targets, results): + if isinstance(r, Exception): + ws_clients.discard(ws) + else: + ok_count += 1 + + if ok_count > 0: + ws_add_bytes(len(text.encode("utf-8")) * ok_count) + + +async def ws_broadcast_binary(data: bytes): + if not ws_clients: + return + + targets = list(ws_clients) + coros = [ws.send(data) for ws in targets] + results = await asyncio.gather(*coros, return_exceptions=True) + + ok_count = 0 + for ws, r in zip(targets, results): + if isinstance(r, Exception): + ws_clients.discard(ws) + else: + ok_count += 1 + + if ok_count > 0: + ws_add_bytes(len(data) * ok_count) + + +def ws_broadcast_frame(ch, ts_us, w, h, rgb_bytes, stride): + if loop_main is None or not ws_clients: + return + with encode_lock: + if len(encode_queue) >= MAX_ENC_QUEUE: + while len(encode_queue) >= MAX_ENC_QUEUE: + encode_queue.popleft() + encode_queue.append((ch, ts_us, w, h, bytes(rgb_bytes), int(stride))) + + +def ws_broadcast_det(ch, seq, ts_us, items, crowd_count=None): + """UDS ์Šค๋ ˆ๋“œ์—์„œ ํ˜ธ์ถœ: ๋””ํ…์…˜ ๋ฉ”ํƒ€ ํ…์ŠคํŠธ JSON ์ „์†ก ์˜ˆ์•ฝ""" + if loop_main is None or not ws_clients: + return + + meta = { + "type": "det", + "ch": int(ch), + "seq": int(seq), + "ts_us": int(ts_us), + "items": items, + } + if crowd_count is not None: + meta["crowd_count"] = int(crowd_count) + + text = json.dumps(meta) + asyncio.run_coroutine_threadsafe(ws_broadcast_text(text), loop_main) + + +def decode_reserved(resv): + md8 = resv & 0xFF # low = feature + fm8 = (resv >> 8) & 0xFF # high = model + return fm8, md8 + + +def handle_frame(hdr, sock): + magic, ver, ch, w, h, stride, pixfmt, bytes_len, ts_us = hdr + + if bytes_len <= 0 or bytes_len > 20 * 1024 * 1024: + raise ConnectionError("invalid bytes_len") + + buf = read_exact(sock, bytes_len) + + global uds_frames_total, uds_frames_sec + with uds_cnt_lock: + uds_frames_total += 1 + uds_frames_sec += 1 + + if pixfmt != WS_PIXFMT_JPEG: + return + + frames_buffer[ts_us] = bytes(buf) + + if len(frames_buffer) > 6: + old_keys = sorted(frames_buffer.keys())[:-3] + for k in old_keys: + frames_buffer.pop(k, None) + + +def handle_det(hdr, sock): + magic, ver, ch, seq, ts_us, count = hdr + + need_bytes = count * ENTRY_SIZE + entries_raw = read_exact(sock, need_bytes) if count > 0 else b"" + + items = [] + off = 0 + + for _ in range(count): + prob, x, y, w, h, cls, tid, resv = struct.unpack_from( + ENTRY_FMT, entries_raw, off) + off += ENTRY_SIZE + + x1, y1, x2, y2 = x, y, x+w, y+h + + tag = uds_dec_tag(cls) + local_cls = uds_dec_local(cls) + + items.append({ + "x1": float(x1), + "y1": float(y1), + "x2": float(x2), + "y2": float(y2), + "score": float(prob), + "cls": int(local_cls), + "tag": int(tag), + "tid": int(tid) + }) + + if not frames_buffer: + return + + candidate_ts = min(frames_buffer.keys(), key=lambda t: abs(t - ts_us)) + + if abs(candidate_ts - ts_us) > 200000: + return + + jpeg_bytes = frames_buffer.pop(candidate_ts, None) + if jpeg_bytes is None: + return + + meta = { + "type": "frame", + "ch": int(ch), + "ts_us": int(candidate_ts), + "w": int(W), + "h": int(H), + "items": items, + } + + def push(): + if len(frame_queue) >= 5: + frame_queue.popleft() + frame_queue.append((meta, jpeg_bytes)) + + loop_main.call_soon_threadsafe(push) + + +def uds_stats_printer_loop(): + global uds_frames_sec, uds_dets_sec, uds_frames_total, uds_dets_total + while True: + time.sleep(1) + with uds_cnt_lock: + f_sec = uds_frames_sec + d_sec = uds_dets_sec + uds_frames_sec = 0 + uds_dets_sec = 0 + f_tot = uds_frames_total + d_tot = uds_dets_total + print(f"[UDS][RATE] {f_sec} fps, {d_sec} dps | total frames={f_tot}, dets={d_tot}") + + +def uds_loop(): + while True: + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4*1024*1024) + sock.connect(SOCK_PATH) + print(f"[UDS] connected to {SOCK_PATH}") + + try: + while True: + magic = read_exact(sock, 4) + + if magic == FRAME_MAGIC: + rest = read_exact(sock, FRAME_HDR_SIZE - 4) + hdr = struct.unpack(FRAME_HDR_FMT, magic + rest) + try: + handle_frame(hdr, sock) + except ConnectionError as e: + print(f"[UDS][ERR] handle_frame ConnectionError: {e}") + raise + except Exception as e: + print(f"[UDS][ERR] handle_frame EXC: {e}") + break + + elif magic == DET_MAGIC: + rest = read_exact(sock, DET_HDR_SIZE - 4) + hdr = struct.unpack(DET_HDR_FMT, magic + rest) + + try: + handle_det(hdr, sock) + except ConnectionError as e: + print(f"[UDS][ERR] handle_det ConnectionError: {e}") + raise + except Exception as e: + print(f"[UDS][ERR] handle_det EXC: {e}") + break + + else: + print(f"[UDS][WARN] unknown magic={magic!r} hex={magic.hex()}") + break + + finally: + sock.close() + print("[UDS] disconnected") + + except FileNotFoundError: + print(f"[UDS][ERR] {SOCK_PATH} not found, retry...") + except ConnectionError as e: + print(f"[UDS][ERR] connection closed, retry... ({e})") + except Exception as e: + print(f"[UDS][ERR] uds_loop outer EXC: {e}, retry...") + + time.sleep(1) + + +async def ws_frame_sender(): + global ws_frames_total, ws_frames_sec + + while True: + try: + if not frame_queue or not ws_clients: + await asyncio.sleep(0.005) + continue + + try: + meta, jpeg_bytes = frame_queue.pop() + except ValueError: + await asyncio.sleep(0.002) + continue + + text = json.dumps(meta) + await ws_broadcast_text(text) + await ws_broadcast_binary(jpeg_bytes) + + with ws_frames_lock: + ws_frames_total += 1 + ws_frames_sec += 1 + + except Exception as e: + print(f"[WS][FRAME_SENDER][ERR] {e}") + await asyncio.sleep(0.05) + + await asyncio.sleep(0.001) + +async def main_async(): + + global loop_main + loop_main = asyncio.get_running_loop() + + server = await websockets.serve( + ws_handler, + "0.0.0.0", + 8765, + max_queue=None, + write_limit=2**20, + ping_interval=30, + ping_timeout=30, + compression=None + ) + print("[WS] listening on 0.0.0.0:8765") + + asyncio.create_task(ws_frame_sender()) + + t_uds = threading.Thread(target=uds_loop, daemon=True) + t_uds.start() + + t2 = threading.Thread(target=uds_stats_printer_loop, daemon=True) + t2.start() + + await server.wait_closed() + +if __name__ == "__main__": + asyncio.run(main_async()) \ No newline at end of file