remove lib-not-dr file and wait for git submodule

This commit is contained in:
shenjack 2023-11-18 23:06:10 +08:00
parent a204c82577
commit ba3918063d
Signed by: shenjack
GPG Key ID: 7B1134A979775551
18 changed files with 0 additions and 2376 deletions

View File

@ -1,7 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
__version__ = '0.1.8'

View File

@ -1,40 +0,0 @@
from dataclasses import dataclass, field
from typing import Set, List
class Parsed:
...
@dataclass
class Option:
name: str
shortcuts: List[str]
optional: bool
types: Set[type] = field(default_factory=lambda: {str})
@dataclass
class OptionGroup:
options: List[Option]
optional: bool = True
exclusive: bool = False
@dataclass
class Argument:
name: str
types: Set[type] = field(default_factory=lambda: {str})
@dataclass
class Flag:
name: str
shortcuts: List[str]
@dataclass
class FlagGroup:
flags: List[Flag]
exclusive: bool = False

View File

@ -1,14 +0,0 @@
class CallBackDescriptor:
def __init__(self, name):
self.callback_name = name
def __set__(self, instance, value):
assert getattr(instance, self.callback_name) is None, f"Attribute '{self.callback_name}' has been set."
instance.__dict__[self.callback_name] = value
def __get__(self, instance, owner):
return (
self
if instance is None
else instance.__dict__.get(self.callback_name)
)

View File

@ -1,2 +0,0 @@
class IllegalName(Exception):
"""名称或快捷名不合法"""

View File

@ -1,130 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import re
from typing import Callable, List, Optional, Union, Set
from .data import Option, Argument, Flag, Parsed
from .descriptor import CallBackDescriptor
try:
from typing import Self
except ImportError:
from typing import TypeVar
Self = TypeVar("Self") # NOQA
from .exception import IllegalName
CallBack = Union[Callable[[str], None], str] # Equals to `Callable[[str], None] | str`
# 可调用对象或字符串作为回调
# A callable or str as callback
ParseArgFunc = Callable[[str], Optional[type]]
# 解析参数的函数,返回值为 None 时表示解析失败
# function to parse argument, return None when failed
EMPTY_WORDS = re.compile(r"\s", re.I)
def check_name(name: Union[str, List[str]]) -> None:
"""
Check the name or shortcuts of argument(s) or flag(s).
The name must not be empty str, and must not contains \\t or \\n or \\f or \\r.
If that not satisfy the requirements, it will raise exception `IllegalArgumentName`.
检查 参数或标记 名称或快捷方式 是否符合要求
名称必须是非空的字符串且不能包含 \\t \\n \\f \\r
如果不符合要求将会抛出 `IllegalArgumentName` 异常
:param name: arguments
:return: None
"""
if isinstance(name, str) and EMPTY_WORDS.search(name):
raise IllegalName("The name of argument must not contains empty words.")
elif isinstance(name, list) and all((not isinstance(i, str)) and EMPTY_WORDS.search(i) for i in name):
raise IllegalName("The name of shortcut must be 'str', and must not contains empty words.")
else:
raise TypeError("The type of name must be 'str' or 'list[str]'.")
class Literal:
_tip = CallBackDescriptor("_tip")
_func = CallBackDescriptor("_func")
_err_callback = CallBackDescriptor("_err_callback")
def __init__(self, name: str):
self.name: str = name
self.sub: List[Self] = []
self._tip: Optional[CallBack] = None
self._func: Optional[CallBack] = None
self._err_callback: Optional[CallBack] = None
self._opts: List[Option] = []
self._args: List[Argument] = []
self._flags: List[Flag] = []
def __call__(self, *nodes) -> Self:
self.sub += nodes
return self
def __repr__(self):
attrs = (k for k in self.__dict__ if not (k.startswith("__") and k.endswith("__")))
return f"{self.__class__.__name__}({', '.join(f'{k}={v!r}' for k in attrs if (v := self.__dict__[k]))})"
def arg(self, name: str, types: Optional[Set[type]] = None) -> Self:
Argument(name=name, types=types)
return self
def opt(
self,
name: str,
shortcuts: Optional[List[str]] = None,
optional: bool = True,
types: Optional[Set[type]] = None
) -> Self:
check_name(name)
if shortcuts is not None and len(shortcuts) != 0:
check_name(shortcuts)
self._opts.append(
Option(name=name, shortcuts=shortcuts, optional=optional, types=types)
)
return self
def opt_group(self, opts: List[Option], exclusive: bool = False):
...
def flag(self, name: str, shortcuts: Optional[List[str]] = None) -> Self:
check_name(name)
if shortcuts is not None and len(shortcuts) != 0:
check_name(shortcuts)
Flag(name=name, shortcuts=shortcuts)
...
return self
def flag_group(self, flags: List[Flag], exclusive: bool = False) -> Self:
...
return self
def error(self, callback: CallBack) -> Self:
self._err_callback = callback
return self
def run(self, func: CallBack) -> Self:
self._func = func
return self
def tip(self, tip: CallBack) -> Self:
self._tip = tip
return self
def parse(self, cmd: Union[str, List[str]]) -> Parsed:
...
def to_doc(self) -> str:
...
def builder(node: Literal) -> Literal:
...

View File

@ -1,33 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import sys
from lib_not_dr.types.options import Options
COLOR_SUPPORT = True
if sys.platform == "win32":
try:
# https://stackoverflow.com/questions/36760127/...
# how-to-use-the-new-support-for-ansi-escape-sequences-in-the-windows-10-console
from ctypes import windll
kernel32 = windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
except OSError: # pragma: no cover
COLOR_SUPPORT = False
class LogLevel(Options):
name = 'LogLevel'
notset: int = 0
trace: int = 5
fine: int = 7
debug: int = 10
info: int = 20
warn: int = 30
error: int = 40
fatal: int = 50

View File

