diff --git a/libs/lib_not_dr/__init__.py b/libs/lib_not_dr/__init__.py deleted file mode 100644 index 9852726..0000000 --- a/libs/lib_not_dr/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -__version__ = '0.1.8' diff --git a/libs/lib_not_dr/command/__init__.py b/libs/lib_not_dr/command/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/libs/lib_not_dr/command/data.py b/libs/lib_not_dr/command/data.py deleted file mode 100644 index bfb5b3e..0000000 --- a/libs/lib_not_dr/command/data.py +++ /dev/null @@ -1,40 +0,0 @@ -from dataclasses import dataclass, field -from typing import Set, List - - -class Parsed: - ... - - -@dataclass -class Option: - name: str - shortcuts: List[str] - optional: bool - types: Set[type] = field(default_factory=lambda: {str}) - - -@dataclass -class OptionGroup: - options: List[Option] - optional: bool = True - exclusive: bool = False - - -@dataclass -class Argument: - name: str - types: Set[type] = field(default_factory=lambda: {str}) - - -@dataclass -class Flag: - name: str - shortcuts: List[str] - - -@dataclass -class FlagGroup: - flags: List[Flag] - exclusive: bool = False - diff --git a/libs/lib_not_dr/command/descriptor.py b/libs/lib_not_dr/command/descriptor.py deleted file mode 100644 index c94c2ff..0000000 --- a/libs/lib_not_dr/command/descriptor.py +++ /dev/null @@ -1,14 +0,0 @@ -class CallBackDescriptor: - def __init__(self, name): - self.callback_name = name - - def __set__(self, instance, value): - assert getattr(instance, self.callback_name) is None, f"Attribute '{self.callback_name}' has been set." - instance.__dict__[self.callback_name] = value - - def __get__(self, instance, owner): - return ( - self - if instance is None - else instance.__dict__.get(self.callback_name) - ) \ No newline at end of file diff --git a/libs/lib_not_dr/command/exception.py b/libs/lib_not_dr/command/exception.py deleted file mode 100644 index 2f79c39..0000000 --- a/libs/lib_not_dr/command/exception.py +++ /dev/null @@ -1,2 +0,0 @@ -class IllegalName(Exception): - """名称或快捷名不合法""" diff --git a/libs/lib_not_dr/command/nodes.py b/libs/lib_not_dr/command/nodes.py deleted file mode 100644 index 2f1158f..0000000 --- a/libs/lib_not_dr/command/nodes.py +++ /dev/null @@ -1,130 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -import re -from typing import Callable, List, Optional, Union, Set - -from .data import Option, Argument, Flag, Parsed -from .descriptor import CallBackDescriptor - -try: - from typing import Self -except ImportError: - from typing import TypeVar - Self = TypeVar("Self") # NOQA - -from .exception import IllegalName - -CallBack = Union[Callable[[str], None], str] # Equals to `Callable[[str], None] | str` -# 可调用对象或字符串作为回调 -# A callable or str as callback - -ParseArgFunc = Callable[[str], Optional[type]] -# 解析参数的函数,返回值为 None 时表示解析失败 -# function to parse argument, return None when failed - -EMPTY_WORDS = re.compile(r"\s", re.I) - - -def check_name(name: Union[str, List[str]]) -> None: - """ - Check the name or shortcuts of argument(s) or flag(s). - The name must not be empty str, and must not contains \\t or \\n or \\f or \\r. - If that not satisfy the requirements, it will raise exception `IllegalArgumentName`. - 检查 参数或标记 的 名称或快捷方式 是否符合要求。 - 名称必须是非空的字符串,且不能包含 \\t 或 \\n 或 \\f 或 \\r。 - 如果不符合要求,将会抛出 `IllegalArgumentName` 异常。 - :param name: arguments - :return: None - """ - if isinstance(name, str) and EMPTY_WORDS.search(name): - raise IllegalName("The name of argument must not contains empty words.") - elif isinstance(name, list) and all((not isinstance(i, str)) and EMPTY_WORDS.search(i) for i in name): - raise IllegalName("The name of shortcut must be 'str', and must not contains empty words.") - else: - raise TypeError("The type of name must be 'str' or 'list[str]'.") - - -class Literal: - _tip = CallBackDescriptor("_tip") - _func = CallBackDescriptor("_func") - _err_callback = CallBackDescriptor("_err_callback") - - def __init__(self, name: str): - self.name: str = name - self.sub: List[Self] = [] - self._tip: Optional[CallBack] = None - self._func: Optional[CallBack] = None - self._err_callback: Optional[CallBack] = None - - self._opts: List[Option] = [] - self._args: List[Argument] = [] - self._flags: List[Flag] = [] - - def __call__(self, *nodes) -> Self: - self.sub += nodes - return self - - def __repr__(self): - attrs = (k for k in self.__dict__ if not (k.startswith("__") and k.endswith("__"))) - return f"{self.__class__.__name__}({', '.join(f'{k}={v!r}' for k in attrs if (v := self.__dict__[k]))})" - - def arg(self, name: str, types: Optional[Set[type]] = None) -> Self: - Argument(name=name, types=types) - return self - - def opt( - self, - name: str, - shortcuts: Optional[List[str]] = None, - optional: bool = True, - types: Optional[Set[type]] = None - ) -> Self: - check_name(name) - if shortcuts is not None and len(shortcuts) != 0: - check_name(shortcuts) - self._opts.append( - Option(name=name, shortcuts=shortcuts, optional=optional, types=types) - ) - return self - - def opt_group(self, opts: List[Option], exclusive: bool = False): - ... - - def flag(self, name: str, shortcuts: Optional[List[str]] = None) -> Self: - check_name(name) - if shortcuts is not None and len(shortcuts) != 0: - check_name(shortcuts) - Flag(name=name, shortcuts=shortcuts) - ... - return self - - def flag_group(self, flags: List[Flag], exclusive: bool = False) -> Self: - - ... - return self - - def error(self, callback: CallBack) -> Self: - self._err_callback = callback - return self - - def run(self, func: CallBack) -> Self: - self._func = func - return self - - def tip(self, tip: CallBack) -> Self: - self._tip = tip - return self - - def parse(self, cmd: Union[str, List[str]]) -> Parsed: - ... - - def to_doc(self) -> str: - ... - - -def builder(node: Literal) -> Literal: - ... diff --git a/libs/lib_not_dr/logger/__init__.py b/libs/lib_not_dr/logger/__init__.py deleted file mode 100644 index a94671b..0000000 --- a/libs/lib_not_dr/logger/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- -import sys - -from lib_not_dr.types.options import Options - -COLOR_SUPPORT = True - -if sys.platform == "win32": - try: - # https://stackoverflow.com/questions/36760127/... - # how-to-use-the-new-support-for-ansi-escape-sequences-in-the-windows-10-console - from ctypes import windll - - kernel32 = windll.kernel32 - kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) - except OSError: # pragma: no cover - COLOR_SUPPORT = False - - -class LogLevel(Options): - name = 'LogLevel' - notset: int = 0 - trace: int = 5 - fine: int = 7 - debug: int = 10 - info: int = 20 - warn: int = 30 - error: int = 40 - fatal: int = 50 diff --git a/libs/lib_not_dr/logger/formatter/__init__.py b/libs/lib_not_dr/logger/formatter/__init__.py deleted file mode 100644 index 3f444db..0000000 --- a/libs/lib_not_dr/logger/formatter/__init__.py +++ /dev/null @@ -1,285 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- -import time - -from pathlib import Path -from string import Template -from typing import List, Union, Optional, Dict, Tuple, TYPE_CHECKING - -from lib_not_dr.logger import LogLevel -from lib_not_dr.types.options import Options -from lib_not_dr.logger.structure import LogMessage, FormattingMessage - -if TYPE_CHECKING: - from lib_not_dr.logger.formatter.colors import BaseColorFormatter - - -class BaseFormatter(Options): - name = 'BaseFormatter' - - sub_formatter: List['BaseFormatter'] = [] - color_formatters: List['BaseColorFormatter'] = [] - default_template: str = '[${log_time}][${level}]|${logger_name}:${logger_tag}|${messages}' - - @classmethod - def add_info(cls, match: str, to: str, description: str) -> str: - return f'- {to} -> ${{{match}}} : {description}' - - @classmethod - def info(cls) -> str: - infos = {BaseFormatter.name: BaseFormatter._info()} - cache = '' - for formatter in cls.sub_formatter: - infos[formatter.name] = formatter._info() - infos[cls.name] = cls._info() - for name, info in infos.items(): - cache += f"## {name}\n" - cache += info - cache += '\n' - return cache - - @classmethod - def _info(cls) -> str: - info = cls.add_info('logger_name', 'logger name', 'The name of the logger') - info += '\n' - info += cls.add_info('logger_tag', 'logger tag', 'The tag of the logger') - return info - - def format_message(self, - message: LogMessage, - template: Optional[Union[Template, str]] = None) -> str: - """ - Format message - :param message: 输入的消息 - :param template: 日志输出模板 - :return: - """ - basic_info = message.format_for_message() - message, info = self._format((message, basic_info)) - - if template is None: - template = Template(self.default_template) - elif isinstance(template, str): - template = Template(template) - - try: - return template.substitute(**info) - except (KeyError, ValueError): - return template.safe_substitute(**info) - - def _format(self, message: FormattingMessage) -> FormattingMessage: - """ - Format message - :param message: - :return: - """ - for formatter in self.sub_formatter: - message = formatter._format(message) - return message - - @property - def template(self) -> str: - return self.default_template - - @template.setter - def template(self, template: str) -> None: - if not isinstance(template, str): - raise TypeError(f'The template must be str, not {type(template)}') - self.default_template = template - - -class LevelFormatter(BaseFormatter): - name = 'LevelFormatter' - - default_level: int = 20 - - # If True, the undefined level will be set to the higher nearest level. - level_get_higher: bool = True - - level_name_map = { - LogLevel.notset: 'NOTSET', - LogLevel.trace: ' TRACE', - LogLevel.fine: ' FINE ', - LogLevel.debug: ' DEBUG', - LogLevel.info: ' INFO ', - LogLevel.warn: ' WARN ', - LogLevel.error: 'ERROR ', - LogLevel.fatal: 'FATAL ', - } - name_level_map = { - 'NOTSET': LogLevel.notset, - ' TRACE': LogLevel.trace, - ' FINE ': LogLevel.fine, - ' DEBUG': LogLevel.debug, - ' INFO ': LogLevel.info, - ' WARN ': LogLevel.warn, - 'ERROR ': LogLevel.error, - 'FATAL ': LogLevel.fatal, - } - - @classmethod - def _info(cls) -> str: - return cls.add_info('level', 'log level', 'The log level') - - def _format(self, message: FormattingMessage) -> FormattingMessage: - if message[0].level in self.name_level_map: - level_tag = self.level_name_map[message[0].level] - else: - if self.level_get_higher: - for level in self.name_level_map: - if message[0].level <= self.name_level_map[level]: - level_tag = level - break - else: - level_tag = 'FATAL' - else: - for level in self.name_level_map: - if message[0].level >= self.name_level_map[level]: - level_tag = level - break - else: - level_tag = 'NOTSET' - message[1]['level'] = level_tag - return message - - -class TraceFormatter(BaseFormatter): - name = 'TraceFormatter' - - time_format: str = '%Y-%m-%d %H:%M:%S' - msec_time_format: str = '{}-{:03d}' - use_absolute_path: bool = False - - @classmethod - def _info(cls) -> str: - info = cls.add_info('log_time', 'formatted time when logging', 'The time format string' - '. See https://docs.python.org/3/library/time' - '.html#time.strftime for more information.') - info += '\n' - info += cls.add_info('log_source', 'logging file', 'the logging file name') - info += '\n' - info += cls.add_info('log_line', 'logging line', 'the logging line number') - info += '\n' - info += cls.add_info('log_function', 'logging function', 'the logging function name') - return info - - def _format(self, message: FormattingMessage) -> FormattingMessage: - message = self._time_format(message) - message = self._trace_format(message) - return message - - def _time_format(self, message: FormattingMessage) -> FormattingMessage: - time_mark = time.localtime(message[0].log_time / 1000000000) - if self.msec_time_format: - time_mark = self.msec_time_format.format(time.strftime(self.time_format, time_mark), - message[0].create_msec_3) - message[1]['log_time'] = time_mark - return message - - def _trace_format(self, message: FormattingMessage) -> FormattingMessage: - if message[0].stack_trace is None: - return message - path = Path(message[0].stack_trace.f_code.co_filename) - if self.use_absolute_path: - message[1]['log_source'] = path.absolute() - message[1]['log_source'] = path - message[1]['log_line'] = message[0].stack_trace.f_lineno - message[1]['log_function'] = message[0].stack_trace.f_code.co_name - return message - - -class StdFormatter(BaseFormatter): - name = 'StdFormatter' - - enable_color: bool = True - - sub_formatter: List[BaseFormatter] = [LevelFormatter(), - TraceFormatter()] - from lib_not_dr.logger.formatter.colors import (LevelColorFormatter, - LoggerColorFormatter, - TimeColorFormatter, - TraceColorFormatter, - MessageColorFormatter) - color_formatters: List[BaseFormatter] = [LevelColorFormatter(), - LoggerColorFormatter(), - TimeColorFormatter(), - TraceColorFormatter(), - MessageColorFormatter()] - - def __init__(self, - enable_color: bool = True, - sub_formatter: Optional[List[BaseFormatter]] = None, - color_formatters: Optional[List[BaseFormatter]] = None, - **kwargs) -> None: - """ - Initialize the StdFormatter - :param enable_color: enable color - :param sub_formatter: list of sub formatter - :param color_formatters: list of color formatter - :param kwargs: other options - """ - # 同 structures.LogMessage.__init__ 的注释 (逃) - self.enable_color = enable_color - if sub_formatter is not None: - self.sub_formatter = sub_formatter - if color_formatters is not None: - self.color_formatters = color_formatters - super().__init__(**kwargs) - - def _format(self, message: FormattingMessage) -> FormattingMessage: - super()._format(message) - - if not self.enable_color: - return message - - for formatter in self.color_formatters: - message = formatter._format(message) - - return message - - @classmethod - def _info(cls) -> str: - return 'None' - - -if __name__ == '__main__': - import inspect - - log_message = LogMessage(messages=['Hello World!'], - level=7, - stack_trace=inspect.currentframe(), - logger_tag='tester', - logger_name='test') - - print(LevelFormatter.info()) - print(LevelFormatter().format_message(log_message)) - - print(TraceFormatter.info()) - print(TraceFormatter().format_message(log_message)) - - print(StdFormatter.info()) - print(StdFormatter().format_message(log_message)) - - std_format = StdFormatter() - std_format.default_template = "${log_time}|${logger_name}|${logger_tag}|${log_source}:${log_line}|${log_function}|${level}|${messages}" - - test_levels = (0, 5, 7, 10, 20, 30, 40, 50) - - print("with color") - - for test_level in test_levels: - log_message.level = test_level - print(std_format.format_message(log_message), end='') - - print("without color") - - std_format.enable_color = False - - for test_level in test_levels: - log_message.level = test_level - print(std_format.format_message(log_message), end='') - - print(std_format.as_markdown()) diff --git a/libs/lib_not_dr/logger/formatter/colors.py b/libs/lib_not_dr/logger/formatter/colors.py deleted file mode 100644 index 49996bc..0000000 --- a/libs/lib_not_dr/logger/formatter/colors.py +++ /dev/null @@ -1,256 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -from lib_not_dr.logger import LogLevel, COLOR_SUPPORT -from lib_not_dr.logger.formatter import BaseFormatter -from lib_not_dr.logger.structure import FormattingMessage - -__all__ = [ - 'BaseColorFormatter', - - 'LevelColorFormatter', - 'LoggerColorFormatter', - 'TimeColorFormatter', - 'TraceColorFormatter', - 'MessageColorFormatter', - - 'RESET_COLOR' -] - -RESET_COLOR = '\033[0m' - - -class BaseColorFormatter(BaseFormatter): - name = 'BaseColorFormatter' - # TODO 迁移老 logger 颜色 - color = { - # Notset: just black - LogLevel.notset: '', - # Trace: blue - LogLevel.trace: '\033[38;2;138;173;244m', - # Fine: green - LogLevel.fine: '\033[0;32m', - # Debug: cyan - LogLevel.debug: '\033[0;36m', - # Info: white - LogLevel.info: '\033[0;37m', - # Warn: yellow - LogLevel.warn: '\033[0;33m', - # Error: red - LogLevel.error: '\033[0;31m', - # Fatal: red background - LogLevel.fatal: '\033[0;41m' - } - - def get_color(self, message: FormattingMessage) -> str: - for level in self.color: - if message[0].level <= level: - break - else: - level = 90 - return self.color[level] - - -class LevelColorFormatter(BaseColorFormatter): - name = 'LevelColorFormatter' - # TODO 迁移老 logger 颜色 - color = { - # Notset: just black - LogLevel.notset: '', - # Trace: blue - LogLevel.trace: '\033[38;2;138;173;244m', - # Fine: green - LogLevel.fine: '\033[35;48;2;44;44;54m', - # Debug: cyan - LogLevel.debug: '\033[38;2;133;138;149m', - # Info: white - LogLevel.info: '\033[0;37m', - # Warn: yellow - LogLevel.warn: '\033[0;33m', - # Error: red - LogLevel.error: '\033[0;31m', - # Fatal: red background - LogLevel.fatal: '\033[0;41m' - } - - @classmethod - def _info(cls) -> str: - return cls.add_info('colored level', 'level', 'A colored level') - - def _format(self, message: FormattingMessage) -> FormattingMessage: - if isinstance(message[1].get('level'), int) or not COLOR_SUPPORT: - return message - # 获取颜色 - color = self.get_color(message) - # 添加颜色 - if color == '' or color == RESET_COLOR: - return message - message[1]['level'] = f'{color}{message[1]["level"]}{RESET_COLOR}' - return message - - -class LoggerColorFormatter(BaseColorFormatter): - name = 'LoggerColorFormatter' - # TODO 迁移老 logger 颜色 - color = { - # Notset: just black - LogLevel.notset: '', - # Trace: blue - LogLevel.trace: '\033[38;2;138;173;244m', - # Fine: green - LogLevel.fine: '\033[0;32m', - # Debug: cyan - LogLevel.debug: '\033[0;36m', - # Info: white - LogLevel.info: '\033[0;37m', - # Warn: yellow - LogLevel.warn: '\033[0;33m', - # Error: red - LogLevel.error: '\033[0;31m', - # Fatal: red background - LogLevel.fatal: '\033[38;2;245;189;230m', - } - - @classmethod - def _info(cls) -> str: - return cls.add_info('colored logger name', 'logger name', 'A colored logger name') - - def _format(self, message: FormattingMessage) -> FormattingMessage: - if message[1].get('logger_name') is None or not COLOR_SUPPORT: - return message - # 获取颜色 - color = self.get_color(message) - # 添加颜色 - if color == '' or color == RESET_COLOR: - return message - message[1]['logger_name'] = f'{color}{message[1]["logger_name"]}{RESET_COLOR}' - if message[1].get('logger_tag') is not None and message[1].get('logger_tag') != ' ': - message[1]['logger_tag'] = f'{color}{message[1]["logger_tag"]}{RESET_COLOR}' - return message - - -class TimeColorFormatter(BaseColorFormatter): - name = 'TimeColorFormatter' - # TODO 迁移老 logger 颜色 - color = { - # Notset: just black - LogLevel.notset: '', - # Trace: blue - LogLevel.trace: '\033[38;2;138;173;244m', - # Fine: green - LogLevel.fine: '\033[0;32m', - # Debug: cyan - LogLevel.debug: '\033[0;36m', - # Info: white - LogLevel.info: '\033[0;37m', - # Warn: yellow - LogLevel.warn: '\033[0;33m', - # Error: red - LogLevel.error: '\033[0;31m', - # Fatal: red background - LogLevel.fatal: '\033[38;2;255;255;0;48;2;120;10;10m', - } - - @classmethod - def _info(cls) -> str: - return cls.add_info('colored time', 'time', 'A colored time') - - def _format(self, message: FormattingMessage) -> FormattingMessage: - if message[1].get('log_time') is None or not COLOR_SUPPORT: - return message - # 获取颜色 - color = self.get_color(message) - # 添加颜色 - if color == '' or color == RESET_COLOR: - return message - message[1]['log_time'] = f'{color}{message[1]["log_time"]}{RESET_COLOR}' - return message - - -class TraceColorFormatter(BaseColorFormatter): - name = 'TraceColorFormatter' - # TODO 迁移老 logger 颜色 - color = { - # Notset: just black - LogLevel.notset: '\033[38;2;0;255;180m', - # Trace: blue - LogLevel.trace: '\033[38;2;0;255;180m', - # Fine: green - LogLevel.fine: '\033[38;2;0;255;180m', - # Debug: cyan - LogLevel.debug: '\033[38;2;0;255;180m', - # Info: white - LogLevel.info: '\033[38;2;0;255;180m', - # Warn: yellow - LogLevel.warn: '\033[38;2;0;255;180m', - # Error: red - LogLevel.error: '\033[38;2;0;255;180m', - # Fatal: red background - LogLevel.fatal: '\033[38;2;255;255;0;48;2;120;10;10m', - } - - @classmethod - def _info(cls) -> str: - info = cls.add_info('colored logging file', 'log_source', 'A colored logging file name') - info += '\n' - info += cls.add_info('colored logging line', 'log_line', 'A colored logging line number') - info += '\n' - info += cls.add_info('colored logging function', 'log_function', 'A colored logging function name') - return info - - def _format(self, message: FormattingMessage) -> FormattingMessage: - if message[0].stack_trace is None or not COLOR_SUPPORT: - return message - # 获取颜色 - color = self.get_color(message) - # 添加颜色 - if color == '' or color == RESET_COLOR: - return message - message[1]['log_source'] = f'{color}{message[1]["log_source"]}{RESET_COLOR}' - message[1]['log_line'] = f'{color}{message[1]["log_line"]}{RESET_COLOR}' - message[1]['log_function'] = f'{color}{message[1]["log_function"]}{RESET_COLOR}' - return message - - -class MessageColorFormatter(BaseColorFormatter): - name = 'MessageColorFormatter' - - color = { - # Notset: just black - LogLevel.notset: '', - # Trace: blue - LogLevel.trace: '\033[38;2;138;173;244m', - # Fine: blue - LogLevel.fine: '\033[38;2;138;173;244m', - # Debug: blue - LogLevel.debug: '\033[38;2;138;173;244m', - # Info: no color - LogLevel.info: '', - # Warn: yellow - LogLevel.warn: '\033[0;33m', - # Error: red - LogLevel.error: '\033[0;31m', - # Fatal: red background - LogLevel.fatal: '\033[38;2;255;255;0;48;2;120;10;10m', - } - - @classmethod - def _info(cls) -> str: - return cls.add_info('colored message', 'message', 'A colored message') - - def _format(self, message: FormattingMessage) -> FormattingMessage: - if message[1].get('messages') is None or not COLOR_SUPPORT: - return message - # 获取颜色 - color = self.get_color(message) - # 添加颜色 - if color == '' or color == RESET_COLOR: - return message - if message[1]['messages'][-1] == '\n': - message[1]['messages'] = f'{color}{message[1]["messages"][:-1]}{RESET_COLOR}\n' - else: - message[1]['messages'] = f'{color}{message[1]["messages"]}{RESET_COLOR}' - return message diff --git a/libs/lib_not_dr/logger/logger.py b/libs/lib_not_dr/logger/logger.py deleted file mode 100644 index 020edbc..0000000 --- a/libs/lib_not_dr/logger/logger.py +++ /dev/null @@ -1,267 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -import time -import inspect -from types import FrameType -from typing import List, Optional - -from lib_not_dr.logger import LogLevel -from lib_not_dr.types.options import Options -from lib_not_dr.logger.structure import LogMessage -from lib_not_dr.logger.outstream import BaseOutputStream, StdioOutputStream - - -class Logger(Options): - name = 'Logger-v2' - - outputs: List[BaseOutputStream] = [StdioOutputStream()] - - log_name: str = 'root' - - enable: bool = True - level: int = 20 # info - - def log_for(self, level: int) -> bool: - """ - Check if logging is enabled for a specific level. - - Args: - level (int): The logging level to check. - - Returns: - bool: True if logging is enabled for the given level, False otherwise. - """ - return self.enable and level >= self.level - - def add_output(self, output: BaseOutputStream) -> None: - """ - Add an output to the list of outputs. - - Args: - output (BaseOutputStream): The output to be added. - - Returns: - None - """ - self.outputs.append(output) - self.level = min(self.level, output.level) - - def remove_output(self, output: BaseOutputStream) -> None: - """ - Removes the specified output from the list of outputs. - - Args: - output (BaseOutputStream): The output to be removed. - - Returns: - None - """ - self.outputs.remove(output) - self.level = max(self.level, *[output.level for output in self.outputs]) - - @property - def global_level(self) -> int: - """ - Get the global logging level. - - Returns: - int: The global logging level. - """ - return self.level - - @global_level.setter - def global_level(self, level: int) -> None: - """ - Set the global logging level. - - Args: - level (int): The global logging level. - - Returns: - None - """ - self.level = level - for output in self.outputs: - output.level = level - - def make_log(self, - messages: List[str], - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - level: int = 20, # info - # log_time: Optional[float] = None, - # logger_name: str = 'root', - # logger_tag: Optional[str] = None, - stack_trace: Optional[FrameType] = None) -> None: - # 检查是否需要记录 - if not self.log_for(level): - return - log_time = time.time_ns() - # 处理堆栈信息 - if stack_trace is None: - # 尝试获取堆栈信息 - if (stack := inspect.currentframe()) is not None: - # 如果可能 尝试获取上两层的堆栈信息 - if (up_stack := stack.f_back) is not None: - if (upper_stack := up_stack.f_back) is not None: - stack_trace = upper_stack - else: - stack_trace = up_stack - else: - stack_trace = stack - - message = LogMessage(messages=messages, - end=end, - split=split, - flush=flush, - level=level, - log_time=log_time, - logger_name=self.log_name, - logger_tag=tag, - stack_trace=stack_trace) - if level >= 30: # WARN - for output in self.outputs: - output.write_stderr(message) - else: - for output in self.outputs: - output.write_stdout(message) - # done? - # 20231106 00:06 - - @staticmethod - def get_logger_by_name(name: str) -> 'Logger': - """ - Get a logger by name. - - Args: - name (str): The name of the logger. - - Returns: - Logger: The logger with the specified name. - """ - return Logger(log_name=name) - - def info(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.info): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.info, - stack_trace=stack_trace) - - def trace(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.trace): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.trace, - stack_trace=stack_trace) - - def fine(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.fine): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.fine, - stack_trace=stack_trace) - - def debug(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.debug): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.debug, - stack_trace=stack_trace) - - def warn(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.warn): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.warn, - stack_trace=stack_trace) - - def error(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.error): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.error, - stack_trace=stack_trace) - - def fatal(self, - *message, - tag: Optional[str] = None, - end: str = '\n', - split: str = ' ', - flush: bool = True, - stack_trace: Optional[FrameType] = None) -> None: - if not self.log_for(LogLevel.fatal): - return - self.make_log(messages=list(message), - tag=tag, - end=end, - split=split, - flush=flush, - level=LogLevel.fatal, - stack_trace=stack_trace) diff --git a/libs/lib_not_dr/logger/outstream.py b/libs/lib_not_dr/logger/outstream.py deleted file mode 100644 index df33209..0000000 --- a/libs/lib_not_dr/logger/outstream.py +++ /dev/null @@ -1,236 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -import io -import sys -import time -import string -import atexit -import threading - -from pathlib import Path -from typing import Optional - -from lib_not_dr.logger import LogLevel -from lib_not_dr.types.options import Options -from lib_not_dr.logger.structure import LogMessage -from lib_not_dr.logger.formatter import BaseFormatter, StdFormatter - -__all__ = [ - 'BaseOutputStream', - 'StdioOutputStream', - 'FileCacheOutputStream' -] - - -class BaseOutputStream(Options): - name = 'BaseOutputStream' - - level: int = LogLevel.info - enable: bool = True - - formatter: BaseFormatter - - def write_stdout(self, message: LogMessage) -> None: - raise NotImplementedError(f'{self.__class__.__name__}.write_stdout is not implemented') - - def write_stderr(self, message: LogMessage) -> None: - raise NotImplementedError(f'{self.__class__.__name__}.write_stderr is not implemented') - - def flush(self) -> None: - raise NotImplementedError(f'{self.__class__.__name__}.flush is not implemented') - - def close(self) -> None: - self.enable = False - - -class StdioOutputStream(BaseOutputStream): - name = 'StdioOutputStream' - - level: int = LogLevel.info - formatter: BaseFormatter = StdFormatter() - use_stderr: bool = True - - def write_stdout(self, message: LogMessage) -> None: - if not self.enable: - return None - if message.level < self.level: - return None - print(self.formatter.format_message(message), end='', flush=message.flush) - return None - - def write_stderr(self, message: LogMessage) -> None: - if not self.enable: - return None - if message.level < self.level: - return None - if self.use_stderr: - print(self.formatter.format_message(message), end='', flush=message.flush, file=sys.stderr) - else: - print(self.formatter.format_message(message), end='', flush=message.flush) - return None - - def flush(self) -> None: - """ - flush stdout and stderr - :return: None - """ - print('', end='', flush=True) - print('', end='', flush=True, file=sys.stderr) - return None - - -class FileCacheOutputStream(BaseOutputStream): - name = 'FileCacheOutputStream' - - level: int = LogLevel.info - formatter: BaseFormatter = StdFormatter(enable_color=False) - text_cache: io.StringIO = None - - flush_counter: int = 0 - # 默认 10 次 flush 一次 - flush_count_limit: int = 10 - flush_time_limit: int = 10 # time limit in sec, 0 means no limit - flush_timer: threading.Timer = None - - file_path: Optional[Path] = Path('./logs') - file_name: str - # file mode: always 'a' - file_encoding: str = 'utf-8' - # do file swap or not - file_swap: bool = False - at_exit_register: bool = False - - file_swap_counter: int = 0 - file_swap_name_template: str = '${name}-${counter}.log' - # ${name} -> file_name - # ${counter} -> file_swap_counter - # ${log_time} -> time when file swap ( round(time.time()) ) - # ${start_time} -> start time of output stream ( round(time.time()) ) - current_file_name: str = None - file_start_time: int = None - - # log file swap triggers - # 0 -> no limit - file_size_limit: int = 0 # size limit in kb - file_time_limit: int = 0 # time limit in sec 0 - file_swap_on_both: bool = False # swap file when both size and time limit reached - - def init(self, **kwargs) -> bool: - self.file_start_time = round(time.time()) - if self.text_cache is None: - self.text_cache = io.StringIO() - self.get_file_path() - return False - - def _write(self, message: LogMessage) -> None: - """ - write message to text cache - 默认已经检查过了 - :param message: message to write - :return: None - """ - self.text_cache.write(self.formatter.format_message(message)) - self.flush_counter += 1 - if message.flush or self.flush_counter >= self.flush_count_limit: - self.flush() - else: - if self.flush_time_limit > 0: - if self.flush_timer is None or not self.flush_timer.is_alive(): - self.flush_timer = threading.Timer(self.flush_time_limit, self.flush) - self.flush_timer.daemon = True - self.flush_timer.start() - if not self.at_exit_register: - atexit.register(self.flush) - self.at_exit_register = True - return None - - def write_stdout(self, message: LogMessage) -> None: - if not self.enable: - return None - if message.level < self.level: - return None - self._write(message) - return None - - def write_stderr(self, message: LogMessage) -> None: - if not self.enable: - return None - if message.level < self.level: - return None - self._write(message) - return None - - def get_file_path(self) -> Path: - """ - get file path - :return: - """ - if (current_file := self.current_file_name) is None: - if not self.file_swap: - # 直接根据 file name 生成文件 - current_file = Path(self.file_path) / self.file_name - self.current_file_name = str(current_file) - return current_file - template = string.Template(self.file_swap_name_template) - file_name = template.safe_substitute(name=self.file_name, - counter=self.file_swap_counter, - log_time=round(time.time()), - start_time=self.file_start_time) - current_file = Path(self.file_path) / file_name - self.current_file_name = str(current_file) - else: - current_file = Path(current_file) - return current_file - - def check_flush(self) -> Path: - current_file = self.get_file_path() - # 获取当前文件的路径 - if not self.file_swap: - # 不需要 swap 直接返回 - return current_file - # 检查是否需要 swap - size_pass = True - if self.file_size_limit > 0: - file_size = current_file.stat().st_size / 1024 # kb - if file_size > self.file_size_limit: # kb - size_pass = False - time_pass = True - if self.file_time_limit > 0: - file_time = round(time.time()) - current_file.stat().st_mtime - if file_time > self.file_time_limit: - time_pass = False - if (self.file_swap_on_both and size_pass and time_pass) or \ - (not self.file_swap_on_both and (size_pass or time_pass)): - # 两个都满足 - # 或者只有一个满足 - if size_pass and time_pass: - self.file_swap_counter += 1 - # 生成新的文件名 - return self.get_file_path() - - def flush(self) -> None: - new_cache = io.StringIO() # 创建新的缓存 - self.flush_counter = 0 # atomic, no lock - old_cache, self.text_cache = self.text_cache, new_cache - text = old_cache.getvalue() - old_cache.close() # 关闭旧的缓存 - if text == '': - return None - current_file = self.check_flush() - if not current_file.exists(): - current_file.parent.mkdir(parents=True, exist_ok=True) - current_file.touch(exist_ok=True) - with current_file.open('a', encoding=self.file_encoding) as f: - f.write(text) - return None - - def close(self) -> None: - super().close() - self.flush() - self.text_cache.close() - atexit.unregister(self.flush) - return None diff --git a/libs/lib_not_dr/logger/structer.md b/libs/lib_not_dr/logger/structer.md deleted file mode 100644 index d4b6647..0000000 --- a/libs/lib_not_dr/logger/structer.md +++ /dev/null @@ -1,12 +0,0 @@ -# Logger - -``` -[App -> Logger +-> Handler(Formatter)] -> Queue(string io) -> [File Output] ? - |-> Handler(Formatter) -> Console Output] -``` - -``` -[App -> Logger] -> Queue(raw log) -> [Facade +-> Handler(Formatter) -> File Output ] ? - [ |-> Handler(Formatter) -> Console Output] -``` - diff --git a/libs/lib_not_dr/logger/structure.py b/libs/lib_not_dr/logger/structure.py deleted file mode 100644 index b8ee936..0000000 --- a/libs/lib_not_dr/logger/structure.py +++ /dev/null @@ -1,99 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -import time - -from pathlib import Path -from types import FrameType -from typing import List, Optional, Tuple, Dict, Union - -from lib_not_dr.types.options import Options - -__all__ = ['LogMessage', - 'FormattingMessage'] - - -class LogMessage(Options): - name = 'LogMessage' - - # 消息内容本身的属性 - messages: List[str] = [] - end: str = '\n' - split: str = ' ' - - # 消息的属性 - flush: bool = None - level: int = 20 - log_time: float = None # time.time_ns() - logger_name: str = 'root' - logger_tag: Optional[str] = None - stack_trace: Optional[FrameType] = None - - def __init__(self, - messages: Optional[List[str]] = None, - end: Optional[str] = '\n', - split: Optional[str] = ' ', - flush: Optional[bool] = None, - level: Optional[int] = 20, - log_time: Optional[float] = None, - logger_name: Optional[str] = 'root', - logger_tag: Optional[str] = None, - stack_trace: Optional[FrameType] = None, - **kwargs) -> None: - """ - Init for LogMessage - :param messages: message list for log - :param end: end of message - :param split: split for messages - :param flush: do flush or not - :param level: level of message - :param log_time: time of message (default: time.time_ns()) - :param logger_name: name of logger - :param logger_tag: tag of logger - :param stack_trace: stack trace of logger - :param kwargs: other options - """ - # 为了方便使用 单独覆盖了 __init__ 方法来提供代码补全的选项 - super().__init__(messages=messages, - end=end, - split=split, - flush=flush, - level=level, - log_time=log_time, - logger_name=logger_name, - logger_tag=logger_tag, - stack_trace=stack_trace, - **kwargs) - - def init(self, **kwargs) -> bool: - if self.log_time is None: - self.log_time = time.time_ns() - if not isinstance(self.flush, bool) and self.flush is not None: - self.flush = True if self.flush else False - return False - - def format_message(self) -> str: - return self.split.join(self.messages) + self.end - - def format_for_message(self) -> Dict[str, str]: - basic_info = self.option() - - if self.logger_tag is None: - basic_info['logger_tag'] = ' ' - - basic_info['messages'] = self.format_message() - - return basic_info - - @property - def create_msec_3(self) -> int: - return int(self.log_time / 1000000) % 1000 - - -FormattingMessage = Tuple[LogMessage, Dict[str, Union[str, Path]]] - -if __name__ == '__main__': - print(LogMessage().as_markdown()) diff --git a/libs/lib_not_dr/nuitka/__init__.py b/libs/lib_not_dr/nuitka/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/libs/lib_not_dr/nuitka/compile.py b/libs/lib_not_dr/nuitka/compile.py deleted file mode 100644 index ad7b06b..0000000 --- a/libs/lib_not_dr/nuitka/compile.py +++ /dev/null @@ -1,472 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -import platform -import warnings -from pathlib import Path -from typing import List, Tuple, Optional, Union, Any -from enum import Enum - -from lib_not_dr.types import Options, Version, VersionRequirement - - -def ensure_cmd_readable(cmd: str) -> str: - """ - 保证 参数中 不含空格 - :param cmd: 要格式化的命令行参数 - :return: 格式化后的命令行参数 - """ - if ' ' in str(cmd): - return f'"{cmd}"' - return cmd - - -def format_cmd(arg_name: Optional[str] = None, - arg_value: Optional[Union[str, List[str]]] = None, - write: Optional[Any] = True) -> List[str]: - """ - 用来格式化输出命令行参数 - :param arg_name: 类似 --show-memory 之类的主项 - :param arg_value: 类似 xxx 类的内容 - :param write: 是否写入 - :return: 直接拼接好的命令行参数 不带 = - """ - if not write: - return [] - if arg_name is None: - return [] - if arg_value is None: - return [arg_name] - if isinstance(arg_value, list): - arg_value = ','.join([ensure_cmd_readable(value) for value in arg_value]) - return [f'{arg_name}{arg_value}'] - arg_value = ensure_cmd_readable(arg_value) - return [f'{arg_name}{arg_value}'] - - -class NuitkaSubConfig(Options): - """ - Nuitka 配置的子项 - Nuitka configuration sub-items - """ - name = 'Nuitka Sub Configuration' - - def gen_cmd(self) -> List[str]: - """ - 生成命令行参数 - :return: - """ - raise NotImplementedError - - -class NuitkaPluginConfig(NuitkaSubConfig): - """ - 控制 nuitka 的 plugin 相关参数的部分 - Control part of nuitka's plugin related parameters - """ - name = 'Nuitka Plugin Configuration' - - # --enable-plugin=PLUGIN_NAME - enable_plugin: List[str] = [] - # --disable-plugin=PLUGIN_NAME - disable_plugin: List[str] = [] - # --plugin-no-detection - plugin_no_detection: bool = False - # --user-plugin=PATH - user_plugin: List[Path] = [] - # --show-source-changes - show_source_changes: bool = False - - # --include-plugin-directory=MODULE/PACKAGE - include_plugin_dir: List[str] = [] - # --include-plugin-files=PATTERN - include_plugin_files: List[str] = [] - - def gen_cmd(self) -> List[str]: - lst = [] - lst += format_cmd('--enable-plugin=', self.enable_plugin, self.enable_plugin) - lst += format_cmd('--disable-plugin=', self.disable_plugin, self.disable_plugin) - lst += format_cmd('--plugin-no-detection' if self.plugin_no_detection else None) - lst += format_cmd('--user-plugin=', [str(plugin.absolute()) for plugin in self.user_plugin], self.user_plugin) - lst += format_cmd('--show-source-changes' if self.show_source_changes else None) - lst += format_cmd('--include-plugin-directory=', self.include_plugin_dir, self.include_plugin_dir) - lst += format_cmd('--include-plugin-files=', self.include_plugin_files, self.include_plugin_files) - return lst - - -class NuitkaIncludeConfig(NuitkaSubConfig): - """ - 控制 nuitka 的 include 和 数据 相关参数的部分 - Control part of nuitka's include related parameters - """ - name = 'Nuitka Include Configuration' - - # --include-package=PACKAGE - include_packages: List[str] = [] - # --include-module=MODULE - include_modules: List[str] = [] - - # --prefer-source-code - # --no-prefer-source-code for --module - prefer_source_code: bool = False - # --follow-stdlib - follow_stdlib: bool = False - - def gen_cmd(self) -> List[str]: - lst = [] - lst += format_cmd('--include-package=', self.include_packages, self.include_packages) - lst += format_cmd('--include-module=', self.include_modules, self.include_modules) - lst += format_cmd('--prefer-source-code' if self.prefer_source_code else None) - lst += format_cmd('--no-prefer-source-code' if not self.prefer_source_code else None) - lst += format_cmd('--follow-stdlib' if self.follow_stdlib else None) - return lst - - -class NuitkaDataConfig(NuitkaSubConfig): - """ - 控制 nuitka 的 数据 相关参数的部分 - Control part of nuitka's data related parameters - """ - name = 'Nuitka Data Configuration' - - # --include-package-data=PACKAGE=PACKAGE_PATH - include_package_data: List[Tuple[Path, Path]] = [] - # --include-data-files=PATH=PATH - include_data_files: List[Tuple[Path, Path]] = [] - # --include-data-dir=DIRECTORY=PATH - include_data_dir: List[Tuple[Path, Path]] = [] - - # --noinclude-data-files=PATH - no_include_data_files: List[Path] = [] - - # --list-package-data=LIST_PACKAGE_DATA - list_package_data: List[str] = [] - # --list-package-dlls=LIST_PACKAGE_DLLS - list_package_dlls: List[str] = [] - - # --include-distribution-metadata=DISTRIBUTION - include_distribution_metadata: List[str] = [] - - -class NuitkaBinaryInfo(Options): - """ - nuitka 构建的二进制文件的信息 - nuitka build binary file information - """ - name = 'Nuitka Binary Info' - - # --company-name=COMPANY_NAME - company_name: Optional[str] = None - # --product-name=PRODUCT_NAME - product_name: Optional[str] = None - - # --file-version=FILE_VERSION - # --macos-app-version=MACOS_APP_VERSION - file_version: Optional[Union[str, Version]] = None - # --product-version=PRODUCT_VERSION - product_version: Optional[Union[str, Version]] = None - - # --file-description=FILE_DESCRIPTION - file_description: Optional[str] = None - # --copyright=COPYRIGHT_TEXT - copyright: Optional[str] = None - # --trademarks=TRADEMARK_TEXT - trademarks: Optional[str] = None - - # Icon - # --linux-icon=ICON_PATH - # --macos-app-icon=ICON_PATH - # --windows-icon-from-ico=ICON_PATH - # --windows-icon-from-exe=ICON_EXE_PATH - # 注意: 只有 Windows 下 才可以提供多个 ICO 文件 - # 其他平台 和 EXE 下只会使用第一个路径 - icon: Optional[List[Path]] = None - - # Console - # --enable-console - # --disable-console - console: bool = True - - # Windows UAC - # --windows-uac-admin - windows_uac_admin: bool = False - # --windows-uac-uiaccess - windows_uac_ui_access: bool = False - - -class NuitkaOutputConfig(Options): - """ - nuitka 构建的选项 - nuitka build output information - """ - name = 'Nuitka Output Config' - - # --output-dir=DIRECTORY - output_dir: Optional[Path] = None - # --output-filename=FILENAME - output_filename: Optional[str] = None - - # --quiet - quiet: bool = False - # --no-progressbar - no_progressbar: bool = False - # --verbose - verbose: bool = False - # --verbose-output=PATH - verbose_output: Optional[Path] = None - - # --show-progress - show_progress: bool = False - # --show-memory - show_memory: bool = False - # --show-scons - show_scons: bool = False - # --show-modules - show_modules: bool = False - # --show-modules-output=PATH - show_modules_output: Optional[Path] = None - - # --xml=XML_FILENAME - xml: Optional[Path] = None - # --report=REPORT_FILENAME - report: Optional[Path] = None - # --report-diffable - report_diffable: bool = False - - # --remove-output - remove_output: bool = False - # --no-pyo-file - no_pyo_file: bool = False - - -class NuitkaDebugConfig(Options): - """ - nuitka 构建的调试选项 - nuikta build debug information - """ - name = 'Nuitka Debug Config' - - # --debug - debug: bool = False - # --unstripped - strip: bool = True - # --profile - profile: bool = False - # --internal-graph - internal_graph: bool = False - # --trace-execution - trace_execution: bool = False - # --recompile-c-only - recompile_c_only: bool = False - # --generate-c-only - generate_c_only: bool = False - # --deployment - deployment: bool = False - # --no-deployment-flag=FLAG - deployment_flag: Optional[str] = None - # --experimental=FLAG - experimental: Optional[str] = None - - -class NuitkaTarget(Enum): - """ - 用于指定 nuitka 构建的目标 - Use to specify the target of nuitka build - exe: 不带任何参数 - module: --module - standalone: --standalone - one_file: --onefile - """ - exe = '' - module = 'module' - standalone = 'standalone' - one_file = 'package' - - -class NuitkaScriptGenerator(Options): - """ - 用于帮助生成 nuitka 构建脚本的类 - Use to help generate nuitka build script - - :arg main 需要编译的文件 - """ - name = 'Nuitka Script Generator' - - # --main=PATH - # 可以有多个 输入时需要包在列表里 - main: List[Path] - - # --run - run_after_build: bool = False - # --debugger - debugger: bool = False - # --execute-with-pythonpath - execute_with_python_path: bool = False - - # --assume-yes-for-downloads - download_confirm: bool = True - - # standalone/one_file/module/exe - target: NuitkaTarget = NuitkaTarget.exe - - # --python-debug - python_debug: bool = False - # --python-flag=FLAG - python_flag: List[str] = [] - # --python-for-scons=PATH - python_for_scons: Optional[Path] = None - - -class CompilerHelper(Options): - """ - 用于帮助生成 nuitka 构建脚本的类 - Use to help generate nuitka build script - - """ - name = 'Nuitka Compiler Helper' - - output_path: Path = Path('./build') - src_file: Path - - python_cmd: str = 'python' - compat_nuitka_version: VersionRequirement = VersionRequirement("~1.8.0") # STATIC VERSION - - # 以下为 nuitka 的参数 - # nuitka options below - use_lto: bool = False # --lto=yes (no is faster) - use_clang: bool = True # --clang - use_msvc: bool = True # --msvc=latest - use_mingw: bool = False # --mingw64 - - onefile: bool = False # --onefile - onefile_tempdir: Optional[str] = '' # --onefile-tempdir-spec= - standalone: bool = True # --standalone - use_ccache: bool = True # not --disable-ccache - enable_console: bool = True # --enable-console / --disable-console - - show_progress: bool = True # --show-progress - show_memory: bool = False # --show-memory - remove_output: bool = True # --remove-output - save_xml: bool = False # --xml - xml_path: Path = Path('build/compile_data.xml') - save_report: bool = False # --report - report_path: Path = Path('build/compile_report.xml') - - download_confirm: bool = True # --assume-yes-for-download - run_after_build: bool = False # --run - - company_name: Optional[str] = '' - product_name: Optional[str] = '' - file_version: Optional[Version] = None - product_version: Optional[Version] = None - file_description: Optional[str] = '' # --file-description - - copy_right: Optional[str] = '' # --copyright - - icon_path: Optional[Path] = None - - follow_import: List[str] = [] - no_follow_import: List[str] = [] - - include_data_dir: List[Tuple[str, str]] = [] - include_packages: List[str] = [] - - enable_plugin: List[str] = [] # --enable-plugin=xxx,xxx - disable_plugin: List[str] = [] # --disable-plugin=xxx,xxx - - def init(self, **kwargs) -> None: - if (compat_version := kwargs.get('compat_nuitka_version')) is not None: - if not self.compat_nuitka_version.accept(compat_version): - warnings.warn( - f"Nuitka version may not compat with {compat_version}\n" - "requirement: {self.compat_nuitka_version}" - ) - # 非 windows 平台不使用 msvc - if platform.system() != 'Windows': - self.use_msvc = False - self.use_mingw = False - else: - self.use_mingw = self.use_mingw and not self.use_msvc - # Windows 平台下使用 msvc 时不使用 mingw - - def __str__(self): - return self.as_markdown() - - def as_markdown(self, longest: Optional[int] = None) -> str: - """ - 输出编译器帮助信息 - Output compiler help information - - Args: - longest (Optional[int], optional): - 输出信息的最大长度限制 The maximum length of output information. - Defaults to None. - - Returns: - str: 以 markdown 格式输出的编译器帮助信息 - Compile helper information in markdown format - """ - front = super().as_markdown(longest) - gen_cmd = self.gen_subprocess_cmd() - return f"{front}\n\n```bash\n{' '.join(gen_cmd)}\n```" - - def gen_subprocess_cmd(self) -> List[str]: - """生成 nuitka 构建脚本 - Generate nuitka build script - - Returns: - List[str]: - 生成的 nuitka 构建脚本 - Generated nuitka build script - """ - cmd_list = [self.python_cmd, '-m', 'nuitka'] - # macos 和 非 macos icon 参数不同 - if platform.system() == 'Darwin': - cmd_list += format_cmd('--macos-app-version=', self.product_version, self.product_version) - cmd_list += format_cmd('--macos-app-icon=', self.icon_path.absolute(), self.icon_path) - elif platform.system() == 'Windows': - cmd_list += format_cmd('--windows-icon-from-ico=', self.icon_path.absolute(), self.icon_path) - elif platform.system() == 'Linux': - cmd_list += format_cmd('--linux-icon=', self.icon_path.absolute(), self.icon_path) - - cmd_list += format_cmd('--lto=', 'yes' if self.use_lto else 'no') - cmd_list += format_cmd('--clang' if self.use_clang else None) - cmd_list += format_cmd('--msvc=latest' if self.use_msvc else None) - cmd_list += format_cmd('--mingw64' if self.use_mingw else None) - cmd_list += format_cmd('--standalone' if self.standalone else None) - cmd_list += format_cmd('--onefile' if self.onefile else None) - cmd_list += format_cmd('--onefile-tempdir-spec=', self.onefile_tempdir, self.onefile_tempdir) - - cmd_list += format_cmd('--disable-ccache' if not self.use_ccache else None) - cmd_list += format_cmd('--show-progress' if self.show_progress else None) - cmd_list += format_cmd('--show-memory' if self.show_memory else None) - cmd_list += format_cmd('--remove-output' if self.remove_output else None) - cmd_list += format_cmd('--assume-yes-for-download' if self.download_confirm else None) - cmd_list += format_cmd('--run' if self.run_after_build else None) - cmd_list += format_cmd('--enable-console' if self.enable_console else '--disable-console') - - cmd_list += format_cmd('--xml=', str(self.xml_path.absolute()), self.save_xml) - cmd_list += format_cmd('--report=', str(self.report_path.absolute()), self.save_report) - cmd_list += format_cmd('--output-dir=', str(self.output_path.absolute()), self.output_path) - cmd_list += format_cmd('--company-name=', self.company_name, self.company_name) - cmd_list += format_cmd('--product-name=', self.product_name, self.product_name) - cmd_list += format_cmd('--file-version=', str(self.file_version), self.file_version) - cmd_list += format_cmd('--product-version=', str(self.product_version), self.product_version) - cmd_list += format_cmd('--file-description=', self.file_description, self.file_description) - cmd_list += format_cmd('--copyright=', self.copy_right, self.copy_right) - - cmd_list += format_cmd('--follow-import-to=', self.follow_import, self.follow_import) - cmd_list += format_cmd('--nofollow-import-to=', self.no_follow_import, self.no_follow_import) - cmd_list += format_cmd('--enable-plugin=', self.enable_plugin, self.enable_plugin) - cmd_list += format_cmd('--disable-plugin=', self.disable_plugin, self.disable_plugin) - - if self.include_data_dir: - cmd_list += [f"--include-data-dir={src}={dst}" for src, dst in self.include_data_dir] - if self.include_packages: - cmd_list += [f"--include-package={package}" for package in self.include_packages] - - cmd_list.append(f"--main={self.src_file}") - return cmd_list diff --git a/libs/lib_not_dr/types/__init__.py b/libs/lib_not_dr/types/__init__.py deleted file mode 100644 index 0717150..0000000 --- a/libs/lib_not_dr/types/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -from .options import (Options, - OptionsError, - OptionNotFound, - OptionNameNotDefined, - get_type_hints_) - -from .version import (Version, - VersionRequirement, - VersionParsingError, - ExtraElement) - -__all__ = [ - # options - 'get_type_hints_', - 'Options', - 'OptionsError', - 'OptionNotFound', - 'OptionNameNotDefined', - - # version - 'Version', - 'VersionRequirement', - 'VersionParsingError', - 'ExtraElement' -] diff --git a/libs/lib_not_dr/types/options.py b/libs/lib_not_dr/types/options.py deleted file mode 100644 index 6f58357..0000000 --- a/libs/lib_not_dr/types/options.py +++ /dev/null @@ -1,272 +0,0 @@ -# ------------------------------- -# Difficult Rocket -# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com -# All rights reserved -# ------------------------------- - -import shutil -import traceback -from io import StringIO -from typing import get_type_hints, Type, List, Union, Dict, Any, Callable, Tuple, Optional, TYPE_CHECKING, Iterable - -__all__ = [ - 'get_type_hints_', - 'Options', - 'OptionsError', - 'OptionNotFound', - 'OptionNameNotDefined' -] - - -def get_type_hints_(cls: Type): - try: - return get_type_hints(cls) - except ValueError: - return get_type_hints(cls, globalns={}) - - -def to_str_value_(value: Any) -> Any: - """递归的将输入值的每一个非 builtin type 转换成 str""" - if isinstance(value, (str, bytes, bytearray, int, float, bool, type(None))): - return value - elif isinstance(value, dict): - return {k: to_str_value_(v) for k, v in value.items()} - elif isinstance(value, (list, Iterable)): - return [to_str_value_(v) for v in value] - else: - return str(value) - - -class OptionsError(Exception): - """ option 的错误基类""" - - -class OptionNameNotDefined(OptionsError): - """ 向初始化的 option 里添加了一个不存在于选项里的选项 """ - - -class OptionNotFound(OptionsError): - """ 某个选项没有找到 """ - - -class Options: - """ - 一个用于存储选项 / 提供 API 定义 的类 - 用法: - 存储配置: 继承 Options 类 - 在类里定义 option: typing - (可选 定义 name: str = 'Option Base' 用于在打印的时候显示名字) - 提供 API 接口: 继承 Options 类 - 在类里定义 option: typing - 定义 一些需要的方法 - 子类: 继承 新的 Options 类 - 实现定义的方法 - """ - name = 'Option Base' - cached_options: Dict[str, Union[str, Any]] = {} - _check_options: bool = True - - def __init__(self, **kwargs): - """ - 创建一个新的 Options 的时候的配置 - 如果存在 init 方法 会在设置完 kwargs 之后运行子类的 init 方法 - :param kwargs: 需要设置的选项 - """ - if TYPE_CHECKING: - self._options: Dict[str, Union[Callable, object]] = {} - self.flush_option() - for option, value in kwargs.items(): - if option not in self.cached_options and self._check_options: - raise OptionNameNotDefined(f"option: {option} with value: {value} is not defined") - setattr(self, option, value) - run_load_file = True - if hasattr(self, 'init'): - run_load_file = self.init(**kwargs) # 默认 False/None - run_load_file = not run_load_file - if hasattr(self, 'load_file') and run_load_file: - try: - self.load_file() - except Exception: - traceback.print_exc() - self.flush_option() - - def __str__(self): - return f"<{self.__class__.__name__} {self.name}>" if self.name else f"<{self.__class__.__name__}>" - - def __repr__(self): - return self.__str__() - - if TYPE_CHECKING: - _options: Dict[str, Union[Callable, object]] = {} - - def init(self, **kwargs) -> bool: - """ 如果子类定义了这个函数,则会在 __init__ 之后调用这个函数 - 返回值为 True 则不会调用 load_file 函数 - """ - - def load_file(self) -> bool: - """如果子类定义了这个函数,则会在 __init__ 和 init 之后再调用这个函数 - - 请注意,这个函数请尽量使用 try 包裹住可能出现错误的部分 - 否则会在控制台输出你的报错""" - return True - - def option(self) -> Dict[str, Any]: - """ - 获取配置类的所有配置 - :return: 自己的所有配置 - """ - values = {} - for ann in self.__annotations__: # 获取类型注释 - values[ann] = getattr(self, ann, None) - if values[ann] is None: - values[ann] = self.__annotations__[ann] - - if not hasattr(self, '_options'): - self._options: Dict[str, Union[Callable, object]] = {} - for option, a_fun in self._options.items(): # 获取额外内容 - values[option] = a_fun - - for option, a_fun in values.items(): # 检查是否为 property - if a_fun is bool and getattr(self, option, None) is not None: - values[option] = False - if isinstance(a_fun, property): - try: - values[option] = getattr(self, option) - except AttributeError: - raise OptionNotFound(f'Option {option} is not found in {self.name}') from None - return values - - def str_option(self, shrink_to_long: Optional[int] = None) -> Dict[str, Union[str, Any]]: - """ - 获取配置类的所有配置 并将所有非 BuiltIn 类型的值转换为 str - :return: - """ - raw_option = self.option() - str_option = to_str_value_(raw_option) - if shrink_to_long is None: - return str_option - if not isinstance(shrink_to_long, int) or shrink_to_long <= 0: - return str_option - for option, value in str_option.items(): - if value is not None: - if len(str(value)) > shrink_to_long: - str_option[option] = str(value)[:shrink_to_long] + '...' - return str_option - - def format(self, text: str) -> str: - """ - 使用自己的选项给输入的字符串替换内容 - :param text: 想替换的内容 - :return: 替换之后的内容 - """ - cache_option = self.flush_option() - for option, value in cache_option.items(): - text = text.replace(f'{{{option}}}', str(value)) - return text - - def flush_option(self) -> Dict[str, Any]: - """ - 刷新缓存 options 的内容 - :return: 刷新过的 options - """ - self.cached_options = self.option() - return self.cached_options - - def option_with_len(self) -> Tuple[List[Tuple[str, Any, Type]], int, int, int]: - """ - 返回一个可以用于打印的 option 列表 - :return: - """ - options = self.flush_option() - max_len_key = 1 - max_len_value = 1 - max_len_value_t = 1 - option_list = [] - for key, value in options.items(): - value_t = type(value) if isinstance(value, type(value)) else type(value) # 判定这个类型 是不是 基本类型 - max_len_key = max(max_len_key, len(key)) - max_len_value = max(max_len_value, len(str(value))) - max_len_value_t = max(max_len_value_t, len(str(value_t))) - option_list.append([key, value, value_t]) - return [option_list, max_len_key, max_len_value, max_len_value_t] # noqa - - def as_markdown(self, longest: Optional[int] = None) -> str: - """ - 返回一个 markdown 格式的 option 字符串 - :param longest: 最长的输出长度 - :return: markdown 格式的 option 字符串 - """ - value = self.option_with_len() - cache = StringIO() - option_len = max(value[1], len('Option')) - value_len = max(value[2], len('Value')) - value_type_len = max(value[3], len('Value Type')) - - # | Option | Value | Value Type | - shortest = len('Option | Value | Value Type') - - if longest is not None: - console_width = max(longest, shortest) - else: - console_width = shutil.get_terminal_size(fallback=(100, 80)).columns - console_width = max(console_width, shortest) - - # 为每一栏 预分配 1/3 或者 需要的宽度 (如果不需要 1/3) - option_len = min(option_len, console_width // 3) - value_len = min(value_len, console_width // 3) - value_type_len = min(value_type_len, console_width // 3) - - # 先指定每一个列的输出最窄宽度, 然后去尝试增加宽度 - # 循环分配新空间之前 首先检查是否已经不需要多分配 (and 后面) - while option_len + value_len + value_type_len + 16 < console_width\ - and (option_len < value[1] - or value_len < value[2] - or value_type_len < value[3]): - # 每一个部分的逻辑都是 - # 如果现在的输出长度小于原始长度 - # 并且长度 + 1 之后的总长度依然在允许范围内 - # 那么就 + 1 - if option_len < value[1] and option_len + value_len + value_type_len + 16 < console_width: - option_len += 1 - if value_len < value[2] and option_len + value_len + value_type_len + 16 < console_width: - value_len += 1 - if value_type_len < value[3] and option_len + value_len + value_type_len + 16 < console_width: - value_type_len += 1 - # 实际上 对于列表(可变对象) for 出来的这个值是一个引用 - # 所以可以直接修改 string - for v in value[0]: - if len(str(v[0])) > option_len: - v[0] = f'{str(v[0])[:value_len - 3]}...' - if len(str(v[1])) > value_len: - v[1] = f'{str(v[1])[:value_len - 3]}...' - if len(str(v[2])) > value_type_len: - v[2] = f'{str(v[2])[:value_len - 3]}..' - - cache.write( - f"| Option{' ' * (option_len - 3)}| Value{' ' * (value_len - 2)}| Value Type{' ' * (value_type_len - 7)}|\n") - cache.write(f'|:{"-" * (option_len + 3)}|:{"-" * (value_len + 3)}|:{"-" * (value_type_len + 3)}|\n') - for option, value, value_t in value[0]: - cache.write(f"| `{option}`{' ' * (option_len - len(option))} " - f"| `{value}`{' ' * (value_len - len(str(value)))} " - f"| `{value_t}`{' ' * (value_type_len - len(str(value_t)))} |\n") - result = cache.getvalue() - cache.close() - return result - - @classmethod - def add_option(cls, name: str, value: Union[Callable, object]) -> Dict: - """ - 向配置类中添加一个额外的配置 - :param name: 配置的名字 - :param value: 用于获取配置的函数或者类 - :return: 配置类的所有配置 - """ - if not hasattr(cls, '_options'): - cls._options: Dict[str, Union[Callable, object]] = {} - cls._options[name] = value - return cls._options - - @staticmethod - def init_option(options_class: Type['Options'], init_value: Optional[dict] = None) -> 'Options': - return options_class(**init_value if init_value is not None else {}) diff --git a/libs/lib_not_dr/types/version.py b/libs/lib_not_dr/types/version.py deleted file mode 100644 index bbf3e42..0000000 --- a/libs/lib_not_dr/types/version.py +++ /dev/null @@ -1,220 +0,0 @@ -# 本文件以 GNU Lesser General Public License v3.0(GNU LGPL v3) 开源协议进行授权 (谢谢狐狸写出这么好的MCDR) -# 顺便说一句,我把所有的tab都改成了空格,因为我觉得空格比tab更好看(草,后半句是github copilot自动填充的) - -""" -This part of code come from MCDReforged(https://github.com/Fallen-Breath/MCDReforged) -Thanks a lot to Fallen_Breath and MCDR contributors -GNU Lesser General Public License v3.0 (GNU LGPL v3) -""" - -import re -from typing import List, Callable, Tuple, Optional, Union -""" -Plugin Version -""" - - -# beta.3 -> (beta, 3), random -> (random, None) -class ExtraElement: - DIVIDER = '.' - body: str - num: Optional[int] - - def __init__(self, segment_str: str): - segments = segment_str.rsplit(self.DIVIDER, 1) - try: - self.body, self.num = segments[0], int(segments[1]) - except (IndexError, ValueError): - self.body, self.num = segment_str, None - - def __str__(self): - if self.num is None: - return self.body - return '{}{}{}'.format(self.body, self.DIVIDER, self.num) - - def __lt__(self, other): - if not isinstance(other, type(self)): - raise TypeError() - if self.num is None or other.num is None: - return str(self) < str(other) - else: - return (self.body, self.num) < (other.body, other.num) - - -class Version: - """ - A version container that stores semver like version string - - Example: - - * ``"1.2.3"`` - * ``"1.0.*"`` - * ``"1.2.3-pre4+build.5"`` - """ - EXTRA_ID_PATTERN = re.compile(r'|[-+0-9A-Za-z]+(\.[-+0-9A-Za-z]+)*') - WILDCARDS = ('*', 'x', 'X') - WILDCARD = -1 - - component: List[int] - has_wildcard: bool - pre: Optional[ExtraElement] - build: Optional[ExtraElement] - - def __init__(self, version_str: str, *, allow_wildcard: bool = True): - """ - :param version_str: The version string to be parsed - :keyword allow_wildcard: If wildcard (``"*"``, ``"x"``, ``"X"``) is allowed. Default: ``True`` - """ - if not isinstance(version_str, str): - raise VersionParsingError('Invalid input version string') - - def separate_extra(text, char) -> Tuple[str, Optional[ExtraElement]]: - if char in text: - text, extra_str = text.split(char, 1) - if not self.EXTRA_ID_PATTERN.fullmatch(extra_str): - raise VersionParsingError('Invalid build string: ' + extra_str) - extra = ExtraElement(extra_str) - else: - extra = None - return text, extra - - self.component = [] - self.has_wildcard = False - version_str, self.build = separate_extra(version_str, '+') - version_str, self.pre = separate_extra(version_str, '-') - if len(version_str) == 0: - raise VersionParsingError('Version string is empty') - for comp in version_str.split('.'): - if comp in self.WILDCARDS: - self.component.append(self.WILDCARD) - self.has_wildcard = True - if not allow_wildcard: - raise VersionParsingError('Wildcard {} is not allowed'.format(comp)) - else: - try: - num = int(comp) - except ValueError: - num = None - if num is None: - raise VersionParsingError('Invalid version number component: {}'.format(comp)) - if num < 0: - raise VersionParsingError('Unsupported negatived number component: {}'.format(num)) - self.component.append(num) - if len(self.component) == 0: - raise VersionParsingError('Empty version string') - - def __str__(self): - version_str = '.'.join(map(lambda c: str(c) if c != self.WILDCARD else self.WILDCARDS[0], self.component)) - if self.pre is not None: - version_str += '-' + str(self.pre) - if self.build is not None: - version_str += '+' + str(self.build) - return version_str - - def __repr__(self): - return self.__str__() - - def __getitem__(self, index: int) -> int: - if index < len(self.component): - return self.component[index] - else: - return self.WILDCARD if self.component[len(self.component) - 1] == self.WILDCARD else 0 - - def __lt__(self, other): - if not isinstance(other, Version): - raise TypeError('Cannot compare between instances of {} and {}'.format(Version.__name__, type(other).__name__)) - for i in range(max(len(self.component), len(other.component))): - if self[i] == self.WILDCARD or other[i] == self.WILDCARD: - continue - if self[i] != other[i]: - return self[i] < other[i] - if self.pre is not None and other.pre is not None: - return self.pre < other.pre - elif self.pre is not None: - return not other.has_wildcard - elif other.pre is not None: - return False - else: - return False - - def __eq__(self, other): - return not self < other and not other < self - - def __le__(self, other): - return self == other or self < other - - def compare_to(self, other): - if self < other: - return -1 - elif self > other: - return 1 - else: - return 0 - - -DEFAULT_CRITERION_OPERATOR = '=' - - -class Criterion: - def __init__(self, opt: str, base_version: Version, criterion: Callable[[Version, Version], bool]): - self.opt = opt - self.base_version = base_version - self.criterion = criterion - - def test(self, target: Union[Version, str]): - return self.criterion(self.base_version, target) - - def __str__(self): - return '{}{}'.format(self.opt if self.opt != DEFAULT_CRITERION_OPERATOR else '', self.base_version) - - -class VersionRequirement: - """ - A version requirement tester - - It can test if a given :class:`Version` object matches its requirement - """ - CRITERIONS = { - '<=': lambda base, ver: ver <= base, - '>=': lambda base, ver: ver >= base, - '<': lambda base, ver: ver < base, - '>': lambda base, ver: ver > base, - '=': lambda base, ver: ver == base, - '^': lambda base, ver: ver >= base and ver[0] == base[0], - '~': lambda base, ver: ver >= base and ver[0] == base[0] and ver[1] == base[1], - } - - def __init__(self, requirements: str): - """ - :param requirements: The requirement string, which contains several version predicates connected by space character. - e.g. ``">=1.0.x"``, ``"^2.9"``, ``">=1.2.0 <1.4.3"`` - """ - if not isinstance(requirements, str): - raise VersionParsingError('Requirements should be a str, not {}'.format(type(requirements).__name__)) - self.criterions = [] # type: List[Criterion] - for requirement in requirements.split(' '): - if len(requirement) > 0: - for prefix, func in self.CRITERIONS.items(): - if requirement.startswith(prefix): - opt = prefix - base_version = requirement[len(prefix):] - break - else: - opt = DEFAULT_CRITERION_OPERATOR - base_version = requirement - self.criterions.append(Criterion(opt, Version(base_version), self.CRITERIONS[opt])) - - def accept(self, version: Union[Version, str]): - if isinstance(version, str): - version = Version(version) - for criterion in self.criterions: - if not criterion.test(version): - return False - return True - - def __str__(self): - return ' '.join(map(str, self.criterions)) - - -class VersionParsingError(ValueError): - pass