gitea_push2qq/server.py

274 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tomllib
from typing import Dict, Any, List, Tuple
import aiohttp
import casbin
from casbin_tortoise_adapter import TortoiseAdapter
from nacl.signing import SigningKey
from sanic import Sanic, Request
from sanic.log import logger, Colors
from sanic.response import text
from socketio import AsyncClient
from tortoise.contrib.sanic import register_tortoise
import cmds
from gitea_model import WebHookIssueComment, WebHookIssue, GiteaEvent
from models import User, GiteaUser
from sio_model import Ctx, SioConfig, Message
from unit import sio_log_format, int2str, cas_log_fmt
# The single Sanic application object for this module. The ``Ctx`` class from
# ``sio_model`` is passed as the application context; the handlers below read
# and write shared state (config, casbin enforcer, socket.io client) via
# ``app.ctx``.
app = Sanic('GiteaPush', ctx=Ctx)
def get_config() -> SioConfig:
    """Read ``config.toml`` from the working directory and build a ``SioConfig``.

    The file is opened in binary mode because ``tomllib.load`` only accepts
    binary file objects. The top-level TOML keys are passed to ``SioConfig``
    as keyword arguments.

    Returns:
        The ``SioConfig`` built from the parsed file.
    """
    with open('config.toml', 'rb') as toml_file:
        settings = tomllib.load(toml_file)
    return SioConfig(**settings)
# Configuration is loaded once, at import time, and shared by the whole module.
SIO_CONFIG = get_config()
# Attach Tortoise ORM to the Sanic app using the configured database URL.
# Both the project's ``models`` module and the casbin adapter's models are
# registered, and ``generate_schemas=True`` asks Tortoise to create the tables.
register_tortoise(
    app, db_url=SIO_CONFIG.db_url, modules={"models": ["models", "casbin_tortoise_adapter"]}, generate_schemas=True
)
@app.before_server_start
async def setup_before_start(_app):
    """Initialise casbin policies and the socket.io client before serving.

    Steps, in order:
      1. Store the module-level ``SIO_CONFIG`` on ``_app.ctx.sio_config``.
      2. Build a casbin ``AsyncEnforcer`` backed by ``TortoiseAdapter`` and
         add the ``admin -> *`` and ``default -> ping`` policies.
      3. Give every id from ``sio_config.admin`` the ``admin`` role, then
         delete users that hold the role but are no longer configured.
      4. Persist the policy, create the socket.io ``AsyncClient``, register
         its event handlers and connect to ``sio_config.host``.

    Args:
        _app: The Sanic application that is about to start.
    """
    _app.ctx.sio_config = SIO_CONFIG

    # Use casbin for policy management.
    # NOTE(review): no explicit ``load_policy()`` is awaited here; confirm
    # whether AsyncEnforcer needs one to see rules already stored by the adapter.
    enforcer = casbin.AsyncEnforcer('./casbin_data/model.conf', TortoiseAdapter())
    _app.ctx.e = enforcer

    t1 = await enforcer.add_policy('admin', '*')
    t2 = await enforcer.add_policy('default', 'ping')
    if t1 is True and t2 is True:
        logger.info(cas_log_fmt('Init casbin rule success!'))

    admins = int2str(_app.ctx.sio_config.admin)
    for qid in admins:
        if await enforcer.add_role_for_user(qid, 'admin'):
            logger.debug(cas_log_fmt(f'Added {Colors.PURPLE}{qid}{Colors.YELLOW} to admin group'))

    users = await enforcer.get_users_for_role('admin')
    # Only users that currently hold the role but are missing from the config
    # should be removed. A plain difference (not a symmetric difference) makes
    # sure a configured admin can never end up in the removal set.
    rm_user = set(users) - set(admins)
    for u in rm_user:
        if await enforcer.delete_user(u):
            logger.debug(f'Delete {Colors.PURPLE}{u}{Colors.YELLOW} for group admin')
    await enforcer.save_policy()

    # Initialise the socket.io client; handlers must be registered on the
    # client object before connecting.
    _app.ctx.sio = AsyncClient()
    start_sio_listener()
    await _app.ctx.sio.connect(_app.ctx.sio_config.host)
async def push_webhook2users(_app: Sanic, full_name: str, msg: str):
    """Forward a webhook message to every bound subscriber of a repository.

    The repository's subscribers are fetched from the Gitea API. Each
    subscriber that has a ``GiteaUser`` row is resolved to its linked user
    (``GiteaUser.qid``), and ``msg`` is emitted once per distinct linked user
    id as a ``sendMessage`` socket.io event.

    Args:
        _app: Sanic application whose ``ctx`` provides ``sio_config`` and ``sio``.
        full_name: Repository full name as used in the Gitea API path.
        msg: Text content to send.

    Returns:
        None. A non-200 response from Gitea is logged and nothing is sent.
    """
    async with aiohttp.ClientSession() as session:
        url = f'{_app.ctx.sio_config.gitea_host}/api/v1/repos/{full_name}/subscribers?page=&limit='
        async with session.get(url) as rps:
            if rps.status != 200:
                logger.warning(rps)
                return
            subscribers = await rps.json()

    # The HTTP session is no longer needed; resolve Gitea ids to linked users.
    # A set is used so each room receives the message only once.
    q_user = set()
    for subscriber in subscribers:
        g_user = await GiteaUser.filter(id=subscriber['id']).get_or_none()
        if g_user:
            _q_user = await g_user.qid.get()
            q_user.add(_q_user.id)

    for room_id in q_user:
        message = Message(content=msg, room_id=room_id)
        await _app.ctx.sio.emit('sendMessage', message.to_json())
