melobot.protocols.onebot.v11.io.duplex_http 源代码

import asyncio
import hmac
import json
import time
from asyncio import Future

import aiohttp
import aiohttp.log
import aiohttp.web

from melobot.io import SourceLifeSpan
from melobot.log import LogLevel

from .base import BaseIOSource
from .packet import EchoPacket, InPacket, OutPacket


[文档] class HttpIO(BaseIOSource):
[文档] def __init__( self, onebot_url: str, serve_host: str, serve_port: int, secret: str | None = None, access_token: str | None = None, cd_time: float = 0, ) -> None: super().__init__(cd_time) self.onebot_url = onebot_url self.host: str = serve_host self.port: int = serve_port self.serve_site: aiohttp.web.TCPSite self.client_session: aiohttp.ClientSession self.secret = secret self.access_token = access_token self._tasks: list[asyncio.Task] = [] self._in_buf: asyncio.Queue[InPacket] = asyncio.Queue() self._out_buf: asyncio.Queue[OutPacket] = asyncio.Queue() self._pre_send_time = time.time_ns() self._echo_table: dict[str, tuple[str, Future[EchoPacket]]] = {} self._opened = asyncio.Event() self._lock = asyncio.Lock()
async def _respond(self, request: aiohttp.web.Request) -> aiohttp.web.Response: if not self._opened.is_set(): self._opened.set() data = await request.content.read() if data == b"": return aiohttp.web.Response(status=400) if self.secret is not None: sign = hmac.new(self.secret.encode(), data, "sha1").hexdigest() recv_sign = request.headers["X-Signature"][len("sha1=") :] if sign != recv_sign: self.logger.error("OneBot 实现程序鉴权不通过,本次上报数据将不会被处理") self.logger.generic_obj("试图上报的数据", data, level=LogLevel.ERROR) return aiohttp.web.Response(status=403) try: raw = json.loads(data.decode()) if ( self._hook_bus.get_evoke_time(SourceLifeSpan.STARTED) != -1 and raw.get("post_type") == "meta_event" and raw.get("meta_event_type") == "lifecycle" and raw.get("sub_type") == "connect" ): await self._hook_bus.emit(SourceLifeSpan.RESTARTED, False) self.logger.generic_obj("收到上报,未格式化的字典", str(raw), level=LogLevel.DEBUG) await self._in_buf.put(InPacket(time=raw["time"], data=raw)) except Exception: self.logger.exception("OneBot v11 HTTP IO 源输入异常") self.logger.generic_obj("异常点的上报数据", raw, level=LogLevel.ERROR) finally: return aiohttp.web.Response(status=204) async def _output_loop(self) -> None: while True: try: await self._opened.wait() out_packet = await self._out_buf.get() wait_time = self.cd_time - ((time.time_ns() - self._pre_send_time) / 1e9) await asyncio.sleep(wait_time) asyncio.create_task(self._handle_output(out_packet)) self._pre_send_time = time.time_ns() except asyncio.CancelledError: break except Exception: self.logger.exception("OneBot v11 HTTP IO 源输出异常") self.logger.generic_obj("异常点局部变量", locals(), level=LogLevel.ERROR) async def _handle_output(self, packet: OutPacket) -> None: try: headers: dict | None = None if self.access_token is not None: headers = {"Authorization": f"Bearer {self.access_token}"} http_resp = await self.client_session.post( f"{self.onebot_url}/{packet.action_type}", json=packet.action_params, headers=headers, ) if packet.echo_id is None: return raw = await http_resp.json() echo_id = raw.get("echo") if echo_id in (None, ""): return action_type, fut = self._echo_table.pop(echo_id) fut.set_result( EchoPacket( time=int(time.time()), data=raw, ok=raw["status"] == "ok", status=raw["retcode"], action_type=action_type, ) ) except aiohttp.ContentTypeError: self.logger.error( "OneBot v11 HTTP IO 源无法解析上报数据。可能是 access_token 未配置或错误" ) except Exception: self.logger.exception("OneBot v11 HTTP IO 源输出异常") self.logger.generic_obj("异常点的发送数据", packet.data, level=LogLevel.ERROR) async def open(self) -> None: if self.opened(): return async with self._lock: if self.opened(): return self.client_session = aiohttp.ClientSession() app = aiohttp.web.Application() app.add_routes([aiohttp.web.post("/", self._respond)]) runner = aiohttp.web.AppRunner(app) await runner.setup() self.serve_site = aiohttp.web.TCPSite(runner, self.host, self.port) await self.serve_site.start() self._tasks.append(asyncio.create_task(self._output_loop())) self.logger.info("OneBot v11 HTTP IO 源就绪,等待实现端上线中") await self._opened.wait() self.logger.info("OneBot v11 HTTP IO 源双向通信已建立")
[文档] def opened(self) -> bool: return self._opened.is_set()
async def close(self) -> None: if not self.opened(): return async with self._lock: if not self.opened(): return await self.serve_site.stop() await self.client_session.close() for t in self._tasks: t.cancel() if len(self._tasks): await asyncio.wait(self._tasks) self._tasks.clear() self._opened.clear() self.logger.info("OneBot v11 HTTP IO 源已停止运行") async def input(self) -> InPacket: return await self._in_buf.get() async def output(self, packet: OutPacket) -> EchoPacket: await self._out_buf.put(packet) if packet.echo_id is None: return EchoPacket(noecho=True) fut: Future[EchoPacket] = asyncio.get_running_loop().create_future() self._echo_table[packet.echo_id] = (packet.action_type, fut) return await fut