Files
deskmeter/dmapp/dmweb/get_period_times.py

543 lines
17 KiB
Python

from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
from pymongo import MongoClient
# Local timezone for every datetime this module returns or compares against.
# Switch dates come back from MongoDB naive and are treated as UTC (see the
# `.replace(tzinfo=utctz)` calls) before being converted to this zone.
timezone = ZoneInfo("America/Argentina/Buenos_Aires")
utctz = ZoneInfo("UTC")
# Module-level MongoDB handles; MongoClient() is created with no arguments,
# i.e. pymongo's default host/port.
client = MongoClient()
db = client.deskmeter
# switch: workspace-switch records (date, delta, workspace, task)
# task: current tasks with their resolved "path"
# task_history: cache of task paths found by scanning the task files
switches = db.switch
tasks = db.task
task_history = db.task_history
# Main task file; load_task_from_files scans its parent directory recursively
# for task IDs that are not present in the database.
task_file = "/home/mariano/LETRAS/adm/task/main"
task_dir = Path(task_file).parent
def parse_task_line(line):
    """Split one task-file line into ``(task_name, task_id)``.

    Lines look like ``name | id [more tokens...]``. The name is the stripped
    text before the first ``|``; the ID is the first whitespace-separated token
    between the first and second ``|``. Blank lines give ``(None, None)``; a
    line with no ``|`` or no token after it gives ``(name, None)``.
    """
    text = line.strip()
    if not text:
        return None, None

    segments = text.split("|")
    name = segments[0].strip()
    if len(segments) == 1:
        return name, None

    tokens = segments[1].split()
    return name, (tokens[0] if tokens else None)
def _find_task_in_file(task_filepath, task_id):
    """Return the "/"-joined path of ``task_id`` within one task file, or None.

    Nesting comes from leading whitespace: every 4 characters of indent is one
    level, and an entry's path is its ancestors' names plus its own name.

    Raises:
        OSError: if the file cannot be opened or read.
        UnicodeDecodeError: if the file is not text in the default encoding.
    """
    current_path = []
    with open(task_filepath, "r") as f:
        for line in f:
            if not line.strip():
                continue
            indent = len(line) - len(line.lstrip())
            level = indent // 4
            task_name, found_id = parse_task_line(line)
            if task_name is None:
                continue
            # Truncate to this entry's parent level, then descend into it.
            current_path = current_path[:level]
            current_path.append(task_name)
            if found_id == task_id:
                return "/".join(current_path)
    return None


def load_task_from_files(task_id):
    """Search task directory files (recursively) for a task ID and load it into task_history.

    On a hit, the resolved path is upserted into ``task_history`` (with the
    matching file's name as ``source_file``) so later lookups avoid the scan.

    Returns:
        The task's "/"-joined path, or None if ``task_id`` is falsy or no
        readable file under ``task_dir`` contains it.
    """
    # Without this guard, task_id=None would match the first line that has no ID.
    if not task_id:
        return None
    for task_filepath in task_dir.glob("**/*"):
        if not task_filepath.is_file():
            continue
        try:
            full_path = _find_task_in_file(task_filepath, task_id)
        except (OSError, UnicodeDecodeError):
            # Skip files that can't be read (permissions, binary content, ...).
            continue
        if full_path is None:
            continue
        # Found it! Cache it in task_history. This runs outside the try so
        # database errors are no longer mistaken for unreadable files.
        task_history.update_one(
            {"task_id": task_id},
            {
                "$set": {
                    "path": full_path,
                    "task_id": task_id,
                    "source_file": task_filepath.name,
                }
            },
            upsert=True,
        )
        return full_path
    return None
def get_task_path(task_id):
    """Resolve a task ID to its "/"-separated task path.

    Lookup order: the live ``tasks`` collection, then the ``task_history``
    cache, then a scan of the task files (which also fills the cache).
    Returns None for a falsy ``task_id``, and the ID itself when none of the
    sources know it.
    """
    if not task_id:
        return None

    for collection in (tasks, task_history):
        doc = collection.find_one({"task_id": task_id})
        if doc and "path" in doc:
            return doc["path"]

    # Not cached anywhere: fall back to the files, then to the bare ID.
    return load_task_from_files(task_id) or task_id
