gitea_push2qq/server.py

208 lines
7.1 KiB
Python

import tomllib
from typing import Dict, Any, List, Tuple
import casbin
from casbin_tortoise_adapter import TortoiseAdapter
from nacl.signing import SigningKey
from sanic import Sanic, Request
from sanic.log import logger, Colors
from sanic.response import text
from socketio import AsyncClient
from tortoise.contrib.sanic import register_tortoise
import cmds
from gitea_model import WebHookIssueComment, WebHookIssue, GiteaEvent
from sio_model import Ctx, SioConfig, Message
from unit import sio_log_format, int2str, cas_log_fmt
app = Sanic('GiteaPush', ctx=Ctx)
def get_config() -> SioConfig:
with open('config.toml', 'rb') as f:
config = tomllib.load(f)
return SioConfig(**config)
SIO_CONFIG = get_config()
register_tortoise(
app, db_url=SIO_CONFIG.db_url, modules={"models": ["models", "casbin_tortoise_adapter"]}, generate_schemas=True
)
@app.before_server_start
async def setup_before_start(_app):
_app.ctx.sio_config = SIO_CONFIG
# 使用casbin策略管理
adapter = TortoiseAdapter()
e = casbin.AsyncEnforcer('./casbin_data/model.conf', adapter)
_app.ctx.e = e
t1 = await _app.ctx.e.add_policy('admin', '*')
t2 = await _app.ctx.e.add_policy('default', 'ping')
if t1 is True and t2 is True:
logger.info(cas_log_fmt('Init casbin rule success!'))
admins = int2str(_app.ctx.sio_config.admin)
for qid in admins:
if await _app.ctx.e.add_role_for_user(qid, 'admin'):
logger.debug(cas_log_fmt(f'Added {Colors.PURPLE}{qid}{Colors.YELLOW} to admin group'))
users = await _app.ctx.e.get_users_for_role('admin')
rm_user = set(users) ^ set(admins)
for u in list(rm_user):
if await _app.ctx.e.delete_user(u):
logger.debug(f'Delete {Colors.PURPLE}{u}{Colors.YELLOW} for group admin')
await _app.ctx.e.save_policy()
# 初始化sio
_app.ctx.sio = AsyncClient()
start_sio_listener()
await _app.ctx.sio.connect(_app.ctx.sio_config.host)
@app.post('/receive')
async def receive(rqt: Request):
match (rqt.headers['X-Gitea-Event']):
case GiteaEvent.issues.value:
data = WebHookIssue(**rqt.json)
rsp_title = f"[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})"
rsp_sender = f"By {data.sender.username}"
rsp_ctx = f"{data.issue.body}"
rsp_link = f"View it: {data.issue.html_url}"
cancel_subscribe = f"unsubscribe: {data.repository.html_url}"
rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}"
case GiteaEvent.issue_comment.value:
data = WebHookIssueComment(**rqt.json)
rsp_title = f"Re:[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})"
rsp_sender = f"By {data.sender.username}"
rsp_ctx = f"{data.comment.body}"
rsp_link = f"View it: {data.comment.html_url}"
cancel_subscribe = f"unsubscribe: {data.repository.html_url}"
rsp = f"{rsp_title}\n\n{rsp_sender}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}"
case _:
rsp = "Unknown webhook type! Please contact the administrator."
message = Message(content=rsp, room_id=-777186831)
await app.ctx.sio.emit('sendMessage', message.to_json())
return text(rsp)
@app.get('/redirect')
async def redirect(rqt: Request):
print(rqt.args)
print(rqt.ctx.state)
return text('success')
"""
以下为QQ监听部分
"""
def start_sio_listener():
@app.ctx.sio.on('connect')
def connect():
logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}')
@app.ctx.sio.on('requireAuth')
async def require_auth(salt: str, versions: Dict[str, str]):
logger.info(
f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}")
# 准备数据
sign = SigningKey(bytes.fromhex(app.ctx.sio_config.key))
signature = sign.sign(bytes.fromhex(salt))
# 发送数据
logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}")
await app.ctx.sio.emit('auth', signature.signature)
logger.info(f"{Colors.BLUE}send auth emit{Colors.END}")
@app.ctx.sio.on('auth')
async def auth(data: Dict[str, Any]):
logger.info(f"auth: {data}")
@app.ctx.sio.on('authFailed')
async def auth_failed():
logger.warn(f"authFailed")
await app.ctx.sio.disconnect()
@app.ctx.sio.on('authSucceed')
def auth_succeed():
logger.info(f"authSucceed")
@app.ctx.sio.on('connect_error')
def connect_error(*args, **kwargs):
logger.warn(f"连接错误 {args}, {kwargs}")
@app.ctx.sio.on('updateRoom')
def update_room(data: Dict[str, Any]):
logger.debug(sio_log_format('update_room:', data))
@app.ctx.sio.on('deleteMessage')
def delete_message(message_id: str):
logger.debug(sio_log_format('delete_message:', message_id))
@app.ctx.sio.on('setMessages')
def set_messages(data: Dict[str, Any]):
logger.debug(f"{sio_log_format('set_messages:', data)}"
f"{sio_log_format('message_len:', len(data['messages']))}")
@app.ctx.sio.on('setAllRooms')
def set_all_rooms(rooms: List[Dict[str, Any]]):
logger.debug(f"{sio_log_format('set_all_rooms:', rooms)}"
f"{sio_log_format('len:', len(rooms))}")
@app.ctx.sio.on('setAllChatGroups')
def set_all_chat_groups(groups: List[Dict[str, Any]]):
logger.debug(f"{sio_log_format('set_all_chat_groups:', groups)}"
f"{sio_log_format('len:', len(groups))}")
@app.ctx.sio.on('notify')
def notify(data: List[Tuple[str, Any]]):
logger.debug(sio_log_format('notify:', data))
@app.ctx.sio.on('closeLoading')
def close_loading(_):
logger.debug(sio_log_format('close_loading', ''))
@app.ctx.sio.on('onlineData')
def online_data(data: Dict[str, Any]):
logger.debug(sio_log_format('online_data:', data))
@app.ctx.sio.on('*')
def catch_all(event, data):
logger.debug(sio_log_format('catch_all:', f'{event}|{data}'))
@app.ctx.sio.on('addMessage')
async def add_message(data: Dict[str, Any]):
sio_decorator = cmds.cmds(app, data)
await sio_decorator.route2cmd_and_run()
# @app.ctx.sio.on('addMessage')
# @SioDecorator.cmd('gitea', app)
# async def gitea(sqt: SioRequest):
# parser = sqt.parser
# parser.add_argument('-vd', help='绑定QQ与gitea')
# args = parser.parse_args(sqt.args)
# if args.vd:
# state = secrets.token_urlsafe(16)
# user = User(id=sqt.sender_id, name=sqt.sender_name, state=state)
# await user.save()
# url = (f'{app.ctx.sio_config.validate_host}/login/oauth/authorize?'
# f'client_id={app.ctx.sio_config.client_id}&'
# f'redirect_uri={app.ctx.sio_config.localhost}/redirect&'
# f'response_type=code&state={state}')
# return Message(content=f'click: \n{url}')
if __name__ == "__main__":
app.run(host='0.0.0.0', port=80, dev=True)