Files
deskmeter/dmapp/dmcore/main.py
buenosairesam 72f9e9d9a7 changed config
2025-10-03 03:30:22 -03:00

122 lines
3.4 KiB
Python

import datetime
import os
import subprocess
import time
from pprint import pprint
import state
import task
from config import logger, switches
from zoneinfo import ZoneInfo
# Label for each virtual desktop, indexed by desktop number (see desktop()).
desktops = (
    "Plan",  # 0
    "Think",  # 1
    "Work",  # 2
    "Other",  # 3
    "Away",  # 4
    "Work",  # 5
    "Work",  # 6
    "Work",  # 7
)

# Desktop numbers that go through the "work workflow" in the main loop,
# mapped to the short key used for their work state and for their
# "work/<key>" task subtree.
work_desktops = {
    2: "snk",
    5: "dlt",
    6: "vhs",
    7: "own",
}

# Label desktop() falls back to for an index outside ``desktops``.
unlabeled = "Away"
def now():
    """Return the current time as an aware datetime in Buenos Aires time."""
    buenos_aires = ZoneInfo("America/Argentina/Buenos_Aires")
    return datetime.datetime.now(tz=buenos_aires)
def active_workspace():
    """Return the number of the currently active desktop from ``wmctrl -d``.

    Each output line is split on whitespace: the first column is the desktop
    number and the second column is ``*`` for the current desktop. Parsing
    by column (instead of by fixed character position) keeps multi-digit
    desktop numbers intact and tolerates blank or short lines.

    Returns:
        The desktop number as an int, or None when no line is flagged as
        current.

    Raises:
        Whatever ``subprocess.check_output`` raises when ``wmctrl`` is
        missing or exits with a non-zero status.
    """
    listing = subprocess.check_output(["wmctrl", "-d"]).decode("utf-8")

    for line in listing.splitlines():
        columns = line.split()
        # Need at least "<number> <marker>" to decide anything about a line.
        if len(columns) >= 2 and columns[1] == "*":
            return int(columns[0])

    return None
def desktop(workspace_index):
    """Return the label for a desktop number, or ``unlabeled`` if unknown.

    Args:
        workspace_index: Desktop number as returned by active_workspace();
            may be None when no desktop is flagged as current.

    Returns:
        The matching entry of ``desktops``, or ``unlabeled`` when the index
        is None, not an int, negative, or past the end of the tuple.
    """
    # None would raise TypeError and a negative number would silently wrap
    # around to the end of the tuple; neither identifies a real desktop.
    if not isinstance(workspace_index, int) or workspace_index < 0:
        return unlabeled

    try:
        return desktops[workspace_index]
    except IndexError:
        return unlabeled
# Startup: run the task/state initialisers, then open the first switch record.
# NOTE(review): the exact effect of read_and_extract / init_work_state lives
# in the task and state modules; the call order is kept as-is.
task.read_and_extract(None)
state.init_work_state(work_desktops)

# Snapshot of where we are right now; the main loop keeps these up to date.
current_workspace = active_workspace()
current_task = state.retrieve("current").get("task")
last_switch_time = now()

# First record of this run; "delta" is later set to the elapsed seconds.
switch = dict(
    workspace=desktop(current_workspace),
    date=now(),
    delta=0,
    task=current_task,
)
switches.insert_one(switch)
# Main polling loop. Every 2 seconds it:
#   1. reloads the task file if its mtime differs from the stored one,
#   2. reads the current task and active desktop and saves the desktop,
#   3. applies the per-desktop "work workflow" for desktops in work_desktops,
#   4. extends the latest switch record, or opens a new one when the desktop
#      label or the task changed.
while True:
    current_mtime = state.retrieve("current").get("filetime")
    file_mtime = task.get_file_mtime(None)

    # First handle file changes
    if current_mtime != file_mtime:
        task_id = task.read_and_extract(None)
        logger.debug(f"task_id:{task_id}")
        task.file_to_db(None)
        if task_id != current_task:  # Only update state if different
            state.save("current", task=task_id)
            current_task = task_id

    # The stored state is the source of truth for the task on every pass.
    current_task = state.retrieve("current").get("task")
    current_workspace = active_workspace()
    state.save("current", workspace=current_workspace)

    # Most recently inserted switch record (None if the collection is empty).
    last_doc = switches.find_one(sort=[("_id", -1)])

    # work workflow
    if current_workspace in work_desktops:
        work_key = work_desktops[current_workspace]
        work_states = state.retrieve_work_state()
        current_work_task = work_states[work_key]

        # Get all task IDs under current workspace path
        workspace_tasks = task.get_tasks_tree(f"work/{work_key}")
        work_task_ids = {t["task_id"] for t in workspace_tasks if "task_id" in t}

        if current_task not in work_task_ids:
            # Enforce work task if current task is not in workspace
            if current_task != current_work_task:
                current_task = current_work_task
                state.save("current", task=current_task)
                task.db_to_file_as_is(None)
        elif current_task != current_work_task:
            # Update work state when switching to a different valid task
            state.update_work_state(work_key, current_task)

    # regular flow
    if (
        # find_one yields None on an empty collection; treat that as
        # "nothing to extend" and open a fresh record instead of crashing.
        last_doc is not None
        and last_doc["workspace"] == desktop(current_workspace)
        and last_doc["task"] == current_task
    ):
        # Same desktop label and task: extend the open record.
        delta = round((now() - last_switch_time).total_seconds())
        switches.update_one(
            {"_id": last_doc["_id"]}, {"$set": {"delta": delta, "task": current_task}}
        )
    else:
        # NOTE(review): the desktop is queried again here, so the new record
        # can differ from the value saved to state earlier in this pass.
        current_workspace = active_workspace()
        switch = {
            "workspace": desktop(current_workspace),
            "date": now(),
            "delta": 0,
            "task": current_task,
        }
        switches.insert_one(switch)
        last_switch_time = now()

    time.sleep(2)