import tomllib from typing import Dict, Any, List, Tuple import casbin from nacl.signing import SigningKey from sanic import Sanic, Request from sanic.log import logger, Colors from sanic.response import text from tortoise.contrib.sanic import register_tortoise from command import command_convert from gitea_model import WebHookIssueComment, WebHookIssue, GiteaEvent from sio_model import Ctx, SioConfig, Message from unit import sio_log_format, int2str app = Sanic('GiteaPush', ctx=Ctx) def get_config() -> SioConfig: with open('config.toml', 'rb') as f: config = tomllib.load(f) return SioConfig(**config) SIO_CONFIG = get_config() register_tortoise( app, db_url=SIO_CONFIG.db_url, modules={"models": ["models"]}, generate_schemas=True ) @app.before_server_start async def setup_before_start(_app): _app.ctx.sio_config = SIO_CONFIG # 使用casbin策略管理 e = casbin.Enforcer('./casbin_data/model.conf', './casbin_data/casbin.csv') _app.ctx.e = e e.add_policy('admin', '*') e.add_policy('default', 'ping') admins = int2str(_app.ctx.sio_config.admin) for qid in admins: logger.info(e.add_role_for_user(qid, 'admin')) users = e.get_users_for_role('admin') rm_user = set(users) ^ set(admins) for u in list(rm_user): logger.info(e.delete_user(u)) e.save_policy() # 初始化sio # _app.ctx.sio = AsyncClient() # start_sio_listener() # # await _app.ctx.sio.connect(_app.ctx.sio_config.host) @app.post('/receive') async def receive(rqt: Request): match (rqt.headers['X-Gitea-Event']): case GiteaEvent.issues.value: data = WebHookIssue(**rqt.json) rsp_title = f"[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_sender = f"By {data.sender.username}" rsp_ctx = f"{data.issue.body}" rsp_link = f"View it: {data.issue.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case GiteaEvent.issue_comment.value: data = WebHookIssueComment(**rqt.json) rsp_title = f"Re:[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_sender = f"By {data.sender.username}" rsp_ctx = f"{data.comment.body}" rsp_link = f"View it: {data.comment.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case _: rsp = "Unknown webhook type! Please contact the administrator." message = Message(content=rsp, room_id=-777186831) await app.ctx.sio.emit('sendMessage', message.to_json()) return text(rsp) @app.get('/redirect') async def redirect(rqt: Request): print(rqt.args) print(rqt.ctx.state) return text('success') """ 以下为QQ监听部分 """ def start_sio_listener(): @app.ctx.sio.on('connect') def connect(): logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}') @app.ctx.sio.on('requireAuth') async def require_auth(salt: str, versions: Dict[str, str]): logger.info( f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}") # 准备数据 sign = SigningKey(bytes.fromhex(app.ctx.sio_config.key)) signature = sign.sign(bytes.fromhex(salt)) # 发送数据 logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}") await app.ctx.sio.emit('auth', signature.signature) logger.info(f"{Colors.BLUE}send auth emit{Colors.END}") @app.ctx.sio.on('auth') async def auth(data: Dict[str, Any]): logger.info(f"auth: {data}") @app.ctx.sio.on('authFailed') async def auth_failed(): logger.warn(f"authFailed") await app.ctx.sio.disconnect() @app.ctx.sio.on('authSucceed') def auth_succeed(): logger.info(f"authSucceed") @app.ctx.sio.on('connect_error') def connect_error(*args, **kwargs): logger.warn(f"连接错误 {args}, {kwargs}") @app.ctx.sio.on('updateRoom') def update_room(data: Dict[str, Any]): logger.debug(sio_log_format('update_room:', data)) @app.ctx.sio.on('deleteMessage') def delete_message(message_id: str): logger.debug(sio_log_format('delete_message:', message_id)) @app.ctx.sio.on('setMessages') def set_messages(data: Dict[str, Any]): logger.debug(f"{sio_log_format('set_messages:', data)}" f"{sio_log_format('message_len:', len(data['messages']))}") @app.ctx.sio.on('setAllRooms') def set_all_rooms(rooms: List[Dict[str, Any]]): logger.debug(f"{sio_log_format('set_all_rooms:', rooms)}" f"{sio_log_format('len:', len(rooms))}") @app.ctx.sio.on('setAllChatGroups') def set_all_chat_groups(groups: List[Dict[str, Any]]): logger.debug(f"{sio_log_format('set_all_chat_groups:', groups)}" f"{sio_log_format('len:', len(groups))}") @app.ctx.sio.on('notify') def notify(data: List[Tuple[str, Any]]): logger.debug(sio_log_format('notify:', data)) @app.ctx.sio.on('closeLoading') def close_loading(_): logger.debug(sio_log_format('close_loading', '')) @app.ctx.sio.on('onlineData') def online_data(data: Dict[str, Any]): logger.debug(sio_log_format('online_data:', data)) @app.ctx.sio.on('*') def catch_all(event, data): logger.debug(sio_log_format('catch_all:', f'{event}|{data}')) @app.ctx.sio.on('addMessage') async def add_message(data: Dict[str, Any]): logger.debug(sio_log_format('add_message:', data)) sender_id = data['message']['senderId'] if sender_id != app.ctx.sio_config.self_id: msg = await command_convert(data, app) if msg is not None: await app.ctx.sio.emit('sendMessage', msg.to_json()) if __name__ == "__main__": app.run(host='0.0.0.0', port=80, dev=True)