@ -1,285 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import time
from pathlib import Path
from string import Template
from typing import List, Union, Optional, Dict, Tuple, TYPE_CHECKING
from lib_not_dr.logger import LogLevel
from lib_not_dr.types.options import Options
from lib_not_dr.logger.structure import LogMessage, FormattingMessage
if TYPE_CHECKING:
from lib_not_dr.logger.formatter.colors import BaseColorFormatter
class BaseFormatter(Options):
name = 'BaseFormatter'
sub_formatter: List['BaseFormatter'] = []
color_formatters: List['BaseColorFormatter'] = []
default_template: str = '[${log_time}][${level}]|${logger_name}:${logger_tag}|${messages}'
@classmethod
def add_info(cls, match: str, to: str, description: str) -> str:
return f'- {to} -> ${{{match}}} : {description}'
@classmethod
def info(cls) -> str:
infos = {BaseFormatter.name: BaseFormatter._info()}
cache = ''
for formatter in cls.sub_formatter:
infos[formatter.name] = formatter._info()
infos[cls.name] = cls._info()
for name, info in infos.items():
cache += f"## {name}\n"
cache += info
cache += '\n'
return cache
@classmethod
def _info(cls) -> str:
info = cls.add_info('logger_name', 'logger name', 'The name of the logger')
info += '\n'
info += cls.add_info('logger_tag', 'logger tag', 'The tag of the logger')
return info
def format_message(self,
message: LogMessage,
template: Optional[Union[Template, str]] = None) -> str:
"""
Format message
:param message: 输入的消息
:param template: 日志输出模板
:return:
"""
basic_info = message.format_for_message()
message, info = self._format((message, basic_info))
if template is None:
template = Template(self.default_template)
elif isinstance(template, str):
template = Template(template)
try:
return template.substitute(**info)
except (KeyError, ValueError):
return template.safe_substitute(**info)
def _format(self, message: FormattingMessage) -> FormattingMessage:
"""
Format message
:param message:
:return:
"""
for formatter in self.sub_formatter:
message = formatter._format(message)
return message
@property
def template(self) -> str:
return self.default_template
@template.setter
def template(self, template: str) -> None:
if not isinstance(template, str):
raise TypeError(f'The template must be str, not {type(template)}')
self.default_template = template
class LevelFormatter(BaseFormatter):
name = 'LevelFormatter'
default_level: int = 20
# If True, the undefined level will be set to the higher nearest level.
level_get_higher: bool = True
level_name_map = {
LogLevel.notset: 'NOTSET',
LogLevel.trace: ' TRACE',
LogLevel.fine: ' FINE ',
LogLevel.debug: ' DEBUG',
LogLevel.info: ' INFO ',
LogLevel.warn: ' WARN ',
LogLevel.error: 'ERROR ',
LogLevel.fatal: 'FATAL ',
}
name_level_map = {
'NOTSET': LogLevel.notset,
' TRACE': LogLevel.trace,
' FINE ': LogLevel.fine,
' DEBUG': LogLevel.debug,
' INFO ': LogLevel.info,
' WARN ': LogLevel.warn,
'ERROR ': LogLevel.error,
'FATAL ': LogLevel.fatal,
}
@classmethod
def _info(cls) -> str:
return cls.add_info('level', 'log level', 'The log level')
def _format(self, message: FormattingMessage) -> FormattingMessage:
if message[0].level in self.name_level_map:
level_tag = self.level_name_map[message[0].level]
else:
if self.level_get_higher:
for level in self.name_level_map:
if message[0].level <= self.name_level_map[level]:
level_tag = level
break
else:
level_tag = 'FATAL'
else:
for level in self.name_level_map:
if message[0].level >= self.name_level_map[level]:
level_tag = level
break
else:
level_tag = 'NOTSET'
message[1]['level'] = level_tag
return message
class TraceFormatter(BaseFormatter):
name = 'TraceFormatter'
time_format: str = '%Y-%m-%d %H:%M:%S'
msec_time_format: str = '{}-{:03d}'
use_absolute_path: bool = False
@classmethod
def _info(cls) -> str:
info = cls.add_info('log_time', 'formatted time when logging', 'The time format string'
'. See https://docs.python.org/3/library/time'
'.html#time.strftime for more information.')
info += '\n'
info += cls.add_info('log_source', 'logging file', 'the logging file name')
info += '\n'
info += cls.add_info('log_line', 'logging line', 'the logging line number')
info += '\n'
info += cls.add_info('log_function', 'logging function', 'the logging function name')
return info
def _format(self, message: FormattingMessage) -> FormattingMessage:
message = self._time_format(message)
message = self._trace_format(message)
return message
def _time_format(self, message: FormattingMessage) -> FormattingMessage:
time_mark = time.localtime(message[0].log_time / 1000000000)
if self.msec_time_format:
time_mark = self.msec_time_format.format(time.strftime(self.time_format, time_mark),
message[0].create_msec_3)
message[1]['log_time'] = time_mark
return message
def _trace_format(self, message: FormattingMessage) -> FormattingMessage:
if message[0].stack_trace is None:
return message
path = Path(message[0].stack_trace.f_code.co_filename)
if self.use_absolute_path:
message[1]['log_source'] = path.absolute()
message[1]['log_source'] = path
message[1]['log_line'] = message[0].stack_trace.f_lineno
message[1]['log_function'] = message[0].stack_trace.f_code.co_name
return message
class StdFormatter(BaseFormatter):
name = 'StdFormatter'
enable_color: bool = True
sub_formatter: List[BaseFormatter] = [LevelFormatter(),
TraceFormatter()]
from lib_not_dr.logger.formatter.colors import (LevelColorFormatter,
LoggerColorFormatter,
TimeColorFormatter,
TraceColorFormatter,
MessageColorFormatter)
color_formatters: List[BaseFormatter] = [LevelColorFormatter(),
LoggerColorFormatter(),
TimeColorFormatter(),
TraceColorFormatter(),
MessageColorFormatter()]
def __init__(self,
enable_color: bool = True,
sub_formatter: Optional[List[BaseFormatter]] = None,
color_formatters: Optional[List[BaseFormatter]] = None,
**kwargs) -> None:
"""
Initialize the StdFormatter
:param enable_color: enable color
:param sub_formatter: list of sub formatter
:param color_formatters: list of color formatter
:param kwargs: other options
"""
# 同 structures.LogMessage.__init__ 的注释 (逃)
self.enable_color = enable_color
if sub_formatter is not None:
self.sub_formatter = sub_formatter
if color_formatters is not None:
self.color_formatters = color_formatters
super().__init__(**kwargs)
def _format(self, message: FormattingMessage) -> FormattingMessage:
super()._format(message)
if not self.enable_color:
return message
for formatter in self.color_formatters:
message = formatter._format(message)
return message
@classmethod
def _info(cls) -> str:
return 'None'
if __name__ == '__main__':
import inspect
log_message = LogMessage(messages=['Hello World!'],
level=7,
stack_trace=inspect.currentframe(),
logger_tag='tester',
logger_name='test')
print(LevelFormatter.info())
print(LevelFormatter().format_message(log_message))
print(TraceFormatter.info())
print(TraceFormatter().format_message(log_message))
print(StdFormatter.info())
print(StdFormatter().format_message(log_message))
std_format = StdFormatter()
std_format.default_template = "${log_time}|${logger_name}|${logger_tag}|${log_source}:${log_line}|${log_function}|${level}|${messages}"
test_levels = (0, 5, 7, 10, 20, 30, 40, 50)
print("with color")
for test_level in test_levels:
log_message.level = test_level
print(std_format.format_message(log_message), end='')
print("without color")
std_format.enable_color = False
for test_level in test_levels:
log_message.level = test_level
print(std_format.format_message(log_message), end='')
print(std_format.as_markdown())

View File

