import datetime from pathlib import Path from typing import Optional from bson import ObjectId from config import logger, tasks task_file = "/home/mariano/LETRAS/adm/task/main" def parse_line(line: str) -> tuple[Optional[str], Optional[str]]: """Parse a task line to extract task name and ID.""" line = line.strip() if not line: return None, None parts = line.split("|") if len(parts) > 1: task_name = parts[0].strip() id_parts = parts[1].split() if id_parts: task_id = id_parts[0].strip() return task_name, task_id return task_name, None return parts[0].strip(), None def file_to_db(filepath: str): """Convert task file to MongoDB entries.""" if filepath is None: filepath = task_file current_path = [] # Only delete non-historic tasks tasks.delete_many({"historic": {"$ne": True}}) seen_paths = set() with open(filepath, "r") as f: for line in f: if not line.strip(): tasks.insert_one({"path": "", "_id": ObjectId()}) continue indent = len(line) - len(line.lstrip()) level = indent // 4 task_name, task_id = parse_line(line) if task_name is None: continue current_path = current_path[:level] current_path.append(task_name) full_path = "/".join(current_path) if task_id: tasks.update_one( {"task_id": task_id}, { "$set": { "path": full_path, "task_id": task_id, "historic": False, } }, upsert=True, ) elif full_path not in seen_paths: tasks.insert_one({"_id": ObjectId(), "path": full_path}) seen_paths.add(full_path) def get_all_tasks(prefix): return list(tasks.find({"path": {"$ne": ""}}).sort("path", 1)) def get_file_mtime(filepath: str) -> str: """Get file modification time as ISO format string.""" if filepath is None: filepath = task_file return datetime.datetime.fromtimestamp(Path(filepath).stat().st_mtime).isoformat() def get_tasks_tree(root_path: str, only_with_ids: bool = True) -> list[dict]: """ Get all tasks under a given path node, including the node itself. """ query = {"path": {"$regex": root_path}} # Add task_id filter if requested if only_with_ids: query["task_id"] = {"$exists": True} # Query and sort by path to maintain hierarchy cursor = tasks.find( query, {"path": 1, "task_id": 1, "_id": 0}, ).sort("path", 1) return list(cursor)