# Same Patterns, Different Domains

The architecture behind sysmonstm isn't specific to system monitoring. The patterns - streaming data collection, event-driven processing, tiered storage, real-time dashboards - apply to many domains. This article explores two: payment processing systems and desktop productivity tracking.

## Payment Processing Systems

The sysmonstm architecture was intentionally designed to map to payment processing. Here's how each component translates.

### Domain Mapping

| sysmonstm | Payment System |
|-----------|----------------|
| Machine | Payment Processor (Stripe, PayPal, bank API) |
| Metrics Stream | Transaction Stream |
| Aggregator | Payment Hub |
| Alert Thresholds | Fraud Detection Rules |
| Alert Service | Risk Management |
| Redis (current state) | Transaction Cache |
| TimescaleDB (history) | Transaction Ledger |
| Event Stream | Audit Trail |

### How It Would Work

**Collectors become processor adapters.** Instead of collecting CPU and memory via psutil, each adapter connects to a payment processor's API or webhook endpoint:

```python
# Conceptual - not actual code
class StripeAdapter:
    async def stream_transactions(self):
        async for event in stripe.webhook_events():
            yield Transaction(
                processor="stripe",
                amount=event.amount,
                currency=event.currency,
                status=event.status,
                customer_id=event.customer,
                timestamp=event.created,
            )
```

The gRPC streaming pattern remains identical. Each adapter streams transactions to a central aggregator.

**The aggregator normalizes data.** Stripe sends amounts in cents. PayPal sends them in dollars. Bank APIs use different currency codes.
The aggregator normalizes everything to a consistent format before storage:

```python
# In the aggregator's StreamTransactions handler
async for tx in request_iterator:
    normalized = normalize_transaction(tx)
    await self.store(normalized)
    await self.publisher.publish("transactions.raw", normalized)
```

This is the same pattern as `services/aggregator/main.py:47-95` - receive stream, batch, flush to storage, publish events.

**Alerts become fraud detection.** Instead of "CPU > 80%", rules look like:

- Transaction amount > $10,000 (large transaction)
- More than 5 transactions from the same card in 1 minute (velocity check)
- Transaction from a country different from the cardholder's (geographic anomaly)

The `AlertEvaluator` pattern from `services/alerts/main.py:44-77` handles this:

```python
class FraudEvaluator:
    RULES = [
        FraudRule("large_transaction", "amount", "gt", 10000, "review"),
        FraudRule("velocity", "transactions_per_minute", "gt", 5, "block"),
    ]

    def evaluate(self, transaction: dict) -> list[FraudAlert]:
        # Same operator-based evaluation as AlertEvaluator
        pass
```

**The event stream becomes an audit trail.** Financial systems require complete audit logs. Every transaction, every state change, every decision must be recorded. The event abstraction from `shared/events/base.py` already provides this:

```python
await self.publisher.publish(
    topic="transactions.processed",
    payload={
        "transaction_id": tx.id,
        "processor": tx.processor,
        "amount": tx.amount,
        "decision": "approved",
        "timestamp": datetime.utcnow().isoformat(),
    },
)
```

Subscribe to these events for compliance reporting, analytics, or real-time monitoring.

**Tiered storage handles transaction volumes.** Hot transactions (last hour) live in Redis for quick lookups. Recent transactions (last month) live in PostgreSQL for operational queries. Historical transactions are archived to S3 for compliance retention. Same pattern as sysmonstm's Redis + TimescaleDB setup.
### What Changes

- **Authentication**: Payment APIs require OAuth, API keys, mTLS. The collector adapters need credential management.
- **Idempotency**: Transactions must be processed exactly once. The aggregator needs deduplication.
- **Compliance**: PCI-DSS requires encryption, access controls, audit logging. More infrastructure, same patterns.

### What Stays the Same

- gRPC streaming from multiple sources to a central aggregator
- Event-driven processing for decoupled services
- Threshold-based alerting
- Real-time dashboard via WebSocket
- Tiered storage for different access patterns

## Deskmeter: A Workspace Timer Application

Deskmeter is a productivity tracking application that monitors desktop workspace switches and task changes. It runs on Linux, tracks time spent on different tasks, and displays the data through a web dashboard.

Current architecture:

- **dmcore daemon**: Polls workspace state every 2 seconds using `wmctrl`
- **MongoDB**: Stores workspace switches with timestamps and durations
- **Flask web server**: Serves calendar views and task summaries
- **GNOME extension**: Shows current task in the top panel

This works, but sysmonstm patterns could enhance it significantly.

### Current Deskmeter Implementation

The core daemon (`dmapp/dmcore/main.py`) polls in a loop:

```python
while True:
    current_workspace = active_workspace()  # Calls wmctrl
    current_task = state.retrieve("current").get("task")

    # Track the switch
    last_switch_time = track_workspace_switch(
        current_workspace, current_task, last_switch_time
    )
    time.sleep(2)
```

The web server (`dmapp/dmweb/dm.py`) uses Flask with template rendering:

```python
@dmbp.route("/calendar/")
def calendar_view(scope="daily", year=None, month=None, day=None):
    blocks = get_task_blocks_calendar(start, end, task, ...)
    return render_template("calendar_view.html", blocks=blocks, ...)
```

The dashboard refreshes via page reload or AJAX polling.
### How sysmonstm Patterns Would Improve It

**Replace polling with streaming.** Instead of the daemon polling every 2 seconds and the web dashboard polling for updates, use the same event-driven architecture as sysmonstm. The daemon becomes an event publisher:

```python
# Conceptual improvement
class WorkspaceMonitor:
    async def run(self):
        publisher = get_publisher(source="workspace-monitor")
        await publisher.connect()

        while self.running:
            workspace = await self.detect_workspace()
            task = await self.get_current_task()

            if workspace != self.last_workspace or task != self.last_task:
                await publisher.publish(
                    topic="workspace.switch",
                    payload={
                        "workspace": workspace,
                        "task": task,
                        "timestamp": datetime.now().isoformat(),
                    },
                )
                self.last_workspace = workspace
                self.last_task = task

            await asyncio.sleep(2)
```

The web server subscribes to events and pushes to browsers via WebSocket - exactly like `services/gateway/main.py:88-130`:

```python
async def event_listener():
    async with get_subscriber(topics=["workspace.*"]) as subscriber:
        async for event in subscriber.consume():
            await manager.broadcast({
                "type": "workspace_switch",
                "data": event.payload,
            })
```

The GNOME extension could subscribe directly instead of polling an HTTP endpoint.

**Add multi-machine support.** With sysmonstm's architecture, tracking multiple machines is trivial. Run the workspace monitor daemon on each machine. Each streams events to an aggregator. The dashboard shows all machines.

```python
# Each machine's monitor includes machine_id
await publisher.publish(
    topic="workspace.switch",
    payload={
        "machine_id": self.machine_id,  # "workstation", "laptop", etc.
        "workspace": workspace,
        "task": task,
        "timestamp": datetime.now().isoformat(),
    },
)
```

The dashboard groups by machine or shows a combined view. Same pattern as sysmonstm's multi-machine monitoring.
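On the dashboard side, "groups by machine" is just per-key state maintained by the subscriber. Here is a minimal sketch, assuming events arrive as dicts shaped like the payload above - the `MachineView` class is illustrative, not an existing component of either system:

```python
# Per-machine view built from incoming workspace.switch events.
# The subscriber applies each event; the dashboard reads either
# the per-machine latest state or a combined view.
from collections import defaultdict


class MachineView:
    def __init__(self):
        self.latest = {}                  # machine_id -> most recent event
        self.history = defaultdict(list)  # machine_id -> all events seen

    def apply(self, event: dict):
        machine = event["machine_id"]
        self.latest[machine] = event
        self.history[machine].append(event)

    def combined(self) -> list[dict]:
        # Combined view: the latest event from every machine.
        return list(self.latest.values())


view = MachineView()
view.apply({"machine_id": "workstation", "workspace": 1, "task": "email"})
view.apply({"machine_id": "laptop", "workspace": 3, "task": "writing"})
view.apply({"machine_id": "workstation", "workspace": 2, "task": "coding"})
print(view.latest["workstation"]["task"])  # coding
print(len(view.combined()))                # 2
```

In the streaming version, `apply` would be called from the `event_listener` loop before broadcasting to connected browsers.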
**Add focus alerts.** The alert service pattern from `services/alerts/main.py` applies directly:

```python
# Focus time rules
FocusRule("context_switching", "switches_per_hour", "gt", 10, "warning")
FocusRule("long_idle", "idle_minutes", "gt", 30, "info")
FocusRule("deep_work", "focus_minutes", "gt", 90, "success")
```

When you switch tasks more than 10 times in an hour, get a notification. When you've been focused for 90 minutes, celebrate. The evaluator pattern handles both alerts and achievements.

**Improve time-series storage.** Deskmeter uses MongoDB for everything. With sysmonstm's tiered approach:

- **Redis**: Current task, current workspace, last 5 minutes of switches
- **TimescaleDB**: Historical switches with automatic downsampling

Query "what was I doing at 3pm yesterday" hits warm storage. Query "how much time did I spend on project X this month" uses aggregated data. Same queries, faster execution.

### Implementation Path

1. **Add event publishing to dmcore.** Keep the polling loop but publish events instead of writing directly to MongoDB.
2. **Add WebSocket to dmweb.** Subscribe to events, push to connected browsers. The calendar view updates in real-time.
3. **Add Redis for current state.** Dashboard reads current task from Redis instead of querying MongoDB.
4. **Add focus alerts.** New service that subscribes to workspace events, evaluates rules, publishes alerts.
5. **Add multi-machine support.** Run dmcore on multiple machines. Aggregate events centrally.

Each step is independent. The system works after each one. Same phased approach as sysmonstm.
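The focus rules in step 4 can be evaluated with the same operator-dispatch approach the article attributes to `AlertEvaluator`. A minimal sketch - the `FocusRule` fields follow the tuple order shown above, but the operator table and `evaluate` function are assumptions about how the real evaluator is structured:

```python
# Operator-based rule evaluation, mirroring the AlertEvaluator pattern:
# each rule names a metric, a comparison operator, a threshold, and a
# severity; evaluation is a table lookup plus a comparison.
import operator
from typing import NamedTuple


class FocusRule(NamedTuple):
    name: str
    metric: str
    op: str
    threshold: float
    severity: str


OPS = {"gt": operator.gt, "lt": operator.lt, "eq": operator.eq}

RULES = [
    FocusRule("context_switching", "switches_per_hour", "gt", 10, "warning"),
    FocusRule("long_idle", "idle_minutes", "gt", 30, "info"),
    FocusRule("deep_work", "focus_minutes", "gt", 90, "success"),
]


def evaluate(sample: dict) -> list[dict]:
    """Return one alert dict per rule whose condition the sample trips."""
    alerts = []
    for rule in RULES:
        value = sample.get(rule.metric)
        if value is not None and OPS[rule.op](value, rule.threshold):
            alerts.append(
                {"rule": rule.name, "severity": rule.severity, "value": value}
            )
    return alerts


sample = {"switches_per_hour": 14, "idle_minutes": 5, "focus_minutes": 95}
fired = evaluate(sample)
print([a["rule"] for a in fired])  # ['context_switching', 'deep_work']
```

Adding a new rule - or a new achievement - is one line in `RULES`, which is exactly why the same evaluator can handle both alerts and celebrations.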
### Code Mapping

| sysmonstm Component | Deskmeter Equivalent |
|---------------------|---------------------|
| `services/collector/` | `dmapp/dmcore/main.py` - workspace monitoring |
| `services/aggregator/` | Event aggregation (new) |
| `services/gateway/` | `dmapp/dmweb/dm.py` + WebSocket (enhanced) |
| `services/alerts/` | Focus alerts service (new) |
| `proto/metrics.proto` | Workspace event schema |
| `shared/events/` | Same - reusable |

The event abstraction from sysmonstm (`shared/events/`) works directly. The configuration pattern from `shared/config.py` works directly. The structured logging from `shared/logging.py` works directly.

## The Common Thread

Both payment processing and productivity tracking share the same fundamental pattern:

1. **Multiple data sources** streaming to a central point
2. **Normalization** of different formats into a consistent schema
3. **Real-time processing** for dashboards and alerts
4. **Historical storage** for analysis and compliance
5. **Event-driven decoupling** for extensibility

sysmonstm demonstrates these patterns with system metrics. The patterns transfer to any domain with similar characteristics:

- IoT sensor networks (temperature, humidity, motion)
- Log aggregation (application logs from multiple services)
- Social media analytics (tweets, posts, mentions)
- Trading systems (market data from multiple exchanges)
- Fleet management (GPS, fuel, diagnostics from vehicles)

The specific metrics change. The thresholds change. The domain vocabulary changes. The architecture stays the same.

Build it once for metrics. Apply it anywhere.
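Stripped of any domain, the five steps of the common thread fit in a few lines. A minimal sketch of the generic pipeline, with in-process queues standing in for gRPC streams and the event bus - every name here is illustrative:

```python
# Generic form of the pattern: many sources -> normalize -> process
# (current state for dashboards, history for analysis) -> emit events.
from queue import SimpleQueue

inbound = SimpleQueue()   # stand-in for gRPC streams from many sources
events = SimpleQueue()    # stand-in for the event bus
history = []              # stand-in for historical storage
current = {}              # stand-in for the hot/current-state tier


def normalize(raw: dict) -> dict:
    # Step 2: different source formats -> one consistent schema.
    return {"source": raw["src"], "value": float(raw["val"])}


def process(record: dict):
    # Step 3: real-time state; step 4: history; step 5: decoupled events.
    current[record["source"]] = record["value"]
    history.append(record)
    events.put(("record.processed", record))


# Step 1: multiple sources feeding one central point.
inbound.put({"src": "sensor-a", "val": "21.5"})
inbound.put({"src": "sensor-b", "val": "19.0"})

while not inbound.empty():
    process(normalize(inbound.get()))

print(current)       # {'sensor-a': 21.5, 'sensor-b': 19.0}
print(len(history))  # 2
```

Swap the sources for payment processors, workspace monitors, or IoT sensors and the shape of the loop does not change - only `normalize` and the rule set downstream of `events` do.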