import datetime
import re
from pathlib import Path
from typing import Optional

import state
from bson import ObjectId
from config import logger, tasks

task_file = "/home/mariano/LETRAS/adm/task/main"


def parse_line(line: str) -> tuple[Optional[str], Optional[str]]:
    """Parse a task line into ``(task_name, task_id)``.

    The name is the text before the first ``|``; the ID is the first
    whitespace-delimited token of the field after that ``|`` (so a trailing
    ``*`` marker separated by a space is ignored).

    Returns:
        ``(None, None)`` for a blank line, ``(name, None)`` when the line has
        no ``|`` or nothing follows it, otherwise ``(name, task_id)``.
    """
    line = line.strip()
    if not line:
        return None, None

    fields = line.split("|")
    task_name = fields[0].strip()
    # An empty ID field ("name |") yields no tokens; treat it as "no ID"
    # instead of indexing into an empty list.
    tokens = fields[1].split() if len(fields) > 1 else []
    return task_name, (tokens[0] if tokens else None)


def file_to_db(filepath: Optional[str] = None):
    """Replace the contents of the ``tasks`` collection with the task file.

    Each non-blank line becomes a document whose ``path`` is the
    ``/``-joined chain of ancestor names, with nesting derived from the
    leading whitespace (4 characters per level). Blank lines are stored as
    documents with an empty ``path``.

    Args:
        filepath: File to read; ``None`` falls back to ``task_file``.

    Raises:
        OSError: If the file cannot be read. The collection is left
            untouched in that case.
    """
    if filepath is None:
        filepath = task_file

    # Read the whole file before touching the database so that a missing or
    # unreadable file does not leave the collection emptied.
    with open(filepath, "r") as f:
        lines = f.readlines()

    current_path = []
    tasks.delete_many({})
    seen_paths = set()

    for line in lines:
        if not line.strip():
            tasks.insert_one({"path": "", "_id": ObjectId()})
            continue

        indent = len(line) - len(line.lstrip())
        level = indent // 4

        task_name, task_id = parse_line(line)
        if task_name is None:
            continue

        # Drop everything deeper than this line's level, then descend.
        current_path = current_path[:level]
        current_path.append(task_name)
        full_path = "/".join(current_path)

        if task_id:
            tasks.update_one(
                {"_id": task_id},
                {"$set": {"path": full_path, "task_id": task_id}},
                upsert=True,
            )
        elif full_path not in seen_paths:
            # ID-less paths are inserted only once per import.
            tasks.insert_one({"_id": ObjectId(), "path": full_path})
            seen_paths.add(full_path)


def format_task_line(
    path_parts: list, indent_level: int, task_id: str, current_task: str
) -> str:
    """Render one task as a line of the task file.

    Args:
        path_parts: Path components; only the last one is printed.
        indent_level: Nesting depth (4 spaces per level).
        task_id: ID to append after a ``|``; falsy means no ID column.
        current_task: ID of the current task; a matching line gets `` *``.

    Returns:
        The formatted line without a trailing newline. The ``|`` is padded
        out to column 64 when the name is short enough, with at least one
        space before it otherwise.
    """
    line = " " * (4 * indent_level) + path_parts[-1]
    if task_id:
        padding = max(1, 64 - len(line))
        line = f"{line}{' ' * padding}|{task_id}"
        if task_id == current_task:
            line += " *"
    return line


def db_to_file_as_is(filepath: Optional[str] = None):
    """Write every task document to the file in the collection's find order.

    Documents with an empty ``path`` are written as blank lines.

    Args:
        filepath: File to overwrite; ``None`` falls back to ``task_file``.
    """
    if filepath is None:
        filepath = task_file

    current_task = state.retrieve("current").get("task")
    all_tasks = list(tasks.find())

    with open(filepath, "w") as f:
        for task in all_tasks:
            if not task["path"]:
                f.write("\n")
                continue

            path_parts = task["path"].split("/")
            line = format_task_line(
                path_parts, len(path_parts) - 1, task.get("task_id", ""), current_task
            )
            f.write(f"{line}\n")


def get_all_tasks(prefix):
    """Return all tasks with a non-empty path, sorted by path.

    NOTE(review): ``prefix`` is accepted but not used by the query.
    """
    return list(tasks.find({"path": {"$ne": ""}}).sort("path", 1))


