melobot.protocols.onebot.v11.adapter.base 源代码

import asyncio
from functools import wraps
from os import PathLike

from typing_extensions import Any, Callable, Iterable, Literal, Optional, Sequence, cast

from melobot.adapter import (
    AbstractEchoFactory,
    AbstractEventFactory,
    AbstractOutputFactory,
)
from melobot.adapter import Adapter as RootAdapter
from melobot.adapter import content as mc
from melobot.adapter.content import Content
from melobot.adapter.model import ActionHandle, EchoT
from melobot.ctx import Context
from melobot.exceptions import AdapterError
from melobot.handle import try_get_event
from melobot.typ import AsyncCallable
from melobot.utils import to_coro

from ..const import PROTOCOL_IDENTIFIER, P, T
from ..io.base import BaseIO
from ..io.packet import EchoPacket, InPacket, OutPacket
from . import action as ac
from . import echo as ec
from . import event as ev
from . import segment as se

_ValidateErrHandler = AsyncCallable[[dict[str, Any], Exception], None]


class ValidateErrHandleable:
    def __init__(self) -> None:
        self.err_handlers: list[_ValidateErrHandler] = []

    def add_validate_handler(self, callback: _ValidateErrHandler) -> None:
        self.err_handlers.append(callback)

    def validate_handle(
        self, data: dict[str, Any]
    ) -> Callable[[Callable[P, T]], AsyncCallable[P, T]]:

        def wrapper(func: Callable[P, T]) -> AsyncCallable[P, T]:

            @wraps(func)
            async def wrapped(*args: P.args, **kwargs: P.kwargs) -> T:
                try:
                    return func(*args, **kwargs)

                except Exception as e:
                    tasks = tuple(
                        asyncio.create_task(to_coro(cb(data, e)))
                        for cb in self.err_handlers
                    )
                    if len(tasks):
                        await asyncio.wait(tasks)

                return func(*args, **kwargs)

            return wrapped

        return wrapper


class EventFactory(AbstractEventFactory[InPacket, ev.Event], ValidateErrHandleable):
    async def create(self, packet: InPacket) -> ev.Event:
        data = packet.data
        return await self.validate_handle(data)(ev.Event.resolve)(data)


class OutputFactory(AbstractOutputFactory[OutPacket, ac.Action]):
    async def create(self, action: ac.Action) -> OutPacket:
        return OutPacket(
            data=action.flatten(),
            action_type=action.type,
            action_params=action.params,
            echo_id=action.id if action.need_echo else None,
        )


class EchoFactory(AbstractEchoFactory[EchoPacket, ec.Echo], ValidateErrHandleable):
    async def create(self, packet: EchoPacket) -> ec.Echo | None:
        if packet.noecho:
            return None

        data = packet.data
        data["action_type"] = packet.action_type
        return await self.validate_handle(data)(ec.Echo.resolve)(data)


