melobot.protocols.onebot.v11.io.duplex_http 源代码
import asyncio
import hmac
import json
import time
from asyncio import Future
import aiohttp
import aiohttp.web
from melobot.io import SourceLifeSpan
from melobot.log import LogLevel, log_exc, logger
from .base import BaseIOSource
from .packet import EchoPacket, InPacket, OutPacket
[文档]
class HttpIO(BaseIOSource):
[文档]
def __init__(
self,
onebot_url: str,
serve_host: str,
serve_port: int,
secret: str | None = None,
access_token: str | None = None,
cd_time: float = 0,
) -> None:
super().__init__(cd_time)
self.onebot_url = onebot_url
self.host: str = serve_host
self.port: int = serve_port
self.serve_site: aiohttp.web.TCPSite
self.client_session: aiohttp.ClientSession
self.secret = secret
self.access_token = access_token
self._tasks: list[asyncio.Task] = []
self._in_buf: asyncio.Queue[InPacket] = asyncio.Queue()
self._out_buf: asyncio.Queue[OutPacket] = asyncio.Queue()
self._pre_send_time = time.time_ns()
self._echo_table: dict[str, tuple[str, Future[EchoPacket]]] = {}
self._opened = asyncio.Event()
self._lock = asyncio.Lock()
async def _respond(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
if not self._opened.is_set():
self._opened.set()
data = await request.content.read()
if data == b"":
return aiohttp.web.Response(status=400)
if self.secret is not None:
sign = hmac.new(self.secret.encode(), data, "sha1").hexdigest()
recv_sign = request.headers["X-Signature"][len("sha1=") :]
if sign != recv_sign:
logger.warning("OneBot 实现程序鉴权不通过,本次上报数据将不会被处理")
logger.generic_obj("试图上报的数据", data, level=LogLevel.WARNING)
return aiohttp.web.Response(status=403)
try:
raw = json.loads(data.decode())
if (
self._hook_bus.get_evoke_time(SourceLifeSpan.STARTED) != -1
and raw.get("post_type") == "meta_event"
and raw.get("meta_event_type") == "lifecycle"
and raw.get("sub_type") == "connect"
):
await self._hook_bus.emit(SourceLifeSpan.RESTARTED, False)
logger.generic_obj("收到上报,未格式化的字典", str(raw), level=LogLevel.DEBUG)
await self._in_buf.put(InPacket(time=raw["time"], data=raw))
except Exception as e:
log_exc(e, msg="OneBot v11 HTTP IO 源输入异常", obj=raw)
return aiohttp.web.Response(status=204)
async def _output_loop(self) -> None:
while True:
try:
await self._opened.wait()
out_packet = await self._out_buf.get()
wait_time = self.cd_time - ((time.time_ns() - self._pre_send_time) / 1e9)
await asyncio.sleep(wait_time)
asyncio.create_task(self._handle_output(out_packet))
self._pre_send_time = time.time_ns()
except asyncio.CancelledError:
raise
except Exception as e:
log_exc(e, msg="OneBot v11 HTTP IO 源输出异常", obj=locals())
async def _handle_output(self, packet: OutPacket) -> None:
try:
headers: dict | None = None
if self.access_token is not None:
headers = {"Authorization": f"Bearer {self.access_token}"}
http_resp = await self.client_session.post(
f"{self.onebot_url}/{packet.action_type}",
json=packet.action_params,
headers=headers,
)
if packet.echo_id is None:
return
raw = await http_resp.json()
echo_id = raw.get("echo")
if echo_id in (None, ""):
return
action_type, fut = self._echo_table.pop(echo_id)
fut.set_result(
EchoPacket(
time=int(time.time()),
data=raw,
ok=raw["status"] == "ok",
status=raw["retcode"],
action_type=action_type,
)
)
except aiohttp.ContentTypeError:
logger.error("OneBot v11 HTTP IO 源无法解析上报数据。可能是 access_token 未配置或错误")
except Exception as e:
log_exc(e, msg="OneBot v11 HTTP IO 源输出异常", obj=packet.data)
async def open(self) -> None:
if self.opened():
return
async with self._lock:
if self.opened():
return
self.client_session = aiohttp.ClientSession()
app = aiohttp.web.Application()
app.add_routes([aiohttp.web.post("/", self._respond)])
runner = aiohttp.web.AppRunner(app)
await runner.setup()
self.serve_site = aiohttp.web.TCPSite(runner, self.host, self.port)
await self.serve_site.start()
self._tasks.append(asyncio.create_task(self._output_loop()))
logger.info("OneBot v11 HTTP IO 源就绪,等待实现端上线中")
await self._opened.wait()
logger.info("OneBot v11 HTTP IO 源双向通信已建立")
[文档]
def opened(self) -> bool:
return self._opened.is_set()
async def close(self) -> None:
if not self.opened():
return
async with self._lock:
if not self.opened():
return
await self.serve_site.stop()
await self.client_session.close()
for t in self._tasks:
t.cancel()
if len(self._tasks):
await asyncio.wait(self._tasks)
self._tasks.clear()
self._opened.clear()
logger.info("OneBot v11 HTTP IO 源已停止运行")
async def input(self) -> InPacket:
return await self._in_buf.get()
async def output(self, packet: OutPacket) -> EchoPacket:
await self._out_buf.put(packet)
if packet.echo_id is None:
return EchoPacket(noecho=True)
fut: Future[EchoPacket] = asyncio.get_running_loop().create_future()
self._echo_table[packet.echo_id] = (packet.action_type, fut)
return await fut