melobot.io.base 源代码

from __future__ import annotations

import time
from abc import abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from types import TracebackType

from typing_extensions import Any, Generic, LiteralString, Self, TypeVar

from ..mixin import HookMixin, LogMixin
from ..typ.cls import BetterABC, abstractattr
from ..utils.common import get_id


[文档] @dataclass class InPacket: """输入包基类(数据类) :ivar float time: 时间戳 :ivar str id: id 标识 :ivar typing.LiteralString | None protocol: 遵循的协议 :ivar Any data: 附加的数据 """ time: float = field(default_factory=lambda: time.time_ns() / 1e9) id: str = field(default_factory=get_id) protocol: LiteralString | None = None data: Any = None
[文档] @dataclass class OutPacket: """输出包基类(数据类) :ivar float time: 时间戳 :ivar str id: id 标识 :ivar typing.LiteralString | None protocol: 遵循的协议 :ivar Any data: 附加的数据 """ time: float = field(default_factory=lambda: time.time_ns() / 1e9) id: str = field(default_factory=get_id) protocol: LiteralString | None = None data: Any = None
[文档] @dataclass class EchoPacket: """回应包基类(数据类) :ivar float time: 时间戳 :ivar str id: id 标识 :ivar typing.LiteralString | None protocol: 遵循的协议 :ivar Any data: 附加的数据 :ivar bool ok: 回应是否成功 :ivar int status: 回应状态码 :ivar str prompt: 提示语 :ivar bool noecho: 是否并无回应产生 """ time: float = field(default_factory=lambda: time.time_ns() / 1e9) id: str = field(default_factory=get_id) protocol: LiteralString | None = None data: Any = None ok: bool = True status: int = 0 prompt: str = "" noecho: bool = False
InPacketT = TypeVar("InPacketT", bound=InPacket) OutPacketT = TypeVar("OutPacketT", bound=OutPacket) EchoPacketT = TypeVar("EchoPacketT", bound=EchoPacket)
[文档] class SourceLifeSpan(Enum): """源生命周期阶段的枚举""" STARTED = "sta" RESTARTED = "res" STOPPED = "sto"
[文档] class AbstractSource(HookMixin[SourceLifeSpan], LogMixin, BetterABC): """抽象源基类""" protocol: LiteralString = abstractattr()
[文档] def __init__(self) -> None: super().__init__( hook_type=SourceLifeSpan, hook_tag=f"{self.__class__.__module__}.{self.__class__.__name__}", )
[文档] @abstractmethod async def open(self) -> None: """源打开方法""" raise NotImplementedError
[文档] @abstractmethod def opened(self) -> bool: """源是否已打开""" raise NotImplementedError
[文档] @abstractmethod async def close(self) -> None: """源关闭方法""" raise NotImplementedError
async def __aenter__(self) -> Self: if self.opened(): return self await self.open() await self._hook_bus.emit(SourceLifeSpan.STARTED) return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: if not self.opened(): return None try: await self.close() finally: await self._hook_bus.emit(SourceLifeSpan.STOPPED, True) return None
[文档] class AbstractInSource(AbstractSource, Generic[InPacketT]): """抽象输入源基类""" @abstractmethod async def open(self) -> None: raise NotImplementedError @abstractmethod def opened(self) -> bool: raise NotImplementedError @abstractmethod async def close(self) -> None: raise NotImplementedError
[文档] @abstractmethod async def input(self) -> InPacketT: """源输入方法 :return: 返回 :class:`.InPacket` 对象 """ raise NotImplementedError
InSourceT = TypeVar("InSourceT", bound=AbstractInSource)
[文档] class AbstractOutSource(AbstractSource, Generic[OutPacketT, EchoPacketT]): """抽象输出源基类""" @abstractmethod async def open(self) -> None: raise NotImplementedError @abstractmethod def opened(self) -> bool: raise NotImplementedError @abstractmethod async def close(self) -> None: raise NotImplementedError
[文档] @abstractmethod async def output(self, packet: OutPacketT) -> EchoPacketT: """源输出方法 :return: 返回 :class:`.OutPacket` 对象 """ raise NotImplementedError
OutSourceT = TypeVar("OutSourceT", bound=AbstractOutSource) InOrOutSourceT = TypeVar("InOrOutSourceT", bound=AbstractInSource | AbstractOutSource)
[文档] class AbstractIOSource(AbstractInSource[InPacketT], AbstractOutSource[OutPacketT, EchoPacketT]): """抽象输入输出源基类""" @abstractmethod async def open(self) -> None: raise NotImplementedError @abstractmethod async def close(self) -> None: raise NotImplementedError @abstractmethod def opened(self) -> bool: raise NotImplementedError @abstractmethod async def input(self) -> InPacketT: raise NotImplementedError @abstractmethod async def output(self, packet: OutPacketT) -> EchoPacketT: raise NotImplementedError
IOSourceT = TypeVar("IOSourceT", bound=AbstractIOSource)