gitea_push2qq/sio_model.py
2023-12-22 00:21:19 +08:00

163 lines
4.9 KiB
Python

import shlex
from dataclasses import dataclass
from functools import wraps
from typing import Optional, List, Union, Literal, Dict, Any
from casbin import Enforcer, AsyncEnforcer
from pydantic import BaseModel, Field, model_validator
from sanic import Sanic
from sanic.log import logger
from socketio import AsyncClient
from cmdparser import ArgumentParser, PrintMessage
from unit import sio_log_format
class AtElement(BaseModel):
    """One @-mention entry carried in the ``at`` list of an outgoing message."""

    # Text for the mention (presumably the displayed "@name" string — confirm).
    text: str
    # Either an integer id or the literal 'all'; defaults to 'all'.
    id: Union[int, Literal['all']] = 'all'
class ReplyMessage(BaseModel):
    """Description of the message that an outgoing message replies to."""

    id: str
    username: str = ''
    content: str = ''
    files: list = []

    def to_json(self) -> dict:
        """Return the fields as a plain dict, with ``id`` emitted as ``'_id'``."""
        payload = {'_id': self.id}
        # The remaining fields keep their attribute names as keys.
        for field_name in ('username', 'content', 'files'):
            payload[field_name] = getattr(self, field_name)
        return payload
class Message(BaseModel):
    """Outgoing chat message.

    At least one of ``room_id`` / ``room`` must be provided; validation fails
    otherwise. ``to_json`` produces the camelCase payload dict.
    """

    content: str
    room_id: Optional[int] = None
    room: Optional[int] = None  # one of room_id / room is required (in practice just fill in room_id)
    file: None = None  # file upload
    reply_to: Optional[ReplyMessage] = None  # message being replied to (the upstream source types it as `any`)
    b64_img: Optional[str] = None  # image to send
    at: Optional[List[AtElement]] = []  # @-mentions
    sticker: Optional[None] = None  # sticker to send
    message_type: Optional[str] = None  # message type

    @model_validator(mode='after')
    def check_room_id(self):
        """Reject messages that specify neither ``room_id`` nor ``room``.

        Raises:
            ValueError: if both ``room_id`` and ``room`` are ``None``.
        """
        if self.room_id is None and self.room is None:
            raise ValueError('room id 和 room 二选一 ( 实际上直接填 room id 就行了 )')
        return self

    def to_json(self) -> dict:
        """Return the message as a plain dict using the camelCase wire keys.

        Nested models are converted to plain dicts as well, so the result can
        be handed to a JSON encoder: ``reply_to`` through its own ``to_json``
        and every ``at`` entry through ``model_dump``.
        """
        mentions = self.at
        if mentions is not None:
            # AtElement instances are not JSON-serialisable as-is; entries that
            # are already plain values (e.g. assigned after validation) pass through.
            mentions = [
                item.model_dump() if isinstance(item, BaseModel) else item
                for item in mentions
            ]
        return {
            'content': self.content,
            'roomId': self.room_id,
            'room': self.room,
            'file': self.file,
            'replyMessage': self.reply_to.to_json() if self.reply_to else None,
            'b64img': self.b64_img,
            'at': mentions,
            'sticker': self.sticker,
            'messageType': self.message_type,
        }
class SioConfig(BaseModel):
    """Configuration values for the bot.

    NOTE(review): except where stated, the field notes below are inferred from
    the field names only — confirm against the code that loads the config.
    """

    host: str  # presumably the socket.io server address — confirm
    key: str = Field(alias='private_key')  # filled from the 'private_key' input key
    self_id: int  # compared with the sender id of incoming messages by the command router
    admin: List[int]  # presumably ids of administrator accounts — confirm
    gitea_host: str  # presumably the Gitea base URL — confirm
    client_id: str  # presumably an OAuth client id — confirm
    client_secret: str  # presumably the matching OAuth client secret — confirm
    localhost: str  # presumably this service's own address — confirm
    db_url: str  # presumably a database connection URL — confirm
class CmdExists(Exception):
    """Raised when a command is registered under a name that is already taken."""
