"""Multi-server MCP client using fastmcp composition. Composes the three domain-scoped MCP servers into namespaced configurations that agents connect to as a single client. """ import os from contextlib import asynccontextmanager from typing import Any from fastmcp import Client from agents.shared.parser import ( parse_prompt_result, parse_resource_result, parse_tool_result, ) SERVER_MODULES = { "shared": "mcp_servers.shared", "ops": "mcp_servers.ops", "passenger": "mcp_servers.passenger", } AGENT_PROFILES = { "fce": ["shared", "ops", "passenger"], "handover": ["shared", "ops"], } _FORWARDED_ENV_KEYS = ( "LLM_PROVIDER", "GROQ_API_KEY", "GROQ_MODEL", "ANTHROPIC_API_KEY", "ANTHROPIC_MODEL", "OPENAI_API_KEY", "OPENAI_BASE_URL", "OPENAI_MODEL", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION", "BEDROCK_MODEL_ID", "USE_BEDROCK", "PATH", ) def _subprocess_env() -> dict: """Env vars forwarded to MCP server subprocesses (LLM config + active scenario).""" from mcp_servers.data.scenarios.manager import scenario_manager env = {k: os.environ[k] for k in _FORWARDED_ENV_KEYS if k in os.environ} env["ACTIVE_SCENARIO"] = scenario_manager.active_id return env def _server_config(module: str) -> dict: return {"command": "uv", "args": ["run", "python", "-m", module], "env": _subprocess_env()} class MCPMultiClient: """Manages connections to multiple MCP servers via fastmcp Client.""" def __init__(self) -> None: self._clients: dict[str, Client] = {} async def connect(self, server_names: list[str]) -> None: for name in server_names: if name not in SERVER_MODULES: raise ValueError(f"Unknown server: {name}. Available: {list(SERVER_MODULES.keys())}") config = {"mcpServers": {"default": _server_config(SERVER_MODULES[name])}} client = Client(config) await client.__aenter__() self._clients[name] = client async def close(self) -> None: for client in self._clients.values(): try: await client.__aexit__(None, None, None) except (Exception, BaseException): pass self._clients.clear() def _client(self, server: str) -> Client: client = self._clients.get(server) if not client: raise ValueError(f"Not connected to server: {server}") return client async def call_tool(self, server: str, tool_name: str, arguments: dict) -> Any: result = await self._client(server).call_tool(tool_name, arguments) return parse_tool_result(result) async def read_resource(self, server: str, uri: str) -> Any: result = await self._client(server).read_resource(uri) return parse_resource_result(result) async def get_prompt(self, server: str, prompt_name: str, arguments: dict) -> str: result = await self._client(server).get_prompt(prompt_name, arguments) return parse_prompt_result(result) @asynccontextmanager async def connect_servers(server_names: list[str]): """Context manager for multi-server MCP connections.""" client = MCPMultiClient() try: await client.connect(server_names) yield client finally: await client.close()