Files
deskmeter/dmapp/dmcore/task.py
2025-08-03 06:21:14 -03:00

182 lines
5.2 KiB
Python

import datetime
import re
from pathlib import Path
from typing import Optional
import state
from bson import ObjectId
from config import logger, tasks
# Default task file location; used as the fallback when a function in this
# module is called with filepath=None.
task_file = "/home/mariano/LETRAS/adm/task/main"
def parse_line(line: str) -> tuple[Optional[str], Optional[str]]:
    """Parse a task line into its task name and optional task ID.

    A line has the form ``<name>`` or ``<name> |<id> ...``. Leading and
    trailing whitespace is ignored.

    Args:
        line: One raw line from the task file.

    Returns:
        ``(None, None)`` for a blank line, ``(name, None)`` when the line
        carries no ID, otherwise ``(name, task_id)``.
    """
    line = line.strip()
    if not line:
        return None, None

    parts = line.split("|")
    task_name = parts[0].strip()
    if len(parts) == 1:
        return task_name, None

    # Only the first whitespace-separated token after the pipe is the ID;
    # anything following it on the line is ignored.
    id_tokens = parts[1].split()
    if not id_tokens:
        # A dangling "|" with nothing after it used to raise IndexError;
        # treat it as a task without an ID instead.
        return task_name, None
    return task_name, id_tokens[0]
def file_to_db(filepath: Optional[str] = None):
    """Replace the contents of the ``tasks`` collection with the task file.

    Every blank line is stored as a document with an empty ``path``. Every
    other line becomes a document whose ``path`` is the "/"-joined chain of
    task names from the top level down to that line; nesting depth is one
    level per 4 characters of leading whitespace. Lines carrying an ID are
    upserted with that ID as ``_id``; lines without one get a fresh
    ``ObjectId``.

    Args:
        filepath: Task file to import. ``None`` selects ``task_file``.

    Raises:
        OSError: If the file cannot be opened or read. The collection is
            left untouched in that case.
    """
    if filepath is None:
        filepath = task_file

    # Read the whole file before wiping the collection: previously a missing
    # or unreadable file raised only after delete_many() had already emptied
    # the database.
    with open(filepath, "r") as f:
        lines = f.readlines()

    tasks.delete_many({})
    current_path = []
    seen_paths = set()
    for line in lines:
        if not line.strip():
            tasks.insert_one({"path": "", "_id": ObjectId()})
            continue

        indent = len(line) - len(line.lstrip())
        level = indent // 4
        task_name, task_id = parse_line(line)
        if task_name is None:
            continue

        # Drop everything deeper than this line's level, then descend into it.
        current_path = current_path[:level]
        current_path.append(task_name)
        full_path = "/".join(current_path)

        if task_id:
            tasks.update_one(
                {"_id": task_id},
                {"$set": {"path": full_path, "task_id": task_id}},
                upsert=True,
            )
        elif full_path not in seen_paths:
            # NOTE(review): only ID-less paths are recorded in seen_paths —
            # confirm paths with IDs are meant to be exempt from this
            # de-duplication.
            tasks.insert_one({"_id": ObjectId(), "path": full_path})
            seen_paths.add(full_path)
def format_task_line(
    path_parts: list, indent_level: int, task_id: str, current_task: str
) -> str:
    """Render one task as a line of the task file.

    The last element of ``path_parts`` is the task name, indented four
    spaces per ``indent_level``. When ``task_id`` is truthy, the name is
    padded out to column 64 (always by at least one space) and followed by
    ``|<task_id>``; if that ID equals ``current_task`` the line also gets a
    trailing `` *`` marker.
    """
    label = path_parts[-1]
    text = " " * (indent_level * 4) + label
    if not task_id:
        return text

    gap = 64 - len(text)
    if gap < 1:
        # Names that reach past column 64 still get one separating space.
        gap = 1
    marker = " *" if task_id == current_task else ""
    return f"{text}{' ' * gap}|{task_id}{marker}"
def db_to_file_as_is(filepath: str):
    """Dump every task document to the task file, one line per document.

    Documents are written in the order ``tasks.find()`` yields them. A
    document with an empty ``path`` becomes a blank line; any other document
    is rendered from the last segment of its path, indented by its depth.
    Passing ``None`` as ``filepath`` selects ``task_file``.
    """
    target = task_file if filepath is None else filepath
    active_id = state.retrieve("current").get("task")
    # Fetched up front, before the output file is opened for writing.
    documents = list(tasks.find())

    with open(target, "w") as out:
        for doc in documents:
            path = doc["path"]
            if path:
                segments = path.split("/")
                depth = len(segments) - 1
                rendered = format_task_line(
                    segments, depth, doc.get("task_id", ""), active_id
                )
                out.write(f"{rendered}\n")
            else:
                out.write("\n")
