icalingua-python-bot/connect.py

301 lines
9.8 KiB
Python
Raw Normal View History

2023-10-03 21:30:36 +08:00
import time
2023-10-02 18:54:24 +08:00
import random
import asyncio
import traceback
from typing import Dict, List, Tuple, Any, Optional, Union, Literal
2023-12-28 21:55:21 +08:00
import qtoml
2023-10-02 18:54:24 +08:00
import socketio
from colorama import Fore, Style
from nacl.signing import SigningKey
from lib_not_dr.types import Options
def get_config() -> Tuple[str, str, int]:
    """Load the connection settings from ``config.toml``.

    The file is opened relative to the current working directory and parsed
    with ``qtoml``.

    Returns:
        The values stored under the ``host``, ``private_key`` and ``self_id``
        keys, in that order.

    Raises:
        OSError: if ``config.toml`` cannot be opened.
        KeyError: if one of the three keys is missing.
    """
    with open('config.toml', 'r', encoding='utf-8') as config_file:
        settings = qtoml.load(config_file)
    host = settings['host']
    private_key = settings['private_key']
    self_id = settings['self_id']
    return host, private_key, self_id
# Loaded once at import time; a missing or incomplete config.toml fails here.
HOST, KEY, SELF_ID = get_config()
class AtElement(Options):
    """One entry of the ``at`` list carried by an outgoing ``Message``."""
    # presumably the display text of the mention — TODO confirm against the server
    text: str
    # Either a numeric id or the literal 'all'; defaults to 'all'.
    id: Union[int, Literal['all']] = 'all'
class ReplyMessage(Options):
    """Reference to the message that an outgoing ``Message`` replies to."""
    id: str
    username: str = ''
    content: str = ''
    # NOTE(review): class-level list default; presumably shared between
    # instances unless ``Options`` copies it — confirm before mutating it.
    files: list = []

    def to_json(self) -> dict:
        """Return the payload dict; ``id`` is emitted under the ``_id`` key."""
        payload = {}
        payload['_id'] = self.id
        payload['username'] = self.username
        payload['content'] = self.content
        payload['files'] = self.files
        return payload
class Message(Options):
    """Outgoing chat message, serialised by ``to_json`` for ``sendMessage``."""
    content: str
    room_id: Optional[int] = None
    # room_id and room are alternatives (in practice filling room_id is enough)
    room: Optional[int] = None
    file: None = None  # TODO: file upload
    # The upstream source types this as ``any``. TODO: reply messages
    reply_to: Optional[ReplyMessage] = None
    b64_img: Optional[str] = None  # TODO: send an image
    # TODO: mention someone
    # NOTE(review): entries are passed through to_json() unchanged — confirm
    # the transport can serialise AtElement instances.
    at: Optional[List[AtElement]] = []
    sticker: Optional[None] = None  # TODO: send a sticker
    message_type: Optional[str] = None  # TODO: message type

    def to_json(self) -> dict:
        """Return the payload dict using the camelCase keys sent to the server."""
        # Only serialise the reply reference when one is set.
        reply_payload = None
        if self.reply_to:
            reply_payload = self.reply_to.to_json()

        payload = {}
        payload['content'] = self.content
        payload['roomId'] = self.room_id
        payload['room'] = self.room
        payload['file'] = self.file
        payload['replyMessage'] = reply_payload
        payload['b64img'] = self.b64_img
        payload['at'] = self.at
        payload['sticker'] = self.sticker
        payload['messageType'] = self.message_type
        return payload
# Shared Socket.IO client; the event handlers in this file register on it via @sio.on(...).
sio: socketio.AsyncClient = socketio.AsyncClient()
2023-10-02 18:54:24 +08:00
@sio.on('connect')
def connect():
    """Log, in green, that the connection has been established."""
    banner = f'{Fore.GREEN}icalingua 已连接{Style.RESET_ALL}'
    print(banner)
