migrated all pawprint work

This commit is contained in:
buenosairesam
2025-12-31 08:34:18 -03:00
parent fc63e9010c
commit 680969ca42
63 changed files with 4687 additions and 5 deletions

376
artery/veins/PATTERNS.md Normal file
View File

@@ -0,0 +1,376 @@
# Vein Patterns
This document describes the patterns that emerged from building Jira, Slack, and Google veins side-by-side.
## Philosophy
**Core = Isolated API client** - Can run without FastAPI, framework-agnostic
**Vein = Corset/wrapper** - Makes the core speak to pawprint ecosystem
The vein wrapper is not a literal folder - it's following structural conventions and patterns.
## Directory Structure (Standard)
```
vein/{service}/
├── core/ # ISOLATED - can run standalone
│ ├── __init__.py
│ ├── config.py # Pydantic settings from .env
│ ├── auth.py # Auth logic (optional, for complex auth)
│ ├── client.py # Main API client
│ └── {domain}.py # Additional clients (sheets, drive, etc.)
├── api/ # WRAPPER - FastAPI integration
│ ├── __init__.py
│ └── routes.py # APIRouter with endpoints
├── models/ # Data models
│ ├── __init__.py
│ ├── {domain}.py # Pydantic models with from_{service}()
│ └── formatter.py # Text formatters for LLM output
├── storage/ # Persistent data (optional, for OAuth tokens)
│ └── .gitignore
├── main.py # FastAPI app setup
├── run.py # Standalone runner
├── requirements.txt # Dependencies
├── .env.example # Configuration template
└── README.md # Service-specific docs
```
## Base Classes
### `BaseVein` (vein/base.py)
Minimal interface for all veins:
- `name: str` - Service name
- `get_client(creds) -> Client` - Create API client
- `health_check(creds) -> dict` - Test connection
Used for simple token-based auth (Jira, Slack, WhatsApp).
### `BaseOAuthVein` (vein/oauth.py)
Extends BaseVein for OAuth2 services:
- `get_auth_url(state) -> str` - Generate OAuth URL
- `exchange_code(code) -> dict` - Code for tokens
- `refresh_token(refresh_token) -> dict` - Refresh expired tokens
- `storage: TokenStorage` - Token persistence
Used for OAuth2 services (Google, GitHub, GitLab).
### `TokenStorage` (vein/oauth.py)
File-based token storage (can be overridden for Redis/DB):
- `save_tokens(user_id, tokens)` - Persist tokens
- `load_tokens(user_id) -> dict` - Retrieve tokens
- `is_expired(tokens) -> bool` - Check expiration
- `delete_tokens(user_id)` - Logout
## Core Module Patterns
### config.py
```python
from pathlib import Path
from pydantic_settings import BaseSettings
ENV_FILE = Path(__file__).parent.parent / ".env"
class {Service}Config(BaseSettings):
# Service-specific settings
api_port: int = 800X # Unique port per vein
model_config = {
"env_file": ENV_FILE,
"env_file_encoding": "utf-8",
}
settings = {Service}Config()
```
**Pattern**: Pydantic BaseSettings with .env file at vein root.
### client.py (Simple Auth - Jira, Slack)
```python
from {sdk} import Client
def get_client(credentials) -> Client:
"""Create authenticated client."""
return Client(credentials)
```
**Pattern**: Simple factory function returning SDK client.
### oauth.py (OAuth2 - Google)
```python
class {Service}OAuth:
"""OAuth2 client."""
def get_authorization_url(self, state=None) -> str:
"""Generate auth URL for user redirect."""
def exchange_code_for_tokens(self, code: str) -> dict:
"""Exchange code for tokens."""
def refresh_access_token(self, refresh_token: str) -> dict:
"""Refresh expired token."""
def get_credentials(self, access_token, refresh_token=None):
"""Create SDK credentials from tokens."""
```
**Pattern**: OAuth client handles full flow, separate from API client.
## API Module Patterns
### routes.py
```python
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import PlainTextResponse
router = APIRouter()
@router.get("/health")
async def health(creds = Depends(get_credentials)):
"""Test connection."""
return {"status": "ok", "user": "..."}
@router.get("/resource")
async def get_resource(text: bool = False):
"""Get resource data."""
# ... fetch data
return _maybe_text(data, text, formatter)
```
**Standard endpoints**:
- `/health` - Connection test (required)
- Service-specific resources
- `?text=true` query param for text output
### Helper Functions
```python
def _maybe_text(data, text: bool, formatter):
"""Return text or JSON based on query param."""
if not text:
return data
return PlainTextResponse(formatter(data))
```
**Pattern**: Consistent text/JSON toggle across all veins.
## Model Patterns
### Domain Models
```python
from pydantic import BaseModel
class Resource(BaseModel):
id: str
name: str
# ... fields
@classmethod
def from_{service}(cls, raw: dict) -> "Resource":
"""Parse from service API response."""
return cls(
id=raw["id"],
name=raw["name"],
# ... mapping
)
```
**Pattern**: Pydantic models with `from_{service}()` factory methods.
### Formatters
```python
def format_{resource}(resource: Resource) -> str:
"""Format resource as text (LLM-friendly)."""
return f"{resource.name} (ID: {resource.id})"
```
**Pattern**: Simple functions returning plain text, no fancy tables.
## Authentication Patterns
### Simple Token Auth (Jira, Slack, WhatsApp)
**Headers or .env fallback**:
```python
async def get_{service}_credentials(
x_{service}_token: str | None = Header(None),
) -> Credentials:
# Use header if provided
if x_{service}_token and x_{service}_token.strip():
return Credentials(token=x_{service}_token.strip())
# Fall back to .env
if settings.{service}_token:
return Credentials(token=settings.{service}_token)
raise HTTPException(401, "Missing credentials")
```
**Pattern**: Per-request headers for web UI, .env for standalone/API use.
### OAuth2 (Google, GitHub, GitLab)
**Three-step flow**:
1. **Start**: `GET /oauth/start` → Redirect to service
2. **Callback**: `GET /oauth/callback?code=...` → Exchange code, save tokens
3. **Use**: Load tokens from storage, auto-refresh if expired
**Pattern**: Stateful (requires token storage), user must complete browser flow.
## Error Handling
```python
try:
client = get_client(creds)
data = client.fetch_something()
return data
except {Service}ClientError as e:
raise HTTPException(500, str(e))
except Exception as e:
raise HTTPException(500, f"Unexpected error: {e}")
```
**Pattern**: Catch service-specific errors first, then generic fallback.
## Configuration Files
### .env.example
Include all required settings with placeholder values:
```dotenv
# Service credentials
SERVICE_API_KEY=your_key_here
SERVICE_URL=https://api.service.com
# Vein config
API_PORT=8001
```
### requirements.txt
Minimal dependencies:
```txt
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
{service-specific-sdk}>=X.Y.Z
```
## Main App Pattern
```python
from fastapi import FastAPI
from api.routes import router
from core.config import settings
app = FastAPI(title="{Service} Vein", version="0.1.0")
app.include_router(router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=settings.api_port)
```
**Pattern**: Simple FastAPI app, routes included at root or with prefix.
## Testing Isolation
Because `core/` is isolated from FastAPI:
```python
# Can test core directly without HTTP
from vein.google.core.sheets import GoogleSheetsClient
def test_sheets_client():
client = GoogleSheetsClient(mock_credentials)
data = client.get_sheet_values("sheet_id", "A1:D10")
assert len(data) > 0
```
**Pattern**: Core modules are testable without spinning up FastAPI.
## Port Allocation
- **8001**: Jira
- **8002**: Slack
- **8003**: Google
- **8004**: WhatsApp (planned)
- **8005+**: Future veins
**Pattern**: Sequential ports starting from 8001.
## Vein Types
### Type 1: Simple Token Auth
**Examples**: Jira, Slack, WhatsApp
**Auth**: Token in headers or .env
**Stateless**: No storage needed
**Inherits**: BaseVein
### Type 2: OAuth2
**Examples**: Google, GitHub, GitLab
**Auth**: OAuth2 flow with callback
**Stateful**: Requires token storage
**Inherits**: BaseOAuthVein
### Type 3: Hybrid (Future)
**Examples**: Services with webhooks + API
**May need**: Database, Redis, webhook endpoints
**Consider**: Pulse instead of vein (composed service)
## When to Use Pulse vs Vein
**Vein**: Pure connector
- Stateless or minimal state (OAuth tokens)
- Pull-based (you call the API)
- No database required
**Pulse**: Composed service
- Stateful (database, message queue)
- Push-based (webhooks, real-time)
- Combines vein + storage + processing
**Example**: WhatsApp webhook receiver = pulse, WhatsApp API client = vein.
## Standardization Checklist
When creating a new vein:
- [ ] Follow directory structure (core/, api/, models/)
- [ ] Create .env.example with all settings
- [ ] Implement /health endpoint
- [ ] Support ?text=true for all data endpoints
- [ ] Use from_{service}() factory methods in models
- [ ] Create text formatters in models/formatter.py
- [ ] Include README.md with setup instructions
- [ ] Choose correct base class (BaseVein or BaseOAuthVein)
- [ ] Allocate unique port (8001+)
- [ ] Keep core/ isolated from FastAPI
## Evolution
This document captures patterns as of having 3 veins (Jira, Slack, Google).
**Do not** enforce these patterns rigidly - they should evolve as we build more veins.
**Do** use this as a starting point for consistency.
**Do** update this document when patterns change.
The abstract classes exist to enforce interfaces, not implementations.
The patterns exist to reduce cognitive load, not to restrict flexibility.

