gitea_push2qq/sio_model.py

183 lines
5.4 KiB
Python

import shlex
from dataclasses import dataclass
from typing import Optional, List, Union, Literal, Dict, Any, Callable
from casbin import Enforcer, AsyncEnforcer
from lib_not_dr.types import Options
from pydantic import BaseModel, Field
from sanic import Sanic
from sanic.log import logger
from socketio import AsyncClient
from cmdparser import ArgumentParser, PrintMessage
from unit import sio_log_format
class AtElement(Options):
text: str
id: Union[int, Literal['all']] = 'all'
class ReplyMessage(Options):
id: str
username: str = ''
content: str = ''
files: list = []
def to_json(self) -> dict:
return {
'_id': self.id,
'username': self.username,
'content': self.content,
'files': self.files
}
class SendMessage(Options):
content: str
room_id: Optional[int] = None
room: Optional[int] = None # room id 和 room 二选一 ( 实际上直接填 room id 就行了 )
file: None = None # TODO: 上传文件
reply_to: Optional[ReplyMessage] = None # 源码 给了一个 any TODO: 回复消息
b64_img: Optional[str] = None # TODO: 发送图片
at: Optional[List[AtElement]] = [] # TODO: @某人
sticker: Optional[None] = None # TODO: 发送表情
message_type: Optional[str] = None # TODO: 消息类型
def to_json(self) -> dict:
return {
'content': self.content,
'roomId': self.room_id,
'room': self.room,
'file': self.file,
'replyMessage': self.reply_to.to_json() if self.reply_to else None,
'b64img': self.b64_img,
'at': self.at,
'sticker': self.sticker,
'messageType': self.message_type
}
def to_content(self, content: str) -> "SendMessage":
self.content = content
return self
class NewMessage(Options):
sender_id: int
sender_name: str
room_id: int
content: str
msg_id: str
data: dict
def init(self, **kwargs) -> None:
data = kwargs.pop('data')
self.sender_name = data["message"]["username"]
self.sender_id = data["message"]["senderId"]
self.content = data["message"]["content"]
self.room_id = data["roomId"]
self.msg_id = data["message"]["_id"]
def is_self(self, self_id: int) -> bool:
return self.sender_id == self_id
class SioConfig(BaseModel):
host: str
key: str = Field(alias='private_key')
self_id: int
admin: List[int]
gitea_host: str
client_id: str
client_secret: str
localhost: str
db_url: str
class CmdExists(Exception):
...
class SioDecorator:
def __init__(self):
self.data = None
self.app = None
self._cmd = None
self._content = None
self.cmd_mapper = {}
def cmd(self, cmd_key: str):
def wrapper(func: Callable[[SioRequest], None]):
self._cmds_append(cmd_key, func)
return func
return wrapper
def _cmds_append(self, cmd, func):
if cmd in self.cmd_mapper.keys():
raise CmdExists(
f"Command already registered: /{cmd}"
)
else:
self.cmd_mapper[f'/{cmd}'] = func
async def run(self, app: Sanic, data: Dict[str, Any]):
logger.debug(sio_log_format('add_message:', data))
self._content = data['message']['content']
self._cmd = shlex.split(self._content)
self.app = app
self.data = data
func = self.cmd_mapper.get(self._cmd[0])
if func:
sender_id = self.data['message']['senderId']
sender_name = self.data['message']['username']
room_id = self.data['roomId']
message_id = self.data['message']['_id']
if sender_id != self.app.ctx.sio_config.self_id:
e: AsyncEnforcer = self.app.ctx.e
if (e.enforce(str(room_id), self._cmd[0][1:]) or
(await e.has_role_for_user(str(sender_id), 'admin'))):
parser = ArgumentParser(self._cmd[0])
sqt = SioRequest(app=self.app,
parser=parser,
args=self._cmd[1:],
key=self._cmd[0],
sender_id=sender_id,
sender_name=sender_name,
content=self._content,
room_id=room_id,
data=self.data,
msg_id=message_id)
try:
await func(sqt)
except PrintMessage as e:
reply = ReplyMessage(id=message_id)
msg = SendMessage(content=str(e), room_id=room_id, reply_to=reply)
await self.app.ctx.sio.emit('sendMessage', msg.to_json())
@dataclass
class Ctx:
sio: AsyncClient
sio_config: SioConfig
e: Enforcer
sio_decorator: SioDecorator
@dataclass
class SioRequest:
app: Sanic = None
parser: Optional[ArgumentParser] = None
args: Optional[list] = None
key: Optional[str] = None
data: Optional[Dict[str, Any]] = None
sender_id: Optional[int] = None
sender_name: str = ''
content: str = ''
room_id: Optional[int] = None
msg_id: str = ''