moved config. saved a workable state

This commit is contained in:
buenosairesam
2025-05-13 04:23:40 -03:00
parent f531be7158
commit 7995040c82
4 changed files with 102 additions and 62 deletions

21
dmapp/dmcore/config.py Normal file
View File

@@ -0,0 +1,21 @@
# dmapp/dmcore/config.py
import logging
from pymongo import MongoClient
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# MongoDB configuration
client = MongoClient()
db = client.deskmeter
# Collections
switches = db.switch
states = db.state
tasks = db.task

View File

@@ -4,29 +4,21 @@ import subprocess
import time import time
from pprint import pprint from pprint import pprint
import task
import state import state
from pymongo import MongoClient import task
from config import logger, switches
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
client = MongoClient() desktops = ("Plan", "Think", "Work", "Other", "Away", "Work", "Work")
work_desktops = {2: "default", 5: "dlt", 6: "vhs"}
unlabeled = "Away"
def now(): def now():
return datetime.datetime.now(ZoneInfo("America/Argentina/Buenos_Aires")) return datetime.datetime.now(ZoneInfo("America/Argentina/Buenos_Aires"))
db = client.deskmeter
switches = db.switch
dailies = db.daily
desktops = ("Plan", "Think", "Work", "Other", "Away", "Work", "Work")
unlabeled = "Away"
def active_workspace(): def active_workspace():
workspaces = ( workspaces = (
subprocess.check_output(["wmctrl", "-d"]) subprocess.check_output(["wmctrl", "-d"])
@@ -38,8 +30,6 @@ def active_workspace():
for workspace in workspaces: for workspace in workspaces:
if workspace[3] == "*": if workspace[3] == "*":
return int(workspace[0]) return int(workspace[0])
return None
return None
def desktop(workspace_index): def desktop(workspace_index):
@@ -49,8 +39,12 @@ def desktop(workspace_index):
return unlabeled return unlabeled
task.read_and_extract(None)
state.init_work_state(work_desktops)
current_workspace = active_workspace() current_workspace = active_workspace()
current_task = state.get_current_task() current_task = state.retrieve("current").get("task")
last_switch_time = now() last_switch_time = now()
switch = { switch = {
@@ -63,13 +57,18 @@ switch = {
switches.insert_one(switch) switches.insert_one(switch)
while True: while True:
current_task = state.get_current_task() current_task = state.retrieve("current").get("task")
current_workspace = active_workspace() current_workspace = active_workspace()
state.update_current_workspace(current_workspace) state.save("current", workspace=current_workspace)
last_doc = switches.find_one(sort=[("_id", -1)]) last_doc = switches.find_one(sort=[("_id", -1)])
# work workflow
if current_workspace in work_desktops.keys():
pass
# regular flow
if ( if (
last_doc["workspace"] == desktop(current_workspace) last_doc["workspace"] == desktop(current_workspace)
and last_doc["task"] == current_task and last_doc["task"] == current_task

View File

@@ -1,35 +1,47 @@
from pymongo import MongoClient from config import logger, states, tasks
client = MongoClient()
db = client.deskmeter
state = db.state
def update_current_task(task): def save(doc_id: str, *, task: str | None = None, workspace: str | None = None) -> None:
state.update_one( """
{"_id": "current_task"}, Upsert a document with _id=doc_id, setting any of the provided fields.
{"$set": {"task": task}}, Leave fields you dont pass unchanged.
upsert=True, """
) updates: dict = {}
if task is not None:
updates["task"] = task
if workspace is not None:
updates["workspace"] = workspace
if updates:
states.update_one(
{"_id": doc_id},
{"$set": updates},
upsert=True,
)
def update_current_workspace(workspace): def retrieve(doc_id: str) -> dict[str, str | None]:
state.update_one( """
{"_id": "current_workspace"}, Fetches the document with _id=doc_id and returns its 'task' and 'workspace'.
{"$set": {"workspace": workspace}}, If the document doesnt exist, both will be None.
upsert=True, """
) doc = states.find_one({"_id": doc_id})
return {
"task": doc.get("task") if doc else None,
"workspace": doc.get("workspace") if doc else None,
}
def get_current_task(): def init_work_state(wd: dict):
current_task = state.find_one({"_id": "current_task"}) """
if current_task: init work states with default values
return current_task.get("task") """
return None if not states.find_one({"_id": "work"}):
states.insert_one(
{
def get_current_workspace(): "_id": "work",
current_workspace = state.find_one({"_id": "current_workspace"}) **{
if current_workspace: wd[k]: tasks.find_one({"path": f"work/{wd[k]}"})["task_id"]
return current_workspace.get("workspace") for k in wd
return None },
}
)

View File

@@ -4,11 +4,7 @@ from typing import Optional
import state import state
from bson import ObjectId from bson import ObjectId
from pymongo import MongoClient from config import logger, tasks
client = MongoClient()
db = client.deskmeter
tasks = db.task
task_file = "/home/mariano/wdir/def/deskmeter/dmapp/dmcore/sample_task_file" task_file = "/home/mariano/wdir/def/deskmeter/dmapp/dmcore/sample_task_file"
@@ -16,14 +12,13 @@ task_file = "/home/mariano/wdir/def/deskmeter/dmapp/dmcore/sample_task_file"
def parse_line(line: str) -> tuple[Optional[str], Optional[str]]: def parse_line(line: str) -> tuple[Optional[str], Optional[str]]:
"""Parse a task line to extract task name and ID.""" """Parse a task line to extract task name and ID."""
line = line.strip() line = line.strip()
if not line: if not line:
return None, None return None, None
# Split by | and check if we have an ID part
parts = line.split("|") parts = line.split("|")
if len(parts) > 1: if len(parts) > 1:
task_name = parts[0].strip() task_name = parts[0].strip()
# Take everything after | and clean it up
task_id = parts[1].split()[0].strip() task_id = parts[1].split()[0].strip()
return task_name, task_id return task_name, task_id
@@ -64,6 +59,15 @@ def file_to_db(filepath: str):
seen_paths.add(full_path) seen_paths.add(full_path)
tasks.createIndex(
{"path": 1},
{
name: "idx_tasks_path_ci",
background: true,
collation: {locale: "es", strength: 1},
},
)
def format_task_line( def format_task_line(
path_parts: list, indent_level: int, task_id: str, current_task: str path_parts: list, indent_level: int, task_id: str, current_task: str
@@ -79,7 +83,7 @@ def format_task_line(
def db_to_file_as_is(filepath: str): def db_to_file_as_is(filepath: str):
"""Write tasks from MongoDB to file exactly as they were read.""" """Write tasks from MongoDB to file exactly as they were read."""
current_task = state.get_current_task() current_task = state.retrieve("current").get("task")
all_tasks = list(tasks.find()) all_tasks = list(tasks.find())
with open(filepath, "w") as f: with open(filepath, "w") as f:
@@ -95,9 +99,13 @@ def db_to_file_as_is(filepath: str):
f.write(f"{line}\n") f.write(f"{line}\n")
def get_all_tasks(prefix):
return list(tasks.find({"path": {"$ne": ""}}).sort("path", 1))
def db_to_file_consolidated(filepath: str): def db_to_file_consolidated(filepath: str):
"""Write tasks from MongoDB to file as a consolidated tree.""" """Write tasks from MongoDB to file as a consolidated tree."""
current_task = state.get_current_task() current_task = state.retrieve("current").get("task")
all_tasks = list(tasks.find({"path": {"$ne": ""}}).sort("path", 1)) all_tasks = list(tasks.find({"path": {"$ne": ""}}).sort("path", 1))
with open(filepath, "w") as f: with open(filepath, "w") as f:
@@ -128,15 +136,15 @@ def extract(line: str) -> Optional[str]:
# Extract everything between | and * and strip spaces # Extract everything between | and * and strip spaces
id_part = line[pipe_index + 1 : -1].strip() id_part = line[pipe_index + 1 : -1].strip()
if len(id_part) == 8: if len(id_part) == 8:
state.update_current_task(id_part) state.save("current", task=id_part)
return id_part return id_part
return None return None
def read_and_extract(file_path: str) -> Optional[str]: def read_and_extract(file_path: str) -> Optional[str]:
"""Read file and update state if current task is found.""" """Read file and update state if current task is found."""
if not Path(file_path).exists(): if file_path is None:
return None file_path = task_file
with open(file_path, "r") as file: with open(file_path, "r") as file:
for line in file: for line in file: