chore(logger): Optimize code

This commit is contained in:
MSDNicrosoft 2023-01-27 12:32:14 +00:00 committed by GitHub
parent 9dcf407ff7
commit 5c922ec1dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,7 +2,6 @@
@author shenjackyuanjie
@contact 3695888@qq.com
"""
import enum
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
@ -263,10 +262,12 @@ class Message_content(NamedTuple):
frame: Optional[FrameType] = None
def __str__(self):
return f"Message Content at {self.log_time}|by logger {self.logger_name}|in level {self.level}" \
f"|with marker {self.marker}|ends as {self.end}|" \
f"by frame {self.frame}|{'and will flush' if self.flush else 'and will not flush'}|" \
f"text are: {self.text}"
return (
f"Message Content at {self.log_time}|by logger {self.logger_name}|in level {self.level}"
f"|with marker {self.marker}|ends as {self.end}|"
f"by frame {self.frame}|{'and will flush' if self.flush else 'and will not flush'}|"
f"text are: {self.text}"
)
class ThreadLock:
@ -316,7 +317,10 @@ class ListCache:
except IndexError as exp:
print(f'cache:{self.cache}')
raise IndexError(
f'there is no cache at {item}!\ncache:{self.cache}\n{exp}')
f'there is no cache at {item}!\n'
f'cache:{self.cache}\n'
f'{exp}'
) from exp
def __call__(self, *args, **kwargs) -> List[str]:
return self.cache
@ -334,7 +338,7 @@ class ListCache:
return returns
def __bool__(self):
return True if len(self.cache) > 0 else False
return len(self.cache) > 0
@property
def cache(self):
@ -399,8 +403,8 @@ class StdFormatter(FormatterTemplate):
def get_color_code(self, level: str, content: ColorCodeEnum) -> str:
assert content in ColorCodeEnum
if content in self.configs.color[level]:
return self.configs.color[level][content.name].replace("\\u001b", '\u001b')
return self.configs.color[content.name].replace('\\u001b', '\u001b')
return self.configs.color[level][content.name].replace(r"\u001b", '\u001b')
return self.configs.color[content.name].replace(r'\u001b', '\u001b')
"""
format 支持的内容:
@ -418,7 +422,7 @@ class StdFormatter(FormatterTemplate):
if not self.configs.support_color:
return self.format(message=message)
times = self.format_time(input_time=message.log_time)
need_colors = [x for x in re_find_formats.findall(self.configs.format)]
need_colors = list(re_find_formats.findall(self.configs.format))
new_config = self.configs.format
for need_color in need_colors:
if not hasattr(FormatCodeEnum, need_color[1:-1]):
@ -442,17 +446,20 @@ class StdFormatter(FormatterTemplate):
formatted_str = self.configs.format
for time_text, value in times.items():
formatted_str.replace(time_text, value)
formatted_str.replace("{marker}", message.marker)
formatted_str.replace("{message}", message.text)
formatted_str.replace("{level}", level)
formatted_str.replace("{marker}", message.marker) \
.replace("{message}", message.text) \
.replace("{level}", level)
file_name = '*'
code_line = '*'
if message.frame is not None:
file_name = message.frame.f_code.co_filename
code_line = message.frame.f_lineno
formatted_str.replace("{file_name}", file_name)
formatted_str.replace("{code_line}", code_line)
formatted_str.replace("{file_name}", file_name) \
.replace("{code_line}", code_line)
# 注意:类似这种“链式调用”不应超过 5 行,否则将降低可读性
# 关键词 MUST、MUST NOT、REQUIRED、SHALL、SHALL NOT、SHOULD、SHOULD NOT、 RECOMMENDED、MAY、OPTIONAL
# 依照 RFC 2119 的叙述解读
return formatted_str
@ -466,7 +473,7 @@ handler b -> shell
handler c -> file
.enable = True
.level = 0
a.info('abc')
b -> none
@ -560,13 +567,12 @@ class CachedFileHandler(StreamHandlerTemplate):
:param file_conf: 文件配置
"""
super().__init__(level=level, formatter=formatter)
if file_conf is not None:
if isinstance(file_conf, dict):
self.file_conf = LogFileConf(**file_conf)
elif isinstance(file_conf, LogFileConf):
self.file_conf = file_conf
else:
if file_conf is None:
self.file_conf = LogFileConf()
elif isinstance(file_conf, dict):
self.file_conf = LogFileConf(**file_conf)
elif isinstance(file_conf, LogFileConf):
self.file_conf = file_conf
# 缓存
self.len = 0
self.cache_stream = io.StringIO()
@ -603,8 +609,7 @@ class CachedFileHandler(StreamHandlerTemplate):
def write(self, message: Message_content) -> bool:
self.len += 1
formatted_message = self.formatter.format(message)
formatted_message = f'{formatted_message}{message.end}'
formatted_message = f'{self.formatter.format(message)}{message.end}'
self.cache_stream.write(formatted_message)
if message.flush or self.len >= self.file_conf.file_cache_len:
if not self.flush():
@ -729,7 +734,8 @@ class Logger:
file_conf: Optional[List[LogFileCache]] = None,
colors: Optional[Dict[Union[int, str],
Dict[str, str]]] = None,
formats=None) -> None:
formats=None
) -> None:
"""
配置模式: 使用 kwargs 配置
:param name: logger 名称 默认为 root
@ -771,11 +777,7 @@ class Logger:
return True
def enabled_for(self, level: logging_level_type) -> bool:
if not self.enable:
return False
if level < self.min_level:
return False
return True
return level >= self.min_level if self.enable else False
# def format_time(self, input_time: Optional[float] = None) -> Dict[str, str]:
# # 毫秒
@ -794,11 +796,21 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
if frame is None:
if (frame := inspect.currentframe()) is not None:
frame = frame if frame.f_back is None else frame.f_back if frame.f_back.f_back is None else frame.f_back.f_back
message = Message_content(log_time=time.time(), text=sep.join(i if type(i) is str else str(i) for i in values),
frame = (
frame
if frame.f_back is None
else (
frame.f_back
if frame.f_back.f_back is None
else frame.f_back.f_back
)
)
message = Message_content(log_time=time.time(),
text=sep.join(i if type(i) is str else str(i) for i in values),
level=level, logger_name=self.name, marker=marker, end=end, flush=flush, frame=frame)
# 调用 steams
@ -816,36 +828,52 @@ class Logger:
file: LogFileCache
if level < file.level:
continue
file.write_logs(
f"{re.sub(re_find_color_code, '', print_text)}{end}", flush=flush)
file.write_logs(f"{re.sub(re_find_color_code, '', print_text)}{end}", flush=flush)
return print_text
def format_text(self, level: int, text: str, frame: Optional[FrameType]) -> str:
level_with_color = f"[{self.colors[get_name_by_level(level)].get('info')}{get_name_by_level(level)}{color_reset_suffix}]"
level_with_color = (
f"[{self.colors[get_name_by_level(level)].get('info')}"
f"{get_name_by_level(level)}{color_reset_suffix}]"
)
level_with_color = f"{level_with_color}{' ' * (9 - len_without_color_maker(level_with_color))}"
formats = self.formats.copy()
if frame is not None:
formats['file_name'] = f"{self.colors[get_name_by_level(level)].get('file_name', self.colors['file_name'])}{os.path.split(frame.f_code.co_filename)[-1]}{color_reset_suffix}"
formats['code_line'] = f"{self.colors[get_name_by_level(level)].get('code_line', self.colors['code_line'])}{frame.f_lineno}{color_reset_suffix}"
formats['logger_name'] = f'{self.colors[get_name_by_level(level)].get("logger", self.colors["logger"])}{self.name}{color_reset_suffix}'
formats['file_name'] = (
f"{self.colors[get_name_by_level(level)].get('file_name', self.colors['file_name'])}"
f"{os.path.split(frame.f_code.co_filename)[-1]}{color_reset_suffix}"
)
formats['code_line'] = (
f"{self.colors[get_name_by_level(level)].get('code_line', self.colors['code_line'])}"
f"{frame.f_lineno}{color_reset_suffix}"
)
formats['logger_name'] = (
f'{self.colors[get_name_by_level(level)].get("logger", self.colors["logger"])}'
f'{self.name}{color_reset_suffix}'
)
now_time = str(time.time())
for key, value in formats.items():
if isinstance(value, dict):
if 'strftime' in value:
value['strftime']: str
formats[
key] = f"{self.colors[get_name_by_level(level)].get(key, self.colors[key])}{time.strftime(value['strftime'].replace('%%S', now_time[now_time.find('.') + 1:now_time.find('.') + 5]))}{color_reset_suffix}"
print_text = self.formats['MESSAGE']['format'].format(level_with_color=level_with_color,
level=level_with_color, message=text,
**formats)
return print_text
if isinstance(value, dict) and 'strftime' in value:
value['strftime']: str
formats[key] = (
f"{self.colors[get_name_by_level(level)].get(key, self.colors[key])}"
f"{time.strftime(value['strftime'].replace('%%S', now_time[now_time.find('.') + 1:now_time.find('.') + 5]))}"
f"{color_reset_suffix}"
)
return self.formats['MESSAGE']['format'].format(
level_with_color=level_with_color,
level=level_with_color,
message=text,
**formats
)
def trace(self, *values: object,
marker: Optional[str] = None,
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.TRACE, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -854,7 +882,8 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.FINE, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -863,7 +892,8 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.DEBUG, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -872,7 +902,8 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.INFO, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -881,7 +912,8 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.WARNING, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -890,7 +922,8 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.ERROR, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -899,7 +932,8 @@ class Logger:
sep: Optional[str] = ' ',
end: Optional[str] = '\n',
flush: Optional[bool] = False,
frame: Optional[FrameType] = None) -> Optional[str]:
frame: Optional[FrameType] = None
) -> Optional[str]:
return self.make_log(*values, level=LoggingLevel.FATAL, marker=marker, sep=sep, end=end, flush=flush,
frame=frame)
@ -934,11 +968,11 @@ def format_str(text: str) -> str:
formats = logger_configs['Formatter'].copy()
now_time = str(time.time())
for key, value in formats.items():
if isinstance(value, dict):
if 'strftime' in value:
value['strftime']: str
formats[key] = time.strftime(value['strftime'].replace(
'%%S', now_time[now_time.find('.') + 1:now_time.find('.') + 5]))
if isinstance(value, dict) and 'strftime' in value:
value['strftime']: str
formats[key] = (
time.strftime(value['strftime'].replace('%%S', now_time[now_time.find('.') + 1:now_time.find('.') + 5]))
)
return text.format(**formats)
@ -952,7 +986,8 @@ def gen_file_conf(file_name: str,
file_mode: str = 'a',
file_encoding: str = 'utf-8',
file_cache_len: int = 10,
file_cache_time: Union[int, float] = 1) -> dict:
file_cache_time: Union[int, float] = 1
) -> dict:
"""
生成一个文件配置
:param file_name: 日志文件名
@ -981,7 +1016,8 @@ def logger_with_default_settings(name: str,
level: int = DEBUG,
file_conf: Optional[dict] = None,
colors: Optional[dict] = None,
formats: Optional[dict] = None) -> Logger:
formats: Optional[dict] = None
) -> Logger:
return Logger(name=name,
level=level,
file_conf=[LogFileCache(gen_file_conf(**file_conf))],
@ -995,7 +1031,8 @@ def add_file_config(conf_name: str,
file_mode: str = 'a',
file_encoding: str = 'utf-8',
file_cache_len: int = 10,
file_cache_time: Union[int, float] = 1) -> None:
file_cache_time: Union[int, float] = 1
) -> None:
"""
logger config 里添加一个文件配置
:param conf_name: 文件配置名称
@ -1074,7 +1111,7 @@ if __name__ == "__main__":
a_logger.error('error haaaa')
a_logger.fatal('oh no')
logger.info('my name is:', logger.name)
for x in range(5):
for _ in range(5):
test_logger(logger)
test_logger(a_logger)
print(Message_content(log_time=time.time(), text='aaa', level=4, marker='abc', end='abc', flush=False,