158 lines
4.6 KiB
Python
158 lines
4.6 KiB
Python
import re
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import state
|
|
from bson import ObjectId
|
|
from config import logger, tasks
|
|
|
|
# Default task file, used by db_to_file_as_is() and read_and_extract() when
# they are called with filepath=None.
# NOTE(review): absolute path into one user's home directory - presumably a
# development default; confirm before running this anywhere else.
task_file = "/home/mariano/wdir/def/deskmeter/dmapp/dmcore/sample_task_file"
|
|
|
|
|
|
def parse_line(line: str) -> tuple[Optional[str], Optional[str]]:
    """Parse a task line to extract task name and ID.

    A task line looks like ``<name> |<id> [*]``; the ``|<id>`` part is
    optional. Surrounding whitespace (including indentation) is ignored.

    Args:
        line: One raw line of the task file.

    Returns:
        ``(None, None)`` for a blank line, ``(name, None)`` when the line has
        no ID, otherwise ``(name, task_id)``.
    """
    line = line.strip()
    if not line:
        return None, None

    parts = line.split("|")
    task_name = parts[0].strip()
    if len(parts) == 1:
        return task_name, None

    # The ID is the first whitespace-delimited token after the first pipe;
    # anything after it (such as the " *" current-task marker) is ignored.
    # A pipe with nothing behind it ("task |") yields no ID instead of
    # raising IndexError.
    id_tokens = parts[1].split()
    return task_name, (id_tokens[0] if id_tokens else None)
|
|
|
|
|
|
def file_to_db(filepath: str):
    """Convert task file to MongoDB entries.

    Replaces the whole ``tasks`` collection with the contents of the file:

    * a blank line becomes a document with an empty ``path``;
    * nesting is derived from leading whitespace, four columns per level,
      and each task's ``path`` is its ancestors' names joined with ``/``;
    * a line carrying an ID is upserted with that ID as ``_id``; a line
      without one is inserted under a fresh ``ObjectId`` the first time its
      path is seen.

    Finally an index on ``path`` is ensured.

    Args:
        filepath: Path of the task file to import.
    """
    current_path = []
    tasks.delete_many({})
    seen_paths = set()

    with open(filepath, "r") as f:
        for line in f:
            if not line.strip():
                tasks.insert_one({"path": "", "_id": ObjectId()})
                continue

            indent = len(line) - len(line.lstrip())
            level = indent // 4

            task_name, task_id = parse_line(line)
            if task_name is None:
                continue

            # Drop everything deeper than this line's level, then descend.
            current_path = current_path[:level]
            current_path.append(task_name)
            full_path = "/".join(current_path)

            if task_id:
                tasks.update_one(
                    {"_id": task_id},
                    {"$set": {"path": full_path, "task_id": task_id}},
                    upsert=True,
                )
            elif full_path not in seen_paths:
                tasks.insert_one({"_id": ObjectId(), "path": full_path})

            seen_paths.add(full_path)

    # pymongo spells this create_index() and takes the options as keyword
    # arguments; the Mongo-shell form createIndex({...}, {name: ...}) is not
    # valid here (undefined names, and Collection has no such method).
    # Strength-1 collation compares base characters only.
    tasks.create_index(
        [("path", 1)],
        name="idx_tasks_path_ci",
        background=True,
        collation={"locale": "es", "strength": 1},
    )
|
|
|
|
|
|
def format_task_line(
    path_parts: list, indent_level: int, task_id: str, current_task: str
) -> str:
    """Render one task-file line for the last element of ``path_parts``.

    The name is indented four spaces per ``indent_level``. When ``task_id``
    is truthy, the text is padded towards column 64 (always at least one
    space) and followed by ``|<task_id>``; if that ID equals
    ``current_task`` a trailing `` *`` marker is appended.
    """
    text = " " * (4 * indent_level) + path_parts[-1]
    if not task_id:
        return text

    gap = " " * max(1, 64 - len(text))
    marker = " *" if task_id == current_task else ""
    return f"{text}{gap}|{task_id}{marker}"
