This commit is contained in:
2026-02-04 05:05:56 -03:00
parent de2ea3b7cb
commit c97ef63756
2 changed files with 76 additions and 14 deletions

View File

@@ -11,7 +11,7 @@ import logging
import os
from pathlib import Path
from pymongo import MongoClient
from pymongo import MongoClient, ReplaceOne
from pymongo.errors import PyMongoError
logging.basicConfig(
@@ -43,6 +43,35 @@ def save_resume_token(token):
log.error(f"Failed to save resume token: {e}")
def bulk_sync(local_db, remote_db, batch_size=1000):
    """Bulk sync all missing documents from local to remote.

    For every collection named in ``COLLECTIONS``, documents whose ``_id``
    exists locally but not remotely are upserted into the remote collection.
    Documents whose ``_id`` already exists remotely are left untouched, even
    if their contents differ.

    Args:
        local_db: Source database, indexed by collection name.
        remote_db: Destination database, indexed by collection name.
        batch_size: Maximum number of write operations sent per
            ``bulk_write`` call. Values below 1 are treated as 1.

    Returns:
        Total number of documents reported as upserted or modified by the
        remote across all collections.
    """
    batch_size = max(1, batch_size)
    total_synced = 0

    for coll_name in COLLECTIONS:
        local_coll = local_db[coll_name]
        remote_coll = remote_db[coll_name]

        # Only the remote _ids are kept in memory; local documents are
        # streamed from the cursor so a large collection is never fully
        # materialized on the client.
        remote_ids = {doc["_id"] for doc in remote_coll.find({}, {"_id": 1})}

        count = 0
        wrote_any = False
        ops = []
        for doc in local_coll.find():
            if doc["_id"] in remote_ids:
                continue
            # Upsert (rather than insert) so a document that appeared on the
            # remote after the _id snapshot does not raise a duplicate-key
            # error.
            ops.append(ReplaceOne({"_id": doc["_id"]}, doc, upsert=True))
            if len(ops) >= batch_size:
                result = remote_coll.bulk_write(ops)
                count += result.upserted_count + result.modified_count
                wrote_any = True
                ops = []

        # Flush the final, partially filled batch.
        if ops:
            result = remote_coll.bulk_write(ops)
            count += result.upserted_count + result.modified_count
            wrote_any = True

        # Log only for collections that actually had missing documents.
        if wrote_any:
            log.info(f"{coll_name}: bulk synced {count} documents")
            total_synced += count

    return total_synced
def sync():
"""Main sync loop using Change Streams."""
log.info(f"Connecting to local MongoDB...")
@@ -54,12 +83,19 @@ def sync():
local_db = local.deskmeter
remote_db = remote.deskmeter
resume_token = load_resume_token()
if resume_token:
log.info("Resuming from saved token")
# Bulk sync first to catch up
log.info("Performing bulk sync to catch up...")
synced = bulk_sync(local_db, remote_db)
log.info(f"Bulk sync complete: {synced} documents")
watch_kwargs = {"resume_after": resume_token} if resume_token else {}
watch_kwargs["full_document"] = "updateLookup" # Get full doc on updates
# Clear resume token to start fresh with Change Streams
# (we're now caught up, don't need to replay old changes)
if RESUME_TOKEN_FILE.exists():
RESUME_TOKEN_FILE.unlink()
log.info("Cleared old resume token")
# Now watch for new changes only (no resume token)
watch_kwargs = {"full_document": "updateLookup"}
# Watch for inserts, updates, and replaces on the database
pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}}]
@@ -83,8 +119,8 @@ def sync():
{"_id": doc["_id"]}, doc, upsert=True
)
action = "inserted" if result.upserted_id else "updated"
log.info(f"{collection}: {action} {doc['_id']}")
if result.upserted_id:
log.info(f"{collection}: inserted {doc['_id']}")
save_resume_token(stream.resume_token)