# ------------------------------- # Difficult Rocket # Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com # All rights reserved # ------------------------------- import sys import time import logging import traceback import importlib from pathlib import Path from typing import List, Dict, Optional, TypeVar from Difficult_Rocket.mod.api import ModInfo from Difficult_Rocket.utils.translate import tr from Difficult_Rocket.api.types import Options, Version Game = TypeVar('Game') logger = logging.getLogger('mod_manager') ONE_FILE_SUFFIX = ('.py', '.pyc', '.pyd') PACKAGE_SUFFIX = ('.pyz', '.zip', '.dr_mod') def _add_path_to_sys(paths: List[Path]): for path in paths: if str(path) not in sys.path: sys.path.append(str(path)) class ModManager(Options): name = 'Mod Manager' mods_path: List[Path] = [Path('./mods')] find_mod_paths: Dict[str, Path] = {} loaded_mod_modules: Dict[str, ModInfo] = {} def get_mod_module(self, mod_name: str) -> Optional[ModInfo]: """ 获取指定 mod 的模块 :param mod_name: mod 名 :return: """ for mod in self.loaded_mod_modules.values(): if mod.name == mod_name: return mod return None def dispatch_event(self, event_name: str, *args, **kwargs): """ 分发事件 :param event_name: 事件名 :param args: 事件参数 :param kwargs: 事件参数 :return: """ for mod in self.loaded_mod_modules.values(): if hasattr(mod, event_name): try: getattr(mod, event_name)(*args, **kwargs) except Exception as e: logger.error(tr().mod.event.error().format(mod, event_name, e, traceback.format_exc())) def load_mod(self, mod_path: Path) -> Optional[type(ModInfo)]: """ 加载指定路径下的 mod :param mod_path: mod 的路径 :return: """ if not mod_path.exists(): logger.error(tr().mod.load.faild.not_exist().format(mod_path)) return None _add_path_to_sys([mod_path.parent]) try: if mod_path.name == '__pycache__': # 忽略 __pycache__ 文件夹 (Python 编译文件) return None logger.info(tr().mod.load.loading().format(mod_path)) if mod_path.is_dir() or mod_path.suffix in PACKAGE_SUFFIX or mod_path.suffix in ONE_FILE_SUFFIX: # 文件夹 mod loading_mod = importlib.import_module(mod_path.name) if not hasattr(loading_mod, 'mod_class') or not issubclass(loading_mod.mod_class, ModInfo): logger.warning(tr().mod.load.faild.no_mod_class().format(mod_path)) return None mod_class: type(ModInfo) = loading_mod.mod_class # 获取 mod 类 if mod_class.mod_id not in self.find_mod_paths: self.find_mod_paths[mod_class.mod_id] = mod_path return mod_class except ImportError: logger.warning(tr().mod.load.faild.error().format(mod_path, traceback.format_exc())) return None def find_mods_in_path(self, extra_mods_path: Optional[List[Path]] = None) -> List[Path]: """ 查找所有 mod 路径 :return: 找到的 mod 的路径 (未校验) """ find_path = self.mods_path + (extra_mods_path if extra_mods_path is not None else []) mods_path = [] start_time = time.time() for path in find_path: if not path.exists(): path.mkdir(parents=True) continue for mod in path.iterdir(): if mod.name == '__pycache__': # 忽略 __pycache__ 文件夹 (Python 编译文件) continue if mod.is_dir() or mod.suffix in PACKAGE_SUFFIX or mod.suffix in ONE_FILE_SUFFIX: # 文件夹 mod mods_path.append(mod) logger.info(tr().mod.finded().format(len(mods_path), time.time() - start_time)) return mods_path def load_mods(self, extra_path: Optional[List[Path]] = None, extra_mod_path: Optional[List[Path]] = None) -> List[type(ModInfo)]: """ 加载所有 mod (可提供额外的 mod 路径) :param extra_path: 额外的 mod 路径 :param extra_mod_path: 额外的找到的 mod 路径 :return: """ find_path = self.mods_path + (extra_path if extra_path is not None else []) _add_path_to_sys(find_path) mods = [] start_time = time.time() logger.info(tr().mod.load.start().format(find_path)) for path in find_path: if not path.exists(): path.mkdir(parents=True) continue for mod in path.iterdir(): if (cache := self.load_mod(mod)) is not None: mods.append(cache) if extra_mod_path is not None: for path in extra_mod_path: if (cache := self.load_mod(path)) is not None: mods.append(cache) logger.info(tr().mod.load.use_time().format(time.time() - start_time)) return mods def init_mods(self, mods: List[type(ModInfo)]): """ 加载 mod :param mods: 要加载的 mod 的 ModInfo 类 :return: """ start_time = time.time() for mod_class in mods: try: init_mod = mod_class() self.loaded_mod_modules[init_mod.mod_id] = init_mod logger.info(tr().mod.init.success().format(init_mod, init_mod.version)) except Exception as e: logger.error(tr().mod.init.faild().format(mod_class, e, traceback.format_exc())) continue logger.info(tr().mod.init.use_time().format(time.time() - start_time)) def unload_mod(self, mod_id: str, game: Game) -> Optional[ModInfo]: """ 卸载 mod :param mod_id: 要卸载的 mod id :param game: 游戏实例 :return: 卸载的 mod 的 ModInfo 类 """ if not (mod_class := self.loaded_mod_modules.get(mod_id)) and (mod_class := self.get_mod_module(mod_id)) is None: logger.warning(tr().mod.unload.faild.not_find().format(mod_id)) return None try: mod_class.on_unload(game=game) self.loaded_mod_modules.pop(mod_class.mod_id) logger.info(tr().mod.unload.success().format(mod_id)) return mod_class except Exception as e: logger.error(tr().mod.unload.faild.error().format(mod_id, e, traceback.format_exc())) return None def reload_mod(self, mod_id: str, game: Game): """ 重载 mod :param mod_id: :param game: :return: """ unload = self.unload_mod(mod_id, game) if unload is None: return mod_class: Optional[ModInfo] = None if unload.mod_id not in self.find_mod_paths: logger.warning(tr().mod.reload.faild.not_find().format(unload.mod_id)) paths = self.find_mods_in_path() for path in paths: mod_class = self.load_mod(path) if mod_class is not None and mod_class.mod_id == unload.mod_id: self.init_mods([mod_class]) break else: mod_class = self.load_mod(self.find_mod_paths[unload.mod_id]) if mod_class is not None: self.init_mods([mod_class]) if mod_id in self.loaded_mod_modules and mod_class is not None: self.loaded_mod_modules[mod_id].on_load(game=game, old_self=mod_class) logger.info(tr().mod.reload.success().format(mod_id))