@sio.on('requireAuth')
async def require_auth(salt: str, versions: Dict[str, str]):
    """Answer the server's ``requireAuth`` challenge.

    Args:
        salt: Hex-encoded challenge; it is decoded to bytes and signed.
        versions: Version information sent by the server; only logged.

    The signing key is built from the hex-encoded ``KEY`` and the raw
    signature bytes are emitted back as the ``auth`` event.
    """
    print(f"{Fore.BLUE}versions: {versions}{Style.RESET_ALL}\n{type(salt)}|{salt=}")

    # Prepare the data: sign the decoded challenge with our private key.
    signing_key = SigningKey(bytes.fromhex(KEY))
    challenge = bytes.fromhex(salt)
    # Keep this local named ``signature``: the debug f-string below uses the
    # ``=`` specifier, which echoes the expression text verbatim.
    signature = signing_key.sign(challenge)

    # Send the data: only the detached signature bytes are transmitted.
    print(f"{len(signature.signature)=} {type(signature.signature)=}")
    await sio.emit('auth', signature.signature)
    print(f"{Fore.BLUE}send auth emit{Style.RESET_ALL}")
# @sio.on('requireAuth')
# def require_auth(*data: Dict[str, Any]):
# print(f"{Fore.BLUE}requireAuth: {data}{Style.RESET_ALL}")
2023-10-02 18:54:24 +08:00
@sio.on('auth')
def auth(data: Dict[str, Any]):
    """Log the payload of an incoming ``auth`` event."""
    line = f"auth: {data}"
    print(line)
@sio.on('authFailed')
async def auth_failed():
    """Log the authentication failure in red, then disconnect the client."""
    notice = f"{Fore.RED}authFailed{Style.RESET_ALL}"
    print(notice)
    await sio.disconnect()
@sio.on('authSucceed')
def auth_succeed():
    """Log, in green, that authentication succeeded."""
    notice = f"{Fore.GREEN}authSucceed{Style.RESET_ALL}"
    print(notice)
@sio.on('connect_error')
def connect_error(*args, **kwargs):
    """Log whatever positional and keyword arguments accompany a connection error."""
    line = f"连接错误 {args}, {kwargs}"
    print(line)
@sio.on('updateRoom')
def update_room(data: Dict[str, Any]):
    """Log the payload of an ``updateRoom`` event in cyan."""
    line = f"{Fore.CYAN}update_room: {data}{Style.RESET_ALL}"
    print(line)
2023-10-15 01:14:00 +08:00
def safe_eval(code: str) -> str:
    """Evaluate ``code`` as a Python expression and return a printable report.

    The text is first passed through a fixed list of substring substitutions,
    then evaluated with a restricted globals dict. The stringified result is
    cut to 500 characters, and any occurrence of ``KEY`` or ``HOST`` in it is
    masked. A failure during evaluation yields the traceback text as the
    result; a failure anywhere else yields an ``error:`` report.

    NOTE(review): despite the name this is not a real sandbox. ``eval`` still
    runs caller-supplied text and builtins remain reachable, so do not expose
    it to untrusted users without a proper isolation layer.

    Args:
        code: Expression text to evaluate.

    Returns:
        The substituted code, the evaluation result and the elapsed time,
        or an ``error:`` report containing a traceback.
    """
    try:
        # Crude blacklist applied in this exact order. Substitutions that were
        # disabled in the original are: help, bytes, encode, decode, compile,
        # globals, input, '__' and import.
        substitutions = (
            ('os', '坏东西!\n'),
            ('sys', '坏东西!\n'),
            (' kill', '别跑!\n'),
            (' rm ', '别跑!\n'),
            ('exit', '好坏!\n'),
            ('eval', '啊哈!\n'),
            ('exec', '抓住!\n'),
        )
        for needle, replacement in substitutions:
            code = code.replace(needle, replacement)

        started_at = time.time()
        try:
            import os
            import math
            import decimal
            # Names visible to the expression; several dangerous builtins are
            # shadowed by plain strings so calling them fails.
            sandbox_globals = {
                'time': time,
                'math': math,
                'decimal': decimal,
                'random': random,
                '__import__': '<built-in function __import__>',
                'globals': '也别惦记你那个 globals 了',
                'compile': '想得美',
                'help': '虽然但是 help 也不行',
                'exit': '不许 exit',
                'input': '你想干嘛',
                'return': '别惦记你那个 return 了',
                'getattr': '<built-in function getattr>',
                'setattr': '<built-in function setattr>',
            }
            # NOTE(review): this rebinds os.system on the real module for the
            # whole process, not just for the evaluated expression.
            os.system = '不许'
            # Slicing caps the reply at 500 characters.
            result = str(eval(code, sandbox_globals, {}))[:500]
        except BaseException:
            # Deliberately as broad as a bare ``except``: whatever the
            # expression raises is reported back instead of propagating.
            result = traceback.format_exc()
        finished_at = time.time()

        # Never leak the private key or the server address in a reply.
        result = result.replace(KEY, '***').replace(HOST, '***')
        print(f"{Fore.MAGENTA}safe_eval: {result}{Style.RESET_ALL}")
        # ``result`` is always a str here, so only the string form can match.
        if result == '6':
            result = '他确实等于 6'
        return f'{code}\neval result:\n{result}\n耗时: {finished_at - started_at} s'
    except BaseException:
        error = traceback.format_exc()
        return f'error:\n{error}'