def get_all_tasks(prefix):
    """Return every task document with a non-empty path, ordered by path.

    NOTE(review): ``prefix`` is accepted but never used, so the result is
    not filtered by it — confirm whether callers expect prefix filtering.
    """
    non_blank = {"path": {"$ne": ""}}
    cursor = tasks.find(non_blank)
    ordered = cursor.sort("path", 1)
    return list(ordered)
def db_to_file_consolidated(filepath: Optional[str] = None):
    """Write tasks from MongoDB to file as a consolidated tree.

    Documents with an empty ``path`` are skipped. The remaining tasks are
    ordered as a tree and each path segment is written on its own line,
    indented by its depth. Segments shared with the previously written task
    are not repeated. Only the final segment of a task's path carries that
    task's ID; intermediate segments emitted on the way down are written
    without one.

    Args:
        filepath: Output file. ``None`` selects ``task_file``, matching the
            other file helpers in this module.
    """
    if filepath is None:
        filepath = task_file

    current_task = state.retrieve("current").get("task")
    all_tasks = list(tasks.find({"path": {"$ne": ""}}))
    # Order by path *segments*, not by the raw string. A plain string sort
    # places names containing characters below "/" (space, "-", ".") between
    # a parent and its children ("work" < "work-out" < "work/x"), which made
    # the parent line get written a second time.
    all_tasks.sort(key=lambda task: task["path"].split("/"))

    with open(filepath, "w") as f:
        prev_parts = []
        for task in all_tasks:
            path_parts = task["path"].split("/")

            # Length of the leading run of segments shared with the
            # previous task; those lines are already in the file.
            common = 0
            for prev, curr in zip(prev_parts, path_parts):
                if prev != curr:
                    break
                common += 1

            last_index = len(path_parts) - 1
            for i, part in enumerate(path_parts[common:], common):
                task_id = task.get("task_id", "") if i == last_index else ""
                line = format_task_line([part], i, task_id, current_task)
                f.write(f"{line}\n")

            prev_parts = path_parts
def extract(line: str) -> Optional[str]:
    """Return the ID of a line flagged with a trailing ``*``, or ``None``.

    A line qualifies when, ignoring trailing whitespace, it ends with ``*``,
    contains a ``|``, and the text between the first ``|`` and that final
    ``*`` is exactly 8 characters long once surrounding whitespace is
    removed.
    """
    stripped = line.rstrip()
    if not stripped.endswith("*"):
        return None

    bar = stripped.find("|")
    if bar < 0:
        return None

    # Slice from just after the pipe up to (not including) the final "*".
    candidate = stripped[bar + 1 : -1].strip()
    return candidate if len(candidate) == 8 else None
def read_and_extract(filepath: str) -> Optional[str]:
    """Record the task file's mtime and return the first flagged task ID.

    The file's modification time is saved as ``filetime`` under the
    ``"current"`` state entry on every call, before the file is scanned.
    The result is the first truthy value ``extract`` yields for a line of
    the file, or ``None`` when no line yields one. Passing ``None`` as
    ``filepath`` selects ``task_file``.
    """
    source = filepath if filepath is not None else task_file
    state.save("current", filetime=get_file_mtime(source))

    with open(source, "r") as handle:
        # Lazily scan line by line and stop at the first hit.
        flagged = (found for found in map(extract, handle) if found)
        return next(flagged, None)
def get_file_mtime(filepath: str) -> str:
    """Return the file's last-modification time as an ISO-format string.

    The timestamp is converted with ``datetime.fromtimestamp`` and no
    timezone argument, so the value is a naive local-time datetime.
    Passing ``None`` as ``filepath`` selects ``task_file``.
    """
    target = task_file if filepath is None else filepath
    modified_at = Path(target).stat().st_mtime
    stamp = datetime.datetime.fromtimestamp(modified_at)
    return stamp.isoformat()
def get_tasks_tree(root_path: str, only_with_ids: bool = True) -> list[dict]:
    """Get all tasks under a given path node, including the node itself.

    Args:
        root_path: "/"-separated path of the subtree root. Trailing slashes
            are ignored. An empty value matches every path.
        only_with_ids: When true, only documents that have a ``task_id``
            field are returned.

    Returns:
        Matching documents reduced to ``path`` and ``task_id`` (where
        present), sorted by ``path``.
    """
    root = root_path.rstrip("/")
    if root:
        # Match the node itself or anything below it. The text is escaped
        # and anchored: the raw value used to be treated as an unanchored
        # regex, so "work" also matched "homework/x" and characters such as
        # "." or "(" in a task name were interpreted as regex syntax.
        pattern = f"^{re.escape(root)}(/|$)"
    else:
        # Empty root: keep matching every path, as before.
        pattern = ""
    query = {"path": {"$regex": pattern}}

    # Add task_id filter if requested
    if only_with_ids:
        query["task_id"] = {"$exists": True}

    # Query and sort by path to maintain hierarchy
    cursor = tasks.find(
        query,
        {"path": 1, "task_id": 1, "_id": 0},
    ).sort("path", 1)
    return list(cursor)