299 lines
10 KiB
Python
299 lines
10 KiB
Python
|
|
import time
|
|
import random
|
|
import asyncio
|
|
import argparse
|
|
import traceback
|
|
|
|
from typing import Dict, List, Tuple, Any
|
|
|
|
# import qtoml
|
|
import socketio
|
|
from colorama import Fore
|
|
from nacl.signing import SigningKey
|
|
# from lib_not_dr.types import Options
|
|
from lib_not_dr.loggers import config
|
|
|
|
from data_struct import Message, ReplyMessage, get_config, BotConfig, BotStatus
|
|
|
|
_version_ = "0.2.2"
|
|
|
|
logger = config.get_logger('icalingua')
|
|
|
|
BOTCONFIG: BotConfig = get_config()
|
|
|
|
if __name__ == '__main__':
|
|
# --debug
|
|
# --config=config.toml
|
|
# -n --no-notice
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('-d', '--debug', action='store_true')
|
|
parser.add_argument('-n', '--no-notice', action='store_true')
|
|
parser.add_argument('-c', '--config', type=str)
|
|
args = parser.parse_args()
|
|
if args.debug:
|
|
logger.global_level = 0
|
|
if args.config:
|
|
# global BOTCONFIG
|
|
BOTCONFIG: BotConfig = get_config(args.config)
|
|
if args.no_notice:
|
|
BOTCONFIG.notice_start = False
|
|
|
|
BotStatus = BotStatus()
|
|
|
|
sio: socketio.AsyncClient = socketio.AsyncClient()
|
|
|
|
|
|
@sio.on('connect') # type: ignore
|
|
def connect():
|
|
logger.info(f'{Fore.GREEN}icalingua 已连接')
|
|
|
|
|
|
@sio.on('requireAuth') # type: ignore
|
|
async def require_auth(salt: str, versions: Dict[str, str]):
|
|
logger.info(f"{Fore.BLUE}versions: {versions}\n{type(salt)}|{salt=}")
|
|
# 准备数据
|
|
sign = SigningKey(bytes.fromhex(BOTCONFIG.private_key))
|
|
signature = sign.sign(bytes.fromhex(salt))
|
|
|
|
# 发送数据
|
|
await sio.emit('auth', signature.signature)
|
|
logger.info(f"{Fore.BLUE}send auth emit")
|
|
|
|
# @sio.on('requireAuth')
|
|
# def require_auth(*data: Dict[str, Any]):
|
|
# logger.info(f"{Fore.BLUE}requireAuth: {data}")
|
|
|
|
|
|
@sio.on('auth') # type: ignore
|
|
def auth(data: Dict[str, Any]):
|
|
logger.info(f"auth: {data}")
|
|
|
|
|
|
@sio.on('authFailed') # type: ignore
|
|
async def auth_failed():
|
|
logger.info(f"{Fore.RED}authFailed")
|
|
await sio.disconnect()
|
|
|
|
|
|
@sio.on('authSucceed') # type: ignore
|
|
def auth_succeed():
|
|
logger.info(f"{Fore.GREEN}authSucceed")
|
|
|
|
|
|
@sio.on('connect_error') # type: ignore
|
|
def connect_error(*args, **kwargs):
|
|
logger.info(f"连接错误 {args}, {kwargs}")
|
|
|
|
|
|
@sio.on('updateRoom') # type: ignore
|
|
def update_room(data: Dict[str, Any]):
|
|
logger.info(f"{Fore.CYAN}update_room: {data}")
|
|
|
|
|
|
def safe_eval(code: str) -> str:
|
|
try:
|
|
# code = code.replace('help', '坏东西!\n')
|
|
# code = code.replace('bytes', '坏东西!\n')
|
|
# code = code.replace('encode', '坏东西!\n')
|
|
# code = code.replace('decode', '坏东西!\n')
|
|
# code = code.replace('compile', '屑的!\n')
|
|
# code = code.replace('globals', '拿不到!\n')
|
|
code = code.replace('os', '坏东西!\n')
|
|
code = code.replace('sys', '坏东西!\n')
|
|
# code = code.replace('input', '坏东西!\n')
|
|
# code = code.replace('__', '啊哈!\n')
|
|
# code = code.replace('import', '很坏!\n')
|
|
code = code.replace(' kill', '别跑!\n')
|
|
code = code.replace(' rm ', '别跑!\n')
|
|
code = code.replace('exit', '好坏!\n')
|
|
code = code.replace('eval', '啊哈!\n')
|
|
code = code.replace('exec', '抓住!\n')
|
|
start_time = time.time()
|
|
try:
|
|
import os
|
|
import math
|
|
import decimal
|
|
global_val = {'time': time,
|
|
'math': math,
|
|
'decimal': decimal,
|
|
'random': random,
|
|
'__import__': '<built-in function __import__>',
|
|
'globals': '也别惦记你那个 globals 了',
|
|
'compile': '想得美',
|
|
'help': '虽然但是 help 也不行',
|
|
'exit': '不许 exit',
|
|
'input': '你想干嘛',
|
|
'return': '别惦记你那个 return 了',
|
|
'getattr': '<built-in function getattr>',
|
|
'setattr': '<built-in function setattr>'}
|
|
os.system = '不许'
|
|
result = str(eval(code, global_val, {}))
|
|
limit = 500
|
|
if len(result) > limit:
|
|
result = result[:limit]
|
|
except:
|
|
result = traceback.format_exc()
|
|
end_time = time.time()
|
|
result = result.replace(BOTCONFIG.private_key, '***')
|
|
result = result.replace(BOTCONFIG.host, '***')
|
|
|
|
logger.info(f"{Fore.MAGENTA}safe_eval: {result}")
|
|
|
|
if result == '6' or result == 6:
|
|
result = '他确实等于 6'
|
|
|
|
result = f'{code}\neval result:\n{result}\n耗时: {end_time - start_time} s'
|
|
return result
|
|
except:
|
|
error = traceback.format_exc()
|
|
result = f'error:\n{error}'
|
|
return result
|
|
|
|
|
|
@sio.on('addMessage') # type: ignore
|
|
async def add_message(data: Dict[str, Any]):
|
|
logger.info(f"{Fore.MAGENTA}add_message: {data}")
|
|
|
|
is_self = data['message']['senderId'] == BOTCONFIG.self_id
|
|
sender_name = data['message']['username']
|
|
sender_id = data['message']['senderId']
|
|
content = data['message']['content']
|
|
room_id = data['roomId']
|
|
|
|
if not is_self:
|
|
if content == '/bot':
|
|
message = Message(content=f'icalingua bot test v{_version_}',
|
|
room_id=data['roomId'],
|
|
reply_to=ReplyMessage(id=data['message']['_id']))
|
|
await sio.emit('sendMessage', message.to_json())
|
|
elif content.startswith('=='):
|
|
|
|
evals: str = content[2:]
|
|
|
|
result = safe_eval(evals)
|
|
# whitelist = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ', '.', '+', '-', '*', '/', '(', ')', '<',
|
|
# '>', '=']
|
|
# evals = evals.replace('**', '')
|
|
# express = ''
|
|
# for text in evals:
|
|
# if text in whitelist:
|
|
# express += text
|
|
# if express == '':
|
|
# result = '你在干嘛'
|
|
# else:
|
|
# result = str(eval(express))
|
|
|
|
reply = ReplyMessage(id=data['message']['_id'])
|
|
message = Message(content=result,
|
|
reply_to=reply,
|
|
room_id=room_id)
|
|
|
|
await asyncio.sleep(random.random() * 2)
|
|
await sio.emit('sendMessage', message.to_json())
|
|
elif content == '!!jrrp':
|
|
randomer = random.Random(f'{sender_id}-{data["message"]["date"]}-jrrp-{_version_}')
|
|
result = randomer.randint(0, 50) + randomer.randint(0, 50)
|
|
logger.info(f'{sender_name} 今日人品值为 {result}')
|
|
reply = ReplyMessage(id=data['message']['_id'])
|
|
message = Message(content=f'{sender_name} 今日人品值为 {result}',
|
|
reply_to=reply,
|
|
room_id=room_id)
|
|
await asyncio.sleep(0.5)
|
|
await sio.emit('sendMessage', message.to_json())
|
|
# 如果只包括一个或多个 6
|
|
# elif data['message']['content'].replace(' ', '') in ('6', '666', '六', '3+3', '5+1', '4+2', '2+4', '1+5'):
|
|
# reply = ReplyMessage(id=data['message']['_id'])
|
|
# message = Message(content='你 6 nm 呢',
|
|
# reply_to=reply,
|
|
# room_id=room_id)
|
|
# await asyncio.sleep(0.5)
|
|
# await sio.emit('sendMessage', message.to_json())
|
|
|
|
|
|
@sio.on('deleteMessage') # type: ignore
|
|
def delete_message(message_id: str):
|
|
logger.info(f"{Fore.MAGENTA}delete_message: {message_id}")
|
|
|
|
|
|
@sio.on('setMessages') # type: ignore
|
|
def set_messages(data: Dict[str, Any]):
|
|
logger.info(f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {len(data['messages'])}")
|
|
|
|
|
|
async def notice_startup(room_list: List[int]):
|
|
for notice_room in BOTCONFIG.notice_room:
|
|
if notice_room in room_list:
|
|
notice_message = Message(content=f'ica bot v{_version_}', room_id=notice_room)
|
|
await sio.emit('sendMessage', notice_message.to_json())
|
|
BotStatus.inited = True
|
|
logger.info("inited", tag='notice room')
|
|
else:
|
|
logger.warn(f"未找到通知房间: {notice_room}", tag='notice room')
|
|
await asyncio.sleep(random.randint(2, 5))
|
|
|
|
|
|
@sio.on('setAllRooms') # type: ignore
|
|
async def set_all_rooms(rooms: List[Dict[str, Any]]):
|
|
BotStatus.running = True
|
|
room_list: List[int] = [room.get('roomId') for room in rooms] # type: ignore
|
|
if not BotStatus.inited:
|
|
logger.info("initing...", tag='setAllRooms')
|
|
logger.debug(f"room_list: {room_list}", tag='setAllRooms')
|
|
if BOTCONFIG.notice_start:
|
|
await notice_startup(room_list)
|
|
if room_list != BotStatus.rooms:
|
|
logger.info(f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {len(rooms)}\n")
|
|
BotStatus.rooms = room_list
|
|
logger.info(f"更新房间: {room_list}", tag='setAllRooms')
|
|
|
|
|
|
@sio.on('setAllChatGroups') # type: ignore
|
|
def set_all_chat_groups(groups: List[Dict[str, Any]]):
|
|
logger.info(f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {len(groups)}\n")
|
|
|
|
|
|
@sio.on('notify') # type: ignore
|
|
def notify(data: List[Tuple[str, Any]]):
|
|
logger.info(f"notify: {data}")
|
|
|
|
|
|
@sio.on('closeLoading') # type: ignore
|
|
def close_loading(_):
|
|
logger.info(f"{Fore.GREEN}close_loading")
|
|
|
|
|
|
@sio.on('onlineData') # type: ignore
|
|
def online_data(data: Dict[str, Any]):
|
|
logger.info(f"{Fore.GREEN}online_data: {data}")
|
|
|
|
|
|
@sio.on('*') # type: ignore
|
|
def catch_all(event, data):
|
|
logger.info(f"{Fore.RED}catch_all: {event}|{data}")
|
|
|
|
|
|
|
|
async def main():
|
|
"""
|
|
while True:
|
|
await self.eio.wait()
|
|
await self.sleep(1) # give the reconnect task time to start up
|
|
if not self._reconnect_task:
|
|
break
|
|
await self._reconnect_task
|
|
if self.eio.state != 'connected':
|
|
break
|
|
"""
|
|
await sio.connect(BOTCONFIG.host)
|
|
await sio.wait()
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
asyncio.run(main())
|
|
except KeyboardInterrupt:
|
|
logger.info("KeyboardInterrupt")
|
|
except Exception:
|
|
logger.error(traceback.format_exc())
|
|
|