본문 바로가기

Software Engineering/Python

[실습 코드] 리팩토링 예제

# 리팩토링 전: 테스트하기 어려운 구조
import redis
import pymysql
import requests
import json
from typing import Dict, Any

class MessageProcessor:
    """Pre-refactoring example: reads Redis events, stores them in MySQL, and posts them to an HTTP API.

    All collaborators are created or configured inside the class with fixed
    values instead of being passed in.
    """

    def __init__(self):
        # Connection details are fixed here rather than injected.
        self.api_base_url = 'https://api.example.com'
        self.db_config = dict(
            host='localhost',
            user='user',
            password='password',
            database='mydb',
        )
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def process_messages(self):
        """Subscribe to the ``user_events`` channel and handle each message as it arrives."""
        subscription = self.redis_client.pubsub()
        subscription.subscribe('user_events')

        for event in subscription.listen():
            # Only entries whose type is 'message' are handled.
            if event['type'] != 'message':
                continue
            try:
                # Decode and parse the payload, then persist it and forward it.
                payload = json.loads(event['data'].decode('utf-8'))
                self._save_to_db(payload)
                self._call_external_api(payload)
            except Exception as e:
                print(f"Error processing message: {e}")

    def _save_to_db(self, data: Dict[str, Any]):
        """Insert one event row using a connection opened just for this call."""
        insert_sql = "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)"
        connection = pymysql.connect(**self.db_config)
        try:
            with connection.cursor() as cursor:
                row = (data['user_id'], data['event_type'], data['timestamp'])
                cursor.execute(insert_sql, row)
            connection.commit()
        finally:
            # The connection is closed whether or not the insert succeeded.
            connection.close()

    def _call_external_api(self, data: Dict[str, Any]):
        """POST the event to ``<api_base_url>/events`` and return the decoded JSON reply."""
        endpoint = f"{self.api_base_url}/events"
        reply = requests.post(endpoint, json=data, timeout=10)
        reply.raise_for_status()
        return reply.json()

 

# message_processor.py - 리팩토링 후: 테스트 가능한 구조

import json
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Any, Dict, Optional, Protocol, Union

# 의존성을 추상화하는 프로토콜들
class RedisClient(Protocol):
    """Structural interface for the Redis operations this module relies on."""

    def pubsub(self):
        """Return a pub/sub handle."""

    def publish(self, channel: str, message: str):
        """Publish ``message`` on ``channel``."""

class DatabaseClient(Protocol):
    """Structural interface for the database operations this module relies on."""

    def execute(self, query: str, params: tuple):
        """Run ``query`` with the given positional parameters."""

    def commit(self):
        """Commit the current transaction."""

    def close(self):
        """Close the underlying connection."""

class HttpClient(Protocol):
    """Structural interface for the HTTP operations this module relies on."""

    def post(self, url: str, json: Dict[str, Any], timeout: int) -> Dict[str, Any]:
        """POST ``json`` to ``url`` and return the decoded response body."""

# 구체적인 구현체들
class RedisAdapter:
    """Thin wrapper that forwards pub/sub calls to an underlying Redis client."""

    def __init__(self, redis_client):
        self._client = redis_client

    def pubsub(self):
        """Return the pub/sub object produced by the wrapped client."""
        handle = self._client.pubsub()
        return handle

    def publish(self, channel: str, message: str):
        """Forward a publish call to the wrapped client and return its result."""
        outcome = self._client.publish(channel, message)
        return outcome

class DatabaseAdapter:
    """Wraps a DB connection object behind execute/commit/close calls."""

    def __init__(self, connection):
        self._connection = connection

    def execute(self, query: str, params: tuple):
        """Run one statement on a cursor that is released when the call finishes."""
        cursor_context = self._connection.cursor()
        with cursor_context as cursor:
            cursor.execute(query, params)

    def commit(self):
        """Commit on the wrapped connection."""
        self._connection.commit()

    def close(self):
        """Close the wrapped connection."""
        self._connection.close()

class HttpAdapter:
    """Wraps a session object so callers receive decoded JSON bodies."""

    def __init__(self, requests_session):
        self._session = requests_session

    def post(self, url: str, json: Dict[str, Any], timeout: int = 10) -> Dict[str, Any]:
        """POST ``json`` to ``url``; raise via ``raise_for_status`` on an error reply, else return the JSON body."""
        reply = self._session.post(url, json=json, timeout=timeout)
        reply.raise_for_status()
        body = reply.json()
        return body

