Difficult-Rocket/Difficult_Rocket/utils/new_thread.py

156 lines
5.5 KiB
Python
Raw Normal View History

2021-12-26 23:06:03 +08:00
# -------------------------------
# Difficult Rocket
2023-01-20 14:08:12 +08:00
# Copyright © 2020-2023 by shenjackyuanjie 3695888@qq.com
2021-12-26 23:06:03 +08:00
# All rights reserved
# -------------------------------
2021-07-14 22:53:15 +08:00
import functools
import inspect
import threading
2023-05-02 15:31:28 +08:00
from typing import Optional, Callable, Union, List
2021-10-31 23:26:32 +08:00
2021-07-14 22:53:15 +08:00
"""
2021-09-24 14:50:42 +08:00
This part of the code comes from MCDReforged (https://github.com/Fallen-Breath/MCDReforged)
2021-07-14 22:53:15 +08:00
Many thanks to Fallen_Breath and the other coders who helped make MCDR work better
2021-09-24 14:50:42 +08:00
GNU Lesser General Public License v3.0 (GNU LGPL v3)
2021-09-24 14:53:12 +08:00
(have some changes)
2021-07-14 22:53:15 +08:00
"""
2023-05-02 15:31:28 +08:00
# Public names exported by ``from <this module> import *``
__all__ = [
'new_thread',
'FunctionThread'
]
# Global switch: threads created through ``new_thread`` are only reported to the
# callbacks in ``record_destination`` while this is true
record_thread = False
# Callbacks that are each called with the freshly created ``FunctionThread``
# (before it is started) when thread recording is enabled
record_destination: List[Callable[['FunctionThread'], None]] = []
def copy_signature(target: Callable, origin: Callable) -> Callable:
    """
    Make ``target`` report the same call signature as ``origin``

    The signature of ``origin`` is read with :func:`inspect.signature` and stored
    on ``target`` as its ``__signature__`` attribute.

    :param target: The callable that receives the signature; it is modified in place
    :param origin: The callable whose signature is copied
    :return: The same ``target`` object that was passed in
    """
    # https://stackoverflow.com/questions/39926567/python-create-decorator-preserving-function-arguments
    origin_signature = inspect.signature(origin)
    target.__signature__ = origin_signature
    return target
class FunctionThread(threading.Thread):
    """
    A Thread subclass which is used in decorator :func:`new_thread` to wrap a synchronized function call

    It remembers what the target returned, or the exception it raised, so the
    outcome can be fetched later through :meth:`get_return_value`
    """
    # Sentinel meaning "no return value recorded yet";
    # ``None`` cannot be used for that since it is a legitimate return value
    __NONE = object()

    def __init__(self, target, name, args, kwargs, daemon):
        """
        :param target: The callable to execute inside the thread
        :param name: The thread name, forwarded to :class:`threading.Thread`
        :param args: Positional arguments passed to ``target``
        :param kwargs: Keyword arguments passed to ``target``
        :param daemon: If the thread should be a daemon thread
        """
        super().__init__(target=target, args=args, kwargs=kwargs, name=name, daemon=daemon)
        self.__return_value = self.__NONE
        self.__error = None

        def wrapped_target(*args_, **kwargs_):
            # Record the outcome of the call; the exception is re-raised
            # so it still propagates out of the thread's run()
            try:
                self.__return_value = target(*args_, **kwargs_)
            except Exception as e:
                self.__error = e
                raise e from None

        # The original target is still given to Thread.__init__ above; only the
        # callable stored in ``_target`` is swapped for the recording wrapper.
        # NOTE(review): relies on threading.Thread.run() invoking ``self._target``
        self._target = wrapped_target

    def get_return_value(self, block: bool = False, timeout: Optional[float] = None):
        """
        Get the return value of the original function

        If an exception has occurred during the original function call, the exception will be risen again here

        Examples::

            >>> import time
            >>> @new_thread
            ... def do_something(text: str):
            ...     time.sleep(1)
            ...     return text

            >>> do_something('task').get_return_value(block=True)
            'task'

        :param block: If it should join the thread before getting the return value to make sure the function invocation finishes
        :param timeout: The maximum timeout for the thread join
        :raise RuntimeError: If the thread is still alive when getting return value. Might be caused by ``block=False``
            while the thread is still running, or thread join operation times out.
            Also raised when the thread is not running but recorded neither a return value nor an exception,
            e.g. it has not been started yet, or the function exited with a non-``Exception`` error such as ``SystemExit``
        :return: The return value of the original function
        """
        if block:
            self.join(timeout)
        if self.__return_value is not self.__NONE:
            return self.__return_value
        if self.is_alive():
            raise RuntimeError('The thread is still running')
        # The thread is not running. Read the outcome again: the function may have
        # finished between the first check and the liveness check above
        if self.__return_value is not self.__NONE:
            return self.__return_value
        if self.__error is not None:
            raise self.__error
        # Neither a value nor an exception was recorded; ``raise None`` would be a TypeError
        raise RuntimeError('The thread has not produced a return value')