[文档] class EchoRequireCtx(Context[bool]): def __init__(self) -> None: super().__init__("ONEBOT_V11_ECHO_REQUIRE", LookupError)
[文档] class Adapter( RootAdapter[EventFactory, OutputFactory, EchoFactory, ac.Action, BaseIO, BaseIO] ): def __init__(self) -> None: super().__init__( PROTOCOL_IDENTIFIER, EventFactory(), OutputFactory(), EchoFactory() )
[文档] def when_validate_error(self, validate_type: Literal["event", "echo"]) -> Callable[ [AsyncCallable[[dict[str, Any], Exception], None]], AsyncCallable[[dict[str, Any], Exception], None], ]: def wrapper( func: AsyncCallable[[dict[str, Any], Exception], None] ) -> AsyncCallable[[dict[str, Any], Exception], None]: if validate_type == "event": self._event_factory.add_validate_handler(func) elif validate_type == "echo": self._echo_factory.add_validate_handler(func) else: raise AdapterError("无效的验证类型,合法值是 'event', 'echo' 之一") return func return wrapper
[文档] async def call_output(self, action: ac.Action) -> tuple[ActionHandle, ...]: """输出行为的底层方法 :param action: 行为对象 :return: :class:`.ActionHandle` 元组 """ if EchoRequireCtx().try_get(): action.need_echo = True return await super().call_output(action)
[文档] def with_echo( self, func: AsyncCallable[P, tuple[ActionHandle[EchoT | None], ...]] ) -> AsyncCallable[P, tuple[ActionHandle[EchoT], ...]]: async def wrapped_api( *args: P.args, **kwargs: P.kwargs ) -> tuple[ActionHandle[EchoT], ...]: with EchoRequireCtx().unfold(True): handles = await func(*args, **kwargs) return cast(tuple[ActionHandle[EchoT], ...], handles) return wrapped_api
async def __send_text__( self, text: str ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send(text) async def __send_media__( self, name: str, raw: bytes | None = None, url: str | None = None, mimetype: str | None = None, ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send( se.contents_to_segs( [mc.MediaContent(name=name, url=url, raw=raw, mimetype=mimetype)] )[0] ) async def __send_image__( self, name: str, raw: bytes | None = None, url: str | None = None, mimetype: str | None = None, ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send( se.contents_to_segs( [mc.ImageContent(name=name, url=url, raw=raw, mimetype=mimetype)] )[0] ) async def __send_audio__( self, name: str, raw: bytes | None = None, url: str | None = None, mimetype: str | None = None, ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send( se.contents_to_segs( [mc.AudioContent(name=name, url=url, raw=raw, mimetype=mimetype)] )[0] ) async def __send_voice__( self, name: str, raw: bytes | None = None, url: str | None = None, mimetype: str | None = None, ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send( se.contents_to_segs( [mc.VoiceContent(name=name, url=url, raw=raw, mimetype=mimetype)] )[0] ) async def __send_video__( self, name: str, raw: bytes | None = None, url: str | None = None, mimetype: str | None = None, ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send( se.contents_to_segs( [mc.VideoContent(name=name, url=url, raw=raw, mimetype=mimetype)] )[0] ) async def __send_file__( self, name: str, path: str | PathLike[str] ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send( se.contents_to_segs([mc.FileContent(name=name, flag=str(path))])[0] ) async def __send_refer__( self, event: ev.RootEvent, contents: Sequence[Content] | None = None ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: if not isinstance(event, ev.MessageEvent): raise AdapterError( f"提供的事件不是 {ev.MessageEvent.__qualname__} 类型,无法用于发送 refer 消息" ) segs = se.contents_to_segs(list(contents)) if contents else [] segs.insert(0, se.ReplySegment(str(event.message_id))) if isinstance(event, ev.GroupMessageEvent): return await self.send_custom(segs, group_id=event.group_id) return await self.send_custom(segs, user_id=event.user_id) async def __send_resource__( self, name: str, url: str ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.send(se.contents_to_segs([mc.ResourceContent(name, url)])[0])
[文档] async def send( self, msgs: str | se.Segment | Iterable[se.Segment] | dict | Iterable[dict] ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: event = try_get_event() if not isinstance(event, ev.MessageEvent): raise AdapterError( f"当前上下文中不存在事件,或事件不为 {ev.MessageEvent.__qualname__} 类型,无法发送消息" ) if isinstance(event, ev.GroupMessageEvent): return await self.send_custom(msgs, group_id=event.group_id) return await self.send_custom(msgs, user_id=event.user_id)
[文档] async def send_reply( self, msgs: str | se.Segment | Iterable[se.Segment] | dict | Iterable[dict] ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: event = try_get_event() if not isinstance(event, ev.MessageEvent): raise AdapterError( f"当前上下文中不存在事件,或事件不为 {ev.MessageEvent.__qualname__} 类型,无法发送消息" ) kwargs: dict[str, Any] = { "msgs": ac.msgs_to_dicts(se.ReplySegment(str(event.message_id))) + ac.msgs_to_dicts(msgs) } if isinstance(event, ev.GroupMessageEvent): kwargs["group_id"] = event.group_id else: kwargs["user_id"] = event.user_id return await self.call_output(ac.SendMsgAction(**kwargs))
[文档] async def send_custom( self, msgs: str | se.Segment | Iterable[se.Segment] | dict | Iterable[dict], user_id: Optional[int] = None, group_id: Optional[int] = None, ) -> tuple[ActionHandle[ec.SendMsgEcho | None], ...]: return await self.call_output(ac.SendMsgAction(msgs, user_id, group_id))
[文档] async def send_forward( self, msgs: Iterable[se.NodeSegment] ) -> tuple[ActionHandle[ec.SendForwardMsgEcho | None], ...]: event = try_get_event() if not isinstance(event, ev.MessageEvent): raise AdapterError( f"当前上下文中不存在事件,或事件不为 {ev.MessageEvent.__qualname__} 类型,无法发送消息" ) if isinstance(event, ev.GroupMessageEvent): return await self.send_forward_custom(msgs, group_id=event.group_id) return await self.send_forward_custom(msgs, user_id=event.user_id)
[文档] async def send_forward_custom( self, msgs: Iterable[se.NodeSegment] | Iterable[dict], user_id: Optional[int] = None, group_id: Optional[int] = None, ) -> tuple[ActionHandle[ec.SendForwardMsgEcho | None], ...]: return await self.call_output(ac.SendForwardMsgAction(msgs, user_id, group_id))
[文档] async def delete_msg( self, msg_id: int ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.DeleteMsgAction(msg_id))
[文档] async def get_msg( self, msg_id: int ) -> tuple[ActionHandle[ec.GetMsgEcho | None], ...]: return await self.call_output(ac.GetMsgAction(msg_id))
[文档] async def get_forward_msg( self, forward_id: str ) -> tuple[ActionHandle[ec.GetForwardMsgEcho | None], ...]: return await self.call_output(ac.GetForwardMsgAction(forward_id))
[文档] async def send_like( self, user_id: int, times: int = 1 ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SendLikeAction(user_id, times))
[文档] async def set_group_kick( self, group_id: int, user_id: int, later_reject: bool = False ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output( ac.SetGroupKickAction(group_id, user_id, later_reject) )
[文档] async def set_group_ban( self, group_id: int, user_id: int, duration: int = 30 * 60 ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupBanAction(group_id, user_id, duration))
[文档] async def set_group_anonymous_ban( self, group_id: int, anonymous: ac.SetGroupAnonymousBanAction.AnonymousDict, anonymous_flag: str, duration: int = 30 * 60, ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output( ac.SetGroupAnonymousBanAction(group_id, anonymous, anonymous_flag, duration) )
[文档] async def set_group_whole_ban( self, group_id: int, enable: bool = True ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupWholeBanAction(group_id, enable))
[文档] async def set_group_admin( self, group_id: int, enable: bool = True ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupAdminAction(group_id, enable))
[文档] async def set_group_anonymous( self, group_id: int, enable: bool = True ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupAnonymousAction(group_id, enable))
[文档] async def set_group_card( self, group_id: int, user_id: int, card: str = "" ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupCardAction(group_id, user_id, card))
[文档] async def set_group_name( self, group_id: int, name: str ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupNameAction(group_id, name))
[文档] async def set_group_leave( self, group_id: int, is_dismiss: bool = False ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetGroupLeaveAction(group_id, is_dismiss))
[文档] async def set_group_special_title( self, group_id: int, user_id: int, title: str = "", duration: int = -1 ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output( ac.SetGroupSpecialTitleAction(group_id, user_id, title, duration) )
[文档] async def set_friend_add_request( self, add_flag: str, approve: bool = True, remark: str = "" ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output( ac.SetFriendAddRequestAction(add_flag, approve, remark) )
[文档] async def set_group_add_request( self, add_flag: str, add_type: Literal["add", "invite"], approve: bool = True, reason: str = "", ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output( ac.SetGroupAddRequestAction(add_flag, add_type, approve, reason) )
[文档] async def get_login_info(self) -> tuple[ActionHandle[ec.GetLoginInfoEcho], ...]: return await self.call_output(ac.GetLoginInfoAction())
[文档] async def get_stranger_info( self, user_id: int, no_cache: bool = False ) -> tuple[ActionHandle[ec.GetStrangerInfoEcho], ...]: return await self.call_output(ac.GetStrangerInfoAction(user_id, no_cache))
[文档] async def get_friend_list(self) -> tuple[ActionHandle[ec.GetFriendListEcho], ...]: return await self.call_output(ac.GetFriendlistAction())
[文档] async def get_group_info( self, group_id: int, no_cache: bool = False ) -> tuple[ActionHandle[ec.GetGroupInfoEcho], ...]: return await self.call_output(ac.GetGroupInfoAction(group_id, no_cache))
[文档] async def get_group_list(self) -> tuple[ActionHandle[ec.GetGroupListEcho], ...]: return await self.call_output(ac.GetGrouplistAction())
[文档] async def get_group_member_info( self, group_id: int, user_id: int, no_cache: bool = False ) -> tuple[ActionHandle[ec.GetGroupMemberInfoEcho], ...]: return await self.call_output( ac.GetGroupMemberInfoAction(group_id, user_id, no_cache) )
[文档] async def get_group_member_list( self, group_id: int ) -> tuple[ActionHandle[ec.GetGroupMemberListEcho], ...]: return await self.call_output(ac.GetGroupMemberlistAction(group_id))
[文档] async def get_group_honor_info( self, group_id: int, type: Literal[ "talkative", "performer", "legend", "strong_newbie", "emotion", "all" ], ) -> tuple[ActionHandle[ec.GetGroupHonorInfoEcho], ...]: return await self.call_output(ac.GetGroupHonorInfoAction(group_id, type))
[文档] async def get_cookies( self, domain: str = "" ) -> tuple[ActionHandle[ec.GetCookiesEcho], ...]: return await self.call_output(ac.GetCookiesAction(domain))
[文档] async def get_csrf_token(self) -> tuple[ActionHandle[ec.GetCsrfTokenEcho], ...]: return await self.call_output(ac.GetCsrfTokenAction())
[文档] async def get_credentials( self, domain: str = "" ) -> tuple[ActionHandle[ec.GetCredentialsEcho], ...]: return await self.call_output(ac.GetCredentialsAction(domain))
[文档] async def get_record( self, file: str, out_format: str ) -> tuple[ActionHandle[ec.GetRecordEcho], ...]: return await self.call_output(ac.GetRecordAction(file, out_format))
[文档] async def get_image(self, file: str) -> tuple[ActionHandle[ec.GetImageEcho], ...]: return await self.call_output(ac.GetImageAction(file))
[文档] async def can_send_image(self) -> tuple[ActionHandle[ec.CanSendImageEcho], ...]: return await self.call_output(ac.CanSendImageAction())
[文档] async def can_send_record(self) -> tuple[ActionHandle[ec.CanSendRecordEcho], ...]: return await self.call_output(ac.CanSendRecordAction())
[文档] async def get_status(self) -> tuple[ActionHandle[ec.GetStatusEcho], ...]: return await self.call_output(ac.GetStatusAction())
[文档] async def get_version_info(self) -> tuple[ActionHandle[ec.GetVersionInfoEcho], ...]: return await self.call_output(ac.GetVersionInfoAction())
[文档] async def set_restart( self, delay: int = 0 ) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.SetRestartAction(delay))
[文档] async def clean_cache(self) -> tuple[ActionHandle[ec.EmptyEcho | None], ...]: return await self.call_output(ac.CleanCacheAction())