simple is better
This commit is contained in:
@@ -32,6 +32,11 @@ logger = setup_logging(
|
||||
log_format=config.log_format,
|
||||
)
|
||||
|
||||
# Edge forwarding config
|
||||
EDGE_URL = config.edge_url if hasattr(config, "edge_url") else None
|
||||
EDGE_API_KEY = config.edge_api_key if hasattr(config, "edge_api_key") else ""
|
||||
edge_ws = None
|
||||
|
||||
|
||||
# WebSocket connection manager
|
||||
class ConnectionManager:
|
||||
@@ -77,6 +82,54 @@ timescale: TimescaleStorage | None = None
|
||||
grpc_channel: grpc.aio.Channel | None = None
|
||||
grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None
|
||||
|
||||
|
||||
async def connect_to_edge():
|
||||
"""Maintain persistent WebSocket connection to edge and forward metrics."""
|
||||
global edge_ws
|
||||
|
||||
if not EDGE_URL:
|
||||
logger.info("edge_not_configured", msg="No EDGE_URL set, running local only")
|
||||
return
|
||||
|
||||
import websockets
|
||||
|
||||
url = EDGE_URL
|
||||
if EDGE_API_KEY:
|
||||
separator = "&" if "?" in url else "?"
|
||||
url = f"{url}{separator}key={EDGE_API_KEY}"
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info("edge_connecting", url=EDGE_URL)
|
||||
async with websockets.connect(url) as ws:
|
||||
edge_ws = ws
|
||||
logger.info("edge_connected")
|
||||
|
||||
while True:
|
||||
try:
|
||||
msg = await asyncio.wait_for(ws.recv(), timeout=30)
|
||||
# Ignore messages from edge (pings, etc)
|
||||
except asyncio.TimeoutError:
|
||||
await ws.ping()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
edge_ws = None
|
||||
logger.warning("edge_connection_error", error=str(e))
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
async def forward_to_edge(data: dict):
|
||||
"""Forward metrics to edge if connected."""
|
||||
global edge_ws
|
||||
if edge_ws:
|
||||
try:
|
||||
await edge_ws.send(json.dumps(data))
|
||||
except Exception as e:
|
||||
logger.warning("edge_forward_error", error=str(e))
|
||||
|
||||
|
||||
# Track recent events for internals view
|
||||
recent_events: list[dict] = []
|
||||
MAX_RECENT_EVENTS = 100
|
||||
@@ -133,15 +186,17 @@ async def event_listener():
|
||||
merged_payload = event.payload
|
||||
|
||||
# Broadcast merged data to dashboard
|
||||
await manager.broadcast(
|
||||
{
|
||||
"type": "metrics",
|
||||
"data": merged_payload,
|
||||
"timestamp": event.timestamp.isoformat(),
|
||||
}
|
||||
)
|
||||
broadcast_msg = {
|
||||
"type": "metrics",
|
||||
"data": merged_payload,
|
||||
"timestamp": event.timestamp.isoformat(),
|
||||
}
|
||||
await manager.broadcast(broadcast_msg)
|
||||
service_stats["websocket_broadcasts"] += 1
|
||||
|
||||
# Forward to edge if connected
|
||||
await forward_to_edge(broadcast_msg)
|
||||
|
||||
# Broadcast to internals (show raw event, not merged)
|
||||
await internals_manager.broadcast(
|
||||
{
|
||||
@@ -176,6 +231,9 @@ async def lifespan(app: FastAPI):
|
||||
# Start event listener in background
|
||||
listener_task = asyncio.create_task(event_listener())
|
||||
|
||||
# Start edge connection if configured
|
||||
edge_task = asyncio.create_task(connect_to_edge())
|
||||
|
||||
service_stats["started_at"] = datetime.utcnow().isoformat()
|
||||
logger.info("gateway_started")
|
||||
|
||||
@@ -183,10 +241,15 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
# Cleanup
|
||||
listener_task.cancel()
|
||||
edge_task.cancel()
|
||||
try:
|
||||
await listener_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
try:
|
||||
await edge_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
if grpc_channel:
|
||||
await grpc_channel.close()
|
||||
|
||||
Reference in New Issue
Block a user