class SioDecorator:
    """Command registry and dispatcher for one incoming chat message.

    The constructor tokenises the message content; handlers are registered
    with the :meth:`cmd` decorator under ``'/<name>'`` keys, and
    :meth:`route2cmd_and_run` invokes the handler whose key equals the first
    token of the message.
    """

    def __init__(self, app: Sanic, data: Dict[str, Any]):
        """Store the app and event payload and tokenise the message content.

        Args:
            app: Sanic application; its ``ctx`` is read during dispatch.
            data: Event payload; ``data['message']['content']`` must exist.
        """
        logger.debug(sio_log_format('add_message:', data))
        self._content = data['message']['content']
        self._cmd = self._split_command(self._content)
        self.app = app
        self.data = data
        self.cmds = {}

    @staticmethod
    def _split_command(content: Optional[str]) -> List[str]:
        """Split message text into shell-like tokens without ever raising."""
        if not content:
            return []
        try:
            return shlex.split(content)
        except ValueError:
            # Ordinary chat text often has unbalanced quotes ("it's ...");
            # fall back to whitespace splitting instead of failing.
            return content.split()

    def cmd(self, cmd_key: str):
        """Return a decorator that registers a coroutine handler as ``/<cmd_key>``.

        Raises:
            CmdExists: (at decoration time) if ``/<cmd_key>`` is already registered.
        """
        def decorator(func):
            self._cmds_append(cmd_key, func)

            @wraps(func)
            async def wrapper(*args, **kwargs):  # args: positional arguments; kwargs: keyword arguments
                # Forward to the handler so the decorated name stays callable.
                return await func(*args, **kwargs)

            return wrapper
        return decorator

    def _cmds_append(self, cmd, func):
        """Register ``func`` under the key ``'/' + cmd``.

        Raises:
            CmdExists: if that key is already present in ``self.cmds``.
        """
        key = f'/{cmd}'
        # The check must use the same '/'-prefixed key that is stored.
        if key in self.cmds:
            raise CmdExists(
                f"Command already registered: /{cmd}"
            )
        self.cmds[key] = func

    async def route2cmd_and_run(self):
        """Run the handler matching the first token of the message, if permitted.

        Nothing happens when the message has no tokens, when no handler is
        registered for the first token, or when the sender id equals the
        configured ``self_id``. Otherwise the handler runs only if the
        enforcer allows ``(room id, command name)`` or the sender has the
        ``'admin'`` role. A ``PrintMessage`` raised by the handler is sent to
        the originating room through the ``sendMessage`` event.
        """
        if not self._cmd:
            # Empty content (or whitespace only): there is no command to route.
            return
        key = self._cmd[0]
        func = self.cmds.get(key)
        if not func:
            return

        message = self.data['message']
        sender_id = message['senderId']
        sender_name = message['username']
        room_id = self.data['roomId']
        message_id = message['_id']
        if sender_id == self.app.ctx.sio_config.self_id:
            return

        enforcer: AsyncEnforcer = self.app.ctx.e
        # key[1:] drops the first character of the token (the '/' of a registered key).
        allowed = (enforcer.enforce(str(room_id), key[1:]) or
                   (await enforcer.has_role_for_user(str(sender_id), 'admin')))
        if not allowed:
            return

        parser = ArgumentParser(key)
        sqt = SioRequest(app=self.app,
                         parser=parser,
                         args=self._cmd[1:],
                         key=key,
                         sender_id=sender_id,
                         sender_name=sender_name,
                         content=self._content,
                         room_id=room_id,
                         data=self.data,
                         msg_id=message_id)
        try:
            await func(sqt)
        except PrintMessage as exc:
            msg = Message(content=str(exc), room_id=room_id)
            await self.app.ctx.sio.emit('sendMessage', msg.to_json())
@dataclass
class Ctx:
    """Typed description of the objects this module reads from ``app.ctx``.

    NOTE(review): presumably used only as a typing aid for the Sanic context —
    confirm where it is instantiated.
    """

    sio: AsyncClient  # socket.io client
    sio_config: SioConfig  # configuration values
    # The command router annotates this object as AsyncEnforcer and awaits
    # ``has_role_for_user`` on it, so the async variant is accepted here too.
    e: Union[Enforcer, AsyncEnforcer]
    sio_decorator: SioDecorator  # command registry / dispatcher
@dataclass
class SioRequest:
    """Everything a command handler receives about the message that triggered it.

    Every field has a default, so an empty ``SioRequest()`` can be created.
    """

    app: Optional[Sanic] = None  # the Sanic application
    parser: Optional[ArgumentParser] = None  # argument parser created for the command
    args: Optional[list] = None  # tokens following the command token
    key: Optional[str] = None  # the command token itself
    data: Optional[Dict[str, Any]] = None  # the raw event payload
    sender_id: Optional[int] = None  # id of the message sender
    sender_name: str = ''  # display name of the sender
    content: str = ''  # the unparsed message text
    room_id: Optional[int] = None  # id of the room the message came from
    msg_id: str = ''  # id of the triggering message