Files
soleprint/artery/veins/slack/api/routes.py
2025-12-31 08:34:18 -03:00

234 lines
7.0 KiB
Python

"""
API routes for Slack vein.
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import PlainTextResponse
from typing import Optional, Union
from pydantic import BaseModel
from ..core.auth import get_slack_credentials, SlackCredentials
from ..core.client import get_client, test_auth, SlackClientError
from ..models.message import (
Channel, ChannelList, Message, MessageList,
User, UserList,
)
from ..models.formatter import (
format_channel_list, format_message_list, format_user_list,
)
router = APIRouter()
class PostMessageRequest(BaseModel):
channel: str # Channel ID or name
text: str
thread_ts: Optional[str] = None # Reply to thread
class PostMessageResponse(BaseModel):
ok: bool
channel: str
ts: str
message: Optional[Message] = None
def _maybe_text(data, text: bool, formatter):
if not text:
return data
return PlainTextResponse(formatter(data))
@router.get("/health")
def health(creds: SlackCredentials = Depends(get_slack_credentials)):
"""Test Slack connection."""
try:
client = get_client(creds.token)
info = test_auth(client)
return {"status": "ok", **info}
except SlackClientError as e:
raise HTTPException(500, str(e))
except Exception as e:
raise HTTPException(500, f"Connection failed: {e}")
@router.get("/channels")
def list_channels(
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(100, ge=1, le=1000),
types: str = Query("public_channel", description="Channel types: public_channel, private_channel (needs groups:read), mpim, im"),
text: bool = False,
):
"""List channels the bot/user has access to."""
try:
client = get_client(creds.token)
response = client.conversations_list(limit=limit, types=types)
channels = [Channel.from_slack(ch) for ch in response.get("channels", [])]
result = ChannelList(channels=channels, total=len(channels))
return _maybe_text(result, text, format_channel_list)
except Exception as e:
raise HTTPException(500, f"Failed to list channels: {e}")
@router.get("/channels/{channel_id}/messages")
def get_messages(
channel_id: str,
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(50, ge=1, le=1000),
oldest: Optional[str] = None,
latest: Optional[str] = None,
text: bool = False,
include_users: bool = False,
):
"""Get messages from a channel."""
try:
client = get_client(creds.token)
kwargs = {"channel": channel_id, "limit": limit}
if oldest:
kwargs["oldest"] = oldest
if latest:
kwargs["latest"] = latest
response = client.conversations_history(**kwargs)
messages = [Message.from_slack(m) for m in response.get("messages", [])]
result = MessageList(
messages=messages,
channel_id=channel_id,
has_more=response.get("has_more", False),
)
# Optionally fetch user names for better text output
users_map = None
if text and include_users:
try:
users_resp = client.users_list(limit=200)
users_map = {
u["id"]: u.get("profile", {}).get("display_name") or u.get("real_name") or u.get("name")
for u in users_resp.get("members", [])
}
except Exception:
pass # Continue without user names
if text:
return PlainTextResponse(format_message_list(result, users_map))
return result
except Exception as e:
raise HTTPException(500, f"Failed to get messages: {e}")
@router.get("/channels/{channel_id}/thread/{thread_ts}")
def get_thread(
channel_id: str,
thread_ts: str,
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(100, ge=1, le=1000),
text: bool = False,
):
"""Get replies in a thread."""
try:
client = get_client(creds.token)
response = client.conversations_replies(
channel=channel_id,
ts=thread_ts,
limit=limit,
)
messages = [Message.from_slack(m) for m in response.get("messages", [])]
result = MessageList(
messages=messages,
channel_id=channel_id,
has_more=response.get("has_more", False),
)
return _maybe_text(result, text, format_message_list)
except Exception as e:
raise HTTPException(500, f"Failed to get thread: {e}")
@router.get("/users")
def list_users(
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(200, ge=1, le=1000),
text: bool = False,
):
"""List workspace users."""
try:
client = get_client(creds.token)
response = client.users_list(limit=limit)
users = [User.from_slack(u) for u in response.get("members", [])]
result = UserList(users=users, total=len(users))
return _maybe_text(result, text, format_user_list)
except Exception as e:
raise HTTPException(500, f"Failed to list users: {e}")
@router.post("/post")
def post_message(
request: PostMessageRequest,
creds: SlackCredentials = Depends(get_slack_credentials),
):
"""Post a message to a channel."""
try:
client = get_client(creds.token)
kwargs = {
"channel": request.channel,
"text": request.text,
}
if request.thread_ts:
kwargs["thread_ts"] = request.thread_ts
response = client.chat_postMessage(**kwargs)
msg = None
if response.get("message"):
msg = Message.from_slack(response["message"])
return PostMessageResponse(
ok=response.get("ok", False),
channel=response.get("channel", request.channel),
ts=response.get("ts", ""),
message=msg,
)
except Exception as e:
raise HTTPException(500, f"Failed to post message: {e}")
@router.get("/search")
def search_messages(
query: str,
creds: SlackCredentials = Depends(get_slack_credentials),
count: int = Query(20, ge=1, le=100),
text: bool = False,
):
"""Search messages (requires user token with search:read scope)."""
try:
client = get_client(creds.token)
response = client.search_messages(query=query, count=count)
messages_data = response.get("messages", {}).get("matches", [])
messages = []
for m in messages_data:
messages.append(Message(
ts=m.get("ts", ""),
user=m.get("user"),
text=m.get("text", ""),
thread_ts=m.get("thread_ts"),
))
result = MessageList(
messages=messages,
channel_id="search",
has_more=len(messages) >= count,
)
return _maybe_text(result, text, format_message_list)
except Exception as e:
raise HTTPException(500, f"Search failed: {e}")