14
artery/veins/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
"""
Veins - Stateless API connectors.
Each vein follows the pattern:
core/ - Isolated API client (no FastAPI dependency)
api/ - FastAPI routes wrapping the core
models/ - Pydantic models and formatters
ui/ - Simple test form (optional)
Available veins:
- jira - Jira issue tracking
- slack - Slack messaging
- google - Google APIs (OAuth)
"""

67
artery/veins/base.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Base class for vein connectors.
Defines the minimal interface all veins should implement.
The core/ module contains isolated API clients.
The api/ module wraps them in FastAPI routes.
"""
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
from fastapi import APIRouter
TCredentials = TypeVar("TCredentials")
TClient = TypeVar("TClient")
class BaseVein(ABC, Generic[TCredentials, TClient]):
"""
Abstract base for vein connectors.
Veins are wrappers around API clients that provide:
- Standard auth patterns (headers or OAuth)
- Health check endpoint
- Consistent routing structure
The core implementation should be isolated and runnable without FastAPI.
"""
@property
@abstractmethod
def name(self) -> str:
"""Vein name (e.g., 'jira', 'slack', 'google')"""
pass
@abstractmethod
def get_client(self, creds: TCredentials) -> TClient:
"""
Create API client with given credentials.
This should delegate to core/ module which contains
the isolated client implementation.
"""
pass
@abstractmethod
async def health_check(self, creds: TCredentials) -> dict:
"""
Test connection and return status.
Should return:
{
"status": "ok",
"user": "...", # or other identifying info
...
}
"""
pass
def create_router(self) -> APIRouter:
"""
Create base router with standard endpoints.
Subclasses should extend this with additional routes.
"""
router = APIRouter()
return router

View File

@@ -0,0 +1,8 @@
# Google OAuth2 Configuration
# Get credentials from: https://console.cloud.google.com/apis/credentials
GOOGLE_CLIENT_ID=your_client_id.apps.googleusercontent.com
GOOGLE_CLIENT_SECRET=your_client_secret
GOOGLE_REDIRECT_URI=https://artery.mcrn.ar/google/oauth/callback
GOOGLE_SCOPES=https://www.googleapis.com/auth/spreadsheets.readonly https://www.googleapis.com/auth/drive.readonly
API_PORT=8003

View File

@@ -0,0 +1,90 @@
# Google Vein
OAuth2-based connector for Google APIs (Sheets, Drive).
## Status: DEVELOPMENT
## Setup
1. Create Google Cloud project and OAuth2 credentials:
- Go to https://console.cloud.google.com/apis/credentials
- Create OAuth 2.0 Client ID (Web application)
- Add authorized redirect URI: `https://artery.mcrn.ar/google/oauth/callback`
- Enable Google Sheets API and Google Drive API
2. Copy `.env.example` to `.env` and fill in credentials:
```bash
cp .env.example .env
# Edit .env with your credentials
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Run standalone:
```bash
python run.py
```
## OAuth Flow
Unlike Jira/Slack (simple token auth), Google uses OAuth2:
1. **Start**: Visit `/google/oauth/start` - redirects to Google login
2. **Callback**: Google redirects to `/google/oauth/callback` with code
3. **Exchange**: Code exchanged for access_token + refresh_token
4. **Storage**: Tokens saved to `storage/tokens_{user_id}.json`
5. **Use**: Subsequent requests use stored tokens
6. **Refresh**: Expired tokens auto-refreshed using refresh_token
## Endpoints
### Authentication
- `GET /google/health` - Check auth status
- `GET /google/oauth/start` - Start OAuth flow
- `GET /google/oauth/callback` - OAuth callback (called by Google)
- `GET /google/oauth/logout` - Clear stored tokens
### Google Sheets
- `GET /google/spreadsheets/{id}` - Get spreadsheet metadata
- `GET /google/spreadsheets/{id}/sheets` - List all sheets
- `GET /google/spreadsheets/{id}/values?range=Sheet1!A1:D10` - Get cell values
All endpoints support `?text=true` for LLM-friendly text output.
## Architecture
```
core/ # Isolated - can run without FastAPI
├── oauth.py # Google OAuth2 client
├── sheets.py # Google Sheets API client
└── config.py # Settings
api/ # FastAPI wrapper
└── routes.py # Endpoints
models/ # Data models
├── spreadsheet.py # Pydantic models
└── formatter.py # Text output
storage/ # Token persistence (gitignored)
```
## Token Storage
For development/demo: File-based storage in `storage/`
For production: Override `TokenStorage` in `vein/oauth.py`:
- Redis for scalability
- Database for audit trail
- Per-user tokens when integrated with auth system
## Future APIs
- Google Drive (file listing, download)
- Gmail (read messages)
- Calendar (events)
Each API gets its own client in `core/` (e.g., `core/drive.py`).

View File

View File

View File

@@ -0,0 +1,194 @@
"""
API routes for Google vein.
"""
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import PlainTextResponse, RedirectResponse
from typing import Optional
from core.oauth import GoogleOAuth
from core.sheets import GoogleSheetsClient, GoogleSheetsError
from models.spreadsheet import SpreadsheetMetadata, SheetValues
from models.formatter import format_spreadsheet_metadata, format_sheet_values
# Import from parent vein module
import sys
from pathlib import Path
vein_path = Path(__file__).parent.parent.parent
sys.path.insert(0, str(vein_path))
from oauth import TokenStorage
router = APIRouter()
# OAuth client and token storage
oauth_client = GoogleOAuth()
token_storage = TokenStorage(vein_name="google")
# For demo/development, use a default user_id
# In production, this would come from session/auth
DEFAULT_USER_ID = "demo_user"
def _get_sheets_client(user_id: str = DEFAULT_USER_ID) -> GoogleSheetsClient:
"""Get authenticated Sheets client for user."""
tokens = token_storage.load_tokens(user_id)
if not tokens:
raise HTTPException(
status_code=401,
detail="Not authenticated. Visit /google/oauth/start to login.",
)
# Check if expired and refresh if needed
if token_storage.is_expired(tokens):
if "refresh_token" not in tokens:
raise HTTPException(
status_code=401,
detail="Token expired and no refresh token. Re-authenticate at /google/oauth/start",
)
try:
new_tokens = oauth_client.refresh_access_token(tokens["refresh_token"])
token_storage.save_tokens(user_id, new_tokens)
tokens = new_tokens
except Exception as e:
raise HTTPException(
status_code=401,
detail=f"Failed to refresh token: {e}. Re-authenticate at /google/oauth/start",
)
credentials = oauth_client.get_credentials(
access_token=tokens["access_token"],
refresh_token=tokens.get("refresh_token"),
)
return GoogleSheetsClient(credentials)
def _maybe_text(data, text: bool, formatter):
"""Return text or JSON based on query param."""
if not text:
return data
return PlainTextResponse(formatter(data))
@router.get("/health")
async def health():
"""Check if user is authenticated."""
try:
tokens = token_storage.load_tokens(DEFAULT_USER_ID)
if not tokens:
return {
"status": "not_authenticated",
"message": "Visit /google/oauth/start to login",
}
expired = token_storage.is_expired(tokens)
return {
"status": "ok" if not expired else "token_expired",
"has_refresh_token": "refresh_token" in tokens,
"user": DEFAULT_USER_ID,
}
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/oauth/start")
async def start_oauth(state: Optional[str] = None):
"""Start OAuth flow - redirect to Google authorization."""
auth_url = oauth_client.get_authorization_url(state=state)
return RedirectResponse(auth_url)
@router.get("/oauth/callback")
async def oauth_callback(
code: Optional[str] = None,
state: Optional[str] = None,
error: Optional[str] = None,
):
"""Handle OAuth callback from Google."""
if error:
raise HTTPException(400, f"OAuth error: {error}")
if not code:
raise HTTPException(400, "Missing authorization code")
try:
tokens = oauth_client.exchange_code_for_tokens(code)
token_storage.save_tokens(DEFAULT_USER_ID, tokens)
return {
"status": "ok",
"message": "Successfully authenticated with Google",
"user": DEFAULT_USER_ID,
}
except Exception as e:
raise HTTPException(500, f"Failed to exchange code: {e}")
@router.get("/oauth/logout")
async def logout():
"""Clear stored tokens."""
token_storage.delete_tokens(DEFAULT_USER_ID)
return {"status": "ok", "message": "Logged out"}
@router.get("/spreadsheets/{spreadsheet_id}")
async def get_spreadsheet(
spreadsheet_id: str,
text: bool = False,
):
"""Get spreadsheet metadata (title, sheets list, etc.)."""
try:
client = _get_sheets_client()
metadata = client.get_spreadsheet_metadata(spreadsheet_id)
result = SpreadsheetMetadata.from_google(metadata)
return _maybe_text(result, text, format_spreadsheet_metadata)
except GoogleSheetsError as e:
raise HTTPException(404, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/spreadsheets/{spreadsheet_id}/values")
async def get_sheet_values(
spreadsheet_id: str,
range: str = Query(..., description="A1 notation range (e.g., 'Sheet1!A1:D10')"),
text: bool = False,
max_rows: int = Query(100, ge=1, le=10000),
):
"""Get values from a sheet range."""
try:
client = _get_sheets_client()
values = client.get_sheet_values(spreadsheet_id, range)
result = SheetValues.from_google(spreadsheet_id, range, values)
if text:
return PlainTextResponse(format_sheet_values(result, max_rows=max_rows))
return result
except GoogleSheetsError as e:
raise HTTPException(404, str(e))
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/spreadsheets/{spreadsheet_id}/sheets")
async def list_sheets(
spreadsheet_id: str,
text: bool = False,
):
"""List all sheets in a spreadsheet."""
try:
client = _get_sheets_client()
sheets = client.get_all_sheets(spreadsheet_id)
if text:
lines = [f"Sheets in {spreadsheet_id}:", ""]
for sheet in sheets:
lines.append(
f" [{sheet['index']}] {sheet['title']} "
f"({sheet['row_count']} rows x {sheet['column_count']} cols)"
)
return PlainTextResponse("\n".join(lines))
return {"spreadsheet_id": spreadsheet_id, "sheets": sheets}
except GoogleSheetsError as e:
raise HTTPException(404, str(e))
except Exception as e:
raise HTTPException(500, str(e))

View File

View File

@@ -0,0 +1,24 @@
"""
Google OAuth2 configuration loaded from .env file.
"""
from pathlib import Path
from pydantic_settings import BaseSettings
ENV_FILE = Path(__file__).parent.parent / ".env"
class GoogleConfig(BaseSettings):
google_client_id: str
google_client_secret: str
google_redirect_uri: str # e.g., https://artery.mcrn.ar/google/oauth/callback
google_scopes: str = "https://www.googleapis.com/auth/spreadsheets.readonly https://www.googleapis.com/auth/drive.readonly"
api_port: int = 8003
model_config = {
"env_file": ENV_FILE,
"env_file_encoding": "utf-8",
}
settings = GoogleConfig()

View File

@@ -0,0 +1,147 @@
"""
Google OAuth2 flow implementation.
Isolated OAuth2 client that can run without FastAPI.
"""
from typing import Optional
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from .config import settings
class GoogleOAuth:
"""
Google OAuth2 client.
Handles authorization flow, token exchange, and token refresh.
"""
def __init__(
self,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
redirect_uri: Optional[str] = None,
scopes: Optional[list[str]] = None,
):
"""
Initialize OAuth client.
Falls back to settings if parameters not provided.
"""
self.client_id = client_id or settings.google_client_id
self.client_secret = client_secret or settings.google_client_secret
self.redirect_uri = redirect_uri or settings.google_redirect_uri
self.scopes = scopes or settings.google_scopes.split()
def _create_flow(self) -> Flow:
"""Create OAuth flow object."""
client_config = {
"web": {
"client_id": self.client_id,
"client_secret": self.client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
}
}
flow = Flow.from_client_config(
client_config,
scopes=self.scopes,
redirect_uri=self.redirect_uri,
)
return flow
def get_authorization_url(self, state: Optional[str] = None) -> str:
"""
Generate OAuth2 authorization URL.
Args:
state: Optional state parameter for CSRF protection
Returns:
URL to redirect user for Google authorization
"""
flow = self._create_flow()
auth_url, _ = flow.authorization_url(
access_type="offline", # Request refresh token
include_granted_scopes="true",
state=state,
)
return auth_url
def exchange_code_for_tokens(self, code: str) -> dict:
"""
Exchange authorization code for tokens.
Args:
code: Authorization code from callback
Returns:
Token dict containing:
- access_token
- refresh_token
- expires_in
- scope
- token_type
"""
flow = self._create_flow()
flow.fetch_token(code=code)
credentials = flow.credentials
return {
"access_token": credentials.token,
"refresh_token": credentials.refresh_token,
"expires_in": 3600, # Google tokens typically 1 hour
"scope": " ".join(credentials.scopes or []),
"token_type": "Bearer",
}
def refresh_access_token(self, refresh_token: str) -> dict:
"""
Refresh an expired access token.
Args:
refresh_token: The refresh token
Returns:
New token dict with fresh access_token
"""
credentials = Credentials(
token=None,
refresh_token=refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=self.client_id,
client_secret=self.client_secret,
)
request = Request()
credentials.refresh(request)
return {
"access_token": credentials.token,
"refresh_token": refresh_token, # Keep original refresh token
"expires_in": 3600,
"scope": " ".join(credentials.scopes or []),
"token_type": "Bearer",
}
def get_credentials(self, access_token: str, refresh_token: Optional[str] = None) -> Credentials:
"""
Create Google Credentials object from tokens.
Args:
access_token: OAuth2 access token
refresh_token: Optional refresh token
Returns:
Google Credentials object for API calls
"""
return Credentials(
token=access_token,
refresh_token=refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=self.client_id,
client_secret=self.client_secret,
scopes=self.scopes,
)

View File

@@ -0,0 +1,130 @@
"""
Google Sheets API client.
Isolated client that can run without FastAPI.
"""
from typing import Optional
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
class GoogleSheetsError(Exception):
"""Sheets API error."""
pass
class GoogleSheetsClient:
"""
Google Sheets API client.
Provides methods to read spreadsheet data.
"""
def __init__(self, credentials: Credentials):
"""
Initialize Sheets client.
Args:
credentials: Google OAuth2 credentials
"""
self.credentials = credentials
self.service = build("sheets", "v4", credentials=credentials)
def get_spreadsheet_metadata(self, spreadsheet_id: str) -> dict:
"""
Get spreadsheet metadata (title, sheets, etc.).
Args:
spreadsheet_id: The spreadsheet ID
Returns:
Spreadsheet metadata
"""
try:
result = self.service.spreadsheets().get(
spreadsheetId=spreadsheet_id
).execute()
return result
except HttpError as e:
raise GoogleSheetsError(f"Failed to get spreadsheet: {e}")
def get_sheet_values(
self,
spreadsheet_id: str,
range_name: str,
value_render_option: str = "FORMATTED_VALUE",
) -> list[list]:
"""
Get values from a sheet range.
Args:
spreadsheet_id: The spreadsheet ID
range_name: A1 notation range (e.g., 'Sheet1!A1:D10')
value_render_option: How values should be rendered
- FORMATTED_VALUE: Values formatted as strings (default)
- UNFORMATTED_VALUE: Values in calculated format
- FORMULA: Formulas
Returns:
List of rows, each row is a list of cell values
"""
try:
result = self.service.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=range_name,
valueRenderOption=value_render_option,
).execute()
return result.get("values", [])
except HttpError as e:
raise GoogleSheetsError(f"Failed to get values: {e}")
def get_all_sheets(self, spreadsheet_id: str) -> list[dict]:
"""
Get list of all sheets in a spreadsheet.
Args:
spreadsheet_id: The spreadsheet ID
Returns:
List of sheet metadata (title, id, index, etc.)
"""
metadata = self.get_spreadsheet_metadata(spreadsheet_id)
return [
{
"title": sheet["properties"]["title"],
"sheet_id": sheet["properties"]["sheetId"],
"index": sheet["properties"]["index"],
"row_count": sheet["properties"]["gridProperties"].get("rowCount", 0),
"column_count": sheet["properties"]["gridProperties"].get("columnCount", 0),
}
for sheet in metadata.get("sheets", [])
]
def batch_get_values(
self,
spreadsheet_id: str,
ranges: list[str],
value_render_option: str = "FORMATTED_VALUE",
) -> dict:
"""
Get multiple ranges in a single request.
Args:
spreadsheet_id: The spreadsheet ID
ranges: List of A1 notation ranges
value_render_option: How values should be rendered
Returns:
Dict with spreadsheetId and valueRanges list
"""
try:
result = self.service.spreadsheets().values().batchGet(
spreadsheetId=spreadsheet_id,
ranges=ranges,
valueRenderOption=value_render_option,
).execute()
return result
except HttpError as e:
raise GoogleSheetsError(f"Failed to batch get values: {e}")

View File

@@ -0,0 +1,15 @@
"""
Google Vein - FastAPI app.
"""
from fastapi import FastAPI
from api.routes import router
from core.config import settings
app = FastAPI(title="Google Vein", version="0.1.0")
app.include_router(router, prefix="/google")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=settings.api_port)

View File

View File

@@ -0,0 +1,71 @@
"""
Text formatters for spreadsheet data (LLM-friendly output).
"""
from .spreadsheet import SpreadsheetMetadata, SheetValues
def format_spreadsheet_metadata(metadata: SpreadsheetMetadata) -> str:
"""Format spreadsheet metadata as text."""
lines = [
f"Spreadsheet: {metadata.title}",
f"ID: {metadata.spreadsheet_id}",
f"Locale: {metadata.locale or 'N/A'}",
f"Timezone: {metadata.timezone or 'N/A'}",
"",
"Sheets:",
]
for sheet in metadata.sheets:
lines.append(
f" [{sheet.index}] {sheet.title} "
f"({sheet.row_count} rows x {sheet.column_count} cols)"
)
return "\n".join(lines)
def format_sheet_values(values: SheetValues, max_rows: int = 100) -> str:
"""
Format sheet values as text table.
Args:
values: Sheet values
max_rows: Maximum rows to display
"""
lines = [
f"Spreadsheet ID: {values.spreadsheet_id}",
f"Range: {values.range}",
f"Size: {values.row_count} rows x {values.column_count} cols",
"",
]
if not values.values:
lines.append("(empty)")
return "\n".join(lines)
# Display up to max_rows
display_rows = values.values[:max_rows]
# Calculate column widths (for basic alignment)
col_widths = [0] * values.column_count
for row in display_rows:
for i, cell in enumerate(row):
col_widths[i] = max(col_widths[i], len(str(cell)))
# Format rows
for row_idx, row in enumerate(display_rows):
cells = []
for col_idx, cell in enumerate(row):
width = col_widths[col_idx] if col_idx < len(col_widths) else 0
cells.append(str(cell).ljust(width))
# Pad with empty cells if row is shorter
while len(cells) < values.column_count:
width = col_widths[len(cells)] if len(cells) < len(col_widths) else 0
cells.append("".ljust(width))
lines.append(" | ".join(cells))
if values.row_count > max_rows:
lines.append(f"\n... ({values.row_count - max_rows} more rows)")
return "\n".join(lines)

View File

@@ -0,0 +1,69 @@
"""
Spreadsheet models with self-parsing from Google Sheets API responses.
"""
from pydantic import BaseModel
from typing import Optional, List
class SheetInfo(BaseModel):
"""Individual sheet within a spreadsheet."""
title: str
sheet_id: int
index: int
row_count: int
column_count: int
class SpreadsheetMetadata(BaseModel):
"""Spreadsheet metadata."""
spreadsheet_id: str
title: str
locale: Optional[str] = None
timezone: Optional[str] = None
sheets: List[SheetInfo] = []
@classmethod
def from_google(cls, data: dict) -> "SpreadsheetMetadata":
"""Parse from Google Sheets API response."""
sheets = [
SheetInfo(
title=sheet["properties"]["title"],
sheet_id=sheet["properties"]["sheetId"],
index=sheet["properties"]["index"],
row_count=sheet["properties"]["gridProperties"].get("rowCount", 0),
column_count=sheet["properties"]["gridProperties"].get("columnCount", 0),
)
for sheet in data.get("sheets", [])
]
return cls(
spreadsheet_id=data["spreadsheetId"],
title=data["properties"]["title"],
locale=data["properties"].get("locale"),
timezone=data["properties"].get("timeZone"),
sheets=sheets,
)
class SheetValues(BaseModel):
"""Sheet data values."""
spreadsheet_id: str
range: str
values: List[List[str]] # rows of cells
row_count: int
column_count: int
@classmethod
def from_google(cls, spreadsheet_id: str, range_name: str, values: List[List]) -> "SheetValues":
"""Parse from Google Sheets API values response."""
row_count = len(values)
column_count = max((len(row) for row in values), default=0)
return cls(
spreadsheet_id=spreadsheet_id,
range=range_name,
values=values,
row_count=row_count,
column_count=column_count,
)

View File

@@ -0,0 +1,8 @@
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
google-auth>=2.23.0
google-auth-oauthlib>=1.1.0
google-auth-httplib2>=0.1.1
google-api-python-client>=2.100.0

View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python3
"""
Standalone runner for Google vein.
"""
if __name__ == "__main__":
import uvicorn
from main import app
from core.config import settings
uvicorn.run(app, host="0.0.0.0", port=settings.api_port, reload=True)

View File

@@ -0,0 +1,5 @@
# Ignore all token files
tokens_*.json
# But keep this directory in git
!.gitignore

View File

@@ -0,0 +1,4 @@
JIRA_URL=https://yourcompany.atlassian.net
JIRA_EMAIL=your.email@company.com
JIRA_API_TOKEN=your_api_token
API_PORT=8001

View File

@@ -0,0 +1,37 @@
# Jira Vein
Jira connector for Pawprint Artery.
## Authentication
Two ways to provide Jira credentials:
### 1. Web UI (Headers)
Enter credentials in the web form at https://artery.mcrn.ar
- Credentials sent as `X-Jira-Email` and `X-Jira-Token` headers
- Use for demos, testing, or when credentials change frequently
### 2. Local .env file (Fallback)
Create `.env` (not committed to git):
```bash
cp .env.example .env
# Edit .env with your credentials
```
The system tries headers first, then falls back to `.env`.
## Getting a Jira API Token
1. Go to https://id.atlassian.com/manage-profile/security/api-tokens
2. Click "Create API token"
3. Copy the token (starts with `ATATT3x`)
4. Use in UI or add to `.env`
## Endpoints
- `GET /jira/health` - Connection test
- `GET /jira/mine` - My assigned tickets
- `GET /jira/ticket/{key}` - Ticket details
- `POST /jira/search` - Raw JQL search
Add `?text=true` for LLM-friendly output.

View File

@@ -0,0 +1 @@
# Jira Vein

View File

View File

@@ -0,0 +1,299 @@
"""
API routes for Jira vein.
"""
import base64
import logging
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import PlainTextResponse, StreamingResponse
from typing import Optional, Union
from io import BytesIO
from ..core.auth import get_jira_credentials, JiraCredentials
from ..core.client import connect_jira, JiraClientError
from ..core.config import settings
from ..core.query import JQL, Queries
from ..models.ticket import Ticket, TicketDetail, TicketList
from ..models.formatter import format_ticket_list, format_ticket_detail
logger = logging.getLogger(__name__)
router = APIRouter()
def _download_attachments(jira, ticket: TicketDetail) -> TicketDetail:
"""Download attachment content and populate base64 field."""
for att in ticket.attachments:
try:
response = jira._session.get(att.url)
if response.status_code == 200:
att.content_base64 = base64.b64encode(response.content).decode("utf-8")
except Exception:
pass # Skip failed downloads
return ticket
def _search(creds: JiraCredentials, jql: JQL, page: int, page_size: int) -> TicketList:
jira = connect_jira(creds.email, creds.token)
start = (page - 1) * page_size
issues = jira.search_issues(jql.build(), startAt=start, maxResults=page_size)
tickets = [Ticket.from_jira(i, settings.jira_url) for i in issues]
return TicketList(tickets=tickets, total=issues.total, page=page, page_size=page_size)
def _maybe_text(data: Union[TicketList, TicketDetail], text: bool):
if not text:
return data
if isinstance(data, TicketList):
return PlainTextResponse(format_ticket_list(data))
return PlainTextResponse(format_ticket_detail(data))
@router.get("/health")
def health(creds: JiraCredentials = Depends(get_jira_credentials)):
try:
jira = connect_jira(creds.email, creds.token)
me = jira.myself()
return {"status": "ok", "user": me["displayName"]}
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/mine")
def my_tickets(
creds: JiraCredentials = Depends(get_jira_credentials),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
project: Optional[str] = None,
text: bool = False,
):
"""Get my assigned open tickets."""
try:
return _maybe_text(_search(creds, Queries.my_tickets(project), page, page_size), text)
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/backlog")
def backlog(
creds: JiraCredentials = Depends(get_jira_credentials),
project: str = Query(...),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
text: bool = False,
):
"""Get backlog tickets for a project."""
try:
return _maybe_text(_search(creds, Queries.backlog(project), page, page_size), text)
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/sprint")
def current_sprint(
creds: JiraCredentials = Depends(get_jira_credentials),
project: str = Query(...),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
text: bool = False,
):
"""Get current sprint tickets for a project."""
try:
return _maybe_text(_search(creds, Queries.current_sprint(project), page, page_size), text)
except Exception as e:
raise HTTPException(500, str(e))
@router.get("/ticket/{key}")
def get_ticket(
key: str,
creds: JiraCredentials = Depends(get_jira_credentials),
text: bool = False,
include_attachments: bool = False,
include_children: bool = True,
):
"""Get ticket details with comments, attachments, and child work items."""
try:
jira = connect_jira(creds.email, creds.token)
issue = jira.issue(key, expand="comments")
ticket = TicketDetail.from_jira(issue, settings.jira_url)
if include_attachments and ticket.attachments:
ticket = _download_attachments(jira, ticket)
# Fetch child work items if requested and ticket has subtasks
children = []
if include_children and ticket.subtasks:
# Fetch all children in one query
child_jql = f"key in ({','.join(ticket.subtasks)})"
child_issues = jira.search_issues(child_jql, maxResults=len(ticket.subtasks))
children = [Ticket.from_jira(i, settings.jira_url) for i in child_issues]
# Sort children by key
children.sort(key=lambda t: t.key)
# Return as special format that includes children
if text:
from ..models.formatter import format_ticket_with_children
return PlainTextResponse(format_ticket_with_children(ticket, children))
# For JSON, add children to response
result = ticket.model_dump()
result["children"] = [c.model_dump() for c in children]
return result
except Exception as e:
# Return the actual Jira error for debugging
raise HTTPException(404, f"Error fetching {key}: {str(e)}")
@router.post("/search")
def search(
jql: str,
creds: JiraCredentials = Depends(get_jira_credentials),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
text: bool = False,
):
"""Search with raw JQL."""
try:
return _maybe_text(_search(creds, JQL().raw(jql), page, page_size), text)
except Exception as e:
raise HTTPException(500, str(e))
@router.post("/epic/{key}/process")
def process_epic(
key: str,
creds: JiraCredentials = Depends(get_jira_credentials),
):
"""Process epic: fetch epic and all children, save to files."""
import time
import json
from pathlib import Path
from fastapi.responses import StreamingResponse
logger.info(f"EPIC endpoint called: key={key}, email={creds.email}")
def generate():
try:
logger.info(f"Starting EPIC process for {key}")
jira = connect_jira(creds.email, creds.token)
logger.info(f"Connected to Jira for {key}")
# Fetch epic
yield json.dumps({"status": "fetching_epic", "completed": 0, "total": 0}) + "\n"
logger.info(f"Sent fetching_epic status for {key}")
time.sleep(0.5)
logger.info(f"Fetching issue {key}")
epic_issue = jira.issue(key, expand="comments")
logger.info(f"Got issue {key}")
epic = TicketDetail.from_jira(epic_issue, settings.jira_url)
logger.info(f"Parsed epic: {epic.key} with {len(epic.subtasks)} subtasks")
# Get children keys from subtasks
if not epic.subtasks:
yield json.dumps({"status": "no_children", "completed": 0, "total": 0}) + "\n"
return
total = len(epic.subtasks)
# Create storage folder in larder
larder_path = Path(__file__).parent.parent.parent.parent / "larder" / "jira_epics" / key
larder_path.mkdir(parents=True, exist_ok=True)
# Save epic
epic_file = larder_path / f"{key}.json"
with open(epic_file, "w") as f:
json.dump(epic.model_dump(), f, indent=2, default=str)
yield json.dumps({"status": "processing", "completed": 0, "total": total}) + "\n"
# Fetch each child
children = []
for idx, child_key in enumerate(epic.subtasks, 1):
time.sleep(0.8) # Human speed
try:
child_issue = jira.issue(child_key, expand="comments")
child = TicketDetail.from_jira(child_issue, settings.jira_url)
# Save child
child_file = larder_path / f"{child_key}.json"
with open(child_file, "w") as f:
json.dump(child.model_dump(), f, indent=2, default=str)
# Collect children for text formatting
children.append(Ticket.from_jira(child_issue, settings.jira_url))
yield json.dumps({"status": "processing", "completed": idx, "total": total}) + "\n"
except Exception as e:
import traceback
yield json.dumps({
"status": "error",
"completed": idx,
"total": total,
"error": str(e),
"error_type": type(e).__name__,
"child_key": child_key,
"traceback": traceback.format_exc()
}) + "\n"
# Format as text for display
from ..models.formatter import format_ticket_with_children
formatted_text = format_ticket_with_children(epic, children)
yield json.dumps({
"status": "complete",
"completed": total,
"total": total,
"path": str(larder_path),
"text": formatted_text
}) + "\n"
except Exception as e:
import traceback
yield json.dumps({
"status": "error",
"error": str(e),
"error_type": type(e).__name__,
"traceback": traceback.format_exc()
}) + "\n"
return StreamingResponse(generate(), media_type="application/x-ndjson")
@router.get("/epic/{key}/status")
def get_epic_status(key: str):
"""Check if epic has been processed and files exist."""
from pathlib import Path
import json
larder_path = Path(__file__).parent.parent.parent.parent / "larder" / "jira_epics" / key
if not larder_path.exists():
return {"processed": False}
files = list(larder_path.glob("*.json"))
return {
"processed": True,
"path": str(larder_path),
"files": [f.name for f in files],
"count": len(files)
}
@router.get("/attachment/{attachment_id}")
def get_attachment(
attachment_id: str,
creds: JiraCredentials = Depends(get_jira_credentials),
):
"""Stream attachment content directly from Jira."""
jira = connect_jira(creds.email, creds.token)
att_url = f"{settings.jira_url}/rest/api/2/attachment/content/{attachment_id}"
response = jira._session.get(att_url, allow_redirects=True)
if response.status_code != 200:
raise HTTPException(404, f"Attachment not found: {attachment_id}")
content_type = response.headers.get("Content-Type", "application/octet-stream")
return StreamingResponse(
BytesIO(response.content),
media_type=content_type,
)

View File

View File

@@ -0,0 +1,37 @@
"""
Jira credentials authentication for Jira vein.
"""
from dataclasses import dataclass
from fastapi import Header, HTTPException
from .config import settings
@dataclass
class JiraCredentials:
email: str
token: str
async def get_jira_credentials(
x_jira_email: str | None = Header(None),
x_jira_token: str | None = Header(None),
) -> JiraCredentials:
"""
Dependency that extracts Jira credentials from headers or falls back to config.
- Headers provided → per-request credentials (web demo)
- No headers → use .env credentials (API/standalone)
"""
# Use headers if provided (check for non-empty strings)
if x_jira_email and x_jira_token and x_jira_email.strip() and x_jira_token.strip():
return JiraCredentials(email=x_jira_email.strip(), token=x_jira_token.strip())
# Fall back to config
if settings.jira_email and settings.jira_api_token:
return JiraCredentials(email=settings.jira_email, token=settings.jira_api_token)
raise HTTPException(
status_code=401,
detail="Missing credentials: provide X-Jira-Email and X-Jira-Token headers, or configure in .env",
)

View File

@@ -0,0 +1,19 @@
"""
Jira connection client.
"""
from jira import JIRA
from .config import settings
class JiraClientError(Exception):
pass
def connect_jira(email: str, token: str) -> JIRA:
"""Create a Jira connection with the given credentials."""
return JIRA(
server=settings.jira_url,
basic_auth=(email, token),
)

View File

@@ -0,0 +1,23 @@
"""
Jira credentials loaded from .env file.
"""
from pathlib import Path
from pydantic_settings import BaseSettings
ENV_FILE = Path(__file__).parent.parent / ".env"
class JiraConfig(BaseSettings):
jira_url: str
jira_email: str | None = None # Optional: can be provided per-request via headers
jira_api_token: str | None = None # Optional: can be provided per-request via headers
api_port: int = 8001
model_config = {
"env_file": ENV_FILE,
"env_file_encoding": "utf-8",
}
settings = JiraConfig()

View File

@@ -0,0 +1,86 @@
"""
JQL query builder.
"""
from typing import Optional, List
class JQL:
"""Fluent JQL builder."""
def __init__(self):
self._parts: List[str] = []
self._order: Optional[str] = None
def _q(self, val: str) -> str:
return f'"{val}"' if " " in val else val
# Conditions
def assigned_to_me(self) -> "JQL":
self._parts.append("assignee = currentUser()")
return self
def project(self, key: str) -> "JQL":
self._parts.append(f"project = {self._q(key)}")
return self
def sprint_open(self) -> "JQL":
self._parts.append("sprint in openSprints()")
return self
def in_backlog(self) -> "JQL":
self._parts.append("sprint is EMPTY")
return self
def not_done(self) -> "JQL":
self._parts.append("statusCategory != Done")
return self
def status(self, name: str) -> "JQL":
self._parts.append(f"status = {self._q(name)}")
return self
def label(self, name: str) -> "JQL":
self._parts.append(f"labels = {self._q(name)}")
return self
def text(self, search: str) -> "JQL":
self._parts.append(f'text ~ "{search}"')
return self
def issue_type(self, name: str) -> "JQL":
self._parts.append(f"issuetype = {self._q(name)}")
return self
def raw(self, jql: str) -> "JQL":
self._parts.append(jql)
return self
# Ordering
def order_by(self, field: str, desc: bool = True) -> "JQL":
self._order = f"ORDER BY {field} {'DESC' if desc else 'ASC'}"
return self
def build(self) -> str:
jql = " AND ".join(self._parts)
if self._order:
jql = f"{jql} {self._order}"
return jql.strip()
# Preset queries for main use cases
class Queries:
@staticmethod
def my_tickets(project: Optional[str] = None) -> JQL:
q = JQL().assigned_to_me().not_done().order_by("updated")
if project:
q.project(project)
return q
@staticmethod
def backlog(project: str) -> JQL:
return JQL().project(project).in_backlog().not_done().order_by("priority")
@staticmethod
def current_sprint(project: str) -> JQL:
return JQL().project(project).sprint_open().order_by("priority")

15
artery/veins/jira/main.py Normal file
View File

@@ -0,0 +1,15 @@
"""
Jira Vein - FastAPI app.
"""
from fastapi import FastAPI
from .api.routes import router
from .core.config import settings
app = FastAPI(title="Jira Vein", version="0.1.0")
app.include_router(router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=settings.api_port)

View File

View File

@@ -0,0 +1,182 @@
"""
Text formatters for LLM/human readable output.
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .ticket import Attachment, Ticket, TicketDetail, TicketList
def _fmt_size(size: int) -> str:
"""Format bytes to human readable."""
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024:
return f"{size:.1f}{unit}" if unit != "B" else f"{size}{unit}"
size /= 1024
return f"{size:.1f}TB"
def _fmt_dt(dt) -> str:
if not dt:
return "-"
return dt.strftime("%Y-%m-%d %H:%M")
def format_ticket(t: "Ticket") -> str:
lines = [
f"[{t.key}] {t.summary}",
f" Project: {t.project} | Type: {t.issue_type} | Priority: {t.priority or '-'}",
f" Status: {t.status} ({t.status_category or '-'})",
f" Assignee: {t.assignee or '-'} | Reporter: {t.reporter or '-'}",
f" Labels: {', '.join(t.labels) if t.labels else '-'}",
f" Created: {_fmt_dt(t.created)} | Updated: {_fmt_dt(t.updated)}",
f" URL: {t.url}",
]
if t.description:
lines.append(f" Description: {t.description}")
return "\n".join(lines)
def format_ticket_detail(t: "TicketDetail") -> str:
lines = [
f"# {t.key}: {t.summary}",
"",
f"Project: {t.project}",
f"Type: {t.issue_type}",
f"Status: {t.status} ({t.status_category or '-'})",
f"Priority: {t.priority or '-'}",
f"Assignee: {t.assignee or '-'}",
f"Reporter: {t.reporter or '-'}",
f"Labels: {', '.join(t.labels) if t.labels else '-'}",
f"Created: {_fmt_dt(t.created)}",
f"Updated: {_fmt_dt(t.updated)}",
f"Parent: {t.parent_key or '-'}",
f"Subtasks: {', '.join(t.subtasks) if t.subtasks else '-'}",
f"Linked issues: {', '.join(t.linked_issues) if t.linked_issues else '-'}",
f"URL: {t.url}",
"",
"## Description",
t.description or "(no description)",
"",
]
lines.append(f"## Comments ({len(t.comments)})")
if t.comments:
for c in t.comments:
lines.append(f"### {c.get('author', 'Unknown')} ({c.get('created', '')[:16] if c.get('created') else '-'})")
lines.append(c.get("body", ""))
lines.append("")
else:
lines.append("(no comments)")
lines.append("")
lines.append(f"## Attachments ({len(t.attachments)})")
if t.attachments:
for a in t.attachments:
has_content = "[downloaded]" if a.content_base64 else ""
lines.append(f"- {a.filename} ({_fmt_size(a.size)}, {a.mimetype}) {has_content}")
else:
lines.append("(no attachments)")
return "\n".join(lines)
def format_ticket_with_children(parent: "TicketDetail", children: list) -> str:
"""Format a ticket with its children (subtasks/stories)."""
lines = [
f"# {parent.key}: {parent.summary}",
"",
f"Project: {parent.project}",
f"Type: {parent.issue_type}",
f"Status: {parent.status} ({parent.status_category or '-'})",
f"Priority: {parent.priority or '-'}",
f"Assignee: {parent.assignee or '-'}",
f"Reporter: {parent.reporter or '-'}",
f"Labels: {', '.join(parent.labels) if parent.labels else '-'}",
f"Created: {_fmt_dt(parent.created)}",
f"Updated: {_fmt_dt(parent.updated)}",
f"URL: {parent.url}",
"",
"## Description",
parent.description or "(no description)",
"",
]
# Add children section
if children:
child_type = "Sub-tasks" if parent.issue_type in ("Story", "Task") else "Stories"
lines.append(f"## {child_type} ({len(children)})")
lines.append("=" * 60)
lines.append("")
for child in children:
lines.append(f"[{child.key}] {child.summary}")
lines.append(f" Type: {child.issue_type} | Status: {child.status} | Priority: {child.priority or '-'}")
lines.append(f" Assignee: {child.assignee or '-'}")
lines.append(f" URL: {child.url}")
lines.append("")
lines.append("-" * 60)
lines.append("")
lines.append(f"## Comments ({len(parent.comments)})")
if parent.comments:
for c in parent.comments:
lines.append(f"### {c.get('author', 'Unknown')} ({c.get('created', '')[:16] if c.get('created') else '-'})")
lines.append(c.get("body", ""))
lines.append("")
else:
lines.append("(no comments)")
lines.append("")
lines.append(f"## Attachments ({len(parent.attachments)})")
if parent.attachments:
for a in parent.attachments:
has_content = "[downloaded]" if a.content_base64 else ""
lines.append(f"- {a.filename} ({_fmt_size(a.size)}, {a.mimetype}) {has_content}")
else:
lines.append("(no attachments)")
return "\n".join(lines)
def format_ticket_list(tl: "TicketList") -> str:
# Sort for text output: stories with subtasks, then bugs
stories = []
bugs = []
subtasks = []
for t in tl.tickets:
if t.parent_key:
subtasks.append(t)
elif t.issue_type in ("Story", "Epic", "Task"):
stories.append(t)
elif t.issue_type == "Bug":
bugs.append(t)
else:
stories.append(t) # fallback
# Build sorted list: parent stories, then their subtasks, then bugs
sorted_tickets = []
for story in sorted(stories, key=lambda t: t.key):
sorted_tickets.append(story)
# Add subtasks for this story
story_subtasks = [st for st in subtasks if st.parent_key == story.key]
sorted_tickets.extend(sorted(story_subtasks, key=lambda t: t.key))
# Add bugs at the end
sorted_tickets.extend(sorted(bugs, key=lambda t: t.key))
lines = [
f"Total: {tl.total} | Page: {tl.page} | Page size: {tl.page_size}",
f"Showing: {len(tl.tickets)} tickets",
"=" * 60,
"",
]
for i, t in enumerate(sorted_tickets):
lines.append(format_ticket(t))
if i < len(sorted_tickets) - 1:
lines.append("")
lines.append("-" * 60)
lines.append("")
return "\n".join(lines)

