ica-plugins/plugins/status.py
2024-08-02 19:15:08 +08:00

68 lines
2.1 KiB
Python

from __future__ import annotations
import time
from typing import TYPE_CHECKING, TypeVar
if TYPE_CHECKING:
from ica_typing import IcaNewMessage, IcaClient
from ica_typing import TailchatReciveMessage, TailchatClient
else:
IcaNewMessage = TypeVar("NewMessage")
IcaClient = TypeVar("IcaClient")
TailchatReciveMessage = TypeVar("TailchatReciveMessage")
TailchatClient = TypeVar("TailchatClient")
class Counter:
def __init__(self, count_time: int = 60):
self.msg_pre_min_ica = 0.0
self.ica_msg: list[float] = []
self.msg_pre_min_tc = 0.0
self.tc_msg: list[float] = []
self.count_time = count_time
def ica_update(self):
now = time.time()
self.ica_msg.append(now)
self.ica_msg = [x for x in self.ica_msg if now - x <= self.count_time]
self.msg_pre_min_ica = len(self.ica_msg) / (self.count_time / 60)
def tc_update(self):
now = time.time()
self.tc_msg.append(now)
self.tc_msg = [x for x in self.tc_msg if now - x <= self.count_time]
self.msg_pre_min_tc = len(self.tc_msg) / (self.count_time / 60)
@property
def ica_frequence(self):
return self.msg_pre_min_ica
@property
def tc_frequence(self):
return self.msg_pre_min_tc
COUNTER = Counter(120)
def on_ica_message(msg: IcaNewMessage, client: IcaClient) -> None:
if msg.is_from_self:
return
COUNTER.ica_update()
if msg.is_reply:
return
if msg.content == "/bot-count":
reply = f"ica 每分钟消息数: {COUNTER.ica_frequence:.2f}\ntailchat 每分钟消息数: {COUNTER.tc_frequence:.2f}"
client.send_message(msg.reply_with(reply))
def on_tailchat_message(msg: TailchatReciveMessage, client: TailchatClient) -> None:
if msg.is_from_self:
return
COUNTER.tc_update()
if msg.is_reply:
return
if msg.content == "/bot-count":
reply = f"ica 每分钟消息数: {COUNTER.ica_frequence:.2f}\ntailchat 每分钟消息数: {COUNTER.tc_frequence:.2f}"
client.send_message(msg.reply_with(reply))