from svc_infra.websocket.manager import ConnectionManagerServer-side WebSocket connection manager. Features: - Track multiple connections per user - Room/group support for targeted broadcasts - Connection lifecycle hooks - Thread-safe with asyncio.Lock
manager = ConnectionManager() @app.websocket("/ws/{user_id}") async def websocket_endpoint(websocket: WebSocket, user_id: str): await manager.connect(user_id, websocket) try: async for message in websocket.iter_json(): await manager.broadcast(message) finally: await manager.disconnect(user_id, websocket)