import shlex
from dataclasses import dataclass
from typing import Optional, List, Union, Literal, Dict, Any, Callable, Awaitable

from casbin import Enforcer, AsyncEnforcer
from lib_not_dr.types import Options
from pydantic import BaseModel, Field
from sanic import Sanic
from sanic.log import logger
from socketio import AsyncClient

from cmdparser import ArgumentParser, PrintMessage
from unit import sio_log_format


class AtElement(Options):
    """One @-mention attached to an outgoing message."""

    text: str
    # Either a numeric user id or the literal 'all'.
    id: Union[int, Literal['all']] = 'all'


class ReplyMessage(Options):
    """Reference to the message that an outgoing message replies to."""

    id: str
    username: str = ''
    content: str = ''
    # NOTE(review): class-level mutable default; presumably shared between
    # instances unless Options copies it -- confirm before mutating in place.
    files: list = []

    def to_json(self) -> dict:
        """Return the dict form used as ``replyMessage`` in a send payload."""
        return {
            '_id': self.id,
            'username': self.username,
            'content': self.content,
            'files': self.files
        }


class SendMessage(Options):
    """Payload for the ``sendMessage`` socket.io event."""

    content: str
    room_id: Optional[int] = None
    # room_id and room are alternatives (in practice just fill in room_id).
    room: Optional[int] = None
    file: None = None  # TODO: upload a file
    # The upstream source types this as "any". TODO: reply to a message
    reply_to: Optional[ReplyMessage] = None
    b64_img: Optional[str] = None  # TODO: send an image
    # NOTE(review): class-level mutable default, see ReplyMessage.files.
    at: Optional[List[AtElement]] = []  # TODO: @-mention someone
    sticker: Optional[None] = None  # TODO: send a sticker
    message_type: Optional[str] = None  # TODO: message type

    def to_json(self) -> dict:
        """Return the payload dict with the key names used on the wire."""
        return {
            'content': self.content,
            'roomId': self.room_id,
            'room': self.room,
            'file': self.file,
            'replyMessage': self.reply_to.to_json() if self.reply_to else None,
            'b64img': self.b64_img,
            'at': self.at,
            'sticker': self.sticker,
            'messageType': self.message_type
        }

    def to_content(self, content: str) -> "SendMessage":
        """Replace the message text in place and return ``self`` for chaining."""
        self.content = content
        return self


class NewMessage(Options):
    """Flattened view of an incoming message event."""

    sender_id: int
    sender_name: str
    room_id: int
    content: str
    msg_id: str
    data: dict

    def init(self, **kwargs) -> None:
        """Populate the flat fields from the raw event passed as ``data``.

        Presumably invoked by ``Options.__init__`` with the constructor
        keyword arguments -- confirm against lib_not_dr.

        Raises:
            KeyError: if ``data`` or any expected key inside it is missing.
        """
        data = kwargs.pop('data')
        self.sender_name = data["message"]["username"]
        self.sender_id = data["message"]["senderId"]
        self.content = data["message"]["content"]
        self.room_id = data["roomId"]
        self.msg_id = data["message"]["_id"]

    def is_self(self, self_id: int) -> bool:
        """Return True when the message was sent by the account ``self_id``."""
        return self.sender_id == self_id


class SioConfig(BaseModel):
    """Bot configuration model."""

    host: str
    # Populated from the 'private_key' entry of the input data.
    key: str = Field(alias='private_key')
    self_id: int
    admin: List[int]
    gitea_host: str
    client_id: str
    client_secret: str
    localhost: str
    db_url: str


class CmdExists(Exception):
    """Raised when a command key is registered more than once."""
    ...


