fix: WerFault when close

This commit is contained in:
shenjack 2023-05-02 15:31:28 +08:00
parent db7cf30568
commit 8d670ae8c3
14 changed files with 384 additions and 297 deletions

24
DR.py
View File

@ -7,6 +7,7 @@ import sys
import time import time
import cProfile import cProfile
import traceback import traceback
import threading
from io import StringIO from io import StringIO
@ -39,7 +40,7 @@ def print_path() -> None:
# 输出一遍大部分文件位置相关信息 以后可能会加到logs里 # 输出一遍大部分文件位置相关信息 以后可能会加到logs里
def main() -> None: def main() -> int:
print(hi) # hi print(hi) # hi
start_time_ns = time.time_ns() start_time_ns = time.time_ns()
start_time_perf_ns = time.perf_counter_ns() start_time_perf_ns = time.perf_counter_ns()
@ -57,7 +58,7 @@ def main() -> None:
print('pyglet_rs available:', get_version_str()) print('pyglet_rs available:', get_version_str())
print('trying to patch pyglet_rs') print('trying to patch pyglet_rs')
patch_vector() patch_vector()
except ImportError as e: except ImportError:
print('pyglet_rs import error') print('pyglet_rs import error')
traceback.print_exc() traceback.print_exc()
try: try:
@ -82,7 +83,8 @@ def main() -> None:
if DR_option.crash_report_test: if DR_option.crash_report_test:
raise TestError('debugging') # debug 嘛试试crash raise TestError('debugging') # debug 嘛试试crash
except Exception as exp: # 出毛病了 except Exception as exp: # 出毛病了
print(error_format['error.happen']) # # 解析错误信息
print(error_format['error.happen'])
error = traceback.format_exc() error = traceback.format_exc()
name = type(exp).__name__ name = type(exp).__name__
if name in error_format: if name in error_format:
@ -90,6 +92,7 @@ def main() -> None:
else: else:
print(error_format['error.unknown']) print(error_format['error.unknown'])
print(error) print(error)
# 输出 crash 信息
crash.create_crash_report(error) crash.create_crash_report(error)
cache_steam = StringIO() cache_steam = StringIO()
crash.write_info_to_cache(cache_steam) crash.write_info_to_cache(cache_steam)
@ -99,9 +102,20 @@ def main() -> None:
crash.record_thread = False crash.record_thread = False
print(crash.all_thread) print(crash.all_thread)
print(crash.all_process) print(crash.all_process)
# join all thread
for thread in threading.enumerate():
print(thread)
if thread.name == 'MainThread' or thread == threading.main_thread() or thread == threading.current_thread():
continue
if thread.daemon:
continue
thread.join()
# stop pyglet
import pyglet
pyglet.app.exit()
print("Difficult_Rocket 已关闭") print("Difficult_Rocket 已关闭")
sys.exit(0) return 0
if __name__ == '__main__': if __name__ == '__main__':
main() sys.exit(main())

View File

@ -65,7 +65,7 @@ class _DR_option(Options):
DR_rust_available: bool = False DR_rust_available: bool = False
use_cProfile: bool = False use_cProfile: bool = False
use_local_logging: bool = False use_local_logging: bool = False
use_DR_rust: bool = True # use_DR_rust: bool = True
# tests # tests
playing: bool = False playing: bool = False
@ -75,25 +75,25 @@ class _DR_option(Options):
# window option # window option
gui_scale: float = 1.0 # default 1.0 2.0 -> 2x 3 -> 3x gui_scale: float = 1.0 # default 1.0 2.0 -> 2x 3 -> 3x
def init(self, **kwargs): # def init(self, **kwargs):
try: # try:
from libs.Difficult_Rocket_rs import test_call, get_version_str # from libs.Difficult_Rocket_rs import test_call, get_version_str
test_call(self) # test_call(self)
print(f'DR_rust available: {get_version_str()}') # print(f'DR_rust available: {get_version_str()}')
except ImportError: # except ImportError:
if __name__ != '__main__': # if __name__ != '__main__':
traceback.print_exc() # traceback.print_exc()
self.DR_rust_available = False # self.DR_rust_available = False
self.use_DR_rust = self.use_DR_rust and self.DR_rust_available # self.use_DR_rust = self.use_DR_rust and self.DR_rust_available
self.flush_option() # self.flush_option()
#
def test_rust(self): # def test_rust(self):
if self.DR_rust_available: # if self.DR_rust_available:
from libs.Difficult_Rocket_rs import part_list_read_test # from libs.Difficult_Rocket_rs import part_list_read_test
part_list_read_test("./configs/PartList.xml") # part_list_read_test("./configs/PartList.xml")
#
def draw(self): # def draw(self):
self.DR_rust_available = True # self.DR_rust_available = True
@property @property
def std_font_size(self) -> int: def std_font_size(self) -> int:

View File

@ -11,6 +11,7 @@ github: @shenjackyuanjie
gitee: @shenjackyuanjie gitee: @shenjackyuanjie
""" """
from Difficult_Rocket.api import screen, mod, exception
# from Difficult_Rocket.api import screen, mod, exception
__all__ = ['screen', 'mod', 'exception'] __all__ = ['screen', 'mod', 'exception']

View File

@ -5,15 +5,20 @@
# ------------------------------- # -------------------------------
# system function # system function
from typing import Tuple, List, Optional from typing import Tuple, List, Optional, TypeVar, TYPE_CHECKING
# from libs # from libs
from libs.MCDR.version import Version from libs.MCDR.version import Version
# from DR # from DR
from Difficult_Rocket.main import Game if TYPE_CHECKING:
from Difficult_Rocket import DR_runtime, Options from Difficult_Rocket.main import Game
from Difficult_Rocket.client import ClientWindow from Difficult_Rocket.client import ClientWindow
else:
Game = TypeVar("Game")
ClientWindow = TypeVar("ClientWindow")
from Difficult_Rocket import DR_runtime
from ..types import Options
""" """

View File

@ -5,15 +5,14 @@
# ------------------------------- # -------------------------------
import math import math
from typing import Dict, Union, List, Optional from typing import Dict, Union, Optional
from dataclasses import dataclass from dataclasses import dataclass
# pyglet # pyglet
# import pyglet
from pyglet.image import load, AbstractImage from pyglet.image import load, AbstractImage
# Difficult Rocket # Difficult Rocket
from Difficult_Rocket.api.types import Options from Difficult_Rocket.utils.options import Options
@dataclass @dataclass
@ -30,7 +29,6 @@ class SR1PartData:
flip_y: bool flip_y: bool
explode: bool explode: bool
textures: Optional[str] = None textures: Optional[str] = None
connections: Optional[List[int]] = None
class SR1Textures(Options): class SR1Textures(Options):
@ -145,23 +143,3 @@ def xml_bool(bool_like: Union[str, int, bool, None]) -> bool:
if isinstance(bool_like, int): if isinstance(bool_like, int):
return bool_like != 0 return bool_like != 0
return False if bool_like == '0' else bool_like.lower() != 'false' return False if bool_like == '0' else bool_like.lower() != 'false'
#
#
# from xml.etree.ElementTree import Element, ElementTree
# from defusedxml.ElementTree import parse
#
# part_list = parse("../../../../textures/PartList.xml")
# part_list_root: Element = part_list.getroot()
# print(part_list_root.tag, part_list_root.attrib)
#
# part_types = part_list_root.find('PartTypes')
#
# for x in list(part_list_root):
# print(f'tag: {x.tag} attr: {x.attrib}')
#
# for part_type in list(part_list_root):
# part_type: Element
# print(f'\'{part_type.attrib.get("id")}\': \'{part_type.attrib.get("sprite")}\'')
#
#

View File

