from __future__ import annotations
import asyncio
from asyncio import create_task
from contextlib import contextmanager
from time import time_ns
from typing_extensions import (
TYPE_CHECKING,
Any,
AsyncIterator,
Generator,
Generic,
Hashable,
Iterator,
Literal,
LiteralString,
Self,
Sequence,
TypeVar,
cast,
)
from ..ctx import ActionAutoExecCtx
from ..exceptions import ActionHandleError
from ..io.base import AbstractOutSource
from ..mixin import AttrReprMixin, FlagMixin
from ..typ.cls import BetterABC, abstractattr
from ..utils.common import get_id
from .content import Content
if TYPE_CHECKING:
from .base import AbstractEchoFactory, AbstractOutputFactory
[文档]
class Event(AttrReprMixin, FlagMixin):
"""事件基类
:ivar typing.LiteralString protocol: 遵循的协议,为空则协议无关
:ivar float time: 时间戳
:ivar str id: id 标识
:ivar typing.Hashable | None scope: 所在的域,可空
:ivar typing.Sequence[Content] contents: 附加的通用内容序列
"""
def __init__(
self,
protocol: LiteralString,
time: float = -1,
id: str = "",
scope: Hashable | None = None,
contents: Sequence[Content] | None = None,
) -> None:
super().__init__()
self.time = time_ns() / 1e9 if time == -1 else time
self.id = get_id() if id == "" else id
self.protocol = protocol
self.contents = contents if contents else ()
self.scope = scope
self.spread: bool = True
EventT = TypeVar("EventT", bound=Event)
[文档]
class TextEvent(Event, BetterABC):
"""文本事件类
:ivar str text: 文本内容
:ivar list[str] textlines: 文本分行内容
"""
text: str = abstractattr()
""":meta hide-value:"""
textlines: list[str] = abstractattr()
""":meta hide-value:"""
[文档]
class Action(AttrReprMixin, FlagMixin):
"""行为基类
:ivar float time: 时间戳
:ivar str id: id 标识
:ivar typing.LiteralString | None protocol: 遵循的协议,为空则协议无关
:ivar typing.Sequence[Content] contents: 附加的通用内容序列
:ivar typing.Hashable | None scope: 所在的域,可空
:ivar Event | None trigger: 触发该行为的事件,为空表明不由事件触发
"""
def __init__(
self,
time: float = -1,
id: str = "",
protocol: LiteralString | None = None,
scope: Hashable | None = None,
contents: Sequence[Content] | None = None,
trigger: Event | None = None,
) -> None:
super().__init__()
self.time = time_ns() / 1e9 if time == -1 else time
self.id = get_id() if id == "" else id
self.protocol = protocol
self.contents = contents if contents else ()
self.scope = scope
self.trigger = trigger
[文档]
class Echo(AttrReprMixin, FlagMixin):
"""回应基类
:ivar float time: 时间戳
:ivar str id: id 标识
:ivar typing.LiteralString | None protocol: 遵循的协议,为空则协议无关
:ivar typing.Hashable | None scope: 所在的域,可空
:ivar int status: 回应状态码
:ivar str prompt: 回应提示语
"""
def __init__(
self,
time: float = -1,
id: str = "",
protocol: LiteralString | None = None,
scope: Hashable | None = None,
status: int = 0,
prompt: str = "",
) -> None:
super().__init__()
self.time = time_ns() / 1e9 if time == -1 else time
self.id = get_id() if id == "" else id
self.protocol = protocol
self.scope = scope
self.status = status
self.prompt = prompt
ActionT = TypeVar("ActionT", bound=Action)
EchoT = TypeVar("EchoT", bound=Echo)
[文档]
class ActionHandleGroup(Generic[EchoT]):
"""行为操作句柄组"""
def __init__(self, *handles: ActionHandle[EchoT]) -> None:
self._handles = handles
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(all: {len(self._handles)}, "
f"done: {len(tuple(h for h in self._handles if h.status == 'DONE'))})"
)
def __getitem__(self, idx: int) -> ActionHandle[EchoT]:
return self._handles[idx]
[文档]
def __len__(self) -> int:
return len(self._handles)
[文档]
def __iter__(self) -> Iterator[ActionHandle[EchoT]]:
return iter(self._handles)
[文档]
async def __aiter__(self) -> AsyncIterator[tuple[EchoT | None, ActionHandle[EchoT]]]:
waits = (h._await_ret_self() for h in self._handles)
for fut in asyncio.as_completed(waits):
yield await fut
[文档]
async def unwrap_iter(self) -> AsyncIterator[EchoT]:
waits = (h._await_ret_self() for h in self._handles)
for fut in asyncio.as_completed(waits):
echo, h = await fut
if echo is None:
raise ActionHandleError(
"迭代获取行为操作的回应失败,迭代时出现为 None 的回应", handle=h
)
yield echo
[文档]
def __await__(self) -> Generator[Any, Any, list[EchoT | None]]:
return self._await_all().__await__()
[文档]
async def unwrap(self, idx: int) -> EchoT:
handle = self._handles[idx]
echo = await handle
if echo is None:
raise ActionHandleError(
f"对行为操作 {self.unwrap.__name__} 失败,因为操作的回应为 None", handle=handle
)
return echo
[文档]
async def unwrap_all(self) -> list[EchoT]:
echoes = await self._await_all()
for idx, e in enumerate(echoes):
if e is None:
raise ActionHandleError(
f"对行为操作组 {self.unwrap_all.__name__} 失败,因为操作组的回应中有 None。",
handle=self._handles[idx],
)
return cast(list[EchoT], echoes)
async def _await_all(self) -> list[EchoT | None]:
echoes = await asyncio.gather(*self._handles)
return echoes
[文档]
def execute(self) -> None:
for h in self._handles:
h.execute()
[文档]
class ActionHandle(Generic[EchoT]):
"""行为操作句柄
:ivar Action action: 操作包含的行为对象
:ivar typing.Literal["PENDING", "EXECUTING", "FINISHED"] status: 操作的状态。分别对应:未执行、执行中、执行完成
:ivar AbstractOutSource out_src: 执行操作的输出源对象
"""
def __init__(
self,
action: Action,
out_src: AbstractOutSource,
output_factory: "AbstractOutputFactory",
echo_factory: "AbstractEchoFactory",
) -> None:
self.action = action
self.status: Literal["PENDING", "EXECUTING", "DONE"] = "PENDING"
self.out_src = out_src
self._echo_fut: asyncio.Future[EchoT | None] = asyncio.Future()
self._output_factory = output_factory
self._echo_factory = echo_factory
if ActionAutoExecCtx().try_get(True):
self.execute()
def __repr__(self) -> str:
return f"{self.__class__.__name__}(status={self.status}, fut={self._echo_fut})"
[文档]
def __await__(self) -> Generator[Any, Any, EchoT | None]:
# 并发多次 await 是安全的,未触发执行任务时 await 也是安全的
return self._echo_fut.__await__()
async def _await_ret_self(self) -> tuple[EchoT | None, Self]:
echo = await self._echo_fut
return echo, self
[文档]
def execute(self) -> Self:
if self.status != "PENDING":
raise RuntimeError("行为操作正在执行或执行完毕,不应该再执行")
self.status = "EXECUTING"
create_task(self._execute())
return self
async def _execute(self) -> None:
try:
output_packet = await self._output_factory.create(self.action)
echo_packet = await self.out_src.output(output_packet)
echo = await self._echo_factory.create(echo_packet)
self._echo_fut.set_result(echo)
except Exception as e:
self._echo_fut.set_exception(e)
if isinstance(e, asyncio.CancelledError):
raise
finally:
self.status = "DONE"
[文档]
@contextmanager
def lazy_action() -> Generator[None, None, None]:
"""手动执行行为操作的上下文管理器
展开一个行为操作不自动执行的上下文,适用于需要手动干预行为操作执行时机的场景
"""
with ActionAutoExecCtx().unfold(False):
yield