Difficult-Rocket/Difficult_Rocket/mod/loader/__init__.py

235 lines
8.2 KiB
Python
Raw Normal View History

2022-02-16 13:36:26 +08:00
# -------------------------------
# Difficult Rocket
2023-01-20 14:08:12 +08:00
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
2022-02-16 13:36:26 +08:00
# All rights reserved
# -------------------------------
2023-04-23 21:56:28 +08:00
2023-06-22 13:34:14 +08:00
import sys
import time
2023-06-10 18:08:40 +08:00
import logging
2023-06-22 13:34:14 +08:00
import traceback
import importlib
2023-06-10 18:08:40 +08:00
from pathlib import Path
2023-06-26 10:14:28 +08:00
from typing import List, Dict, Optional, TypeVar
2023-06-10 18:08:40 +08:00
from Difficult_Rocket.mod.api import ModInfo
2023-06-22 13:34:14 +08:00
from Difficult_Rocket.utils.translate import tr
from Difficult_Rocket.api.types import Options
2023-06-22 13:34:14 +08:00
2023-12-03 16:54:07 +08:00
Game = TypeVar("Game")
2023-06-26 10:14:28 +08:00
2023-12-03 16:54:07 +08:00
logger = logging.getLogger("mod_manager")
ONE_FILE_SUFFIX = (".py", ".pyc", ".pyd")
PACKAGE_SUFFIX = (".pyz", ".zip", ".dr_mod")
2023-06-10 18:08:40 +08:00
2023-06-26 10:14:28 +08:00
def _add_path_to_sys(paths: List[Path]):
for path in paths:
if str(path) not in sys.path:
sys.path.append(str(path))
2023-06-10 18:08:40 +08:00
class ModManager(Options):
2023-12-03 16:54:07 +08:00
name = "Mod Manager"
2023-06-10 18:08:40 +08:00
2023-12-03 16:54:07 +08:00
mods_path: List[Path] = [Path("./mods")]
2023-06-26 10:14:28 +08:00
find_mod_paths: Dict[str, Path] = {}
2023-06-10 18:08:40 +08:00
loaded_mod_modules: Dict[str, ModInfo] = {}
2023-04-23 21:56:28 +08:00
2023-06-26 10:14:28 +08:00
def get_mod_module(self, mod_name: str) -> Optional[ModInfo]:
"""
获取指定 mod 的模块
:param mod_name: mod
:return:
"""
for mod in self.loaded_mod_modules.values():
if mod.name == mod_name:
return mod
return None
2023-06-22 13:34:14 +08:00
def dispatch_event(self, event_name: str, *args, **kwargs):
"""
分发事件
2023-06-26 10:14:28 +08:00
:param event_name: 事件名
:param args: 事件参数
:param kwargs: 事件参数
2023-06-22 13:34:14 +08:00
:return:
"""
for mod in self.loaded_mod_modules.values():
if hasattr(mod, event_name):
try:
getattr(mod, event_name)(*args, **kwargs)
except Exception as e:
2023-12-03 16:54:07 +08:00
logger.error(
tr()
.mod.event.error()
.format(mod, event_name, e, traceback.format_exc())
)
2023-06-22 13:34:14 +08:00
2023-06-26 10:14:28 +08:00
def load_mod(self, mod_path: Path) -> Optional[type(ModInfo)]:
"""
加载指定路径下的 mod
:param mod_path: mod 的路径
:return:
"""
if not mod_path.exists():
logger.error(tr().mod.load.faild.not_exist().format(mod_path))
return None
_add_path_to_sys([mod_path.parent])
try:
2023-12-03 16:54:07 +08:00
if mod_path.name == "__pycache__":
2023-06-26 10:14:28 +08:00
# 忽略 __pycache__ 文件夹 (Python 编译文件)
return None
logger.info(tr().mod.load.loading().format(mod_path))
2023-12-03 16:54:07 +08:00
if (
mod_path.is_dir()
or mod_path.suffix in PACKAGE_SUFFIX
or mod_path.suffix in ONE_FILE_SUFFIX
):
2023-06-26 10:14:28 +08:00
# 文件夹 mod
loading_mod = importlib.import_module(mod_path.name)
2023-12-03 16:54:07 +08:00
if not hasattr(loading_mod, "mod_class") or not issubclass(
loading_mod.mod_class, ModInfo
):
2023-06-26 10:14:28 +08:00
logger.warning(tr().mod.load.faild.no_mod_class().format(mod_path))
return None
mod_class: type(ModInfo) = loading_mod.mod_class # 获取 mod 类
if mod_class.mod_id not in self.find_mod_paths:
self.find_mod_paths[mod_class.mod_id] = mod_path
return mod_class
except ImportError:
2023-12-03 16:54:07 +08:00
logger.warning(
tr().mod.load.faild.error().format(mod_path, traceback.format_exc())
)
2023-06-26 10:14:28 +08:00
return None
2023-12-03 16:54:07 +08:00
def find_mods_in_path(
self, extra_mods_path: Optional[List[Path]] = None
) -> List[Path]:
2023-06-26 10:14:28 +08:00
"""
查找所有 mod 路径
:return: 找到的 mod 的路径 (未校验)
"""
2023-12-03 16:54:07 +08:00
find_path = self.mods_path + (
extra_mods_path if extra_mods_path is not None else []
)
2023-06-26 10:14:28 +08:00
mods_path = []
start_time = time.time()
for path in find_path:
if not path.exists():
path.mkdir(parents=True)
continue
for mod in path.iterdir():
2023-12-03 16:54:07 +08:00
if mod.name == "__pycache__":
2023-06-26 10:14:28 +08:00
# 忽略 __pycache__ 文件夹 (Python 编译文件)
continue
2023-12-03 16:54:07 +08:00
if (
mod.is_dir()
or mod.suffix in PACKAGE_SUFFIX
or mod.suffix in ONE_FILE_SUFFIX
):
2023-06-26 10:14:28 +08:00
# 文件夹 mod
mods_path.append(mod)
logger.info(tr().mod.finded().format(len(mods_path), time.time() - start_time))
return mods_path
2023-12-03 16:54:07 +08:00
def load_mods(
self,
extra_path: Optional[List[Path]] = None,
extra_mod_path: Optional[List[Path]] = None,
) -> List[type(ModInfo)]:
2023-06-10 18:08:40 +08:00
"""
2023-06-22 13:34:14 +08:00
加载所有 mod (可提供额外的 mod 路径)
:param extra_path: 额外的 mod 路径
2023-06-26 10:14:28 +08:00
:param extra_mod_path: 额外的找到的 mod 路径
2023-06-10 18:08:40 +08:00
:return:
"""
2023-06-22 13:34:14 +08:00
find_path = self.mods_path + (extra_path if extra_path is not None else [])
2023-06-26 10:14:28 +08:00
_add_path_to_sys(find_path)
2023-06-10 18:08:40 +08:00
mods = []
2023-06-22 13:34:14 +08:00
start_time = time.time()
logger.info(tr().mod.load.start().format(find_path))
for path in find_path:
2023-06-10 18:08:40 +08:00
if not path.exists():
path.mkdir(parents=True)
continue
for mod in path.iterdir():
2023-06-26 10:14:28 +08:00
if (cache := self.load_mod(mod)) is not None:
mods.append(cache)
if extra_mod_path is not None:
for path in extra_mod_path:
if (cache := self.load_mod(path)) is not None:
mods.append(cache)
2023-06-22 13:34:14 +08:00
logger.info(tr().mod.load.use_time().format(time.time() - start_time))
return mods
2023-04-23 21:56:28 +08:00
2023-06-22 13:34:14 +08:00
def init_mods(self, mods: List[type(ModInfo)]):
"""
加载 mod
2023-06-26 10:14:28 +08:00
:param mods: 要加载的 mod ModInfo
2023-06-22 13:34:14 +08:00
:return:
"""
start_time = time.time()
for mod_class in mods:
try:
init_mod = mod_class()
2023-06-26 10:14:28 +08:00
self.loaded_mod_modules[init_mod.mod_id] = init_mod
2023-06-22 13:34:14 +08:00
logger.info(tr().mod.init.success().format(init_mod, init_mod.version))
except Exception as e:
2023-12-03 16:54:07 +08:00
logger.error(
tr().mod.init.faild().format(mod_class, e, traceback.format_exc())
)
2023-06-22 13:34:14 +08:00
continue
logger.info(tr().mod.init.use_time().format(time.time() - start_time))
2023-06-26 10:14:28 +08:00
def unload_mod(self, mod_id: str, game: Game) -> Optional[ModInfo]:
"""
卸载 mod
:param mod_id: 要卸载的 mod id
:param game: 游戏实例
:return: 卸载的 mod ModInfo
"""
2023-12-03 16:54:07 +08:00
if (
not (mod_class := self.loaded_mod_modules.get(mod_id))
and (mod_class := self.get_mod_module(mod_id)) is None
):
2023-06-26 10:14:28 +08:00
logger.warning(tr().mod.unload.faild.not_find().format(mod_id))
return None
try:
mod_class.on_unload(game=game)
self.loaded_mod_modules.pop(mod_class.mod_id)
logger.info(tr().mod.unload.success().format(mod_id))
return mod_class
except Exception as e:
2023-12-03 16:54:07 +08:00
logger.error(
tr().mod.unload.faild.error().format(mod_id, e, traceback.format_exc())
)
2023-06-26 10:14:28 +08:00
return None
def reload_mod(self, mod_id: str, game: Game):
"""
重载 mod
:param mod_id:
:param game:
:return:
"""
unload = self.unload_mod(mod_id, game)
if unload is None:
return
mod_class: Optional[ModInfo] = None
if unload.mod_id not in self.find_mod_paths:
logger.warning(tr().mod.reload.faild.not_find().format(unload.mod_id))
paths = self.find_mods_in_path()
for path in paths:
mod_class = self.load_mod(path)
if mod_class is not None and mod_class.mod_id == unload.mod_id:
self.init_mods([mod_class])
break
else:
mod_class = self.load_mod(self.find_mod_paths[unload.mod_id])
if mod_class is not None:
self.init_mods([mod_class])
if mod_id in self.loaded_mod_modules and mod_class is not None:
self.loaded_mod_modules[mod_id].on_load(game=game, old_self=mod_class)
logger.info(tr().mod.reload.success().format(mod_id))