View File

@@ -0,0 +1,135 @@
"""
Ticket models with self-parsing from Jira objects.
"""
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
class Attachment(BaseModel):
id: str
filename: str
mimetype: str
size: int # bytes
url: str
content_base64: Optional[str] = None # populated when include_attachments=true
@classmethod
def from_jira(cls, att) -> "Attachment":
return cls(
id=att.id,
filename=att.filename,
mimetype=att.mimeType,
size=att.size,
url=att.content,
)
class Ticket(BaseModel):
key: str
summary: str
description: Optional[str] = None
status: str
status_category: Optional[str] = None
issue_type: str
priority: Optional[str] = None
project: str
assignee: Optional[str] = None
reporter: Optional[str] = None
labels: List[str] = []
created: Optional[datetime] = None
updated: Optional[datetime] = None
url: str
parent_key: Optional[str] = None # For subtasks
@classmethod
def from_jira(cls, issue, base_url: str) -> "Ticket":
f = issue.fields
status_cat = None
if hasattr(f.status, "statusCategory"):
status_cat = f.status.statusCategory.name
# Get parent key for subtasks
parent = None
if hasattr(f, "parent") and f.parent:
parent = f.parent.key
return cls(
key=issue.key,
summary=f.summary or "",
description=f.description,
status=f.status.name,
status_category=status_cat,
issue_type=f.issuetype.name,
priority=f.priority.name if f.priority else None,
project=f.project.key,
assignee=f.assignee.displayName if f.assignee else None,
reporter=f.reporter.displayName if f.reporter else None,
labels=f.labels or [],
created=cls._parse_dt(f.created),
updated=cls._parse_dt(f.updated),
url=f"{base_url}/browse/{issue.key}",
parent_key=parent,
)
@staticmethod
def _parse_dt(val: Optional[str]) -> Optional[datetime]:
if not val:
return None
try:
return datetime.fromisoformat(val.replace("Z", "+00:00"))
except ValueError:
return None
class TicketDetail(Ticket):
comments: List[dict] = []
linked_issues: List[str] = []
subtasks: List[str] = []
attachments: List[Attachment] = []
@classmethod
def from_jira(cls, issue, base_url: str) -> "TicketDetail":
base = Ticket.from_jira(issue, base_url)
f = issue.fields
comments = []
if hasattr(f, "comment") and f.comment:
for c in f.comment.comments:
comments.append({
"author": c.author.displayName if hasattr(c, "author") else None,
"body": c.body,
"created": c.created,
})
linked = []
if hasattr(f, "issuelinks") and f.issuelinks:
for link in f.issuelinks:
if hasattr(link, "outwardIssue"):
linked.append(link.outwardIssue.key)
if hasattr(link, "inwardIssue"):
linked.append(link.inwardIssue.key)
subtasks = []
if hasattr(f, "subtasks") and f.subtasks:
subtasks = [st.key for st in f.subtasks]
attachments = []
if hasattr(f, "attachment") and f.attachment:
attachments = [Attachment.from_jira(a) for a in f.attachment]
return cls(
**base.model_dump(),
comments=comments,
linked_issues=linked,
subtasks=subtasks,
attachments=attachments,
)
class TicketList(BaseModel):
tickets: List[Ticket]
total: int
page: int
page_size: int

