Difficult-Rocket/Difficult_Rocket/mod/loader/__init__.py

236 lines
8.2 KiB
Python

# -------------------------------
# Difficult Rocket
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
# All rights reserved
# -------------------------------
import sys
import time
import traceback
import importlib
from pathlib import Path
from typing import List, Dict, Optional, TypeVar
from Difficult_Rocket.mod.api import ModInfo
from Difficult_Rocket.utils.translate import tr
from Difficult_Rocket.api.types import Options
from lib_not_dr.loggers import config
Game = TypeVar("Game")
logger = config.get_logger_from_old("mod_manager", "client")
ONE_FILE_SUFFIX = (".py", ".pyc", ".pyd")
PACKAGE_SUFFIX = (".pyz", ".zip", ".dr_mod")
def _add_path_to_sys(paths: List[Path]):
for path in paths:
if str(path) not in sys.path:
sys.path.append(str(path))
class ModManager(Options):
name = "Mod Manager"
mods_path: List[Path] = [Path("./mods")]
find_mod_paths: Dict[str, Path] = {}
loaded_mod_modules: Dict[str, ModInfo] = {}
def get_mod_module(self, mod_name: str) -> Optional[ModInfo]:
"""
获取指定 mod 的模块
:param mod_name: mod 名
:return:
"""
for mod in self.loaded_mod_modules.values():
if mod.name == mod_name:
return mod
return None
def dispatch_event(self, event_name: str, *args, **kwargs):
"""
分发事件
:param event_name: 事件名
:param args: 事件参数
:param kwargs: 事件参数
:return:
"""
for mod in self.loaded_mod_modules.values():
if hasattr(mod, event_name):
try:
getattr(mod, event_name)(*args, **kwargs)
except Exception as e:
logger.error(
tr()
.mod.event.error()
.format(mod, event_name, e, traceback.format_exc())
)
def load_mod(self, mod_path: Path) -> Optional[type(ModInfo)]:
"""
加载指定路径下的 mod
:param mod_path: mod 的路径
:return:
"""
if not mod_path.exists():
logger.error(tr().mod.load.faild.not_exist().format(mod_path))
return None
_add_path_to_sys([mod_path.parent])
try:
if mod_path.name == "__pycache__":
# 忽略 __pycache__ 文件夹 (Python 编译文件)
return None
logger.info(tr().mod.load.loading().format(mod_path))
if (
mod_path.is_dir()
or mod_path.suffix in PACKAGE_SUFFIX
or mod_path.suffix in ONE_FILE_SUFFIX
):
# 文件夹 mod
loading_mod = importlib.import_module(mod_path.name)
if not hasattr(loading_mod, "mod_class") or not issubclass(
loading_mod.mod_class, ModInfo
):
logger.warn(tr().mod.load.faild.no_mod_class().format(mod_path))
return None
mod_class: type(ModInfo) = loading_mod.mod_class # 获取 mod 类
if mod_class.mod_id not in self.find_mod_paths:
self.find_mod_paths[mod_class.mod_id] = mod_path
return mod_class
except ImportError:
logger.warn(
tr().mod.load.faild.error().format(mod_path, traceback.format_exc())
)
return None
def find_mods_in_path(
self, extra_mods_path: Optional[List[Path]] = None
) -> List[Path]:
"""
查找所有 mod 路径
:return: 找到的 mod 的路径 (未校验)
"""
find_path = self.mods_path + (
extra_mods_path if extra_mods_path is not None else []
)
mods_path = []
start_time = time.time()
for path in find_path:
if not path.exists():
path.mkdir(parents=True)
continue
for mod in path.iterdir():
if mod.name == "__pycache__":
# 忽略 __pycache__ 文件夹 (Python 编译文件)
continue
if (
mod.is_dir()
or mod.suffix in PACKAGE_SUFFIX
or mod.suffix in ONE_FILE_SUFFIX
):
# 文件夹 mod
mods_path.append(mod)
logger.info(tr().mod.finded().format(len(mods_path), time.time() - start_time))
return mods_path
def load_mods(
self,
extra_path: Optional[List[Path]] = None,
extra_mod_path: Optional[List[Path]] = None,
) -> List[type(ModInfo)]:
"""
加载所有 mod (可提供额外的 mod 路径)
:param extra_path: 额外的 mod 路径
:param extra_mod_path: 额外的找到的 mod 路径
:return:
"""
find_path = self.mods_path + (extra_path if extra_path is not None else [])
_add_path_to_sys(find_path)
mods = []
start_time = time.time()
logger.info(tr().mod.load.start().format(find_path))
for path in find_path:
if not path.exists():
path.mkdir(parents=True)
continue
for mod in path.iterdir():
if (cache := self.load_mod(mod)) is not None:
mods.append(cache)
if extra_mod_path is not None:
for path in extra_mod_path:
if (cache := self.load_mod(path)) is not None:
mods.append(cache)
logger.info(tr().mod.load.use_time().format(time.time() - start_time))
return mods
def init_mods(self, mods: List[type(ModInfo)]):
"""
加载 mod
:param mods: 要加载的 mod 的 ModInfo 类
:return:
"""
start_time = time.time()
for mod_class in mods:
try:
init_mod = mod_class()
self.loaded_mod_modules[init_mod.mod_id] = init_mod
logger.info(tr().mod.init.success().format(init_mod, init_mod.version))
except Exception as e:
logger.error(
tr().mod.init.faild().format(mod_class, e, traceback.format_exc())
)
continue
logger.info(tr().mod.init.use_time().format(time.time() - start_time))
def unload_mod(self, mod_id: str, game: Game) -> Optional[ModInfo]:
"""
卸载 mod
:param mod_id: 要卸载的 mod id
:param game: 游戏实例
:return: 卸载的 mod 的 ModInfo 类
"""
if (
not (mod_class := self.loaded_mod_modules.get(mod_id))
and (mod_class := self.get_mod_module(mod_id)) is None
):
logger.warn(tr().mod.unload.faild.not_find().format(mod_id))
return None
try:
mod_class.on_unload(game=game)
self.loaded_mod_modules.pop(mod_class.mod_id)
logger.info(tr().mod.unload.success().format(mod_id))
return mod_class
except Exception as e:
logger.error(
tr().mod.unload.faild.error().format(mod_id, e, traceback.format_exc())
)
return None
def reload_mod(self, mod_id: str, game: Game):
"""
重载 mod
:param mod_id:
:param game:
:return:
"""
unload = self.unload_mod(mod_id, game)
if unload is None:
return
mod_class: Optional[ModInfo] = None
if unload.mod_id not in self.find_mod_paths:
logger.warn(tr().mod.reload.faild.not_find().format(unload.mod_id))
paths = self.find_mods_in_path()
for path in paths:
mod_class = self.load_mod(path)
if mod_class is not None and mod_class.mod_id == unload.mod_id:
self.init_mods([mod_class])
break
else:
mod_class = self.load_mod(self.find_mod_paths[unload.mod_id])
if mod_class is not None:
self.init_mods([mod_class])
if mod_id in self.loaded_mod_modules and mod_class is not None:
self.loaded_mod_modules[mod_id].on_load(game=game, old_self=mod_class)
logger.info(tr().mod.reload.success().format(mod_id))