import tomllib from typing import Dict from nacl.signing import SigningKey from sanic import Sanic, Request from sanic.log import logger, Colors from sanic.response import text from socketio import AsyncClient from gitea_model import WebHookIssueComment, WebHookIssue, GiteaEvent from model import Ctx, SioConfig, Message app = Sanic('GiteaPush', ctx=Ctx) def get_config() -> SioConfig: with open('config.toml', 'rb') as f: config = tomllib.load(f) return SioConfig(config['host'], config['private_key'], config['self_id']) @app.before_server_start async def setup_before_start(_app): _app.ctx.sio_config = get_config() _app.ctx.sio = AsyncClient() start_sio_listener() await _app.ctx.sio.connect(_app.ctx.sio_config.HOST) @app.post('/receive') async def receive(rqt: Request): match (rqt.headers['X-Gitea-Event']): case GiteaEvent.issues.value: data = WebHookIssue(**rqt.json) rsp_title = f"[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_sender = f"By {data.sender.username}" rsp_ctx = f"{data.issue.body}" rsp_link = f"View it: {data.issue.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case GiteaEvent.issue_comment.value: data = WebHookIssueComment(**rqt.json) rsp_title = f"Re:[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_sender = f"By {data.sender.username}" rsp_ctx = f"{data.comment.body}" rsp_link = f"View it: {data.comment.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case _: rsp = "Unknown webhook type! Please contact the administrator." message = Message(content=rsp, room_id=-777186831) await app.ctx.sio.emit('sendMessage', message.to_json()) return text(rsp) """ 以下为QQ监听部分 """ def start_sio_listener(): @app.ctx.sio.on('connect') def connect(): logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}') @app.ctx.sio.on('requireAuth') async def require_auth(salt: str, versions: Dict[str, str]): logger.info( f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}") # 准备数据 sign = SigningKey(bytes.fromhex(app.ctx.sio_config.KEY)) signature = sign.sign(bytes.fromhex(salt)) # 发送数据 logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}") await app.ctx.sio.emit('auth', signature.signature) logger.info(f"{Colors.BLUE}send auth emit{Colors.END}") if __name__ == "__main__": app.run(host='0.0.0.0', port=80, dev=True)