# Characters allowed to reach eval() for the '==' calculator command:
# digits, space, decimal point, arithmetic operators, parentheses, comparisons.
_EXPRESSION_WHITELIST = frozenset('0123456789 .+-*/()<>=')


def _evaluate_expression(raw: str) -> str:
    """Return the reply text for the arithmetic expression in ``raw``.

    Characters outside the whitelist are dropped, the power operator is
    removed, and the remainder is evaluated. Any evaluation failure (syntax
    error, division by zero, oversized integer, ...) is reported as
    ``ExceptionName: message`` instead of propagating.
    """
    express = ''.join(char for char in raw if char in _EXPRESSION_WHITELIST)
    # Strip '**' only AFTER filtering: doing it before lets a dropped character
    # glue two stars back together ('2*a*3' -> '2**3') and re-enable unbounded
    # exponentiation.
    express = express.replace('**', '')
    if express == '':
        return '你在干嘛'
    try:
        # Only whitelisted characters remain, so the expression cannot contain
        # names, attribute access or calls to anything by name.
        # NOTE(review): '<<' is still expressible and can build huge integers;
        # evaluation is synchronous and has no timeout.
        return str(eval(express))
    except Exception as err:
        return f'{type(err).__name__}: {err}'


@sio.on('addMessage')
async def add_message(data: Dict[str, Any]):
    """Handle an incoming chat message and answer the bot commands.

    Messages whose sender id equals ``SELF_ID`` are ignored. Recognised
    commands:

    * ``/bot``     - send a fixed test message to the room.
    * ``==<expr>`` - evaluate whitelisted arithmetic and reply with the result
      (or with the error text when the expression cannot be evaluated).
    * ``!!jrrp``   - reply with a value in 0..100 derived deterministically
      from the sender id and the message's ``date`` field.

    Args:
        data: Event payload; reads ``data['roomId']`` and the ``senderId``,
            ``username``, ``content``, ``_id`` and ``date`` keys of
            ``data['message']``.
    """
    print(f"{Fore.MAGENTA}add_message: {data}{Style.RESET_ALL}")

    message_data = data['message']
    sender_id = message_data['senderId']
    sender_name = message_data['username']
    content = message_data['content']
    room_id = data['roomId']

    # Ignore messages sent by the bot's own account.
    if sender_id == SELF_ID:
        return

    if content == '/bot':
        message = Message(content='icalingua bot test',
                          room_id=room_id)
        await sio.emit('sendMessage', message.to_json())

    elif content.startswith('=='):
        result = _evaluate_expression(content[2:])
        reply = ReplyMessage(id=message_data['_id'])
        message = Message(content=result,
                          reply_to=reply,
                          room_id=room_id)
        # Random delay of up to two seconds before answering.
        await asyncio.sleep(random.random() * 2)
        await sio.emit('sendMessage', message.to_json())

    elif content == '!!jrrp':
        # Seeded RNG: the same sender and date value always give the same score.
        randomer = random.Random(f'{sender_id}-{message_data["date"]}-jrrp-v2')
        result = randomer.randint(0, 50) + randomer.randint(0, 50)
        report = f'{sender_name} 今日人品值为 {result}'
        print(report)
        reply = ReplyMessage(id=message_data['_id'])
        message = Message(content=report,
                          reply_to=reply,
                          room_id=room_id)
        await asyncio.sleep(0.5)
        await sio.emit('sendMessage', message.to_json())

    # Disabled: reply when the message consists only of one or more "6".
    # elif data['message']['content'].replace(' ', '') in ('6', '666', '六', '3+3', '5+1', '4+2', '2+4', '1+5'):
    #     reply = ReplyMessage(id=data['message']['_id'])
    #     message = Message(content='你 6 nm 呢',
    #                       reply_to=reply,
    #                       room_id=room_id)
    #     await asyncio.sleep(0.5)
    #     await sio.emit('sendMessage', message.to_json())
