""" Pure HTTP Contract Tests - Base Class Framework-agnostic: works against ANY backend implementation. Does NOT manage database - expects a ready environment. Requirements: - Server running at CONTRACT_TEST_URL - Database migrated and seeded - Test user exists OR CONTRACT_TEST_TOKEN provided Usage: CONTRACT_TEST_URL=http://127.0.0.1:8000 pytest CONTRACT_TEST_TOKEN=your_jwt_token pytest """ import os import unittest import httpx from .endpoints import Endpoints def get_base_url(): """Get base URL from environment (required)""" url = os.environ.get("CONTRACT_TEST_URL", "") if not url: raise ValueError("CONTRACT_TEST_URL environment variable required") return url.rstrip("/") class ContractTestCase(unittest.TestCase): """ Base class for pure HTTP contract tests. Features: - Framework-agnostic (works with Django, FastAPI, Node, etc.) - Pure HTTP via requests library - No database access - all data through API - JWT authentication """ # Auth credentials - override via environment TEST_USER_EMAIL = os.environ.get("CONTRACT_TEST_USER", "contract_test@example.com") TEST_USER_PASSWORD = os.environ.get("CONTRACT_TEST_PASSWORD", "testpass123") # Class-level cache _base_url = None _token = None @classmethod def setUpClass(cls): """Set up once per test class""" super().setUpClass() cls._base_url = get_base_url() # Use provided token or fetch one cls._token = os.environ.get("CONTRACT_TEST_TOKEN", "") if not cls._token: cls._token = cls._fetch_token() @classmethod def _fetch_token(cls): """Get JWT token for authentication""" url = f"{cls._base_url}{Endpoints.TOKEN}" try: response = httpx.post(url, json={ "username": cls.TEST_USER_EMAIL, "password": cls.TEST_USER_PASSWORD, }, timeout=10) if response.status_code == 200: return response.json().get("access", "") else: print(f"Warning: Token request failed with {response.status_code}") except httpx.RequestError as e: print(f"Warning: Token request failed: {e}") return "" @property def base_url(self): return self._base_url @property def token(self): return self._token def _auth_headers(self): """Get authorization headers""" if self.token: return {"Authorization": f"Bearer {self.token}"} return {} # ========================================================================= # HTTP helpers # ========================================================================= def get(self, path: str, params: dict = None, **kwargs): """GET request""" url = f"{self.base_url}{path}" headers = {**self._auth_headers(), **kwargs.pop("headers", {})} response = httpx.get(url, params=params, headers=headers, timeout=30, **kwargs) return self._wrap_response(response) def post(self, path: str, data: dict = None, **kwargs): """POST request with JSON""" url = f"{self.base_url}{path}" headers = {**self._auth_headers(), **kwargs.pop("headers", {})} response = httpx.post(url, json=data, headers=headers, timeout=30, **kwargs) return self._wrap_response(response) def put(self, path: str, data: dict = None, **kwargs): """PUT request with JSON""" url = f"{self.base_url}{path}" headers = {**self._auth_headers(), **kwargs.pop("headers", {})} response = httpx.put(url, json=data, headers=headers, timeout=30, **kwargs) return self._wrap_response(response) def patch(self, path: str, data: dict = None, **kwargs): """PATCH request with JSON""" url = f"{self.base_url}{path}" headers = {**self._auth_headers(), **kwargs.pop("headers", {})} response = httpx.patch(url, json=data, headers=headers, timeout=30, **kwargs) return self._wrap_response(response) def delete(self, path: str, **kwargs): """DELETE request""" url = f"{self.base_url}{path}" headers = {**self._auth_headers(), **kwargs.pop("headers", {})} response = httpx.delete(url, headers=headers, timeout=30, **kwargs) return self._wrap_response(response) def _wrap_response(self, response): """Add .data attribute for consistency with DRF responses""" try: response.data = response.json() except Exception: response.data = None return response # ========================================================================= # Assertion helpers # ========================================================================= def assert_status(self, response, expected_status: int): """Assert response has expected status code""" self.assertEqual( response.status_code, expected_status, f"Expected {expected_status}, got {response.status_code}. " f"Response: {response.data if hasattr(response, 'data') else response.content[:500]}" ) def assert_has_fields(self, data: dict, *fields: str): """Assert dictionary has all specified fields""" missing = [f for f in fields if f not in data] self.assertEqual(missing, [], f"Missing fields: {missing}. Got: {list(data.keys())}") def assert_is_list(self, data, min_length: int = 0): """Assert data is a list with minimum length""" self.assertIsInstance(data, list) self.assertGreaterEqual(len(data), min_length) __all__ = ["ContractTestCase"]