Files
deskmeter/dmapp/dmcore/main.py

159 lines
4.4 KiB
Python

import datetime
import json
import os
import subprocess
import time
from pprint import pprint
import state
import task
from config import logger, switches
from zoneinfo import ZoneInfo
desktops = ("Plan", "Think", "Work", "Other", "Away", "Work", "Work", "Work")
unlabeled = "Away"
config_file = "config.json"
def load_config():
"""Load configuration from JSON file"""
with open(config_file, "r") as f:
return json.load(f)
def reload_config_if_changed():
"""Check if config file changed and reload to state if needed"""
current_config_mtime = state.retrieve("current").get("config_mtime")
config_mtime = os.path.getmtime(config_file)
if current_config_mtime != config_mtime:
cfg = load_config()
work_desktop_tasks = {int(k): v for k, v in cfg["work_desktop_tasks"].items()}
state.sync_desktop_tasks(work_desktop_tasks)
state.save("current", config_mtime=config_mtime)
logger.info(f"Config reloaded: {work_desktop_tasks}")
def now():
cfg = load_config()
return datetime.datetime.now(ZoneInfo(cfg["timezone"]))
def handle_task_file_changes(current_task):
"""Check if task file changed and update task if needed"""
current_mtime = state.retrieve("current").get("filetime")
file_mtime = task.get_file_mtime(None)
if current_mtime != file_mtime:
task_id = task.read_and_extract(None)
logger.debug(f"task_id:{task_id}")
task.file_to_db(None)
if task_id != current_task:
state.save("current", task=task_id)
current_task = task_id
return current_task
def update_workspace_state():
"""Update current workspace in state"""
current_workspace = active_workspace()
state.save("current", workspace=current_workspace)
return current_workspace
def enforce_desktop_task(current_workspace, work_desktop_tasks, current_task):
"""Enforce assigned task for work desktops"""
if current_workspace in work_desktop_tasks and work_desktop_tasks[current_workspace]:
assigned_task = work_desktop_tasks[current_workspace]
if current_task != assigned_task:
current_task = assigned_task
state.save("current", task=current_task)
task.db_to_file_as_is(None)
return current_task
def track_workspace_switch(current_workspace, current_task, last_switch_time):
"""Update or create switch record"""
last_doc = switches.find_one(sort=[("_id", -1)])
if (
last_doc["workspace"] == desktop(current_workspace)
and last_doc["task"] == current_task
):
delta = round((now() - last_switch_time).total_seconds())
switches.update_one(
{"_id": last_doc["_id"]}, {"$set": {"delta": delta, "task": current_task}}
)
return last_switch_time
else:
switch = {
"workspace": desktop(current_workspace),
"date": now(),
"delta": 0,
"task": current_task,
}
switches.insert_one(switch)
return now()
def active_workspace():
workspaces = (
subprocess.check_output(["wmctrl", "-d"])
.decode("utf-8")
.strip("\n")
.split("\n")
)
for workspace in workspaces:
if workspace[3] == "*":
return int(workspace[0])
def desktop(workspace_index):
try:
return desktops[workspace_index]
except IndexError:
return unlabeled
task.read_and_extract(None)
current_workspace = active_workspace()
current_task = state.retrieve("current").get("task")
last_switch_time = now()
switch = {
"workspace": desktop(current_workspace),
"date": now(),
"delta": 0,
"task": current_task,
}
switches.insert_one(switch)
while True:
# Check if config changed and reload
reload_config_if_changed()
# Load work_desktop_tasks from state
work_desktop_tasks = state.retrieve_desktop_state()
# Handle task file changes
current_task = handle_task_file_changes(current_task)
# Update current task and workspace
current_task = state.retrieve("current").get("task")
current_workspace = update_workspace_state()
# Enforce desktop task assignments
current_task = enforce_desktop_task(current_workspace, work_desktop_tasks, current_task)
# Track workspace switches
last_switch_time = track_workspace_switch(current_workspace, current_task, last_switch_time)
time.sleep(2)