from dataclasses import dataclass
from typing import Optional, List, Union, Literal

from lib_not_dr.types import Options
from pydantic import BaseModel, Field
from socketio import AsyncClient


class AtElement(Options):
    """A single @-mention attached to an outgoing message."""

    text: str
    id: Union[int, Literal['all']] = 'all'

    def to_json(self) -> dict:
        """Return this mention as a plain dict.

        The keys mirror the field names.
        NOTE(review): confirm these key names against the server's schema.
        """
        return {
            'text': self.text,
            'id': self.id,
        }


class ReplyMessage(Options):
    """Reference to the message that an outgoing message replies to."""

    id: str
    username: str = ''
    content: str = ''
    # NOTE(review): `[]` is a class-level default; whether instances share
    # this list depends on how Options handles defaults -- confirm.
    files: list = []

    def to_json(self) -> dict:
        """Return the reply reference as a dict; `id` is emitted as `_id`."""
        return {
            '_id': self.id,
            'username': self.username,
            'content': self.content,
            'files': self.files,
        }


class Message(Options):
    """An outgoing chat message and its optional attachments."""

    content: str
    room_id: Optional[int] = None
    # room_id and room are alternatives; in practice just fill in room_id.
    room: Optional[int] = None
    file: None = None  # file upload
    # Message being replied to (the upstream source types this as `any`).
    reply_to: Optional[ReplyMessage] = None
    b64_img: Optional[str] = None  # image to send
    # NOTE(review): `[]` is a class-level default, see ReplyMessage.files.
    at: Optional[List[AtElement]] = []  # @-mentions
    sticker: Optional[None] = None  # sticker to send
    message_type: Optional[str] = None  # message type

    def to_json(self) -> dict:
        """Return the message as a dict keyed by the camelCase wire names.

        Nested objects are flattened to plain dicts: `reply_to` through
        `ReplyMessage.to_json` and every `AtElement` in `at` through
        `AtElement.to_json`, so that no Options instance is left in the
        payload. Entries of `at` that are not `AtElement` instances are
        passed through unchanged, and an `at` of `None` stays `None`.
        """
        reply = self.reply_to.to_json() if self.reply_to else None

        mentions = self.at
        if mentions is not None:
            # AtElement objects are not JSON-serializable as-is.
            mentions = [
                entry.to_json() if isinstance(entry, AtElement) else entry
                for entry in mentions
            ]

        return {
            'content': self.content,
            'roomId': self.room_id,
            'room': self.room,
            'file': self.file,
            'replyMessage': reply,
            'b64img': self.b64_img,
            'at': mentions,
            'sticker': self.sticker,
            'messageType': self.message_type,
        }


class SioConfig(BaseModel):
    """Configuration model for the socket.io client."""

    host: str
    key: str = Field(alias='private_key')  # populated from `private_key`
    self_id: int
    admin: List[int]
    validate_host: str
    client_id: str
    client_secret: str
    localhost: str
    db_url: str


@dataclass
class Ctx:
    """Bundle of the socket.io client and the configuration it runs with."""

    sio: AsyncClient
    sio_config: SioConfig