melobot.log.base 源代码

from __future__ import annotations

import io
import logging
import logging.config
import logging.handlers
import os
import sys
import traceback
import types
from abc import abstractmethod
from contextlib import contextmanager
from enum import Enum
from inspect import currentframe
from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING
from logging import Logger as _Logger
from logging import _srcfile as _LOGGING_SRC_FILE

import colorlog
import rich.console
import rich.pretty
from better_exceptions import ExceptionFormatter
from rich.highlighter import Highlighter, ReprHighlighter
from rich.style import Style
from rich.text import Text
from typing_extensions import Any, Callable, Generator, Literal, Optional

from ..typ._enum import VoidType
from ..typ.base import T
from ..typ.cls import BetterABC
from ..utils.common import singleton

_CONSOLE_IO = io.StringIO()
_CONSOLE = rich.console.Console(file=_CONSOLE_IO, record=True, color_system="256")
_REPR_HIGHLIGHTER = ReprHighlighter()
_HIGH_LIGHTWORDS = ["GET", "POST", "HEAD", "PUT", "DELETE", "OPTIONS", "TRACE", "PATCH"]


class _StyleHighlighter(Highlighter):
    def __init__(self, style: Style | None) -> None:
        super().__init__()
        self.style = style

    def highlight(self, text: Text) -> None:
        if self.style:
            text.stylize(self.style)


def _get_rich_object(
    obj: object,
    max_len: Optional[int] = 2000,
    style: Style | None = None,
    no_color: bool = False,
) -> tuple[str, str]:
    if no_color:
        hl = _StyleHighlighter(style=None)
    elif style:
        hl = _StyleHighlighter(style)
    else:
        hl = None

    _CONSOLE.print(
        rich.pretty.Pretty(
            obj,
            highlighter=hl,
            indent_guides=True,
            max_string=max_len,
            overflow="ignore",
            expand_all=True,
        ),
        crop=False,
    )
    colored_str = _CONSOLE_IO.getvalue().rstrip("\n")
    _CONSOLE_IO.seek(0)
    _CONSOLE_IO.truncate(0)
    return colored_str, _CONSOLE.export_text().rstrip("\n")


def _get_rich_repr(s: str, style: Style | None = None, no_color: bool = False) -> tuple[str, str]:
    if no_color:
        msg = Text(s)
    elif style:
        msg = Text(s, style=style)
    else:
        msg = _REPR_HIGHLIGHTER(Text(s))
        msg.highlight_words(_HIGH_LIGHTWORDS, "logging.keyword")

    _CONSOLE.print(msg)
    colored_str = _CONSOLE_IO.getvalue()[:-1]
    _CONSOLE_IO.seek(0)
    _CONSOLE_IO.truncate(0)
    return colored_str, _CONSOLE.export_text()[:-1]


_EXC_FORMATTER = ExceptionFormatter(colored=True)
_NO_COLOR_EXC_FORMATTER = ExceptionFormatter(colored=False)


class _NormalLvlFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        return logging.DEBUG <= record.levelno < logging.WARNING


