Files
nova/agents/shared/parser.py

57 lines
1.7 KiB
Python

"""Parsers for MCP response content (tools, resources, prompts)."""
import json
from typing import Any
def _extract_texts(result: Any) -> list[str]:
"""Pull text fields out of an MCP response, handling list/content/str shapes."""
if isinstance(result, list):
return [c.text for c in result if hasattr(c, "text")]
if hasattr(result, "content"):
return [c.text for c in result.content if hasattr(c, "text")]
return []
def _maybe_json(text: str) -> Any:
"""Parse JSON if possible; fall back to raw text."""
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
return text
def parse_tool_result(result: Any) -> Any:
"""Parse `client.call_tool()` result into native Python structure."""
texts = _extract_texts(result)
if not texts:
return result
if len(texts) == 1:
return _maybe_json(texts[0])
return [_maybe_json(t) for t in texts]
def parse_resource_result(result: Any) -> Any:
"""Parse `client.read_resource()` result — often a JSON-encoded string."""
if isinstance(result, str):
return _maybe_json(result)
return result
def parse_prompt_result(result: Any) -> str:
"""Flatten a prompt response into a single string."""
if isinstance(result, str):
return result
if not hasattr(result, "messages"):
return str(result)
texts: list[str] = []
for msg in result.messages:
if hasattr(msg.content, "text"):
texts.append(msg.content.text)
elif isinstance(msg.content, list):
for c in msg.content:
if hasattr(c, "text"):
texts.append(c.text)
return "\n".join(texts) if texts else str(result)