@ -4,198 +4,24 @@
# All rights reserved # All rights reserved
# ------------------------------- # -------------------------------
""" from Difficult_Rocket.utils.options import Options, FontData, Fonts, \
writen by shenjackyuanjie OptionsError, OptionNameNotDefined, OptionNotFound, \
mail: 3695888@qq.com get_type_hints_
github: @shenjackyuanjie
gitee: @shenjackyuanjie
"""
import traceback __all__ = [
from dataclasses import dataclass # main class
from typing import get_type_hints, Type, List, Union, Dict, Any, Callable, Tuple, Optional, TYPE_CHECKING
# from Difficult Rocket
__all__ = ['get_type_hints_',
'Options', 'Options',
'Fonts',
# data class
'FontData', 'FontData',
'Fonts',
# exception
'OptionsError', 'OptionsError',
'OptionNameNotDefined',
'OptionNotFound', 'OptionNotFound',
'OptionNameNotDefined']
# other
'get_type_hints_',
]
def get_type_hints_(cls: Type):
try:
return get_type_hints(cls)
except ValueError:
return get_type_hints(cls, globalns={})
class OptionsError(Exception):
""" option 的错误基类"""
class OptionNameNotDefined(OptionsError):
""" 向初始化的 option 里添加了一个不存在于选项里的选项 """
class OptionNotFound(OptionsError):
""" 某个选项没有找到 """
class Options:
"""
Difficult Rocket 的游戏配置的存储基类
"""
name = 'Option Base'
cached_options: Dict[str, Union[str, Any]] = {}
def __init__(self, **kwargs):
"""
创建一个新的 Options 的时候的配置
如果存在 init 方法 会在设置完 kwargs 之后运行子类的 init 方法
:param kwargs:
"""
if TYPE_CHECKING:
self.options: Dict[str, Union[Callable, object]] = {}
self.flush_option()
for option, value in kwargs.items():
if option not in self.cached_options:
raise OptionNameNotDefined(f"option: {option} with value: {value} is not defined")
setattr(self, option, value)
if hasattr(self, 'init'):
self.init(**kwargs)
if hasattr(self, 'load_file'):
try:
self.load_file()
except Exception:
traceback.print_exc()
self.flush_option()
if TYPE_CHECKING:
def init(self, **kwargs) -> None:
""" 如果子类定义了这个函数,则会在 __init__ 之后调用这个函数 """
def load_file(self) -> bool:
"""如果子类定义了这个函数,则会在 __init__ 和 init 之后再调用这个函数
请注意这个函数请尽量使用 try 包裹住可能出现错误的部分
否则会在控制台输出你的报错"""
return True
def option(self) -> Dict[str, Any]:
"""
获取配置类的所有配置
:return: 自己的所有配置
"""
values = {}
for ann in self.__annotations__: # 获取类型注释
values[ann] = getattr(self, ann, None)
if values[ann] is None:
values[ann] = self.__annotations__[ann]
if not hasattr(self, 'options'):
self.options: Dict[str, Union[Callable, object]] = {}
for option, a_fun in self.options.items(): # 获取额外内容
values[option] = a_fun
for option, a_fun in values.items(): # 检查是否为 property
if a_fun is bool and getattr(self, option, None) is not None:
values[option] = False
if isinstance(a_fun, property):
try:
values[option] = getattr(self, option)
except AttributeError:
raise OptionNotFound(f'Option {option} is not found in {self.name}') from None
return values
def format(self, text: str) -> str:
"""
使用自己的选项给输入的字符串替换内容
:param text: 想替换的内容
:return: 替换之后的内容
"""
cache_option = self.flush_option()
for option, value in cache_option.items():
text = text.replace(f'{{{option}}}', str(value))
return text
def flush_option(self) -> Dict[str, Any]:
"""
刷新缓存 options 的内容
:return: 刷新过的 options
"""
self.cached_options = self.option()
return self.cached_options
def option_with_len(self) -> List[Union[List[Tuple[str, Any, Any]], int, Any]]:
options = self.flush_option()
max_len_key = 1
max_len_value = 1
max_len_value_t = 1
option_list = []
for key, value in options.items():
value_t = value if isinstance(value, Type) else type(value)
max_len_key = max(max_len_key, len(key))
max_len_value = max(max_len_value, len(str(value)))
max_len_value_t = max(max_len_value_t, len(str(value_t)))
option_list.append((key, value, value_t))
return [option_list, max_len_key, max_len_value, max_len_value_t]
@classmethod
def add_option(cls, name: str, value: Union[Callable, object]) -> Dict:
if not hasattr(cls, 'options'):
cls.options: Dict[str, Union[Callable, object]] = {}
cls.options[name] = value
return cls.options
@staticmethod
def init_option(options_class: Type['Options'], init_value: Optional[dict] = None) -> 'Options':
return options_class(**init_value if init_value is not None else {})
class Fonts(Options):
# font's value
HOS: str = 'HarmonyOS Sans'
HOS_S: str = 'HarmonyOS Sans SC'
HOS_T: str = 'HarmonyOS Sans TC'
HOS_C: str = 'HarmonyOS Sans Condensed'
鸿蒙字体: str = HOS
鸿蒙简体: str = HOS_S
鸿蒙繁体: str = HOS_T
鸿蒙窄体: str = HOS_C
CC: str = 'Cascadia Code'
CM: str = 'Cascadia Mono'
CCPL: str = 'Cascadia Code PL'
CMPL: str = 'Cascadia Mono PL'
微软等宽: str = CC
微软等宽无线: str = CM
微软等宽带电线: str = CCPL
微软等宽带电线无线: str = CMPL
得意黑: str = '得意黑'
# SS = smiley-sans
SS: str = 得意黑
@dataclass
class FontData:
""" 用于保存字体的信息 """
font_name: str = Fonts.鸿蒙简体
font_size: int = 13
bold: bool = False
italic: bool = False
stretch: bool = False
def dict(self) -> Dict[str, Union[str, int, bool]]:
return dict(font_name=self.font_name,
font_size=self.font_size,
bold=self.bold,
italic=self.italic,
stretch=self.stretch)

