# 리팩토링 전: 테스트하기 어려운 구조
import redis
import pymysql
import requests
import json
from typing import Dict, Any
class MessageProcessor:
    """Pre-refactoring processor that creates every dependency itself.

    The Redis client, the MySQL connection settings and the API base URL are
    all fixed inside ``__init__``, which is what makes this variant hard to
    test in isolation.
    """

    def __init__(self):
        # Connection details are hard-coded on purpose in this "before" example.
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.db_config = dict(
            host='localhost',
            user='user',
            password='password',
            database='mydb',
        )
        self.api_base_url = 'https://api.example.com'

    def process_messages(self):
        """Subscribe to ``user_events`` and handle each published message.

        Every message of type ``message`` is JSON-decoded, stored in the
        database and forwarded to the external API. Any exception raised
        while handling one message is printed and the loop continues.
        """
        subscription = self.redis_client.pubsub()
        subscription.subscribe('user_events')
        for envelope in subscription.listen():
            # Skip everything that is not an actual published message.
            if envelope['type'] != 'message':
                continue
            try:
                payload = json.loads(envelope['data'].decode('utf-8'))
                self._save_to_db(payload)
                self._call_external_api(payload)
            except Exception as exc:
                print(f"Error processing message: {exc}")

    def _save_to_db(self, data: Dict[str, Any]):
        """Insert one event row using a fresh connection that is always closed."""
        conn = pymysql.connect(**self.db_config)
        try:
            with conn.cursor() as cur:
                row = (data['user_id'], data['event_type'], data['timestamp'])
                cur.execute(
                    "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)",
                    row,
                )
            conn.commit()
        finally:
            conn.close()

    def _call_external_api(self, data: Dict[str, Any]):
        """POST the event to ``<api_base_url>/events`` and return the JSON reply.

        Raises whatever ``raise_for_status`` raises for an error response.
        """
        endpoint = "{}/events".format(self.api_base_url)
        reply = requests.post(endpoint, json=data, timeout=10)
        reply.raise_for_status()
        return reply.json()
# message_processor.py - 리팩토링 후: 테스트 가능한 구조
import json
import logging
from typing import Dict, Any, Protocol
from abc import ABC, abstractmethod
# 의존성을 추상화하는 프로토콜들
class RedisClient(Protocol):
    """Structural type for the Redis operations this module depends on."""

    def pubsub(self):
        """Return the pub/sub handle used for subscribing and listening."""

    def publish(self, channel: str, message: str):
        """Publish ``message`` on ``channel``."""
class DatabaseClient(Protocol):
    """Structural type for the database operations this module depends on."""

    def execute(self, query: str, params: tuple):
        """Run ``query`` with the given positional ``params``."""

    def commit(self):
        """Commit the current transaction."""

    def close(self):
        """Release the underlying connection."""
class HttpClient(Protocol):
    """Structural type for the HTTP operation this module depends on."""

    def post(self, url: str, json: Dict[str, Any], timeout: int) -> Dict[str, Any]:
        """POST ``json`` to ``url`` within ``timeout`` and return a dictionary."""
# 구체적인 구현체들
class RedisAdapter:
    """Wrap a Redis client object and forward the calls this module uses."""

    def __init__(self, redis_client):
        # The wrapped client is kept private; every call is delegated to it.
        self._client = redis_client

    def pubsub(self):
        """Return whatever the wrapped client's ``pubsub()`` returns."""
        handle = self._client.pubsub()
        return handle

    def publish(self, channel: str, message: str):
        """Forward ``publish`` to the wrapped client and return its result."""
        outcome = self._client.publish(channel, message)
        return outcome
class DatabaseAdapter:
    """Wrap a DB connection object behind execute/commit/close methods."""

    def __init__(self, connection):
        self._connection = connection

    def execute(self, query: str, params: tuple):
        """Run ``query`` with ``params`` on a cursor from the connection.

        The cursor is used as a context manager; nothing is returned.
        """
        cursor_cm = self._connection.cursor()
        with cursor_cm as cur:
            cur.execute(query, params)

    def commit(self):
        """Commit on the wrapped connection."""
        self._connection.commit()

    def close(self):
        """Close the wrapped connection."""
        self._connection.close()
