99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
"""
|
|
ResultCollector — reassembles chunk results in sequence order using a min-heap.
|
|
|
|
Demonstrates:
|
|
- Algorithms and sorting (Interview Topic 6) — heapq for ordered reassembly
|
|
- Core data structures (Interview Topic 5) — heap, deque
|
|
"""
|
|
|
|
import heapq
|
|
from collections import deque
|
|
from typing import List
|
|
|
|
from .exceptions import ReassemblyError
|
|
from .models import ChunkResult
|
|
|
|
|
|
class ResultCollector:
|
|
"""
|
|
Receives ChunkResults out of order, emits them in sequence order.
|
|
|
|
Uses a min-heap keyed on sequence number. Only emits a chunk when
|
|
all prior sequences have been accounted for.
|
|
|
|
Args:
|
|
total_chunks: Expected total number of chunks
|
|
"""
|
|
|
|
def __init__(self, total_chunks: int):
|
|
self.total_chunks = total_chunks
|
|
self._heap: List[tuple[int, ChunkResult]] = []
|
|
self._next_sequence = 0
|
|
self._emitted: List[ChunkResult] = []
|
|
self._seen_sequences: set[int] = set()
|
|
# Sliding window for throughput calculation
|
|
self._recent_times: deque[float] = deque(maxlen=50)
|
|
|
|
def add(self, result: ChunkResult) -> List[ChunkResult]:
|
|
"""
|
|
Add a result and return any newly emittable results in order.
|
|
|
|
Args:
|
|
result: A ChunkResult (may arrive out of order)
|
|
|
|
Returns:
|
|
List of results that can now be emitted in sequence order
|
|
(may be empty if we're still waiting for earlier sequences)
|
|
|
|
Raises:
|
|
ReassemblyError: If a duplicate sequence is received
|
|
"""
|
|
if result.sequence in self._seen_sequences:
|
|
raise ReassemblyError(
|
|
f"Duplicate sequence number: {result.sequence}"
|
|
)
|
|
self._seen_sequences.add(result.sequence)
|
|
|
|
# Track processing time for throughput
|
|
if result.processing_time > 0:
|
|
self._recent_times.append(result.processing_time)
|
|
|
|
# Push to min-heap
|
|
heapq.heappush(self._heap, (result.sequence, result))
|
|
|
|
# Emit all consecutive results starting from _next_sequence
|
|
newly_emitted = []
|
|
while self._heap and self._heap[0][0] == self._next_sequence:
|
|
_, emitted_result = heapq.heappop(self._heap)
|
|
self._emitted.append(emitted_result)
|
|
newly_emitted.append(emitted_result)
|
|
self._next_sequence += 1
|
|
|
|
return newly_emitted
|
|
|
|
@property
|
|
def is_complete(self) -> bool:
|
|
"""True if all expected chunks have been emitted in order."""
|
|
return self._next_sequence == self.total_chunks
|
|
|
|
@property
|
|
def buffered_count(self) -> int:
|
|
"""Number of results waiting in the heap (arrived out of order)."""
|
|
return len(self._heap)
|
|
|
|
@property
|
|
def emitted_count(self) -> int:
|
|
"""Number of results emitted in sequence order."""
|
|
return len(self._emitted)
|
|
|
|
@property
|
|
def avg_processing_time(self) -> float:
|
|
"""Average processing time from recent results (sliding window)."""
|
|
if not self._recent_times:
|
|
return 0.0
|
|
return sum(self._recent_times) / len(self._recent_times)
|
|
|
|
def get_ordered_results(self) -> List[ChunkResult]:
|
|
"""Get all emitted results in sequence order."""
|
|
return list(self._emitted)
|