soleprint init commit

This commit is contained in:
buenosairesam
2025-12-24 05:38:37 -03:00
commit 329c401ff5
96 changed files with 11564 additions and 0 deletions

View File

@@ -0,0 +1,119 @@
# Playwright Test Integration
Frontend test support for ward/tools/tester.
## Features
- Discover Playwright tests (.spec.ts files)
- Execute tests with Playwright runner
- Capture video recordings and screenshots
- Stream artifacts via API endpoints
- Inline video/screenshot playback in test results
## Directory Structure
```
ward/tools/tester/
├── playwright/
│ ├── discovery.py # Find .spec.ts tests
│ ├── runner.py # Execute Playwright tests
│ └── artifacts.py # Store and serve artifacts
├── frontend-tests/ # Synced Playwright tests (gitignored)
└── artifacts/ # Test artifacts (gitignored)
├── videos/
├── screenshots/
└── traces/
```
## Test Metadata Format
Add Gherkin metadata to Playwright tests via JSDoc comments:
```typescript
/**
* Feature: Reservar turno veterinario
* Scenario: Verificar cobertura en zona disponible
* Tags: @smoke @coverage @frontend
* @description Coverage check shows message for valid address
*/
test('coverage check shows message for valid address', async ({ page }) => {
await page.goto('http://localhost:3000/turnero');
await page.fill('[name="address"]', 'Av Santa Fe 1234, CABA');
await page.click('button:has-text("Verificar")');
await expect(page.locator('.coverage-message')).toContainText('Tenemos cobertura');
});
```
## Playwright Configuration
Tests should use playwright.config.ts with video/screenshot capture:
```typescript
import { defineConfig } from '@playwright/test';
export default defineConfig({
use: {
// Capture video on failure
video: 'retain-on-failure',
// Capture screenshot on failure
screenshot: 'only-on-failure',
},
// Output directory for artifacts
outputDir: './test-results',
reporter: [
['json', { outputFile: 'results.json' }],
['html'],
],
});
```
## API Endpoints
### Stream Artifact
```
GET /tools/tester/api/artifact/{run_id}/{filename}
```
Returns video/screenshot file for inline playback.
### List Artifacts
```
GET /tools/tester/api/artifacts/{run_id}
```
Returns JSON list of all artifacts for a test run.
## Artifact Display
Videos and screenshots are displayed inline in test results:
**Video:**
```html
<video controls>
<source src="/tools/tester/api/artifact/{run_id}/test-video.webm" type="video/webm">
</video>
```
**Screenshot:**
```html
<img src="/tools/tester/api/artifact/{run_id}/screenshot.png">
```
## Integration with Test Runner
Playwright tests are discovered alongside backend tests and can be:
- Run individually or in batches
- Filtered by Gherkin metadata (feature, scenario, tags)
- Filtered by pulse variables (role, stage, state)
## Future Enhancements
- Playwright trace viewer integration
- Test parallelization
- Browser selection (chromium, firefox, webkit)
- Mobile device emulation
- Network throttling
- Test retry logic

View File

@@ -0,0 +1 @@
"""Playwright test support for tester."""

View File