class _MeloLogFilter(logging.Filter):
    def __init__(
        self,
        name: str = "",
        yellow_warn: bool = True,
        red_error: bool = True,
        legacy: bool = False,
    ) -> None:
        super().__init__(name)
        self._obj: Any = VoidType.VOID
        self._yellow_style = Style(color="yellow")
        self._red_style = Style(color="red")
        self._enable_yellow_warn = yellow_warn
        self._enable_red_error = red_error
        self._legacy = legacy

    def set_obj(self, obj: Any) -> None:
        self._obj = obj

    def clear_obj(self) -> None:
        self._obj = VoidType.VOID

    @contextmanager
    def on_obj(self, obj: Any) -> Generator[None, None, None]:
        try:
            self.set_obj(obj)
            yield
        finally:
            self.clear_obj()

    def filter(self, record: logging.LogRecord) -> Literal[True]:
        msg = str(record.msg)
        if record.args:
            msg = msg % record.args
        record.msg_str = msg

        record.mod_name, record.func_name, record.func_lineno = _current_finfo()
        self._fill_msg_and_obj(msg, record)
        return True

    def _fill_msg_and_obj(self, msg: str, record: logging.LogRecord) -> None:
        yellow_style = self._yellow_style
        red_style = self._red_style
        yellow_warn = self._enable_yellow_warn
        red_error = self._enable_red_error

        if self._legacy:
            record.legacy_msg_str, record.colored_msg_str, record.msg_str = msg, "", msg

            if self._obj is VoidType.VOID:
                record.legacy_obj, record.obj = "", ""
            else:
                record.legacy_obj = record.obj = _get_rich_object(self._obj, no_color=True)[1]
            record.colored_obj = ""
            return

        record.legacy_msg_str = ""
        record.legacy_obj = ""

        if red_error and record.levelno >= ERROR:
            record.colored_msg_str, record.msg_str = _get_rich_repr(msg, red_style)
        elif yellow_warn and record.levelno >= WARNING:
            record.colored_msg_str, record.msg_str = _get_rich_repr(msg, yellow_style)
        else:
            record.colored_msg_str, record.msg_str = _get_rich_repr(msg)

        if self._obj is VoidType.VOID:
            record.colored_obj, record.obj = "", ""
        elif red_error and record.levelno >= ERROR:
            record.legacy_obj = record.obj = _get_rich_object(self._obj, no_color=True)[1]
            record.colored_obj = ""
        elif yellow_warn and record.levelno >= WARNING:
            record.legacy_obj = record.obj = _get_rich_object(self._obj, no_color=True)[1]
            record.colored_obj = ""
        else:
            record.colored_obj, record.obj = _get_rich_object(self._obj)


