# Source code for melobot.log.reflect

from __future__ import annotations

import logging
import sys
from os import PathLike
from pathlib import Path
from time import perf_counter
from types import ModuleType

from typing_extensions import Any, cast

from .._imp import ALL_EXTS, PKG_INIT_FILENAMES, ModuleLoader, SpecFinder
from ..ctx import BotCtx
from ..utils.common import find_caller_stack
from .base import GenericLogger, Logger, is_logging_frame

_SPEC_FINDER = SpecFinder()
_BOT_CTX = BotCtx()


class LogNode:
    """One path component in the tree used to look up loggers.

    A node may hold its own logger. A node without one defers to the nearest
    ancestor that has one.
    """

    def __init__(self, part: str, parent: LogNode | None = None) -> None:
        """Create a node.

        :param part: the path component this node stands for
        :param parent: the parent node, or ``None`` for a root node
        """
        self.part = part
        self.parent = parent
        self.logger: GenericLogger | None = None
        self.children: dict[str, LogNode] = {}

    def resolve_logger(self) -> GenericLogger | None:
        """Return the logger of this node or of its closest ancestor.

        :return: the first non-``None`` logger found walking towards the
            root, or ``None`` when no node on that chain has a logger
        """
        node: LogNode | None = self
        while node is not None:
            if node.logger is not None:
                return node.logger
            node = node.parent
        return None

    def add_child(self, part: str) -> LogNode:
        """Return the child node for ``part``, creating it if it is missing.

        :param part: the path component of the child
        :return: the existing or newly created child node
        """
        try:
            return self.children[part]
        except KeyError:
            child = self.children[part] = LogNode(part, self)
            return child


class LogReflector:
    """Registry that maps module filesystem paths to loggers.

    Paths are stored as a tree of :class:`LogNode` objects, one node per path
    component, rooted at ``root_node``. ``last_updated`` holds the
    ``perf_counter()`` reading taken at the most recent module-logger change.
    """

    # Logger of the global scope; may be replaced or cleared.
    global_logger: GenericLogger | None = Logger("[global]")
    root_node = LogNode("")
    last_updated = perf_counter()

    @classmethod
    def set_global_logger(cls, logger: GenericLogger | None) -> None:
        """Replace the global-scope logger.

        :param logger: the new logger; may be ``None``
        """
        cls.global_logger = logger

    @classmethod
    def set_module_logger(
        cls, module: str | PathLike[str] | ModuleType, logger: GenericLogger | None
    ) -> None:
        """Attach ``logger`` to the tree node of ``module``'s path.

        Missing intermediate nodes are created on the way.

        :param module: module name, relative or absolute module path, or
            module object
        :param logger: the logger to attach; may be ``None``
        :raises FileNotFoundError: if a relative path does not exist
        """
        path = cls._get_mod_path(module)
        cur_node = cls.root_node
        for part in path.parts:
            cur_node = cur_node.add_child(part)
        cur_node.logger = logger
        cls.last_updated = perf_counter()

    @classmethod
    def _get_mod_path(cls, mod: str | PathLike[str] | ModuleType) -> Path:
        """Translate a module designator into a normalized filesystem path.

        Every returned path goes through ``Path.resolve`` so that symlinks and
        ``..`` segments cannot make a registered path differ from the resolved
        caller path that ``reflect_logger`` uses for lookups.

        :param mod: module name, relative or absolute module path, or module
            object
        :return: the resolved path; for a package this is its directory
        :raises FileNotFoundError: if a relative path does not exist
        """
        if isinstance(mod, ModuleType):
            return cls._get_module_path(mod).resolve()

        # A string that neither ends with a known extension nor names an
        # existing path is first tried as a dotted module name.
        if isinstance(mod, str) and not mod.endswith(ALL_EXTS) and not Path(mod).exists():
            cached_mod = sys.modules.get(mod)
            if cached_mod is not None:
                return cls._get_module_path(cached_mod).resolve()

            spec = _SPEC_FINDER.find_spec(mod, paths=None)
            if spec is not None:
                loader = cast(ModuleLoader, spec.loader)
                path = loader.mb_fp
                if path.parts[-1] in PKG_INIT_FILENAMES:
                    path = path.parent
                return path.resolve()

        path = Path(mod)
        if path.is_absolute():
            # Non-strict: absolute paths were never required to exist, they
            # are only normalized here.
            return path.resolve()
        try:
            return path.resolve(strict=True)
        except FileNotFoundError as e:
            raise FileNotFoundError(f"尝试解析模块路径时,发现路径 {path} 不存在") from e

    @staticmethod
    def _get_module_path(mod: ModuleType) -> Path:
        """Return the path of a module object.

        :param mod: the module object
        :return: the module file, or the package directory when the module is
            a package
        """
        if mod.__file__ is None:
            # This case corresponds to a package without an __init__.* file.
            return Path(mod.__path__[0])

        file_path = Path(mod.__file__)
        if mod.__file__.endswith(PKG_INIT_FILENAMES):
            return file_path.parent
        return file_path


