"""
dmsync - MongoDB Change Streams sync daemon

Watches the local deskmeter database and pushes changes to a remote MongoDB.

Requires the local MongoDB to be configured as a replica set.
Uses resume tokens to continue from the last position after a restart.
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from pymongo import MongoClient
|
|
from pymongo.errors import PyMongoError
|
|
|
|
# Daemon-wide logging: timestamped INFO-level lines (stderr by default).
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger("dmsync")

# Where the Change Streams resume token is persisted between runs.
RESUME_TOKEN_FILE = Path.home() / ".dmsync-resume-token"
# Remote MongoDB endpoint; overridable via environment variables.
REMOTE_HOST = os.environ.get("DMSYNC_REMOTE_HOST", "mcrn.ar")
REMOTE_PORT = int(os.environ.get("DMSYNC_REMOTE_PORT", 27017))
# Only these local collections are mirrored to the remote database.
COLLECTIONS = ("switch", "task", "task_history", "state")
|
|
|
|
|
|
def load_resume_token():
    """Return the previously saved resume token, or None.

    A missing, unreadable, or corrupt token file is treated the same as
    "no token": the caller starts a fresh change stream instead.
    """
    if not RESUME_TOKEN_FILE.exists():
        return None
    try:
        raw = RESUME_TOKEN_FILE.read_text()
        return json.loads(raw)
    except (json.JSONDecodeError, IOError) as e:
        log.warning(f"Failed to load resume token: {e}")
        return None
|
|
|
|
|
|
def save_resume_token(token):
    """Persist *token* as JSON so a restart can resume the change stream.

    Best-effort: a write failure is logged but never raised, so a
    transient I/O error cannot kill the sync loop.
    """
    try:
        serialized = json.dumps(token)
        RESUME_TOKEN_FILE.write_text(serialized)
    except IOError as e:
        log.error(f"Failed to save resume token: {e}")
|
|
|
|
|
|
def _open_stream(local_db, pipeline, resume_token):
    """Open a change stream on *local_db*, resuming from *resume_token* if given.

    If resuming fails (typically because the saved token has aged out of
    the local oplog), the stale token file is deleted and a fresh stream
    is opened from "now" — otherwise the daemon would crash-loop forever
    on the same bad token at every restart.
    """
    watch_kwargs = {"full_document": "updateLookup"}  # Get full doc on updates
    if resume_token:
        watch_kwargs["resume_after"] = resume_token
        try:
            return local_db.watch(pipeline, **watch_kwargs)
        except PyMongoError as e:
            log.warning(f"Could not resume from saved token ({e}); starting fresh")
            if RESUME_TOKEN_FILE.exists():
                RESUME_TOKEN_FILE.unlink()
            del watch_kwargs["resume_after"]
    return local_db.watch(pipeline, **watch_kwargs)


def sync():
    """Main sync loop using Change Streams.

    Watches the local `deskmeter` database for insert/update/replace
    events on the collections in COLLECTIONS and upserts each changed
    document into the remote `deskmeter` database. The stream's resume
    token is persisted after each applied change so a restart continues
    where it left off.

    Raises:
        PyMongoError: on an unrecoverable MongoDB error (after logging).
    """
    log.info("Connecting to local MongoDB...")
    local = MongoClient()

    log.info(f"Connecting to remote MongoDB at {REMOTE_HOST}:{REMOTE_PORT}...")
    remote = MongoClient(REMOTE_HOST, REMOTE_PORT)

    local_db = local.deskmeter
    remote_db = remote.deskmeter

    resume_token = load_resume_token()
    if resume_token:
        log.info("Resuming from saved token")

    # Only mirror data-bearing events; deletes and drops are
    # intentionally not propagated to the remote.
    pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}}]

    log.info(f"Watching collections: {', '.join(COLLECTIONS)}")

    try:
        with _open_stream(local_db, pipeline, resume_token) as stream:
            for change in stream:
                collection = change["ns"]["coll"]

                if collection not in COLLECTIONS:
                    continue

                doc = change.get("fullDocument")
                if not doc:
                    continue

                # Upsert so inserts and updates are handled uniformly.
                result = remote_db[collection].replace_one(
                    {"_id": doc["_id"]}, doc, upsert=True
                )

                action = "inserted" if result.upserted_id else "updated"
                log.info(f"{collection}: {action} {doc['_id']}")

                # Persist position only after the change has been applied,
                # so a crash re-delivers rather than drops the event.
                save_resume_token(stream.resume_token)

    except PyMongoError as e:
        log.error(f"MongoDB error: {e}")
        raise
    finally:
        # Always release client connections, even on error.
        local.close()
        remote.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the sync loop until interrupted or a
    # MongoDB error propagates out of sync().
    log.info("Starting dmsync daemon")
    sync()
|