View File

@@ -0,0 +1,5 @@
fastapi>=0.104.0
uvicorn>=0.24.0
jira>=3.5.0
pydantic>=2.0.0
pydantic-settings>=2.0.0

19
artery/veins/jira/run.py Normal file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
"""Run the Jira vein API."""
import sys
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent))
import uvicorn
from core.config import settings
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=settings.api_port,
reload=True,
)

179
artery/veins/oauth.py Normal file
View File

@@ -0,0 +1,179 @@
"""
OAuth2 utilities and base classes for OAuth-based veins.
Any vein using OAuth2 (Google, GitHub, GitLab, etc.) can inherit from
BaseOAuthVein and use TokenStorage.
"""
import json
from abc import abstractmethod
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from .base import BaseVein, TClient, TCredentials
class TokenStorage:
"""
File-based token storage for OAuth2 tokens.
Can be overridden for Redis/database storage in production.
Each vein gets its own storage directory.
"""
def __init__(self, vein_name: str, storage_dir: Optional[Path] = None):
"""
Initialize token storage.
Args:
vein_name: Name of the vein (e.g., 'google', 'github')
storage_dir: Base storage directory (defaults to veins/{vein_name}/storage)
"""
if storage_dir is None:
# Default: veins/{vein_name}/storage/
storage_dir = Path(__file__).parent / vein_name / "storage"
self.storage_dir = storage_dir
self.storage_dir.mkdir(parents=True, exist_ok=True)
def _get_path(self, user_id: str) -> Path:
"""Get token file path for user."""
return self.storage_dir / f"tokens_{user_id}.json"
def save_tokens(self, user_id: str, tokens: dict) -> None:
"""
Save OAuth2 tokens for a user.
tokens should contain:
- access_token
- refresh_token (optional)
- expires_in (seconds)
- scope
- token_type
"""
# Add expiry timestamp
if "expires_in" in tokens:
expires_at = datetime.now() + timedelta(seconds=tokens["expires_in"])
tokens["expires_at"] = expires_at.isoformat()
path = self._get_path(user_id)
with open(path, "w") as f:
json.dump(tokens, f, indent=2)
def load_tokens(self, user_id: str) -> Optional[dict]:
"""Load OAuth2 tokens for a user. Returns None if not found."""
path = self._get_path(user_id)
if not path.exists():
return None
with open(path, "r") as f:
return json.load(f)
def is_expired(self, tokens: dict) -> bool:
"""
Check if access token is expired.
Returns True if expired or expiring in less than 5 minutes.
"""
if "expires_at" not in tokens:
return True
expires_at = datetime.fromisoformat(tokens["expires_at"])
# Consider expired if less than 5 minutes remaining
return datetime.now() >= expires_at - timedelta(minutes=5)
def delete_tokens(self, user_id: str) -> None:
"""Delete tokens for a user."""
path = self._get_path(user_id)
if path.exists():
path.unlink()
class BaseOAuthVein(BaseVein[TCredentials, TClient]):
"""
Base class for OAuth2-based veins.
Extends BaseVein with OAuth2 flow management:
- Authorization URL generation
- Code exchange for tokens
- Token refresh
- Token storage
"""
def __init__(self, storage: Optional[TokenStorage] = None):
"""
Initialize OAuth vein.
Args:
storage: Token storage instance (creates default if None)
"""
if storage is None:
storage = TokenStorage(vein_name=self.name)
self.storage = storage
@abstractmethod
def get_auth_url(self, state: Optional[str] = None) -> str:
"""
Generate OAuth2 authorization URL.
Args:
state: Optional state parameter for CSRF protection
Returns:
URL to redirect user for authorization
"""
pass
@abstractmethod
async def exchange_code(self, code: str) -> dict:
"""
Exchange authorization code for tokens.
Args:
code: Authorization code from callback
Returns:
Token dict containing access_token, refresh_token, etc.
"""
pass
@abstractmethod
async def refresh_token(self, refresh_token: str) -> dict:
"""
Refresh an expired access token.
Args:
refresh_token: The refresh token
Returns:
New token dict with fresh access_token
"""
pass
def get_valid_tokens(self, user_id: str) -> Optional[dict]:
"""
Get valid tokens for user, refreshing if needed.
Args:
user_id: User identifier
Returns:
Valid tokens or None if not authenticated
"""
tokens = self.storage.load_tokens(user_id)
if not tokens:
return None
if self.storage.is_expired(tokens) and "refresh_token" in tokens:
# Try to refresh
try:
import asyncio
new_tokens = asyncio.run(self.refresh_token(tokens["refresh_token"]))
self.storage.save_tokens(user_id, new_tokens)
return new_tokens
except Exception:
# Refresh failed, user needs to re-authenticate
return None
return tokens

