parent
56a1f27884
commit
25ab02cbe0
@ -1,2 +1,132 @@
|
|||||||
# tta-uds-streamer
|
tta-uds-streamer
|
||||||
|
📘 TTA UDS Streamer
|
||||||
|
|
||||||
|
C 모듈이 전송하는 UDS 기반 프레임/디텍션 스트림을 처리하는 Python 모듈
|
||||||
|
|
||||||
|
📌 개요
|
||||||
|
|
||||||
|
이 프로젝트는 C 측에서 Unix Domain Socket(UDS) 을 통해 전송하는
|
||||||
|
JPEG 프레임(FRA) + 디텍션(UDSD) 데이터를 Python에서 수신하여
|
||||||
|
|
||||||
|
프레임 버퍼링
|
||||||
|
|
||||||
|
디텍션 타임스탬프 매칭
|
||||||
|
|
||||||
|
WebSocket 실시간 스트리밍
|
||||||
|
|
||||||
|
까지 처리하는 파이프라인을 제공합니다.
|
||||||
|
|
||||||
|
본 저장소는 Python 측 모듈만 포함하며,
|
||||||
|
C 측 NPU → Postprocess → UDS 모듈(cam_ws_app 등) 과 연동하여 동작합니다.
|
||||||
|
|
||||||
|
📁 디렉토리 구조
|
||||||
|
tta-uds-streamer/
|
||||||
|
│
|
||||||
|
├── uds_postprocessing.py # 전체 파이프라인 처리 모듈
|
||||||
|
│
|
||||||
|
├── feat_control/ # 기능 제어 모듈
|
||||||
|
│ └── ctrl_cli.py # 기능 ON/OFF 제어 CLI (UDS 제어)
|
||||||
|
│
|
||||||
|
├── ctrl_features.sh # 기능 ON/OFF 제어 스크립트
|
||||||
|
│
|
||||||
|
├── requirements.txt # 패키지 목록 (버전 제거됨)
|
||||||
|
│
|
||||||
|
└── README.md
|
||||||
|
|
||||||
|
🔧 패키지 설치
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
🚀 전체 파이프라인 실행
|
||||||
|
python uds_postprocessing.py
|
||||||
|
|
||||||
|
|
||||||
|
C 프로그램이 /tmp/cam.sock UDS 소켓에
|
||||||
|
아래 포맷 그대로 전송하면 Python이 자동으로 프레임 및 디텍션을 처리합니다.
|
||||||
|
|
||||||
|
📡 데이터 포맷 요약
|
||||||
|
FRA (Frame)
|
||||||
|
필드 설명
|
||||||
|
magic "FRA\0"
|
||||||
|
width / height JPEG 이미지 크기
|
||||||
|
stride JPEG 바이트 길이
|
||||||
|
pixfmt WS_PIXFMT_JPEG
|
||||||
|
ts_us 프레임 타임스탬프(μs)
|
||||||
|
payload JPEG binary
|
||||||
|
UDSD (Detection)
|
||||||
|
필드 설명
|
||||||
|
magic "UDSD"
|
||||||
|
count 디텍션 수
|
||||||
|
ENTRY prob, x, y, w, h, cls, ...
|
||||||
|
ts_us 디텍션 타임스탬프(μs)
|
||||||
|
🧩 기능 요약
|
||||||
|
|
||||||
|
최근 프레임 N장 버퍼링
|
||||||
|
|
||||||
|
디텍션 타임스탬프와 가장 가까운 프레임 자동 매칭
|
||||||
|
|
||||||
|
C 측 bytesort 기반 tracking ID 적용 가능
|
||||||
|
|
||||||
|
WebSocket 실시간 스트리밍 지원
|
||||||
|
|
||||||
|
C 모듈의 기능 플래그를 UDS를 통해 실시간 제어 가능
|
||||||
|
|
||||||
|
🎮 Feature Control 사용법
|
||||||
|
|
||||||
|
Python의 ctrl_cli.py와 Shell 스크립트 ctrl_features.sh를 사용하여
|
||||||
|
C 스트리머의 기능(OBJDET, FIRE, FACE 등)을 ON/OFF할 수 있습니다.
|
||||||
|
|
||||||
|
📁 관련 파일
|
||||||
|
feat_control/
|
||||||
|
└── ctrl_cli.py # 기능 제어 메시지를 UDS로 전송하는 Python CLI
|
||||||
|
ctrl_features.sh # 여러 기능을 한 번에 ON/OFF하는 스크립트
|
||||||
|
|
||||||
|
🔧 ctrl_features.sh 사용법
|
||||||
|
기본 구조
|
||||||
|
./ctrl_features.sh FEATURE [FEATURE ...]
|
||||||
|
|
||||||
|
예시
|
||||||
|
객체 탐지만 활성화
|
||||||
|
./ctrl_features.sh OBJDET
|
||||||
|
|
||||||
|
객체 탐지 + 화재 탐지
|
||||||
|
./ctrl_features.sh OBJDET FIRE
|
||||||
|
|
||||||
|
잘못된 사용(인자 없음)
|
||||||
|
./ctrl_features.sh
|
||||||
|
# Usage 출력
|
||||||
|
|
||||||
|
🔍 ctrl_features.sh 동작 방식
|
||||||
|
1) 모든 기능 OFF
|
||||||
|
$CLI "ALL_OFF"
|
||||||
|
|
||||||
|
2) 전달받은 기능 목록을 순서대로 ON
|
||||||
|
for f in "$@"; do
|
||||||
|
$CLI "ON $f"
|
||||||
|
done
|
||||||
|
|
||||||
|
|
||||||
|
예시:
|
||||||
|
|
||||||
|
./ctrl_features.sh OBJDET FIRE
|
||||||
|
|
||||||
|
|
||||||
|
전송되는 실제 명령:
|
||||||
|
|
||||||
|
ALL_OFF
|
||||||
|
ON OBJDET
|
||||||
|
ON FIRE
|
||||||
|
|
||||||
|
🧩 ctrl_cli.py 동작 요약
|
||||||
|
|
||||||
|
ctrl_cli.py는 내부적으로:
|
||||||
|
|
||||||
|
UDS 제어 소켓(/tmp/ctrl_feat.sock) 연결
|
||||||
|
|
||||||
|
명령 문자열 전송
|
||||||
|
|
||||||
|
OK / FAIL 응답 수신
|
||||||
|
|
||||||
|
하는 단순한 CLI로 구성되어 있습니다.
|
||||||
|
|
||||||
|
C 측에서는 이를 기반으로
|
||||||
|
ctrl_flags.h 에 정의된 기능 플래그를 실시간 변경합니다.
|
||||||
@ -0,0 +1,23 @@
|
|||||||
|
import sys
|
||||||
|
import socket
|
||||||
|
|
||||||
|
SOCK = "/tmp/ctrl_feat.sock"
|
||||||
|
|
||||||
|
|
||||||
|
def send(cmd: str):
|
||||||
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect(SOCK)
|
||||||
|
s.sendall((cmd.rstrip() + "\n").encode())
|
||||||
|
while True:
|
||||||
|
data = s.recv(4096)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
sys.stdout.write(data.decode(errors="ignore"))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
cmd = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else "STATUS"
|
||||||
|
try:
|
||||||
|
send(cmd)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"ERR: {e}")
|
||||||
@ -0,0 +1,17 @@
|
|||||||
|
BASE_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||||
|
CLI="python3 $BASE_DIR/ctrl_cli.py"
|
||||||
|
|
||||||
|
if [ $# -lt 1 ]; then
|
||||||
|
echo "Usage: $0 FEATURE [FEATURE ...]" >&2
|
||||||
|
echo " 예) $0 OBJDET"
|
||||||
|
echo " 예) $0 OBJDET FIRE"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 1) 먼저 전부 끄기
|
||||||
|
$CLI "ALL_OFF"
|
||||||
|
|
||||||
|
# 2) 넘겨준 FEATURE 들을 순서대로 ON
|
||||||
|
for f in "$@"; do
|
||||||
|
$CLI "ON $f"
|
||||||
|
done
|
||||||
@ -0,0 +1,2 @@
|
|||||||
|
numpy
|
||||||
|
websockets
|
||||||
@ -0,0 +1,396 @@
|
|||||||
|
import socket, struct, sys, json, asyncio, threading
|
||||||
|
import websockets
|
||||||
|
import time
|
||||||
|
from threading import Lock
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
|
||||||
|
encode_queue = deque()
|
||||||
|
encode_lock = Lock()
|
||||||
|
|
||||||
|
MAX_ENC_QUEUE = 4
|
||||||
|
|
||||||
|
frames_buffer = {}
|
||||||
|
|
||||||
|
SOCK_PATH = sys.argv[1] if len(sys.argv) > 1 else "/tmp/cam.sock"
|
||||||
|
|
||||||
|
# ----- UDS 포맷 -----
|
||||||
|
FRAME_MAGIC = b"FRA\x00"
|
||||||
|
FRAME_HDR_FMT = "<4s6I2Q" # magic, ver, ch, w, h, stride, pixfmt, bytes, ts_us
|
||||||
|
FRAME_HDR_SIZE = struct.calcsize(FRAME_HDR_FMT)
|
||||||
|
|
||||||
|
DET_MAGIC = b"UDSD"
|
||||||
|
DET_HDR_FMT = "<4sIIIQI" # magic, ver, ch, seq, ts_us, count
|
||||||
|
DET_HDR_SIZE = struct.calcsize(DET_HDR_FMT)
|
||||||
|
|
||||||
|
ENTRY_FMT = "<fffffHHH" # prob, x, y, w, h, cls, tid, resv
|
||||||
|
ENTRY_SIZE = struct.calcsize(ENTRY_FMT)
|
||||||
|
|
||||||
|
WS_PIXFMT_RGB24 = 1
|
||||||
|
WS_PIXFMT_JPEG = 3
|
||||||
|
|
||||||
|
UDS_TAG_DET = 1
|
||||||
|
UDS_TAG_FIRE = 2
|
||||||
|
UDS_TAG_FACE = 3
|
||||||
|
UDS_TAG_LPR = 4
|
||||||
|
|
||||||
|
FEAT_OBJDET = 0
|
||||||
|
FEAT_ABNORM = 1
|
||||||
|
FEAT_CROWD = 2
|
||||||
|
FEAT_FIRE = 3
|
||||||
|
FEAT_LPR = 4
|
||||||
|
FEAT_FACEATTR = 5
|
||||||
|
FEAT_VIPTRACK = 6
|
||||||
|
|
||||||
|
CROWD_LOCAL_CLS = 3
|
||||||
|
|
||||||
|
# H = 640
|
||||||
|
# W = 384
|
||||||
|
|
||||||
|
W = 1080
|
||||||
|
H = 720
|
||||||
|
|
||||||
|
|
||||||
|
uds_cnt_lock = Lock()
|
||||||
|
uds_frames_total = 0
|
||||||
|
uds_dets_total = 0
|
||||||
|
uds_frames_sec = 0
|
||||||
|
uds_dets_sec = 0
|
||||||
|
|
||||||
|
# ws debug stats
|
||||||
|
ws_frames_total = 0
|
||||||
|
ws_frames_sec = 0
|
||||||
|
ws_frames_lock = Lock()
|
||||||
|
|
||||||
|
|
||||||
|
ws_clients = set()
|
||||||
|
last_frame_ts = {}
|
||||||
|
|
||||||
|
loop_main = None
|
||||||
|
|
||||||
|
ws_bytes_total = 0
|
||||||
|
ws_bytes_lock = Lock()
|
||||||
|
frame_queue = deque(maxlen=5)
|
||||||
|
|
||||||
|
|
||||||
|
def ws_add_bytes(n: int):
|
||||||
|
global ws_bytes_total
|
||||||
|
with ws_bytes_lock:
|
||||||
|
ws_bytes_total += n
|
||||||
|
|
||||||
|
|
||||||
|
def read_exact(sock, n: int) -> bytes:
|
||||||
|
buf = bytearray(n)
|
||||||
|
mv = memoryview(buf)
|
||||||
|
got = 0
|
||||||
|
while got < n:
|
||||||
|
r = sock.recv_into(mv[got:], n - got)
|
||||||
|
if r == 0:
|
||||||
|
raise ConnectionError("peer closed")
|
||||||
|
got += r
|
||||||
|
return buf
|
||||||
|
|
||||||
|
|
||||||
|
def uds_dec_tag(c: int) -> int:
|
||||||
|
return (c >> 12) & 0xF # 상위 4비트
|
||||||
|
|
||||||
|
|
||||||
|
def uds_dec_local(c: int) -> int:
|
||||||
|
return c & 0x0FFF # 하위 12비트
|
||||||
|
|
||||||
|
|
||||||
|
async def ws_handler(ws):
|
||||||
|
ws_clients.add(ws)
|
||||||
|
print(f"[WS] client connected ({len(ws_clients)})")
|
||||||
|
try:
|
||||||
|
async for _ in ws:
|
||||||
|
# 클라이언트 → 서버 메시지는 현재 사용 안 함
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
ws_clients.discard(ws)
|
||||||
|
print(f"[WS] client disconnected ({len(ws_clients)})")
|
||||||
|
|
||||||
|
|
||||||
|
async def ws_broadcast_text(text: str):
|
||||||
|
if not ws_clients:
|
||||||
|
return
|
||||||
|
|
||||||
|
targets = list(ws_clients)
|
||||||
|
coros = [ws.send(text) for ws in targets]
|
||||||
|
results = await asyncio.gather(*coros, return_exceptions=True)
|
||||||
|
|
||||||
|
ok_count = 0
|
||||||
|
for ws, r in zip(targets, results):
|
||||||
|
if isinstance(r, Exception):
|
||||||
|
ws_clients.discard(ws)
|
||||||
|
else:
|
||||||
|
ok_count += 1
|
||||||
|
|
||||||
|
if ok_count > 0:
|
||||||
|
ws_add_bytes(len(text.encode("utf-8")) * ok_count)
|
||||||
|
|
||||||
|
|
||||||
|
async def ws_broadcast_binary(data: bytes):
|
||||||
|
if not ws_clients:
|
||||||
|
return
|
||||||
|
|
||||||
|
targets = list(ws_clients)
|
||||||
|
coros = [ws.send(data) for ws in targets]
|
||||||
|
results = await asyncio.gather(*coros, return_exceptions=True)
|
||||||
|
|
||||||
|
ok_count = 0
|
||||||
|
for ws, r in zip(targets, results):
|
||||||
|
if isinstance(r, Exception):
|
||||||
|
ws_clients.discard(ws)
|
||||||
|
else:
|
||||||
|
ok_count += 1
|
||||||
|
|
||||||
|
if ok_count > 0:
|
||||||
|
ws_add_bytes(len(data) * ok_count)
|
||||||
|
|
||||||
|
|
||||||
|
def ws_broadcast_frame(ch, ts_us, w, h, rgb_bytes, stride):
|
||||||
|
if loop_main is None or not ws_clients:
|
||||||
|
return
|
||||||
|
with encode_lock:
|
||||||
|
if len(encode_queue) >= MAX_ENC_QUEUE:
|
||||||
|
while len(encode_queue) >= MAX_ENC_QUEUE:
|
||||||
|
encode_queue.popleft()
|
||||||
|
encode_queue.append((ch, ts_us, w, h, bytes(rgb_bytes), int(stride)))
|
||||||
|
|
||||||
|
|
||||||
|
def ws_broadcast_det(ch, seq, ts_us, items, crowd_count=None):
|
||||||
|
"""UDS 스레드에서 호출: 디텍션 메타 텍스트 JSON 전송 예약"""
|
||||||
|
if loop_main is None or not ws_clients:
|
||||||
|
return
|
||||||
|
|
||||||
|
meta = {
|
||||||
|
"type": "det",
|
||||||
|
"ch": int(ch),
|
||||||
|
"seq": int(seq),
|
||||||
|
"ts_us": int(ts_us),
|
||||||
|
"items": items,
|
||||||
|
}
|
||||||
|
if crowd_count is not None:
|
||||||
|
meta["crowd_count"] = int(crowd_count)
|
||||||
|
|
||||||
|
text = json.dumps(meta)
|
||||||
|
asyncio.run_coroutine_threadsafe(ws_broadcast_text(text), loop_main)
|
||||||
|
|
||||||
|
|
||||||
|
def decode_reserved(resv):
|
||||||
|
md8 = resv & 0xFF # low = feature
|
||||||
|
fm8 = (resv >> 8) & 0xFF # high = model
|
||||||
|
return fm8, md8
|
||||||
|
|
||||||
|
|
||||||
|
def handle_frame(hdr, sock):
|
||||||
|
magic, ver, ch, w, h, stride, pixfmt, bytes_len, ts_us = hdr
|
||||||
|
|
||||||
|
if bytes_len <= 0 or bytes_len > 20 * 1024 * 1024:
|
||||||
|
raise ConnectionError("invalid bytes_len")
|
||||||
|
|
||||||
|
buf = read_exact(sock, bytes_len)
|
||||||
|
|
||||||
|
global uds_frames_total, uds_frames_sec
|
||||||
|
with uds_cnt_lock:
|
||||||
|
uds_frames_total += 1
|
||||||
|
uds_frames_sec += 1
|
||||||
|
|
||||||
|
if pixfmt != WS_PIXFMT_JPEG:
|
||||||
|
return
|
||||||
|
|
||||||
|
frames_buffer[ts_us] = bytes(buf)
|
||||||
|
|
||||||
|
if len(frames_buffer) > 6:
|
||||||
|
old_keys = sorted(frames_buffer.keys())[:-3]
|
||||||
|
for k in old_keys:
|
||||||
|
frames_buffer.pop(k, None)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_det(hdr, sock):
|
||||||
|
magic, ver, ch, seq, ts_us, count = hdr
|
||||||
|
|
||||||
|
need_bytes = count * ENTRY_SIZE
|
||||||
|
entries_raw = read_exact(sock, need_bytes) if count > 0 else b""
|
||||||
|
|
||||||
|
items = []
|
||||||
|
off = 0
|
||||||
|
|
||||||
|
for _ in range(count):
|
||||||
|
prob, x, y, w, h, cls, tid, resv = struct.unpack_from(
|
||||||
|
ENTRY_FMT, entries_raw, off)
|
||||||
|
off += ENTRY_SIZE
|
||||||
|
|
||||||
|
x1, y1, x2, y2 = x, y, x+w, y+h
|
||||||
|
|
||||||
|
tag = uds_dec_tag(cls)
|
||||||
|
local_cls = uds_dec_local(cls)
|
||||||
|
|
||||||
|
items.append({
|
||||||
|
"x1": float(x1),
|
||||||
|
"y1": float(y1),
|
||||||
|
"x2": float(x2),
|
||||||
|
"y2": float(y2),
|
||||||
|
"score": float(prob),
|
||||||
|
"cls": int(local_cls),
|
||||||
|
"tag": int(tag),
|
||||||
|
"tid": int(tid)
|
||||||
|
})
|
||||||
|
|
||||||
|
if not frames_buffer:
|
||||||
|
return
|
||||||
|
|
||||||
|
candidate_ts = min(frames_buffer.keys(), key=lambda t: abs(t - ts_us))
|
||||||
|
|
||||||
|
if abs(candidate_ts - ts_us) > 200000:
|
||||||
|
return
|
||||||
|
|
||||||
|
jpeg_bytes = frames_buffer.pop(candidate_ts, None)
|
||||||
|
if jpeg_bytes is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
meta = {
|
||||||
|
"type": "frame",
|
||||||
|
"ch": int(ch),
|
||||||
|
"ts_us": int(candidate_ts),
|
||||||
|
"w": int(W),
|
||||||
|
"h": int(H),
|
||||||
|
"items": items,
|
||||||
|
}
|
||||||
|
|
||||||
|
def push():
|
||||||
|
if len(frame_queue) >= 5:
|
||||||
|
frame_queue.popleft()
|
||||||
|
frame_queue.append((meta, jpeg_bytes))
|
||||||
|
|
||||||
|
loop_main.call_soon_threadsafe(push)
|
||||||
|
|
||||||
|
|
||||||
|
def uds_stats_printer_loop():
|
||||||
|
global uds_frames_sec, uds_dets_sec, uds_frames_total, uds_dets_total
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
with uds_cnt_lock:
|
||||||
|
f_sec = uds_frames_sec
|
||||||
|
d_sec = uds_dets_sec
|
||||||
|
uds_frames_sec = 0
|
||||||
|
uds_dets_sec = 0
|
||||||
|
f_tot = uds_frames_total
|
||||||
|
d_tot = uds_dets_total
|
||||||
|
print(f"[UDS][RATE] {f_sec} fps, {d_sec} dps | total frames={f_tot}, dets={d_tot}")
|
||||||
|
|
||||||
|
|
||||||
|
def uds_loop():
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4*1024*1024)
|
||||||
|
sock.connect(SOCK_PATH)
|
||||||
|
print(f"[UDS] connected to {SOCK_PATH}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
magic = read_exact(sock, 4)
|
||||||
|
|
||||||
|
if magic == FRAME_MAGIC:
|
||||||
|
rest = read_exact(sock, FRAME_HDR_SIZE - 4)
|
||||||
|
hdr = struct.unpack(FRAME_HDR_FMT, magic + rest)
|
||||||
|
try:
|
||||||
|
handle_frame(hdr, sock)
|
||||||
|
except ConnectionError as e:
|
||||||
|
print(f"[UDS][ERR] handle_frame ConnectionError: {e}")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[UDS][ERR] handle_frame EXC: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
|
elif magic == DET_MAGIC:
|
||||||
|
rest = read_exact(sock, DET_HDR_SIZE - 4)
|
||||||
|
hdr = struct.unpack(DET_HDR_FMT, magic + rest)
|
||||||
|
|
||||||
|
try:
|
||||||
|
handle_det(hdr, sock)
|
||||||
|
except ConnectionError as e:
|
||||||
|
print(f"[UDS][ERR] handle_det ConnectionError: {e}")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[UDS][ERR] handle_det EXC: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(f"[UDS][WARN] unknown magic={magic!r} hex={magic.hex()}")
|
||||||
|
break
|
||||||
|
|
||||||
|
finally:
|
||||||
|
sock.close()
|
||||||
|
print("[UDS] disconnected")
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"[UDS][ERR] {SOCK_PATH} not found, retry...")
|
||||||
|
except ConnectionError as e:
|
||||||
|
print(f"[UDS][ERR] connection closed, retry... ({e})")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[UDS][ERR] uds_loop outer EXC: {e}, retry...")
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
async def ws_frame_sender():
|
||||||
|
global ws_frames_total, ws_frames_sec
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if not frame_queue or not ws_clients:
|
||||||
|
await asyncio.sleep(0.005)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
meta, jpeg_bytes = frame_queue.pop()
|
||||||
|
except ValueError:
|
||||||
|
await asyncio.sleep(0.002)
|
||||||
|
continue
|
||||||
|
|
||||||
|
text = json.dumps(meta)
|
||||||
|
await ws_broadcast_text(text)
|
||||||
|
await ws_broadcast_binary(jpeg_bytes)
|
||||||
|
|
||||||
|
with ws_frames_lock:
|
||||||
|
ws_frames_total += 1
|
||||||
|
ws_frames_sec += 1
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[WS][FRAME_SENDER][ERR] {e}")
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
|
async def main_async():
|
||||||
|
|
||||||
|
global loop_main
|
||||||
|
loop_main = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
server = await websockets.serve(
|
||||||
|
ws_handler,
|
||||||
|
"0.0.0.0",
|
||||||
|
8765,
|
||||||
|
max_queue=None,
|
||||||
|
write_limit=2**20,
|
||||||
|
ping_interval=30,
|
||||||
|
ping_timeout=30,
|
||||||
|
compression=None
|
||||||
|
)
|
||||||
|
print("[WS] listening on 0.0.0.0:8765")
|
||||||
|
|
||||||
|
asyncio.create_task(ws_frame_sender())
|
||||||
|
|
||||||
|
t_uds = threading.Thread(target=uds_loop, daemon=True)
|
||||||
|
t_uds.start()
|
||||||
|
|
||||||
|
t2 = threading.Thread(target=uds_stats_printer_loop, daemon=True)
|
||||||
|
t2.start()
|
||||||
|
|
||||||
|
await server.wait_closed()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main_async())
|
||||||
Loading…
Reference in new issue