import shlex from dataclasses import dataclass from functools import wraps from typing import Optional, List, Union, Literal, Dict, Any from casbin import Enforcer, AsyncEnforcer from pydantic import BaseModel, Field, model_validator from sanic import Sanic from sanic.log import logger from socketio import AsyncClient from cmdparser import ArgumentParser, PrintMessage from unit import sio_log_format class AtElement(BaseModel): text: str id: Union[int, Literal['all']] = 'all' class ReplyMessage(BaseModel): id: str username: str = '' content: str = '' files: list = [] def to_json(self) -> dict: return { '_id': self.id, 'username': self.username, 'content': self.content, 'files': self.files } class Message(BaseModel): content: str room_id: Optional[int] = None room: Optional[int] = None # room id 和 room 二选一 ( 实际上直接填 room id 就行了 ) file: None = None # 上传文件 reply_to: Optional[ReplyMessage] = None # 源码 给了一个 any 回复消息 b64_img: Optional[str] = None # 发送图片 at: Optional[List[AtElement]] = [] # @某人 sticker: Optional[None] = None # 发送表情 message_type: Optional[str] = None # 消息类型 @model_validator(mode='after') def check_room_id(self): if self.room_id is None and self.room is None: raise ValueError('room id 和 room 二选一 ( 实际上直接填 room id 就行了 )') return self def to_json(self) -> dict: return { 'content': self.content, 'roomId': self.room_id, 'room': self.room, 'file': self.file, 'replyMessage': self.reply_to.to_json() if self.reply_to else None, 'b64img': self.b64_img, 'at': self.at, 'sticker': self.sticker, 'messageType': self.message_type } class SioConfig(BaseModel): host: str key: str = Field(alias='private_key') self_id: int admin: List[int] gitea_host: str client_id: str client_secret: str localhost: str db_url: str class CmdExists(Exception): ... class SioDecorator: def __init__(self, app: Sanic, data: Dict[str, Any]): logger.debug(sio_log_format('add_message:', data)) self._content = data['message']['content'] self._cmd = shlex.split(self._content) self.app = app self.data = data self.cmds = {} def cmd(self, cmd_key: str): def decorator(func): self._cmds_append(cmd_key, func) @wraps(func) async def wrapper(*args, **kwargs): # args 无参数名的 kw 有参数名的 ... return wrapper return decorator def _cmds_append(self, cmd, func): if cmd in self.cmds.keys(): raise CmdExists( f"Command already registered: /{cmd}" ) else: self.cmds[f'/{cmd}'] = func async def route2cmd_and_run(self): func = self.cmds.get(self._cmd[0]) if func: sender_id = self.data['message']['senderId'] sender_name = self.data['message']['username'] room_id = self.data['roomId'] message_id = self.data['message']['_id'] if sender_id != self.app.ctx.sio_config.self_id: e: AsyncEnforcer = self.app.ctx.e if (e.enforce(str(room_id), self._cmd[0][1:]) or (await e.has_role_for_user(str(sender_id), 'admin'))): parser = ArgumentParser(self._cmd[0]) sqt = SioRequest(app=self.app, parser=parser, args=self._cmd[1:], key=self._cmd[0], sender_id=sender_id, sender_name=sender_name, content=self._content, room_id=room_id, data=self.data, message_id=message_id) try: await func(sqt) except PrintMessage as e: msg = Message(content=str(e), room_id=room_id) await self.app.ctx.sio.emit('sendMessage', msg.to_json()) @dataclass class Ctx: sio: AsyncClient sio_config: SioConfig e: Enforcer sio_decorator: SioDecorator @dataclass class SioRequest: app: Sanic = None parser: Optional[ArgumentParser] = None args: Optional[list] = None key: Optional[str] = None data: Optional[Dict[str, Any]] = None sender_id: Optional[int] = None sender_name: str = '' content: str = '' room_id: Optional[int] = None message_id: str = ''