Files
soleprint/station/tools/tester/tests/base.py
2025-12-24 05:38:37 -03:00

165 lines
5.6 KiB
Python

"""
Pure HTTP Contract Tests - Base Class
Framework-agnostic: works against ANY backend implementation.
Does NOT manage database - expects a ready environment.
Requirements:
- Server running at CONTRACT_TEST_URL
- Database migrated and seeded
- Test user exists OR CONTRACT_TEST_TOKEN provided
Usage:
CONTRACT_TEST_URL=http://127.0.0.1:8000 pytest
CONTRACT_TEST_URL=http://127.0.0.1:8000 CONTRACT_TEST_TOKEN=your_jwt_token pytest
"""
import os
import unittest
import httpx
from .endpoints import Endpoints
def get_base_url():
    """Return the base URL of the server under test.

    The value is read from the ``CONTRACT_TEST_URL`` environment variable.
    Surrounding whitespace and trailing slashes are removed so that request
    paths beginning with ``/`` can be appended directly.

    Returns:
        The normalized base URL, without a trailing slash.

    Raises:
        ValueError: If ``CONTRACT_TEST_URL`` is unset, or is empty once
            normalized (for example only whitespace, or only ``/``).
    """
    # Normalize *before* validating: otherwise values such as " " or "/"
    # pass the emptiness check and silently produce an unusable base URL.
    url = os.environ.get("CONTRACT_TEST_URL", "").strip().rstrip("/")
    if not url:
        raise ValueError("CONTRACT_TEST_URL environment variable required")
    return url
