melobot.utils.check.base 源代码

from __future__ import annotations

from abc import abstractmethod
from functools import partial

from typing_extensions import Any, Callable, Coroutine, Generic

from ...adapter.model import EventT
from ...exceptions import UtilValidateError
from ...typ._enum import LogicMode
from ...typ.base import SyncOrAsyncCallable
from ...typ.cls import BetterABC
from ..base import to_async


[文档] class Checker(Generic[EventT], BetterABC): """检查器基类""" def __init__(self, fail_cb: SyncOrAsyncCallable[[], None] | None = None) -> None: super().__init__() self.fail_cb = to_async(fail_cb) if fail_cb is not None else None def __and__(self, other: Checker) -> WrappedChecker: if not isinstance(other, Checker): raise UtilValidateError(f"联合检查器定义时出现了非检查器对象,其值为:{other}") return WrappedChecker(LogicMode.AND, self, other) def __or__(self, other: Checker) -> WrappedChecker: if not isinstance(other, Checker): raise UtilValidateError(f"联合检查器定义时出现了非检查器对象,其值为:{other}") return WrappedChecker(LogicMode.OR, self, other) def __invert__(self) -> WrappedChecker: return WrappedChecker(LogicMode.NOT, self) def __xor__(self, other: Checker) -> WrappedChecker: if not isinstance(other, Checker): raise UtilValidateError(f"联合检查器定义时出现了非检查器对象,其值为:{other}") return WrappedChecker(LogicMode.XOR, self, other)
[文档] @abstractmethod async def check(self, event: EventT) -> bool: """检查器检查方法 任何检查器应该实现此抽象方法。 :param event: 给定的事件 :return: 检查是否通过 """ raise NotImplementedError
[文档] @staticmethod def new(func: Callable[[EventT], bool]) -> Checker[EventT]: """从可调用对象创建检查器 :param func: 可调用对象 :return: 检查器对象 """ return _CustomChecker[EventT](func)
class _CustomChecker(Checker[EventT]): def __init__(self, func: Callable[[EventT], bool]) -> None: super().__init__() self.func = func async def check(self, event: EventT) -> bool: return self.func(event)
[文档] class WrappedChecker(Checker[EventT]): """合并检查器 在两个 :class:`Checker` 对象间使用 | & ^ ~ 运算符即可返回合并检查器。 """ def __init__( self, mode: LogicMode, checker1: Checker, checker2: Checker | None = None, ) -> None: """初始化一个合并检查器 :param mode: 合并检查的逻辑模式 :param checker1: 检查器1 :param checker2: 检查器2 """ super().__init__() self.mode = mode self.c1, self.c2 = checker1, checker2
[文档] def set_fail_cb(self, fail_cb: SyncOrAsyncCallable[[], None] | None) -> None: self.fail_cb = to_async(fail_cb) if fail_cb is not None else None
async def check(self, event: EventT) -> bool: c2_check: Callable[[], Coroutine[Any, Any, bool]] | None = ( partial(self.c2.check, event) if self.c2 is not None else None ) status = await LogicMode.async_short_calc( self.mode, partial(self.c1.check, event), c2_check ) if not status and self.fail_cb is not None: await self.fail_cb() return status
[文档] def checker_join(*checkers: Checker | None | Callable[[Any], bool]) -> Checker: """合并检查器 相比于使用 | & ^ ~ 运算符,此函数可以接受一个检查器序列,并返回一个合并检查器。 检查器序列可以为检查器对象,检查函数或空值 :return: 合并后的检查器对象 """ checker: Checker | None = None for c in checkers: if c is None: continue if isinstance(c, Checker): checker = checker & c if checker else c else: checker = checker & Checker.new(c) if checker else Checker.new(c) if checker is None: raise ValueError("检查器序列不能全为空") return checker