import tomllib from typing import Dict, Any, List, Tuple import casbin from casbin_tortoise_adapter import TortoiseAdapter from nacl.signing import SigningKey from sanic import Sanic, Request from sanic.log import logger, Colors from sanic.response import text from socketio import AsyncClient from tortoise.contrib.sanic import register_tortoise import cmds from gitea_model import WebHookIssueComment, WebHookIssue, GiteaEvent from sio_model import Ctx, SioConfig, Message from unit import sio_log_format, int2str, cas_log_fmt app = Sanic('GiteaPush', ctx=Ctx) def get_config() -> SioConfig: with open('config.toml', 'rb') as f: config = tomllib.load(f) return SioConfig(**config) SIO_CONFIG = get_config() register_tortoise( app, db_url=SIO_CONFIG.db_url, modules={"models": ["models", "casbin_tortoise_adapter"]}, generate_schemas=True ) @app.before_server_start async def setup_before_start(_app): _app.ctx.sio_config = SIO_CONFIG # 使用casbin策略管理 adapter = TortoiseAdapter() e = casbin.AsyncEnforcer('./casbin_data/model.conf', adapter) _app.ctx.e = e t1 = await _app.ctx.e.add_policy('admin', '*') t2 = await _app.ctx.e.add_policy('default', 'ping') if t1 is True and t2 is True: logger.info(cas_log_fmt('Init casbin rule success!')) admins = int2str(_app.ctx.sio_config.admin) for qid in admins: if await _app.ctx.e.add_role_for_user(qid, 'admin'): logger.debug(cas_log_fmt(f'Added {Colors.PURPLE}{qid}{Colors.YELLOW} to admin group')) users = await _app.ctx.e.get_users_for_role('admin') rm_user = set(users) ^ set(admins) for u in list(rm_user): if await _app.ctx.e.delete_user(u): logger.debug(f'Delete {Colors.PURPLE}{u}{Colors.YELLOW} for group admin') await _app.ctx.e.save_policy() # 初始化sio _app.ctx.sio = AsyncClient() start_sio_listener() await _app.ctx.sio.connect(_app.ctx.sio_config.host) @app.post('/receive') async def receive(rqt: Request): match (rqt.headers['X-Gitea-Event']): case GiteaEvent.issues.value: data = WebHookIssue(**rqt.json) rsp_title = f"[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_sender = f"By {data.sender.username}" rsp_ctx = f"{data.issue.body}" rsp_link = f"View it: {data.issue.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case GiteaEvent.issue_comment.value: data = WebHookIssueComment(**rqt.json) rsp_title = f"Re:[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_sender = f"By {data.sender.username}" rsp_ctx = f"{data.comment.body}" rsp_link = f"View it: {data.comment.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case _: rsp = "Unknown webhook type! Please contact the administrator." message = Message(content=rsp, room_id=-777186831) await app.ctx.sio.emit('sendMessage', message.to_json()) return text(rsp) @app.get('/redirect') async def redirect(rqt: Request): print(rqt.args) print(rqt.ctx.state) return text('success') """ 以下为QQ监听部分 """ def start_sio_listener(): @app.ctx.sio.on('connect') def connect(): logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}') @app.ctx.sio.on('requireAuth') async def require_auth(salt: str, versions: Dict[str, str]): logger.info( f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}") # 准备数据 sign = SigningKey(bytes.fromhex(app.ctx.sio_config.key)) signature = sign.sign(bytes.fromhex(salt)) # 发送数据 logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}") await app.ctx.sio.emit('auth', signature.signature) logger.info(f"{Colors.BLUE}send auth emit{Colors.END}") @app.ctx.sio.on('auth') async def auth(data: Dict[str, Any]): logger.info(f"auth: {data}") @app.ctx.sio.on('authFailed') async def auth_failed(): logger.warn(f"authFailed") await app.ctx.sio.disconnect() @app.ctx.sio.on('authSucceed') def auth_succeed(): logger.info(f"authSucceed") @app.ctx.sio.on('connect_error') def connect_error(*args, **kwargs): logger.warn(f"连接错误 {args}, {kwargs}") @app.ctx.sio.on('updateRoom') def update_room(data: Dict[str, Any]): logger.debug(sio_log_format('update_room:', data)) @app.ctx.sio.on('deleteMessage') def delete_message(message_id: str): logger.debug(sio_log_format('delete_message:', message_id)) @app.ctx.sio.on('setMessages') def set_messages(data: Dict[str, Any]): logger.debug(f"{sio_log_format('set_messages:', data)}" f"{sio_log_format('message_len:', len(data['messages']))}") @app.ctx.sio.on('setAllRooms') def set_all_rooms(rooms: List[Dict[str, Any]]): logger.debug(f"{sio_log_format('set_all_rooms:', rooms)}" f"{sio_log_format('len:', len(rooms))}") @app.ctx.sio.on('setAllChatGroups') def set_all_chat_groups(groups: List[Dict[str, Any]]): logger.debug(f"{sio_log_format('set_all_chat_groups:', groups)}" f"{sio_log_format('len:', len(groups))}") @app.ctx.sio.on('notify') def notify(data: List[Tuple[str, Any]]): logger.debug(sio_log_format('notify:', data)) @app.ctx.sio.on('closeLoading') def close_loading(_): logger.debug(sio_log_format('close_loading', '')) @app.ctx.sio.on('onlineData') def online_data(data: Dict[str, Any]): logger.debug(sio_log_format('online_data:', data)) @app.ctx.sio.on('*') def catch_all(event, data): logger.debug(sio_log_format('catch_all:', f'{event}|{data}')) @app.ctx.sio.on('addMessage') async def add_message(data: Dict[str, Any]): sio_decorator = cmds.cmds(app, data) await sio_decorator.route2cmd_and_run() if __name__ == "__main__": app.run(host='0.0.0.0', port=80, dev=True)