def set_global_logger(logger: GenericLogger | None) -> None:
    """Set the logger used for the top-level (global) scope.

    :param logger: the logger; may be ``None``
    """
    LogReflector.set_global_logger(logger)
def set_module_logger(
    module: str | PathLike[str] | ModuleType,
    logger: GenericLogger | None,
) -> None:
    """Set the logger used for a module scope.

    :param module: module name, relative or absolute module path, or module
        object
    :param logger: the logger; may be ``None``
    """
    LogReflector.set_module_logger(module, logger)
def reflect_logger(stack_depth: int = 3) -> GenericLogger:
    """Build a logger proxy bound to the file of the calling code.

    The caller's file is obtained from ``find_caller_stack`` (with
    ``is_logging_frame`` as the inner-frame filter), resolved to a real path,
    and mapped to a node of ``LogReflector.root_node``'s tree. Missing nodes
    are created on the way.

    :param stack_depth: stack level passed to ``find_caller_stack``
    :return: a :class:`GenericLogProxy` attached to the caller's path node
    :raises FileNotFoundError: if the caller's file path cannot be resolved
    """
    _, caller_path_str, *_ = find_caller_stack(
        stacklevel=stack_depth, inner_frame_filter=is_logging_frame
    )
    try:
        caller_path = Path(caller_path_str).resolve(strict=True)
    except FileNotFoundError as e:
        raise FileNotFoundError("自动决定日志器时,尝试解析调用模块的路径失败") from e

    cur_node = LogReflector.root_node
    for part in caller_path.parts:
        cur_node = cur_node.add_child(part)
    return GenericLogProxy(cur_node)


class GenericLogProxy(GenericLogger):
    """Logger proxy that picks the effective logger at call time.

    Lookup order: the logger resolved from the bound :class:`LogNode`, then
    the logger of the bot found through ``_BOT_CTX``, then
    ``LogReflector.global_logger``. Only the node lookup is cached.
    """

    def __init__(self, node: LogNode) -> None:
        """Bind the proxy to a node.

        :param node: the tree node whose logger chain this proxy consults
        """
        self.__log_node__ = node
        # Reading of perf_counter() taken when the node lookup was last cached.
        self.__cache_ts__: float = 0
        self.__cache_logger__: GenericLogger | None = None

    @property
    def __logger__(self) -> GenericLogger | None:
        """The logger currently in effect, or ``None`` if there is none."""
        return self.__get_logger__()

    def __get_logger__(self) -> GenericLogger | None:
        """Resolve the logger currently in effect.

        :return: the node, bot or global logger, in that order of precedence;
            ``None`` when all of them are missing
        """
        if LogReflector.last_updated > self.__cache_ts__:
            # The registry changed after the cached lookup; look up again.
            logger = self.__log_node__.resolve_logger()
            self.__cache_ts__ = perf_counter()
            self.__cache_logger__ = logger
        else:
            logger = self.__cache_logger__

        if logger is None:
            if bot := _BOT_CTX.try_get():
                logger = bot.logger
        if logger is None:
            logger = LogReflector.global_logger
        return logger

    def __log_meth__(self, meth_name: str, *args: Any, **kwargs: Any) -> None:
        """Forward a logging call to the effective logger, if there is one.

        :param meth_name: name of the logger method to invoke
        :param args: positional arguments passed through unchanged
        :param kwargs: keyword arguments passed through; ``stacklevel`` is
            overwritten with 3 when the target is a ``logging.Logger``
        """
        logger = self.__logger__
        if logger is None:
            return
        if isinstance(logger, logging.Logger):
            kwargs["stacklevel"] = 3
        getattr(logger, meth_name)(*args, **kwargs)

    def debug(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``debug``."""
        self.__log_meth__("debug", *_, **__)

    def info(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``info``."""
        self.__log_meth__("info", *_, **__)

    def warning(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``warning``."""
        self.__log_meth__("warning", *_, **__)

    def error(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``error``."""
        self.__log_meth__("error", *_, **__)

    def critical(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``critical``."""
        self.__log_meth__("critical", *_, **__)

    def exception(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``exception``."""
        self.__log_meth__("exception", *_, **__)

    def generic_lazy(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``generic_lazy``."""
        self.__log_meth__("generic_lazy", *_, **__)

    def generic_obj(self, *_: Any, **__: Any) -> None:
        """Forward to the effective logger's ``generic_obj``."""
        self.__log_meth__("generic_obj", *_, **__)


def __getattr__(name: str) -> Any:
    """Module-level attribute hook providing the dynamic ``logger`` attribute.

    :param name: the attribute being looked up on this module
    :return: a fresh proxy from ``reflect_logger()`` when ``name`` is
        ``"logger"``
    :raises AttributeError: for every other name
    """
    if name != "logger":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return reflect_logger()


# Declared for type checkers only; the value is produced by __getattr__ above.
logger: GenericLogger