import tomllib from typing import Dict from nacl.signing import SigningKey from sanic import Sanic, Request from sanic.log import logger, Colors from sanic.response import json from socketio import AsyncClient from model import Ctx, SioConfig, Message app = Sanic('GiteaPush', ctx=Ctx) def get_config() -> SioConfig: with open('config.toml', 'rb') as f: config = tomllib.load(f) return SioConfig(config['host'], config['private_key'], config['self_id']) @app.before_server_start async def setup_before_start(_app): _app.ctx.sio_config = get_config() _app.ctx.sio = AsyncClient() start_sio_listener() await _app.ctx.sio.connect(_app.ctx.sio_config.HOST) @app.post('/receive') async def receive(rqt: Request): data = rqt.json ctx = data['issue']['body'] message = Message(content=f'webhook test: {ctx}', room_id=-777186831) await app.ctx.sio.emit('sendMessage', message.to_json()) return json({}) """ 以下为QQ监听部分 """ def start_sio_listener(): @app.ctx.sio.on('connect') def connect(): logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}') @app.ctx.sio.on('requireAuth') async def require_auth(salt: str, versions: Dict[str, str]): logger.info( f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}") # 准备数据 sign = SigningKey(bytes.fromhex(app.ctx.sio_config.KEY)) signature = sign.sign(bytes.fromhex(salt)) # 发送数据 logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}") await app.ctx.sio.emit('auth', signature.signature) logger.info(f"{Colors.BLUE}send auth emit{Colors.END}") if __name__ == "__main__": app.run(host='0.0.0.0', port=80, dev=True)