View File

@ -6,13 +6,14 @@
# ------------------------------- # -------------------------------
import os import os
import sys
import time import time
import logging import logging
import inspect import inspect
import functools import functools
import traceback import traceback
from typing import Callable, Dict from typing import Callable, Dict, List, TYPE_CHECKING
from decimal import Decimal from decimal import Decimal
# third function # third function
@ -25,7 +26,8 @@ from pyglet.window import Window
from pyglet.window import key, mouse from pyglet.window import key, mouse
# Difficult_Rocket function # Difficult_Rocket function
# from Difficult_Rocket.main import Game if TYPE_CHECKING:
from Difficult_Rocket.main import Game
from Difficult_Rocket.utils import tools from Difficult_Rocket.utils import tools
from Difficult_Rocket.api.types import Options from Difficult_Rocket.api.types import Options
from Difficult_Rocket.command import line, tree from Difficult_Rocket.command import line, tree
@ -83,11 +85,6 @@ class Client:
file_drops=True) file_drops=True)
end_time = time.time_ns() end_time = time.time_ns()
self.use_time = end_time - start_time self.use_time = end_time - start_time
if DR_option.use_DR_rust:
from libs.Difficult_Rocket_rs import read_ship_test, part_list_read_test
# part_list_read_test()
# read_ship_test()
self.logger.info(tr().client.setup.use_time().format(Decimal(self.use_time) / 1000000000)) self.logger.info(tr().client.setup.use_time().format(Decimal(self.use_time) / 1000000000))
self.logger.debug(tr().client.setup.use_time_ns().format(self.use_time)) self.logger.debug(tr().client.setup.use_time_ns().format(self.use_time))
@ -188,7 +185,7 @@ class ClientWindow(Window):
self.set_handlers(self.input_box) self.set_handlers(self.input_box)
self.input_box.enabled = True self.input_box.enabled = True
# 设置刷新率 # 设置刷新率
pyglet.clock.schedule_interval(self.draw_update, float(self.SPF)) # pyglet.clock.schedule_interval(self.draw_update, float(self.SPF))
# 完成设置后的信息输出 # 完成设置后的信息输出
self.logger.info(tr().window.os.pid_is().format(os.getpid(), os.getppid())) self.logger.info(tr().window.os.pid_is().format(os.getpid(), os.getppid()))
end_time = time.time_ns() end_time = time.time_ns()
@ -218,22 +215,29 @@ class ClientWindow(Window):
self.set_icon(pyglet.image.load('./textures/icon.png')) self.set_icon(pyglet.image.load('./textures/icon.png'))
self.run_input = True self.run_input = True
self.read_input() self.read_input()
try:
pyglet.app.event_loop.run(1 / self.main_config['runtime']['fps']) pyglet.app.event_loop.run(1 / self.main_config['runtime']['fps'])
except KeyboardInterrupt:
print("==========client stop. KeyboardInterrupt info==========")
traceback.print_exc()
print("==========client stop. KeyboardInterrupt info end==========")
self.dispatch_event("on_close")
sys.exit(0)
@new_thread('window read_input', daemon=True) @new_thread('window read_input', daemon=True)
def read_input(self): def read_input(self):
self.logger.debug('read_input start') self.logger.debug('read_input start')
while self.run_input: while self.run_input:
try:
get = input(">") get = input(">")
except (EOFError, KeyboardInterrupt):
self.run_input = False
break
if get in ('', ' ', '\n', '\r'): if get in ('', ' ', '\n', '\r'):
continue continue
if get == 'stop': if get == 'stop':
self.run_input = False self.run_input = False
self.command_list.append(get) self.command_list.append(get)
# try:
# self.on_command(line.CommandText(get))
# except CommandError:
# self.logger.error(traceback.format_exc())
self.logger.debug('read_input end') self.logger.debug('read_input end')
@new_thread('window save_info') @new_thread('window save_info')
@ -268,9 +272,11 @@ class ClientWindow(Window):
if self.command_list: if self.command_list:
for command in self.command_list: for command in self.command_list:
self.on_command(line.CommandText(command)) self.on_command(line.CommandText(command))
self.command_list.pop(0)
# self.logger.debug('on_draw call dt: {}'.format(dt)) # self.logger.debug('on_draw call dt: {}'.format(dt))
pyglet.gl.glClearColor(0.1, 0, 0, 0.0) pyglet.gl.glClearColor(0.1, 0, 0, 0.0)
self.clear() self.clear()
self.draw_update(float(self.SPF))
self.draw_batch() self.draw_batch()
@_call_screen_after @_call_screen_after

