Difficult-Rocket/libs/lib_not_dr/logger/outstream.py
shenjack b3c52d2a3f
with more logger
Add | more formatter and some more

Fix | type mis match

sync pyglet

Enhance | logger with Template

add lib-not-dr as requirement

sync pyglet

sync pyglet

Add | add lto=yes to nuitka_build

just incase

sync pyglet

sync lib_not_dr

Remove | external requirement lib-not-dr

some logger

sync lib-not-dr

sync pyglet

sync lib-not-dr

sync lib-not-dr

sync pyglet

sync pyglet

Fix | console thread being blocked

Update DR rs and DR sdk

sync lib not dr

sync lib-not-dr

sync lib-not-dr

sync pyglet and lib-not-dr

sync pyglet 0.1.8

sync lib not dr

logger almost done?

almost!

sync pyglet (clipboard support!)

sync lib not dr

sync lib not dr

color code and sync pyglet

do not show memory and progress when building locally

sync pyglet

synclibs
2023-11-13 01:14:54 +08:00

237 lines
7.8 KiB
Python

# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import io
import sys
import time
import string
import atexit
import threading
from pathlib import Path
from typing import Optional
from lib_not_dr.logger import LogLevel
from lib_not_dr.types.options import Options
from lib_not_dr.logger.structure import LogMessage
from lib_not_dr.logger.formatter import BaseFormatter, StdFormatter
# Names exported by ``from ... import *``: the base stream and the two concrete streams.
__all__ = ['BaseOutputStream', 'StdioOutputStream', 'FileCacheOutputStream']
class BaseOutputStream(Options):
    """
    Base class for logger output streams.

    The write and flush methods defined here only raise
    ``NotImplementedError``; concrete streams are expected to override them.
    """
    name = 'BaseOutputStream'

    # level / enable are consulted by the concrete streams before writing
    level: int = LogLevel.info
    enable: bool = True

    # formatter used to turn a LogMessage into text (no default at this level)
    formatter: BaseFormatter

    def write_stdout(self, message: LogMessage) -> None:
        """
        Write a message to the "stdout" side of the stream.

        :param message: message to write
        :raises NotImplementedError: always, in this base class
        """
        owner = self.__class__.__name__
        raise NotImplementedError(f'{owner}.write_stdout is not implemented')

    def write_stderr(self, message: LogMessage) -> None:
        """
        Write a message to the "stderr" side of the stream.

        :param message: message to write
        :raises NotImplementedError: always, in this base class
        """
        owner = self.__class__.__name__
        raise NotImplementedError(f'{owner}.write_stderr is not implemented')

    def flush(self) -> None:
        """
        Flush any pending output.

        :raises NotImplementedError: always, in this base class
        """
        owner = self.__class__.__name__
        raise NotImplementedError(f'{owner}.flush is not implemented')

    def close(self) -> None:
        """
        Close the stream by marking it as disabled.

        :return: None
        """
        self.enable = False
class StdioOutputStream(BaseOutputStream):
    """
    Output stream that prints formatted messages to the process's
    standard output / standard error.
    """
    name = 'StdioOutputStream'

    level: int = LogLevel.info
    formatter: BaseFormatter = StdFormatter()
    # when False, write_stderr prints to stdout instead of stderr
    use_stderr: bool = True

    def write_stdout(self, message: LogMessage) -> None:
        """
        Print a formatted message to stdout.

        Nothing is printed when the stream is disabled or the message level
        is below this stream's level.

        :param message: message to write
        :return: None
        """
        if not self.enable or message.level < self.level:
            return None
        rendered = self.formatter.format_message(message)
        print(rendered, end='', flush=message.flush)
        return None

    def write_stderr(self, message: LogMessage) -> None:
        """
        Print a formatted message to stderr, or to stdout when
        ``use_stderr`` is False.

        Nothing is printed when the stream is disabled or the message level
        is below this stream's level.

        :param message: message to write
        :return: None
        """
        if not self.enable or message.level < self.level:
            return None
        # file=None makes print() fall back to sys.stdout
        target = sys.stderr if self.use_stderr else None
        rendered = self.formatter.format_message(message)
        print(rendered, end='', flush=message.flush, file=target)
        return None

    def flush(self) -> None:
        """
        Flush stdout and stderr by printing an empty string with flush=True.

        :return: None
        """
        for target in (sys.stdout, sys.stderr):
            print('', end='', flush=True, file=target)
        return None
