본문 바로가기

Software Engineering/Python

Sample Project - API 호출, DB 업데이트

프로젝트 구조

my_service/
├── app/
│ ├── init.py
│ ├── config.py
│ ├── database.py
│ ├── service.py
│ └── utils.py
├── tests/
│ ├── init.py
│ ├── test_service.py
│ └── conftest.py
├── .env.development
├── .env.production
├── Dockerfile
├── requirements.txt
├── setup.sh
├── run.sh
├── test.sh
└── main.py

설정 파일 (개발)

DB_URL=your_dev_db_url
DB_USER=your_dev_db_user
DB_PASSWORD=your_dev_db_password
DB_PORT=your_dev_db_port
DB_NAME=your_dev_db_name
API_URL=your_dev_api_url
CHECK_INTERVAL=5
STATUS_OK=1
STATUS_FAIL=2
HTTP_PROXY=http://dev_proxy1,http://dev_proxy2
HTTPS_PROXY=https://dev_proxy1,https://dev_proxy2
NO_PROXY=localhost,127.0.0.1,192.168.0.1
LOG_LEVEL=DEBUG

설정 파일 (운영)

DB_URL=your_prod_db_url
DB_USER=your_prod_db_user
DB_PASSWORD=your_prod_db_password
DB_PORT=your_prod_db_port
DB_NAME=your_prod_db_name
API_URL=your_prod_api_url
CHECK_INTERVAL=5
STATUS_OK=1
STATUS_FAIL=2
HTTP_PROXY=http://prod_proxy1,http://prod_proxy2
HTTPS_PROXY=https://prod_proxy1,https://prod_proxy2
NO_PROXY=localhost,127.0.0.1,192.168.0.1
LOG_LEVEL=INFO

의존성 (requirements.txt)

pymysql
requests
python-dotenv
pytest

setup.sh

#!/bin/bash

Docker 이미지 빌드

docker build -t my_service:latest .

쿠버네티스에 배포

kubectl apply -f deployment.yaml

run.sh

#!/bin/bash

이미지를 빌드합니다.

docker build -t my_service:latest .

기존 컨테이너를 중지하고 삭제합니다.

docker stop my_service_container || true
docker rm my_service_container || true

새로운 컨테이너를 실행합니다.

docker run -d
--name my_service_container
--log-opt max-size=10m
--log-opt max-file=3
my_service:latest

echo "Container is running with name 'my_service_container'."

test.sh

#!/bin/bash

테스트를 실행합니다.

pytest --maxfail=1 --disable-warnings -v

종료 상태 코드를 저장합니다.

EXIT_CODE=$?

종료 상태 코드에 따라 메시지를 출력합니다.

if [ $EXIT_CODE -eq 0 ]; then
echo "All tests passed successfully."
else
echo "Some tests failed."
fi

종료 상태 코드를 반환합니다.

exit $EXIT_CODE

Dockerfile

FROM python:3.9-slim

WORKDIR /app

COPY . /app

RUN pip install --no-cache-dir -r requirements.txt

기본 환경 설정 (개발 환경)

ENV ENV=development

운영 환경에서 사용할 .env 파일 설정

ENV ENV=production

ENV NO_PROXY="localhost,127.0.0.1,192.168.0.1"

CMD ["python", "main.py"]

main.py

import time
from app.service import check_and_update_status
from app.config import config
from app.utils import setup_logging

if name == "main":
setup_logging(config.LOG_LEVEL)
while True:
check_and_update_status()
time.sleep(config.CHECK_INTERVAL)

app.config.py

import os
from dotenv import load_dotenv

현재 환경을 가져옵니다. 기본값은 development입니다.

ENV = os.getenv('ENV', 'development')

환경에 따라 다른 .env 파일을 로드합니다.

if ENV == 'production':
load_dotenv('.env.production')
else:
load_dotenv('.env.development')

class Config:
def init(self):
self.DB_URL = os.getenv("DB_URL")
self.DB_USER = os.getenv("DB_USER")
self.DB_PASSWORD = os.getenv("DB_PASSWORD")
self.DB_PORT = int(os.getenv("DB_PORT", 3306))
self.DB_NAME = os.getenv("DB_NAME")
self.API_URL = os.getenv("API_URL")
self.CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", 5))
self.STATUS_OK = os.getenv("STATUS_OK", "1")
self.STATUS_FAIL = os.getenv("STATUS_FAIL", "2")
self.HTTP_PROXY = os.getenv("HTTP_PROXY").split(",") if os.getenv("HTTP_PROXY") else []
self.HTTPS_PROXY = os.getenv("HTTPS_PROXY").split(",") if os.getenv("HTTPS_PROXY") else []
self.NO_PROXY = os.getenv("NO_PROXY").split(",") if os.getenv("NO_PROXY") else []
self.LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

def get_proxies(self):
    proxies = {}
    if self.HTTP_PROXY:
        proxies['http'] = self.HTTP_PROXY[0]
    if self.HTTPS_PROXY:
        proxies['https'] = self.HTTPS_PROXY[0]
    if self.NO_PROXY:
        os.environ["NO_PROXY"] = ",".join(self.NO_PROXY)
    return proxies

config = Config()

app.database.py

import pymysql.cursors
from .config import config
from .utils import get_logger

logger = get_logger(name)

def get_connection():
logger.debug("Attempting to connect to the database.")
try:
connection = pymysql.connect(
host=config.DB_URL,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
port=config.DB_PORT,
cursorclass=pymysql.cursors.DictCursor
)
logger.debug("Database connection established successfully.")
return connection
except Exception as e:
logger.error(f"Error connecting to the database: {e}")
raise

app.service.py

import requests
from .database import get_connection
from .config import config
from .utils import get_logger

logger = get_logger(name)

def check_and_update_status():
logger.info("Checking and updating status.")
connection = get_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT processid, userid, status FROM your_table WHERE status='wait'"
cursor.execute(sql)
rows = cursor.fetchall()

        proxies = config.get_proxies()
        logger.debug(f"Fetched rows from database: {rows}")
        logger.debug(f"Using proxies: {proxies}")

        for row in rows:
            processid = row['processid']
            logger.info(f"Processing row with processid: {processid}")
            try:
                response = requests.get(config.API_URL, params={'processid': processid}, proxies=proxies)
                data = response.json()
                process_status = data.get('process_status')
                logger.debug(f"API response for processid {processid}: {data}")

                if process_status == int(config.STATUS_OK):
                    new_status = 'OK'
                elif process_status == int(config.STATUS_FAIL):
                    new_status = 'FAIL'
                else:
                    logger.warning(f"Unknown process_status {process_status} for processid {processid}")
                    continue

                update_sql = "UPDATE your_table SET status=%s WHERE processid=%s"
                cursor.execute(update_sql, (new_status, processid))
                connection.commit()
                logger.info(f"Updated processid {processid} to status {new_status}")
            except requests.RequestException as e:
                logger.error(f"Request error for processid {processid}: {e}")
except Exception as e:
    logger.error(f"Error in check_and_update_status: {e}")
finally:
    connection.close()
    logger.debug("Database connection closed.")

app.utils.py

import logging
import sys

def setup_logging(log_level="INFO"):
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))

def get_logger(module_name):
return logging.getLogger(module_name)

tests/conftest.py

import logging
import sys

def setup_logging(log_level="INFO"):
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))

def get_logger(module_name):
return logging.getLogger(module_name)