@@ -0,0 +1,178 @@
"""
Artifact storage and retrieval for test results.
"""
import shutil
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
@dataclass
class TestArtifact:
"""Test artifact (video, screenshot, trace, etc.)."""
type: str # "video", "screenshot", "trace", "log"
filename: str
path: str
size: int
mimetype: str
url: str # Streaming endpoint
class ArtifactStore:
"""Manage test artifacts."""
def __init__(self, artifacts_dir: Path):
self.artifacts_dir = artifacts_dir
self.videos_dir = artifacts_dir / "videos"
self.screenshots_dir = artifacts_dir / "screenshots"
self.traces_dir = artifacts_dir / "traces"
# Ensure directories exist
self.videos_dir.mkdir(parents=True, exist_ok=True)
self.screenshots_dir.mkdir(parents=True, exist_ok=True)
self.traces_dir.mkdir(parents=True, exist_ok=True)
def store_artifact(
self,
source_path: Path,
run_id: str,
artifact_type: str
) -> Optional[TestArtifact]:
"""
Store an artifact and return its metadata.
Args:
source_path: Path to the source file
run_id: Test run ID
artifact_type: Type of artifact (video, screenshot, trace)
Returns:
TestArtifact metadata or None if storage fails
"""
if not source_path.exists():
return None
# Determine destination directory
if artifact_type == "video":
dest_dir = self.videos_dir
mimetype = "video/webm"
elif artifact_type == "screenshot":
dest_dir = self.screenshots_dir
mimetype = "image/png"
elif artifact_type == "trace":
dest_dir = self.traces_dir
mimetype = "application/zip"
else:
# Unknown type, store in root artifacts dir
dest_dir = self.artifacts_dir
mimetype = "application/octet-stream"
# Create run-specific subdirectory
run_dir = dest_dir / run_id
run_dir.mkdir(parents=True, exist_ok=True)
# Copy file
dest_path = run_dir / source_path.name
try:
shutil.copy2(source_path, dest_path)
except Exception:
return None
# Build streaming URL
url = f"/tools/tester/api/artifact/{run_id}/{source_path.name}"
return TestArtifact(
type=artifact_type,
filename=source_path.name,
path=str(dest_path),
size=dest_path.stat().st_size,
mimetype=mimetype,
url=url,
)
def get_artifact(self, run_id: str, filename: str) -> Optional[Path]:
"""
Retrieve an artifact file.
Args:
run_id: Test run ID
filename: Artifact filename
Returns:
Path to artifact file or None if not found
"""
# Search in all artifact directories
for artifact_dir in [self.videos_dir, self.screenshots_dir, self.traces_dir]:
artifact_path = artifact_dir / run_id / filename
if artifact_path.exists():
return artifact_path
# Check root artifacts dir
artifact_path = self.artifacts_dir / run_id / filename
if artifact_path.exists():
return artifact_path
return None
def list_artifacts(self, run_id: str) -> list[TestArtifact]:
"""
List all artifacts for a test run.
Args:
run_id: Test run ID
Returns:
List of TestArtifact metadata
"""
artifacts = []
# Search in all artifact directories
type_mapping = {
self.videos_dir: ("video", "video/webm"),
self.screenshots_dir: ("screenshot", "image/png"),
self.traces_dir: ("trace", "application/zip"),
}
for artifact_dir, (artifact_type, mimetype) in type_mapping.items():
run_dir = artifact_dir / run_id
if not run_dir.exists():
continue
for artifact_file in run_dir.iterdir():
if artifact_file.is_file():
artifacts.append(TestArtifact(
type=artifact_type,
filename=artifact_file.name,
path=str(artifact_file),
size=artifact_file.stat().st_size,
mimetype=mimetype,
url=f"/tools/tester/api/artifact/{run_id}/{artifact_file.name}",
))
return artifacts
def cleanup_old_artifacts(self, keep_recent: int = 10):
"""
Clean up old artifact directories, keeping only the most recent runs.
Args:
keep_recent: Number of recent runs to keep
"""
# Get all run directories sorted by modification time
all_runs = []
for artifact_dir in [self.videos_dir, self.screenshots_dir, self.traces_dir]:
for run_dir in artifact_dir.iterdir():
if run_dir.is_dir():
all_runs.append(run_dir)
# Sort by modification time (newest first)
all_runs.sort(key=lambda p: p.stat().st_mtime, reverse=True)
# Keep only the most recent
for old_run in all_runs[keep_recent:]:
try:
shutil.rmtree(old_run)
except Exception:
pass # Ignore errors during cleanup

View File