def get_current_task_info():
    """Return ``(task_id, task_path)`` for the task marked as current.

    Reads the ``{"_id": "current"}`` document from the ``state`` collection.
    Returns ``(None, None)`` when that document or its ``task`` field is
    missing. Returns ``(task_id, None)`` when the ``tasks`` collection has no
    path for it; unlike get_task_path, there is no history/file fallback here.
    """
    current_doc = db.state.find_one({"_id": "current"})
    if not current_doc or "task" not in current_doc:
        return None, None

    task_id = current_doc["task"]
    task_doc = tasks.find_one({"task_id": task_id})
    path = task_doc["path"] if task_doc and "path" in task_doc else None
    return task_id, path
def get_task_time_seconds(start, end, task_id, workspaces=None):
    """Sum the ``delta`` seconds recorded for one task within [start, end].

    Only switches whose ``date`` lies in the inclusive range and whose
    workspace is in ``workspaces`` are counted. The default is the active
    workspaces: Plan, Think and Work. Returns 0 when nothing matches.
    """
    active = ["Plan", "Think", "Work"] if workspaces is None else workspaces
    match_stage = {
        "$match": {
            "date": {"$gte": start, "$lte": end},
            "task": task_id,
            "workspace": {"$in": active},
        }
    }
    group_stage = {"$group": {"_id": None, "total_seconds": {"$sum": "$delta"}}}

    rows = list(switches.aggregate([match_stage, group_stage]))
    return rows[0]["total_seconds"] if rows else 0
def task_or_none(task=None):
    """Map the ``"all"`` sentinel to None (no task filter); pass anything else through."""
    return None if task == "all" else task
def now():
    """Return the current time as an aware datetime in the module's local ``timezone``."""
    return datetime.now(tz=timezone)
def convert_seconds(seconds, use_days=False):
    """Format a duration in seconds as ``HH:MM:SS`` or ``D days, HH:MM:SS``.

    Args:
        seconds: Duration in seconds. Floats (e.g. a float ``$sum`` from
            MongoDB or ``timedelta.total_seconds()``) are truncated to whole
            seconds rather than crashing the ``:02d`` format. Negative values
            are formatted as a leading "-" plus the magnitude.
        use_days: If True, whole days are split out as ``"<d> days, "``.
            Otherwise hours keep counting past 24 (e.g. ``"25:00:00"``).

    Returns:
        The formatted string.
    """
    whole = int(seconds)
    sign = "-" if whole < 0 else ""
    days, rest = divmod(abs(whole), 86400)
    hours, rest = divmod(rest, 3600)
    minutes, secs = divmod(rest, 60)
    if use_days:
        return f"{sign}{days} days, {hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{sign}{hours + days * 24:02d}:{minutes:02d}:{secs:02d}"
def get_work_period_totals(start, end):
    """Return per-task active time in [start, end], sorted by task path.

    Sums ``delta`` over switches in the Plan/Think/Work workspaces that carry
    a task, grouped by task ID. Each ID is resolved with get_task_path, so
    tasks no longer in ``tasks`` still get a path where possible. Tasks whose
    total is not positive are omitted.

    Returns:
        A list of ``{"ws": task_path, "total": "HH:MM:SS"}`` dicts.
    """
    match_stage = {
        "$match": {
            "date": {"$gte": start, "$lte": end},
            "workspace": {"$in": ["Plan", "Think", "Work"]},
            "task": {"$exists": True, "$ne": None},
        }
    }
    group_stage = {"$group": {"_id": "$task", "total_seconds": {"$sum": "$delta"}}}

    per_task = list(switches.aggregate([match_stage, group_stage]))
    rows = [
        {
            "ws": get_task_path(doc["_id"]),
            "total": convert_seconds(doc["total_seconds"]),
        }
        for doc in per_task
        if doc["total_seconds"] > 0
    ]
    # Stable ordering by path for consistent display.
    return sorted(rows, key=lambda row: row["ws"])
