프로젝트 구조
my_service/
├── app/
│ ├── __init__.py
│ ├── config.py
│ ├── database.py
│ ├── service.py
│ └── utils.py
├── tests/
│ ├── __init__.py
│ ├── test_service.py
│ └── conftest.py
├── .env.development
├── .env.production
├── Dockerfile
├── requirements.txt
├── setup.sh
├── run.sh
├── test.sh
└── main.py
설정 파일 (개발)
DB_URL=your_dev_db_url
DB_USER=your_dev_db_user
DB_PASSWORD=your_dev_db_password
DB_PORT=your_dev_db_port
DB_NAME=your_dev_db_name
API_URL=your_dev_api_url
CHECK_INTERVAL=5
STATUS_OK=1
STATUS_FAIL=2
HTTP_PROXY=http://dev_proxy1,http://dev_proxy2
HTTPS_PROXY=https://dev_proxy1,https://dev_proxy2
NO_PROXY=localhost,127.0.0.1,192.168.0.1
LOG_LEVEL=DEBUG
설정 파일 (운영)
DB_URL=your_prod_db_url
DB_USER=your_prod_db_user
DB_PASSWORD=your_prod_db_password
DB_PORT=your_prod_db_port
DB_NAME=your_prod_db_name
API_URL=your_prod_api_url
CHECK_INTERVAL=5
STATUS_OK=1
STATUS_FAIL=2
HTTP_PROXY=http://prod_proxy1,http://prod_proxy2
HTTPS_PROXY=https://prod_proxy1,https://prod_proxy2
NO_PROXY=localhost,127.0.0.1,192.168.0.1
LOG_LEVEL=INFO
의존성 (requirements.txt)
pymysql
requests
python-dotenv
pytest
setup.sh
#!/bin/bash
# Stop at the first failing command so a failed build is never deployed.
set -e

# Build the Docker image.
docker build -t my_service:latest .

# Deploy to Kubernetes.
# NOTE(review): deployment.yaml is not listed in the project tree — confirm it exists.
kubectl apply -f deployment.yaml
run.sh
#!/bin/bash
# Build the image.
docker build -t my_service:latest .

# Stop and remove the existing container; "|| true" keeps the script going
# when no such container exists yet.
docker stop my_service_container || true
docker rm my_service_container || true

# Start a new container. The trailing backslashes join the options into a
# single "docker run" command.
docker run -d \
  --name my_service_container \
  --log-opt max-size=10m \
  --log-opt max-file=3 \
  my_service:latest

echo "Container is running with name 'my_service_container'."
test.sh
#!/bin/bash
# Run the tests.
pytest --maxfail=1 --disable-warnings -v

# Save the exit status before any other command overwrites $?.
EXIT_CODE=$?

# Print a message based on the exit status.
if [ "$EXIT_CODE" -eq 0 ]; then
  echo "All tests passed successfully."
else
  echo "Some tests failed."
fi

# Return pytest's exit status to the caller.
exit "$EXIT_CODE"
Dockerfile
FROM python:3.9-slim
WORKDIR /app

# Install dependencies first so this layer is reused when only source changes.
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY . /app

# ENV selects which .env file app/config.py loads (.env.production when it is
# "production", .env.development otherwise). The image defaults to production;
# build a development image with
#   docker build --build-arg APP_ENV=development .
# or override per container with `docker run -e ENV=development`.
ARG APP_ENV=production
ENV ENV=${APP_ENV}

ENV NO_PROXY="localhost,127.0.0.1,192.168.0.1"
CMD ["python", "main.py"]
main.py
import time
from app.service import check_and_update_status
from app.config import config
from app.utils import setup_logging
if __name__ == "__main__":
    setup_logging(config.LOG_LEVEL)
    # Poll forever, pausing CHECK_INTERVAL seconds between passes.
    while True:
        check_and_update_status()
        time.sleep(config.CHECK_INTERVAL)
app/config.py
import os
from dotenv import load_dotenv
# Read the current environment; defaults to "development".
ENV = os.getenv('ENV', 'development')

# Load the .env file that matches the environment.
if ENV == 'production':
    load_dotenv('.env.production')
