melobot.plugin.ipc 源代码

from typing_extensions import Callable, Generic

from ..di import inject_deps
from ..exceptions import PluginIpcError
from ..mixin import AttrReprMixin, LocateMixin
from ..typ.base import AsyncCallable, T
from ..utils.common import RWContext


[文档] class AsyncShare(Generic[T], LocateMixin, AttrReprMixin): """异步共享对象"""
[文档] def __init__( self, name: str, reflector: AsyncCallable[[], T] | None = None, callabck: AsyncCallable[[T], None] | None = None, static: bool = False, ) -> None: """初始化异步共享对象 :param name: 异步共享对象的名称 :param reflector: 获取共享值的异步可调用方法 :param callabck: 修改共享值的异步可调用方法 :param static: 是否使用静态模式 """ super().__init__() self.name = name self.__safe_ctx = RWContext() self.__reflect: AsyncCallable[[], T] | None = ( inject_deps(reflector) if reflector is not None else None ) self.__callback: AsyncCallable[[T], None] | None = ( inject_deps(callabck, manual_arg=True) if callabck is not None else None ) self.static = static if self.name.startswith("_"): raise PluginIpcError(f"共享对象 {self} 的名称不能以 _ 开头") if self.static and self.__callback is not None: raise PluginIpcError(f"{self} 作为静态的共享对象,不能绑定用于更新值的回调方法")
[文档] def __call__(self, func: AsyncCallable[[], T]) -> AsyncCallable[[], T]: """绑定获取共享值的异步方法的装饰器,如果未在初始化时绑定 :param func: 被绑定的异步可调用方法 :return: `func` 原值 """ if self.__reflect is not None: raise PluginIpcError("共享对象已经有获取值的反射方法,不能再次绑定") self.__reflect = inject_deps(func) return func
[文档] def setter(self, func: AsyncCallable[[T], None]) -> AsyncCallable[[T], None]: """绑定修改共享值的异步方法的装饰器,如果未在初始化时绑定 :param func: 被绑定的异步可调用方法 :return: `func` 原值 """ if self.static: raise PluginIpcError(f"{self} 作为静态的共享对象,不能绑定用于更新值的回调方法") if self.__callback is not None: raise PluginIpcError("共享对象已经有更新值的回调方法,不能再次绑定") self.__callback = inject_deps(func, manual_arg=True) return func
[文档] async def get(self) -> T: """获取异步共享值 :return: 异步共享值 """ if self.__reflect is None: raise PluginIpcError("共享对象获取值的反射方法未绑定") async with self.__safe_ctx.read(): return await self.__reflect()
[文档] async def set(self, val: T) -> None: """设置异步共享值 :param val: 新的异步共享值 """ if self.__callback is None: raise PluginIpcError("共享对象更新值的回调方法未绑定") async with self.__safe_ctx.write(): return await self.__callback(val)
[文档] class SyncShare(Generic[T], LocateMixin, AttrReprMixin): """同步共享对象"""
[文档] def __init__( self, name: str, reflector: Callable[[], T] | None = None, callabck: Callable[[T], None] | None = None, static: bool = False, ) -> None: """初始化同步共享对象 :param name: 同步共享对象的名称 :param reflector: 获取共享值的可调用方法 :param callabck: 修改共享值的可调用方法 :param static: 是否使用静态模式 """ super().__init__() self.name = name self.__reflect = reflector self.__callback = callabck self.static = static if self.name.startswith("_"): raise PluginIpcError(f"共享对象 {self} 的名称不能以 _ 开头") if self.static and self.__callback is not None: raise PluginIpcError(f"{self} 作为静态的共享对象,不能绑定用于更新值的回调方法")
[文档] def __call__(self, func: Callable[[], T]) -> Callable[[], T]: """绑定获取共享值的方法的装饰器,如果未在初始化时绑定 :param func: 被绑定的可调用方法 :return: `func` 原值 """ if self.__reflect is not None: raise PluginIpcError("共享对象已经有获取值的反射方法,不能再次绑定") self.__reflect = func return func
[文档] def setter(self, func: Callable[[T], None]) -> Callable[[T], None]: """绑定修改共享值的方法的装饰器,如果未在初始化时绑定 :param func: 被绑定的可调用方法 :return: `func` 原值 """ if self.static: raise PluginIpcError(f"{self} 作为静态的共享对象,不能绑定用于更新值的回调方法") if self.__callback is not None: raise PluginIpcError("共享对象已经有更新值的回调方法,不能再次绑定") self.__callback = func return func
[文档] def get(self) -> T: """获取共享值 :return: 共享值 """ if self.__reflect is None: raise PluginIpcError("共享对象未绑定获取值的反射方法") return self.__reflect()
[文档] def set(self, val: T) -> None: """设置共享值 :param val: 新的共享值 """ if self.__callback is None: raise PluginIpcError("共享对象未绑定更新值的回调方法") self.__callback(val)
class IPCManager: def __init__(self) -> None: self._shares: dict[str, dict[str, AsyncShare | SyncShare]] = {} def add(self, plugin: str, obj: AsyncShare | SyncShare) -> None: objs = self._shares.setdefault(plugin, {}) if objs.get(obj.name) is not None: raise PluginIpcError(f"插件 {plugin} 中已存在名为 {obj.name} 的共享对象") objs[obj.name] = obj def add_func(self, plugin: str, func: Callable) -> None: self.add(plugin, SyncShare(func.__name__, lambda: func, None, True)) def get(self, plugin: str, id: str) -> AsyncShare | SyncShare: if (objs := self._shares.get(plugin)) is None: raise PluginIpcError(f"插件 {plugin} 未加载,或其不提供共享功能") if (obj := objs.get(id)) is None: raise PluginIpcError(f"无法获取不存在的共享对象:标识 {id} 不存在") return obj