gitea_push2qq/model.py

70 lines
1.8 KiB
Python

from dataclasses import dataclass
from typing import Optional, List, Union, Literal
from lib_not_dr.types import Options
from pydantic import BaseModel, Field
from socketio import AsyncClient
class AtElement(Options):
    """One @-mention entry: a display text plus the id it targets.

    ``id`` is either an integer or the literal string ``'all'``; it falls
    back to ``'all'`` when not supplied.
    """

    # Text associated with the mention (no default is declared).
    text: str
    # Mention target: an int id (presumably a user id -- confirm with
    # callers) or the literal 'all'.
    id: Union[int, Literal['all']] = 'all'
class ReplyMessage(Options):
    """Description of a message that is being replied to.

    Holds the message id together with optional username, content and
    file information, and can render itself as a plain dict.
    """

    # Identifier of the referenced message (no default is declared).
    id: str
    username: str = ''
    content: str = ''
    # NOTE: this default is a class-level list object.
    files: list = []

    def to_json(self) -> dict:
        """Return the fields as a dict, exposing ``id`` under the ``'_id'`` key."""
        payload = dict(
            _id=self.id,
            username=self.username,
            content=self.content,
            files=self.files,
        )
        return payload
class Message(Options):
    """Outgoing message description that renders to a camelCase payload dict.

    ``to_json`` maps the snake_case attributes to the payload keys
    (``room_id`` -> ``'roomId'``, ``reply_to`` -> ``'replyMessage'``,
    ``b64_img`` -> ``'b64img'``, ``message_type`` -> ``'messageType'``).
    """

    content: str
    room_id: Optional[int] = None
    # Provide either room_id or room (in practice filling in room_id is enough).
    room: Optional[int] = None
    # File upload.
    file: None = None
    # Message being replied to (the upstream source types this as `any`).
    reply_to: Optional[ReplyMessage] = None
    # Image to send (presumably base64-encoded, judging by the name -- confirm).
    b64_img: Optional[str] = None
    # @-mentions. NOTE: the default is a class-level list object; avoid
    # mutating it in place, assign a new list instead.
    at: Optional[List[AtElement]] = []
    # Sticker to send.
    sticker: Optional[None] = None
    # Message type.
    message_type: Optional[str] = None

    def to_json(self) -> dict:
        """Return the message as a dict made of plain values.

        ``reply_to`` is rendered through ``ReplyMessage.to_json()`` (or
        ``None`` when unset). Each ``AtElement`` in ``at`` is rendered as
        ``{'text': ..., 'id': ...}`` so the result does not contain custom
        objects; items that are not ``AtElement`` instances (e.g. dicts
        built by the caller) are passed through unchanged. ``at`` stays
        ``None`` when it is ``None``.
        """
        if self.at is None:
            at_payload = None
        else:
            # Build a fresh list: this serialises AtElement objects and also
            # avoids handing out the (possibly shared) list stored on self.
            at_payload = [
                {'text': item.text, 'id': item.id}
                if isinstance(item, AtElement) else item
                for item in self.at
            ]
        return {
            'content': self.content,
            'roomId': self.room_id,
            'room': self.room,
            'file': self.file,
            'replyMessage': self.reply_to.to_json() if self.reply_to else None,
            'b64img': self.b64_img,
            'at': at_payload,
            'sticker': self.sticker,
            'messageType': self.message_type
        }
class SioConfig(BaseModel):
    """Pydantic model holding the configuration values used with the socket.io client.

    Every field is required; none declares a default value.
    """

    host: str
    # Declared with the alias 'private_key', i.e. that is the name used for
    # this field in input data.
    key: str = Field(alias='private_key')
    self_id: int
    # List of integer ids (presumably the accounts treated as admins -- confirm).
    admin: List[int]
    validate_host: str
    client_id: str
    client_secret: str
    localhost: str
    db_url: str
@dataclass
class Ctx:
    """Context object bundling the socket.io client with its configuration."""

    # The socketio AsyncClient instance.
    sio: AsyncClient
    # The SioConfig settings that accompany the client.
    sio_config: SioConfig