melobot.protocols.console.io.src 源代码
from __future__ import annotations
import asyncio
import inspect
from prompt_toolkit import PromptSession, print_formatted_text
from prompt_toolkit.patch_stdout import patch_stdout
from typing_extensions import Any, ClassVar, cast
from melobot.io import AbstractIOSource
from melobot.log import LogLevel, logger
from ..const import PROTOCOL_IDENTIFIER
from .model import EchoPacket, InPacket, NormalOutputData, OutPacket, RawOutputData, StdinInputData
[文档]
class ConsoleIO(AbstractIOSource[InPacket, OutPacket, EchoPacket]):
    """I/O source that reads input from, and writes output to, the console.

    Input is read with a prompt_toolkit ``PromptSession``; output packets are
    either printed with ``print_formatted_text`` or executed as a callable.
    Only one instance may be created: ``__init__`` registers the new object
    on the class and rejects any later construction attempt.
    """

    # The one live instance, registered at the end of ``__init__``.
    __instance__: ClassVar[ConsoleIO | None] = None

    def __init__(
        self,
        record_in: bool = True,
        record_out: bool = False,
        **prompt_args: Any,
    ) -> None:
        """Create the console source.

        :param record_in: when true, every line read is logged at DEBUG level
        :param record_out: when true, every output is logged at DEBUG level
        :param prompt_args: keyword arguments forwarded to
            ``PromptSession.prompt_async`` on each :meth:`input` call;
            ``message`` is passed as the prompt text
        :raises ValueError: if a ``ConsoleIO`` instance already exists
        """
        super().__init__()
        self.protocol = PROTOCOL_IDENTIFIER

        if self.__instance__ is not None:
            raise ValueError(f"已经存在一个控制台源的实例: {self.__instance__}")

        # setdefault keeps any value the caller supplied explicitly.
        self.prompt_args = prompt_args
        self.prompt_args.setdefault("message", "> ")
        self.prompt_args.setdefault("mouse_support", False)
        self.prompt_args.setdefault("show_frame", True)
        self.prompt_args.setdefault("set_exception_handler", False)
        self.prompt_args.setdefault("handle_sigint", False)

        self.record_in = record_in
        self.record_out = record_out
        # Declared here, created in open().
        self.prompt_session: PromptSession[str]

        # One-shot overrides for prompt_args, set by output() from a packet's
        # next_prompt_args and cleared by refresh_prompt_args().
        self._prompt_args: dict[str, Any] = {}
        # "finished" future of the most recently returned InPacket.
        self._last_finished: asyncio.Future[None] | None = None
        self._lock = asyncio.Lock()
        self._opened = asyncio.Event()

        # Register the instance so the guard above can actually fire; without
        # this assignment the check never sees anything but None. Assigned on
        # ConsoleIO itself (not type(self)) so subclasses share the same slot.
        ConsoleIO.__instance__ = self

    async def open(self) -> None:
        """Create the prompt session and mark the source as opened.

        Does nothing if the source is already opened.
        """
        if self._opened.is_set():
            return

        async with self._lock:
            # Re-check: another task may have opened the source while this
            # one was waiting for the lock.
            if self._opened.is_set():
                return

            self.prompt_session = PromptSession[str]()
            self._opened.set()
            logger.info("控制台源已开始运行")

    def opened(self) -> bool:
        """Return whether the source is currently opened."""
        return self._opened.is_set()

    async def close(self) -> None:
        """Mark the source as closed and exit a running prompt application.

        Does nothing if the source is not opened.
        """
        if not self._opened.is_set():
            return

        async with self._lock:
            # Re-check: another task may have closed the source while this
            # one was waiting for the lock.
            if not self._opened.is_set():
                return

            self._opened.clear()
            # NOTE(review): presumably this unblocks a prompt_async() call
            # pending in input() - confirm against prompt_toolkit behaviour.
            if self.prompt_session.app.is_running:
                self.prompt_session.app.exit()
            logger.info("控制台源已停止运行")

    def refresh_prompt_args(self) -> None:
        """Discard the one-shot prompt argument overrides set by :meth:`output`."""
        self._prompt_args = {}

    async def input(self) -> InPacket:
        """Prompt for one line of console input and wrap it in an ``InPacket``.

        Waits until the source is opened and until the ``finished`` future of
        the previously returned packet (if any) has completed, so a new prompt
        is not shown before the previous input is marked as finished.

        :return: packet carrying the entered text and a fresh ``finished``
            future created on the running event loop
        """
        await self._opened.wait()
        if self._last_finished is not None:
            await self._last_finished

        # One-shot overrides take precedence over the constructor arguments.
        kwargs = self.prompt_args | self._prompt_args
        prompt = cast(str, kwargs.pop("message"))
        # patch_stdout() is active only while the prompt is waiting for input.
        with patch_stdout():
            in_str = await self.prompt_session.prompt_async(prompt, **kwargs)  # type: ignore[arg-type]

        if self.record_in:
            logger.generic_lazy(
                "%s",
                lambda: f"控制台输入: {in_str}",
                level=LogLevel.DEBUG,
            )

        finish_fut = asyncio.get_running_loop().create_future()
        self._last_finished = finish_fut
        return InPacket(data=StdinInputData(content=in_str), finished=finish_fut)

    async def output(self, packet: OutPacket) -> EchoPacket:
        """Emit one output packet to the console.

        ``RawOutputData`` has its ``executor`` called (and awaited when the
        result is awaitable); ``NormalOutputData`` has its ``content`` printed
        to its ``stream``. If the data carries ``next_prompt_args``, they
        become the overrides used by the next :meth:`input` call.

        :param packet: the packet to output
        :return: an ``EchoPacket`` built with ``noecho=True``
        :raises ValueError: if ``packet.data`` is of neither supported type
        """
        await self._opened.wait()
        data = packet.data

        if isinstance(data, RawOutputData):
            if self.record_out:
                logger.generic_lazy(
                    "%s",
                    lambda: f"控制台输出执行器: {data.executor}",
                    level=LogLevel.DEBUG,
                )
            ret = data.executor()
            if inspect.isawaitable(ret):
                await ret

        elif isinstance(data, NormalOutputData):
            out_str = data.content
            stream = data.stream
            if self.record_out:
                logger.generic_lazy(
                    "%s",
                    lambda: f"控制台输出: {out_str}，流：{stream}",
                    level=LogLevel.DEBUG,
                )
            print_formatted_text(out_str, file=stream)

        else:
            raise ValueError(f"暂不支持的输出包：{data}")

        # Both supported data types expose next_prompt_args; the cast only
        # narrows the type for the checker.
        _data = cast(NormalOutputData | RawOutputData, data)
        if _data.next_prompt_args is not None:
            self._prompt_args = _data.next_prompt_args
        return EchoPacket(noecho=True)