894 lines
34 KiB
Python
894 lines
34 KiB
Python
"""
|
|
Unit tests for CLI commands
|
|
"""
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
from click.testing import CliRunner
|
|
from datetime import datetime
|
|
from src.cli.commands import app, authenticate_admin, prompt_admin_credentials
|
|
from src.database.models import User
|
|
|
|
|
|
class TestAuthenticateAdmin:
|
|
"""Tests for authenticate_admin function"""
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_authenticate_admin_success(self, mock_db_manager):
|
|
"""Test successful admin authentication"""
|
|
# Setup
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="admin",
|
|
hashed_password="hashed_password",
|
|
role="Admin",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
# Execute
|
|
result = authenticate_admin("admin", "password")
|
|
|
|
# Assert
|
|
assert result == mock_user
|
|
mock_auth.authenticate_user.assert_called_once_with("admin", "password")
|
|
mock_session.close.assert_called_once()
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_authenticate_admin_not_admin_role(self, mock_db_manager):
|
|
"""Test authentication fails when user is not admin"""
|
|
# Setup
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="user",
|
|
hashed_password="hashed_password",
|
|
role="User",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
# Execute
|
|
result = authenticate_admin("user", "password")
|
|
|
|
# Assert
|
|
assert result is None
|
|
mock_session.close.assert_called_once()
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_authenticate_admin_invalid_credentials(self, mock_db_manager):
|
|
"""Test authentication fails with invalid credentials"""
|
|
# Setup
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = None
|
|
|
|
# Execute
|
|
result = authenticate_admin("admin", "wrongpassword")
|
|
|
|
# Assert
|
|
assert result is None
|
|
mock_session.close.assert_called_once()
|
|
|
|
|
|
class TestAddUserCommand:
|
|
"""Tests for add-user CLI command"""
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_add_user_success_with_admin_credentials(self, mock_auth_admin, mock_db_manager):
|
|
"""Test successfully adding a user with admin credentials provided"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_new_user = Mock(username="newuser", role="User")
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.create_user_with_hashed_password.return_value = mock_new_user
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'add-user',
|
|
'--username', 'newuser',
|
|
'--password', 'password123',
|
|
'--role', 'User',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "Success: User 'newuser' created with role 'User'" in result.output
|
|
mock_auth_admin.assert_called_once_with('admin', 'adminpass')
|
|
mock_auth.create_user_with_hashed_password.assert_called_once_with(
|
|
username='newuser',
|
|
password='password123',
|
|
role='User'
|
|
)
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_add_user_authentication_fails(self, mock_auth_admin, mock_db_manager):
|
|
"""Test add-user fails when admin authentication fails"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_auth_admin.return_value = None
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'add-user',
|
|
'--username', 'newuser',
|
|
'--password', 'password123',
|
|
'--role', 'User',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'wrongpass'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 1
|
|
assert "Authentication failed or insufficient permissions" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_add_user_duplicate_username(self, mock_auth_admin, mock_db_manager):
|
|
"""Test add-user fails when username already exists"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.create_user_with_hashed_password.side_effect = ValueError(
|
|
"User with username 'existinguser' already exists"
|
|
)
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'add-user',
|
|
'--username', 'existinguser',
|
|
'--password', 'password123',
|
|
'--role', 'User',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 1
|
|
assert "User with username 'existinguser' already exists" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
@patch('src.cli.commands.prompt_admin_credentials')
|
|
def test_add_user_prompts_for_credentials(self, mock_prompt, mock_auth_admin,
|
|
mock_db_manager):
|
|
"""Test add-user prompts for admin credentials when not provided"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_prompt.return_value = ('admin', 'adminpass')
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_new_user = Mock(username="newuser", role="User")
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.create_user_with_hashed_password.return_value = mock_new_user
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'add-user',
|
|
'--username', 'newuser',
|
|
'--password', 'password123',
|
|
'--role', 'User'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
mock_prompt.assert_called_once()
|
|
mock_auth_admin.assert_called_once_with('admin', 'adminpass')
|
|
|
|
|
|
class TestDeleteUserCommand:
|
|
"""Tests for delete-user CLI command"""
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_delete_user_success(self, mock_auth_admin, mock_db_manager):
|
|
"""Test successfully deleting a user"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user_to_delete = Mock(id=2, username="deleteuser")
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
mock_repo.get_by_username.return_value = mock_user_to_delete
|
|
mock_repo.delete.return_value = True
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'delete-user',
|
|
'--username', 'deleteuser',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass',
|
|
'--yes'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "Success: User 'deleteuser' has been deleted" in result.output
|
|
mock_repo.get_by_username.assert_called_once_with('deleteuser')
|
|
mock_repo.delete.assert_called_once_with(2)
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_delete_user_not_found(self, mock_auth_admin, mock_db_manager):
|
|
"""Test delete-user fails when user doesn't exist"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
mock_repo.get_by_username.return_value = None
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'delete-user',
|
|
'--username', 'nonexistent',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass',
|
|
'--yes'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 1
|
|
assert "User 'nonexistent' not found" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_delete_user_cannot_delete_self(self, mock_auth_admin, mock_db_manager):
|
|
"""Test admin cannot delete their own account"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'delete-user',
|
|
'--username', 'admin',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass',
|
|
'--yes'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 1
|
|
assert "Cannot delete your own account" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_delete_user_authentication_fails(self, mock_auth_admin, mock_db_manager):
|
|
"""Test delete-user fails when admin authentication fails"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_auth_admin.return_value = None
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'delete-user',
|
|
'--username', 'someuser',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'wrongpass',
|
|
'--yes'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 1
|
|
assert "Authentication failed or insufficient permissions" in result.output
|
|
|
|
|
|
class TestListUsersCommand:
|
|
"""Tests for list-users CLI command"""
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_list_users_success(self, mock_auth_admin, mock_db_manager):
|
|
"""Test successfully listing users"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_users = [
|
|
Mock(id=1, username="admin", role="Admin",
|
|
created_at=datetime(2024, 1, 1, 12, 0, 0)),
|
|
Mock(id=2, username="user1", role="User",
|
|
created_at=datetime(2024, 1, 2, 12, 0, 0)),
|
|
Mock(id=3, username="user2", role="User",
|
|
created_at=datetime(2024, 1, 3, 12, 0, 0))
|
|
]
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
mock_repo.get_all.return_value = mock_users
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'list-users',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "Total users: 3" in result.output
|
|
assert "admin" in result.output
|
|
assert "user1" in result.output
|
|
assert "user2" in result.output
|
|
assert "Admin" in result.output
|
|
assert "User" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_list_users_empty(self, mock_auth_admin, mock_db_manager):
|
|
"""Test listing users when no users exist"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_admin = Mock(username="admin", role="Admin")
|
|
mock_auth_admin.return_value = mock_admin
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_repo_class:
|
|
mock_repo = Mock()
|
|
mock_repo_class.return_value = mock_repo
|
|
mock_repo.get_all.return_value = []
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'list-users',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'adminpass'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "No users found" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.authenticate_admin')
|
|
def test_list_users_authentication_fails(self, mock_auth_admin, mock_db_manager):
|
|
"""Test list-users fails when admin authentication fails"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_auth_admin.return_value = None
|
|
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
# Execute
|
|
result = runner.invoke(app, [
|
|
'list-users',
|
|
'--admin-user', 'admin',
|
|
'--admin-password', 'wrongpass'
|
|
])
|
|
|
|
# Assert
|
|
assert result.exit_code == 1
|
|
assert "Authentication failed or insufficient permissions" in result.output
|
|
|
|
|
|
class TestExistingCommands:
|
|
"""Tests for existing CLI commands (config, health, models)"""
|
|
|
|
@patch('src.cli.commands.get_config')
|
|
def test_config_command_success(self, mock_get_config):
|
|
"""Test config command displays configuration"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_config = Mock()
|
|
mock_config.application.name = "Test App"
|
|
mock_config.application.version = "1.0.0"
|
|
mock_config.application.environment = "test"
|
|
mock_config.database.url = "sqlite:///test.db"
|
|
mock_config.ai_service.model = "test-model"
|
|
mock_config.logging.level = "INFO"
|
|
mock_get_config.return_value = mock_config
|
|
|
|
# Execute
|
|
result = runner.invoke(app, ['config'])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "Test App" in result.output
|
|
assert "1.0.0" in result.output
|
|
assert "test" in result.output
|
|
|
|
@patch('src.cli.commands.get_config')
|
|
def test_health_command_success(self, mock_get_config):
|
|
"""Test health command shows system is healthy"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_config = Mock()
|
|
mock_get_config.return_value = mock_config
|
|
|
|
# Execute
|
|
result = runner.invoke(app, ['health'])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "[OK] Configuration loaded successfully" in result.output
|
|
assert "[OK] System is healthy" in result.output
|
|
|
|
@patch('src.cli.commands.get_config')
|
|
def test_models_command_success(self, mock_get_config):
|
|
"""Test models command lists available models"""
|
|
# Setup
|
|
runner = CliRunner()
|
|
mock_config = Mock()
|
|
mock_config.ai_service.model = "test-model-1"
|
|
mock_config.ai_service.provider = "test-provider"
|
|
mock_config.ai_service.base_url = "https://test.api"
|
|
mock_config.ai_service.available_models = {
|
|
"model1": "test-model-1",
|
|
"model2": "test-model-2"
|
|
}
|
|
mock_get_config.return_value = mock_config
|
|
|
|
# Execute
|
|
result = runner.invoke(app, ['models'])
|
|
|
|
# Assert
|
|
assert result.exit_code == 0
|
|
assert "test-provider" in result.output
|
|
assert "test-model-1" in result.output
|
|
assert "test-model-2" in result.output
|
|
|
|
|
|
class TestIngestCoraCommand:
|
|
"""Tests for ingest-cora CLI command"""
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.CORAParser')
|
|
def test_ingest_cora_success(self, mock_parser_class, mock_db_manager):
|
|
"""Test successful CORA ingestion"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="testuser",
|
|
hashed_password="hashed",
|
|
role="User",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
mock_project = Mock()
|
|
mock_project.id = 1
|
|
mock_project.name = "Test Project"
|
|
mock_project.main_keyword = "test keyword"
|
|
mock_project.entities = ["entity1", "entity2"]
|
|
mock_project.related_searches = ["search1", "search2"]
|
|
mock_project.custom_anchor_text = []
|
|
|
|
mock_parser = Mock()
|
|
mock_parser_class.return_value = mock_parser
|
|
mock_parser.parse.return_value = {
|
|
"main_keyword": "test keyword",
|
|
"word_count": 1500,
|
|
"term_frequency": 3,
|
|
"entities": ["entity1", "entity2"],
|
|
"related_searches": ["search1", "search2"],
|
|
"custom_anchor_text": [],
|
|
"related_search_density": 0.1,
|
|
"entity_density": 0.05,
|
|
"lsi_density": 0.03,
|
|
}
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
with patch('src.cli.commands.ProjectRepository') as mock_project_repo_class:
|
|
mock_project_repo = Mock()
|
|
mock_project_repo_class.return_value = mock_project_repo
|
|
mock_project_repo.create.return_value = mock_project
|
|
|
|
with runner.isolated_filesystem():
|
|
with open('test.xlsx', 'w') as f:
|
|
f.write('test')
|
|
|
|
result = runner.invoke(app, [
|
|
'ingest-cora',
|
|
'--file', 'test.xlsx',
|
|
'--name', 'Test Project',
|
|
'--username', 'testuser',
|
|
'--password', 'password'
|
|
])
|
|
|
|
assert result.exit_code == 0
|
|
assert "Success: Project 'Test Project' created" in result.output
|
|
assert "test keyword" in result.output
|
|
mock_parser.parse.assert_called_once()
|
|
mock_project_repo.create.assert_called_once()
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.CORAParser')
|
|
def test_ingest_cora_with_custom_anchors(self, mock_parser_class, mock_db_manager):
|
|
"""Test CORA ingestion with custom anchor text"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="testuser",
|
|
hashed_password="hashed",
|
|
role="User",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
mock_project = Mock()
|
|
mock_project.id = 1
|
|
mock_project.name = "Test Project"
|
|
mock_project.main_keyword = "test keyword"
|
|
mock_project.entities = []
|
|
mock_project.related_searches = []
|
|
mock_project.custom_anchor_text = ["anchor1", "anchor2"]
|
|
|
|
mock_parser = Mock()
|
|
mock_parser_class.return_value = mock_parser
|
|
mock_parser.parse.return_value = {
|
|
"main_keyword": "test keyword",
|
|
"word_count": 1500,
|
|
"entities": [],
|
|
"related_searches": [],
|
|
"custom_anchor_text": ["anchor1", "anchor2"],
|
|
}
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
with patch('src.cli.commands.ProjectRepository') as mock_project_repo_class:
|
|
mock_project_repo = Mock()
|
|
mock_project_repo_class.return_value = mock_project_repo
|
|
mock_project_repo.create.return_value = mock_project
|
|
|
|
with runner.isolated_filesystem():
|
|
with open('test.xlsx', 'w') as f:
|
|
f.write('test')
|
|
|
|
result = runner.invoke(app, [
|
|
'ingest-cora',
|
|
'--file', 'test.xlsx',
|
|
'--name', 'Test Project',
|
|
'--custom-anchors', 'anchor1,anchor2',
|
|
'--username', 'testuser',
|
|
'--password', 'password'
|
|
])
|
|
|
|
assert result.exit_code == 0
|
|
assert "anchor1, anchor2" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_ingest_cora_authentication_fails(self, mock_db_manager):
|
|
"""Test CORA ingestion fails with invalid credentials"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = None
|
|
|
|
with runner.isolated_filesystem():
|
|
with open('test.xlsx', 'w') as f:
|
|
f.write('test')
|
|
|
|
result = runner.invoke(app, [
|
|
'ingest-cora',
|
|
'--file', 'test.xlsx',
|
|
'--name', 'Test Project',
|
|
'--username', 'testuser',
|
|
'--password', 'wrongpass'
|
|
])
|
|
|
|
assert result.exit_code != 0
|
|
assert "Authentication failed" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
@patch('src.cli.commands.CORAParser')
|
|
def test_ingest_cora_parse_error(self, mock_parser_class, mock_db_manager):
|
|
"""Test CORA ingestion handles parse errors"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="testuser",
|
|
hashed_password="hashed",
|
|
role="User",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
mock_parser = Mock()
|
|
mock_parser_class.return_value = mock_parser
|
|
from src.ingestion.parser import CORAParseError
|
|
mock_parser.parse.side_effect = CORAParseError("Invalid file format")
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
with runner.isolated_filesystem():
|
|
with open('test.xlsx', 'w') as f:
|
|
f.write('test')
|
|
|
|
result = runner.invoke(app, [
|
|
'ingest-cora',
|
|
'--file', 'test.xlsx',
|
|
'--name', 'Test Project',
|
|
'--username', 'testuser',
|
|
'--password', 'password'
|
|
])
|
|
|
|
assert result.exit_code != 0
|
|
assert "Error parsing CORA file" in result.output
|
|
|
|
|
|
class TestListProjectsCommand:
|
|
"""Tests for list-projects CLI command"""
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_list_projects_user_view(self, mock_db_manager):
|
|
"""Test listing projects for regular user"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="testuser",
|
|
hashed_password="hashed",
|
|
role="User",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
mock_project1 = Mock()
|
|
mock_project1.id = 1
|
|
mock_project1.name = "Project 1"
|
|
mock_project1.main_keyword = "keyword1"
|
|
mock_project1.created_at = datetime(2024, 1, 1, 10, 30, 45)
|
|
|
|
mock_project2 = Mock()
|
|
mock_project2.id = 2
|
|
mock_project2.name = "Project 2"
|
|
mock_project2.main_keyword = "keyword2"
|
|
mock_project2.created_at = datetime(2024, 1, 2, 15, 20, 10)
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
with patch('src.cli.commands.ProjectRepository') as mock_project_repo_class:
|
|
mock_project_repo = Mock()
|
|
mock_project_repo_class.return_value = mock_project_repo
|
|
mock_project_repo.get_by_user_id.return_value = [mock_project1, mock_project2]
|
|
|
|
result = runner.invoke(app, [
|
|
'list-projects',
|
|
'--username', 'testuser',
|
|
'--password', 'password'
|
|
])
|
|
|
|
assert result.exit_code == 0
|
|
assert "Your Projects:" in result.output
|
|
assert "Total projects: 2" in result.output
|
|
assert "Project 1" in result.output
|
|
assert "Project 2" in result.output
|
|
assert "keyword1" in result.output
|
|
assert "keyword2" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_list_projects_admin_view(self, mock_db_manager):
|
|
"""Test listing all projects for admin"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_admin = User(
|
|
id=1,
|
|
username="admin",
|
|
hashed_password="hashed",
|
|
role="Admin",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
mock_project = Mock()
|
|
mock_project.id = 1
|
|
mock_project.name = "Project 1"
|
|
mock_project.main_keyword = "keyword1"
|
|
mock_project.created_at = datetime(2024, 1, 1, 10, 30, 45)
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_admin
|
|
|
|
with patch('src.cli.commands.ProjectRepository') as mock_project_repo_class:
|
|
mock_project_repo = Mock()
|
|
mock_project_repo_class.return_value = mock_project_repo
|
|
mock_project_repo.get_all.return_value = [mock_project]
|
|
|
|
result = runner.invoke(app, [
|
|
'list-projects',
|
|
'--username', 'admin',
|
|
'--password', 'password'
|
|
])
|
|
|
|
assert result.exit_code == 0
|
|
assert "All Projects (Admin View):" in result.output
|
|
assert "Total projects: 1" in result.output
|
|
|
|
@patch('src.cli.commands.db_manager')
|
|
def test_list_projects_empty(self, mock_db_manager):
|
|
"""Test listing projects when none exist"""
|
|
runner = CliRunner()
|
|
mock_session = Mock()
|
|
mock_db_manager.get_session.return_value = mock_session
|
|
|
|
mock_user = User(
|
|
id=1,
|
|
username="testuser",
|
|
hashed_password="hashed",
|
|
role="User",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
with patch('src.cli.commands.UserRepository') as mock_user_repo_class:
|
|
mock_user_repo = Mock()
|
|
mock_user_repo_class.return_value = mock_user_repo
|
|
|
|
with patch('src.cli.commands.AuthService') as mock_auth_class:
|
|
mock_auth = Mock()
|
|
mock_auth_class.return_value = mock_auth
|
|
mock_auth.authenticate_user.return_value = mock_user
|
|
|
|
with patch('src.cli.commands.ProjectRepository') as mock_project_repo_class:
|
|
mock_project_repo = Mock()
|
|
mock_project_repo_class.return_value = mock_project_repo
|
|
mock_project_repo.get_by_user_id.return_value = []
|
|
|
|
result = runner.invoke(app, [
|
|
'list-projects',
|
|
'--username', 'testuser',
|
|
'--password', 'password'
|
|
])
|
|
|
|
assert result.exit_code == 0
|
|
assert "No projects found" in result.output
|