import tomllib from typing import Dict, Any, List, Tuple import aiohttp import casbin from casbin_tortoise_adapter import TortoiseAdapter from nacl.signing import SigningKey from sanic import Sanic, Request from sanic.log import logger, Colors from sanic.response import text from socketio import AsyncClient from tortoise.contrib.sanic import register_tortoise import cmds from gitea_model import WebHookIssueComment, WebHookIssue, GiteaEvent from models import User, GiteaUser from sio_model import Ctx, SioConfig, Message from unit import sio_log_format, int2str, cas_log_fmt app = Sanic('GiteaPush', ctx=Ctx) def get_config() -> SioConfig: with open('config.toml', 'rb') as f: config = tomllib.load(f) return SioConfig(**config) SIO_CONFIG = get_config() register_tortoise( app, db_url=SIO_CONFIG.db_url, modules={"models": ["models", "casbin_tortoise_adapter"]}, generate_schemas=True ) @app.before_server_start async def setup_before_start(_app): _app.ctx.sio_config = SIO_CONFIG # 使用casbin策略管理 adapter = TortoiseAdapter() e = casbin.AsyncEnforcer('./casbin_data/model.conf', adapter) _app.ctx.e = e t1 = await _app.ctx.e.add_policy('admin', '*') t2 = await _app.ctx.e.add_policy('default', 'ping') if t1 is True and t2 is True: logger.info(cas_log_fmt('Init casbin rule success!')) admins = int2str(_app.ctx.sio_config.admin) for qid in admins: if await _app.ctx.e.add_role_for_user(qid, 'admin'): logger.debug(cas_log_fmt(f'Added {Colors.PURPLE}{qid}{Colors.YELLOW} to admin group')) users = await _app.ctx.e.get_users_for_role('admin') rm_user = set(users) ^ set(admins) for u in list(rm_user): if await _app.ctx.e.delete_user(u): logger.debug(f'Delete {Colors.PURPLE}{u}{Colors.YELLOW} for group admin') await _app.ctx.e.save_policy() # 初始化sio _app.ctx.sio = AsyncClient() start_sio_listener() await _app.ctx.sio.connect(_app.ctx.sio_config.host) async def push_webhook2users(_app: Sanic, full_name: str, msg: str): async with aiohttp.ClientSession() as session: url = f'{app.ctx.sio_config.gitea_host}/api/v1/repos/{full_name}/subscribers?page=&limit=' async with session.get(url) as rps: if rps.status == 200: json = await rps.json() q_user = set() for g_user in json: g_user = await GiteaUser.filter(id=g_user['id']).get_or_none() if g_user: _q_user = await g_user.qid.get() q_user.add(_q_user.id) for i in q_user: message = Message(content=msg, room_id=i) await app.ctx.sio.emit('sendMessage', message.to_json()) else: logger.warn(rps) @app.post('/receive') async def receive(rqt: Request): match (rqt.headers['X-Gitea-Event']): case GiteaEvent.issues.value: data = WebHookIssue(**rqt.json) rsp_sender = f"{data.sender.username}" rsp_title = f"[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_ctx = f"{data.issue.body}" rsp_link = f"View it: {data.issue.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_sender}\n{rsp_title}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case GiteaEvent.issue_comment.value: data = WebHookIssueComment(**rqt.json) rsp_sender = f"{data.sender.username}" rsp_title = f"Re:[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})" rsp_ctx = f"{data.comment.body}" rsp_link = f"View it: {data.comment.html_url}" cancel_subscribe = f"unsubscribe: {data.repository.html_url}" rsp = f"{rsp_sender}\n{rsp_title}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}" case _: data = None rsp = "Unknown webhook type! Please contact the administrator." logger.warn(rsp) if data: rqt.app.add_task(push_webhook2users(app, data.repository.full_name, rsp)) return text(rsp) async def get_gitea_token(_app, form: dict, q_user: User): async with aiohttp.ClientSession() as session: url = f'{app.ctx.sio_config.gitea_host}/login/oauth/access_token' async with session.post(url, data=form) as rps: if rps.status == 200: json = await rps.json() act = json['access_token'] rft = json['refresh_token'] else: logger.warn(f'Gitea: {rps}') return f'绑定失败:{await rps.text()}' url = f'{app.ctx.sio_config.gitea_host}/api/v1/user' headers = {'Authorization': f'token {act}'} async with session.get(url, headers=headers) as rps: if rps.status == 200: json = await rps.json() gid = json['id'] login = json['login'] else: logger.warn(f'Gitea: {rps}') return f'绑定失败:{await rps.text()}' g_user = await GiteaUser.filter(id=gid).get_or_none() if g_user: g_user.id = gid g_user.name = login g_user.qid = q_user g_user.ac_tk = act g_user.rfs_tk = rft await g_user.save() else: await GiteaUser(id=gid, name=login, qid=q_user, ac_tk=act, rfs_tk=rft).save() return f'绑定成功:QQ:{q_user.name}, Gitea:{login}' @app.get('/redirect') async def redirect(rqt: Request): rqt_state = rqt.args.get('state') code = rqt.args.get('code') if rqt_state is None or code is None: logger.warn(f'Gitea返回消息格式错误:{rqt.args}') return text(f'返回信息错误:{rqt.args}') q_user = await User.filter(state=rqt_state).get_or_none() if q_user is None: logger.warn(f'该请求不是从QQ中请求的链接') return text('请先从QQ处申请绑定') # msg = Message(content='已在绑定中,可能因为意外而失败\n查询结果使用\n/gitea -ust', room_id=q_user.id) # await app.ctx.sio.emit('sendMessage', msg.to_json()) rqt.app.add_task(get_gitea_token(app, {'client_id': app.ctx.sio_config.client_id, 'client_secret': app.ctx.sio_config.client_secret, 'code': code, 'grant_type': 'authorization_code', 'redirect_uri': f'{app.ctx.sio_config.localhost}/redirect'}, q_user), name=str(q_user.id)) return text('success') """ 以下为QQ监听部分 """ def start_sio_listener(): @app.ctx.sio.on('connect') def connect(): logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}') @app.ctx.sio.on('requireAuth') async def require_auth(salt: str, versions: Dict[str, str]): logger.info( f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}") # 准备数据 sign = SigningKey(bytes.fromhex(app.ctx.sio_config.key)) signature = sign.sign(bytes.fromhex(salt)) # 发送数据 logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}") await app.ctx.sio.emit('auth', signature.signature) logger.info(f"{Colors.BLUE}send auth emit{Colors.END}") @app.ctx.sio.on('auth') async def auth(data: Dict[str, Any]): logger.info(f"auth: {data}") @app.ctx.sio.on('authFailed') async def auth_failed(): logger.warn(f"authFailed") await app.ctx.sio.disconnect() @app.ctx.sio.on('authSucceed') def auth_succeed(): logger.info(f"authSucceed") @app.ctx.sio.on('connect_error') def connect_error(*args, **kwargs): logger.warn(f"连接错误 {args}, {kwargs}") @app.ctx.sio.on('updateRoom') def update_room(data: Dict[str, Any]): logger.debug(sio_log_format('update_room:', data)) @app.ctx.sio.on('deleteMessage') def delete_message(message_id: str): logger.debug(sio_log_format('delete_message:', message_id)) @app.ctx.sio.on('setMessages') def set_messages(data: Dict[str, Any]): logger.debug(f"{sio_log_format('set_messages:', data)}" f"{sio_log_format('message_len:', len(data['messages']))}") @app.ctx.sio.on('setAllRooms') def set_all_rooms(rooms: List[Dict[str, Any]]): logger.debug(f"{sio_log_format('set_all_rooms:', rooms)}" f"{sio_log_format('len:', len(rooms))}") @app.ctx.sio.on('setAllChatGroups') def set_all_chat_groups(groups: List[Dict[str, Any]]): logger.debug(f"{sio_log_format('set_all_chat_groups:', groups)}" f"{sio_log_format('len:', len(groups))}") @app.ctx.sio.on('notify') def notify(data: List[Tuple[str, Any]]): logger.debug(sio_log_format('notify:', data)) @app.ctx.sio.on('closeLoading') def close_loading(_): logger.debug(sio_log_format('close_loading', '')) @app.ctx.sio.on('onlineData') def online_data(data: Dict[str, Any]): logger.debug(sio_log_format('online_data:', data)) @app.ctx.sio.on('*') def catch_all(event, data): logger.debug(sio_log_format('catch_all:', f'{event}|{data}')) @app.ctx.sio.on('addMessage') async def add_message(data: Dict[str, Any]): sio_decorator = cmds.cmds(app, data) try: await sio_decorator.route2cmd_and_run() except IndexError: # 处理非文本消息 pass if __name__ == "__main__": app.run(host='0.0.0.0', port=80, dev=True)