66 lines
1.8 KiB
Python
66 lines
1.8 KiB
Python
import tomllib
|
|
from typing import Dict
|
|
|
|
from nacl.signing import SigningKey
|
|
from sanic import Sanic, Request
|
|
from sanic.log import logger, Colors
|
|
from sanic.response import json
|
|
from socketio import AsyncClient
|
|
|
|
from gitea_model import WebHook
|
|
from model import Ctx, SioConfig, Message
|
|
|
|
app = Sanic('GiteaPush', ctx=Ctx)
|
|
|
|
|
|
def get_config() -> SioConfig:
|
|
with open('config.toml', 'rb') as f:
|
|
config = tomllib.load(f)
|
|
return SioConfig(config['host'], config['private_key'], config['self_id'])
|
|
|
|
|
|
@app.before_server_start
|
|
async def setup_before_start(_app):
|
|
_app.ctx.sio_config = get_config()
|
|
_app.ctx.sio = AsyncClient()
|
|
|
|
start_sio_listener()
|
|
|
|
await _app.ctx.sio.connect(_app.ctx.sio_config.HOST)
|
|
|
|
|
|
@app.post('/receive')
|
|
async def receive(rqt: Request):
|
|
data = WebHook(**rqt.json)
|
|
message = Message(content=f'webhook test: {data.issue.body}', room_id=-777186831)
|
|
await app.ctx.sio.emit('sendMessage', message.to_json())
|
|
return json({})
|
|
|
|
|
|
"""
|
|
以下为QQ监听部分
|
|
"""
|
|
|
|
|
|
def start_sio_listener():
|
|
@app.ctx.sio.on('connect')
|
|
def connect():
|
|
logger.info(f'{Colors.GREEN}icalingua 已连接{Colors.END}')
|
|
|
|
@app.ctx.sio.on('requireAuth')
|
|
async def require_auth(salt: str, versions: Dict[str, str]):
|
|
logger.info(
|
|
f"{Colors.BLUE}versions: {Colors.PURPLE}{versions} {Colors.BLUE}with type {type(salt)}|{salt=}{Colors.END}")
|
|
# 准备数据
|
|
sign = SigningKey(bytes.fromhex(app.ctx.sio_config.KEY))
|
|
signature = sign.sign(bytes.fromhex(salt))
|
|
|
|
# 发送数据
|
|
logger.info(f"{len(signature.signature)=} {type(signature.signature)=}{Colors.END}")
|
|
await app.ctx.sio.emit('auth', signature.signature)
|
|
logger.info(f"{Colors.BLUE}send auth emit{Colors.END}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host='0.0.0.0', port=80, dev=True)
|