본문 바로가기

Software Engineering/Python

sample project 연습

(1) 프로젝트 트리

 

(2) main.py

import time
from app_control import AppControlParameter
from my_timer import MyTimer
from db_manager import DatabaseManager
from event_manager import EventItem, EventMatcher, EventPusher


def process():
    control_para = AppControlParameter("my-servicename")
    control_timer = MyTimer()
    control_timer.start()

    reload_timer = MyTimer()
    reload_timer.start()

    db = DatabaseManager()
    
    event_itmes = []
    event_matcher = EventMatcher(event_itmes)
    event_pusher = EventPusher("my-servicename", "url_auth", "url_push")

    while True:
        if control_timer.is_over_sec(control_para.sync_interval_sec):
            control_para = db.update_control_para(control_para)
            control_timer.restart()
        
        if control_para.shutdown_request:
            break

        if reload_timer.is_over_sec(control_para.reload_interval_sec):
            event_itmes = db.reload_event_items(event_itmes)
            reload_timer.restart()

        # consume

        # matching and handling
        item, keyword = event_matcher.find_item_and_keyword("input_str")
        if keyword:
            event_pusher.push_item(item, keyword)

    # shutdown process
    db.update_shutdown_request()
    time.sleep(10)
    event_pusher.cleanup()
    db.close()

# 사용 예제
if __name__ == "__main__":
    process()

(3) my_timer.py

import time

class MyTimer:
    def __init__(self):
        self.start_time = None
    
    def start(self):
        self.start_time = time.time()

    def restart(self):
        self.start()
    
    def is_over_sec(self, expected_time_sec: int) -> bool:
        if self.start_time is None:
            return False
        
        elapsed_time = time.time() - self.start_time
        return elapsed_time >= expected_time_sec
    
    def is_over_min(self, expected_time_min: int) -> bool:
        return self.is_over_sec(expected_time_min * 60)

# 사용 예제
if __name__ == "__main__":

    timer = MyTimer()   # MyTimer 객체 생성
    timer.start()       # 타이머 시작
    
    time.sleep(60)  # 60초 대기
    print(timer.is_over_min(2))  # False 출력
    
    time.sleep(60)  # 추가 60초 대기하여 총 120초 (2분) 경과
    print(timer.is_over_min(2))  # True 출력

(4) app_control.py

class AppControlParameter():
    def __init__(self, servicename: str):
        self.servicename = servicename
        self.sync_interval_sec = 0
        self.auth_interval_sec = 600
        self.reload_interval_sec = 600
        self.log_level = 'DEBUG'    # DEBUG, INFO, ERROR
        self.shutdown_request = False


"""
CREATE TABLE boilerplate.BP_SERVICE_CONTROL (
	servicename varchar(100) NOT NULL,
	sync_interval_sec INT UNSIGNED NOT NULL,
	auth_interval_sec INTEGER UNSIGNED NOT NULL,
	reload_interval_sec INT UNSIGNED NOT NULL,
	log_level varchar(50) NOT NULL,
	shutdown_request varchar(10) NOT NULL,
	CONSTRAINT BP_SERVICE_CONTROL_PK PRIMARY KEY (app_name)
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci;

"""

(5) db_manager.py

from typing import List
from app_control import AppControlParameter
from event_manager import EventItem

class DatabaseManager():
    def __init__(self):
        pass

    def connect(self):
        pass

    def reconnect(self):
        pass

    def close(self):
        pass

    def update_control_para(self, old: AppControlParameter) -> AppControlParameter:
        # db에서 app para를 조회하여 객체를 생성한다
        # 실패할 경우 로그를 남기고 이전 객체를 반환한다.

        return old

    def reload_event_items(self, old: List[EventItem]) -> EventItem:
        # db에서 event item를 조회하여 객체 리스트를 생성한다.
        # 실패할 경우 로그를 남기고 이전 객체를 반환한다.

        #result = []
        #return result
        return old

    def update_shutdown_request(self):
        # shutdown request parameter를 false로 업데이트 한다.
        pass

(6) event_manager.py

import re
from typing import List

class EventItem():
    def __init__(self):
        self.name = None # str
        self.type = None # str
        self.subtype_group = None # str
        self.subtype = None # str
        self.match_pattern = None # str
        self.keyword_pattern1 = None # str
        self.keyword_pattern2 = None # str
        self.pre_delete = None # str (NO, SUBTYPEGROUP, ALL)
        self.post = None # NO, YES
        self.post_pattern = None # str
        self.after_delete = None # str
        self.after_delete_sec = 3 # int
'''
CREATE TABLE boilerplate.BP_EVENT_PARSE_INFO (
	servicename varchar(100) NOT NULL,
	eventname varchar(100) NOT NULL,
	`type` varchar(50) NOT NULL,
	subtype_group varchar(50) NOT NULL,
	subtype varchar(50) NOT NULL,
	match_pattern varchar(200) NOT NULL,
	keyword_pattern1 varchar(200) NOT NULL,
	keyword_pattern2 varchar(200) NOT NULL,
	pre_delete varchar(50) NOT NULL,
	post varchar(10) NOT NULL,
	post_pattern varchar(200) NOT NULL,
	after_delete varchar(50) NOT NULL,
	after_delete_sec INT UNSIGNED NOT NULL
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci;

'''

class EventMatcher():
    def __init__(self, event_items: List[EventItem]):
        self.items = event_items

    def find_item_and_keyword(self, input_str: str) -> EventItem:
        for item in self.items:
            pattern = re.compile(item.match_pattern)
            match = re.search(pattern, input_str)

            if match:
                return item, match.group(1)
            
        return None, None
    
class EventPusher():
    def __init__(self, servicename:str, url_auth: str, url_push: str):
        self.servicename = servicename
        self.url_auth = url_auth
        self.url_push = url_push
        self.eqp_title = {} #key=eqpid_subtypegroup, value=title

    def push_item(self, item: EventItem, match_keyword: str):
        # handle pre-delete
        if item.pre_delete and item.pre_delete != "NO":
            if item.pre_delete == "ALL":
                payload = self.get_all_delete_payload(item)
                # delete
            elif item.pre_delete == "SUBTYPEGROUP" and self.is_subtypegroup_posted(item):
                payload = self.get_subtypegroup_delete_payload(item)
                # delete

        if item.post and item.post == "YES":
            payload = self.get_post_payload(item, match_keyword)
            # post

            if item.after_delete and item.after_delete == "YES":
                payload = self.get_after_delete_payload(item, payload)
                # reserve delete

    def is_subtypegroup_posted(self, item: EventItem):
        if item.subtype_group in self.eqp_title:
            return True
        return False

    def get_all_delete_payload(self, item: EventItem) -> dict:
        result = {}
        return result
    
    def get_subtypegroup_delete_payload(self, item: EventItem) -> dict:
        result = {}
        return result
    
    def get_post_payload(self, item: EventItem, match_keyword: str) -> dict:
        result = {}
        return result
    
    def get_after_delete_payload(self, item: EventItem, before_payload: dict) -> dict:
        result = {}
        return result

    def cleanup():
        pass