@@ -0,0 +1,153 @@
"""
Discover Playwright tests (.spec.ts files).
"""
import re
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
@dataclass
class PlaywrightTestInfo:
"""Information about a discovered Playwright test."""
id: str
name: str
file_path: str
test_name: str
description: Optional[str] = None
gherkin_feature: Optional[str] = None
gherkin_scenario: Optional[str] = None
tags: list[str] = None
def __post_init__(self):
if self.tags is None:
self.tags = []
def discover_playwright_tests(tests_dir: Path) -> list[PlaywrightTestInfo]:
"""
Discover all Playwright tests in the frontend-tests directory.
Parses .spec.ts files to extract:
- test() calls
- describe() blocks
- Gherkin metadata from comments
- Tags from comments
"""
if not tests_dir.exists():
return []
tests = []
# Find all .spec.ts files
for spec_file in tests_dir.rglob("*.spec.ts"):
relative_path = spec_file.relative_to(tests_dir)
# Read file content
try:
content = spec_file.read_text()
except Exception:
continue
# Extract describe blocks and tests
tests_in_file = _parse_playwright_file(content, spec_file, relative_path)
tests.extend(tests_in_file)
return tests
def _parse_playwright_file(
content: str,
file_path: Path,
relative_path: Path
) -> list[PlaywrightTestInfo]:
"""Parse a Playwright test file to extract test information."""
tests = []
# Pattern to match test() calls
# test('test name', async ({ page }) => { ... })
# test.only('test name', ...)
test_pattern = re.compile(
r"test(?:\.\w+)?\s*\(\s*['\"]([^'\"]+)['\"]",
re.MULTILINE
)
# Pattern to match describe() blocks
describe_pattern = re.compile(
r"describe\s*\(\s*['\"]([^'\"]+)['\"]",
re.MULTILINE
)
# Extract metadata from comments above tests
# Looking for JSDoc-style comments with metadata
metadata_pattern = re.compile(
r"/\*\*\s*\n((?:\s*\*.*\n)+)\s*\*/\s*\n\s*test",
re.MULTILINE
)
# Find all describe blocks to use as context
describes = describe_pattern.findall(content)
describe_context = describes[0] if describes else None
# Find all tests
for match in test_pattern.finditer(content):
test_name = match.group(1)
# Look for metadata comment before this test
# Search backwards from the match position
before_test = content[:match.start()]
metadata_match = None
for m in metadata_pattern.finditer(before_test):
metadata_match = m
# Parse metadata if found
gherkin_feature = None
gherkin_scenario = None
tags = []
description = None
if metadata_match:
metadata_block = metadata_match.group(1)
# Extract Feature, Scenario, Tags from metadata
feature_match = re.search(r"\*\s*Feature:\s*(.+)", metadata_block)
scenario_match = re.search(r"\*\s*Scenario:\s*(.+)", metadata_block)
tags_match = re.search(r"\*\s*Tags:\s*(.+)", metadata_block)
desc_match = re.search(r"\*\s*@description\s+(.+)", metadata_block)
if feature_match:
gherkin_feature = feature_match.group(1).strip()
if scenario_match:
gherkin_scenario = scenario_match.group(1).strip()
if tags_match:
tags_str = tags_match.group(1).strip()
tags = [t.strip() for t in re.findall(r"@[\w-]+", tags_str)]
if desc_match:
description = desc_match.group(1).strip()
# Build test ID
module_name = str(relative_path).replace("/", ".").replace(".spec.ts", "")
test_id = f"frontend.{module_name}.{_sanitize_test_name(test_name)}"
tests.append(PlaywrightTestInfo(
id=test_id,
name=test_name,
file_path=str(relative_path),
test_name=test_name,
description=description or test_name,
gherkin_feature=gherkin_feature,
gherkin_scenario=gherkin_scenario,
tags=tags,
))
return tests
def _sanitize_test_name(name: str) -> str:
"""Convert test name to a valid identifier."""
# Replace spaces and special chars with underscores
sanitized = re.sub(r"[^\w]+", "_", name.lower())
# Remove leading/trailing underscores
sanitized = sanitized.strip("_")
return sanitized

View File

