Big-Link-Man/tests/integration/test_api_integration.py

282 lines
9.1 KiB
Python

"""
Integration tests for the API, run against a real (in-memory SQLite) database
"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import StaticPool
from src.api.main import app, get_auth_service
from src.database.models import Base, User
from src.database.repositories import UserRepository
from src.auth.service import AuthService
@pytest.fixture(scope="function")
def test_engine():
    """Provide an in-memory SQLite engine with the schema already created.

    The engine is yielded to the test and disposed of once the test ends.
    """
    # check_same_thread=False lets the connection be used across threads,
    # which the FastAPI TestClient needs.
    sqlite_options = {"check_same_thread": False}
    db_engine = create_engine(
        "sqlite:///:memory:",
        connect_args=sqlite_options,
        poolclass=StaticPool,
    )

    # Build every table declared on the project's declarative Base.
    Base.metadata.create_all(db_engine)

    yield db_engine

    db_engine.dispose()
@pytest.fixture
def test_db(test_engine):
    """Yield a session bound to the test engine; close it after the test."""
    session_factory = sessionmaker(bind=test_engine)
    db_session = session_factory()

    yield db_session

    db_session.close()
@pytest.fixture
def client(test_db):
    """Yield a TestClient whose auth service is wired to the test session.

    The ``get_auth_service`` dependency is overridden so that requests made
    through the client authenticate against ``test_db``. On teardown only
    that one override is undone: any override for ``get_auth_service`` that
    existed beforehand is restored, and overrides for other dependencies are
    left untouched.
    """

    def override_get_auth_service():
        # Build the service on top of the per-test session.
        return AuthService(UserRepository(test_db))

    # Sentinel distinguishes "no previous override" from any stored value.
    not_overridden = object()
    previous_override = app.dependency_overrides.get(
        get_auth_service, not_overridden
    )
    app.dependency_overrides[get_auth_service] = override_get_auth_service

    try:
        # Inside the try so a failure while building the client still
        # removes the override instead of leaking it into later tests.
        yield TestClient(app)
    finally:
        if previous_override is not_overridden:
            app.dependency_overrides.pop(get_auth_service, None)
        else:
            app.dependency_overrides[get_auth_service] = previous_override
@pytest.fixture
def test_user(test_db):
    """Create, commit and return a user named "testuser" with role "User"."""
    service = AuthService(UserRepository(test_db))
    created_user = service.create_user_with_hashed_password(
        username="testuser",
        password="testpassword",
        role="User",
    )

    # Commit so the row is visible to requests made through the API client.
    test_db.commit()
    return created_user
@pytest.fixture
def test_admin(test_db):
    """Create, commit and return a user named "admin" with role "Admin"."""
    service = AuthService(UserRepository(test_db))
    created_admin = service.create_user_with_hashed_password(
        username="admin",
        password="adminpassword",
        role="Admin",
    )

    # Commit so the row is visible to requests made through the API client.
    test_db.commit()
    return created_admin
class TestHealthEndpointIntegration:
    """Integration checks for the health endpoint."""

    def test_health_check_with_real_api(self, client):
        """GET /health answers 200 with the expected status and message."""
        health_response = client.get("/health")
        assert health_response.status_code == 200

        payload = health_response.json()
        assert payload["status"] == "healthy"
        assert payload["message"] == "API is running"
class TestAuthenticationFlowIntegration:
    """Integration checks for the authentication flow against the database."""

    def test_authenticate_with_real_user(self, client, test_user):
        """Correct credentials for a stored user return that user's data."""
        me_response = client.get("/api/v1/me", auth=("testuser", "testpassword"))
        assert me_response.status_code == 200

        profile = me_response.json()
        assert profile["username"] == "testuser"
        assert profile["role"] == "User"

    def test_authenticate_with_wrong_password(self, client, test_user):
        """A wrong password for an existing user is rejected with 401."""
        me_response = client.get("/api/v1/me", auth=("testuser", "wrongpassword"))
        assert me_response.status_code == 401

        error_body = me_response.json()
        assert "detail" in error_body

    def test_authenticate_with_nonexistent_user(self, client):
        """Credentials for a user that was never created are rejected with 401."""
        me_response = client.get("/api/v1/me", auth=("nonexistent", "password"))
        assert me_response.status_code == 401

    def test_me_endpoint_returns_correct_user_data(self, client, test_user):
        """The /me payload carries an integer id plus the username and role."""
        me_response = client.get("/api/v1/me", auth=("testuser", "testpassword"))
        assert me_response.status_code == 200

        profile = me_response.json()
        # Check every field of the returned profile.
        assert "id" in profile
        assert profile["username"] == "testuser"
        assert profile["role"] == "User"
        assert isinstance(profile["id"], int)
