282 lines
9.1 KiB
Python
282 lines
9.1 KiB
Python
"""
|
|
Integration tests for API with real database
|
|
"""
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from sqlalchemy.pool import StaticPool
|
|
from src.api.main import app, get_auth_service
|
|
from src.database.models import Base, User
|
|
from src.database.repositories import UserRepository
|
|
from src.auth.service import AuthService
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def test_engine():
|
|
"""Create a test database engine"""
|
|
# Use in-memory SQLite database for testing
|
|
# check_same_thread=False allows usage across threads (needed for FastAPI TestClient)
|
|
engine = create_engine(
|
|
"sqlite:///:memory:",
|
|
connect_args={"check_same_thread": False},
|
|
poolclass=StaticPool
|
|
)
|
|
Base.metadata.create_all(engine)
|
|
yield engine
|
|
engine.dispose()
|
|
|
|
|
|
@pytest.fixture
|
|
def test_db(test_engine):
|
|
"""Create a test database session"""
|
|
TestSessionLocal = sessionmaker(bind=test_engine)
|
|
session = TestSessionLocal()
|
|
yield session
|
|
session.close()
|
|
|
|
|
|
@pytest.fixture
|
|
def client(test_db):
|
|
"""Create a test client with dependency injection"""
|
|
# Override the get_auth_service dependency to use test database
|
|
def override_get_auth_service():
|
|
user_repo = UserRepository(test_db)
|
|
return AuthService(user_repo)
|
|
|
|
app.dependency_overrides[get_auth_service] = override_get_auth_service
|
|
|
|
test_client = TestClient(app)
|
|
yield test_client
|
|
|
|
# Clean up
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
@pytest.fixture
|
|
def test_user(test_db):
|
|
"""Create a test user in the database"""
|
|
user_repo = UserRepository(test_db)
|
|
auth_service = AuthService(user_repo)
|
|
|
|
user = auth_service.create_user_with_hashed_password(
|
|
username="testuser",
|
|
password="testpassword",
|
|
role="User"
|
|
)
|
|
test_db.commit()
|
|
return user
|
|
|
|
|
|
@pytest.fixture
|
|
def test_admin(test_db):
|
|
"""Create a test admin user in the database"""
|
|
user_repo = UserRepository(test_db)
|
|
auth_service = AuthService(user_repo)
|
|
|
|
admin = auth_service.create_user_with_hashed_password(
|
|
username="admin",
|
|
password="adminpassword",
|
|
role="Admin"
|
|
)
|
|
test_db.commit()
|
|
return admin
|
|
|
|
|
|
class TestHealthEndpointIntegration:
|
|
"""Integration tests for health endpoint"""
|
|
|
|
def test_health_check_with_real_api(self, client):
|
|
"""Test health check with real API client"""
|
|
response = client.get("/health")
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "healthy"
|
|
assert data["message"] == "API is running"
|
|
|
|
|
|
class TestAuthenticationFlowIntegration:
|
|
"""Integration tests for authentication flow with database"""
|
|
|
|
def test_authenticate_with_real_user(self, client, test_user):
|
|
"""Test authentication with real user from database"""
|
|
response = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "testpassword")
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["username"] == "testuser"
|
|
assert data["role"] == "User"
|
|
|
|
def test_authenticate_with_wrong_password(self, client, test_user):
|
|
"""Test authentication fails with wrong password"""
|
|
response = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "wrongpassword")
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "detail" in data
|
|
|
|
def test_authenticate_with_nonexistent_user(self, client):
|
|
"""Test authentication fails with nonexistent user"""
|
|
response = client.get(
|
|
"/api/v1/me",
|
|
auth=("nonexistent", "password")
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_me_endpoint_returns_correct_user_data(self, client, test_user):
|
|
"""Test /me endpoint returns correct user data from database"""
|
|
response = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "testpassword")
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
|
|
# Verify all fields
|
|
assert "id" in data
|
|
assert data["username"] == "testuser"
|
|
assert data["role"] == "User"
|
|
assert isinstance(data["id"], int)
|
|
|
|
|
|
class TestAdminEndpointsIntegration:
|
|
"""Integration tests for admin endpoints"""
|
|
|
|
def test_admin_endpoint_with_admin_user(self, client, test_admin):
|
|
"""Test admin endpoint with real admin user"""
|
|
response = client.get(
|
|
"/api/v1/admin/status",
|
|
auth=("admin", "adminpassword")
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "admin_access_granted"
|
|
assert data["admin"] is True
|
|
assert "admin" in data["message"].lower()
|
|
|
|
def test_admin_endpoint_with_regular_user(self, client, test_user):
|
|
"""Test admin endpoint rejects regular user"""
|
|
response = client.get(
|
|
"/api/v1/admin/status",
|
|
auth=("testuser", "testpassword")
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
data = response.json()
|
|
assert "Admin privileges required" in data["detail"]
|
|
|
|
def test_admin_endpoint_without_authentication(self, client):
|
|
"""Test admin endpoint requires authentication"""
|
|
response = client.get("/api/v1/admin/status")
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestMultipleUsersIntegration:
|
|
"""Integration tests with multiple users"""
|
|
|
|
def test_multiple_users_can_authenticate(self, client, test_user, test_admin):
|
|
"""Test that multiple different users can authenticate"""
|
|
# Test regular user
|
|
response1 = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "testpassword")
|
|
)
|
|
assert response1.status_code == 200
|
|
assert response1.json()["username"] == "testuser"
|
|
|
|
# Test admin user
|
|
response2 = client.get(
|
|
"/api/v1/me",
|
|
auth=("admin", "adminpassword")
|
|
)
|
|
assert response2.status_code == 200
|
|
assert response2.json()["username"] == "admin"
|
|
|
|
def test_user_cannot_use_another_users_password(self, client, test_user, test_admin):
|
|
"""Test that users cannot authenticate with another user's password"""
|
|
# Try to authenticate as testuser with admin's password
|
|
response = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "adminpassword")
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestAPIExtensibility:
|
|
"""Tests to verify API is extensible for future use"""
|
|
|
|
def test_api_has_version_prefix(self, client, test_user):
|
|
"""Test that API uses versioned endpoints for extensibility"""
|
|
response = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "testpassword")
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
def test_api_metadata_accessible(self, client):
|
|
"""Test that API metadata (OpenAPI docs) is accessible"""
|
|
# FastAPI automatically creates /docs and /openapi.json
|
|
response = client.get("/openapi.json")
|
|
assert response.status_code == 200
|
|
|
|
openapi_schema = response.json()
|
|
assert "openapi" in openapi_schema
|
|
assert "info" in openapi_schema
|
|
assert openapi_schema["info"]["title"] == "Content Automation & Syndication Platform API"
|
|
|
|
|
|
class TestSecurityIntegration:
|
|
"""Integration tests for security features"""
|
|
|
|
def test_passwords_are_hashed_in_database(self, test_db, test_user):
|
|
"""Test that passwords are stored hashed, not in plain text"""
|
|
# Query the database directly
|
|
user = test_db.query(User).filter_by(username="testuser").first()
|
|
|
|
# Password should be hashed (not equal to plain text)
|
|
assert user.hashed_password != "testpassword"
|
|
# Bcrypt hashes start with $2b$
|
|
assert user.hashed_password.startswith("$2b$")
|
|
|
|
def test_authentication_timing_is_consistent(self, client, test_user):
|
|
"""Test that authentication doesn't leak timing information"""
|
|
# Both invalid username and invalid password should take similar time
|
|
# This is a basic test - bcrypt provides timing attack resistance
|
|
|
|
response1 = client.get(
|
|
"/api/v1/me",
|
|
auth=("nonexistent", "password")
|
|
)
|
|
assert response1.status_code == 401
|
|
|
|
response2 = client.get(
|
|
"/api/v1/me",
|
|
auth=("testuser", "wrongpassword")
|
|
)
|
|
assert response2.status_code == 401
|
|
|
|
def test_unauthorized_requests_return_401(self, client, test_user):
|
|
"""Test that unauthorized requests consistently return 401"""
|
|
endpoints = [
|
|
"/api/v1/me",
|
|
"/api/v1/admin/status",
|
|
]
|
|
|
|
for endpoint in endpoints:
|
|
response = client.get(endpoint)
|
|
assert response.status_code == 401
|
|
assert "detail" in response.json()
|
|
|