"""Multi-server MCP client using fastmcp composition. Composes the three domain-scoped MCP servers into namespaced configurations that agents connect to as a single client. """ import json from contextlib import asynccontextmanager from typing import Any from fastmcp import Client # Server configurations for stdio transport SERVERS = { "shared": { "command": "uv", "args": ["run", "python", "-m", "mcp_servers.shared"], }, "ops": { "command": "uv", "args": ["run", "python", "-m", "mcp_servers.ops"], }, "passenger": { "command": "uv", "args": ["run", "python", "-m", "mcp_servers.passenger"], }, } # Agent profiles — which servers each agent connects to AGENT_PROFILES = { "fce": ["shared", "ops", "passenger"], "handover": ["shared", "ops"], } class MCPMultiClient: """Manages connections to multiple MCP servers via fastmcp Client.""" def __init__(self) -> None: self._clients: dict[str, Client] = {} async def connect(self, server_names: list[str]) -> None: """Connect to the specified MCP servers.""" for name in server_names: if name not in SERVERS: raise ValueError(f"Unknown server: {name}. Available: {list(SERVERS.keys())}") config = {"mcpServers": {"default": SERVERS[name]}} client = Client(config) await client.__aenter__() self._clients[name] = client async def close(self) -> None: """Close all server connections.""" for client in self._clients.values(): try: await client.__aexit__(None, None, None) except (Exception, BaseException): pass self._clients.clear() async def call_tool(self, server: str, tool_name: str, arguments: dict) -> Any: """Call a tool on a specific server. Returns parsed result.""" client = self._clients.get(server) if not client: raise ValueError(f"Not connected to server: {server}") result = await client.call_tool(tool_name, arguments) # Parse the result content if isinstance(result, list): texts = [c.text for c in result if hasattr(c, "text")] elif hasattr(result, "content"): texts = [c.text for c in result.content if hasattr(c, "text")] else: return result if len(texts) == 1: try: return json.loads(texts[0]) except (json.JSONDecodeError, TypeError): return texts[0] elif len(texts) > 1: parsed = [] for t in texts: try: parsed.append(json.loads(t)) except (json.JSONDecodeError, TypeError): parsed.append(t) return parsed return None async def read_resource(self, server: str, uri: str) -> Any: """Read a resource from a specific server.""" client = self._clients.get(server) if not client: raise ValueError(f"Not connected to server: {server}") result = await client.read_resource(uri) if isinstance(result, str): try: return json.loads(result) except (json.JSONDecodeError, TypeError): return result return result async def get_prompt(self, server: str, prompt_name: str, arguments: dict) -> str: """Get a rendered prompt from a specific server.""" client = self._clients.get(server) if not client: raise ValueError(f"Not connected to server: {server}") result = await client.get_prompt(prompt_name, arguments) if isinstance(result, str): return result # Handle structured prompt response texts = [] if hasattr(result, "messages"): for msg in result.messages: if hasattr(msg.content, "text"): texts.append(msg.content.text) elif isinstance(msg.content, list): for c in msg.content: if hasattr(c, "text"): texts.append(c.text) return "\n".join(texts) if texts else str(result) @asynccontextmanager async def connect_servers(server_names: list[str]): """Context manager for multi-server MCP connections.""" client = MCPMultiClient() try: await client.connect(server_names) yield client finally: await client.close()