You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

396 lines
9.9 KiB

7 months ago
import socket, struct, sys, json, asyncio, threading
import websockets
import time
from threading import Lock
from collections import deque
encode_queue = deque()
encode_lock = Lock()
MAX_ENC_QUEUE = 4
frames_buffer = {}
SOCK_PATH = sys.argv[1] if len(sys.argv) > 1 else "/tmp/cam.sock"
# ----- UDS 포맷 -----
FRAME_MAGIC = b"FRA\x00"
FRAME_HDR_FMT = "<4s6I2Q" # magic, ver, ch, w, h, stride, pixfmt, bytes, ts_us
FRAME_HDR_SIZE = struct.calcsize(FRAME_HDR_FMT)
DET_MAGIC = b"UDSD"
DET_HDR_FMT = "<4sIIIQI" # magic, ver, ch, seq, ts_us, count
DET_HDR_SIZE = struct.calcsize(DET_HDR_FMT)
ENTRY_FMT = "<fffffHHH" # prob, x, y, w, h, cls, tid, resv
ENTRY_SIZE = struct.calcsize(ENTRY_FMT)
WS_PIXFMT_RGB24 = 1
WS_PIXFMT_JPEG = 3
UDS_TAG_DET = 1
UDS_TAG_FIRE = 2
UDS_TAG_FACE = 3
UDS_TAG_LPR = 4
FEAT_OBJDET = 0
FEAT_ABNORM = 1
FEAT_CROWD = 2
FEAT_FIRE = 3
FEAT_LPR = 4
FEAT_FACEATTR = 5
FEAT_VIPTRACK = 6
CROWD_LOCAL_CLS = 3
# H = 640
# W = 384
W = 1080
H = 720
uds_cnt_lock = Lock()
uds_frames_total = 0
uds_dets_total = 0
uds_frames_sec = 0
uds_dets_sec = 0
# ws debug stats
ws_frames_total = 0
ws_frames_sec = 0
ws_frames_lock = Lock()
ws_clients = set()
last_frame_ts = {}
loop_main = None
ws_bytes_total = 0
ws_bytes_lock = Lock()
frame_queue = deque(maxlen=5)
def ws_add_bytes(n: int):
global ws_bytes_total
with ws_bytes_lock:
ws_bytes_total += n
def read_exact(sock, n: int) -> bytes:
buf = bytearray(n)
mv = memoryview(buf)
got = 0
while got < n:
r = sock.recv_into(mv[got:], n - got)
if r == 0:
raise ConnectionError("peer closed")
got += r
return buf
def uds_dec_tag(c: int) -> int:
return (c >> 12) & 0xF # 상위 4비트
def uds_dec_local(c: int) -> int:
return c & 0x0FFF # 하위 12비트
async def ws_handler(ws):
ws_clients.add(ws)
print(f"[WS] client connected ({len(ws_clients)})")
try:
async for _ in ws:
# 클라이언트 → 서버 메시지는 현재 사용 안 함
pass
finally:
ws_clients.discard(ws)
print(f"[WS] client disconnected ({len(ws_clients)})")
async def ws_broadcast_text(text: str):
if not ws_clients:
return
targets = list(ws_clients)
coros = [ws.send(text) for ws in targets]
results = await asyncio.gather(*coros, return_exceptions=True)
ok_count = 0
for ws, r in zip(targets, results):
if isinstance(r, Exception):
ws_clients.discard(ws)
else:
ok_count += 1
if ok_count > 0:
ws_add_bytes(len(text.encode("utf-8")) * ok_count)
async def ws_broadcast_binary(data: bytes):
if not ws_clients:
return
targets = list(ws_clients)
coros = [ws.send(data) for ws in targets]
results = await asyncio.gather(*coros, return_exceptions=True)
ok_count = 0
for ws, r in zip(targets, results):
if isinstance(r, Exception):
ws_clients.discard(ws)
else:
ok_count += 1
if ok_count > 0:
ws_add_bytes(len(data) * ok_count)
def ws_broadcast_frame(ch, ts_us, w, h, rgb_bytes, stride):
if loop_main is None or not ws_clients:
return
with encode_lock:
if len(encode_queue) >= MAX_ENC_QUEUE:
while len(encode_queue) >= MAX_ENC_QUEUE:
encode_queue.popleft()
encode_queue.append((ch, ts_us, w, h, bytes(rgb_bytes), int(stride)))
def ws_broadcast_det(ch, seq, ts_us, items, crowd_count=None):
"""UDS 스레드에서 호출: 디텍션 메타 텍스트 JSON 전송 예약"""
if loop_main is None or not ws_clients:
return
meta = {
"type": "det",
"ch": int(ch),
"seq": int(seq),
"ts_us": int(ts_us),
"items": items,
}
if crowd_count is not None:
meta["crowd_count"] = int(crowd_count)
text = json.dumps(meta)
asyncio.run_coroutine_threadsafe(ws_broadcast_text(text), loop_main)
def decode_reserved(resv):
md8 = resv & 0xFF # low = feature
fm8 = (resv >> 8) & 0xFF # high = model
return fm8, md8
def handle_frame(hdr, sock):
magic, ver, ch, w, h, stride, pixfmt, bytes_len, ts_us = hdr
if bytes_len <= 0 or bytes_len > 20 * 1024 * 1024:
raise ConnectionError("invalid bytes_len")
buf = read_exact(sock, bytes_len)
global uds_frames_total, uds_frames_sec
with uds_cnt_lock:
uds_frames_total += 1
uds_frames_sec += 1
if pixfmt != WS_PIXFMT_JPEG:
return
frames_buffer[ts_us] = bytes(buf)
if len(frames_buffer) > 6:
old_keys = sorted(frames_buffer.keys())[:-3]
for k in old_keys:
frames_buffer.pop(k, None)
def handle_det(hdr, sock):
magic, ver, ch, seq, ts_us, count = hdr
need_bytes = count * ENTRY_SIZE
entries_raw = read_exact(sock, need_bytes) if count > 0 else b""
items = []
off = 0
for _ in range(count):
prob, x, y, w, h, cls, tid, resv = struct.unpack_from(
ENTRY_FMT, entries_raw, off)
off += ENTRY_SIZE
x1, y1, x2, y2 = x, y, x+w, y+h
tag = uds_dec_tag(cls)
local_cls = uds_dec_local(cls)
items.append({
"x1": float(x1),
"y1": float(y1),
"x2": float(x2),
"y2": float(y2),
"score": float(prob),
"cls": int(local_cls),
"tag": int(tag),
"tid": int(tid)
})
if not frames_buffer:
return
candidate_ts = min(frames_buffer.keys(), key=lambda t: abs(t - ts_us))
if abs(candidate_ts - ts_us) > 200000:
return
jpeg_bytes = frames_buffer.pop(candidate_ts, None)
if jpeg_bytes is None:
return
meta = {
"type": "frame",
"ch": int(ch),
"ts_us": int(candidate_ts),
"w": int(W),
"h": int(H),
"items": items,
}
def push():
if len(frame_queue) >= 5:
frame_queue.popleft()
frame_queue.append((meta, jpeg_bytes))
loop_main.call_soon_threadsafe(push)
def uds_stats_printer_loop():
global uds_frames_sec, uds_dets_sec, uds_frames_total, uds_dets_total
while True:
time.sleep(1)
with uds_cnt_lock:
f_sec = uds_frames_sec
d_sec = uds_dets_sec
uds_frames_sec = 0
uds_dets_sec = 0
f_tot = uds_frames_total
d_tot = uds_dets_total
print(f"[UDS][RATE] {f_sec} fps, {d_sec} dps | total frames={f_tot}, dets={d_tot}")
def uds_loop():
while True:
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4*1024*1024)
sock.connect(SOCK_PATH)
print(f"[UDS] connected to {SOCK_PATH}")
try:
while True:
magic = read_exact(sock, 4)
if magic == FRAME_MAGIC:
rest = read_exact(sock, FRAME_HDR_SIZE - 4)
hdr = struct.unpack(FRAME_HDR_FMT, magic + rest)
try:
handle_frame(hdr, sock)
except ConnectionError as e:
print(f"[UDS][ERR] handle_frame ConnectionError: {e}")
raise
except Exception as e:
print(f"[UDS][ERR] handle_frame EXC: {e}")
break
elif magic == DET_MAGIC:
rest = read_exact(sock, DET_HDR_SIZE - 4)
hdr = struct.unpack(DET_HDR_FMT, magic + rest)
try:
handle_det(hdr, sock)
except ConnectionError as e:
print(f"[UDS][ERR] handle_det ConnectionError: {e}")
raise
except Exception as e:
print(f"[UDS][ERR] handle_det EXC: {e}")
break
else:
print(f"[UDS][WARN] unknown magic={magic!r} hex={magic.hex()}")
break
finally:
sock.close()
print("[UDS] disconnected")
except FileNotFoundError:
print(f"[UDS][ERR] {SOCK_PATH} not found, retry...")
except ConnectionError as e:
print(f"[UDS][ERR] connection closed, retry... ({e})")
except Exception as e:
print(f"[UDS][ERR] uds_loop outer EXC: {e}, retry...")
time.sleep(1)
async def ws_frame_sender():
global ws_frames_total, ws_frames_sec
while True:
try:
if not frame_queue or not ws_clients:
await asyncio.sleep(0.005)
continue
try:
meta, jpeg_bytes = frame_queue.pop()
except ValueError:
await asyncio.sleep(0.002)
continue
text = json.dumps(meta)
await ws_broadcast_text(text)
await ws_broadcast_binary(jpeg_bytes)
with ws_frames_lock:
ws_frames_total += 1
ws_frames_sec += 1
except Exception as e:
print(f"[WS][FRAME_SENDER][ERR] {e}")
await asyncio.sleep(0.05)
await asyncio.sleep(0.001)
async def main_async():
global loop_main
loop_main = asyncio.get_running_loop()
server = await websockets.serve(
ws_handler,
"0.0.0.0",
8765,
max_queue=None,
write_limit=2**20,
ping_interval=30,
ping_timeout=30,
compression=None
)
print("[WS] listening on 0.0.0.0:8765")
asyncio.create_task(ws_frame_sender())
t_uds = threading.Thread(target=uds_loop, daemon=True)
t_uds.start()
t2 = threading.Thread(target=uds_stats_printer_loop, daemon=True)
t2.start()
await server.wait_closed()
if __name__ == "__main__":
asyncio.run(main_async())