import datetime import os import subprocess import time from pprint import pprint from pymongo import MongoClient from zoneinfo import ZoneInfo client = MongoClient() def now(): return datetime.datetime.now(ZoneInfo("America/Argentina/Buenos_Aires")) db = client.deskmeter switches = db.switch dailies = db.daily desktops = ("Plan", "Think", "Work", "Other", "Away", "Work", "Work") task_file = "/home/mariano/LETRAS/org/task/main" # desktops = ("Admin", # "Learn", # "Tasks", # "Other", # "Text", # "Video", # "Away") unlabeled = "Away" def extract(line): if line.rstrip().endswith("*"): pipe_index = line.find("|") if pipe_index != -1 and len(line) > pipe_index + 8: value = line[pipe_index + 1 : pipe_index + 9] return value return None def read_and_extract(file_path): with open(file_path, "r") as file: for line in file: value = extract(line) if value: return value return None def active_workspace(): workspaces = ( subprocess.check_output(["wmctrl", "-d"]) .decode("utf-8") .strip("\n") .split("\n") ) for workspace in workspaces: if workspace[3] == "*": return int(workspace[0]) def desktop(workspace_index): try: return desktops[workspace_index] except IndexError: return unlabeled current_workspace = active_workspace() current_task = read_and_extract(task_file) last_switch_time = now() switch = { "workspace": desktop(current_workspace), "date": now(), "delta": 0, "task": current_task, } switches.insert_one(switch) while True: current_task = read_and_extract(task_file) current_workspace = active_workspace() last_doc = switches.find_one(sort=[("_id", -1)]) if ( last_doc["workspace"] == desktop(current_workspace) and last_doc["task"] == current_task ): delta = round((now() - last_switch_time).total_seconds()) switches.update_one( {"_id": last_doc["_id"]}, {"$set": {"delta": delta, "task": current_task}} ) else: current_workspace = active_workspace() switch = { "workspace": desktop(current_workspace), "date": now(), "delta": 0, "task": current_task, } switches.insert_one(switch) last_switch_time = now() time.sleep(2)