else:
    load_dotenv('.env.development')
class Config:
    """Runtime settings read from environment variables.

    Values are captured once, when the instance is created.
    """

    def __init__(self):
        self.DB_URL = os.getenv("DB_URL")
        self.DB_USER = os.getenv("DB_USER")
        self.DB_PASSWORD = os.getenv("DB_PASSWORD")
        self.DB_PORT = int(os.getenv("DB_PORT", "3306"))
        self.DB_NAME = os.getenv("DB_NAME")
        self.API_URL = os.getenv("API_URL")
        self.CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "5"))
        # Kept as strings; callers convert them when comparing.
        self.STATUS_OK = os.getenv("STATUS_OK", "1")
        self.STATUS_FAIL = os.getenv("STATUS_FAIL", "2")
        # Proxy settings are comma-separated lists in the environment.
        self.HTTP_PROXY = self._split_csv(os.getenv("HTTP_PROXY"))
        self.HTTPS_PROXY = self._split_csv(os.getenv("HTTPS_PROXY"))
        self.NO_PROXY = self._split_csv(os.getenv("NO_PROXY"))
        self.LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

    @staticmethod
    def _split_csv(value):
        """Split a comma-separated string into trimmed, non-empty items."""
        if not value:
            return []
        return [item.strip() for item in value.split(",") if item.strip()]

    def get_proxies(self):
        """Build a ``proxies`` mapping from the first entry of each proxy list.

        Returns:
            dict: may contain the keys ``'http'`` and ``'https'``; a key is
            omitted when the corresponding list is empty.

        Side effects:
            When NO_PROXY entries are configured, writes them back to
            ``os.environ["NO_PROXY"]`` as a comma-joined string.
        """
        proxies = {}
        if self.HTTP_PROXY:
            proxies['http'] = self.HTTP_PROXY[0]
        if self.HTTPS_PROXY:
            proxies['https'] = self.HTTPS_PROXY[0]
        if self.NO_PROXY:
            os.environ["NO_PROXY"] = ",".join(self.NO_PROXY)
        return proxies


config = Config()
app/database.py
import pymysql.cursors
from .config import config
from .utils import get_logger
logger = get_logger(__name__)


def get_connection():
    """Open a new pymysql connection using the settings in ``config``.

    Returns:
        The connection returned by ``pymysql.connect``, configured with
        ``DictCursor`` as its cursor class.

    Raises:
        Exception: any error raised while connecting is logged and re-raised
        unchanged.
    """
    logger.debug("Attempting to connect to the database.")
    try:
        connection = pymysql.connect(
            host=config.DB_URL,
            user=config.DB_USER,
            password=config.DB_PASSWORD,
            database=config.DB_NAME,
            port=config.DB_PORT,
            cursorclass=pymysql.cursors.DictCursor,
        )
    except Exception as e:
        logger.error(f"Error connecting to the database: {e}")
        raise
    logger.debug("Database connection established successfully.")
    return connection
app/service.py
import requests
from .database import get_connection
from .config import config
from .utils import get_logger
logger = get_logger(__name__)

# Seconds to wait for the status API per row. Without a timeout,
# requests.get can block indefinitely and stall the polling loop.
REQUEST_TIMEOUT_SECONDS = 10


