new view, gnome extension

This commit is contained in:
buenosairesam
2025-12-19 21:08:39 -03:00
parent 38f3c7a711
commit 0dde9f1f54
6 changed files with 220 additions and 39 deletions

View File

@@ -1,5 +1,6 @@
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from pymongo import MongoClient
from zoneinfo import ZoneInfo
@@ -11,9 +12,99 @@ client = MongoClient()
db = client.deskmeter
switches = db.switch
tasks = db.task
task_history = db.task_history
task_file = "/home/mariano/LETRAS/adm/task/main"
task_dir = Path(task_file).parent
def parse_task_line(line):
"""Parse a task line to extract task name and ID."""
line = line.strip()
if not line:
return None, None
parts = line.split("|")
if len(parts) > 1:
task_name = parts[0].strip()
id_parts = parts[1].split()
if id_parts:
task_id = id_parts[0].strip()
return task_name, task_id
return task_name, None
return parts[0].strip(), None
def load_task_from_files(task_id):
"""Search task directory files for a task ID and load it into task_history."""
for task_filepath in task_dir.glob("*"):
if not task_filepath.is_file():
continue
current_path = []
try:
with open(task_filepath, "r") as f:
for line in f:
if not line.strip():
continue
indent = len(line) - len(line.lstrip())
level = indent // 4
task_name, found_id = parse_task_line(line)
if task_name is None:
continue
current_path = current_path[:level]
current_path.append(task_name)
full_path = "/".join(current_path)
if found_id == task_id:
# Found it! Insert into task_history
task_history.update_one(
{"task_id": task_id},
{"$set": {
"path": full_path,
"task_id": task_id,
"source_file": task_filepath.name
}},
upsert=True
)
return full_path
except:
# Skip files that can't be read
continue
return None
def get_task_path(task_id):
"""
Get task path from tasks collection, falling back to task_history.
If not found, searches task directory files and populates task_history on-demand.
"""
if not task_id:
return None
# Try current tasks first
task_doc = tasks.find_one({"task_id": task_id})
if task_doc and "path" in task_doc:
return task_doc["path"]
# Try task history cache
task_doc = task_history.find_one({"task_id": task_id})
if task_doc and "path" in task_doc:
return task_doc["path"]
# Not in cache, search files and populate history
task_path = load_task_from_files(task_id)
if task_path:
return task_path
# Still not found, return ID as fallback
return task_id
def get_current_task_info():
@@ -134,9 +225,8 @@ def get_work_period_totals(start, end):
total_seconds = result["total_seconds"]
if total_seconds > 0:
# Get task path from tasks collection
task_doc = tasks.find_one({"task_id": task_id})
task_path = task_doc["path"] if task_doc and "path" in task_doc else task_id
# Get task path with history fallback
task_path = get_task_path(task_id)
combined_rows.append({
"ws": task_path,
@@ -194,15 +284,12 @@ def get_task_blocks_calendar(start, end, task=None, min_block_seconds=300):
if current_block is not None:
blocks.append(current_block)
# Get task path
task_path = None
if task_id:
task_doc = tasks.find_one({"task_id": task_id})
task_path = task_doc["path"] if task_doc and "path" in task_doc else task_id
# Get task path with history fallback
task_path = get_task_path(task_id) or "No Task"
current_block = {
"task_id": task_id,
"task_path": task_path or "No Task",
"task_path": task_path,
"start": switch_start,
"end": switch_end,
"duration": switch_duration,
@@ -256,15 +343,13 @@ def get_raw_switches(start, end, task=None):
result = []
for switch in raw_switches:
task_id = switch.get("task")
task_path = None
if task_id:
task_doc = tasks.find_one({"task_id": task_id})
task_path = task_doc["path"] if task_doc and "path" in task_doc else task_id
# Get task path with history fallback
task_path = get_task_path(task_id) or "No Task"
result.append({
"workspace": switch["workspace"],
"task_id": task_id,
"task_path": task_path or "No Task",
"task_path": task_path,
"date": switch["date"].replace(tzinfo=utctz).astimezone(timezone),
"delta": switch["delta"]
})