View File

@@ -0,0 +1 @@
# Slack Vein

View File

@@ -0,0 +1 @@
# Slack API routes

View File

@@ -0,0 +1,233 @@
"""
API routes for Slack vein.
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import PlainTextResponse
from typing import Optional, Union
from pydantic import BaseModel
from ..core.auth import get_slack_credentials, SlackCredentials
from ..core.client import get_client, test_auth, SlackClientError
from ..models.message import (
Channel, ChannelList, Message, MessageList,
User, UserList,
)
from ..models.formatter import (
format_channel_list, format_message_list, format_user_list,
)
router = APIRouter()
class PostMessageRequest(BaseModel):
channel: str # Channel ID or name
text: str
thread_ts: Optional[str] = None # Reply to thread
class PostMessageResponse(BaseModel):
ok: bool
channel: str
ts: str
message: Optional[Message] = None
def _maybe_text(data, text: bool, formatter):
if not text:
return data
return PlainTextResponse(formatter(data))
@router.get("/health")
def health(creds: SlackCredentials = Depends(get_slack_credentials)):
"""Test Slack connection."""
try:
client = get_client(creds.token)
info = test_auth(client)
return {"status": "ok", **info}
except SlackClientError as e:
raise HTTPException(500, str(e))
except Exception as e:
raise HTTPException(500, f"Connection failed: {e}")
@router.get("/channels")
def list_channels(
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(100, ge=1, le=1000),
types: str = Query("public_channel", description="Channel types: public_channel, private_channel (needs groups:read), mpim, im"),
text: bool = False,
):
"""List channels the bot/user has access to."""
try:
client = get_client(creds.token)
response = client.conversations_list(limit=limit, types=types)
channels = [Channel.from_slack(ch) for ch in response.get("channels", [])]
result = ChannelList(channels=channels, total=len(channels))
return _maybe_text(result, text, format_channel_list)
except Exception as e:
raise HTTPException(500, f"Failed to list channels: {e}")
@router.get("/channels/{channel_id}/messages")
def get_messages(
channel_id: str,
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(50, ge=1, le=1000),
oldest: Optional[str] = None,
latest: Optional[str] = None,
text: bool = False,
include_users: bool = False,
):
"""Get messages from a channel."""
try:
client = get_client(creds.token)
kwargs = {"channel": channel_id, "limit": limit}
if oldest:
kwargs["oldest"] = oldest
if latest:
kwargs["latest"] = latest
response = client.conversations_history(**kwargs)
messages = [Message.from_slack(m) for m in response.get("messages", [])]
result = MessageList(
messages=messages,
channel_id=channel_id,
has_more=response.get("has_more", False),
)
# Optionally fetch user names for better text output
users_map = None
if text and include_users:
try:
users_resp = client.users_list(limit=200)
users_map = {
u["id"]: u.get("profile", {}).get("display_name") or u.get("real_name") or u.get("name")
for u in users_resp.get("members", [])
}
except Exception:
pass # Continue without user names
if text:
return PlainTextResponse(format_message_list(result, users_map))
return result
except Exception as e:
raise HTTPException(500, f"Failed to get messages: {e}")
@router.get("/channels/{channel_id}/thread/{thread_ts}")
def get_thread(
channel_id: str,
thread_ts: str,
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(100, ge=1, le=1000),
text: bool = False,
):
"""Get replies in a thread."""
try:
client = get_client(creds.token)
response = client.conversations_replies(
channel=channel_id,
ts=thread_ts,
limit=limit,
)
messages = [Message.from_slack(m) for m in response.get("messages", [])]
result = MessageList(
messages=messages,
channel_id=channel_id,
has_more=response.get("has_more", False),
)
return _maybe_text(result, text, format_message_list)
except Exception as e:
raise HTTPException(500, f"Failed to get thread: {e}")
@router.get("/users")
def list_users(
creds: SlackCredentials = Depends(get_slack_credentials),
limit: int = Query(200, ge=1, le=1000),
text: bool = False,
):
"""List workspace users."""
try:
client = get_client(creds.token)
response = client.users_list(limit=limit)
users = [User.from_slack(u) for u in response.get("members", [])]
result = UserList(users=users, total=len(users))
return _maybe_text(result, text, format_user_list)
except Exception as e:
raise HTTPException(500, f"Failed to list users: {e}")
@router.post("/post")
def post_message(
request: PostMessageRequest,
creds: SlackCredentials = Depends(get_slack_credentials),
):
"""Post a message to a channel."""
try:
client = get_client(creds.token)
kwargs = {
"channel": request.channel,
"text": request.text,
}
if request.thread_ts:
kwargs["thread_ts"] = request.thread_ts
response = client.chat_postMessage(**kwargs)
msg = None
if response.get("message"):
msg = Message.from_slack(response["message"])
return PostMessageResponse(
ok=response.get("ok", False),
channel=response.get("channel", request.channel),
ts=response.get("ts", ""),
message=msg,
)
except Exception as e:
raise HTTPException(500, f"Failed to post message: {e}")
@router.get("/search")
def search_messages(
query: str,
creds: SlackCredentials = Depends(get_slack_credentials),
count: int = Query(20, ge=1, le=100),
text: bool = False,
):
"""Search messages (requires user token with search:read scope)."""
try:
client = get_client(creds.token)
response = client.search_messages(query=query, count=count)
messages_data = response.get("messages", {}).get("matches", [])
messages = []
for m in messages_data:
messages.append(Message(
ts=m.get("ts", ""),
user=m.get("user"),
text=m.get("text", ""),
thread_ts=m.get("thread_ts"),
))
result = MessageList(
messages=messages,
channel_id="search",
has_more=len(messages) >= count,
)
return _maybe_text(result, text, format_message_list)
except Exception as e:
raise HTTPException(500, f"Search failed: {e}")

View File

@@ -0,0 +1 @@
# Slack core

View File

@@ -0,0 +1,37 @@
"""
Slack credentials authentication for Slack vein.
"""
from dataclasses import dataclass
from fastapi import Header, HTTPException
from .config import settings
@dataclass
class SlackCredentials:
token: str
async def get_slack_credentials(
x_slack_token: str | None = Header(None),
) -> SlackCredentials:
"""
Dependency that extracts Slack token from headers or falls back to config.
- Header provided → per-request token (web demo)
- No header → use .env token (API/standalone)
"""
# Use header if provided
if x_slack_token and x_slack_token.strip():
return SlackCredentials(token=x_slack_token.strip())
# Fall back to config (prefer bot token, then user token)
if settings.slack_bot_token:
return SlackCredentials(token=settings.slack_bot_token)
if settings.slack_user_token:
return SlackCredentials(token=settings.slack_user_token)
raise HTTPException(
status_code=401,
detail="Missing credentials: provide X-Slack-Token header, or configure in .env",
)

View File

@@ -0,0 +1,30 @@
"""
Slack connection client using slack_sdk.
"""
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
class SlackClientError(Exception):
pass
def get_client(token: str) -> WebClient:
"""Create a Slack WebClient with the given token."""
return WebClient(token=token)
def test_auth(client: WebClient) -> dict:
"""Test authentication and return user/bot info."""
try:
response = client.auth_test()
return {
"ok": response["ok"],
"user": response.get("user"),
"user_id": response.get("user_id"),
"team": response.get("team"),
"team_id": response.get("team_id"),
}
except SlackApiError as e:
raise SlackClientError(f"Auth failed: {e.response['error']}")

View File

@@ -0,0 +1,22 @@
"""
Slack credentials loaded from .env file.
"""
from pathlib import Path
from pydantic_settings import BaseSettings
ENV_FILE = Path(__file__).parent.parent / ".env"
class SlackConfig(BaseSettings):
slack_bot_token: str | None = None # xoxb-... Bot token
slack_user_token: str | None = None # xoxp-... User token (optional, for user-level actions)
api_port: int = 8002
model_config = {
"env_file": ENV_FILE,
"env_file_encoding": "utf-8",
}
settings = SlackConfig()

View File

@@ -0,0 +1,15 @@
"""
Slack Vein - FastAPI app.
"""
from fastapi import FastAPI
from .api.routes import router
from .core.config import settings
app = FastAPI(title="Slack Vein", version="0.1.0")
app.include_router(router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=settings.api_port)

View File

@@ -0,0 +1 @@
# Slack models

View File

@@ -0,0 +1,70 @@
"""
Text formatters for Slack data (LLM-friendly output).
"""
from .message import MessageList, ChannelList, UserList, Message, Channel
def format_message(msg: Message, users: dict[str, str] | None = None) -> str:
"""Format a single message."""
user_display = msg.user
if users and msg.user and msg.user in users:
user_display = users[msg.user]
time_str = msg.timestamp.strftime("%Y-%m-%d %H:%M") if msg.timestamp else msg.ts
thread_info = f" [thread: {msg.reply_count} replies]" if msg.reply_count > 0 else ""
return f"[{time_str}] {user_display}: {msg.text}{thread_info}"
def format_message_list(data: MessageList, users: dict[str, str] | None = None) -> str:
"""Format message list for text output."""
lines = [f"Channel: {data.channel_id}", f"Messages: {len(data.messages)}", ""]
for msg in data.messages:
lines.append(format_message(msg, users))
if data.has_more:
lines.append("\n[More messages available...]")
return "\n".join(lines)
def format_channel(ch: Channel) -> str:
"""Format a single channel."""
flags = []
if ch.is_private:
flags.append("private")
if ch.is_archived:
flags.append("archived")
if ch.is_member:
flags.append("member")
flag_str = f" ({', '.join(flags)})" if flags else ""
members_str = f" [{ch.num_members} members]" if ch.num_members else ""
return f"#{ch.name} ({ch.id}){flag_str}{members_str}"
def format_channel_list(data: ChannelList) -> str:
"""Format channel list for text output."""
lines = [f"Channels: {data.total}", ""]
for ch in data.channels:
lines.append(format_channel(ch))
if ch.purpose:
lines.append(f" Purpose: {ch.purpose}")
return "\n".join(lines)
def format_user_list(data: UserList) -> str:
"""Format user list for text output."""
lines = [f"Users: {data.total}", ""]
for u in data.users:
bot_flag = " [bot]" if u.is_bot else ""
display = u.display_name or u.real_name or u.name
lines.append(f"@{u.name} ({u.id}) - {display}{bot_flag}")
return "\n".join(lines)

View File

@@ -0,0 +1,98 @@
"""
Slack models with self-parsing from Slack API responses.
"""
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
class User(BaseModel):
id: str
name: str
real_name: Optional[str] = None
display_name: Optional[str] = None
is_bot: bool = False
@classmethod
def from_slack(cls, user: dict) -> "User":
profile = user.get("profile", {})
return cls(
id=user["id"],
name=user.get("name", ""),
real_name=profile.get("real_name") or user.get("real_name"),
display_name=profile.get("display_name"),
is_bot=user.get("is_bot", False),
)
class Channel(BaseModel):
id: str
name: str
is_private: bool = False
is_archived: bool = False
is_member: bool = False
topic: Optional[str] = None
purpose: Optional[str] = None
num_members: Optional[int] = None
@classmethod
def from_slack(cls, channel: dict) -> "Channel":
return cls(
id=channel["id"],
name=channel.get("name", ""),
is_private=channel.get("is_private", False),
is_archived=channel.get("is_archived", False),
is_member=channel.get("is_member", False),
topic=channel.get("topic", {}).get("value"),
purpose=channel.get("purpose", {}).get("value"),
num_members=channel.get("num_members"),
)
class Message(BaseModel):
ts: str # Slack timestamp (unique message ID)
user: Optional[str] = None
text: str
thread_ts: Optional[str] = None
reply_count: int = 0
reactions: List[dict] = []
timestamp: Optional[datetime] = None
@classmethod
def from_slack(cls, msg: dict) -> "Message":
ts = msg.get("ts", "")
return cls(
ts=ts,
user=msg.get("user"),
text=msg.get("text", ""),
thread_ts=msg.get("thread_ts"),
reply_count=msg.get("reply_count", 0),
reactions=msg.get("reactions", []),
timestamp=cls._ts_to_datetime(ts),
)
@staticmethod
def _ts_to_datetime(ts: str) -> Optional[datetime]:
if not ts:
return None
try:
return datetime.fromtimestamp(float(ts))
except (ValueError, TypeError):
return None
class MessageList(BaseModel):
messages: List[Message]
channel_id: str
has_more: bool = False
class ChannelList(BaseModel):
channels: List[Channel]
total: int
class UserList(BaseModel):
users: List[User]
total: int

View File

@@ -0,0 +1,5 @@
fastapi>=0.104.0
uvicorn>=0.24.0
slack_sdk>=3.23.0
pydantic>=2.0.0
pydantic-settings>=2.0.0

19
artery/veins/slack/run.py Normal file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
"""Run the Slack vein API."""
import sys
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent))
import uvicorn
from core.config import settings
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=settings.api_port,
reload=True,
)