def check_and_update_status():
    """Poll the status API for every row in 'wait' state and store the result.

    For each waiting row, the API at ``config.API_URL`` is queried with the
    row's ``processid``. A ``process_status`` equal to ``STATUS_OK`` or
    ``STATUS_FAIL`` updates the row to 'OK' or 'FAIL' and commits; any other
    value is logged and the row is left untouched.

    A failed request, an HTTP error status or an unparsable body affects only
    that row. Any other error is logged and ends the pass. The database
    connection is always closed.

    Raises:
        Exception: whatever ``get_connection`` raises when connecting fails.
    """
    logger.info("Checking and updating status.")
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT processid, userid, status FROM your_table WHERE status='wait'"
            cursor.execute(sql)
            rows = cursor.fetchall()
            proxies = config.get_proxies()
            logger.debug(f"Fetched rows from database: {rows}")
            logger.debug(f"Using proxies: {proxies}")

            # The configured codes are strings; convert once, not per row.
            status_ok = int(config.STATUS_OK)
            status_fail = int(config.STATUS_FAIL)

            for row in rows:
                processid = row['processid']
                logger.info(f"Processing row with processid: {processid}")
                try:
                    response = requests.get(
                        config.API_URL,
                        params={'processid': processid},
                        proxies=proxies,
                        timeout=REQUEST_TIMEOUT_SECONDS,
                    )
                    # Treat 4xx/5xx as a failed request instead of parsing
                    # an error page as a status payload.
                    response.raise_for_status()
                    data = response.json()
                except (requests.RequestException, ValueError) as e:
                    # ValueError covers a body that is not valid JSON; skip
                    # this row and keep processing the others.
                    logger.error(f"Request error for processid {processid}: {e}")
                    continue

                process_status = data.get('process_status')
                logger.debug(f"API response for processid {processid}: {data}")
                if process_status == status_ok:
                    new_status = 'OK'
                elif process_status == status_fail:
                    new_status = 'FAIL'
                else:
                    logger.warning(f"Unknown process_status {process_status} for processid {processid}")
                    continue

                update_sql = "UPDATE your_table SET status=%s WHERE processid=%s"
                cursor.execute(update_sql, (new_status, processid))
                connection.commit()
                logger.info(f"Updated processid {processid} to status {new_status}")
    except Exception as e:
        logger.error(f"Error in check_and_update_status: {e}")
    finally:
        connection.close()
        logger.debug("Database connection closed.")
app/utils.py
import logging
import sys
def setup_logging(log_level="INFO"):
    """Configure the root logger to write formatted records to stdout.

    Safe to call more than once: the stdout handler is attached only on the
    first call, so records are not printed several times.

    Args:
        log_level: Level name such as "DEBUG" or "info" (case-insensitive).
            Anything that does not name a logging level falls back to INFO.
    """
    root = logging.getLogger()

    # The handler is tagged with a name so a repeated call can recognise it.
    handler_name = "my_service_stdout"
    if not any(h.get_name() == handler_name for h in root.handlers):
        handler = logging.StreamHandler(sys.stdout)
        handler.set_name(handler_name)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        root.addHandler(handler)

    # getattr can return non-level attributes (e.g. logging.BASIC_FORMAT is a
    # string), which setLevel would reject; accept integer levels only.
    level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    root.setLevel(level)
def get_logger(module_name):
    """Return the logger registered under ``module_name``."""
    return logging.getLogger(module_name)
tests/conftest.py
import logging
import sys
# NOTE(review): this is identical to setup_logging in app/utils.py; a
# conftest.py presumably should hold pytest fixtures instead — confirm.
def setup_logging(log_level="INFO"):
    """Configure the root logger to write formatted records to stdout.

    Safe to call more than once: the stdout handler is attached only on the
    first call, so records are not printed several times.

    Args:
        log_level: Level name such as "DEBUG" or "info" (case-insensitive).
            Anything that does not name a logging level falls back to INFO.
    """
    root = logging.getLogger()

    # The handler is tagged with a name so a repeated call can recognise it.
    handler_name = "my_service_stdout"
    if not any(h.get_name() == handler_name for h in root.handlers):
        handler = logging.StreamHandler(sys.stdout)
        handler.set_name(handler_name)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        root.addHandler(handler)

    # getattr can return non-level attributes (e.g. logging.BASIC_FORMAT is a
    # string), which setLevel would reject; accept integer levels only.
    level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    root.setLevel(level)
def get_logger(module_name):
    """Return the logger registered under ``module_name``."""
    return logging.getLogger(module_name)
'Software Engineering > Python' 카테고리의 다른 글
| [streamlit] custom label 예제 (0) | 2025.06.12 |
|---|---|
| Python requests 패키지 proxy 설정에 관한 정리 (1) | 2024.07.11 |
| fastapi background task를 사용하여 지연 동작 수행하기 (0) | 2024.07.05 |
| mac에서 poetry 설치하기 (0) | 2024.07.02 |
| poetry 사용법 정리 (2) | 2024.07.02 |