@sio.on('deleteMessage')
def delete_message(message_id: str):
    """Log the id carried by a ``deleteMessage`` event in magenta."""
    line = f"{Fore.MAGENTA}delete_message: {message_id}{Style.RESET_ALL}"
    print(line)
@sio.on('setMessages')
def set_messages(data: Dict[str, Any]):
    """Log a ``setMessages`` payload and the length of its ``messages`` entry."""
    message_count = len(data['messages'])
    line = f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {message_count}{Style.RESET_ALL}"
    print(line)
@sio.on('setAllRooms')
def set_all_rooms(rooms: List[Dict[str, Any]]):
    """Log the room list delivered by ``setAllRooms`` together with its length."""
    room_count = len(rooms)
    line = f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {room_count}\n{Style.RESET_ALL}"
    print(line)
@sio.on('setAllChatGroups')
def set_all_chat_groups(groups: List[Dict[str, Any]]):
    """Log the group list delivered by ``setAllChatGroups`` together with its length."""
    group_count = len(groups)
    line = f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {group_count}\n{Style.RESET_ALL}"
    print(line)
@sio.on('notify')
def notify(data: List[Tuple[str, Any]]):
    """Log the payload of a ``notify`` event."""
    line = f"notify: {data}"
    print(line)
@sio.on('closeLoading')
def close_loading(_):
    """Log a ``closeLoading`` event in green; its single argument is ignored."""
    notice = f"{Fore.GREEN}close_loading{Style.RESET_ALL}"
    print(notice)
@sio.on('onlineData')
def online_data(data: Dict[str, Any]):
    """Log the payload of an ``onlineData`` event in green."""
    line = f"{Fore.GREEN}online_data: {data}{Style.RESET_ALL}"
    print(line)
@sio.on('*')
def catch_all(event, data):
    """Log, in red, the name and payload of events handled by the ``'*'`` registration."""
    line = f"{Fore.RED}catch_all: {event}|{data}{Style.RESET_ALL}"
    print(line)
async def main():
    """Connect the shared client to ``HOST`` and then wait on it."""
    await sio.connect(HOST)
    # Stay here in sio.wait(); the registered event handlers do the work.
    await sio.wait()

    # Earlier experiments, kept for reference:
    # await sio.emit('requireAuth', ('', {'version': '', 'protocolVersion': ''}))
    # await asyncio.sleep(2)
    # await asyncio.gather(sio.wait(), sio.wait(), sio.wait())
2023-10-02 18:54:24 +08:00
# Start the client only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())