# 비즈니스 로직을 담당하는 서비스 클래스들
class MessageParser:
    """Message parsing logic."""

    @staticmethod
    def parse_message(message_data: Union[bytes, str]) -> Dict[str, Any]:
        """Decode a Redis message payload into an event dictionary.

        Args:
            message_data: UTF-8 encoded JSON bytes, or JSON text that has
                already been decoded to ``str``.

        Returns:
            The parsed JSON object.

        Raises:
            ValueError: If the payload cannot be decoded as UTF-8, is not valid
                JSON, is not a bytes/str value, or does not contain a JSON
                object at the top level. The underlying error, when there is
                one, is attached as ``__cause__``.
        """
        try:
            if isinstance(message_data, (bytes, bytearray)):
                text = message_data.decode('utf-8')
            else:
                text = message_data
            parsed = json.loads(text)
        except (json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
            # TypeError covers payloads that are neither bytes nor str.
            raise ValueError(f"Invalid message format: {e}") from e

        # The declared return type is a dict; reject arrays, numbers, etc. here
        # instead of letting them fail later with an unrelated error.
        if not isinstance(parsed, dict):
            raise ValueError(
                f"Invalid message format: expected a JSON object, got {type(parsed).__name__}"
            )
        return parsed

class DatabaseService:
    """Handles database operations for user events."""

    # Keys every event must carry before it can be stored.
    _REQUIRED_FIELDS = ('user_id', 'event_type', 'timestamp')

    def __init__(self, db_client: DatabaseClient):
        self.db_client = db_client

    def save_user_event(self, data: Dict[str, Any]):
        """Validate a user event and insert it into ``user_events``.

        Args:
            data: Event mapping containing ``user_id``, ``event_type`` and
                ``timestamp``.

        Raises:
            ValueError: If ``data`` is not a mapping or lacks a required key.
                Nothing is sent to the database client in that case.
        """
        if not self._validate_event_data(data):
            raise ValueError("Invalid event data")

        query = "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)"
        params = (data['user_id'], data['event_type'], data['timestamp'])

        self.db_client.execute(query, params)
        self.db_client.commit()

    def _validate_event_data(self, data: Dict[str, Any]) -> bool:
        """Return True when ``data`` is a mapping that has every required key."""
        # Non-mapping input (None, numbers, lists, strings) is reported as
        # invalid rather than raising TypeError or matching by accident.
        if not isinstance(data, Mapping):
            return False
        return all(field in data for field in self._REQUIRED_FIELDS)

class ExternalApiService:
    """Handles calls to the external API."""

    def __init__(self, http_client: HttpClient, base_url: str, timeout: int = 10):
        """Store the collaborators used to reach the API.

        Args:
            http_client: Object whose ``post(url, json=..., timeout=...)``
                returns the decoded response.
            base_url: Root URL of the API, with or without a trailing slash.
            timeout: Timeout value passed to every ``post`` call.
        """
        self.http_client = http_client
        self.base_url = base_url
        self.timeout = timeout

    def send_event(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``data`` to the ``/events`` endpoint and return the client's result.

        Exceptions raised by the HTTP client propagate to the caller.
        """
        # Strip trailing slashes so a base URL such as "https://host/" does not
        # produce "https://host//events".
        url = f"{self.base_url.rstrip('/')}/events"
        return self.http_client.post(url, json=data, timeout=self.timeout)

# 메인 메시지 프로세서
class MessageProcessor:
    """Main class that processes Redis messages.

    Parsing, persistence and the outbound API call are delegated to injected
    collaborators so each step can be replaced in tests.
    """

    def __init__(
        self,
        redis_client: RedisClient,
        db_service: DatabaseService,
        api_service: ExternalApiService,
        message_parser: Optional[MessageParser] = None
    ):
        """Wire up the collaborators.

        Args:
            redis_client: Source of the pub/sub subscription.
            db_service: Stores each parsed event.
            api_service: Forwards each parsed event to the external API.
            message_parser: Parser for raw payloads; a ``MessageParser`` is
                created when omitted.
        """
        self.redis_client = redis_client
        self.db_service = db_service
        self.api_service = api_service
        # Compare against None explicitly so a supplied parser is never
        # discarded merely because it evaluates as falsy.
        self.message_parser = message_parser if message_parser is not None else MessageParser()
        self.logger = logging.getLogger(__name__)

    def process_single_message(self, message_data: bytes) -> bool:
        """Parse, store and forward one message.

        Args:
            message_data: Raw payload as delivered by the subscription.

        Returns:
            True when all three steps succeeded, False when any step raised.
            The steps run in order and stop at the first failure.
        """
        try:
            # 1. Parse the message
            data = self.message_parser.parse_message(message_data)

            # 2. Save to the database
            self.db_service.save_user_event(data)

            # 3. Call the external API
            self.api_service.send_event(data)

            self.logger.info("Successfully processed message for user %s", data.get('user_id'))
            return True

        except Exception as e:
            # Catching Exception here keeps start_listening's loop running
            # after a failed message; logger.exception keeps the traceback.
            self.logger.exception("Error processing message: %s", e)
            return False

    def start_listening(self, channel: str = 'user_events'):
        """Subscribe to ``channel`` and process every entry of type ``'message'``."""
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe(channel)

        self.logger.info("Started listening to channel: %s", channel)

        for message in pubsub.listen():
            if message['type'] == 'message':
                self.process_single_message(message['data'])

# 팩토리 함수 (실제 운영 환경에서 사용)
def create_message_processor(
    *,
    redis_host: str = 'localhost',
    redis_port: int = 6379,
    redis_db: int = 0,
    db_config: Optional[Dict[str, Any]] = None,
    api_base_url: str = 'https://api.example.com',
):
    """Create a MessageProcessor wired to real Redis, MySQL and HTTP clients.

    Called with no arguments it uses the same local development settings as
    before; deployments should pass their own values instead of relying on the
    built-in defaults.

    Args:
        redis_host: Redis server host.
        redis_port: Redis server port.
        redis_db: Redis database index.
        db_config: Keyword arguments for ``pymysql.connect``. Defaults to the
            local ``mydb`` database with the ``user``/``password`` account.
        api_base_url: Root URL of the external API.

    Returns:
        A ready-to-use ``MessageProcessor``.
    """
    import redis
    import pymysql
    import requests

    if db_config is None:
        db_config = {
            'host': 'localhost',
            'user': 'user',
            'password': 'password',
            'database': 'mydb',
        }

    # Redis client
    redis_client = RedisAdapter(redis.Redis(host=redis_host, port=redis_port, db=redis_db))

    # Database client
    db_connection = pymysql.connect(**db_config)
    db_client = DatabaseAdapter(db_connection)

    # HTTP client
    session = requests.Session()
    http_client = HttpAdapter(session)

    # Service instances
    db_service = DatabaseService(db_client)
    api_service = ExternalApiService(http_client, api_base_url)

    return MessageProcessor(redis_client, db_service, api_service)

 

# test_message_processor.py

import pytest
import json
from unittest.mock import Mock, MagicMock
from message_processor import (
    MessageProcessor, DatabaseService, ExternalApiService, 
    MessageParser, RedisAdapter, DatabaseAdapter, HttpAdapter
)

class TestMessageParser:
    """Tests for the MessageParser class."""

    def test_parse_valid_json_message(self):
        """A well-formed JSON payload is decoded back into its fields."""
        # Given
        event = {
            'user_id': 123,
            'event_type': 'login',
            'timestamp': '2024-01-01T10:00:00'
        }
        raw = json.dumps(event).encode('utf-8')

        # When
        parsed = MessageParser.parse_message(raw)

        # Then
        for key, expected in event.items():
            assert parsed[key] == expected

    def test_parse_invalid_json_message(self):
        """Bytes that are not JSON are rejected with ValueError."""
        # When & Then
        with pytest.raises(ValueError, match="Invalid message format"):
            MessageParser.parse_message(b'invalid json')

    def test_parse_invalid_encoding_message(self):
        """Bytes that are not valid UTF-8 are rejected with ValueError."""
        # When & Then
        with pytest.raises(ValueError, match="Invalid message format"):
            MessageParser.parse_message(b'\x80\x81\x82')

class TestDatabaseService:
    """Tests for the DatabaseService class."""

    @pytest.fixture
    def mock_db_client(self):
        """Stand-in database client that records calls."""
        return Mock()

    @pytest.fixture
    def db_service(self, mock_db_client):
        """DatabaseService wired to the mock client."""
        return DatabaseService(mock_db_client)

    def test_save_valid_user_event(self, db_service, mock_db_client):
        """A complete event triggers one INSERT followed by one commit."""
        # Given
        user_id, event_type, timestamp = 123, 'login', '2024-01-01T10:00:00'
        event = dict(user_id=user_id, event_type=event_type, timestamp=timestamp)

        # When
        db_service.save_user_event(event)

        # Then
        mock_db_client.execute.assert_called_once_with(
            "INSERT INTO user_events (user_id, event_type, timestamp) VALUES (%s, %s, %s)",
            (user_id, event_type, timestamp),
        )
        mock_db_client.commit.assert_called_once()

    def test_save_invalid_user_event_missing_fields(self, db_service, mock_db_client):
        """An event lacking required fields is rejected before touching the DB."""
        # Given: event_type and timestamp are missing
        partial_event = {'user_id': 123}

        # When & Then
        with pytest.raises(ValueError, match="Invalid event data"):
            db_service.save_user_event(partial_event)

        # No database calls may have happened
        mock_db_client.execute.assert_not_called()
        mock_db_client.commit.assert_not_called()

    @pytest.mark.parametrize("missing_field", ["user_id", "event_type", "timestamp"])
    def test_validate_event_data_missing_required_fields(self, db_service, missing_field):
        """Dropping any single required field makes validation fail."""
        # Given
        partial_event = {
            'user_id': 123,
            'event_type': 'login',
            'timestamp': '2024-01-01T10:00:00'
        }
        del partial_event[missing_field]

        # When & Then
        assert db_service._validate_event_data(partial_event) is False

class TestExternalApiService:
    """Tests for the ExternalApiService class."""

    @pytest.fixture
    def mock_http_client(self):
        """Stand-in HTTP client that records calls."""
        return Mock()

    @pytest.fixture
    def api_service(self, mock_http_client):
        """ExternalApiService wired to the mock client."""
        return ExternalApiService(mock_http_client, 'https://api.example.com')

    def test_send_event_success(self, api_service, mock_http_client):
        """The client's response is returned and the POST targets /events."""
        # Given
        payload = {'user_id': 123, 'event_type': 'login'}
        canned_reply = {'status': 'success', 'id': 'event_123'}
        mock_http_client.post.return_value = canned_reply

        # When
        reply = api_service.send_event(payload)

        # Then
        assert reply == canned_reply
        mock_http_client.post.assert_called_once_with(
            'https://api.example.com/events', json=payload, timeout=10
        )

    def test_send_event_http_error(self, api_service, mock_http_client):
        """An exception raised by the HTTP client propagates to the caller."""
        # Given
        mock_http_client.post.side_effect = Exception("Connection error")
        payload = {'user_id': 123, 'event_type': 'login'}

        # When & Then
        with pytest.raises(Exception, match="Connection error"):
            api_service.send_event(payload)

class TestMessageProcessor:
    """Tests for the MessageProcessor class."""

    # Inputs shared by the tests below; the mocks never mutate them.
    RAW_MESSAGE = b'test_message'
    PARSED_EVENT = {'user_id': 123, 'event_type': 'login'}

    @pytest.fixture
    def mock_redis_client(self):
        return Mock()

    @pytest.fixture
    def mock_db_service(self):
        return Mock()

    @pytest.fixture
    def mock_api_service(self):
        return Mock()

    @pytest.fixture
    def mock_message_parser(self):
        return Mock()

    @pytest.fixture
    def message_processor(self, mock_redis_client, mock_db_service,
                          mock_api_service, mock_message_parser):
        """MessageProcessor built entirely from mock collaborators."""
        collaborators = (
            mock_redis_client,
            mock_db_service,
            mock_api_service,
            mock_message_parser,
        )
        return MessageProcessor(*collaborators)

    def test_process_single_message_success(self, message_processor,
                                            mock_message_parser, mock_db_service,
                                            mock_api_service):
        """All three steps run once and the call reports success."""
        # Given
        mock_message_parser.parse_message.return_value = self.PARSED_EVENT

        # When
        outcome = message_processor.process_single_message(self.RAW_MESSAGE)

        # Then
        assert outcome is True
        mock_message_parser.parse_message.assert_called_once_with(self.RAW_MESSAGE)
        mock_db_service.save_user_event.assert_called_once_with(self.PARSED_EVENT)
        mock_api_service.send_event.assert_called_once_with(self.PARSED_EVENT)

    def test_process_single_message_parse_error(self, message_processor,
                                                mock_message_parser, mock_db_service,
                                                mock_api_service):
        """A parse failure stops processing before the DB and API steps."""
        # Given
        raw = b'invalid_message'
        mock_message_parser.parse_message.side_effect = ValueError("Parse error")

        # When
        outcome = message_processor.process_single_message(raw)

        # Then
        assert outcome is False
        mock_message_parser.parse_message.assert_called_once_with(raw)
        mock_db_service.save_user_event.assert_not_called()
        mock_api_service.send_event.assert_not_called()

    def test_process_single_message_db_error(self, message_processor,
                                             mock_message_parser, mock_db_service,
                                             mock_api_service):
        """A database failure stops processing before the API step."""
        # Given
        mock_message_parser.parse_message.return_value = self.PARSED_EVENT
        mock_db_service.save_user_event.side_effect = Exception("DB error")

        # When
        outcome = message_processor.process_single_message(self.RAW_MESSAGE)

        # Then
        assert outcome is False
        mock_db_service.save_user_event.assert_called_once_with(self.PARSED_EVENT)
        # The API must not be called after the DB error
        mock_api_service.send_event.assert_not_called()

    def test_process_single_message_api_error(self, message_processor,
                                              mock_message_parser, mock_db_service,
                                              mock_api_service):
        """An API failure is reported after the DB step has already run."""
        # Given
        mock_message_parser.parse_message.return_value = self.PARSED_EVENT
        mock_api_service.send_event.side_effect = Exception("API error")

        # When
        outcome = message_processor.process_single_message(self.RAW_MESSAGE)

        # Then
        assert outcome is False
        mock_db_service.save_user_event.assert_called_once_with(self.PARSED_EVENT)
        mock_api_service.send_event.assert_called_once_with(self.PARSED_EVENT)

# 통합 테스트
class TestIntegration:
    """Integration test: real adapters and services, with only the external clients mocked."""

    def test_end_to_end_message_processing(self):
        """A valid message flows through parsing, the DB insert and the API call."""
        # Given - use the real classes and mock only the external dependencies
        mock_redis = Mock()
        # MagicMock is required here: DatabaseAdapter.execute uses
        # ``with connection.cursor() as cursor`` and a plain Mock does not
        # support the context-manager protocol.
        mock_db_connection = MagicMock()
        mock_requests_session = Mock()

        # Configure the mocked HTTP response
        mock_requests_session.post.return_value.json.return_value = {'status': 'success'}
        mock_requests_session.post.return_value.raise_for_status = Mock()

        # Real adapters
        redis_adapter = RedisAdapter(mock_redis)
        db_adapter = DatabaseAdapter(mock_db_connection)
        http_adapter = HttpAdapter(mock_requests_session)

        # Real services
        db_service = DatabaseService(db_adapter)
        api_service = ExternalApiService(http_adapter, 'https://api.example.com')

        # Real processor
        processor = MessageProcessor(redis_adapter, db_service, api_service)

        # Test message
        test_message = json.dumps({
            'user_id': 123,
            'event_type': 'login',
            'timestamp': '2024-01-01T10:00:00'
        }).encode('utf-8')

        # When
        result = processor.process_single_message(test_message)

        # Then
        assert result is True

        # The insert ran on the cursor and was committed
        mock_db_connection.cursor.assert_called()
        cursor = mock_db_connection.cursor.return_value.__enter__.return_value
        cursor.execute.assert_called_once()
        mock_db_connection.commit.assert_called_once()

        # The API was called once with the parsed event
        mock_requests_session.post.assert_called_once()
        sent_json = mock_requests_session.post.call_args.kwargs['json']
        assert sent_json['user_id'] == 123
        assert sent_json['event_type'] == 'login'

# Allow running this file directly; propagate pytest's exit status so a
# failing run does not exit with code 0.
if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-v"]))

 

# pytest.ini - pytest 설정 파일
# In pytest.ini the section must be named [pytest]; [tool:pytest] is only
# recognised in setup.cfg.
[pytest]
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
addopts =
    -v
    --tb=short
    --strict-markers
    --strict-config
    --cov=message_processor
    --cov-report=html
    --cov-report=term-missing
markers =
    unit: 단위 테스트
    integration: 통합 테스트
    slow: 느린 테스트

---

# conftest.py - 공통 픽스처 설정
import pytest
from unittest.mock import Mock
import json

@pytest.fixture
def sample_event_data():
    """Provide a sample event payload, including a nested metadata dict."""
    request_metadata = {'ip': '192.168.1.1', 'user_agent': 'test-agent'}
    event = {
        'user_id': 123,
        'event_type': 'login',
        'timestamp': '2024-01-01T10:00:00',
    }
    event['metadata'] = request_metadata
    return event

@pytest.fixture
def sample_message_bytes(sample_event_data):
    """Provide the sample event serialized to UTF-8 encoded JSON bytes."""
    serialized = json.dumps(sample_event_data)
    return serialized.encode('utf-8')

@pytest.fixture
def redis_message(sample_message_bytes):
    """Wrap the sample bytes in a dict with type, channel and data keys."""
    return dict(
        type='message',
        channel=b'user_events',
        data=sample_message_bytes,
    )

---

# requirements-test.txt - 테스트 의존성
pytest>=7.0.0
pytest-mock>=3.10.0
pytest-cov>=4.0.0
pytest-asyncio>=0.21.0  # 비동기 테스트가 필요한 경우
redis>=4.0.0
pymysql>=1.0.0
requests>=2.28.0

---

# Makefile - 편리한 테스트 실행
# Every target here is a command, not a file, so all of them are declared phony
# (otherwise a file named e.g. "test-file" would make the target a no-op).
.PHONY: test test-unit test-integration test-cov test-file test-failed clean

# Run all tests
test:
	pytest

# Run only tests marked "unit"
test-unit:
	pytest -m unit

# Run only tests marked "integration"
test-integration:
	pytest -m integration

# Run tests with coverage reporting
test-cov:
	pytest --cov=message_processor --cov-report=html --cov-report=term

# Run a single test file
test-file:
	pytest tests/test_message_processor.py -v

# Re-run only the tests that failed last time
test-failed:
	pytest --lf

# Remove test artifacts
clean:
	rm -rf .pytest_cache htmlcov .coverage

---

# 고급 테스트 예제들

# test_advanced_scenarios.py
import pytest
from unittest.mock import Mock, patch, call
import time

class TestAdvancedScenarios:
    """고급 테스트 시나리오들"""
    
    def test_concurrent_message_processing(self, mocker):
        """동시 메시지 처리 테스트"""
        from concurrent.futures import ThreadPoolExecutor
        
        # Mock 설정
        mock_processor = mocker.Mock()
        mock_processor.process_single_message.return_value = True
        
        messages = [f'message_{i}'.encode() for i in range(5)]
        
        # When - 동시 처리
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(mock_processor.process_single_message, msg)
                for msg in messages
            ]
            results = [f.result() for f in futures]
        
        # Then
        assert all(results)
        assert mock_processor.process_single_message.call_count == 5
    
    def test_message_processing_with_retry(self, mocker):
        """재시도 로직이 있는 메시지 처리 테스트"""
        # Given
        processor = mocker.Mock()
        # 첫 번째, 두 번째 호출은 실패, 세 번째는 성공
        processor.process_single_message.side_effect = [False, False, True]
        
        message = b'test_message'
        max_retries = 3
        
        # When - 재시도 로직 시뮬레이션
        success = False
        for attempt in range(max_retries):
            success = processor.process_single_message(message)
            if success:
                break
            time.sleep(0.1)  # 실제로는 더 긴 지연
        
        # Then
        assert success is True
        assert processor.process_single_message.call_count == 3
    
    @pytest.mark.parametrize("error_stage,expected_calls", [
        ("parse", {"parse": 1, "db": 0, "api": 0}),
        ("db", {"parse": 1, "db": 1, "api": 0}),
        ("api", {"parse": 1, "db": 1, "api": 1}),
    ])
    def test_error_handling_at_different_stages(self, mocker, error_stage, expected_calls):
        """다양한 단계에서의 에러 처리 테스트"""
        # Given
        mock_parser = mocker.Mock()
        mock_db_service = mocker.Mock()
        mock_api_service = mocker.Mock()
        
        from message_processor import MessageProcessor
        processor = MessageProcessor(
            mocker.Mock(), mock_db_service, mock_api_service, mock_parser
        )
        
        # 에러 단계별 설정
        if error_stage == "parse":
            mock_parser.parse_message.side_effect = ValueError("Parse error")
        elif error_stage == "db":
            mock_parser.parse_message.return_value = {"user_id": 123}
            mock_db_service.save_user_event.side_effect = Exception("