def get_task_blocks_calendar(
    start, end, task=None, min_block_seconds=300, grid_hours=1
):
    """
    Get task blocks for calendar-style visualization, aggregated by time grid.

    Every active switch (Plan/Think/Work) dated within [start, end] is split
    across the local-time grid cells it spans. Its seconds are then summed per
    (day, grid cell, task), so several tasks can share one cell. Each block's
    height is proportional to the time spent on that task in the cell.

    Args:
        start, end: Bounds matched against the switch ``date`` field.
        task: Optional comma-separated task IDs to restrict to.
        min_block_seconds: Cells with less accumulated time are dropped.
        grid_hours: Grid size in hours (1, 3, or 6).

    Returns a list of blocks sorted by (start, task_path):
        [{
            'task_id': str or None,
            'task_path': str (resolved path, the raw ID if unresolvable, or
                "No Task" for switches without a task),
            'start': datetime (start of the grid cell, local timezone),
            'end': datetime (start + duration),
            'hour': int (hour of grid start, 0-23),
            'duration': int (seconds in this grid block),
            'active_seconds': int (same as duration),
            'idle_seconds': int (always 0),
            'active_ratio': float (always 1.0),
        }, ...]
    """
    match_query = {
        "date": {"$gte": start, "$lte": end},
        "workspace": {"$in": ["Plan", "Think", "Work"]},  # Only active workspaces
    }
    if task:
        match_query["task"] = {"$in": task.split(",")}

    raw_switches = list(switches.find(match_query).sort("date", 1))
    if not raw_switches:
        return []

    # Resolve each distinct task once per call. get_task_path hits MongoDB and,
    # for IDs it cannot find, rescans the whole task directory on every call,
    # so caching per grid cell (as before) repeated that work for every cell.
    path_cache = {}

    def task_path_for(task_id):
        if task_id not in path_cache:
            path_cache[task_id] = get_task_path(task_id) or "No Task"
        return path_cache[task_id]

    # {(local date, grid start hour, task_id): {"duration": s, "task_path": str}}
    grid_task_time = defaultdict(lambda: {"duration": 0, "task_path": None})

    for switch in raw_switches:
        task_id = switch.get("task")
        switch_start = switch["date"].replace(tzinfo=utctz).astimezone(timezone)
        switch_duration = switch["delta"]
        switch_end = switch_start + timedelta(seconds=switch_duration)

        # Walk the switch forward one grid cell at a time, crediting each cell
        # with the part of the switch that falls inside it.
        current_time = switch_start
        remaining_duration = switch_duration
        while remaining_duration > 0 and current_time < switch_end:
            grid_hour = (current_time.hour // grid_hours) * grid_hours
            grid_start = current_time.replace(
                hour=grid_hour, minute=0, second=0, microsecond=0
            )
            grid_end = grid_start + timedelta(hours=grid_hours)
            time_in_grid = min(
                (grid_end - current_time).total_seconds(), remaining_duration
            )

            entry = grid_task_time[(current_time.date(), grid_hour, task_id)]
            entry["task_path"] = task_path_for(task_id)
            entry["duration"] += time_in_grid

            remaining_duration -= time_in_grid
            current_time = grid_end

    blocks = []
    for (date, grid_hour, task_id), data in grid_task_time.items():
        if data["duration"] < min_block_seconds:
            continue
        grid_start = datetime(
            date.year, date.month, date.day, grid_hour, 0, 0, tzinfo=timezone
        )
        blocks.append(
            {
                "task_id": task_id,
                "task_path": data["task_path"],
                "start": grid_start,
                "end": grid_start + timedelta(seconds=data["duration"]),
                "hour": grid_hour,
                "duration": int(data["duration"]),
                "active_seconds": int(data["duration"]),
                "idle_seconds": 0,
                "active_ratio": 1.0,
            }
        )
    return sorted(blocks, key=lambda x: (x["start"], x["task_path"]))
def get_raw_switches(start, end, task=None):
    """
    Get all raw switch documents in the period, oldest first.

    Args:
        start, end: Bounds matched against the switch ``date`` field.
        task: Optional comma-separated task IDs to restrict to.

    Returns list of switches:
        [{
            'workspace': str,
            'task_id': str or None,
            'task_path': str ("No Task" for switches without a task),
            'date': datetime (converted to the local timezone),
            'delta': int (seconds)
        }, ...]
    """
    match_query = {"date": {"$gte": start, "$lte": end}}
    if task:
        match_query["task"] = {"$in": task.split(",")}

    raw_switches = list(switches.find(match_query).sort("date", 1))

    # Resolve each distinct task once. get_task_path queries MongoDB and, for
    # IDs it cannot find, rescans the whole task directory on every call, so a
    # per-switch lookup was one (or more) round-trips per row.
    path_cache = {}
    result = []
    for switch in raw_switches:
        task_id = switch.get("task")
        if task_id not in path_cache:
            path_cache[task_id] = get_task_path(task_id) or "No Task"
        result.append(
            {
                "workspace": switch["workspace"],
                "task_id": task_id,
                "task_path": path_cache[task_id],
                "date": switch["date"].replace(tzinfo=utctz).astimezone(timezone),
                "delta": switch["delta"],
            }
        )
    return result
def _before_first_lookup():
    """Build the ``$lookup`` stage that fetches the switch preceding the range.

    Must follow a ``$group`` stage that exposes ``first_doc``. It finds the
    most recent switch dated before ``first_doc.date`` whose ``task`` equals
    ``first_doc.task``, and stores it as the 0- or 1-element array
    ``before_first``.
    """
    return {
        "$lookup": {
            "from": "switch",
            "let": {"first_date": "$first_doc.date", "task": "$first_doc.task"},
            "pipeline": [
                {
                    "$match": {
                        "$expr": {
                            "$and": [
                                # Only before the first date...
                                {"$lt": ["$date", "$$first_date"]},
                                # ...and with the same task.
                                {"$eq": ["$task", "$$task"]},
                            ]
                        }
                    }
                },
                {"$sort": {"date": -1}},  # Most recent (closest) first
                {"$limit": 1},  # Only the immediately previous switch
            ],
            "as": "before_first",
        }
    }


def get_period_totals(start, end, task=None):
    """Return time per workspace in [start, end], followed by Active/Idle sums.

    Each switch contributes its full ``delta``, so the raw sum is corrected
    at both edges of the range:

    * The switch just before the range is added to the sum. This is the most
      recent one with the same task as the first in-range switch. The part of
      it lying before ``start`` is then subtracted (all of it if it had
      already ended by ``start``).
    * When ``end`` is in the past, the part of the last in-range switch that
      runs past ``end`` is subtracted.

    NOTE(review): because the previous switch is matched on the first
    in-range switch's task, a differently-tasked switch that was running at
    ``start`` is not counted when no task filter is given.

    Args:
        start, end: Timezone-aware bounds. They are subtracted from and
            compared with aware datetimes, so naive values raise TypeError.
        task: Optional comma-separated task IDs to restrict to.

    Returns:
        A list of ``{"ws": name, "total": "HH:MM:SS"}`` rows. Workspaces with
        positive time come first, in Plan/Think/Work/Other/Away order with
        unknown names last. They are followed by "Active" (Plan+Think+Work)
        and "Idle" (Other+Away). Returns ``[{"ws": "No Data", "total": ""}]``
        when nothing matches.
    """
    match_query = {"date": {"$gte": start, "$lte": end}}
    if task:
        match_query["task"] = {"$in": task.split(",")}

    # Sum deltas per workspace over the in-range switches plus the one before.
    # NOTE(review): every in-range switch is $push-ed into one group document,
    # which may hit MongoDB's document size limit for very long ranges.
    pipeline = [
        {"$match": match_query},
        {"$sort": {"date": 1}},
        {
            "$group": {
                "_id": None,
                "documents_in_range": {"$push": "$$ROOT"},
                "first_doc": {"$first": "$$ROOT"},
                "last_doc": {"$last": "$$ROOT"},
            }
        },
        _before_first_lookup(),
        {
            "$project": {
                "documents": {
                    "$concatArrays": [
                        {"$ifNull": ["$before_first", []]},  # Add only if found
                        "$documents_in_range",
                    ]
                }
            }
        },
        {"$unwind": "$documents"},
        {"$replaceRoot": {"newRoot": "$documents"}},
        {"$group": {"_id": "$workspace", "total": {"$sum": "$delta"}}},
    ]
    results = list(switches.aggregate(pipeline))
    if not results:
        return [{"ws": "No Data", "total": ""}]

    # Fetch the two boundary switches: the one before the range ("" if none)
    # and the last one inside it.
    pipeline_before_after = [
        {"$match": match_query},
        {"$sort": {"date": 1}},
        {
            "$group": {
                "_id": None,
                "first_doc": {"$first": "$$ROOT"},
                "last_doc": {"$last": "$$ROOT"},
            }
        },
        _before_first_lookup(),
        {
            "$project": {
                "before_first": {
                    "$ifNull": [{"$arrayElemAt": ["$before_first", 0]}, ""]
                },
                "last_doc": "$last_doc",
            }
        },
    ]
    aux_results = list(switches.aggregate(pipeline_before_after))
    if not aux_results:
        return [{"ws": "No Data", "total": ""}]

    bfirst = aux_results[0]["before_first"]
    start_delta = 0
    if bfirst:
        bfdate = bfirst["date"].replace(tzinfo=utctz)
        time_since_bfirst = round((start - bfdate.astimezone(timezone)).total_seconds())
        # The pipeline counted bfirst's whole delta. Remove the part before
        # `start`, which is all of it when the switch ended before the period
        # (previously nothing was removed in that case).
        start_delta = max(0, min(time_since_bfirst, bfirst["delta"]))

    ldoc = aux_results[0]["last_doc"]
    lastdate = ldoc["date"].replace(tzinfo=utctz)
    end_delta = round((end - lastdate.astimezone(timezone)).total_seconds())
    # Seconds of the last switch past `end`. It is 0 when the switch ended in
    # time; a negative value would otherwise add phantom time.
    end_overflow = max(0, ldoc["delta"] - end_delta)

    # The last delta is only trimmed for periods that are fully in the past.
    period_over = end < now()
    for result in results:
        if bfirst and result["_id"] == bfirst["workspace"]:
            result["total"] -= min(start_delta, result["total"])
        if period_over and result["_id"] == ldoc["workspace"]:
            result["total"] -= min(end_overflow, result["total"])

    rows = []
    active_vs_idle = {"Active": 0, "Idle": 0}
    for result in results:
        ws, total = result["_id"], result["total"]
        if total > 0:
            rows.append({"ws": ws, "total": convert_seconds(total)})
        if ws in ("Think", "Plan", "Work"):
            active_vs_idle["Active"] += total
        elif ws in ("Away", "Other"):
            active_vs_idle["Idle"] += total

    order = ["Plan", "Think", "Work", "Other", "Away", "Active", "Idle"]
    # Unknown workspace names sort last instead of raising ValueError.
    rows.sort(key=lambda row: order.index(row["ws"]) if row["ws"] in order else len(order))
    rows.extend(
        {"ws": k, "total": convert_seconds(v)} for k, v in active_vs_idle.items()
    )
    return rows
# print(
# get_period_totals(
# datetime.today().replace(hour=0, minute=0, second=0, tzinfo=timezone)
# - timedelta(days=1),
# datetime.today().replace(hour=23, minute=59, second=59, tzinfo=timezone)
# - timedelta(days=1),
# # "ffbe198e",
# )
# )
# print(
# get_period_totals(
# datetime.today().replace(hour=0, minute=0, second=0, tzinfo=timezone),
# datetime.today().replace(hour=23, minute=59, second=59, tzinfo=timezone),
# "5fc751ec",
# )
# )