melobot.utils.parse.cmd 源代码
import re
from dataclasses import dataclass
from functools import lru_cache
from types import TracebackType
from typing_extensions import Any, Callable, Iterator, Optional
from melobot.exceptions import UtilError
from melobot.log import get_logger
from melobot.typ import SyncOrAsyncCallable, VoidType
from melobot.utils import to_async
from .base import AbstractParseArgs, Parser
class CmdParseError(UtilError): ...
class FormatError(CmdParseError): ...
class ArgValidateFailed(FormatError): ...
class ArgLackError(FormatError): ...
[文档]
@dataclass
class CmdArgs(AbstractParseArgs):
"""命令参数对象"""
name: str
tag: str | None
vals: list[Any]
[文档]
class CmdArgFormatInfo:
"""命令参数格式化信息对象
用于在命令参数格式化异常时传递信息。
"""
def __init__(
self,
src: str | VoidType,
src_desc: Optional[str],
src_expect: Optional[str],
idx: int,
exc: Exception,
exc_tb: Optional[TracebackType],
name: str,
) -> None:
#: 命令参数所属命令的命令名
self.name: str = name
#: 命令参数格式化前的原值,参数缺失时是 VoidType.VOID
self.src: str | VoidType = src
#: 命令参数值的功能描述
self.src_desc: str | None = src_desc
#: 命令参数值的值期待描述
self.src_expect: str | None = src_expect
#: 命令参数值的顺序(从 0 开始索引)
self.idx: int = idx
#: 命令参数格式化异常时的异常对象
self.exc: Exception = exc
#: 命令参数格式化异常时的调用栈信息
self.exc_tb: TracebackType | None = exc_tb
[文档]
class CmdArgFormatter:
"""命令参数格式化器
用于格式化命令解析器解析出的命令参数。搭配命令解析器 :class:`.CmdParser` 使用
"""
[文档]
def __init__(
self,
convert: Optional[Callable[[str], Any]] = None,
validate: Optional[Callable[[Any], bool]] = None,
src_desc: Optional[str] = None,
src_expect: Optional[str] = None,
default: Any = VoidType.VOID,
default_replace_flag: Optional[str] = None,
convert_fail: Optional[SyncOrAsyncCallable[[CmdArgFormatInfo], None]] = None,
validate_fail: Optional[SyncOrAsyncCallable[[CmdArgFormatInfo], None]] = None,
arg_lack: Optional[SyncOrAsyncCallable[[CmdArgFormatInfo], None]] = None,
) -> None:
"""初始化一个命令参数格式化器
:param convert: 类型转换方法,为空则不进行类型转换
:param validate: 值验证方法,为空则不对值进行验证
:param src_desc: 命令参数值的功能描述
:param src_expect: 命令参数值的值期待描述
:param default: 命令参数值的默认值(默认值 :class:`.Void` 表示无值,而不是 :obj:`None` 表达的空值)
:param default_replace_flag: 命令参数使用默认值的标志
:param convert_fail: 类型转换失败的回调,为空则使用默认回调
:param validate_fail: 值验证失败的回调,为空则使用默认回调
:param arg_lack: 参数缺失的回调,为空则执行默认规则
"""
self.convert = convert
self.validate = validate
self.src_desc = src_desc
self.src_expect = src_expect
self.default = default
self.default_replace_flag = default_replace_flag
if self.default is VoidType.VOID and self.default_replace_flag is not None:
raise CmdParseError("初始化参数格式化器时,使用“默认值替换标记”必须同时设置默认值")
self.convert_fail = to_async(convert_fail) if convert_fail is not None else None
self.validate_fail = to_async(validate_fail) if validate_fail is not None else None
self.arg_lack = to_async(arg_lack) if arg_lack is not None else None
def _get_val(self, args: CmdArgs, idx: int) -> Any:
if self.default is VoidType.VOID:
if len(args.vals) < idx + 1:
raise ArgLackError
return args.vals[idx]
if len(args.vals) < idx + 1:
args.vals.append(self.default)
return args.vals[idx]
[文档]
async def format(self, group_id: str, args: CmdArgs, idx: int) -> bool:
# 格式化参数为对应类型的变量
try:
src = self._get_val(args, idx)
if self.default_replace_flag is not None and src == self.default_replace_flag:
src = self.default
res = self.convert(src) if self.convert is not None else src
if self.validate is None or self.validate(res):
pass
else:
raise ArgValidateFailed
args.vals[idx] = res
return True
except ArgValidateFailed as e:
info = CmdArgFormatInfo(
src, self.src_desc, self.src_expect, idx, e, e.__traceback__, group_id
)
if self.validate_fail:
await self.validate_fail(info)
else:
await self._validate_fail_default(info)
return False
except ArgLackError as e:
info = CmdArgFormatInfo(
VoidType.VOID,
self.src_desc,
self.src_expect,
idx,
e,
e.__traceback__,
group_id,
)
if self.arg_lack:
await self.arg_lack(info)
else:
await self._arglack_default(info)
return False
except Exception as e:
info = CmdArgFormatInfo(
src, self.src_desc, self.src_expect, idx, e, e.__traceback__, group_id
)
if self.convert_fail:
await self.convert_fail(info)
else:
await self._convert_fail_default(info)
return False
async def _convert_fail_default(self, info: CmdArgFormatInfo) -> None:
e_class = f"{info.exc.__class__.__module__}.{info.exc.__class__.__qualname__}"
src = repr(info.src) if isinstance(info.src, str) else info.src
tip = f"第 {info.idx + 1} 个参数"
tip += (
f"({info.src_desc})无法处理,给定的值为:{src}。"
if info.src_desc
else f"给定的值 {src} 无法处理。"
)
tip += f"参数要求:{info.src_expect}。" if info.src_expect else ""
tip += f"\n详细错误描述:[{e_class}] {info.exc}"
tip = f"命令 {info.name} 参数格式化失败:\n{tip}"
get_logger().warning(tip)
async def _validate_fail_default(self, info: CmdArgFormatInfo) -> None:
src = repr(info.src) if isinstance(info.src, str) else info.src
tip = f"第 {info.idx + 1} 个参数"
tip += (
f"({info.src_desc})不符合要求,给定的值为:{src}。"
if info.src_desc
else f"给定的值 {src} 不符合要求。"
)
tip += f"参数要求:{info.src_expect}。" if info.src_expect else ""
tip = f"命令 {info.name} 参数格式化失败:\n{tip}"
get_logger().warning(tip)
async def _arglack_default(self, info: CmdArgFormatInfo) -> None:
tip = f"第 {info.idx + 1} 个参数"
tip += f"({info.src_desc})缺失。" if info.src_desc else "缺失。"
tip += f"参数要求:{info.src_expect}。" if info.src_expect else ""
tip = f"命令 {info.name} 参数格式化失败:\n{tip}"
get_logger().warning(tip)
@lru_cache(maxsize=128)
def _cmd_parse(
text: str,
start_regex: re.Pattern[str],
sep_regex: re.Pattern[str],
rm_empty: bool = True,
) -> dict[str, list[str]]:
def _split(string: str, regex: re.Pattern, pop_first: bool = True) -> Iterator[str]:
temp_string = regex.sub("\u0000", string)
temp_list = re.split("\u0000", temp_string)
if pop_first:
temp_list.pop(0)
return filter(lambda x: x != "", temp_list)
pure_string = text.strip() if rm_empty else text
cmd_seqs = (list(_split(s, sep_regex, False)) for s in _split(pure_string, start_regex))
seqs_filter = filter(lambda x: len(x) > 0, cmd_seqs)
cmd_dict: dict[str, list[str]] = {}
for seq in seqs_filter:
if len(seq) == 0:
continue
cmd_dict[seq[0]] = seq[1:]
return cmd_dict
[文档]
class CmdParser(Parser):
"""命令解析器
通过解析命令名和命令参数的形式,解析字符串。
"""
[文档]
def __init__(
self,
cmd_start: str | list[str],
cmd_sep: str | list[str],
targets: str | list[str],
fmtters: Optional[list[Optional[CmdArgFormatter]]] = None,
tag: str | None = None,
) -> None:
"""初始化一个命令解析器
.. admonition:: 注意
:class: caution
- 命令起始符和命令间隔符不允许包含:引号,各种括号,反斜杠,数字,英文,控制字符及各类空白字符。
- 命令起始符不能是命令间隔符的子序列,反之亦然。
:param cmd_start: 命令起始符(可以是字符串或字符串列表)
:param cmd_sep: 命令间隔符(可以是字符串或字符串列表)
:param targets: 匹配的命令名
:param formatters: 格式化器列表(列表可以包含空值,即此位置的参数无格式化)
:param tag: 标签,此标签将被填充给本解析器产生的 :class:`.CmdArgs` 对象的 `tag` 属性
"""
super().__init__()
self.targets = targets if isinstance(targets, list) else [targets]
assert len(self.targets) >= 1, "命令解析器至少需要一个目标命令名"
self.fmtters = fmtters
self.start_tokens = cmd_start if isinstance(cmd_start, list) else [cmd_start]
self.sep_tokens = cmd_sep if isinstance(cmd_sep, list) else [cmd_sep]
self.ban_regex = re.compile(r"[\'\"\\\(\)\[\]\{\}\r\n\ta-zA-Z0-9]")
self.cmd_start: list[str]
self.cmd_sep: list[str]
self.start_regex: re.Pattern[str]
self.sep_regex: re.Pattern[str]
self.arg_tag = tag if tag is not None else self.targets[0]
if self.ban_regex.findall(f"{''.join(cmd_start)}{''.join(cmd_sep)}"):
raise CmdParseError("存在命令解析器不支持的命令起始符,或命令间隔符")
_regex = re.compile(r"([\`\-\=\~\!\@\#\$\%\^\&\*\(\)\_\+\[\]\{\}\|\:\,\.\/\<\>\?])")
if not len(set(self.sep_tokens) & set(self.start_tokens)):
self.cmd_sep = [_regex.sub(r"\\\1", token) for token in self.sep_tokens]
self.cmd_start = [_regex.sub(r"\\\1", token) for token in self.start_tokens]
self.sep_regex = re.compile(rf"{'|'.join(self.cmd_sep)}")
self.start_regex = re.compile(rf"{'|'.join(self.cmd_start)}")
else:
raise CmdParseError("命令解析器起始符不能和间隔符重合")
async def parse(self, text: str) -> Optional[CmdArgs]:
cmd_dict = _cmd_parse(text, self.start_regex, self.sep_regex)
args_dict = {
cmd_name: CmdArgs(cmd_name, self.arg_tag, vals) for cmd_name, vals in cmd_dict.items()
}
for group_id in self.targets:
args = args_dict.get(group_id)
if args is not None:
break
else:
return None
if self.fmtters is None:
return args
for idx, fmt in enumerate(self.fmtters):
if fmt is None:
continue
status = await fmt.format(group_id, args, idx)
if not status:
return None
args.vals = args.vals[: len(self.fmtters)]
return args
[文档]
class CmdParserFactory:
"""命令解析器的工厂
预先存储命令起始符和命令间隔符,指定匹配的命令名后返回一个命令解析器。
"""
[文档]
def __init__(self, cmd_start: str | list[str], cmd_sep: str | list[str]) -> None:
"""初始化一个命令解析器的工厂
.. admonition:: 注意
:class: caution
- 命令起始符和命令间隔符不允许包含:引号,各种括号,反斜杠,数字,英文,控制字符及各类空白字符。
- 命令起始符不能是命令间隔符的子序列,反之亦然。
:param cmd_start: 命令起始符(可以是字符串或字符串列表)
:param cmd_sep: 命令间隔符(可以是字符串或字符串列表)
"""
self.cmd_start = cmd_start
self.cmd_sep = cmd_sep
[文档]
def get(
self,
targets: str | list[str],
formatters: Optional[list[Optional[CmdArgFormatter]]] = None,
tag: str | None = None,
) -> CmdParser:
"""生成匹配指定命令名的命令解析器
:param targets: 匹配的命令名
:param formatters: 格式化器列表(列表可以包含空值,即此位置的参数无格式化选项)
:param tag: 标签,此标签将被填充给解析器产生的 :class:`.CmdArgs` 对象的 `tag` 属性
"""
return CmdParser(self.cmd_start, self.cmd_sep, targets, formatters, tag)