from __future__ import annotations
import asyncio
from asyncio import create_task
from contextlib import contextmanager
from dataclasses import dataclass, field
from time import time_ns
from typing_extensions import (
TYPE_CHECKING,
Any,
Awaitable,
Coroutine,
Generator,
Generic,
Hashable,
Literal,
LiteralString,
Self,
Sequence,
TypeVar,
cast,
)
from ..ctx import ActionManualSignalCtx, Context, LoggerCtx
from ..exceptions import AdapterError
from ..io.base import AbstractOutSource
from ..log.base import LogLevel
from ..mixin import AttrReprMixin, FlagMixin
from ..typ.base import T
from ..typ.cls import BetterABC, abstractattr
from ..utils.base import to_coro
from ..utils.common import get_id
from .content import Content
if TYPE_CHECKING:
from .base import AbstractEchoFactory, AbstractOutputFactory
[文档]
class Event(AttrReprMixin, FlagMixin):
"""事件基类
:ivar typing.LiteralString protocol: 遵循的协议,为空则协议无关
:ivar float time: 时间戳
:ivar str id: id 标识
:ivar typing.Hashable | None scope: 所在的域,可空
:ivar typing.Sequence[Content] contents: 附加的通用内容序列
"""
def __init__(
self,
protocol: LiteralString,
time: float = -1,
id: str = "",
scope: Hashable | None = None,
contents: Sequence[Content] | None = None,
) -> None:
super().__init__()
self.time = time_ns() / 1e9 if time == -1 else time
self.id = get_id() if id == "" else id
self.protocol = protocol
self.contents = contents if contents else ()
self.scope = scope
self.spread: bool = True
EventT = TypeVar("EventT", bound=Event)
[文档]
class TextEvent(Event, BetterABC):
"""文本事件类
:ivar str text: 文本内容
:ivar list[str] textlines: 文本分行内容
"""
text: str = abstractattr()
""":meta hide-value:"""
textlines: list[str] = abstractattr()
""":meta hide-value:"""
[文档]
class Action(AttrReprMixin, FlagMixin):
"""行为基类
:ivar float time: 时间戳
:ivar str id: id 标识
:ivar typing.LiteralString | None protocol: 遵循的协议,为空则协议无关
:ivar typing.Sequence[Content] contents: 附加的通用内容序列
:ivar typing.Hashable | None scope: 所在的域,可空
:ivar Event | None trigger: 触发该行为的事件,为空表明不由事件触发
"""
def __init__(
self,
time: float = -1,
id: str = "",
protocol: LiteralString | None = None,
scope: Hashable | None = None,
contents: Sequence[Content] | None = None,
trigger: Event | None = None,
) -> None:
super().__init__()
self.time = time_ns() / 1e9 if time == -1 else time
self.id = get_id() if id == "" else id
self.protocol = protocol
self.contents = contents if contents else ()
self.scope = scope
self.trigger = trigger
[文档]
class Echo(AttrReprMixin, FlagMixin):
"""回应基类
:ivar float time: 时间戳
:ivar str id: id 标识
:ivar typing.LiteralString | None protocol: 遵循的协议,为空则协议无关
:ivar typing.Hashable | None scope: 所在的域,可空
:ivar bool ok: 回应是否成功
:ivar int status: 回应状态码
:ivar str prompt: 回应提示语
:ivar Any data: 回应数据
"""
def __init__(
self,
time: float = -1,
id: str = "",
protocol: LiteralString | None = None,
scope: Hashable | None = None,
ok: bool = True,
status: int = 0,
prompt: str = "",
data: Any = None,
) -> None:
super().__init__()
self.time = time_ns() / 1e9 if time == -1 else time
self.id = get_id() if id == "" else id
self.protocol = protocol
self.scope = scope
self.ok = ok
self.status = status
self.prompt = prompt
self.data = data
ActionT = TypeVar("ActionT", bound=Action)
EchoT = TypeVar("EchoT", bound=Echo)
ActionRetT = TypeVar("ActionRetT", bound=Echo | None)
[文档]
class ActionHandle(Generic[ActionRetT]):
"""行为操作句柄
:ivar Action action: 操作包含的行为对象
:ivar typing.Literal["PENDING", "EXECUTING", "FINISHED"] status: 操作的状态。分别对应:未执行、执行中、执行完成
:ivar AbstractOutSource out_src: 执行操作的输出源对象
"""
def __init__(
self,
action: Action,
out_src: AbstractOutSource,
output_factory: "AbstractOutputFactory",
echo_factory: "AbstractEchoFactory",
) -> None:
self.action = action
self.status: Literal["PENDING", "EXECUTING", "FINISHED"] = "PENDING"
self.out_src = out_src
self._echo: ActionRetT
self._output_factory = output_factory
self._echo_factory = echo_factory
self._done = asyncio.Event()
if not ActionManualSignalCtx().try_get():
self.execute()
async def _wait(self) -> ActionRetT:
await self._done.wait()
return self._echo
[文档]
def __await__(self) -> Generator[Any, Any, ActionRetT]:
"""本对象实现 __await__ 接口,因此可等待。返回本操作对应的回应数据"""
return self._wait().__await__()
async def _execute(self) -> None:
try:
output_packet = await self._output_factory.create(self.action)
echo_packet = await self.out_src.output(output_packet)
self._echo = cast(ActionRetT, await self._echo_factory.create(echo_packet))
self.status = "FINISHED"
self._done.set()
except Exception:
logger = LoggerCtx().get()
handle_name = f"{self.__class__.__module__}.{self.action.__class__.__qualname__}"
logger.exception(f"行为句柄 {handle_name} 执行时出现异常")
logger.generic_obj("异常点局部变量", self.__dict__, level=LogLevel.ERROR)
def execute(self) -> Self:
if self.status != "PENDING":
raise AdapterError("行为操作正在执行或执行完毕,不应该再执行")
self.status = "EXECUTING"
create_task(self._execute())
return self
@dataclass
class _ChainStep:
coros: Sequence[Coroutine]
ret_when: Literal["FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"]
next: _ChainStep | None = None
@dataclass(kw_only=True)
class _ChainCtxStep(_ChainStep): # type: ignore[override]
ctx_var: Context
ctx_val: Any
coros: Sequence[Coroutine] = field(default_factory=tuple)
ret_when: Literal["ALL_COMPLETED"] = "ALL_COMPLETED"
[文档]
class ActionChain:
"""行为链"""
def __init__(self) -> None:
self._chain: list[_ChainStep] = []
def _add_step(self, step: _ChainStep) -> None:
if len(self._chain):
self._chain[-1].next = step
self._chain.append(step)
[文档]
def unfold(self, ctx: Context[T], val: T) -> Self:
"""指定链后续的步骤中在 `ctx` 类别的上下文中执行,上下文值为 `val`
:param ctx: 上下文类别
:param val: 上下文的值
:return: 自身,因此支持链式调用
"""
self._add_step(_ChainCtxStep(ctx_var=ctx, ctx_val=val))
return self
async def _exec_handle(self, handles: Awaitable[tuple[ActionHandle, ...]]) -> None:
_handles = await handles
for handle in _handles:
handle.execute()
if len(_handles):
await asyncio.wait(map(create_task, map(to_coro, _handles)))
[文档]
def add(
self,
*handles: Awaitable[tuple[ActionHandle, ...]],
ret_when: Literal["FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED"] = "ALL_COMPLETED",
) -> Self:
"""在链的步骤中添加一组行为
:param handles: 返回行为操作句柄的可等待对象
:param ret_when: 指定这一组行为的等待模式
:return: 自身,因此支持链式调用
"""
coros = tuple(self._exec_handle(hs) for hs in handles)
self._add_step(_ChainStep(coros, ret_when))
return self
[文档]
def sleep(self, interval: float) -> Self:
"""
在链的步骤中添加指定时长的等待
:param interval: 等待时长
:return: 自身,因此支持链式调用
"""
coros = (asyncio.sleep(interval),)
self._add_step(_ChainStep(coros, ret_when="ALL_COMPLETED"))
return self
async def _start(self, step: _ChainStep) -> None:
if isinstance(step, _ChainCtxStep):
with step.ctx_var.unfold(step.ctx_val):
if step.next:
await self._start(step.next)
return
if len(step.coros):
await asyncio.wait(map(create_task, step.coros), return_when=step.ret_when)
if step.next:
await self._start(step.next)
[文档]
async def run(self) -> None:
"""顺序执行链的所有步骤"""
await self._start(self._chain[0])
[文档]
@contextmanager
def open_chain() -> Generator[ActionChain, None, None]:
"""创建行为链的上下文管理器
:yield: 行为链对象
"""
with ActionManualSignalCtx().unfold(True):
yield ActionChain()