class ContractTestCase(unittest.TestCase):
    """
    Base class for pure HTTP contract tests.

    Features:
    - Framework-agnostic (works with Django, FastAPI, Node, etc.)
    - Pure HTTP via the httpx library
    - No database access - all data through API
    - JWT authentication (sent as an ``Authorization: Bearer`` header)

    Subclasses call ``get``/``post``/``put``/``patch``/``delete`` with a path
    relative to the base URL; each returns the httpx response with an extra
    ``.data`` attribute holding the decoded JSON body (or ``None``).
    """

    # Auth credentials - override via environment
    TEST_USER_EMAIL = os.environ.get("CONTRACT_TEST_USER", "contract_test@example.com")
    TEST_USER_PASSWORD = os.environ.get("CONTRACT_TEST_PASSWORD", "testpass123")

    # Timeouts in seconds. REQUEST_TIMEOUT is the default for the HTTP helpers
    # (a per-call ``timeout=`` keyword overrides it); TOKEN_TIMEOUT is used
    # for the token request.
    REQUEST_TIMEOUT = 30
    TOKEN_TIMEOUT = 10

    # Populated by setUpClass
    _base_url = None
    _token = None

    @classmethod
    def setUpClass(cls):
        """Resolve the base URL and the auth token once per test class.

        The token comes from ``CONTRACT_TEST_TOKEN`` when set; otherwise it is
        requested from the server with the test user's credentials.

        Raises:
            ValueError: Propagated from ``get_base_url`` when the base URL is
                not configured.
        """
        super().setUpClass()
        cls._base_url = get_base_url()
        # Strip so a stray newline/space in the variable does not end up
        # inside the Authorization header value.
        cls._token = os.environ.get("CONTRACT_TEST_TOKEN", "").strip()
        if not cls._token:
            cls._token = cls._fetch_token()

    @classmethod
    def _fetch_token(cls):
        """Request a JWT for the test user.

        Best-effort: every failure mode prints a warning and yields an empty
        token, so requests are then sent without an Authorization header.

        Returns:
            The value of the ``access`` field of the token response, or ``""``
            when the request fails, the status is not 200, the body is not a
            JSON object, or the field is missing/empty.
        """
        url = f"{cls._base_url}{Endpoints.TOKEN}"
        credentials = {
            "username": cls.TEST_USER_EMAIL,
            "password": cls.TEST_USER_PASSWORD,
        }
        try:
            response = httpx.post(url, json=credentials, timeout=cls.TOKEN_TIMEOUT)
        except httpx.RequestError as e:
            print(f"Warning: Token request failed: {e}")
            return ""

        if response.status_code != 200:
            print(f"Warning: Token request failed with {response.status_code}")
            return ""

        # A 200 with an unexpected body must not crash setUpClass; treat it
        # like any other failed token request.
        try:
            payload = response.json()
        except ValueError as e:
            print(f"Warning: Token response was not valid JSON: {e}")
            return ""
        if not isinstance(payload, dict):
            print("Warning: Token response was not a JSON object")
            return ""
        # "or" also maps an explicit null to the empty token.
        return payload.get("access") or ""

    @property
    def base_url(self):
        """Base URL resolved in setUpClass (no trailing slash)."""
        return self._base_url

    @property
    def token(self):
        """Auth token resolved in setUpClass; empty when none is available."""
        return self._token

    def _auth_headers(self):
        """Return the Authorization header dict, or ``{}`` without a token."""
        if not self.token:
            return {}
        return {"Authorization": f"Bearer {self.token}"}

    # =========================================================================
    # HTTP helpers
    # =========================================================================

    def _request_options(self, options: dict) -> dict:
        """Build the keyword arguments passed to httpx for one request.

        Args:
            options: Extra keyword arguments given by the caller. Not mutated.

        Returns:
            A new dict where ``headers`` is the auth headers overlaid with the
            caller's headers (caller wins on conflicts) and ``timeout``
            defaults to ``REQUEST_TIMEOUT`` unless the caller supplied one.
        """
        merged = dict(options)
        # "or {}" tolerates an explicit headers=None.
        caller_headers = merged.pop("headers", None) or {}
        merged["headers"] = {**self._auth_headers(), **caller_headers}
        # setdefault keeps an explicit caller value, including timeout=None.
        merged.setdefault("timeout", self.REQUEST_TIMEOUT)
        return merged

    def _request(self, method: str, path: str, **kwargs):
        """Send one request to ``base_url + path`` and wrap the response."""
        url = f"{self.base_url}{path}"
        response = httpx.request(method, url, **self._request_options(kwargs))
        return self._wrap_response(response)

    def get(self, path: str, params: dict = None, **kwargs):
        """GET request; ``params`` is sent as the query string."""
        return self._request("GET", path, params=params, **kwargs)

    def post(self, path: str, data: dict = None, **kwargs):
        """POST request with ``data`` sent as the JSON body."""
        return self._request("POST", path, json=data, **kwargs)

    def put(self, path: str, data: dict = None, **kwargs):
        """PUT request with ``data`` sent as the JSON body."""
        return self._request("PUT", path, json=data, **kwargs)

    def patch(self, path: str, data: dict = None, **kwargs):
        """PATCH request with ``data`` sent as the JSON body."""
        return self._request("PATCH", path, json=data, **kwargs)

    def delete(self, path: str, **kwargs):
        """DELETE request."""
        return self._request("DELETE", path, **kwargs)

    def _wrap_response(self, response):
        """Add .data attribute for consistency with DRF responses.

        ``response.data`` is the decoded JSON body, or ``None`` when the body
        cannot be decoded as JSON (e.g. empty or HTML). Returns the same
        response object.
        """
        try:
            response.data = response.json()
        except ValueError:
            # JSON and Unicode decode errors are both ValueError subclasses.
            response.data = None
        return response

    # =========================================================================
    # Assertion helpers
    # =========================================================================

    def assert_status(self, response, expected_status: int):
        """Assert response has expected status code.

        The failure message includes the decoded body, or the first 500 bytes
        of the raw content when no decoded body is available.
        """
        body = getattr(response, "data", None)
        if body is None:
            # Non-JSON bodies (e.g. an HTML error page) are more useful than
            # a bare "None" in the failure message.
            body = getattr(response, "content", b"")[:500]
        self.assertEqual(
            response.status_code,
            expected_status,
            f"Expected {expected_status}, got {response.status_code}. "
            f"Response: {body}",
        )

    def assert_has_fields(self, data: dict, *fields: str):
        """Assert dictionary has all specified fields.

        Fails (rather than erroring) when ``data`` is not a dict, which
        happens when a response body was not a JSON object.
        """
        self.assertIsInstance(
            data, dict, f"Expected a dict with fields {list(fields)}, got: {data!r}"
        )
        missing = [f for f in fields if f not in data]
        self.assertEqual(missing, [], f"Missing fields: {missing}. Got: {list(data.keys())}")

    def assert_is_list(self, data, min_length: int = 0):
        """Assert data is a list with at least ``min_length`` items."""
        self.assertIsInstance(data, list)
        self.assertGreaterEqual(len(data), min_length)
# Names exported by ``from <module> import *``: only the base test case.
__all__ = ["ContractTestCase"]