class HttpAdapter:
    """Wrap a session object and expose a ``post`` that returns parsed JSON."""

    def __init__(self, requests_session):
        self._session = requests_session

    def post(self, url: str, json: Dict[str, Any], timeout: int = 10) -> Dict[str, Any]:
        """POST ``json`` to ``url`` and return the decoded JSON body.

        ``raise_for_status()`` is called on the response before decoding, so
        any error it raises propagates to the caller.
        """
        reply = self._session.post(url, json=json, timeout=timeout)
        reply.raise_for_status()
        body = reply.json()
        return body
# 비즈니스 로직을 담당하는 서비스 클래스들
class MessageParser:
    """Turn raw pub/sub payloads into event dictionaries."""

    @staticmethod
    def parse_message(message_data: bytes) -> Dict[str, Any]:
        """Decode a raw message payload into a dictionary.

        Args:
            message_data: UTF-8 encoded JSON. A ``str`` holding JSON text is
                accepted as well (e.g. when the client already decodes
                responses).

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If the payload is not valid UTF-8, is not valid JSON,
                is of an unsupported type, or decodes to something other
                than a JSON object. The original error, when there is one,
                is attached as ``__cause__``.
        """
        try:
            if isinstance(message_data, (bytes, bytearray)):
                text = message_data.decode('utf-8')
            else:
                text = message_data
            parsed = json.loads(text)
        except (json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
            # Chain the cause so the underlying decode error stays visible.
            raise ValueError(f"Invalid message format: {e}") from e
        # The declared return type is a dict; lists, numbers, strings and
        # null are valid JSON but not valid events.
        if not isinstance(parsed, dict):
            raise ValueError(
                "Invalid message format: expected a JSON object, "
                f"got {type(parsed).__name__}"
            )
        return parsed
class DatabaseService:
    """Persist user events through an injected database client."""

    # Keys every event must carry, in the column order of the INSERT below.
    _REQUIRED_FIELDS = ('user_id', 'event_type', 'timestamp')

    def __init__(self, db_client: DatabaseClient):
        self.db_client = db_client

    def save_user_event(self, data: Dict[str, Any]):
        """Insert one user event and commit.

        Args:
            data: Event mapping containing ``user_id``, ``event_type`` and
                ``timestamp``.

        Raises:
            ValueError: If ``data`` is not a mapping or lacks a required key.
                The database client is not touched in that case.
        """
        if not self._validate_event_data(data):
            raise ValueError("Invalid event data")
        query = "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)"
        params = tuple(data[field] for field in self._REQUIRED_FIELDS)
        self.db_client.execute(query, params)
        self.db_client.commit()

    def _validate_event_data(self, data: Dict[str, Any]) -> bool:
        """Return True only when ``data`` is a mapping with every required key."""
        from collections.abc import Mapping

        # A bare ``in`` test also succeeds for lists or strings that happen
        # to contain the field names, and raises TypeError for None; only a
        # real mapping can be indexed by key afterwards.
        if not isinstance(data, Mapping):
            return False
        return all(field in data for field in self._REQUIRED_FIELDS)
class ExternalApiService:
    """Send events to the external HTTP API through an injected client."""

    def __init__(self, http_client: HttpClient, base_url: str, timeout: int = 10):
        """Store the collaborators.

        Args:
            http_client: Object providing ``post(url, json=..., timeout=...)``.
            base_url: Root URL of the API, with or without a trailing slash.
            timeout: Timeout passed to every request (default 10, the value
                that was previously fixed inside ``send_event``).
        """
        self.http_client = http_client
        self.base_url = base_url
        self.timeout = timeout

    def send_event(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``data`` to ``<base_url>/events`` and return the client's result.

        Any exception raised by the HTTP client propagates unchanged.
        """
        # Strip trailing slashes so "https://host/" does not become
        # "https://host//events".
        url = f"{self.base_url.rstrip('/')}/events"
        return self.http_client.post(url, json=data, timeout=self.timeout)
# 메인 메시지 프로세서
class MessageProcessor:
    """Consume pub/sub messages and run each one through parse, store, forward."""

    def __init__(
        self,
        redis_client: RedisClient,
        db_service: DatabaseService,
        api_service: ExternalApiService,
        message_parser: MessageParser = None
    ):
        """Store the injected collaborators.

        Args:
            redis_client: Provides ``pubsub()`` for ``start_listening``.
            db_service: Provides ``save_user_event(data)``.
            api_service: Provides ``send_event(data)``.
            message_parser: Provides ``parse_message(raw)``; a default
                ``MessageParser`` is created when omitted.
        """
        self.redis_client = redis_client
        self.db_service = db_service
        self.api_service = api_service
        # Compare against None explicitly so a supplied parser is never
        # replaced just because it happens to be falsy.
        self.message_parser = MessageParser() if message_parser is None else message_parser
        self.logger = logging.getLogger(__name__)

    def process_single_message(self, message_data: bytes) -> bool:
        """Process one raw message; the easily testable unit of work.

        Steps run in order: parse, save to the database, send to the API.
        A failure in any step stops the remaining steps.

        Returns:
            True when every step succeeded, False otherwise. Exceptions are
            logged (with traceback) and never propagate to the caller.
        """
        try:
            data = self.message_parser.parse_message(message_data)
            self.db_service.save_user_event(data)
            self.api_service.send_event(data)
            self.logger.info(
                "Successfully processed message for user %s", data.get('user_id')
            )
            return True
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error()
            # call with only the message text would lose.
            self.logger.exception("Error processing message: %s", e)
            return False

    def start_listening(self, channel: str = 'user_events'):
        """Subscribe to ``channel`` and process every published message.

        Blocks for as long as the pub/sub handle keeps yielding. Entries
        whose ``type`` is not ``message`` are ignored. When the loop ends or
        an exception escapes, the pub/sub handle is closed if it has a
        ``close`` method.
        """
        pubsub = self.redis_client.pubsub()
        try:
            pubsub.subscribe(channel)
            self.logger.info("Started listening to channel: %s", channel)
            for message in pubsub.listen():
                if message['type'] == 'message':
                    self.process_single_message(message['data'])
        finally:
            # Release the subscription even on KeyboardInterrupt or connection
            # errors; tolerate handles that do not expose close().
            close = getattr(pubsub, 'close', None)
            if callable(close):
                close()
# 팩토리 함수 (실제 운영 환경에서 사용)
def create_message_processor(
    *,
    redis_host: str = 'localhost',
    redis_port: int = 6379,
    redis_db: int = 0,
    db_host: str = 'localhost',
    db_user: str = 'user',
    db_password: str = 'password',
    db_name: str = 'mydb',
    api_base_url: str = 'https://api.example.com',
):
    """Build a MessageProcessor wired to real Redis, MySQL and HTTP clients.

    All arguments are keyword-only and default to the values that were
    previously hard-coded, so ``create_message_processor()`` behaves as
    before while deployments can override any setting.

    Args:
        redis_host, redis_port, redis_db: Passed to ``redis.Redis``.
        db_host, db_user, db_password, db_name: Passed to ``pymysql.connect``
            as ``host``, ``user``, ``password`` and ``database``.
        api_base_url: Base URL handed to ``ExternalApiService``.

    Returns:
        A ``MessageProcessor`` using the default message parser. The MySQL
        connection is opened here and stays open; it is reachable through
        ``processor.db_service.db_client`` for closing.
    """
    # Imported lazily so importing this module does not require the real
    # client libraries (e.g. in unit tests that inject fakes).
    import redis
    import pymysql
    import requests

    redis_client = RedisAdapter(
        redis.Redis(host=redis_host, port=redis_port, db=redis_db)
    )

    db_connection = pymysql.connect(
        host=db_host,
        user=db_user,
        password=db_password,
        database=db_name,
    )
    db_client = DatabaseAdapter(db_connection)

    http_client = HttpAdapter(requests.Session())

    db_service = DatabaseService(db_client)
    api_service = ExternalApiService(http_client, api_base_url)
    return MessageProcessor(redis_client, db_service, api_service)
# test_message_processor.py
import pytest
import json
from unittest.mock import Mock, MagicMock
from message_processor import (
MessageProcessor, DatabaseService, ExternalApiService,
MessageParser, RedisAdapter, DatabaseAdapter, HttpAdapter
)
class TestMessageParser:
    """Unit tests for ``MessageParser.parse_message``."""

    def test_parse_valid_json_message(self):
        """A UTF-8 encoded JSON object is decoded back into its fields."""
        # Given
        expected = {
            'user_id': 123,
            'event_type': 'login',
            'timestamp': '2024-01-01T10:00:00',
        }
        raw = json.dumps(expected).encode('utf-8')
        # When
        parsed = MessageParser.parse_message(raw)
        # Then
        for field in ('user_id', 'event_type', 'timestamp'):
            assert parsed[field] == expected[field]

    def test_parse_invalid_json_message(self):
        """Bytes that are not JSON are rejected with ValueError."""
        # Given
        not_json = b'invalid json'
        # When & Then
        with pytest.raises(ValueError, match="Invalid message format"):
            MessageParser.parse_message(not_json)

    def test_parse_invalid_encoding_message(self):
        """Bytes that are not valid UTF-8 are rejected with ValueError."""
        # Given
        non_utf8 = b'\x80\x81\x82'
        # When & Then
        with pytest.raises(ValueError, match="Invalid message format"):
            MessageParser.parse_message(non_utf8)
class TestDatabaseService:
    """Unit tests for DatabaseService using a mocked database client."""

    @pytest.fixture
    def mock_db_client(self):
        """Mock standing in for the database client."""
        client = Mock()
        return client

    @pytest.fixture
    def db_service(self, mock_db_client):
        """DatabaseService wired to the mocked client."""
        service = DatabaseService(mock_db_client)
        return service

    def test_save_valid_user_event(self, db_service, mock_db_client):
        """A complete event is inserted once and then committed once."""
        # Given
        user_id, event_type, timestamp = 123, 'login', '2024-01-01T10:00:00'
        event = {
            'user_id': user_id,
            'event_type': event_type,
            'timestamp': timestamp,
        }
        # When
        db_service.save_user_event(event)
        # Then
        mock_db_client.execute.assert_called_once_with(
            "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)",
            (user_id, event_type, timestamp),
        )
        mock_db_client.commit.assert_called_once()

    def test_save_invalid_user_event_missing_fields(self, db_service, mock_db_client):
        """An event missing required fields is rejected before any DB call."""
        # Given: event_type and timestamp are absent
        partial_event = {'user_id': 123}
        # When & Then
        with pytest.raises(ValueError, match="Invalid event data"):
            db_service.save_user_event(partial_event)
        # Neither execute nor commit may have been reached.
        for untouched in (mock_db_client.execute, mock_db_client.commit):
            untouched.assert_not_called()

    @pytest.mark.parametrize("missing_field", ["user_id", "event_type", "timestamp"])
    def test_validate_event_data_missing_required_fields(self, db_service, missing_field):
        """Validation fails when any single required field is removed."""
        # Given
        partial_event = {
            'user_id': 123,
            'event_type': 'login',
            'timestamp': '2024-01-01T10:00:00',
        }
        del partial_event[missing_field]
        # When
        is_valid = db_service._validate_event_data(partial_event)
        # Then
        assert is_valid is False
class TestExternalApiService:
    """Unit tests for ExternalApiService using a mocked HTTP client."""

    @pytest.fixture
    def mock_http_client(self):
        """Mock standing in for the HTTP client."""
        client = Mock()
        return client

    @pytest.fixture
    def api_service(self, mock_http_client):
        """ExternalApiService pointed at the example base URL."""
        base_url = 'https://api.example.com'
        return ExternalApiService(mock_http_client, base_url)

    def test_send_event_success(self, api_service, mock_http_client):
        """The client's response is returned and the call targets /events."""
        # Given
        payload = {'user_id': 123, 'event_type': 'login'}
        canned_reply = {'status': 'success', 'id': 'event_123'}
        mock_http_client.post.return_value = canned_reply
        # When
        reply = api_service.send_event(payload)
        # Then
        assert reply == canned_reply
        mock_http_client.post.assert_called_once_with(
            'https://api.example.com/events', json=payload, timeout=10
        )

    def test_send_event_http_error(self, api_service, mock_http_client):
        """An exception from the HTTP client propagates to the caller."""
        # Given
        payload = {'user_id': 123, 'event_type': 'login'}
        mock_http_client.post.side_effect = Exception("Connection error")
        # When & Then
        with pytest.raises(Exception, match="Connection error"):
            api_service.send_event(payload)
class TestMessageProcessor:
    """Unit tests for MessageProcessor with every collaborator mocked."""

    @pytest.fixture
    def mock_redis_client(self):
        """Mocked Redis client."""
        return Mock()

    @pytest.fixture
    def mock_db_service(self):
        """Mocked database service."""
        return Mock()

    @pytest.fixture
    def mock_api_service(self):
        """Mocked external API service."""
        return Mock()

    @pytest.fixture
    def mock_message_parser(self):
        """Mocked message parser."""
        return Mock()

    @pytest.fixture
    def message_processor(self, mock_redis_client, mock_db_service,
                          mock_api_service, mock_message_parser):
        """MessageProcessor built from the four mocked collaborators."""
        collaborators = (
            mock_redis_client,
            mock_db_service,
            mock_api_service,
            mock_message_parser,
        )
        return MessageProcessor(*collaborators)

    def test_process_single_message_success(self, message_processor,
                                            mock_message_parser, mock_db_service,
                                            mock_api_service):
        """All three steps run once and the result is True."""
        # Given
        raw = b'test_message'
        event = {'user_id': 123, 'event_type': 'login'}
        mock_message_parser.parse_message.return_value = event
        # When
        outcome = message_processor.process_single_message(raw)
        # Then
        assert outcome is True
        mock_message_parser.parse_message.assert_called_once_with(raw)
        for step in (mock_db_service.save_user_event, mock_api_service.send_event):
            step.assert_called_once_with(event)

    def test_process_single_message_parse_error(self, message_processor,
                                                mock_message_parser, mock_db_service,
                                                mock_api_service):
        """A parse failure yields False and skips the DB and API steps."""
        # Given
        raw = b'invalid_message'
        mock_message_parser.parse_message.side_effect = ValueError("Parse error")
        # When
        outcome = message_processor.process_single_message(raw)
        # Then
        assert outcome is False
        mock_message_parser.parse_message.assert_called_once_with(raw)
        for skipped in (mock_db_service.save_user_event, mock_api_service.send_event):
            skipped.assert_not_called()

    def test_process_single_message_db_error(self, message_processor,
                                             mock_message_parser, mock_db_service,
                                             mock_api_service):
        """A database failure yields False and skips the API step."""
        # Given
        raw = b'test_message'
        event = {'user_id': 123, 'event_type': 'login'}
        mock_message_parser.parse_message.return_value = event
        mock_db_service.save_user_event.side_effect = Exception("DB error")
        # When
        outcome = message_processor.process_single_message(raw)
        # Then
        assert outcome is False
        mock_db_service.save_user_event.assert_called_once_with(event)
        # The API must not be reached once the DB step has failed.
        mock_api_service.send_event.assert_not_called()

    def test_process_single_message_api_error(self, message_processor,
                                              mock_message_parser, mock_db_service,
                                              mock_api_service):
        """An API failure yields False after the DB step already ran."""
        # Given
        raw = b'test_message'
        event = {'user_id': 123, 'event_type': 'login'}
        mock_message_parser.parse_message.return_value = event
        mock_api_service.send_event.side_effect = Exception("API error")
        # When
        outcome = message_processor.process_single_message(raw)
        # Then
        assert outcome is False
        for step in (mock_db_service.save_user_event, mock_api_service.send_event):
            step.assert_called_once_with(event)
# 통합 테스트
class TestIntegration:
    """Integration test: real adapters/services, only outermost clients mocked."""

    def test_end_to_end_message_processing(self, mocker):
        """A valid message flows through the parser, DB adapter and HTTP adapter."""
        # Given - real classes everywhere, mocks only for external clients.
        mock_redis = Mock()
        # DatabaseAdapter uses ``with connection.cursor() as cursor``. A plain
        # Mock does not support the context manager protocol, so the DB step
        # would raise and the message would be reported as failed; MagicMock
        # configures __enter__/__exit__.
        mock_db_connection = MagicMock()
        mock_requests_session = Mock()

        # Canned HTTP response
        mock_requests_session.post.return_value.json.return_value = {'status': 'success'}
        mock_requests_session.post.return_value.raise_for_status = Mock()

        # Real adapters
        redis_adapter = RedisAdapter(mock_redis)
        db_adapter = DatabaseAdapter(mock_db_connection)
        http_adapter = HttpAdapter(mock_requests_session)

        # Real services
        db_service = DatabaseService(db_adapter)
        api_service = ExternalApiService(http_adapter, 'https://api.example.com')

        # Real processor (default MessageParser)
        processor = MessageProcessor(redis_adapter, db_service, api_service)

        test_message = json.dumps({
            'user_id': 123,
            'event_type': 'login',
            'timestamp': '2024-01-01T10:00:00'
        }).encode('utf-8')

        # When
        result = processor.process_single_message(test_message)

        # Then
        assert result is True

        # The INSERT ran on the cursor yielded by the context manager and
        # the transaction was committed.
        mock_db_connection.cursor.assert_called()
        cursor = mock_db_connection.cursor.return_value.__enter__.return_value
        cursor.execute.assert_called_once_with(
            "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)",
            (123, 'login', '2024-01-01T10:00:00'),
        )
        mock_db_connection.commit.assert_called_once()

        # The event was posted to the API exactly once.
        mock_requests_session.post.assert_called_once()
        call_args = mock_requests_session.post.call_args
        assert call_args[1]['json']['user_id'] == 123
        assert call_args[1]['json']['event_type'] == 'login'
# pytest 실행을 위한 설정
if __name__ == "__main__":
    # Propagate pytest's exit code so a failing run gives a non-zero process
    # status instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__, "-v"]))
# pytest.ini - pytest 설정 파일
; In pytest.ini the section must be named [pytest]; [tool:pytest] is only
; read from setup.cfg, so the settings below would otherwise be ignored.
[pytest]
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
; Continuation lines of a multi-line value must be indented.
addopts =
    -v
    --tb=short
    --strict-markers
    --strict-config
    --cov=message_processor
    --cov-report=html
    --cov-report=term-missing
markers =
    unit: 단위 테스트
    integration: 통합 테스트
    slow: 느린 테스트
---
# conftest.py - 공통 픽스처 설정
import pytest
from unittest.mock import Mock
import json
@pytest.fixture
def sample_event_data():
    """Provide a sample event payload, including a nested ``metadata`` dict."""
    request_meta = {'ip': '192.168.1.1', 'user_agent': 'test-agent'}
    event = {
        'user_id': 123,
        'event_type': 'login',
        'timestamp': '2024-01-01T10:00:00',
    }
    event['metadata'] = request_meta
    return event
@pytest.fixture
def sample_message_bytes(sample_event_data):
    """Provide the sample event serialized as UTF-8 encoded JSON bytes."""
    serialized = json.dumps(sample_event_data)
    return serialized.encode('utf-8')
@pytest.fixture
def redis_message(sample_message_bytes):
    """Wrap the sample bytes in a dict with ``type``, ``channel`` and ``data``."""
    return dict(
        type='message',
        channel=b'user_events',
        data=sample_message_bytes,
    )
---
# requirements-test.txt - 테스트 의존성
pytest>=7.0.0
pytest-mock>=3.10.0
pytest-cov>=4.0.0
pytest-asyncio>=0.21.0 # 비동기 테스트가 필요한 경우
redis>=4.0.0
pymysql>=1.0.0
requests>=2.28.0
---
# Makefile - 편리한 테스트 실행
# Every target here is a command, not a file, so all of them are phony
# (including test-file and test-failed, which were missing from the list).
.PHONY: test test-unit test-integration test-cov test-file test-failed clean

# Run all tests
test:
	pytest

# Run only tests marked "unit"
test-unit:
	pytest -m unit

# Run only tests marked "integration"
test-integration:
	pytest -m integration

# Run tests with coverage reporting
test-cov:
	pytest --cov=message_processor --cov-report=html --cov-report=term

# Run a single test file
test-file:
	pytest tests/test_message_processor.py -v

# Re-run only the tests that failed last time
test-failed:
	pytest --lf

# Remove test artifacts
clean:
	rm -rf .pytest_cache htmlcov .coverage
---
# 고급 테스트 예제들
# test_advanced_scenarios.py
import pytest
from unittest.mock import Mock, patch, call
import time
class TestAdvancedScenarios:
"""고급 테스트 시나리오들"""
def test_concurrent_message_processing(self, mocker):
    """Submit five messages to a three-worker pool and check every call ran."""
    from concurrent.futures import ThreadPoolExecutor

    # Mocked processor whose handler always reports success
    mock_processor = mocker.Mock()
    mock_processor.process_single_message.return_value = True
    handler = mock_processor.process_single_message
    messages = [b'message_%d' % index for index in range(5)]

    # When - process concurrently; map yields results in submission order
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(handler, messages))

    # Then
    assert all(results)
    assert mock_processor.process_single_message.call_count == 5
def test_message_processing_with_retry(self, mocker):
    """Simulate a retry loop in which the third attempt succeeds."""
    # Given
    processor = mocker.Mock()
    # First and second calls fail, the third succeeds.
    processor.process_single_message.side_effect = [False, False, True]
    message = b'test_message'
    max_retries = 3
    # Patch the delay so the test does not actually wait between attempts.
    mock_sleep = mocker.patch("time.sleep")

    # When - simulated retry loop
    success = False
    for _ in range(max_retries):
        success = processor.process_single_message(message)
        if success:
            break
        time.sleep(0.1)  # a real implementation would back off for longer

    # Then
    assert success is True
    assert processor.process_single_message.call_count == 3
    # One back-off per failed attempt.
    assert mock_sleep.call_count == 2
@pytest.mark.parametrize("error_stage,expected_calls", [
("parse", {"parse": 1, "db": 0, "api": 0}),
("db", {"parse": 1, "db": 1, "api": 0}),
("api", {"parse": 1, "db": 1, "api": 1}),
])
def test_error_handling_at_different_stages(self, mocker, error_stage, expected_calls):
"""다양한 단계에서의 에러 처리 테스트"""
# Given
mock_parser = mocker.Mock()
mock_db_service = mocker.Mock()
mock_api_service = mocker.Mock()
from message_processor import MessageProcessor
processor = MessageProcessor(
mocker.Mock(), mock_db_service, mock_api_service, mock_parser
)
# 에러 단계별 설정
if error_stage == "parse":
mock_parser.parse_message.side_effect = ValueError("Parse error")
elif error_stage == "db":
mock_parser.parse_message.return_value = {"user_id": 123}
mock_db_service.save_user_event.side_effect = Exception("