lib-not-dr/lib_not_dr/logger/outstream.py
2023-11-05 19:54:34 +08:00

234 lines
7.7 KiB
Python

# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import io
import sys
import time
import string
import atexit
import threading
from pathlib import Path
from typing import Optional
from lib_not_dr.types.options import Options
from lib_not_dr.logger.structure import LogMessage
from lib_not_dr.logger.formatter import BaseFormatter, StdFormatter
__all__ = [
'BaseOutputStream',
'StdioOutputStream',
'FileCacheOutputStream'
]
class BaseOutputStream(Options):
name = 'BaseOutputStream'
level: int = 10
enable: bool = True
formatter: BaseFormatter
def write_stdout(self, message: LogMessage) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.write_stdout is not implemented')
def write_stderr(self, message: LogMessage) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.write_stderr is not implemented')
def flush(self) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.flush is not implemented')
def close(self) -> None:
self.enable = False
class StdioOutputStream(BaseOutputStream):
name = 'StdioOutputStream'
level: int = 10
formatter: BaseFormatter = StdFormatter()
def write_stdout(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
print(self.formatter.format_message(message), end='', flush=message.flush)
return None
def write_stderr(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
print(self.formatter.format_message(message), end='', flush=message.flush, file=sys.stderr)
return None
def flush(self) -> None:
"""
flush stdout and stderr
:return: None
"""
print('', end='', flush=True)
print('', end='', flush=True, file=sys.stderr)
return None
class FileCacheOutputStream(BaseOutputStream):
name = 'FileCacheOutputStream'
level: int = 10
formatter: BaseFormatter = StdFormatter(enable_color=False)
text_cache: io.StringIO = None
flush_counter: int = 0
# 默认 10 次 flush 一次
flush_count_limit: int = 10
flush_time_limit: int = 10 # time limit in sec, 0 means no limit
flush_lock: threading.Lock = None
flush_timer: threading.Timer = None
file_path: Optional[Path] = Path('./logs')
file_name: str
# file mode: always 'a'
file_encoding: str = 'utf-8'
# do file swap or not
file_swap: bool = False
at_exit_register: bool = False
file_swap_counter: int = 0
file_swap_name_template: str = '${name}-${counter}.log'
# ${name} -> file_name
# ${counter} -> file_swap_counter
# ${log_time} -> time when file swap ( round(time.time()) )
# ${start_time} -> start time of output stream ( round(time.time()) )
current_file_name: str = None
file_start_time: int = None
# log file swap triggers
# 0 -> no limit
file_size_limit: int = 0 # size limit in kb
file_time_limit: int = 0 # time limit in sec 0
file_swap_on_both: bool = False # swap file when both size and time limit reached
def init(self, **kwargs) -> bool:
self.file_start_time = round(time.time())
if self.text_cache is None:
self.text_cache = io.StringIO()
self.flush_lock = threading.Lock()
return False
def _write(self, message: LogMessage) -> None:
"""
write message to text cache
默认已经检查过了
:param message: message to write
:return: None
"""
self.text_cache.write(self.formatter.format_message(message))
self.flush_counter += 1
if message.flush or self.flush_counter >= self.flush_count_limit:
self.flush()
else:
if self.flush_time_limit > 0:
if self.flush_timer is None or not self.flush_timer.is_alive():
self.flush_timer = threading.Timer(self.flush_time_limit, self.flush)
self.flush_timer.daemon = True
self.flush_timer.start()
if not self.at_exit_register:
atexit.register(self.flush)
self.at_exit_register = True
return None
def write_stdout(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
self._write(message)
return None
def write_stderr(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
self._write(message)
return None
def get_file_path(self) -> Path:
"""
get file path
:return:
"""
if (current_file := self.current_file_name) is None:
if not self.file_swap:
# 直接根据 file name 生成文件
current_file = Path(self.file_path) / self.file_name
self.current_file_name = str(current_file)
return current_file
template = string.Template(self.file_swap_name_template)
file_name = template.safe_substitute(name=self.file_name,
counter=self.file_swap_counter,
log_time=round(time.time()),
start_time=self.file_start_time)
current_file = Path(self.file_path) / file_name
self.current_file_name = str(current_file)
else:
current_file = Path(current_file)
return current_file
def check_flush(self) -> Path:
current_file = self.get_file_path()
# 获取当前文件的路径
if not self.file_swap:
# 不需要 swap 直接返回
return current_file
# 检查是否需要 swap
size_pass = True
if self.file_size_limit > 0:
file_size = current_file.stat().st_size / 1024 # kb
if file_size > self.file_size_limit: # kb
size_pass = False
time_pass = True
if self.file_time_limit > 0:
file_time = round(time.time()) - current_file.stat().st_mtime
if file_time > self.file_time_limit:
time_pass = False
if (self.file_swap_on_both and size_pass and time_pass) or \
(not self.file_swap_on_both and (size_pass or time_pass)):
# 两个都满足
# 或者只有一个满足
if size_pass and time_pass:
self.file_swap_counter += 1
# 生成新的文件名
return self.get_file_path()
def flush(self) -> None:
new_cache = io.StringIO() # 创建新的缓存
self.flush_counter = 0 # atomic, no lock
with self.flush_lock:
text = self.text_cache.getvalue()
old_cache, self.text_cache = self.text_cache, new_cache
old_cache.close() # 关闭旧的缓存
if text == '':
return None
current_file = self.check_flush()
if not current_file.exists():
current_file.parent.mkdir(parents=True, exist_ok=True)
current_file.touch()
with current_file.open('a', encoding=self.file_encoding) as f:
f.write(text)
return None
def close(self) -> None:
super().close()
self.flush()
self.text_cache.close()
atexit.unregister(self.flush)
return None