Files
ragflow_api_test/md_processor.py

244 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import List, Tuple, Optional
import os
class MarkdownProcessor:
    """Normalize heading levels and mark the table of contents in Markdown text.

    Written for Chinese documents such as prospectuses: non-standard heading
    lines are rewritten as standard Markdown headings (``#``, ``##``,
    ``###``) and the lines belonging to the table of contents are tagged with
    a ``TOC:`` prefix.
    """

    # Heading-detection patterns, tested in this order by ``format_headings``.
    # Note that ``third_level`` matches any line that starts with a Chinese
    # numeral; lines with a numeral followed by "、" are claimed by
    # ``second_level`` first.
    PATTERNS = {
        "first_level": re.compile(
            r"^(重大事项提示|重要声明|声明|声明和承诺|目录|声明及承诺|发行人声明|"
            r"发行概况|本次发行概况|声明与承诺|释义|第[一二三四五六七八九十百]{1,3}[节章])"
        ),
        "second_level": re.compile(r"^[一二三四五六七八九十]+、"),
        "third_level": re.compile(r"^[一二三四五六七八九十]+"),
    }

    # TOC title as it may appear in raw input ("目录", "目 录", ...).
    _TOC_TITLE = re.compile(r"^目\s*录$")
    # Line that switches TOC mode on. The previous pattern (r"\s*录$") could
    # only match a bare "录" and never the normalized "目录"; the leading
    # "目" is optional here so every line the old pattern accepted still is.
    _TOC_START = re.compile(r"^目?\s*录$")
    # A TOC line ending in "释义" (spaces removed) closes the TOC region.
    _TOC_END = re.compile(r".*释义$")
    # Characters deleted from a header before comparing it with the TOC text.
    _HEADER_NOISE = str.maketrans("", "", " .·")
    # Debug labels for heading levels 1, 2 and 3.
    _LEVEL_LABELS = ("第一级标题", "第二级标题", "第三级标题")

    def __init__(self, debug: bool = False):
        """Create a processor.

        Args:
            debug: When true, progress information is printed while processing.
        """
        self.debug = debug

    def clean_lines(self, lines: List[str]) -> List[str]:
        """Strip lines, drop blank ones and remove existing heading markers.

        Args:
            lines: Raw text lines.

        Returns:
            The cleaned lines. A TOC title such as "目 录" is normalized to
            "目录". Leading ``#`` markers are removed because the correct
            heading level is re-added later; ``#`` characters inside the text
            are kept.
        """
        cleaned = []
        for raw in lines:
            text = raw.strip()
            if text.startswith("#"):
                # Only the leading marker run is a heading marker.
                text = text.lstrip("#").strip()
            if not text:
                # Blank line, or a line that consisted of markers only.
                continue
            if self._TOC_TITLE.match(text):
                text = "目录"
            cleaned.append(text)
        return cleaned

    def clean_header(self, header: str) -> str:
        """Reduce a header or TOC line to its bare title text.

        Removes, in order: a leading Chinese numeral followed by "、", a
        leading run of Chinese numerals, every space, "." and "·", and
        finally any trailing digits.

        Args:
            header: The line to reduce.

        Returns:
            The reduced text (possibly empty).
        """
        header = self.PATTERNS["second_level"].sub("", header)
        header = self.PATTERNS["third_level"].sub("", header)
        header = header.translate(self._HEADER_NOISE)
        # Trailing digits are stripped after the separators are gone.
        return re.sub(r"\d+$", "", header)

    def mark_toc_lines(
        self, lines: List[str], toc_min_end_len: int = 15
    ) -> Tuple[List[str], str]:
        """Prefix the lines inside the table of contents with ``TOC:``.

        The TOC region starts at the TOC title line and ends at the first
        line ending in "释义" that is seen after more than
        ``toc_min_end_len`` lines have been read in TOC mode.

        Args:
            lines: Cleaned text lines.
            toc_min_end_len: Number of lines that must have been read in TOC
                mode before an end marker is honoured, so that an entry near
                the top of the TOC does not close it.

        Returns:
            A tuple ``(marked_lines, toc_text)``. ``toc_text`` is the
            concatenation of ``clean_header`` applied to every TOC entry.
        """
        marked = []
        toc_fragments = []
        toc_mode = False
        toc_counter = 0
        for line in lines:
            stripped = line.strip()
            if toc_mode:
                toc_counter += 1
            if self._TOC_START.match(stripped):
                toc_mode = True
                marked.append("目录")
            elif (
                toc_mode
                and toc_counter > toc_min_end_len
                and self._TOC_END.match(stripped.replace(" ", ""))
            ):
                # End marker: this line is kept unprefixed.
                toc_mode = False
                marked.append(line)
            elif toc_mode:
                marked.append(f"TOC:{line}")
                toc_fragments.append(self.clean_header(header=line))
            else:
                marked.append(line)
        return marked, "".join(toc_fragments)

    def _heading_level(self, line: str, toc_text: str) -> int:
        """Return the heading level (1-3) for ``line``, or 0 if it is not a heading."""
        if self.PATTERNS["first_level"].match(line):
            return 1
        if self.PATTERNS["second_level"].match(line):
            level, pattern = 2, self.PATTERNS["second_level"]
        elif self.PATTERNS["third_level"].match(line):
            level, pattern = 3, self.PATTERNS["third_level"]
        else:
            return 0
        key = self.clean_header(pattern.sub("", line))
        # An empty key would be "contained" in any TOC text, so it must not
        # count as a match.
        return level if key and key in toc_text else 0

    def format_headings(self, lines: List[str], all_table_content_str: str) -> List[str]:
        """Add Markdown heading markers to the lines recognised as headings.

        First-level headings are recognised by pattern alone. Second- and
        third-level candidates are promoted only when their cleaned title is
        non-empty and occurs in ``all_table_content_str``. Lines starting
        with ``TOC:`` are passed through unchanged.

        Args:
            lines: Text lines, with TOC entries already marked.
            all_table_content_str: Concatenated TOC text from
                ``mark_toc_lines``.

        Returns:
            The lines, with headings rewritten as ``"\\n<hashes> <line>\\n"``.
        """
        formatted = []
        for line in lines:
            if line.startswith("TOC:"):
                formatted.append(line)
                continue
            level = self._heading_level(line, all_table_content_str)
            if not level:
                formatted.append(line)
                continue
            if self.debug:
                print(f"{self._LEVEL_LABELS[level - 1]}: {line}")
                print("-" * 20)
            formatted.append(f"\n{'#' * level} {line}\n")
        return formatted

    def process_file(self, input_path: str, output_path: Optional[str] = None) -> str:
        """Process one Markdown file: clean, mark the TOC, format headings.

        The collected TOC text is printed on every call; the completion
        message is printed only in debug mode.

        Args:
            input_path: Path of the file to read (UTF-8).
            output_path: Path of the file to write. When omitted,
                ``_processed`` is inserted before the input file's extension.

        Returns:
            The path the result was written to.
        """
        if not output_path:
            base_name, ext = os.path.splitext(input_path)
            output_path = f"{base_name}_processed{ext}"

        with open(input_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        cleaned_lines = self.clean_lines(lines)
        toc_marked_lines, all_table_content_str = self.mark_toc_lines(cleaned_lines)
        formatted_lines = self.format_headings(toc_marked_lines, all_table_content_str)
        print(f"目录文字串: {all_table_content_str}")

        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(f"{line}\n" for line in formatted_lines)

        if self.debug:
            print(f"处理完成! 结果已保存至: {output_path}")
        return output_path
def main():
    """Command-line entry point: process one Markdown file or a directory tree.

    A directory argument is walked recursively and every ``.md`` file is
    written as ``<name>_processed.md`` into the ``--output`` directory, or
    next to its source when ``--output`` is not given. A file argument is
    processed on its own and the resulting path is printed.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Markdown文档标题与目录处理工具")
    cli.add_argument("input_file", help="输入Markdown文件路径")
    cli.add_argument("-o", "--output", help="输出Markdown文件路径(可选)")
    cli.add_argument("-d", "--debug", action="store_true", help="启用调试输出")
    options = cli.parse_args()

    source = options.input_file
    processor = MarkdownProcessor(debug=options.debug)

    # Single file: process it and report where the result went.
    if not os.path.isdir(source):
        written_to = processor.process_file(source, options.output)
        print(f"文件处理完成,输出路径: {written_to}")
        return

    # Directory: batch-process every .md file found below it.
    for current_dir, _, names in os.walk(source):
        for name in names:
            if not name.endswith(".md"):
                continue
            source_file = os.path.join(current_dir, name)
            target_dir = options.output or current_dir
            os.makedirs(target_dir, exist_ok=True)
            stem = os.path.splitext(name)[0]
            target_file = os.path.join(target_dir, stem + "_processed.md")
            print(f"处理文件: {source_file}")
            processor.process_file(source_file, target_file)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
# 测试代码
# processor = MarkdownProcessor(debug=True)
# input_path = "安徽天源.md"
# output_path = "安徽天源科技股份有限公司_processed.md"
# processor.process_file(input_path, output_path)
# 使用方法: python md_processor.py input_folder/input_file -o output_folder