View File

@ -74,6 +74,10 @@ class Game:
def load_mods(self) -> None: def load_mods(self) -> None:
mods = [] mods = []
mod_path = Path(DR_runtime.mod_path)
if not mod_path.exists():
self.logger.info(tr().main.mod.find.faild.no_mod_folder())
return
paths = Path(DR_runtime.mod_path).iterdir() paths = Path(DR_runtime.mod_path).iterdir()
sys.path.append(DR_runtime.mod_path) sys.path.append(DR_runtime.mod_path)
for mod_path in paths: for mod_path in paths:

View File

@ -4,17 +4,3 @@
# All rights reserved # All rights reserved
# ------------------------------- # -------------------------------
"""
writen by shenjackyuanjie
mail: 3695888@qq.com
github: @shenjackyuanjie
gitee: @shenjackyuanjie
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .new_thread import new_thread
__all__ = ['new_thread']

View File

@ -7,8 +7,7 @@
import functools import functools
import inspect import inspect
import threading import threading
from Difficult_Rocket import crash, DR_option from typing import Optional, Callable, Union, List
from typing import Optional, Callable
""" """
This part of code come from MCDReforged(https://github.com/Fallen-Breath/MCDReforged) This part of code come from MCDReforged(https://github.com/Fallen-Breath/MCDReforged)
@ -17,6 +16,15 @@ GNU Lesser General Public License v3.0GNU LGPL v3)
(have some changes) (have some changes)
""" """
__all__ = [
'new_thread',
'FunctionThread'
]
record_thread = False
record_destination: List[Callable[['FunctionThread'], None]] = []
def copy_signature(target: Callable, origin: Callable) -> Callable: def copy_signature(target: Callable, origin: Callable) -> Callable:
""" """
@ -28,10 +36,13 @@ def copy_signature(target: Callable, origin: Callable) -> Callable:
class FunctionThread(threading.Thread): class FunctionThread(threading.Thread):
"""
A Thread subclass which is used in decorator :func:`new_thread` to wrap a synchronized function call
"""
__NONE = object() __NONE = object()
def __init__(self, target, name, args, kwargs): def __init__(self, target, name, args, kwargs, daemon):
super().__init__(target=target, args=args, kwargs=kwargs, name=name) super().__init__(target=target, args=args, kwargs=kwargs, name=name, daemon=daemon)
self.__return_value = self.__NONE self.__return_value = self.__NONE
self.__error = None self.__error = None
@ -40,12 +51,33 @@ class FunctionThread(threading.Thread):
self.__return_value = target(*args_, **kwargs_) self.__return_value = target(*args_, **kwargs_)
except Exception as e: except Exception as e:
self.__error = e self.__error = e
print(e)
raise e from None raise e from None
self._target = wrapped_target self._target = wrapped_target
def get_return_value(self, block: bool = False, timeout: Optional[float] = None): def get_return_value(self, block: bool = False, timeout: Optional[float] = None):
"""
Get the return value of the original function
If an exception has occurred during the original function call, the exception will be risen again here
Examples::
>>> import time
>>> @new_thread
... def do_something(text: str):
... time.sleep(1)
... return text
>>> do_something('task').get_return_value(block=True)
'task'
:param block: If it should join the thread before getting the return value to make sure the function invocation finishes
:param timeout: The maximum timeout for the thread join
:raise RuntimeError: If the thread is still alive when getting return value. Might be caused by ``block=False``
while the thread is still running, or thread join operation times out
:return: The return value of the original function
"""
if block: if block:
self.join(timeout) self.join(timeout)
if self.__return_value is self.__NONE: if self.__return_value is self.__NONE:
@ -54,30 +86,57 @@ class FunctionThread(threading.Thread):
raise self.__error raise self.__error
return self.__return_value return self.__return_value
def join(self, timeout: Optional[float] = None) -> None:
super().join(timeout)
def start(self) -> None: def new_thread(arg: Optional[Union[str, Callable]] = None,
super().start()
def new_thread(thread_name: Optional[str or Callable] = None,
daemon: bool = False, daemon: bool = False,
log_thread: bool = True): log_thread: bool = True):
""" """
Use a new thread to execute the decorated function This is a one line solution to make your function executes in parallels.
The function return value will be set to the thread instance that executes this function When decorated with this decorator, functions will be executed in a new daemon thread
The name of the thread can be specified in parameter
This decorator only changes the return value of the function to the created ``Thread`` object.
Beside the return value, it reserves all signatures of the decorated function,
so you can safely use the decorated function as if there's no decorating at all
It's also a simple compatible upgrade method for old MCDR 0.x plugins
The return value of the decorated function is changed to the ``Thread`` object that executes this function
The decorated function has 1 extra field:
* ``original`` field: The original undecorated function
Examples::
>>> import time
>>> @new_thread('My Plugin Thread')
... def do_something(text: str):
... time.sleep(1)
... print(threading.current_thread().name)
>>> callable(do_something.original)
True
>>> t = do_something('foo')
>>> isinstance(t, FunctionThread)
True
>>> t.join()
My Plugin Thread
:param arg: A :class:`str`, the name of the thread. It's recommend to specify the thread name, so when you
log something by ``server.logger``, a meaningful thread name will be displayed
instead of a plain and meaningless ``Thread-3``
:param daemon: If the thread should be a daemon thread
:param log_thread: If the thread should be logged to callback defined in record_destination
""" """
def wrapper(func): def wrapper(func):
@functools.wraps(func) # to preserve the origin function information @functools.wraps(func) # to preserve the origin function information
def wrap(*args, **kwargs): def wrap(*args, **kwargs):
thread = FunctionThread(target=func, args=args, kwargs=kwargs, name=thread_name) thread = FunctionThread(target=func, args=args, kwargs=kwargs, name=thread_name, daemon=daemon)
thread.daemon = daemon if record_thread:
for destination in record_destination:
destination(thread)
thread.start() thread.start()
if log_thread and DR_option.record_threads:
crash.all_thread.append(thread)
return thread return thread
# bring the signature of the func to the wrap function # bring the signature of the func to the wrap function
@ -86,10 +145,11 @@ def new_thread(thread_name: Optional[str or Callable] = None,
wrap.original = func # access this field to get the original function wrap.original = func # access this field to get the original function
return wrap return wrap
# Directly use @on_new_thread without ending brackets case # Directly use @new_thread without ending brackets case, e.g. @new_thread
if isinstance(thread_name, Callable): if isinstance(arg, Callable):
this_is_a_function = thread_name
thread_name = None thread_name = None
return wrapper(this_is_a_function) return wrapper(arg)
# Use @on_new_thread with ending brackets case # Use @new_thread with ending brackets case, e.g. @new_thread('A'), @new_thread()
else:
thread_name = arg
return wrapper return wrapper

View File

@ -0,0 +1,194 @@
# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import traceback
from dataclasses import dataclass
from typing import get_type_hints, Type, List, Union, Dict, Any, Callable, Tuple, Optional, TYPE_CHECKING
__all__ = ['get_type_hints_',
'Options',
'Fonts',
'FontData',
'OptionsError',
'OptionNotFound',
'OptionNameNotDefined']
def get_type_hints_(cls: Type):
try:
return get_type_hints(cls)
except ValueError:
return get_type_hints(cls, globalns={})
class OptionsError(Exception):
""" option 的错误基类"""
class OptionNameNotDefined(OptionsError):
""" 向初始化的 option 里添加了一个不存在于选项里的选项 """
class OptionNotFound(OptionsError):
""" 某个选项没有找到 """
class Options:
"""
Difficult Rocket 的游戏配置的存储基类
"""
name = 'Option Base'
cached_options: Dict[str, Union[str, Any]] = {}
def __init__(self, **kwargs):
"""
创建一个新的 Options 的时候的配置
如果存在 init 方法 会在设置完 kwargs 之后运行子类的 init 方法
:param kwargs:
"""
if TYPE_CHECKING:
self.options: Dict[str, Union[Callable, object]] = {}
self.flush_option()
for option, value in kwargs.items():
if option not in self.cached_options:
raise OptionNameNotDefined(f"option: {option} with value: {value} is not defined")
setattr(self, option, value)
if hasattr(self, 'init'):
self.init(**kwargs)
if hasattr(self, 'load_file'):
try:
self.load_file()
except Exception:
traceback.print_exc()
self.flush_option()
if TYPE_CHECKING:
options: Dict[str, Union[Callable, object]] = {}
def init(self, **kwargs) -> None:
""" 如果子类定义了这个函数,则会在 __init__ 之后调用这个函数 """
def load_file(self) -> bool:
"""如果子类定义了这个函数,则会在 __init__ 和 init 之后再调用这个函数
请注意这个函数请尽量使用 try 包裹住可能出现错误的部分
否则会在控制台输出你的报错"""
return True
def option(self) -> Dict[str, Any]:
"""
获取配置类的所有配置
:return: 自己的所有配置
"""
values = {}
for ann in self.__annotations__: # 获取类型注释
values[ann] = getattr(self, ann, None)
if values[ann] is None:
values[ann] = self.__annotations__[ann]
if not hasattr(self, 'options'):
self.options: Dict[str, Union[Callable, object]] = {}
for option, a_fun in self.options.items(): # 获取额外内容
values[option] = a_fun
for option, a_fun in values.items(): # 检查是否为 property
if a_fun is bool and getattr(self, option, None) is not None:
values[option] = False
if isinstance(a_fun, property):
try:
values[option] = getattr(self, option)
except AttributeError:
raise OptionNotFound(f'Option {option} is not found in {self.name}') from None
return values
def format(self, text: str) -> str:
"""
使用自己的选项给输入的字符串替换内容
:param text: 想替换的内容
:return: 替换之后的内容
"""
cache_option = self.flush_option()
for option, value in cache_option.items():
text = text.replace(f'{{{option}}}', str(value))
return text
def flush_option(self) -> Dict[str, Any]:
"""
刷新缓存 options 的内容
:return: 刷新过的 options
"""
self.cached_options = self.option()
return self.cached_options
def option_with_len(self) -> List[Union[List[Tuple[str, Any, Any]], int, Any]]:
options = self.flush_option()
max_len_key = 1
max_len_value = 1
max_len_value_t = 1
option_list = []
for key, value in options.items():
value_t = value if isinstance(value, Type) else type(value)
max_len_key = max(max_len_key, len(key))
max_len_value = max(max_len_value, len(str(value)))
max_len_value_t = max(max_len_value_t, len(str(value_t)))
option_list.append((key, value, value_t))
return [option_list, max_len_key, max_len_value, max_len_value_t]
@classmethod
def add_option(cls, name: str, value: Union[Callable, object]) -> Dict:
if not hasattr(cls, 'options'):
cls.options: Dict[str, Union[Callable, object]] = {}
cls.options[name] = value
return cls.options
@staticmethod
def init_option(options_class: Type['Options'], init_value: Optional[dict] = None) -> 'Options':
return options_class(**init_value if init_value is not None else {})
class Fonts(Options):
# font's value
HOS: str = 'HarmonyOS Sans'
HOS_S: str = 'HarmonyOS Sans SC'
HOS_T: str = 'HarmonyOS Sans TC'
HOS_C: str = 'HarmonyOS Sans Condensed'
鸿蒙字体: str = HOS
鸿蒙简体: str = HOS_S
鸿蒙繁体: str = HOS_T
鸿蒙窄体: str = HOS_C
CC: str = 'Cascadia Code'
CM: str = 'Cascadia Mono'
CCPL: str = 'Cascadia Code PL'
CMPL: str = 'Cascadia Mono PL'
微软等宽: str = CC
微软等宽无线: str = CM
微软等宽带电线: str = CCPL
微软等宽带电线无线: str = CMPL
得意黑: str = '得意黑'
# SS = smiley-sans
SS: str = 得意黑
@dataclass
class FontData:
""" 用于保存字体的信息 """
font_name: str = Fonts.鸿蒙简体
font_size: int = 13
bold: bool = False
italic: bool = False
stretch: bool = False
def dict(self) -> Dict[str, Union[str, int, bool]]:
return dict(font_name=self.font_name,
font_size=self.font_size,
bold=self.bold,
italic=self.italic,
stretch=self.stretch)

View File

@ -20,8 +20,10 @@ logger.logfile_name = "Log file name : "
logger.logfile_level = "Log file record level : " logger.logfile_level = "Log file record level : "
logger.logfile_fmt = "Log file record format : " logger.logfile_fmt = "Log file record format : "
logger.logfile_datefmt = "Log file date format : " logger.logfile_datefmt = "Log file date format : "
game_start.at = "Game MainThread start at: {}"
mod.find.start = "Checking Mod: {}" mod.find.start = "Checking Mod: {}"
mod.find.faild.no_spec = "importlib can't find spec" mod.find.faild.no_spec = "importlib can't find spec"
mod.find.faild.no_mod_folder = "Can't find mod folder"
mod.find.done = "All Mod checked" mod.find.done = "All Mod checked"
mod.load.start = "Loading Mod: {}" mod.load.start = "Loading Mod: {}"
mod.load.info = "mod id: {} version: {}" mod.load.info = "mod id: {} version: {}"

View File

@ -23,6 +23,7 @@ logger.logfile_datefmt = "日志文件日期格式:"
game_start.at = "游戏主线程开始于:" game_start.at = "游戏主线程开始于:"
mod.find.start = "正在校验 Mod: {}" mod.find.start = "正在校验 Mod: {}"
mod.find.faild.no_spec = "importlib 无法找到 spec" mod.find.faild.no_spec = "importlib 无法找到 spec"
mod.find.faild.no_mod_folder = "没有找到 Mod 文件夹"
mod.find.done = "所有 Mod 校验完成" mod.find.done = "所有 Mod 校验完成"
mod.load.start = "正在加载 Mod: {}" mod.load.start = "正在加载 Mod: {}"
mod.load.info = "mod id: {} 版本号: {}" mod.load.info = "mod id: {} 版本号: {}"

View File

@ -26,6 +26,8 @@
> 啊哈! mod 加载来啦! > 啊哈! mod 加载来啦!
> 啊啊啊啊啊 大重构 api
### Remove ### Remove
- `game.config` - `game.config`
@ -53,6 +55,14 @@
- 现在游戏崩溃时会自动在 stdio 中输出崩溃日志 内容跟 crash report 中的基本相同 - 现在游戏崩溃时会自动在 stdio 中输出崩溃日志 内容跟 crash report 中的基本相同
- Now when the game crashes, it will automatically output the crash log in stdio - Now when the game crashes, it will automatically output the crash log in stdio
- The content of the crash log is basically the same as the crash report - The content of the crash log is basically the same as the crash report
- `utils.new_thread`
- 跟随 MCDR 的更新
- 将记录线程的方式改成 函数回调
- Follow the update of MCDR
- Change the way to record threads to function callbacks
- `Difficult_Rocket.api`
- 大重构,移除定义,改为引用
- Big refactoring, remove definition, change to reference
### Docs ### Docs