Files
deskmeter/dmapp/dmweb/get_period_times.py
2025-05-08 22:58:56 -03:00

242 lines
7.2 KiB
Python

from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pprint import pprint
from pymongo import MongoClient
from zoneinfo import ZoneInfo
timezone = ZoneInfo("America/Argentina/Buenos_Aires")
utctz = ZoneInfo("UTC")
client = MongoClient()
db = client.deskmeter
switches = db.switch
task_file = "/home/mariano/LETRAS/org/task/main"
def task_or_none(task=None):
if not task:
task = read_and_extract(task_file)
if task == "all":
task = None
return task
def now():
return datetime.now(timezone)
def convert_seconds(seconds, use_days=False):
days = seconds // 86400
hours = (seconds % 86400) // 3600
minutes = (seconds % 3600) // 60
remaining_seconds = seconds % 60
if use_days:
return "{} days, {:02d}:{:02d}:{:02d}".format(
days, hours, minutes, remaining_seconds
)
return "{:02d}:{:02d}:{:02d}".format(hours + days * 24, minutes, remaining_seconds)
def extract(line):
if line.rstrip().endswith("*"):
pipe_index = line.find("|")
if pipe_index != -1 and len(line) > pipe_index + 8:
value = line[pipe_index + 1 : pipe_index + 9]
return value
return None
def read_and_extract(file_path):
with open(file_path, "r") as file:
for line in file:
value = extract(line)
if value:
return value
return None
def get_period_totals(start, end, task=None):
task_query = {"$in": task.split(",")} if task else {}
match_query = {"date": {"$gte": start, "$lte": end}}
if task_query:
match_query["task"] = task_query
pipeline = [
{"$match": match_query},
{"$sort": {"date": 1}},
{
"$group": {
"_id": None,
"documents_in_range": {"$push": "$$ROOT"},
"first_doc": {"$first": "$$ROOT"},
"last_doc": {"$last": "$$ROOT"},
}
},
{
"$lookup": {
"from": "switch",
"let": {"first_date": "$first_doc.date", "task": "$first_doc.task"},
"pipeline": [
{
"$match": {
"$expr": {
"$and": [
{
"$lt": ["$date", "$$first_date"]
}, # Only before the first date
{
"$eq": ["$task", "$$task"]
}, # Must have the same task
]
}
}
},
{"$sort": {"date": -1}}, # Get the most recent (closest) document
{"$limit": 1}, # Only the immediate previous document
],
"as": "before_first",
}
},
{
"$project": {
"documents": {
"$concatArrays": [
{"$ifNull": ["$before_first", []]}, # Add only if found
"$documents_in_range",
]
}
}
},
{"$unwind": "$documents"},
{"$replaceRoot": {"newRoot": "$documents"}},
{
"$group": {
"_id": "$workspace",
"total": {"$sum": "$delta"},
}
},
]
results = list(switches.aggregate(pipeline))
if not results:
return [{"ws": "No Data", "total": ""}]
pipeline_before_after = [
# Match documents within the date range
{"$match": match_query},
{"$sort": {"date": 1}},
{
"$group": {
"_id": None,
"first_doc": {"$first": "$$ROOT"},
"last_doc": {"$last": "$$ROOT"},
}
},
# Lookup to get one document before the first document in the range
{
"$lookup": {
"from": "switch",
"let": {"first_date": "$first_doc.date", "task": "$first_doc.task"},
"pipeline": [
{
"$match": {
"$expr": {
"$and": [
{
"$lt": ["$date", "$$first_date"]
}, # Only before the first date
{
"$eq": ["$task", "$$task"]
}, # Must have the same task
]
}
}
},
{"$sort": {"date": -1}}, # Get the most recent (closest) document
{"$limit": 1}, # Only the immediate previous document
],
"as": "before_first",
}
},
{
"$project": {
"before_first": {
"$ifNull": [{"$arrayElemAt": ["$before_first", 0]}, ""]
},
"last_doc": "$last_doc", # Include the last_doc from the matched period
}
},
]
aux_results = list(switches.aggregate(pipeline_before_after))
print(aux_results)
bfirst = aux_results[0]["before_first"]
if bfirst:
bfdate = bfirst["date"].replace(tzinfo=utctz)
start_delta = round((start - bfdate.astimezone(timezone)).total_seconds())
ldoc = aux_results[0]["last_doc"]
lastdate = ldoc["date"].replace(tzinfo=utctz)
end_delta = round((end - lastdate.astimezone(timezone)).total_seconds())
rows = []
active_vs_idle = {"Active": 0, "Idle": 0}
for result in results:
if bfirst:
if result["_id"] == bfirst["workspace"]:
result["total"] -= start_delta
if end < now():
if result["_id"] == ldoc["workspace"]:
result["total"] -= ldoc["delta"] - end_delta
for result in results:
if result["total"] > 0:
rows.append(
{"ws": result["_id"], "total": convert_seconds(result["total"])}
)
if result["_id"] in ["Think", "Plan", "Work"]:
active_vs_idle["Active"] += result["total"]
if result["_id"] in ["Away", "Other"]:
active_vs_idle["Idle"] += result["total"]
order = ["Plan", "Think", "Work", "Other", "Away", "Active", "Idle"]
rows = sorted(rows, key=lambda x: order.index(x["ws"]))
for k, v in active_vs_idle.items():
rows.append({"ws": k, "total": convert_seconds(v)})
return rows
# print(
# get_period_totals(
# datetime.today().replace(hour=0, minute=0, second=0, tzinfo=timezone)
# - timedelta(days=1),
# datetime.today().replace(hour=23, minute=59, second=59, tzinfo=timezone)
# - timedelta(days=1),
# # "ffbe198e",
# )
# )
# print(
# get_period_totals(
# datetime.today().replace(hour=0, minute=0, second=0, tzinfo=timezone),
# datetime.today().replace(hour=23, minute=59, second=59, tzinfo=timezone),
# "5fc751ec",
# )
# )