165 lines
5.6 KiB
Python
165 lines
5.6 KiB
Python
"""
|
|
Pure HTTP Contract Tests - Base Class
|
|
|
|
Framework-agnostic: works against ANY backend implementation.
|
|
Does NOT manage database - expects a ready environment.
|
|
|
|
Requirements:
|
|
- Server running at CONTRACT_TEST_URL
|
|
- Database migrated and seeded
|
|
- Test user exists OR CONTRACT_TEST_TOKEN provided
|
|
|
|
Usage:
|
|
CONTRACT_TEST_URL=http://127.0.0.1:8000 pytest
|
|
CONTRACT_TEST_TOKEN=your_jwt_token pytest
|
|
"""
|
|
|
|
import os
|
|
import unittest
|
|
import httpx
|
|
|
|
from .endpoints import Endpoints
|
|
|
|
|
|
def get_base_url():
|
|
"""Get base URL from environment (required)"""
|
|
url = os.environ.get("CONTRACT_TEST_URL", "")
|
|
if not url:
|
|
raise ValueError("CONTRACT_TEST_URL environment variable required")
|
|
return url.rstrip("/")
|
|
|
|
|
|
class ContractTestCase(unittest.TestCase):
|
|
"""
|
|
Base class for pure HTTP contract tests.
|
|
|
|
Features:
|
|
- Framework-agnostic (works with Django, FastAPI, Node, etc.)
|
|
- Pure HTTP via requests library
|
|
- No database access - all data through API
|
|
- JWT authentication
|
|
"""
|
|
|
|
# Auth credentials - override via environment
|
|
TEST_USER_EMAIL = os.environ.get("CONTRACT_TEST_USER", "contract_test@example.com")
|
|
TEST_USER_PASSWORD = os.environ.get("CONTRACT_TEST_PASSWORD", "testpass123")
|
|
|
|
# Class-level cache
|
|
_base_url = None
|
|
_token = None
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Set up once per test class"""
|
|
super().setUpClass()
|
|
cls._base_url = get_base_url()
|
|
|
|
# Use provided token or fetch one
|
|
cls._token = os.environ.get("CONTRACT_TEST_TOKEN", "")
|
|
if not cls._token:
|
|
cls._token = cls._fetch_token()
|
|
|
|
@classmethod
|
|
def _fetch_token(cls):
|
|
"""Get JWT token for authentication"""
|
|
url = f"{cls._base_url}{Endpoints.TOKEN}"
|
|
try:
|
|
response = httpx.post(url, json={
|
|
"username": cls.TEST_USER_EMAIL,
|
|
"password": cls.TEST_USER_PASSWORD,
|
|
}, timeout=10)
|
|
if response.status_code == 200:
|
|
return response.json().get("access", "")
|
|
else:
|
|
print(f"Warning: Token request failed with {response.status_code}")
|
|
except httpx.RequestError as e:
|
|
print(f"Warning: Token request failed: {e}")
|
|
return ""
|
|
|
|
@property
|
|
def base_url(self):
|
|
return self._base_url
|
|
|
|
@property
|
|
def token(self):
|
|
return self._token
|
|
|
|
def _auth_headers(self):
|
|
"""Get authorization headers"""
|
|
if self.token:
|
|
return {"Authorization": f"Bearer {self.token}"}
|
|
return {}
|
|
|
|
# =========================================================================
|
|
# HTTP helpers
|
|
# =========================================================================
|
|
|
|
def get(self, path: str, params: dict = None, **kwargs):
|
|
"""GET request"""
|
|
url = f"{self.base_url}{path}"
|
|
headers = {**self._auth_headers(), **kwargs.pop("headers", {})}
|
|
response = httpx.get(url, params=params, headers=headers, timeout=30, **kwargs)
|
|
return self._wrap_response(response)
|
|
|
|
def post(self, path: str, data: dict = None, **kwargs):
|
|
"""POST request with JSON"""
|
|
url = f"{self.base_url}{path}"
|
|
headers = {**self._auth_headers(), **kwargs.pop("headers", {})}
|
|
response = httpx.post(url, json=data, headers=headers, timeout=30, **kwargs)
|
|
return self._wrap_response(response)
|
|
|
|
def put(self, path: str, data: dict = None, **kwargs):
|
|
"""PUT request with JSON"""
|
|
url = f"{self.base_url}{path}"
|
|
headers = {**self._auth_headers(), **kwargs.pop("headers", {})}
|
|
response = httpx.put(url, json=data, headers=headers, timeout=30, **kwargs)
|
|
return self._wrap_response(response)
|
|
|
|
def patch(self, path: str, data: dict = None, **kwargs):
|
|
"""PATCH request with JSON"""
|
|
url = f"{self.base_url}{path}"
|
|
headers = {**self._auth_headers(), **kwargs.pop("headers", {})}
|
|
response = httpx.patch(url, json=data, headers=headers, timeout=30, **kwargs)
|
|
return self._wrap_response(response)
|
|
|
|
def delete(self, path: str, **kwargs):
|
|
"""DELETE request"""
|
|
url = f"{self.base_url}{path}"
|
|
headers = {**self._auth_headers(), **kwargs.pop("headers", {})}
|
|
response = httpx.delete(url, headers=headers, timeout=30, **kwargs)
|
|
return self._wrap_response(response)
|
|
|
|
def _wrap_response(self, response):
|
|
"""Add .data attribute for consistency with DRF responses"""
|
|
try:
|
|
response.data = response.json()
|
|
except Exception:
|
|
response.data = None
|
|
return response
|
|
|
|
# =========================================================================
|
|
# Assertion helpers
|
|
# =========================================================================
|
|
|
|
def assert_status(self, response, expected_status: int):
|
|
"""Assert response has expected status code"""
|
|
self.assertEqual(
|
|
response.status_code,
|
|
expected_status,
|
|
f"Expected {expected_status}, got {response.status_code}. "
|
|
f"Response: {response.data if hasattr(response, 'data') else response.content[:500]}"
|
|
)
|
|
|
|
def assert_has_fields(self, data: dict, *fields: str):
|
|
"""Assert dictionary has all specified fields"""
|
|
missing = [f for f in fields if f not in data]
|
|
self.assertEqual(missing, [], f"Missing fields: {missing}. Got: {list(data.keys())}")
|
|
|
|
def assert_is_list(self, data, min_length: int = 0):
|
|
"""Assert data is a list with minimum length"""
|
|
self.assertIsInstance(data, list)
|
|
self.assertGreaterEqual(len(data), min_length)
|
|
|
|
|
|
__all__ = ["ContractTestCase"]
|