class TestAdminEndpointsIntegration:
    """Integration checks for the admin-only endpoints."""

    def test_admin_endpoint_with_admin_user(self, client, test_admin):
        """An admin's credentials are granted access to the admin status."""
        status_response = client.get(
            "/api/v1/admin/status", auth=("admin", "adminpassword")
        )
        assert status_response.status_code == 200

        payload = status_response.json()
        assert payload["status"] == "admin_access_granted"
        assert payload["admin"] is True
        assert "admin" in payload["message"].lower()

    def test_admin_endpoint_with_regular_user(self, client, test_user):
        """A non-admin user is refused with 403 and an explanatory detail."""
        status_response = client.get(
            "/api/v1/admin/status", auth=("testuser", "testpassword")
        )
        assert status_response.status_code == 403

        error_body = status_response.json()
        assert "Admin privileges required" in error_body["detail"]

    def test_admin_endpoint_without_authentication(self, client):
        """A request carrying no credentials is rejected with 401."""
        status_response = client.get("/api/v1/admin/status")
        assert status_response.status_code == 401
class TestMultipleUsersIntegration:
    """Integration checks involving more than one stored user."""

    def test_multiple_users_can_authenticate(self, client, test_user, test_admin):
        """Each stored user can log in and gets their own username back."""
        # Regular user first, then the admin.
        accounts = (
            ("testuser", "testpassword"),
            ("admin", "adminpassword"),
        )
        for username, password in accounts:
            me_response = client.get("/api/v1/me", auth=(username, password))
            assert me_response.status_code == 200
            assert me_response.json()["username"] == username

    def test_user_cannot_use_another_users_password(self, client, test_user, test_admin):
        """One user's password does not authenticate a different username."""
        # Pair the regular user's name with the admin's password.
        mismatched_credentials = ("testuser", "adminpassword")
        me_response = client.get("/api/v1/me", auth=mismatched_credentials)
        assert me_response.status_code == 401
class TestAPIExtensibility:
    """Checks that the API is laid out for future extension."""

    def test_api_has_version_prefix(self, client, test_user):
        """The /me endpoint is reachable under the versioned /api/v1 prefix."""
        me_response = client.get("/api/v1/me", auth=("testuser", "testpassword"))
        assert me_response.status_code == 200

    def test_api_metadata_accessible(self, client):
        """The OpenAPI schema is served and carries the expected title."""
        # FastAPI generates /docs and /openapi.json automatically.
        schema_response = client.get("/openapi.json")
        assert schema_response.status_code == 200

        schema = schema_response.json()
        assert "openapi" in schema
        assert "info" in schema
        assert (
            schema["info"]["title"]
            == "Content Automation & Syndication Platform API"
        )
class TestSecurityIntegration:
    """Integration tests for security features"""

    def test_passwords_are_hashed_in_database(self, test_db, test_user):
        """Test that passwords are stored hashed, not in plain text"""
        # Query the database directly, bypassing the API.
        stored_user = test_db.query(User).filter_by(username="testuser").first()

        # .first() returns None when no row matches; fail with a clear message
        # instead of an AttributeError on the attribute access below.
        assert stored_user is not None, "user 'testuser' was not found in the database"

        # Password should be hashed (not equal to plain text)
        assert stored_user.hashed_password != "testpassword"
        # Bcrypt hashes start with $2b$
        assert stored_user.hashed_password.startswith("$2b$")

    def test_authentication_timing_is_consistent(self, client, test_user):
        """Test that unknown-user and wrong-password failures both return 401.

        NOTE: despite its name, this test measures no elapsed time. It only
        verifies that both failure modes produce the same status code.
        """
        unknown_user_response = client.get(
            "/api/v1/me",
            auth=("nonexistent", "password"),
        )
        assert unknown_user_response.status_code == 401

        wrong_password_response = client.get(
            "/api/v1/me",
            auth=("testuser", "wrongpassword"),
        )
        assert wrong_password_response.status_code == 401

    def test_unauthorized_requests_return_401(self, client, test_user):
        """Test that unauthorized requests consistently return 401"""
        endpoints = [
            "/api/v1/me",
            "/api/v1/admin/status",
        ]
        for endpoint in endpoints:
            response = client.get(endpoint)
            # Name the endpoint in the failure message so a failing
            # iteration of the loop can be identified.
            assert response.status_code == 401, (
                f"{endpoint} returned {response.status_code}, expected 401"
            )
            assert "detail" in response.json(), (
                f"{endpoint} response body has no 'detail' field"
            )