import asyncio
import inspect
import time
from functools import wraps
from typing_extensions import Any, AsyncContextManager, Callable, ContextManager, cast
from ..exceptions import UtilValidateError
from ..typ.base import AsyncCallable, P, SyncOrAsyncCallable, T, U, V
from .base import to_async
[文档]
def if_not(
condition: SyncOrAsyncCallable[[], U] | U,
reject: SyncOrAsyncCallable[[], None],
give_up: bool = False,
accept: SyncOrAsyncCallable[[U], None] | None = None,
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T | None]]:
"""条件判断装饰器
:param condition: 用于判断的条件(如果是可调用对象,则先求值再转为 bool 值)
:param reject: 当条件为 `False` 时,执行的回调
:param give_up: 在条件为 `False` 时,是否放弃执行被装饰函数
:param accept: 当条件为 `True` 时,执行的回调
"""
_condition = to_async(condition) if callable(condition) else condition
_reject = to_async(reject)
_accept = to_async(accept) if accept is not None else accept
def if_not_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T | None]:
_func = to_async(func)
@wraps(func)
async def if_not_wrapped(*args: P.args, **kwargs: P.kwargs) -> T | None:
if not callable(_condition):
cond = _condition
else:
obj = _condition()
cond = await obj if inspect.isawaitable(obj) else obj
if not cond:
await _reject()
if cond or not give_up:
if _accept is not None:
await _accept(cond)
return await _func(*args, **kwargs)
return None
return if_not_wrapped
return if_not_wrapper
[文档]
def unfold_ctx(
getter: SyncOrAsyncCallable[[], ContextManager | AsyncContextManager],
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T]]:
"""上下文装饰器
展开一个上下文,供被装饰函数使用。
但注意此装饰器不支持获取上下文管理器 `yield` 的值
:param getter: 上下文管理器或上下文管理器获取方法
"""
_getter = to_async(getter)
def unfold_ctx_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T]:
_func = to_async(func)
@wraps(func)
async def unfold_ctx_wrapped(*args: P.args, **kwargs: P.kwargs) -> T:
try:
manager = await _getter()
except Exception as e:
raise UtilValidateError(
f"{unfold_ctx.__name__} 的 getter 参数为:{getter},调用它获取上下文管理器失败:{e}"
) from e
if isinstance(manager, ContextManager):
with manager:
return await _func(*args, **kwargs)
elif isinstance(manager, AsyncContextManager):
async with manager:
return await _func(*args, **kwargs)
else:
raise UtilValidateError(
f"{unfold_ctx.__name__} 的 getter 参数为:{getter},调用它返回了无效的上下文管理器"
)
return unfold_ctx_wrapped
return unfold_ctx_wrapper
[文档]
def lock(
callback: SyncOrAsyncCallable[[], U] | None = None,
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T | U]]:
"""锁装饰器
本方法作为异步函数的装饰器使用,可以为被装饰函数加锁。
在获取锁冲突时,调用 `callback` 获得一个回调并执行。回调执行完毕后直接返回。
`callback` 参数为空,只应用 :class:`asyncio.Lock` 的锁功能。
被装饰函数的返回值:被装饰函数被执行 -> 被装饰函数返回值;执行任何回调 -> 那个回调的返回值
:param callback: 获取锁冲突时的回调
"""
alock = asyncio.Lock()
_callback = to_async(callback) if callback is not None else None
def lock_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T | U]:
_func = to_async(func)
@wraps(func)
async def lock_wrapped(*args: P.args, **kwargs: P.kwargs) -> T | U:
if _callback is not None and alock.locked():
return await _callback()
async with alock:
return await _func(*args, **kwargs)
return lock_wrapped
return lock_wrapper
[文档]
def cooldown(
busy_callback: SyncOrAsyncCallable[[], U] | None = None,
cd_callback: SyncOrAsyncCallable[[float], V] | None = None,
interval: float = 5,
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T | U | V]]:
"""冷却装饰器
本方法作为异步函数的装饰器使用,可以为被装饰函数添加 cd 时间。
如果被装饰函数已有一个在运行,此时调用 `busy_callback` 生成回调并执行。回调执行完毕后直接返回。
`busy_callback` 参数为空,则等待已运行的运行完成。随后执行下面的“冷却”处理逻辑。
当被装饰函数没有在运行的,但冷却时间未结束:
- `cd_callback` 不为空:使用 `cd_callback` 生成回调并执行。
- `cd_callback` 为空,被装饰函数持续等待,直至冷却结束再执行。
被装饰函数的返回值:被装饰函数被执行 -> 被装饰函数返回值;执行任何回调 -> 那个回调的返回值
:param busy_callback: 已运行时的回调
:param cd_callback: 冷却时间未结束的回调
:param interval: 冷却时间
"""
alock = asyncio.Lock()
pre_finish_t = time.perf_counter() - interval - 1
_busy_callback = to_async(busy_callback) if busy_callback is not None else None
_cd_callback = to_async(cd_callback) if cd_callback is not None else None
def cooldown_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T | U | V]:
_func = to_async(func)
@wraps(func)
async def cooldown_wrapped(*args: P.args, **kwargs: P.kwargs) -> T | U | V:
nonlocal pre_finish_t
if _busy_callback is not None and alock.locked():
return await _busy_callback()
async with alock:
duration = time.perf_counter() - pre_finish_t
if duration > interval:
ret = await _func(*args, **kwargs)
pre_finish_t = time.perf_counter()
return ret
remain_t = interval - duration
if _cd_callback is not None:
return await _cd_callback(remain_t)
await asyncio.sleep(remain_t)
ret = await _func(*args, **kwargs)
pre_finish_t = time.perf_counter()
return ret
return cooldown_wrapped
return cooldown_wrapper
[文档]
def semaphore(
callback: SyncOrAsyncCallable[[], U] | None = None, value: int = -1
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T | U]]:
"""信号量装饰器
本方法作为异步函数的装饰器使用,可以为被装饰函数添加信号量控制。
在信号量无法立刻获取时,将调用 `callback` 获得回调并执行。回调执行完毕后直接返回。
`callback` 参数为空,只应用 :class:`asyncio.Semaphore` 的信号量功能。
被装饰函数的返回值:被装饰函数被执行 -> 被装饰函数返回值;执行任何回调 -> 那个回调的返回值
:param callback: 信号量无法立即获取的回调
:param value: 信号量阈值
"""
a_semaphore = asyncio.Semaphore(value)
_callback = to_async(callback) if callback is not None else None
def semaphore_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T | U]:
_func = to_async(func)
@wraps(func)
async def semaphore_wrapped(*args: P.args, **kwargs: P.kwargs) -> T | U:
if _callback is not None and a_semaphore.locked():
return await _callback()
async with a_semaphore:
return await _func(*args, **kwargs)
return semaphore_wrapped
return semaphore_wrapper
[文档]
def timelimit(
callback: SyncOrAsyncCallable[[], U] | None = None, timeout: float = 5
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T | U]]:
"""时间限制装饰器
本方法作为异步函数的装饰器使用,可以为被装饰函数添加超时控制。
超时之后,调用 `callback` 获得回调并执行,同时取消原任务。
`callback` 参数为空,如果超时,则抛出 :class:`asyncio.TimeoutError` 异常。
被装饰函数的返回值:被装饰函数被执行 -> 被装饰函数返回值;执行任何回调 -> 那个回调的返回值
:param callback: 超时时的回调
:param timeout: 超时时间
"""
_callback = to_async(callback) if callback is not None else None
def timelimit_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T | U]:
_func = to_async(func)
@wraps(func)
async def timelimit_wrapped(*args: P.args, **kwargs: P.kwargs) -> T | U:
try:
return await asyncio.wait_for(_func(*args, **kwargs), timeout)
except asyncio.TimeoutError:
if _callback is None:
raise TimeoutError("timelimit 所装饰的任务已超时") from None
return await _callback()
return timelimit_wrapped
return timelimit_wrapper
[文档]
def speedlimit(
callback: SyncOrAsyncCallable[[], U] | None = None,
limit: int = 60,
duration: int = 60,
) -> Callable[[SyncOrAsyncCallable[P, T]], AsyncCallable[P, T | U]]:
"""流量/速率限制装饰器(使用固定窗口算法)
本方法作为异步函数的装饰器使用,可以为被装饰函数添加流量控制:`duration` 秒内只允许 `limit` 次调用。
超出调用速率限制后,调用 `callback` 获得回调并执行,同时取消原任务。
`callback` 参数为空,等待直至满足速率控制要求再调用。
被装饰函数的返回值:被装饰函数被执行 -> 被装饰函数返回值;执行任何回调 -> 那个回调的返回值。
:param callback: 超出速率限制时的回调
:param limit: `duration` 秒内允许调用多少次
:param duration: 时长区间
"""
called_num = 0
min_start = time.perf_counter()
if limit <= 0:
raise UtilValidateError("speedlimit 装饰器的 limit 参数必须 > 0")
if duration <= 0:
raise UtilValidateError("speedlimit 装饰器的 duration 参数必须 > 0")
_callback = to_async(callback) if callback is not None else None
def speedlimit_wrapper(func: SyncOrAsyncCallable[P, T]) -> AsyncCallable[P, T | U]:
_func = to_async(func)
@wraps(func)
async def speedlimit_wrapped(*args: P.args, **kwargs: P.kwargs) -> T | U:
fut = _speedlimit_wrapped(_func, *args, **kwargs)
fut = cast(asyncio.Future[T | U | Exception], fut)
fut_ret = await fut
if isinstance(fut_ret, Exception):
raise fut_ret
return fut_ret
return speedlimit_wrapped
def _speedlimit_wrapped(
func: AsyncCallable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> asyncio.Future:
# 分离出来定义,方便 result_set 调用形成递归。主要逻辑通过 Future 实现,有利于避免竞争问题。
nonlocal called_num, min_start
passed_time = time.perf_counter() - min_start
res_fut: Any = asyncio.get_running_loop().create_future()
if passed_time <= duration:
if called_num < limit:
called_num += 1
asyncio.create_task(_speedlimit_set_result(func, res_fut, -1, *args, **kwargs))
elif _callback is not None:
asyncio.create_task(_speedlimit_set_result(_callback, res_fut, -1))
else:
asyncio.create_task(
_speedlimit_set_result(func, res_fut, duration - passed_time, *args, **kwargs)
)
else:
called_num, min_start = 0, time.perf_counter()
called_num += 1
asyncio.create_task(_speedlimit_set_result(func, res_fut, -1, *args, **kwargs))
return cast(asyncio.Future, res_fut)
async def _speedlimit_set_result(
func: AsyncCallable[P, T],
fut: asyncio.Future,
delay: float,
*args: P.args,
**kwargs: P.kwargs,
) -> None:
"""
只有依然在当前 duration 区间内,但超出调用次数限制的,需要等待。
随后就是递归调用。delay > 0 为需要递归的分支。
"""
nonlocal called_num
try:
if delay > 0:
await asyncio.sleep(delay)
res = await _speedlimit_wrapped(func, *args, **kwargs)
fut.set_result(res)
return
res = await func(*args, **kwargs)
fut.set_result(res)
except Exception as e:
fut.set_result(e)
return speedlimit_wrapper