melobot.protocols.onebot.v11.io.base 源代码
from abc import abstractmethod
from typing_extensions import Any, Self
from melobot.io import AbstractIOSource
from ..const import PROTOCOL_IDENTIFIER
from .packet import EchoPacket, InPacket, OutPacket
[文档]
class BaseIOSource(AbstractIOSource[InPacket, OutPacket, EchoPacket]):
    """Abstract base for IO sources of this protocol.

    Binds the generic ``AbstractIOSource`` to this package's ``InPacket``,
    ``OutPacket`` and ``EchoPacket`` types and leaves every IO operation
    abstract for concrete subclasses to implement.

    :ivar float cd_time: cooldown time applied to send operations
        (intended as protection against platform risk control); a negative
        value given to the constructor is stored as ``0``
    """

    def __init__(self, cd_time: float) -> None:
        """Initialise the protocol identifier, cooldown and hook-bus tag.

        :param cd_time: requested cooldown time; values that are not
            ``>= 0`` are replaced by ``0``
        """
        super().__init__()
        self.protocol = PROTOCOL_IDENTIFIER

        # Same predicate as a conditional expression would use, so values
        # that fail ``>= 0`` fall through to the zero branch.
        if cd_time >= 0:
            self.cd_time = cd_time
        else:
            self.cd_time = 0

        # Tag the hook bus with "<protocol>/<concrete class name>".
        hook_tag = f"{self.protocol}/{self.__class__.__name__}"
        self._hook_bus.set_tag(hook_tag)

    @abstractmethod
    async def open(self) -> None:
        """Open the source. Abstract: subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    async def close(self) -> None:
        """Close the source. Abstract: subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    def opened(self) -> bool:
        """Return whether the source is open. Abstract: subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    async def input(self) -> InPacket:
        """Produce one ``InPacket``. Abstract: subclasses must override."""
        raise NotImplementedError

    @abstractmethod
    async def output(self, packet: OutPacket) -> EchoPacket:
        """Handle one ``OutPacket`` and return its ``EchoPacket``.

        Abstract: subclasses must override.

        :param packet: the outgoing packet
        """
        raise NotImplementedError
class InstCounter:
INSTANCE_COUNT = 0
def __new__(cls, *args: Any, **kwargs: Any) -> Self:
o = super().__new__(cls)
cls.INSTANCE_COUNT += 1
return o