def bulk_sync(local_db, remote_db):
    """Copy documents that exist locally but not remotely to the remote DB.

    For every collection named in ``COLLECTIONS`` the remote ``_id`` values
    are collected, the local collection is streamed, and each local document
    whose ``_id`` is absent on the remote side is upserted there.

    Documents whose ``_id`` already exists remotely are neither compared nor
    rewritten, so content that differs between the two sides is NOT
    reconciled by this function.

    Args:
        local_db: Database handle to read from; indexed by collection name.
        remote_db: Database handle to write to; indexed by collection name.

    Returns:
        The sum over all collections of ``upserted_count + modified_count``
        as reported by each ``bulk_write`` result.

    Raises:
        Nothing is caught here; any error raised by ``find`` or
        ``bulk_write`` propagates to the caller.
    """
    total_synced = 0

    for coll_name in COLLECTIONS:
        local_coll = local_db[coll_name]
        remote_coll = remote_db[coll_name]

        # Only the ids are needed from the remote side, so project them.
        remote_ids = {doc["_id"] for doc in remote_coll.find({}, {"_id": 1})}

        # Stream the local cursor and keep just the missing documents instead
        # of first materialising the whole local collection in memory.
        ops = [
            ReplaceOne({"_id": doc["_id"]}, doc, upsert=True)
            for doc in local_coll.find()
            if doc["_id"] not in remote_ids
        ]
        if not ops:
            continue

        # Each upsert targets a distinct _id, so they are independent:
        # unordered execution lets the server attempt all of them rather
        # than stopping at the first failure.
        result = remote_coll.bulk_write(ops, ordered=False)
        count = result.upserted_count + result.modified_count
        log.info("%s: bulk synced %d documents", coll_name, count)
        total_synced += count

    return total_synced
# Zone returned whenever no name, or an unsupported name, is requested.
default_timezone = ZoneInfo("America/Argentina/Buenos_Aires")
# Alias kept for backwards compatibility with code that still uses `timezone`.
timezone = default_timezone
utctz = ZoneInfo("UTC")

# Pairs of (zone name, label); get_timezone() accepts only the zone names
# listed here. The second element is presumably a display label for a UI.
SUPPORTED_TIMEZONES = [
    ("America/Argentina/Buenos_Aires", "Buenos Aires"),
    ("America/New_York", "New York"),
    ("America/Los_Angeles", "Los Angeles"),
    ("Europe/London", "London"),
    ("Europe/Paris", "Paris"),
    ("UTC", "UTC"),
]


def get_timezone(tz_name=None):
    """Resolve a timezone name to a ``ZoneInfo``, defaulting when unsupported.

    Args:
        tz_name: Zone name to look up. A falsy value (``None``, ``""``)
            selects ``default_timezone``.

    Returns:
        ``ZoneInfo(tz_name)`` when ``tz_name`` equals the first element of an
        entry in ``SUPPORTED_TIMEZONES``; otherwise ``default_timezone``.
    """
    # Guard clause: nothing requested means the default applies.
    if not tz_name:
        return default_timezone

    # Names outside the supported list never reach ZoneInfo(); they fall
    # back to the default instead.
    is_supported = any(entry[0] == tz_name for entry in SUPPORTED_TIMEZONES)
    return ZoneInfo(tz_name) if is_supported else default_timezone
""" Get task blocks for calendar-style visualization, aggregated by time grid. @@ -231,6 +253,8 @@ def get_task_blocks_calendar( 'active_ratio': float (always 1.0) }, ...] """ + local_tz = tz if tz else default_timezone + task_query = {"$in": task.split(",")} if task else {} match_query = { @@ -252,7 +276,7 @@ def get_task_blocks_calendar( for switch in raw_switches: task_id = switch.get("task") - switch_start = switch["date"].replace(tzinfo=utctz).astimezone(timezone) + switch_start = switch["date"].replace(tzinfo=utctz).astimezone(local_tz) switch_duration = switch["delta"] switch_end = switch_start + timedelta(seconds=switch_duration) @@ -290,7 +314,7 @@ def get_task_blocks_calendar( for (date, grid_hour, task_id), data in grid_task_time.items(): if data["duration"] >= min_block_seconds: grid_start = datetime( - date.year, date.month, date.day, grid_hour, 0, 0, tzinfo=timezone + date.year, date.month, date.day, grid_hour, 0, 0, tzinfo=local_tz ) blocks.append( @@ -310,7 +334,7 @@ def get_task_blocks_calendar( return sorted(blocks, key=lambda x: (x["start"], x["task_path"])) -def get_raw_switches(start, end, task=None): +def get_raw_switches(start, end, task=None, tz=None): """ Get all raw switch documents in the period. @@ -323,6 +347,8 @@ def get_raw_switches(start, end, task=None): 'delta': int (seconds) }, ...] """ + local_tz = tz if tz else default_timezone + task_query = {"$in": task.split(",")} if task else {} match_query = {"date": {"$gte": start, "$lte": end}} @@ -342,7 +368,7 @@ def get_raw_switches(start, end, task=None): "workspace": switch["workspace"], "task_id": task_id, "task_path": task_path, - "date": switch["date"].replace(tzinfo=utctz).astimezone(timezone), + "date": switch["date"].replace(tzinfo=utctz).astimezone(local_tz), "delta": switch["delta"], } )