class SioDecorator:
    """Registry and dispatcher for slash commands found in chat messages."""

    def __init__(self):
        # State of the most recently received message. Kept for backward
        # compatibility; run() itself works on local variables.
        self.data = None
        self.app = None
        self._cmd = None
        self._content = None
        # Maps '/<cmd_key>' to the handler coroutine function.
        self.cmd_mapper = {}

    def cmd(self, cmd_key: str):
        """Return a decorator registering the function as handler of ``/cmd_key``.

        The decorated function is returned unchanged.

        Raises:
            CmdExists: (when the decorator is applied) if ``cmd_key`` is
                already registered.
        """
        def wrapper(func: Callable[["SioRequest"], Awaitable[None]]):
            self._cmds_append(cmd_key, func)
            return func

        return wrapper

    def _cmds_append(self, cmd, func):
        """Store ``func`` under the key ``'/' + cmd``.

        Raises:
            CmdExists: if that key is already present.
        """
        key = f'/{cmd}'
        # The lookup must use the stored (slash-prefixed) key; checking the
        # bare name never matches, so duplicates would silently overwrite.
        if key in self.cmd_mapper:
            raise CmdExists(
                f"Command already registered: /{cmd}"
            )
        self.cmd_mapper[key] = func

    async def run(self, app: Sanic, data: Dict[str, Any]):
        """Dispatch an incoming message to its registered command handler.

        The message text is tokenised with ``shlex.split``; the first token
        selects the handler. The handler is awaited only when the sender is
        not the bot itself and either the enforcer allows the
        (room id, command name) pair or the sender has the 'admin' role.
        A ``PrintMessage`` raised by the handler is sent back to the room as
        a reply to the triggering message.

        Messages that are not text, are empty, cannot be tokenised
        (e.g. an unmatched quote) or do not start with a registered command
        are ignored.

        Args:
            app: application whose ``ctx`` provides ``sio_config``, ``e``
                and ``sio``.
            data: raw event; ``data['message']`` and ``data['roomId']`` are
                read.
        """
        logger.debug(sio_log_format('add_message:', data))
        content = data['message']['content']
        self.app = app
        self.data = data
        self._content = content

        # Non-text content cannot carry a command (and shlex.split(None)
        # would read from stdin on older Python versions).
        if not isinstance(content, str):
            return
        try:
            argv = shlex.split(content)
        except ValueError as exc:
            # Ordinary chat text such as "it's" has unbalanced quotes.
            logger.debug("message is not a parsable command: %s", exc)
            return
        self._cmd = argv
        if not argv:
            return

        key = argv[0]
        func = self.cmd_mapper.get(key)
        if not func:
            return

        message = data['message']
        sender_id = message['senderId']
        sender_name = message['username']
        room_id = data['roomId']
        message_id = message['_id']

        # Never react to the bot's own messages.
        if sender_id == app.ctx.sio_config.self_id:
            return

        enforcer: AsyncEnforcer = app.ctx.e
        # key[1:] drops the leading '/' to get the bare command name.
        allowed = (enforcer.enforce(str(room_id), key[1:])
                   or (await enforcer.has_role_for_user(str(sender_id), 'admin')))
        if not allowed:
            return

        # Only locals are used from here on: the instance attributes may be
        # overwritten by another message handled while this one is awaiting.
        parser = ArgumentParser(key)
        sqt = SioRequest(app=app,
                         parser=parser,
                         args=argv[1:],
                         key=key,
                         sender_id=sender_id,
                         sender_name=sender_name,
                         content=content,
                         room_id=room_id,
                         data=data,
                         msg_id=message_id)
        try:
            await func(sqt)
        except PrintMessage as exc:
            reply = ReplyMessage(id=message_id)
            msg = SendMessage(content=str(exc), room_id=room_id, reply_to=reply)
            await app.ctx.sio.emit('sendMessage', msg.to_json())


@dataclass
class Ctx:
    """Objects stored on the application context."""

    sio: AsyncClient
    sio_config: SioConfig
    # SioDecorator.run awaits has_role_for_user on this object, i.e. it uses
    # it as an AsyncEnforcer.
    e: Union[Enforcer, AsyncEnforcer]
    sio_decorator: SioDecorator


@dataclass
class SioRequest:
    """Everything a command handler receives about the triggering message."""

    app: Optional[Sanic] = None
    parser: Optional[ArgumentParser] = None
    # Tokens following the command key.
    args: Optional[list] = None
    # Command key including the leading '/'.
    key: Optional[str] = None
    # Raw event the request was built from.
    data: Optional[Dict[str, Any]] = None
    sender_id: Optional[int] = None
    sender_name: str = ''
    content: str = ''
    room_id: Optional[int] = None
    msg_id: str = ''