@ -1,256 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
from lib_not_dr.logger import LogLevel, COLOR_SUPPORT
from lib_not_dr.logger.formatter import BaseFormatter
from lib_not_dr.logger.structure import FormattingMessage
__all__ = [
'BaseColorFormatter',
'LevelColorFormatter',
'LoggerColorFormatter',
'TimeColorFormatter',
'TraceColorFormatter',
'MessageColorFormatter',
'RESET_COLOR'
]
RESET_COLOR = '\033[0m'
class BaseColorFormatter(BaseFormatter):
name = 'BaseColorFormatter'
# TODO 迁移老 logger 颜色
color = {
# Notset: just black
LogLevel.notset: '',
# Trace: blue
LogLevel.trace: '\033[38;2;138;173;244m',
# Fine: green
LogLevel.fine: '\033[0;32m',
# Debug: cyan
LogLevel.debug: '\033[0;36m',
# Info: white
LogLevel.info: '\033[0;37m',
# Warn: yellow
LogLevel.warn: '\033[0;33m',
# Error: red
LogLevel.error: '\033[0;31m',
# Fatal: red background
LogLevel.fatal: '\033[0;41m'
}
def get_color(self, message: FormattingMessage) -> str:
for level in self.color:
if message[0].level <= level:
break
else:
level = 90
return self.color[level]
class LevelColorFormatter(BaseColorFormatter):
name = 'LevelColorFormatter'
# TODO 迁移老 logger 颜色
color = {
# Notset: just black
LogLevel.notset: '',
# Trace: blue
LogLevel.trace: '\033[38;2;138;173;244m',
# Fine: green
LogLevel.fine: '\033[35;48;2;44;44;54m',
# Debug: cyan
LogLevel.debug: '\033[38;2;133;138;149m',
# Info: white
LogLevel.info: '\033[0;37m',
# Warn: yellow
LogLevel.warn: '\033[0;33m',
# Error: red
LogLevel.error: '\033[0;31m',
# Fatal: red background
LogLevel.fatal: '\033[0;41m'
}
@classmethod
def _info(cls) -> str:
return cls.add_info('colored level', 'level', 'A colored level')
def _format(self, message: FormattingMessage) -> FormattingMessage:
if isinstance(message[1].get('level'), int) or not COLOR_SUPPORT:
return message
# 获取颜色
color = self.get_color(message)
# 添加颜色
if color == '' or color == RESET_COLOR:
return message
message[1]['level'] = f'{color}{message[1]["level"]}{RESET_COLOR}'
return message
class LoggerColorFormatter(BaseColorFormatter):
name = 'LoggerColorFormatter'
# TODO 迁移老 logger 颜色
color = {
# Notset: just black
LogLevel.notset: '',
# Trace: blue
LogLevel.trace: '\033[38;2;138;173;244m',
# Fine: green
LogLevel.fine: '\033[0;32m',
# Debug: cyan
LogLevel.debug: '\033[0;36m',
# Info: white
LogLevel.info: '\033[0;37m',
# Warn: yellow
LogLevel.warn: '\033[0;33m',
# Error: red
LogLevel.error: '\033[0;31m',
# Fatal: red background
LogLevel.fatal: '\033[38;2;245;189;230m',
}
@classmethod
def _info(cls) -> str:
return cls.add_info('colored logger name', 'logger name', 'A colored logger name')
def _format(self, message: FormattingMessage) -> FormattingMessage:
if message[1].get('logger_name') is None or not COLOR_SUPPORT:
return message
# 获取颜色
color = self.get_color(message)
# 添加颜色
if color == '' or color == RESET_COLOR:
return message
message[1]['logger_name'] = f'{color}{message[1]["logger_name"]}{RESET_COLOR}'
if message[1].get('logger_tag') is not None and message[1].get('logger_tag') != ' ':
message[1]['logger_tag'] = f'{color}{message[1]["logger_tag"]}{RESET_COLOR}'
return message
class TimeColorFormatter(BaseColorFormatter):
name = 'TimeColorFormatter'
# TODO 迁移老 logger 颜色
color = {
# Notset: just black
LogLevel.notset: '',
# Trace: blue
LogLevel.trace: '\033[38;2;138;173;244m',
# Fine: green
LogLevel.fine: '\033[0;32m',
# Debug: cyan
LogLevel.debug: '\033[0;36m',
# Info: white
LogLevel.info: '\033[0;37m',
# Warn: yellow
LogLevel.warn: '\033[0;33m',
# Error: red
LogLevel.error: '\033[0;31m',
# Fatal: red background
LogLevel.fatal: '\033[38;2;255;255;0;48;2;120;10;10m',
}
@classmethod
def _info(cls) -> str:
return cls.add_info('colored time', 'time', 'A colored time')
def _format(self, message: FormattingMessage) -> FormattingMessage:
if message[1].get('log_time') is None or not COLOR_SUPPORT:
return message
# 获取颜色
color = self.get_color(message)
# 添加颜色
if color == '' or color == RESET_COLOR:
return message
message[1]['log_time'] = f'{color}{message[1]["log_time"]}{RESET_COLOR}'
return message
class TraceColorFormatter(BaseColorFormatter):
name = 'TraceColorFormatter'
# TODO 迁移老 logger 颜色
color = {
# Notset: just black
LogLevel.notset: '\033[38;2;0;255;180m',
# Trace: blue
LogLevel.trace: '\033[38;2;0;255;180m',
# Fine: green
LogLevel.fine: '\033[38;2;0;255;180m',
# Debug: cyan
LogLevel.debug: '\033[38;2;0;255;180m',
# Info: white
LogLevel.info: '\033[38;2;0;255;180m',
# Warn: yellow
LogLevel.warn: '\033[38;2;0;255;180m',
# Error: red
LogLevel.error: '\033[38;2;0;255;180m',
# Fatal: red background
LogLevel.fatal: '\033[38;2;255;255;0;48;2;120;10;10m',
}
@classmethod
def _info(cls) -> str:
info = cls.add_info('colored logging file', 'log_source', 'A colored logging file name')
info += '\n'
info += cls.add_info('colored logging line', 'log_line', 'A colored logging line number')
info += '\n'
info += cls.add_info('colored logging function', 'log_function', 'A colored logging function name')
return info
def _format(self, message: FormattingMessage) -> FormattingMessage:
if message[0].stack_trace is None or not COLOR_SUPPORT:
return message
# 获取颜色
color = self.get_color(message)
# 添加颜色
if color == '' or color == RESET_COLOR:
return message
message[1]['log_source'] = f'{color}{message[1]["log_source"]}{RESET_COLOR}'
message[1]['log_line'] = f'{color}{message[1]["log_line"]}{RESET_COLOR}'
message[1]['log_function'] = f'{color}{message[1]["log_function"]}{RESET_COLOR}'
return message
class MessageColorFormatter(BaseColorFormatter):
name = 'MessageColorFormatter'
color = {
# Notset: just black
LogLevel.notset: '',
# Trace: blue
LogLevel.trace: '\033[38;2;138;173;244m',
# Fine: blue
LogLevel.fine: '\033[38;2;138;173;244m',
# Debug: blue
LogLevel.debug: '\033[38;2;138;173;244m',
# Info: no color
LogLevel.info: '',
# Warn: yellow
LogLevel.warn: '\033[0;33m',
# Error: red
LogLevel.error: '\033[0;31m',
# Fatal: red background
LogLevel.fatal: '\033[38;2;255;255;0;48;2;120;10;10m',
}
@classmethod
def _info(cls) -> str:
return cls.add_info('colored message', 'message', 'A colored message')
def _format(self, message: FormattingMessage) -> FormattingMessage:
if message[1].get('messages') is None or not COLOR_SUPPORT:
return message
# 获取颜色
color = self.get_color(message)
# 添加颜色
if color == '' or color == RESET_COLOR:
return message
if message[1]['messages'][-1] == '\n':
message[1]['messages'] = f'{color}{message[1]["messages"][:-1]}{RESET_COLOR}\n'
else:
message[1]['messages'] = f'{color}{message[1]["messages"]}{RESET_COLOR}'
return message

View File

@ -1,267 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import time
import inspect
from types import FrameType
from typing import List, Optional
from lib_not_dr.logger import LogLevel
from lib_not_dr.types.options import Options
from lib_not_dr.logger.structure import LogMessage
from lib_not_dr.logger.outstream import BaseOutputStream, StdioOutputStream
class Logger(Options):
name = 'Logger-v2'
outputs: List[BaseOutputStream] = [StdioOutputStream()]
log_name: str = 'root'
enable: bool = True
level: int = 20 # info
def log_for(self, level: int) -> bool:
"""
Check if logging is enabled for a specific level.
Args:
level (int): The logging level to check.
Returns:
bool: True if logging is enabled for the given level, False otherwise.
"""
return self.enable and level >= self.level
def add_output(self, output: BaseOutputStream) -> None:
"""
Add an output to the list of outputs.
Args:
output (BaseOutputStream): The output to be added.
Returns:
None
"""
self.outputs.append(output)
self.level = min(self.level, output.level)
def remove_output(self, output: BaseOutputStream) -> None:
"""
Removes the specified output from the list of outputs.
Args:
output (BaseOutputStream): The output to be removed.
Returns:
None
"""
self.outputs.remove(output)
self.level = max(self.level, *[output.level for output in self.outputs])
@property
def global_level(self) -> int:
"""
Get the global logging level.
Returns:
int: The global logging level.
"""
return self.level
@global_level.setter
def global_level(self, level: int) -> None:
"""
Set the global logging level.
Args:
level (int): The global logging level.
Returns:
None
"""
self.level = level
for output in self.outputs:
output.level = level
def make_log(self,
messages: List[str],
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
level: int = 20, # info
# log_time: Optional[float] = None,
# logger_name: str = 'root',
# logger_tag: Optional[str] = None,
stack_trace: Optional[FrameType] = None) -> None:
# 检查是否需要记录
if not self.log_for(level):
return
log_time = time.time_ns()
# 处理堆栈信息
if stack_trace is None:
# 尝试获取堆栈信息
if (stack := inspect.currentframe()) is not None:
# 如果可能 尝试获取上两层的堆栈信息
if (up_stack := stack.f_back) is not None:
if (upper_stack := up_stack.f_back) is not None:
stack_trace = upper_stack
else:
stack_trace = up_stack
else:
stack_trace = stack
message = LogMessage(messages=messages,
end=end,
split=split,
flush=flush,
level=level,
log_time=log_time,
logger_name=self.log_name,
logger_tag=tag,
stack_trace=stack_trace)
if level >= 30: # WARN
for output in self.outputs:
output.write_stderr(message)
else:
for output in self.outputs:
output.write_stdout(message)
# done?
# 20231106 00:06
@staticmethod
def get_logger_by_name(name: str) -> 'Logger':
"""
Get a logger by name.
Args:
name (str): The name of the logger.
Returns:
Logger: The logger with the specified name.
"""
return Logger(log_name=name)
def info(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.info):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.info,
stack_trace=stack_trace)
def trace(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.trace):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.trace,
stack_trace=stack_trace)
def fine(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.fine):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.fine,
stack_trace=stack_trace)
def debug(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.debug):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.debug,
stack_trace=stack_trace)
def warn(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.warn):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.warn,
stack_trace=stack_trace)
def error(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.error):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.error,
stack_trace=stack_trace)
def fatal(self,
*message,
tag: Optional[str] = None,
end: str = '\n',
split: str = ' ',
flush: bool = True,
stack_trace: Optional[FrameType] = None) -> None:
if not self.log_for(LogLevel.fatal):
return
self.make_log(messages=list(message),
tag=tag,
end=end,
split=split,
flush=flush,
level=LogLevel.fatal,
stack_trace=stack_trace)

