gitea_push2qq/sio_model.py

160 lines
4.7 KiB
Python
Raw Normal View History

2023-12-20 20:47:07 +08:00
import shlex
2023-12-18 19:37:29 +08:00
from dataclasses import dataclass
2023-12-20 20:47:07 +08:00
from functools import wraps
from typing import Optional, List, Union, Literal, Dict, Any
2023-12-18 19:37:29 +08:00
2023-12-21 01:23:29 +08:00
from casbin import Enforcer, AsyncEnforcer
2023-12-20 20:47:07 +08:00
from pydantic import BaseModel, Field, model_validator
from sanic import Sanic
from sanic.log import logger
2023-12-18 19:37:29 +08:00
from socketio import AsyncClient
2023-12-20 20:47:07 +08:00
from cmdparser import ArgumentParser, PrintMessage
from unit import sio_log_format
2023-12-18 19:37:29 +08:00
2023-12-20 20:47:07 +08:00
class AtElement(BaseModel):
2023-12-18 19:37:29 +08:00
text: str
id: Union[int, Literal['all']] = 'all'
2023-12-20 20:47:07 +08:00
class ReplyMessage(BaseModel):
2023-12-18 19:37:29 +08:00
id: str
username: str = ''
content: str = ''
files: list = []
def to_json(self) -> dict:
return {
'_id': self.id,
'username': self.username,
'content': self.content,
'files': self.files
}
2023-12-20 20:47:07 +08:00
class Message(BaseModel):
2023-12-18 19:37:29 +08:00
content: str
room_id: Optional[int] = None
room: Optional[int] = None # room id 和 room 二选一 ( 实际上直接填 room id 就行了 )
file: None = None # 上传文件
reply_to: Optional[ReplyMessage] = None # 源码 给了一个 any 回复消息
b64_img: Optional[str] = None # 发送图片
at: Optional[List[AtElement]] = [] # @某人
sticker: Optional[None] = None # 发送表情
message_type: Optional[str] = None # 消息类型
@model_validator(mode='after')
def check_room_id(self):
if self.room_id is None and self.room is None:
raise ValueError('room id 和 room 二选一 ( 实际上直接填 room id 就行了 )')
return self
2023-12-18 19:37:29 +08:00
def to_json(self) -> dict:
return {
'content': self.content,
'roomId': self.room_id,
'room': self.room,
'file': self.file,
'replyMessage': self.reply_to.to_json() if self.reply_to else None,
'b64img': self.b64_img,
'at': self.at,
'sticker': self.sticker,
'messageType': self.message_type
}
2023-12-19 01:05:14 +08:00
class SioConfig(BaseModel):
2023-12-19 01:41:28 +08:00
host: str
key: str = Field(alias='private_key')
2023-12-19 01:39:43 +08:00
self_id: int
2023-12-19 01:05:14 +08:00
admin: List[int]
2023-12-20 23:29:54 +08:00
gitea_host: str
2023-12-19 01:05:14 +08:00
client_id: str
client_secret: str
2023-12-19 02:25:59 +08:00
localhost: str
2023-12-19 02:49:04 +08:00
db_url: str
2023-12-18 19:37:29 +08:00
2023-12-20 20:47:07 +08:00
class CmdExists(Exception):
...
class SioDecorator:
def __init__(self, app: Sanic, data: Dict[str, Any]):
logger.debug(sio_log_format('add_message:', data))
self._content = data['message']['content']
self._cmd = shlex.split(self._content)
self.app = app
self.data = data
self.cmds = {}
def cmd(self, cmd_key: str):
def decorator(func):
self._cmds_append(cmd_key, func)
@wraps(func)
async def wrapper(*args, **kwargs): # args 无参数名的 kw 有参数名的
...
return wrapper
return decorator
def _cmds_append(self, cmd, func):
if cmd in self.cmds.keys():
raise CmdExists(
f"Command already registered: /{cmd}"
)
else:
self.cmds[f'/{cmd}'] = func
async def route2cmd_and_run(self):
func = self.cmds.get(self._cmd[0])
if func:
sender_id = self.data['message']['senderId']
sender_name = self.data['message']['username']
room_id = self.data['roomId']
if sender_id != self.app.ctx.sio_config.self_id:
2023-12-21 01:23:29 +08:00
e: AsyncEnforcer = self.app.ctx.e
2023-12-21 01:40:01 +08:00
if (e.enforce(str(room_id), self._cmd[0][1:]) or
(await e.has_role_for_user(str(sender_id), 'admin'))):
2023-12-21 01:23:29 +08:00
parser = ArgumentParser(self._cmd[0])
sqt = SioRequest(app=self.app,
parser=parser,
args=self._cmd[1:],
key=self._cmd[0],
sender_id=sender_id,
sender_name=sender_name,
content=self._content,
room_id=room_id,
data=self.data)
try:
await func(sqt)
except PrintMessage as e:
msg = Message(content=str(e), room_id=room_id)
await self.app.ctx.sio.emit('sendMessage', msg.to_json())
2023-12-20 20:47:07 +08:00
2023-12-18 19:37:29 +08:00
@dataclass
class Ctx:
sio: AsyncClient
sio_config: SioConfig
2023-12-20 20:47:07 +08:00
e: Enforcer
sio_decorator: SioDecorator
@dataclass
class SioRequest:
app: Sanic = None
parser: Optional[ArgumentParser] = None
args: Optional[list] = None
key: Optional[str] = None
data: Optional[Dict[str, Any]] = None
sender_id: Optional[int] = None
sender_name: str = ''
content: str = ''
room_id: Optional[int] = None