def db_to_file_consolidated(filepath: Optional[str] = None):
    """Write tasks from MongoDB to the file as a consolidated tree.

    Tasks are ordered by path and each path component is written once, on
    its own line, indented by its depth. Only the final component of a
    task's path carries that task's ID.

    Args:
        filepath: File to overwrite; ``None`` falls back to ``task_file``.
    """
    if filepath is None:
        filepath = task_file

    current_task = state.retrieve("current").get("task")

    # Order by path *components* rather than by the raw string: characters
    # that sort before "/" (space, "-", ".") would otherwise place a sibling
    # such as "a b" between "a" and "a/b", splitting the subtree.
    all_tasks = sorted(
        tasks.find({"path": {"$ne": ""}}),
        key=lambda task: task["path"].split("/"),
    )

    with open(filepath, "w") as f:
        prev_parts = []
        for task in all_tasks:
            path_parts = task["path"].split("/")

            # Length of the leading run shared with the previous task; those
            # components are already in the file.
            common = 0
            for prev, curr in zip(prev_parts, path_parts):
                if prev != curr:
                    break
                common += 1

            # A task whose path equals the previous one adds no new lines.
            last_index = len(path_parts) - 1
            for i, part in enumerate(path_parts[common:], common):
                task_id = task.get("task_id", "") if i == last_index else ""
                line = format_task_line([part], i, task_id, current_task)
                f.write(f"{line}\n")

            prev_parts = path_parts


def extract(line: str) -> Optional[str]:
    """Return the task ID from a line marked as current, else ``None``.

    A line qualifies when, ignoring trailing whitespace, it ends with ``*``,
    contains a ``|``, and the text between the first ``|`` and the final
    ``*`` is exactly 8 characters long once stripped.
    """
    line = line.rstrip()
    if not line.endswith("*"):
        return None

    pipe_index = line.find("|")
    if pipe_index == -1:
        return None

    # Everything between the first "|" and the trailing "*".
    id_part = line[pipe_index + 1 : -1].strip()
    return id_part if len(id_part) == 8 else None


def read_and_extract(filepath: Optional[str] = None) -> Optional[str]:
    """Record the file's mtime in state and return the marked task ID.

    The modification time is saved under the ``current`` state entry on
    every call, whether or not a marked task is found.

    Args:
        filepath: File to scan; ``None`` falls back to ``task_file``.

    Returns:
        The ID from the first line accepted by ``extract``, or ``None``.
    """
    if filepath is None:
        filepath = task_file

    mtime = get_file_mtime(filepath)
    state.save("current", filetime=mtime)

    with open(filepath, "r") as file:
        for line in file:
            task_id = extract(line)
            if task_id:
                return task_id
    return None


def get_file_mtime(filepath: Optional[str] = None) -> str:
    """Get the file's modification time as an ISO format string.

    Args:
        filepath: File to stat; ``None`` falls back to ``task_file``.
    """
    if filepath is None:
        filepath = task_file
    return datetime.datetime.fromtimestamp(Path(filepath).stat().st_mtime).isoformat()


def get_tasks_tree(root_path: str, only_with_ids: bool = True) -> list[dict]:
    """Get all tasks under a given path node, including the node itself.

    Args:
        root_path: Path of the subtree root, e.g. ``"a/b"``. A trailing
            ``/`` is ignored. An empty string selects every task.
        only_with_ids: When true, only documents that have a ``task_id``
            field are returned.

    Returns:
        Documents projected to ``path`` and ``task_id``, sorted by path.
    """
    root = root_path.rstrip("/")
    if root:
        # Escape the path so it is matched literally, and anchor it so that
        # only the node ("a/b") and its descendants ("a/b/...") match, not
        # any path that merely contains the text.
        pattern = f"^{re.escape(root)}(/|$)"
    else:
        pattern = ""
    query = {"path": {"$regex": pattern}}

    # Add task_id filter if requested
    if only_with_ids:
        query["task_id"] = {"$exists": True}

    # Query and sort by path to maintain hierarchy
    cursor = tasks.find(
        query,
        {"path": 1, "task_id": 1, "_id": 0},
    ).sort("path", 1)
    return list(cursor)