@@ -0,0 +1,189 @@
"""
Execute Playwright tests and capture artifacts.
"""
import subprocess
import json
import time
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field
@dataclass
class PlaywrightResult:
"""Result of a Playwright test execution."""
test_id: str
name: str
status: str # "passed", "failed", "skipped"
duration: float
error_message: Optional[str] = None
traceback: Optional[str] = None
artifacts: list[dict] = field(default_factory=list)
class PlaywrightRunner:
"""Run Playwright tests and collect artifacts."""
def __init__(self, tests_dir: Path, artifacts_dir: Path):
self.tests_dir = tests_dir
self.artifacts_dir = artifacts_dir
self.videos_dir = artifacts_dir / "videos"
self.screenshots_dir = artifacts_dir / "screenshots"
self.traces_dir = artifacts_dir / "traces"
# Ensure artifact directories exist
self.videos_dir.mkdir(parents=True, exist_ok=True)
self.screenshots_dir.mkdir(parents=True, exist_ok=True)
self.traces_dir.mkdir(parents=True, exist_ok=True)
def run_tests(
self,
test_files: Optional[list[str]] = None,
run_id: Optional[str] = None
) -> list[PlaywrightResult]:
"""
Run Playwright tests and collect results.
Args:
test_files: List of test file paths to run (relative to tests_dir).
If None, runs all tests.
run_id: Optional run ID to namespace artifacts.
Returns:
List of PlaywrightResult objects.
"""
if not self.tests_dir.exists():
return []
# Build playwright command
cmd = ["npx", "playwright", "test"]
# Add specific test files if provided
if test_files:
cmd.extend(test_files)
# Add reporter for JSON output
results_file = self.artifacts_dir / f"results_{run_id or 'latest'}.json"
cmd.extend([
"--reporter=json",
f"--output={results_file}"
])
# Configure artifact collection
# Videos and screenshots are configured in playwright.config.ts
# We'll assume config is set to capture on failure
# Run tests
start_time = time.time()
try:
result = subprocess.run(
cmd,
cwd=self.tests_dir,
capture_output=True,
text=True,
timeout=600 # 10 minute timeout
)
# Parse results
if results_file.exists():
with open(results_file) as f:
results_data = json.load(f)
return self._parse_results(results_data, run_id)
else:
# No results file - likely error
return self._create_error_result(result.stderr)
except subprocess.TimeoutExpired:
return self._create_error_result("Tests timed out after 10 minutes")
except Exception as e:
return self._create_error_result(str(e))
def _parse_results(
self,
results_data: dict,
run_id: Optional[str]
) -> list[PlaywrightResult]:
"""Parse Playwright JSON results."""
parsed_results = []
# Playwright JSON reporter structure:
# {
# "suites": [...],
# "tests": [...],
# }
tests = results_data.get("tests", [])
for test in tests:
test_id = test.get("testId", "unknown")
title = test.get("title", "Unknown test")
status = test.get("status", "unknown") # passed, failed, skipped
duration = test.get("duration", 0) / 1000.0 # Convert ms to seconds
error_message = None
traceback = None
# Extract error if failed
if status == "failed":
error = test.get("error", {})
error_message = error.get("message", "Test failed")
traceback = error.get("stack", "")
# Collect artifacts
artifacts = []
for attachment in test.get("attachments", []):
artifact_type = attachment.get("contentType", "")
artifact_path = attachment.get("path", "")
if artifact_path:
artifact_file = Path(artifact_path)
if artifact_file.exists():
# Determine type
if "video" in artifact_type:
type_label = "video"
elif "image" in artifact_type:
type_label = "screenshot"
elif "trace" in artifact_type:
type_label = "trace"
else:
type_label = "attachment"
artifacts.append({
"type": type_label,
"filename": artifact_file.name,
"path": str(artifact_file),
"size": artifact_file.stat().st_size,
"mimetype": artifact_type,
})
parsed_results.append(PlaywrightResult(
test_id=test_id,
name=title,
status=status,
duration=duration,
error_message=error_message,
traceback=traceback,
artifacts=artifacts,
))
return parsed_results
def _create_error_result(self, error_msg: str) -> list[PlaywrightResult]:
"""Create an error result when test execution fails."""
return [
PlaywrightResult(
test_id="playwright_error",
name="Playwright Execution Error",
status="failed",
duration=0.0,
error_message=error_msg,
traceback="",
artifacts=[],
)
]
def get_artifact_url(self, run_id: str, artifact_filename: str) -> str:
"""Generate URL for streaming an artifact."""
return f"/tools/tester/api/artifact/{run_id}/{artifact_filename}"