2023-05-02 15:31:28 +08:00
def new_thread(arg: Optional[Union[str, Callable]] = None,
               daemon: bool = False,
               log_thread: bool = True):
    """
    This is a one line solution to make your function executes in parallels.
    When decorated with this decorator, functions will be executed in a new thread
    (a daemon thread only if ``daemon=True`` is given)

    This decorator only changes the return value of the function to the created ``Thread`` object.
    Beside the return value, it reserves all signatures of the decorated function,
    so you can safely use the decorated function as if there's no decorating at all

    It's also a simple compatible upgrade method for old MCDR 0.x plugins

    The return value of the decorated function is changed to the ``Thread`` object that executes this function

    The decorated function has 1 extra field:

    * ``original`` field: The original undecorated function

    Examples::

        >>> import time

        >>> @new_thread('My Plugin Thread')
        ... def do_something(text: str):
        ...     time.sleep(1)
        ...     print(threading.current_thread().name)
        >>> callable(do_something.original)
        True
        >>> t = do_something('foo')
        >>> isinstance(t, FunctionThread)
        True
        >>> t.join()
        My Plugin Thread

    :param arg: A :class:`str`, the name of the thread. It's recommend to specify the thread name, so when you
        log something by ``server.logger``, a meaningful thread name will be displayed
        instead of a plain and meaningless ``Thread-3``.
        When the decorator is used without brackets, this is the decorated function itself
    :param daemon: If the thread should be a daemon thread
    :param log_thread: If the thread should be logged to callback defined in record_destination.
        The callbacks are only invoked while the module-level ``record_thread`` switch is true as well
    :return: The decorated function when used as ``@new_thread``,
        otherwise a decorator to be applied to the function
    """
    # Used as bare ``@new_thread``: ``arg`` is the function and there is no custom name.
    # Used as ``@new_thread('A')`` / ``@new_thread()``: ``arg`` is the thread name or None
    thread_name = None if callable(arg) else arg

    def wrapper(func):
        @functools.wraps(func)  # to preserve the origin function information
        def wrap(*args, **kwargs):
            thread = FunctionThread(target=func, args=args, kwargs=kwargs, name=thread_name, daemon=daemon)
            # Report the thread before it starts; both the global switch
            # and this decorator's ``log_thread`` flag must allow it
            if record_thread and log_thread:
                for destination in record_destination:
                    destination(thread)
            thread.start()
            return thread

        # bring the signature of the func to the wrap function
        # so inspect.getfullargspec(func) works correctly
        copy_signature(wrap, func)
        wrap.original = func  # access this field to get the original function
        return wrap

    # Directly use @new_thread without ending brackets case, e.g. @new_thread
    if callable(arg):
        return wrapper(arg)
    # Use @new_thread with ending brackets case, e.g. @new_thread('A'), @new_thread()
    return wrapper