@app.post('/receive')
async def receive(rqt: Request):
match (rqt.headers['X-Gitea-Event']):
case GiteaEvent.issues.value:
data = WebHookIssue(**rqt.json)
rsp_sender = f"{data.sender.username}"
rsp_title = f"[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})"
rsp_ctx = f"{data.issue.body}"
rsp_link = f"View it: {data.issue.html_url}"
cancel_subscribe = f"unsubscribe: {data.repository.html_url}"
rsp = f"{rsp_sender}\n{rsp_title}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}"
case GiteaEvent.issue_comment.value:
data = WebHookIssueComment(**rqt.json)
rsp_sender = f"{data.sender.username}"
rsp_title = f"Re:[{data.repository.full_name}][{data.action}] {data.issue.title} (Issue #{data.issue.id})"
rsp_ctx = f"{data.comment.body}"
rsp_link = f"View it: {data.comment.html_url}"
cancel_subscribe = f"unsubscribe: {data.repository.html_url}"
rsp = f"{rsp_sender}\n{rsp_title}\n------\n{rsp_ctx}\n------\n{rsp_link}\n{cancel_subscribe}"
case _:
data = None
rsp = "Unknown webhook type! Please contact the administrator."
logger.warn(rsp)
if data:
rqt.app.add_task(push_webhook2users(app, data.repository.full_name, rsp))
return text(rsp)
async def get_gitea_token(_app, form: dict, q_user: User):
    """Exchange an OAuth code for Gitea tokens and bind the account to a user.

    Posts ``form`` to Gitea's ``/login/oauth/access_token`` endpoint, uses the
    returned access token to fetch ``/api/v1/user``, then creates or updates
    the ``GiteaUser`` row with that Gitea id so it points at ``q_user`` and
    stores both tokens.

    Args:
        _app: Sanic application whose ``ctx.sio_config`` provides ``gitea_host``.
        form: Form fields for the token request.
        q_user: The user the Gitea account is being bound to.

    Returns:
        A status string: a failure message containing Gitea's response body
        when either HTTP call does not return 200, otherwise a success message
        naming both accounts.
    """
    gitea_host = _app.ctx.sio_config.gitea_host
    async with aiohttp.ClientSession() as session:
        # Step 1: trade the authorization code for access/refresh tokens.
        url = f'{gitea_host}/login/oauth/access_token'
        async with session.post(url, data=form) as rps:
            if rps.status != 200:
                logger.warning(f'Gitea: {rps}')
                return f'绑定失败:{await rps.text()}'
            token_payload = await rps.json()
            act = token_payload['access_token']
            rft = token_payload['refresh_token']

        # Step 2: look up which Gitea account the token belongs to.
        url = f'{gitea_host}/api/v1/user'
        headers = {'Authorization': f'token {act}'}
        async with session.get(url, headers=headers) as rps:
            if rps.status != 200:
                logger.warning(f'Gitea: {rps}')
                return f'绑定失败:{await rps.text()}'
            profile = await rps.json()
            gid = profile['id']
            login = profile['login']

    # Step 3: upsert the binding. The row was looked up by ``gid``, so its id
    # does not need to be reassigned on update.
    g_user = await GiteaUser.filter(id=gid).get_or_none()
    if g_user:
        g_user.name = login
        g_user.qid = q_user
        g_user.ac_tk = act
        g_user.rfs_tk = rft
        await g_user.save()
    else:
        await GiteaUser(id=gid, name=login, qid=q_user, ac_tk=act, rfs_tk=rft).save()
    return f'绑定成功QQ:{q_user.name}, Gitea:{login}'