|
|
|
|
|
|
def db_to_file_as_is(filepath: Optional[str] = None):
    """Write tasks from MongoDB to file exactly as they were read.

    Documents are written in the order ``tasks.find()`` returns them. A
    document with an empty ``path`` becomes a blank line; any other document
    becomes one task line indented by its depth in the path, with the
    current task (as stored in ``state``) marked.

    Args:
        filepath: Destination file. ``None`` (the default) means the
            module-level ``task_file``.
    """
    if filepath is None:
        filepath = task_file

    current_task = state.retrieve("current").get("task")
    # Materialise the query before opening the file so a failing query does
    # not leave a truncated task file behind.
    all_tasks = list(tasks.find())

    lines = []
    for task in all_tasks:
        if not task["path"]:
            lines.append("\n")
            continue

        path_parts = task["path"].split("/")
        line = format_task_line(
            path_parts, len(path_parts) - 1, task.get("task_id", ""), current_task
        )
        lines.append(f"{line}\n")

    with open(filepath, "w") as f:
        f.writelines(lines)
|
|
|
|
|
|
def get_all_tasks(prefix):
    """Return all task documents with a non-empty path, sorted by path.

    NOTE(review): ``prefix`` is accepted but never used - every non-blank
    task is returned regardless of its value. Confirm with callers whether
    filtering by path prefix was intended.
    """
    non_blank = {"path": {"$ne": ""}}
    cursor = tasks.find(non_blank).sort("path", 1)
    return list(cursor)
|
|
|
|
|
|
def db_to_file_consolidated(filepath: str):
    """Write tasks from MongoDB to file as a consolidated tree.

    Tasks with a non-empty path are read sorted by path. For each one, only
    the path components not shared with the previously written task are
    emitted, one per line, indented by depth. The task's own ID is attached
    to its final component only; intermediate components get no ID.
    """
    active_id = state.retrieve("current").get("task")
    ordered = list(tasks.find({"path": {"$ne": ""}}).sort("path", 1))

    with open(filepath, "w") as out:
        previous = []
        for doc in ordered:
            segments = doc["path"].split("/")

            # Length of the leading run shared with the previous path.
            shared = 0
            limit = min(len(previous), len(segments))
            while shared < limit and previous[shared] == segments[shared]:
                shared += 1

            leaf_depth = len(segments) - 1
            for depth in range(shared, len(segments)):
                # Only the leaf component represents this document.
                own_id = doc.get("task_id", "") if depth == leaf_depth else ""
                rendered = format_task_line(
                    [segments[depth]], depth, own_id, active_id
                )
                out.write(rendered + "\n")

            previous = segments
|
|
|
|
|
|
def extract(line: str) -> Optional[str]:
    """Return the task ID of a line marked with a trailing ``*``.

    The ID is the text between the first ``|`` and the final ``*``, with
    surrounding spaces removed, and is accepted only if it is exactly eight
    characters long. On success the ID is also saved as the current task via
    ``state.save``. Any other line yields ``None`` and leaves state alone.
    """
    trimmed = line.rstrip()
    if not trimmed.endswith("*"):
        return None

    bar = trimmed.find("|")
    if bar == -1:
        return None

    # Everything after the pipe, minus the trailing marker.
    candidate = trimmed[bar + 1 : -1].strip()
    if len(candidate) != 8:
        return None

    state.save("current", task=candidate)
    return candidate
|
|
|
|
|
|
def read_and_extract(filepath: Optional[str] = None) -> Optional[str]:
    """Read file and update state if current task is found.

    Scans the file line by line and stops at the first line for which
    ``extract`` returns an ID (``extract`` itself records that ID in
    ``state``).

    Args:
        filepath: Task file to scan. ``None`` (the default) means the
            module-level ``task_file``.

    Returns:
        The ID of the first marked task, or ``None`` if no line is marked.
    """
    if filepath is None:
        filepath = task_file

    with open(filepath, "r") as file:
        for line in file:
            task_id = extract(line)
            if task_id:
                return task_id
    return None
|