major fm updates

This commit is contained in:
buenosairesam
2025-12-19 22:47:38 -03:00
parent 0dde9f1f54
commit 23b4341842
7 changed files with 197 additions and 116 deletions

View File

@@ -238,85 +238,99 @@ def get_work_period_totals(start, end):
return combined_rows
def get_task_blocks_calendar(start, end, task=None, min_block_seconds=300):
def get_task_blocks_calendar(start, end, task=None, min_block_seconds=300, grid_hours=1):
"""
Get task blocks for calendar-style visualization.
Groups consecutive switches to the same task into blocks, tracking active/idle time.
Get task blocks for calendar-style visualization, aggregated by time grid.
Shows all tasks worked on during each grid period, with overlapping blocks.
Each task block's height is proportional to time spent in that grid period.
Args:
grid_hours: Grid size in hours (1, 3, or 6)
Returns list of blocks:
[{
'task_id': str,
'task_path': str,
'start': datetime,
'end': datetime,
'duration': int (total seconds),
'active_seconds': int (Plan/Think/Work time),
'idle_seconds': int (Other/Away time),
'active_ratio': float (0.0 to 1.0)
'start': datetime (start of grid period),
'end': datetime (end of grid period or actual end time if less),
'duration': int (seconds in this grid block),
'hour': int (hour of grid start, 0-23),
'active_seconds': int,
'active_ratio': float (always 1.0)
}, ...]
"""
task_query = {"$in": task.split(",")} if task else {}
match_query = {"date": {"$gte": start, "$lte": end}}
match_query = {
"date": {"$gte": start, "$lte": end},
"workspace": {"$in": ["Plan", "Think", "Work"]} # Only active workspaces
}
if task_query:
match_query["task"] = task_query
# Get all switches in period, sorted by date
# Get all active switches in period
raw_switches = list(switches.find(match_query).sort("date", 1))
if not raw_switches:
return []
blocks = []
current_block = None
# Aggregate by grid period and task
# Structure: {(date, grid_start_hour, task_id): total_seconds}
grid_task_time = defaultdict(lambda: {"duration": 0, "task_path": None})
for switch in raw_switches:
ws = switch["workspace"]
task_id = switch.get("task")
switch_start = switch["date"].replace(tzinfo=utctz).astimezone(timezone)
switch_duration = switch["delta"]
switch_end = switch_start + timedelta(seconds=switch_duration)
is_active = ws in ["Plan", "Think", "Work"]
# Calculate how much time falls in each grid period this switch spans
current_time = switch_start
remaining_duration = switch_duration
# Start new block if task changed
if current_block is None or current_block["task_id"] != task_id:
if current_block is not None:
blocks.append(current_block)
while remaining_duration > 0 and current_time < switch_end:
# Calculate grid period start (hour rounded down to grid_hours)
grid_hour = (current_time.hour // grid_hours) * grid_hours
grid_start = current_time.replace(hour=grid_hour, minute=0, second=0, microsecond=0)
grid_end = grid_start + timedelta(hours=grid_hours)
# Get task path with history fallback
task_path = get_task_path(task_id) or "No Task"
# Time in this grid period
time_in_grid = min(
(grid_end - current_time).total_seconds(),
remaining_duration
)
current_block = {
key = (current_time.date(), grid_hour, task_id)
# Get task path (cache it)
if grid_task_time[key]["task_path"] is None:
task_path = get_task_path(task_id) or "No Task"
grid_task_time[key]["task_path"] = task_path
grid_task_time[key]["duration"] += time_in_grid
remaining_duration -= time_in_grid
current_time = grid_end
# Convert to blocks
blocks = []
for (date, grid_hour, task_id), data in grid_task_time.items():
if data["duration"] >= min_block_seconds:
grid_start = datetime(date.year, date.month, date.day, grid_hour, 0, 0, tzinfo=timezone)
blocks.append({
"task_id": task_id,
"task_path": task_path,
"start": switch_start,
"end": switch_end,
"duration": switch_duration,
"active_seconds": switch_duration if is_active else 0,
"idle_seconds": 0 if is_active else switch_duration
}
else:
# Extend current block
current_block["end"] = switch_end
current_block["duration"] += switch_duration
if is_active:
current_block["active_seconds"] += switch_duration
else:
current_block["idle_seconds"] += switch_duration
"task_path": data["task_path"],
"start": grid_start,
"end": grid_start + timedelta(seconds=data["duration"]),
"hour": grid_hour,
"duration": int(data["duration"]),
"active_seconds": int(data["duration"]),
"idle_seconds": 0,
"active_ratio": 1.0
})
# Add final block
if current_block is not None:
blocks.append(current_block)
# Filter out very short blocks and calculate active ratio
filtered_blocks = []
for block in blocks:
if block["duration"] >= min_block_seconds:
block["active_ratio"] = block["active_seconds"] / block["duration"] if block["duration"] > 0 else 0
filtered_blocks.append(block)
return filtered_blocks
return sorted(blocks, key=lambda x: (x["start"], x["task_path"]))
def get_raw_switches(start, end, task=None):