class FileCacheOutputStream(BaseOutputStream):
    """
    Output stream that buffers formatted messages in memory and appends
    them to a log file.

    Messages are collected in ``text_cache`` and written out by ``flush``.
    A flush is triggered when a message asks for it, when
    ``flush_count_limit`` messages have been buffered, by a
    ``threading.Timer`` after ``flush_time_limit`` seconds, or at
    interpreter exit through ``atexit``.

    When ``file_swap`` is enabled the file name is rendered from
    ``file_swap_name_template``; once the configured size / time limits of
    the current file are exceeded, ``file_swap_counter`` is increased and a
    new file name is generated.

    NOTE(review): ``flush`` may run on the timer thread while ``_write`` runs
    on the logging thread and no lock is used here — confirm whether callers
    serialize access.
    """
    name = 'FileCacheOutputStream'

    level: int = LogLevel.info
    formatter: BaseFormatter = StdFormatter(enable_color=False)
    text_cache: io.StringIO = None

    flush_counter: int = 0
    # flush once every 10 buffered messages by default
    flush_count_limit: int = 10
    flush_time_limit: int = 10  # time limit in sec, 0 means no limit
    flush_timer: threading.Timer = None

    file_path: Optional[Path] = Path('./logs')
    file_name: str
    # file mode: always 'a'
    file_encoding: str = 'utf-8'
    # do file swap or not
    file_swap: bool = False
    at_exit_register: bool = False

    file_swap_counter: int = 0
    file_swap_name_template: str = '${name}-${counter}.log'
    # ${name} -> file_name
    # ${counter} -> file_swap_counter
    # ${log_time} -> time when file swap ( round(time.time()) )
    # ${start_time} -> start time of output stream ( round(time.time()) )

    current_file_name: str = None
    file_start_time: int = None

    # log file swap triggers
    # 0 -> no limit
    file_size_limit: int = 0  # size limit in kb
    file_time_limit: int = 0  # time limit in sec, 0 means no limit
    file_swap_on_both: bool = False  # swap file when both size and time limit reached

    def init(self, **kwargs) -> bool:
        """
        Prepare runtime state: record the start time, create the text cache
        if none was supplied and resolve the initial log file path.

        :param kwargs: not used by this method
        :return: always False
        """
        self.file_start_time = round(time.time())
        if self.text_cache is None:
            self.text_cache = io.StringIO()
        self.get_file_path()
        return False

    def _write(self, message: LogMessage) -> None:
        """
        Append a formatted message to the text cache and decide whether to
        flush now or later.

        The enable / level checks are assumed to have been done by the caller.

        :param message: message to write
        :return: None
        """
        self.text_cache.write(self.formatter.format_message(message))
        self.flush_counter += 1
        if message.flush or self.flush_counter >= self.flush_count_limit:
            self.flush()
        elif self.flush_time_limit > 0:
            # make sure buffered text reaches the file after flush_time_limit
            # seconds even if no further message arrives
            if self.flush_timer is None or not self.flush_timer.is_alive():
                self.flush_timer = threading.Timer(self.flush_time_limit, self.flush)
                self.flush_timer.daemon = True
                self.flush_timer.start()
        if not self.at_exit_register:
            # write out whatever is still buffered when the interpreter exits
            atexit.register(self.flush)
            self.at_exit_register = True
        return None

    def write_stdout(self, message: LogMessage) -> None:
        """
        Buffer a message unless the stream is disabled or the message level
        is below this stream's level.

        :param message: message to write
        :return: None
        """
        if not self.enable:
            return None
        if message.level < self.level:
            return None
        self._write(message)
        return None

    def write_stderr(self, message: LogMessage) -> None:
        """
        Buffer a message unless the stream is disabled or the message level
        is below this stream's level (same handling as ``write_stdout``).

        :param message: message to write
        :return: None
        """
        if not self.enable:
            return None
        if message.level < self.level:
            return None
        self._write(message)
        return None

    def get_file_path(self) -> Path:
        """
        Return the path of the current log file, generating and caching it
        in ``current_file_name`` when no name is cached.

        Without file swap the name is ``file_name``; with file swap it is
        rendered from ``file_swap_name_template``.

        :return: path of the current log file
        """
        cached_name = self.current_file_name
        if cached_name is not None:
            return Path(cached_name)
        if self.file_swap:
            template = string.Template(self.file_swap_name_template)
            file_name = template.safe_substitute(name=self.file_name,
                                                 counter=self.file_swap_counter,
                                                 log_time=round(time.time()),
                                                 start_time=self.file_start_time)
        else:
            # no swap: the file is named directly after file_name
            file_name = self.file_name
        current_file = Path(self.file_path) / file_name
        self.current_file_name = str(current_file)
        return current_file

    def check_flush(self) -> Path:
        """
        Decide which file the next flush should write to, swapping to a new
        file name when the configured limits of the current file are exceeded.

        Only limits greater than 0 are checked. With ``file_swap_on_both``
        every checked limit must be exceeded; otherwise one exceeded limit
        is enough.

        :return: path of the file to write to
        """
        current_file = self.get_file_path()
        if not self.file_swap:
            # swapping disabled: always keep the same file
            return current_file
        try:
            file_stat = current_file.stat()
        except FileNotFoundError:
            # the file has not been created yet, so there is nothing to swap
            return current_file

        exceeded = []
        if self.file_size_limit > 0:
            file_size = file_stat.st_size / 1024  # kb
            exceeded.append(file_size > self.file_size_limit)
        if self.file_time_limit > 0:
            # NOTE(review): st_mtime is the last modification time, so this
            # measures time since the last write rather than the age of the
            # file — confirm that this is the intended meaning of the limit
            file_time = round(time.time()) - file_stat.st_mtime
            exceeded.append(file_time > self.file_time_limit)
        if not exceeded:
            # no limit configured
            return current_file

        need_swap = all(exceeded) if self.file_swap_on_both else any(exceeded)
        if need_swap:
            self.file_swap_counter += 1
            # drop the cached name so get_file_path() renders a new one
            self.current_file_name = None
            current_file = self.get_file_path()
        return current_file

    def flush(self) -> None:
        """
        Append the buffered text to the current log file and start a fresh
        cache. Does nothing when the cache is missing, already closed or empty.

        :return: None
        """
        if self.text_cache is None or self.text_cache.closed:
            # stream was closed (or never initialised): nothing to write
            return None
        self.flush_counter = 0
        # swap in a fresh cache before reading the old one
        old_cache, self.text_cache = self.text_cache, io.StringIO()
        text = old_cache.getvalue()
        old_cache.close()
        if text == '':
            return None
        current_file = self.check_flush()
        if not current_file.exists():
            current_file.parent.mkdir(parents=True, exist_ok=True)
            current_file.touch(exist_ok=True)
        with current_file.open('a', encoding=self.file_encoding) as f:
            f.write(text)
        return None

    def close(self) -> None:
        """
        Disable the stream, write out buffered text and release resources.

        The pending flush timer is cancelled and the ``atexit`` hook removed
        so that no flush is attempted on the closed cache afterwards.
        Calling ``close`` more than once is safe.

        :return: None
        """
        super().close()
        if self.flush_timer is not None:
            self.flush_timer.cancel()
        self.flush()
        if self.text_cache is not None:
            self.text_cache.close()
        atexit.unregister(self.flush)
        self.at_exit_register = False
        return None