[文档] class LogLevel(int, Enum): """日志等级枚举""" CRITICAL = CRITICAL ERROR = ERROR WARNING = WARNING INFO = INFO DEBUG = DEBUG
_FILE = os.path.normcase(_get_rich_object.__code__.co_filename) def _is_internal_frame(frame: types.FrameType) -> bool: filename = os.path.normcase(frame.f_code.co_filename) return filename in (_FILE, _LOGGING_SRC_FILE) or ( "importlib" in filename and "_bootstrap" in filename ) def _current_finfo() -> tuple[str, str, int]: frame = currentframe() while frame: if not _is_internal_frame(frame): return ( frame.f_globals["__name__"], frame.f_code.co_name, frame.f_lineno, ) frame = frame.f_back return "<unknown module>", "<unknown file>", -1
[文档] class GenericLogger(BetterABC): """通用日志器抽象类 任何日志器实现本类接口,或通过 :func:`.logger_patch` 修补后, 即可兼容 melobot 内部所有日志操作(也就可以用于 bot 初始化 :meth:`.Bot.__init__`) """
[文档] @abstractmethod def debug(self, msg: object) -> None: """`debug` 级别日志""" raise NotImplementedError
[文档] @abstractmethod def info(self, msg: object) -> None: """`info` 级别日志""" raise NotImplementedError
[文档] @abstractmethod def warning(self, msg: object) -> None: """`warning` 级别日志""" raise NotImplementedError
[文档] @abstractmethod def error(self, msg: object) -> None: """`error` 级别日志""" raise NotImplementedError
[文档] @abstractmethod def critical(self, msg: object) -> None: """`critical` 级别日志""" raise NotImplementedError
[文档] @abstractmethod def exception(self, msg: object) -> None: """记录异常信息的日志""" raise NotImplementedError
[文档] @abstractmethod def generic_lazy( self, msg: str, *arg_getters: Callable[[], str], level: LogLevel, with_exc: bool = False, ) -> None: """通用懒惰日志方法 :param msg: 日志消息,可使用 %s 指定稍后填充的参数 :param arg_getters: 填充消息 %s 位置的填充函数 :param level: 日志等级 :param with_exc: 是否记录异常栈信息 """ raise NotImplementedError
[文档] @abstractmethod def generic_obj( self, msg: str, obj: T, *arg_getters: Callable[[], str], level: LogLevel = LogLevel.INFO, ) -> None: """通用记录对象日志方法 :param msg: 附加的日志消息,可使用 %s 指定稍后填充的参数 :param obj: 需要被日志记录的对象 :param arg_getters: 填充消息 %s 位置的填充函数 :param level: 日志等级 """ raise NotImplementedError
@singleton class NullLogger(_Logger, GenericLogger): def __init__(self) -> None: super().__init__("__MELO_EMPTYLOGGER__", CRITICAL) self.addHandler(logging.NullHandler()) def generic_lazy( self, msg: str, *arg_getters: Callable[[], str], level: LogLevel, with_exc: bool = False, ) -> None: return def generic_obj( self, msg: str, obj: T, *arg_getters: Callable[[], str], level: LogLevel = LogLevel.INFO, ) -> None: return
[文档] class Logger(_Logger, GenericLogger): """melobot 内置日志器 推荐使用的日志器。实现了 :class:`GenericLogger` 接口,因此可以用于 melobot 内部日志记录。 `debug`, `info`, `warning`, `error`, `critical`, `exception` 等接口与 :class:`logging.Logger` 用法完全一致 """ __instances__: dict[str, "Logger"] = {} def __new__(cls, name: str = "melobot", /, *args: Any, **kwargs: Any) -> Logger: if name in Logger.__instances__: return Logger.__instances__[name] obj = super().__new__(cls) Logger.__instances__[name] = obj return obj
[文档] def __init__( self, name: str = "melobot", /, level: LogLevel = LogLevel.INFO, file_level: LogLevel = LogLevel.DEBUG, to_console: bool = True, to_dir: str | None = None, add_tag: bool = False, legacy: bool = False, yellow_warn: bool = True, red_error: bool = True, two_stream: bool = False, ) -> None: """初始化日志器 :param name: 日志器的名称(唯一) :param level: 日志等级 :param file_level: 日志文件的日志等级 :param to_console: 是否输出到控制台 :param to_dir: 保存日志文件的目录,为空则不保存文件 :param add_tag: 记录日志时是否标识日志器名称 :param legacy: 记录日志时是否使用传统样式(不对日志内容进行自动高亮,而是使用日志等级的五色) :param yellow_warn: 记录 `LogLevel.WARN` 级别时,是否将日志内容着色为黄色。 `legacy` 选项为 `True` 时此参数无效 :param red_error: 记录 `LogLevel.ERROR` 及以上级别时,是否将日志内容着色为红色。 `legacy` 选项为 `True` 时此参数无效 :param two_stream: 当使用记录到文件功能时,是否分离“常规日志”和“问题日志”(warning, error, critical)到不同的文件 """ if hasattr(self, "_built") and self._built: return super().__init__(name, LogLevel.DEBUG) self._handler_arr: list[logging.Handler] = [] self._no_tag = not add_tag self._filter = _MeloLogFilter(name, yellow_warn, red_error, legacy) if to_console: con_handler = self._add_console_handler() con_handler.setLevel(level) if to_dir is None: pass elif not two_stream: self._add_file_handler(to_dir, name, file_level) else: normal_handler = self._add_file_handler(to_dir, f"{name}.out", file_level) normal_handler.addFilter(_NormalLvlFilter(name)) self._add_file_handler(to_dir, f"{name}.err", max(file_level, LogLevel.WARNING)) self._built: bool = True
def _add_console_handler(self) -> logging.Handler: fmt = self._console_fmt(self.name, self._no_tag) handler = self._console_handler(fmt) handler.addFilter(self._filter) self.addHandler(handler) self._handler_arr.append(handler) return handler def _add_file_handler( self, log_dir: str, name: str, level: LogLevel = LogLevel.DEBUG ) -> logging.Handler: fmt = self._file_fmt(self.name, self._no_tag) handler = self._file_handler(fmt, log_dir, name, level) handler.addFilter(self._filter) self.addHandler(handler) self._handler_arr.append(handler) return handler @staticmethod def _make_fmt_nocache(fmt: logging.Formatter) -> None: _origin_format = fmt.format def nocache_format(record: logging.LogRecord) -> str: record.exc_text = None return _origin_format(record) fmt.format = nocache_format # type: ignore[method-assign] @staticmethod def _console_fmt(name: str, no_tag: bool = False) -> logging.Formatter: fmt_arr = [ "%(cyan)s%(asctime)s.%(msecs)03d%(reset)s", "%(log_color)s%(levelname)-7s%(reset)s", "%(blue)s%(mod_name)s%(reset)s:%(blue)s%(func_name)s%(reset)s" + ":%(cyan)s%(func_lineno)d%(reset)s", ] if not no_tag: fmt_arr.insert(0, f"%(purple)s{name}%(reset)s") fmt_s = " | ".join(fmt_arr) msg_str_f = "%(log_color)s%(legacy_msg_str)s%(reset)s%(colored_msg_str)s" obj_str_f = "%(log_color)s%(legacy_obj)s%(reset)s%(colored_obj)s" fmt_s += f" - {msg_str_f}{obj_str_f}" fmt = colorlog.ColoredFormatter(fmt_s, datefmt="%Y-%m-%d %H:%M:%S", reset=True) fmt.default_msec_format = "%s.%03d" fmt.formatException = lambda exc_info: "".join( # type: ignore[assignment] _EXC_FORMATTER.format_exception(*exc_info) ) Logger._make_fmt_nocache(fmt) # 未知的 mypy bug 推断 fmt 为 Any 类型... return fmt # type: ignore[no-any-return] @staticmethod def _console_handler(fmt: logging.Formatter) -> logging.Handler: handler = logging.StreamHandler() handler.setFormatter(fmt) return handler @staticmethod def _file_fmt(name: str, no_tag: bool = False) -> logging.Formatter: fmt_arr = [ "%(asctime)s.%(msecs)03d", "%(levelname)-7s", "%(mod_name)s:%(func_name)s:%(func_lineno)d", ] if not no_tag: fmt_arr.insert(0, name) fmt_s = " | ".join(fmt_arr) fmt_s += " - %(msg_str)s%(obj)s" fmt = logging.Formatter( fmt=fmt_s, datefmt="%Y-%m-%d %H:%M:%S", ) fmt.default_msec_format = "%s.%03d" fmt.formatException = lambda exc_info: "".join( # type: ignore _NO_COLOR_EXC_FORMATTER.format_exception(*exc_info) ) Logger._make_fmt_nocache(fmt) return fmt @staticmethod def _file_handler( fmt: logging.Formatter, log_dir: str, name: str, level: LogLevel ) -> logging.Handler: if not os.path.exists(log_dir): os.mkdir(log_dir) handler = logging.handlers.RotatingFileHandler( filename=os.path.join(log_dir, f"{name}.log"), maxBytes=1024 * 1024, backupCount=10, encoding="UTF-8", ) handler.setLevel(level) handler.setFormatter(fmt) return handler
[文档] def set_level(self, level: LogLevel) -> None: # type: ignore[override] """设置日志等级 日志等级自动应用于包含的所有 handler(但输出日志到文件的 handler 除外) :param level: 日志等级 """ super().setLevel(level) for handler in self._handler_arr: if not isinstance(handler, logging.handlers.RotatingFileHandler): handler.setLevel(level)
def findCaller( self, stack_info: bool = False, stacklevel: int = 1 ) -> tuple[str, int, str, str | None]: f = currentframe() if f is None: return "<unknown file>", 0, "<unknown function>", "<unknown stackinfo>" while stacklevel > 0: next_f = f.f_back if next_f is None: break f = next_f if not _is_internal_frame(f): stacklevel -= 1 co = f.f_code sinfo = None if stack_info: with io.StringIO() as sio: sio.write("Stack (most recent call last):\n") traceback.print_stack(f, file=sio) sinfo = sio.getvalue() if sinfo[-1] == "\n": sinfo = sinfo[:-1] assert isinstance(f.f_lineno, int) return co.co_filename, f.f_lineno, co.co_name, sinfo
[文档] def generic_lazy( self, msg: str, *arg_getters: Callable[[], str], level: LogLevel, with_exc: bool = False, ) -> None: """懒惰日志方法 :param msg: 日志消息,可使用 %s 指定稍后填充的参数 :param arg_getters: 填充消息 %s 位置的填充函数 :param level: 日志等级 :param with_exc: 是否记录异常栈信息 """ if not self.isEnabledFor(level): return exc = sys.exc_info() if with_exc else None self._log(level, msg, tuple(g() for g in arg_getters), exc_info=exc)
[文档] def generic_obj( self, msg: str, obj: T, *arg_getters: Callable[[], str], level: LogLevel = LogLevel.INFO, ) -> None: """记录对象的日志方法 :param msg: 附加的日志消息,可使用 %s 指定稍后填充的参数 :param obj: 需要被日志记录的对象 :param arg_getters: 填充消息 %s 位置的填充函数 :param level: 日志等级 """ if isinstance(self, Logger): with self._filter.on_obj(obj): self.generic_lazy(msg + "\n", *arg_getters, level=level) else: _getters = arg_getters + (lambda: _get_rich_object(obj)[1],) self.generic_lazy(msg + "\n%s", *_getters, level=level)