View File

@ -1,236 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import io
import sys
import time
import string
import atexit
import threading
from pathlib import Path
from typing import Optional
from lib_not_dr.logger import LogLevel
from lib_not_dr.types.options import Options
from lib_not_dr.logger.structure import LogMessage
from lib_not_dr.logger.formatter import BaseFormatter, StdFormatter
__all__ = [
'BaseOutputStream',
'StdioOutputStream',
'FileCacheOutputStream'
]
class BaseOutputStream(Options):
name = 'BaseOutputStream'
level: int = LogLevel.info
enable: bool = True
formatter: BaseFormatter
def write_stdout(self, message: LogMessage) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.write_stdout is not implemented')
def write_stderr(self, message: LogMessage) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.write_stderr is not implemented')
def flush(self) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.flush is not implemented')
def close(self) -> None:
self.enable = False
class StdioOutputStream(BaseOutputStream):
name = 'StdioOutputStream'
level: int = LogLevel.info
formatter: BaseFormatter = StdFormatter()
use_stderr: bool = True
def write_stdout(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
print(self.formatter.format_message(message), end='', flush=message.flush)
return None
def write_stderr(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
if self.use_stderr:
print(self.formatter.format_message(message), end='', flush=message.flush, file=sys.stderr)
else:
print(self.formatter.format_message(message), end='', flush=message.flush)
return None
def flush(self) -> None:
"""
flush stdout and stderr
:return: None
"""
print('', end='', flush=True)
print('', end='', flush=True, file=sys.stderr)
return None
class FileCacheOutputStream(BaseOutputStream):
name = 'FileCacheOutputStream'
level: int = LogLevel.info
formatter: BaseFormatter = StdFormatter(enable_color=False)
text_cache: io.StringIO = None
flush_counter: int = 0
# 默认 10 次 flush 一次
flush_count_limit: int = 10
flush_time_limit: int = 10 # time limit in sec, 0 means no limit
flush_timer: threading.Timer = None
file_path: Optional[Path] = Path('./logs')
file_name: str
# file mode: always 'a'
file_encoding: str = 'utf-8'
# do file swap or not
file_swap: bool = False
at_exit_register: bool = False
file_swap_counter: int = 0
file_swap_name_template: str = '${name}-${counter}.log'
# ${name} -> file_name
# ${counter} -> file_swap_counter
# ${log_time} -> time when file swap ( round(time.time()) )
# ${start_time} -> start time of output stream ( round(time.time()) )
current_file_name: str = None
file_start_time: int = None
# log file swap triggers
# 0 -> no limit
file_size_limit: int = 0 # size limit in kb
file_time_limit: int = 0 # time limit in sec 0
file_swap_on_both: bool = False # swap file when both size and time limit reached
def init(self, **kwargs) -> bool:
self.file_start_time = round(time.time())
if self.text_cache is None:
self.text_cache = io.StringIO()
self.get_file_path()
return False
def _write(self, message: LogMessage) -> None:
"""
write message to text cache
默认已经检查过了
:param message: message to write
:return: None
"""
self.text_cache.write(self.formatter.format_message(message))
self.flush_counter += 1
if message.flush or self.flush_counter >= self.flush_count_limit:
self.flush()
else:
if self.flush_time_limit > 0:
if self.flush_timer is None or not self.flush_timer.is_alive():
self.flush_timer = threading.Timer(self.flush_time_limit, self.flush)
self.flush_timer.daemon = True
self.flush_timer.start()
if not self.at_exit_register:
atexit.register(self.flush)
self.at_exit_register = True
return None
def write_stdout(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
self._write(message)
return None
def write_stderr(self, message: LogMessage) -> None:
if not self.enable:
return None
if message.level < self.level:
return None
self._write(message)
return None
def get_file_path(self) -> Path:
"""
get file path
:return:
"""
if (current_file := self.current_file_name) is None:
if not self.file_swap:
# 直接根据 file name 生成文件
current_file = Path(self.file_path) / self.file_name
self.current_file_name = str(current_file)
return current_file
template = string.Template(self.file_swap_name_template)
file_name = template.safe_substitute(name=self.file_name,
counter=self.file_swap_counter,
log_time=round(time.time()),
start_time=self.file_start_time)
current_file = Path(self.file_path) / file_name
self.current_file_name = str(current_file)
else:
current_file = Path(current_file)
return current_file
def check_flush(self) -> Path:
current_file = self.get_file_path()
# 获取当前文件的路径
if not self.file_swap:
# 不需要 swap 直接返回
return current_file
# 检查是否需要 swap
size_pass = True
if self.file_size_limit > 0:
file_size = current_file.stat().st_size / 1024 # kb
if file_size > self.file_size_limit: # kb
size_pass = False
time_pass = True
if self.file_time_limit > 0:
file_time = round(time.time()) - current_file.stat().st_mtime
if file_time > self.file_time_limit:
time_pass = False
if (self.file_swap_on_both and size_pass and time_pass) or \
(not self.file_swap_on_both and (size_pass or time_pass)):
# 两个都满足
# 或者只有一个满足
if size_pass and time_pass:
self.file_swap_counter += 1
# 生成新的文件名
return self.get_file_path()
def flush(self) -> None:
new_cache = io.StringIO() # 创建新的缓存
self.flush_counter = 0 # atomic, no lock
old_cache, self.text_cache = self.text_cache, new_cache
text = old_cache.getvalue()
old_cache.close() # 关闭旧的缓存
if text == '':
return None
current_file = self.check_flush()
if not current_file.exists():
current_file.parent.mkdir(parents=True, exist_ok=True)
current_file.touch(exist_ok=True)
with current_file.open('a', encoding=self.file_encoding) as f:
f.write(text)
return None
def close(self) -> None:
super().close()
self.flush()
self.text_cache.close()
atexit.unregister(self.flush)
return None

View File

@ -1,12 +0,0 @@
# Logger
```
[App -> Logger +-> Handler(Formatter)] -> Queue(string io) -> [File Output] ?
|-> Handler(Formatter) -> Console Output]
```
```
[App -> Logger] -> Queue(raw log) -> [Facade +-> Handler(Formatter) -> File Output ] ?
[ |-> Handler(Formatter) -> Console Output]
```

View File

@ -1,99 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import time
from pathlib import Path
from types import FrameType
from typing import List, Optional, Tuple, Dict, Union
from lib_not_dr.types.options import Options
__all__ = ['LogMessage',
'FormattingMessage']
class LogMessage(Options):
name = 'LogMessage'
# 消息内容本身的属性
messages: List[str] = []
end: str = '\n'
split: str = ' '
# 消息的属性
flush: bool = None
level: int = 20
log_time: float = None # time.time_ns()
logger_name: str = 'root'
logger_tag: Optional[str] = None
stack_trace: Optional[FrameType] = None
def __init__(self,
messages: Optional[List[str]] = None,
end: Optional[str] = '\n',
split: Optional[str] = ' ',
flush: Optional[bool] = None,
level: Optional[int] = 20,
log_time: Optional[float] = None,
logger_name: Optional[str] = 'root',
logger_tag: Optional[str] = None,
stack_trace: Optional[FrameType] = None,
**kwargs) -> None:
"""
Init for LogMessage
:param messages: message list for log
:param end: end of message
:param split: split for messages
:param flush: do flush or not
:param level: level of message
:param log_time: time of message (default: time.time_ns())
:param logger_name: name of logger
:param logger_tag: tag of logger
:param stack_trace: stack trace of logger
:param kwargs: other options
"""
# 为了方便使用 单独覆盖了 __init__ 方法来提供代码补全的选项
super().__init__(messages=messages,
end=end,
split=split,
flush=flush,
level=level,
log_time=log_time,
logger_name=logger_name,
logger_tag=logger_tag,
stack_trace=stack_trace,
**kwargs)
def init(self, **kwargs) -> bool:
if self.log_time is None:
self.log_time = time.time_ns()
if not isinstance(self.flush, bool) and self.flush is not None:
self.flush = True if self.flush else False
return False
def format_message(self) -> str:
return self.split.join(self.messages) + self.end
def format_for_message(self) -> Dict[str, str]:
basic_info = self.option()
if self.logger_tag is None:
basic_info['logger_tag'] = ' '
basic_info['messages'] = self.format_message()
return basic_info
@property
def create_msec_3(self) -> int:
return int(self.log_time / 1000000) % 1000
FormattingMessage = Tuple[LogMessage, Dict[str, Union[str, Path]]]
if __name__ == '__main__':
print(LogMessage().as_markdown())

View File

@ -1,472 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import platform
import warnings
from pathlib import Path
from typing import List, Tuple, Optional, Union, Any
from enum import Enum
from lib_not_dr.types import Options, Version, VersionRequirement
def ensure_cmd_readable(cmd: str) -> str:
"""
保证 参数中 不含空格
:param cmd: 要格式化的命令行参数
:return: 格式化后的命令行参数
"""
if ' ' in str(cmd):
return f'"{cmd}"'
return cmd
def format_cmd(arg_name: Optional[str] = None,
arg_value: Optional[Union[str, List[str]]] = None,
write: Optional[Any] = True) -> List[str]:
"""
用来格式化输出命令行参数
:param arg_name: 类似 --show-memory 之类的主项
:param arg_value: 类似 xxx 类的内容
:param write: 是否写入
:return: 直接拼接好的命令行参数 不带 =
"""
if not write:
return []
if arg_name is None:
return []
if arg_value is None:
return [arg_name]
if isinstance(arg_value, list):
arg_value = ','.join([ensure_cmd_readable(value) for value in arg_value])
return [f'{arg_name}{arg_value}']
arg_value = ensure_cmd_readable(arg_value)
return [f'{arg_name}{arg_value}']
class NuitkaSubConfig(Options):
"""
Nuitka 配置的子项
Nuitka configuration sub-items
"""
name = 'Nuitka Sub Configuration'
def gen_cmd(self) -> List[str]:
"""
生成命令行参数
:return:
"""
raise NotImplementedError
class NuitkaPluginConfig(NuitkaSubConfig):
"""
控制 nuitka plugin 相关参数的部分
Control part of nuitka's plugin related parameters
"""
name = 'Nuitka Plugin Configuration'
# --enable-plugin=PLUGIN_NAME
enable_plugin: List[str] = []
# --disable-plugin=PLUGIN_NAME
disable_plugin: List[str] = []
# --plugin-no-detection
plugin_no_detection: bool = False
# --user-plugin=PATH
user_plugin: List[Path] = []
# --show-source-changes
show_source_changes: bool = False
# --include-plugin-directory=MODULE/PACKAGE
include_plugin_dir: List[str] = []
# --include-plugin-files=PATTERN
include_plugin_files: List[str] = []
def gen_cmd(self) -> List[str]:
lst = []
lst += format_cmd('--enable-plugin=', self.enable_plugin, self.enable_plugin)
lst += format_cmd('--disable-plugin=', self.disable_plugin, self.disable_plugin)
lst += format_cmd('--plugin-no-detection' if self.plugin_no_detection else None)
lst += format_cmd('--user-plugin=', [str(plugin.absolute()) for plugin in self.user_plugin], self.user_plugin)
lst += format_cmd('--show-source-changes' if self.show_source_changes else None)
lst += format_cmd('--include-plugin-directory=', self.include_plugin_dir, self.include_plugin_dir)
lst += format_cmd('--include-plugin-files=', self.include_plugin_files, self.include_plugin_files)
return lst
class NuitkaIncludeConfig(NuitkaSubConfig):
"""
控制 nuitka include 数据 相关参数的部分
Control part of nuitka's include related parameters
"""
name = 'Nuitka Include Configuration'
# --include-package=PACKAGE
include_packages: List[str] = []
# --include-module=MODULE
include_modules: List[str] = []
# --prefer-source-code
# --no-prefer-source-code for --module
prefer_source_code: bool = False
# --follow-stdlib
follow_stdlib: bool = False
def gen_cmd(self) -> List[str]:
lst = []
lst += format_cmd('--include-package=', self.include_packages, self.include_packages)
lst += format_cmd('--include-module=', self.include_modules, self.include_modules)
lst += format_cmd('--prefer-source-code' if self.prefer_source_code else None)
lst += format_cmd('--no-prefer-source-code' if not self.prefer_source_code else None)
lst += format_cmd('--follow-stdlib' if self.follow_stdlib else None)
return lst
class NuitkaDataConfig(NuitkaSubConfig):
"""
控制 nuitka 数据 相关参数的部分
Control part of nuitka's data related parameters
"""
name = 'Nuitka Data Configuration'
# --include-package-data=PACKAGE=PACKAGE_PATH
include_package_data: List[Tuple[Path, Path]] = []
# --include-data-files=PATH=PATH
include_data_files: List[Tuple[Path, Path]] = []
# --include-data-dir=DIRECTORY=PATH
include_data_dir: List[Tuple[Path, Path]] = []
# --noinclude-data-files=PATH
no_include_data_files: List[Path] = []
# --list-package-data=LIST_PACKAGE_DATA
list_package_data: List[str] = []
# --list-package-dlls=LIST_PACKAGE_DLLS
list_package_dlls: List[str] = []
# --include-distribution-metadata=DISTRIBUTION
include_distribution_metadata: List[str] = []
class NuitkaBinaryInfo(Options):
"""
nuitka 构建的二进制文件的信息
nuitka build binary file information
"""
name = 'Nuitka Binary Info'
# --company-name=COMPANY_NAME
company_name: Optional[str] = None
# --product-name=PRODUCT_NAME
product_name: Optional[str] = None
# --file-version=FILE_VERSION
# --macos-app-version=MACOS_APP_VERSION
file_version: Optional[Union[str, Version]] = None
# --product-version=PRODUCT_VERSION
product_version: Optional[Union[str, Version]] = None
# --file-description=FILE_DESCRIPTION
file_description: Optional[str] = None
# --copyright=COPYRIGHT_TEXT
copyright: Optional[str] = None
# --trademarks=TRADEMARK_TEXT
trademarks: Optional[str] = None
# Icon
# --linux-icon=ICON_PATH
# --macos-app-icon=ICON_PATH
# --windows-icon-from-ico=ICON_PATH
# --windows-icon-from-exe=ICON_EXE_PATH
# 注意: 只有 Windows 下 才可以提供多个 ICO 文件
# 其他平台 和 EXE 下只会使用第一个路径
icon: Optional[List[Path]] = None
# Console
# --enable-console
# --disable-console
console: bool = True
# Windows UAC
# --windows-uac-admin
windows_uac_admin: bool = False
# --windows-uac-uiaccess
windows_uac_ui_access: bool = False
class NuitkaOutputConfig(Options):
"""
nuitka 构建的选项
nuitka build output information
"""
name = 'Nuitka Output Config'
# --output-dir=DIRECTORY
output_dir: Optional[Path] = None
# --output-filename=FILENAME
output_filename: Optional[str] = None
# --quiet
quiet: bool = False
# --no-progressbar
no_progressbar: bool = False
# --verbose
verbose: bool = False
# --verbose-output=PATH
verbose_output: Optional[Path] = None
# --show-progress
show_progress: bool = False
# --show-memory
show_memory: bool = False
# --show-scons
show_scons: bool = False
# --show-modules
show_modules: bool = False
# --show-modules-output=PATH
show_modules_output: Optional[Path] = None
# --xml=XML_FILENAME
xml: Optional[Path] = None
# --report=REPORT_FILENAME
report: Optional[Path] = None
# --report-diffable
report_diffable: bool = False
# --remove-output
remove_output: bool = False
# --no-pyo-file
no_pyo_file: bool = False
class NuitkaDebugConfig(Options):
"""
nuitka 构建的调试选项
nuikta build debug information
"""
name = 'Nuitka Debug Config'
# --debug
debug: bool = False
# --unstripped
strip: bool = True
# --profile
profile: bool = False
# --internal-graph
internal_graph: bool = False
# --trace-execution
trace_execution: bool = False
# --recompile-c-only
recompile_c_only: bool = False
# --generate-c-only
generate_c_only: bool = False
# --deployment
deployment: bool = False
# --no-deployment-flag=FLAG
deployment_flag: Optional[str] = None
# --experimental=FLAG
experimental: Optional[str] = None
class NuitkaTarget(Enum):
"""
用于指定 nuitka 构建的目标
Use to specify the target of nuitka build
exe: 不带任何参数
module: --module
standalone: --standalone
one_file: --onefile
"""
exe = ''
module = 'module'
standalone = 'standalone'
one_file = 'package'
class NuitkaScriptGenerator(Options):
"""
用于帮助生成 nuitka 构建脚本的类
Use to help generate nuitka build script
:arg main 需要编译的文件
"""
name = 'Nuitka Script Generator'
# --main=PATH
# 可以有多个 输入时需要包在列表里
main: List[Path]
# --run
run_after_build: bool = False
# --debugger
debugger: bool = False
# --execute-with-pythonpath
execute_with_python_path: bool = False
# --assume-yes-for-downloads
download_confirm: bool = True
# standalone/one_file/module/exe
target: NuitkaTarget = NuitkaTarget.exe
# --python-debug
python_debug: bool = False
# --python-flag=FLAG
python_flag: List[str] = []
# --python-for-scons=PATH
python_for_scons: Optional[Path] = None
class CompilerHelper(Options):
"""
用于帮助生成 nuitka 构建脚本的类
Use to help generate nuitka build script
"""
name = 'Nuitka Compiler Helper'
output_path: Path = Path('./build')
src_file: Path
python_cmd: str = 'python'
compat_nuitka_version: VersionRequirement = VersionRequirement("~1.8.0") # STATIC VERSION
# 以下为 nuitka 的参数
# nuitka options below
use_lto: bool = False # --lto=yes (no is faster)
use_clang: bool = True # --clang
use_msvc: bool = True # --msvc=latest
use_mingw: bool = False # --mingw64
onefile: bool = False # --onefile
onefile_tempdir: Optional[str] = '' # --onefile-tempdir-spec=
standalone: bool = True # --standalone
use_ccache: bool = True # not --disable-ccache
enable_console: bool = True # --enable-console / --disable-console
show_progress: bool = True # --show-progress
show_memory: bool = False # --show-memory
remove_output: bool = True # --remove-output
save_xml: bool = False # --xml
xml_path: Path = Path('build/compile_data.xml')
save_report: bool = False # --report
report_path: Path = Path('build/compile_report.xml')
download_confirm: bool = True # --assume-yes-for-download
run_after_build: bool = False # --run
company_name: Optional[str] = ''
product_name: Optional[str] = ''
file_version: Optional[Version] = None
product_version: Optional[Version] = None
file_description: Optional[str] = '' # --file-description
copy_right: Optional[str] = '' # --copyright
icon_path: Optional[Path] = None
follow_import: List[str] = []
no_follow_import: List[str] = []
include_data_dir: List[Tuple[str, str]] = []
include_packages: List[str] = []
enable_plugin: List[str] = [] # --enable-plugin=xxx,xxx
disable_plugin: List[str] = [] # --disable-plugin=xxx,xxx
def init(self, **kwargs) -> None:
if (compat_version := kwargs.get('compat_nuitka_version')) is not None:
if not self.compat_nuitka_version.accept(compat_version):
warnings.warn(
f"Nuitka version may not compat with {compat_version}\n"
"requirement: {self.compat_nuitka_version}"
)
# 非 windows 平台不使用 msvc
if platform.system() != 'Windows':
self.use_msvc = False
self.use_mingw = False
else:
self.use_mingw = self.use_mingw and not self.use_msvc
# Windows 平台下使用 msvc 时不使用 mingw
def __str__(self):
return self.as_markdown()
def as_markdown(self, longest: Optional[int] = None) -> str:
"""
输出编译器帮助信息
Output compiler help information
Args:
longest (Optional[int], optional):
输出信息的最大长度限制 The maximum length of output information.
Defaults to None.
Returns:
str: markdown 格式输出的编译器帮助信息
Compile helper information in markdown format
"""
front = super().as_markdown(longest)
gen_cmd = self.gen_subprocess_cmd()
return f"{front}\n\n```bash\n{' '.join(gen_cmd)}\n```"
def gen_subprocess_cmd(self) -> List[str]:
"""生成 nuitka 构建脚本
Generate nuitka build script
Returns:
List[str]:
生成的 nuitka 构建脚本
Generated nuitka build script
"""
cmd_list = [self.python_cmd, '-m', 'nuitka']
# macos 和 非 macos icon 参数不同
if platform.system() == 'Darwin':
cmd_list += format_cmd('--macos-app-version=', self.product_version, self.product_version)
cmd_list += format_cmd('--macos-app-icon=', self.icon_path.absolute(), self.icon_path)
elif platform.system() == 'Windows':
cmd_list += format_cmd('--windows-icon-from-ico=', self.icon_path.absolute(), self.icon_path)
elif platform.system() == 'Linux':
cmd_list += format_cmd('--linux-icon=', self.icon_path.absolute(), self.icon_path)
cmd_list += format_cmd('--lto=', 'yes' if self.use_lto else 'no')
cmd_list += format_cmd('--clang' if self.use_clang else None)
cmd_list += format_cmd('--msvc=latest' if self.use_msvc else None)
cmd_list += format_cmd('--mingw64' if self.use_mingw else None)
cmd_list += format_cmd('--standalone' if self.standalone else None)
cmd_list += format_cmd('--onefile' if self.onefile else None)
cmd_list += format_cmd('--onefile-tempdir-spec=', self.onefile_tempdir, self.onefile_tempdir)
cmd_list += format_cmd('--disable-ccache' if not self.use_ccache else None)
cmd_list += format_cmd('--show-progress' if self.show_progress else None)
cmd_list += format_cmd('--show-memory' if self.show_memory else None)
cmd_list += format_cmd('--remove-output' if self.remove_output else None)
cmd_list += format_cmd('--assume-yes-for-download' if self.download_confirm else None)
cmd_list += format_cmd('--run' if self.run_after_build else None)
cmd_list += format_cmd('--enable-console' if self.enable_console else '--disable-console')
cmd_list += format_cmd('--xml=', str(self.xml_path.absolute()), self.save_xml)
cmd_list += format_cmd('--report=', str(self.report_path.absolute()), self.save_report)
cmd_list += format_cmd('--output-dir=', str(self.output_path.absolute()), self.output_path)
cmd_list += format_cmd('--company-name=', self.company_name, self.company_name)
cmd_list += format_cmd('--product-name=', self.product_name, self.product_name)
cmd_list += format_cmd('--file-version=', str(self.file_version), self.file_version)
cmd_list += format_cmd('--product-version=', str(self.product_version), self.product_version)
cmd_list += format_cmd('--file-description=', self.file_description, self.file_description)
cmd_list += format_cmd('--copyright=', self.copy_right, self.copy_right)
cmd_list += format_cmd('--follow-import-to=', self.follow_import, self.follow_import)
cmd_list += format_cmd('--nofollow-import-to=', self.no_follow_import, self.no_follow_import)
cmd_list += format_cmd('--enable-plugin=', self.enable_plugin, self.enable_plugin)
cmd_list += format_cmd('--disable-plugin=', self.disable_plugin, self.disable_plugin)
if self.include_data_dir:
cmd_list += [f"--include-data-dir={src}={dst}" for src, dst in self.include_data_dir]
if self.include_packages:
cmd_list += [f"--include-package={package}" for package in self.include_packages]
cmd_list.append(f"--main={self.src_file}")
return cmd_list

View File

@ -1,31 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
from .options import (Options,
OptionsError,
OptionNotFound,
OptionNameNotDefined,
get_type_hints_)
from .version import (Version,
VersionRequirement,
VersionParsingError,
ExtraElement)
__all__ = [
# options
'get_type_hints_',
'Options',
'OptionsError',
'OptionNotFound',
'OptionNameNotDefined',
# version
'Version',
'VersionRequirement',
'VersionParsingError',
'ExtraElement'
]

View File

@ -1,272 +0,0 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import shutil
import traceback
from io import StringIO
from typing import get_type_hints, Type, List, Union, Dict, Any, Callable, Tuple, Optional, TYPE_CHECKING, Iterable
__all__ = [
'get_type_hints_',
'Options',
'OptionsError',
'OptionNotFound',
'OptionNameNotDefined'
]
def get_type_hints_(cls: Type):
try:
return get_type_hints(cls)
except ValueError:
return get_type_hints(cls, globalns={})
def to_str_value_(value: Any) -> Any:
"""递归的将输入值的每一个非 builtin type 转换成 str"""
if isinstance(value, (str, bytes, bytearray, int, float, bool, type(None))):
return value
elif isinstance(value, dict):
return {k: to_str_value_(v) for k, v in value.items()}
elif isinstance(value, (list, Iterable)):
return [to_str_value_(v) for v in value]
else:
return str(value)
class OptionsError(Exception):
""" option 的错误基类"""
class OptionNameNotDefined(OptionsError):
""" 向初始化的 option 里添加了一个不存在于选项里的选项 """
class OptionNotFound(OptionsError):
""" 某个选项没有找到 """
class Options:
"""
一个用于存储选项 / 提供 API 定义 的类
用法:
存储配置: 继承 Options
在类里定义 option: typing
(可选 定义 name: str = 'Option Base' 用于在打印的时候显示名字)
提供 API 接口: 继承 Options
在类里定义 option: typing
定义 一些需要的方法
子类: 继承 新的 Options
实现定义的方法
"""
name = 'Option Base'
cached_options: Dict[str, Union[str, Any]] = {}
_check_options: bool = True
def __init__(self, **kwargs):
"""
创建一个新的 Options 的时候的配置
如果存在 init 方法 会在设置完 kwargs 之后运行子类的 init 方法
:param kwargs: 需要设置的选项
"""
if TYPE_CHECKING:
self._options: Dict[str, Union[Callable, object]] = {}
self.flush_option()
for option, value in kwargs.items():
if option not in self.cached_options and self._check_options:
raise OptionNameNotDefined(f"option: {option} with value: {value} is not defined")
setattr(self, option, value)
run_load_file = True
if hasattr(self, 'init'):
run_load_file = self.init(**kwargs) # 默认 False/None
run_load_file = not run_load_file
if hasattr(self, 'load_file') and run_load_file:
try:
self.load_file()
except Exception:
traceback.print_exc()
self.flush_option()
def __str__(self):
return f"<{self.__class__.__name__} {self.name}>" if self.name else f"<{self.__class__.__name__}>"
def __repr__(self):
return self.__str__()
if TYPE_CHECKING:
_options: Dict[str, Union[Callable, object]] = {}
def init(self, **kwargs) -> bool:
""" 如果子类定义了这个函数,则会在 __init__ 之后调用这个函数
返回值为 True 则不会调用 load_file 函数
"""
def load_file(self) -> bool:
"""如果子类定义了这个函数,则会在 __init__ 和 init 之后再调用这个函数
请注意这个函数请尽量使用 try 包裹住可能出现错误的部分
否则会在控制台输出你的报错"""
return True
def option(self) -> Dict[str, Any]:
"""
获取配置类的所有配置
:return: 自己的所有配置
"""
values = {}
for ann in self.__annotations__: # 获取类型注释
values[ann] = getattr(self, ann, None)
if values[ann] is None:
values[ann] = self.__annotations__[ann]
if not hasattr(self, '_options'):
self._options: Dict[str, Union[Callable, object]] = {}
for option, a_fun in self._options.items(): # 获取额外内容
values[option] = a_fun
for option, a_fun in values.items(): # 检查是否为 property
if a_fun is bool and getattr(self, option, None) is not None:
values[option] = False
if isinstance(a_fun, property):
try:
values[option] = getattr(self, option)
except AttributeError:
raise OptionNotFound(f'Option {option} is not found in {self.name}') from None
return values
def str_option(self, shrink_to_long: Optional[int] = None) -> Dict[str, Union[str, Any]]:
"""
获取配置类的所有配置 并将所有非 BuiltIn 类型的值转换为 str
:return:
"""
raw_option = self.option()
str_option = to_str_value_(raw_option)
if shrink_to_long is None:
return str_option
if not isinstance(shrink_to_long, int) or shrink_to_long <= 0:
return str_option
for option, value in str_option.items():
if value is not None:
if len(str(value)) > shrink_to_long:
str_option[option] = str(value)[:shrink_to_long] + '...'
return str_option
def format(self, text: str) -> str:
"""
使用自己的选项给输入的字符串替换内容
:param text: 想替换的内容
:return: 替换之后的内容
"""
cache_option = self.flush_option()
for option, value in cache_option.items():
text = text.replace(f'{{{option}}}', str(value))
return text
def flush_option(self) -> Dict[str, Any]:
"""
刷新缓存 options 的内容
:return: 刷新过的 options
"""
self.cached_options = self.option()
return self.cached_options
def option_with_len(self) -> Tuple[List[Tuple[str, Any, Type]], int, int, int]:
"""
返回一个可以用于打印的 option 列表
:return:
"""
options = self.flush_option()
max_len_key = 1
max_len_value = 1
max_len_value_t = 1
option_list = []
for key, value in options.items():
value_t = type(value) if isinstance(value, type(value)) else type(value) # 判定这个类型 是不是 基本类型
max_len_key = max(max_len_key, len(key))
max_len_value = max(max_len_value, len(str(value)))
max_len_value_t = max(max_len_value_t, len(str(value_t)))
option_list.append([key, value, value_t])
return [option_list, max_len_key, max_len_value, max_len_value_t] # noqa
def as_markdown(self, longest: Optional[int] = None) -> str:
"""
返回一个 markdown 格式的 option 字符串
:param longest: 最长的输出长度
:return: markdown 格式的 option 字符串
"""
value = self.option_with_len()
cache = StringIO()
option_len = max(value[1], len('Option'))
value_len = max(value[2], len('Value'))
value_type_len = max(value[3], len('Value Type'))
# | Option | Value | Value Type |
shortest = len('Option | Value | Value Type')
if longest is not None:
console_width = max(longest, shortest)
else:
console_width = shutil.get_terminal_size(fallback=(100, 80)).columns
console_width = max(console_width, shortest)
# 为每一栏 预分配 1/3 或者 需要的宽度 (如果不需要 1/3)
option_len = min(option_len, console_width // 3)
value_len = min(value_len, console_width // 3)
value_type_len = min(value_type_len, console_width // 3)
# 先指定每一个列的输出最窄宽度, 然后去尝试增加宽度
# 循环分配新空间之前 首先检查是否已经不需要多分配 (and 后面)
while option_len + value_len + value_type_len + 16 < console_width\
and (option_len < value[1]
or value_len < value[2]
or value_type_len < value[3]):
# 每一个部分的逻辑都是
# 如果现在的输出长度小于原始长度
# 并且长度 + 1 之后的总长度依然在允许范围内
# 那么就 + 1
if option_len < value[1] and option_len + value_len + value_type_len + 16 < console_width:
option_len += 1
if value_len < value[2] and option_len + value_len + value_type_len + 16 < console_width:
value_len += 1
if value_type_len < value[3] and option_len + value_len + value_type_len + 16 < console_width:
value_type_len += 1
# 实际上 对于列表(可变对象) for 出来的这个值是一个引用
# 所以可以直接修改 string
for v in value[0]:
if len(str(v[0])) > option_len:
v[0] = f'{str(v[0])[:value_len - 3]}...'
if len(str(v[1])) > value_len:
v[1] = f'{str(v[1])[:value_len - 3]}...'
if len(str(v[2])) > value_type_len:
v[2] = f'{str(v[2])[:value_len - 3]}..'
cache.write(
f"| Option{' ' * (option_len - 3)}| Value{' ' * (value_len - 2)}| Value Type{' ' * (value_type_len - 7)}|\n")
cache.write(f'|:{"-" * (option_len + 3)}|:{"-" * (value_len + 3)}|:{"-" * (value_type_len + 3)}|\n')
for option, value, value_t in value[0]:
cache.write(f"| `{option}`{' ' * (option_len - len(option))} "
f"| `{value}`{' ' * (value_len - len(str(value)))} "
f"| `{value_t}`{' ' * (value_type_len - len(str(value_t)))} |\n")
result = cache.getvalue()
cache.close()
return result
@classmethod
def add_option(cls, name: str, value: Union[Callable, object]) -> Dict:
"""
向配置类中添加一个额外的配置
:param name: 配置的名字
:param value: 用于获取配置的函数或者类
:return: 配置类的所有配置
"""
if not hasattr(cls, '_options'):
cls._options: Dict[str, Union[Callable, object]] = {}
cls._options[name] = value
return cls._options
@staticmethod
def init_option(options_class: Type['Options'], init_value: Optional[dict] = None) -> 'Options':
return options_class(**init_value if init_value is not None else {})

View File

@ -1,220 +0,0 @@
# 本文件以 GNU Lesser General Public License v3.0GNU LGPL v3) 开源协议进行授权 (谢谢狐狸写出这么好的MCDR)
# 顺便说一句,我把所有的tab都改成了空格,因为我觉得空格比tab更好看(草,后半句是github copilot自动填充的)
"""
This part of code come from MCDReforged(https://github.com/Fallen-Breath/MCDReforged)
Thanks a lot to Fallen_Breath and MCDR contributors
GNU Lesser General Public License v3.0 (GNU LGPL v3)
"""
import re
from typing import List, Callable, Tuple, Optional, Union
"""
Plugin Version
"""
# beta.3 -> (beta, 3), random -> (random, None)
class ExtraElement:
DIVIDER = '.'
body: str
num: Optional[int]
def __init__(self, segment_str: str):
segments = segment_str.rsplit(self.DIVIDER, 1)
try:
self.body, self.num = segments[0], int(segments[1])
except (IndexError, ValueError):
self.body, self.num = segment_str, None
def __str__(self):
if self.num is None:
return self.body
return '{}{}{}'.format(self.body, self.DIVIDER, self.num)
def __lt__(self, other):
if not isinstance(other, type(self)):
raise TypeError()
if self.num is None or other.num is None:
return str(self) < str(other)
else:
return (self.body, self.num) < (other.body, other.num)
class Version:
"""
A version container that stores semver like version string
Example:
* ``"1.2.3"``
* ``"1.0.*"``
* ``"1.2.3-pre4+build.5"``
"""
EXTRA_ID_PATTERN = re.compile(r'|[-+0-9A-Za-z]+(\.[-+0-9A-Za-z]+)*')
WILDCARDS = ('*', 'x', 'X')
WILDCARD = -1
component: List[int]
has_wildcard: bool
pre: Optional[ExtraElement]
build: Optional[ExtraElement]
def __init__(self, version_str: str, *, allow_wildcard: bool = True):
"""
:param version_str: The version string to be parsed
:keyword allow_wildcard: If wildcard (``"*"``, ``"x"``, ``"X"``) is allowed. Default: ``True``
"""
if not isinstance(version_str, str):
raise VersionParsingError('Invalid input version string')
def separate_extra(text, char) -> Tuple[str, Optional[ExtraElement]]:
if char in text:
text, extra_str = text.split(char, 1)
if not self.EXTRA_ID_PATTERN.fullmatch(extra_str):
raise VersionParsingError('Invalid build string: ' + extra_str)
extra = ExtraElement(extra_str)
else:
extra = None
return text, extra
self.component = []
self.has_wildcard = False
version_str, self.build = separate_extra(version_str, '+')
version_str, self.pre = separate_extra(version_str, '-')
if len(version_str) == 0:
raise VersionParsingError('Version string is empty')
for comp in version_str.split('.'):
if comp in self.WILDCARDS:
self.component.append(self.WILDCARD)
self.has_wildcard = True
if not allow_wildcard:
raise VersionParsingError('Wildcard {} is not allowed'.format(comp))
else:
try:
num = int(comp)
except ValueError:
num = None
if num is None:
raise VersionParsingError('Invalid version number component: {}'.format(comp))
if num < 0:
raise VersionParsingError('Unsupported negatived number component: {}'.format(num))
self.component.append(num)
if len(self.component) == 0:
raise VersionParsingError('Empty version string')
def __str__(self):
version_str = '.'.join(map(lambda c: str(c) if c != self.WILDCARD else self.WILDCARDS[0], self.component))
if self.pre is not None:
version_str += '-' + str(self.pre)
if self.build is not None:
version_str += '+' + str(self.build)
return version_str
def __repr__(self):
return self.__str__()
def __getitem__(self, index: int) -> int:
if index < len(self.component):
return self.component[index]
else:
return self.WILDCARD if self.component[len(self.component) - 1] == self.WILDCARD else 0
def __lt__(self, other):
if not isinstance(other, Version):
raise TypeError('Cannot compare between instances of {} and {}'.format(Version.__name__, type(other).__name__))
for i in range(max(len(self.component), len(other.component))):
if self[i] == self.WILDCARD or other[i] == self.WILDCARD:
continue
if self[i] != other[i]:
return self[i] < other[i]
if self.pre is not None and other.pre is not None:
return self.pre < other.pre
elif self.pre is not None:
return not other.has_wildcard
elif other.pre is not None:
return False
else:
return False
def __eq__(self, other):
return not self < other and not other < self
def __le__(self, other):
return self == other or self < other
def compare_to(self, other):
if self < other:
return -1
elif self > other:
return 1
else:
return 0
DEFAULT_CRITERION_OPERATOR = '='
class Criterion:
def __init__(self, opt: str, base_version: Version, criterion: Callable[[Version, Version], bool]):
self.opt = opt
self.base_version = base_version
self.criterion = criterion
def test(self, target: Union[Version, str]):
return self.criterion(self.base_version, target)
def __str__(self):
return '{}{}'.format(self.opt if self.opt != DEFAULT_CRITERION_OPERATOR else '', self.base_version)
class VersionRequirement:
"""
A version requirement tester
It can test if a given :class:`Version` object matches its requirement
"""
CRITERIONS = {
'<=': lambda base, ver: ver <= base,
'>=': lambda base, ver: ver >= base,
'<': lambda base, ver: ver < base,
'>': lambda base, ver: ver > base,
'=': lambda base, ver: ver == base,
'^': lambda base, ver: ver >= base and ver[0] == base[0],
'~': lambda base, ver: ver >= base and ver[0] == base[0] and ver[1] == base[1],
}
def __init__(self, requirements: str):
"""
:param requirements: The requirement string, which contains several version predicates connected by space character.
e.g. ``">=1.0.x"``, ``"^2.9"``, ``">=1.2.0 <1.4.3"``
"""
if not isinstance(requirements, str):
raise VersionParsingError('Requirements should be a str, not {}'.format(type(requirements).__name__))
self.criterions = [] # type: List[Criterion]
for requirement in requirements.split(' '):
if len(requirement) > 0:
for prefix, func in self.CRITERIONS.items():
if requirement.startswith(prefix):
opt = prefix
base_version = requirement[len(prefix):]
break
else:
opt = DEFAULT_CRITERION_OPERATOR
base_version = requirement
self.criterions.append(Criterion(opt, Version(base_version), self.CRITERIONS[opt]))
def accept(self, version: Union[Version, str]):
if isinstance(version, str):
version = Version(version)
for criterion in self.criterions:
if not criterion.test(version):
return False
return True
def __str__(self):
return ' '.join(map(str, self.criterions))
class VersionParsingError(ValueError):
pass