@app.get('/redirect')
async def redirect(rqt: Request):
    """OAuth redirect endpoint that Gitea calls after the user authorises.

    Validates that both ``state`` and ``code`` query arguments are present,
    resolves the ``User`` whose stored ``state`` matches, notifies that user
    over socket.io that binding is in progress, and schedules the token
    exchange as a background task named after the user's id.

    Args:
        rqt: The incoming Sanic request.

    Returns:
        A plain-text response: an error description when the arguments are
        missing or the state is unknown, otherwise ``success``.
    """
    rqt_state = rqt.args.get('state')
    code = rqt.args.get('code')
    if rqt_state is None or code is None:
        logger.warning(f'Gitea返回消息格式错误{rqt.args}')
        return text(f'返回信息错误:{rqt.args}')

    # The state value ties this callback to the user who started the binding.
    q_user = await User.filter(state=rqt_state).get_or_none()
    if q_user is None:
        logger.warning('该请求不是从QQ中请求的链接')
        return text('请先从QQ处申请绑定')

    msg = Message(content='已在绑定中,可能因为意外而失败\n查询结果使用\n/gitea -ust', room_id=q_user.id)
    await app.ctx.sio.emit('sendMessage', msg.to_json())

    token_form = {
        'client_id': app.ctx.sio_config.client_id,
        'client_secret': app.ctx.sio_config.client_secret,
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': f'{app.ctx.sio_config.localhost}/redirect',
    }
    # Run the token exchange in the background so this request returns at once.
    rqt.app.add_task(get_gitea_token(app, token_form, q_user), name=str(q_user.id))
    return text('success')
"""
以下为QQ监听部分
"""
def start_sio_listener():
    """Register every socket.io event handler on ``app.ctx.sio``.

    Must be called after ``app.ctx.sio`` has been created, because the
    decorators below are evaluated against that client object. Most handlers
    only log what they receive; ``requireAuth`` answers the server's
    authentication challenge and ``addMessage`` dispatches incoming messages
    to the command router in ``cmds``.
    """

    @app.ctx.sio.on('connect')
    def connect():
        logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}')

    @app.ctx.sio.on('requireAuth')
    async def require_auth(salt: str, versions: Dict[str, str]):
        """Sign the hex-encoded ``salt`` with the configured key and send it back."""
        logger.info(
            f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}")
        # Prepare the data: both the key and the salt are hex strings.
        sign = SigningKey(bytes.fromhex(app.ctx.sio_config.key))
        signature = sign.sign(bytes.fromhex(salt))
        # Send the data: only the signature bytes are emitted.
        logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}")
        await app.ctx.sio.emit('auth', signature.signature)
        logger.info(f"{Colors.BLUE}send auth emit{Colors.END}")

    @app.ctx.sio.on('auth')
    async def auth(data: Dict[str, Any]):
        logger.info(f"auth: {data}")

    @app.ctx.sio.on('authFailed')
    async def auth_failed():
        # Authentication was rejected; drop the connection.
        logger.warning("authFailed")
        await app.ctx.sio.disconnect()

    @app.ctx.sio.on('authSucceed')
    def auth_succeed():
        logger.info("authSucceed")

    @app.ctx.sio.on('connect_error')
    def connect_error(*args, **kwargs):
        logger.warning(f"连接错误 {args}, {kwargs}")

    @app.ctx.sio.on('updateRoom')
    def update_room(data: Dict[str, Any]):
        logger.debug(sio_log_format('update_room:', data))

    @app.ctx.sio.on('deleteMessage')
    def delete_message(message_id: str):
        logger.debug(sio_log_format('delete_message:', message_id))

    @app.ctx.sio.on('setMessages')
    def set_messages(data: Dict[str, Any]):
        logger.debug(f"{sio_log_format('set_messages:', data)}"
                     f"{sio_log_format('message_len:', len(data['messages']))}")

    @app.ctx.sio.on('setAllRooms')
    def set_all_rooms(rooms: List[Dict[str, Any]]):
        logger.debug(f"{sio_log_format('set_all_rooms:', rooms)}"
                     f"{sio_log_format('len:', len(rooms))}")

    @app.ctx.sio.on('setAllChatGroups')
    def set_all_chat_groups(groups: List[Dict[str, Any]]):
        logger.debug(f"{sio_log_format('set_all_chat_groups:', groups)}"
                     f"{sio_log_format('len:', len(groups))}")

    @app.ctx.sio.on('notify')
    def notify(data: List[Tuple[str, Any]]):
        logger.debug(sio_log_format('notify:', data))

    @app.ctx.sio.on('closeLoading')
    def close_loading(_):
        logger.debug(sio_log_format('close_loading', ''))

    @app.ctx.sio.on('onlineData')
    def online_data(data: Dict[str, Any]):
        logger.debug(sio_log_format('online_data:', data))

    @app.ctx.sio.on('*')
    def catch_all(event, data):
        # Fallback for any event without a dedicated handler above.
        logger.debug(sio_log_format('catch_all:', f'{event}|{data}'))

    @app.ctx.sio.on('addMessage')
    async def add_message(data: Dict[str, Any]):
        """Route an incoming message to the matching command, if any."""
        sio_decorator = cmds.cmds(app, data)
        try:
            await sio_decorator.route2cmd_and_run()
        except IndexError:
            # Deliberately ignored: per the original note this is how
            # non-text messages are handled.
            pass
# Script entry point: start the Sanic server when this file is run directly.
if __name__ == "__main__":
    # NOTE(review): this listens on all interfaces on port 80 with dev=True;
    # confirm that development mode is intended for the deployed service.
    app.run(host='0.0.0.0', port=80, dev=True)