更新MinIO文件上传逻辑,修改上传文件名为带路径格式;添加上传文件时的注释说明
This commit is contained in:
		
							
								
								
									
										244
									
								
								md_processor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										244
									
								
								md_processor.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,244 @@ | ||||
| import re | ||||
| from typing import List, Tuple, Optional | ||||
| import os | ||||
|  | ||||
class MarkdownProcessor:
    """
    Markdown document processor: heading levels, table-of-contents marking, cleanup.

    Intended for Chinese documents (e.g. prospectuses) whose headings are not
    written as Markdown headings. Lines that look like headings are rewritten
    as ``#`` / ``##`` / ``###`` headings, and the lines that belong to the
    table of contents are prefixed with ``TOC:``.
    """

    # Heading patterns, keyed by level. All are anchored at the start of the line.
    PATTERNS = {
        "first_level": re.compile(
            r"^(重大事项提示|重要声明|声明|声明和承诺|目录|声明及承诺|发行人声明|"
            r"发行概况|本次发行概况|声明与承诺|释义|第[一二三四五六七八九十百]{1,3}[节章])"
        ),
        "second_level": re.compile(r"^[一二三四五六七八九十]+、"),
        # A Chinese numeral wrapped in parentheses. Both ASCII "()" and the
        # full-width forms (U+FF08 / U+FF09) are accepted; the brackets live in
        # character classes so they are literals, not a regex group.
        "third_level": re.compile(r"^[(\uff08][一二三四五六七八九十]+[)\uff09]"),
    }

    # Title line of the table of contents ("目录", optionally with inner spaces).
    _TOC_TITLE = re.compile(r"目\s*录$")
    # A line ending in "释义" closes the table-of-contents region.
    _TOC_END = re.compile(r".*释义$")
    # Page number at the end of a table-of-contents entry.
    _TRAILING_DIGITS = re.compile(r"\d+$")
    # Characters removed from a header before comparison: space, dot, middle dot.
    _STRIP_CHARS = str.maketrans("", "", " .·")
    # Debug labels indexed by heading level (index 0 is unused).
    _LEVEL_LABELS = ("", "第一级标题", "第二级标题", "第三级标题")

    def __init__(self, debug: bool = False):
        """
        Create a processor.

        Args:
            debug: when true, progress information is printed while processing.
        """
        self.debug = debug

    def clean_lines(self, lines: List[str]) -> List[str]:
        """
        Strip every line, drop empty ones and remove existing heading markers.

        Only the leading run of ``#`` is removed from a heading line; ``#``
        characters inside the heading text are kept. A line that is empty
        once its markers are removed is dropped as well.

        Args:
            lines: raw lines of the document.

        Returns:
            The cleaned lines, with the table-of-contents title normalised to "目录".
        """
        cleaned = []
        for line in lines:
            text = line.strip()
            if text.startswith("#"):
                # Heading levels are re-assigned later, so drop the old markers.
                text = text.lstrip("#").strip()
            if not text:
                continue
            cleaned.append("目录" if self._TOC_TITLE.match(text) else text)
        return cleaned

    def clean_header(self, header: str) -> str:
        """
        Reduce a heading or table-of-contents entry to its bare title text.

        Removes a leading "一、" / "(一)" style ordinal, all spaces, dots and
        middle dots, and a trailing run of digits (the page number).

        Args:
            header: heading line or table-of-contents entry.

        Returns:
            The normalised title, possibly empty.
        """
        header = self.PATTERNS["second_level"].sub("", header)
        header = self.PATTERNS["third_level"].sub("", header)
        header = header.translate(self._STRIP_CHARS)
        return self._TRAILING_DIGITS.sub("", header)

    def mark_toc_lines(
        self, lines: List[str], toc_min_end_len: int = 15
    ) -> Tuple[List[str], str]:
        """
        Prefix the lines inside the table-of-contents region with ``TOC:``.

        The region starts at a "目录" line and ends at the first line ending in
        "释义" that comes more than ``toc_min_end_len`` lines after the start.

        Args:
            lines: cleaned lines of the document.
            toc_min_end_len: number of lines after the title during which an
                end marker is ignored, so that a "释义" entry listed near the
                top of the table of contents does not close the region early.

        Returns:
            A tuple ``(marked_lines, toc_text)`` where ``toc_text`` is the
            concatenation of the normalised titles of all marked entries.
        """
        result = []
        toc_titles = []
        toc_mode = False
        toc_counter = 0

        for line in lines:
            if toc_mode:
                toc_counter += 1
            stripped = line.strip()
            if self._TOC_TITLE.match(stripped):
                toc_mode = True
                result.append("目录")
            elif (
                toc_mode
                and toc_counter > toc_min_end_len
                and self._TOC_END.match(stripped.replace(" ", ""))
            ):
                # End marker: this line already belongs to the document body.
                toc_mode = False
                result.append(line)
            elif toc_mode:
                result.append(f"TOC:{line}")
                toc_titles.append(self.clean_header(header=line))
            else:
                result.append(line)

        return result, "".join(toc_titles)

    def _heading_level(self, line: str, toc_text: str) -> int:
        """Return the Markdown heading level (1-3) for ``line``, or 0 for body text."""
        if self.PATTERNS["first_level"].match(line):
            return 1
        for level, key in ((2, "second_level"), (3, "third_level")):
            pattern = self.PATTERNS[key]
            if pattern.match(line):
                title = self.clean_header(pattern.sub("", line))
                # An empty title would be "contained" in any string, so it must
                # be rejected explicitly.
                return level if title and title in toc_text else 0
        return 0

    def format_headings(self, lines: List[str], all_table_content_str: str) -> List[str]:
        """
        Turn heading-like lines into Markdown headings.

        First-level patterns always become ``#`` headings. Second- and
        third-level candidates become ``##`` / ``###`` headings only when their
        normalised title is non-empty and occurs in the table-of-contents text.
        Lines already prefixed with ``TOC:`` are passed through untouched.

        Args:
            lines: lines of the document, with the table of contents marked.
            all_table_content_str: concatenated table-of-contents titles.

        Returns:
            The lines with headings wrapped as ``"\\n<marks> <line>\\n"``.
        """
        formatted = []
        for line in lines:
            if line.startswith("TOC:"):
                formatted.append(line)
                continue

            level = self._heading_level(line, all_table_content_str)
            if not level:
                formatted.append(line)
                continue

            if self.debug:
                print(f"{self._LEVEL_LABELS[level]}: {line}")
                print("-" * 20)
            formatted.append(f"\n{'#' * level} {line}\n")

        return formatted

    def process_file(self, input_path: str, output_path: Optional[str] = None) -> str:
        """
        Process one Markdown file and write the result.

        Args:
            input_path: path of the file to read.
            output_path: path of the file to write; when omitted,
                ``<input name>_processed<ext>`` next to the input is used.

        Returns:
            The path the result was written to.
        """
        if not output_path:
            base_name, ext = os.path.splitext(input_path)
            output_path = f"{base_name}_processed{ext}"

        # "utf-8-sig" reads plain UTF-8 as well and drops a leading BOM, which
        # would otherwise stop the first line from matching any pattern.
        with open(input_path, "r", encoding="utf-8-sig") as f:
            lines = f.readlines()

        # Pipeline: clean -> mark table of contents -> format headings.
        cleaned_lines = self.clean_lines(lines)
        toc_marked_lines, all_table_content_str = self.mark_toc_lines(cleaned_lines)
        formatted_lines = self.format_headings(toc_marked_lines, all_table_content_str)

        if self.debug:
            print(f"目录文字串: {all_table_content_str}")

        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(f"{line}\n" for line in formatted_lines)

        if self.debug:
            print(f"处理完成! 结果已保存至: {output_path}")

        return output_path
|  | ||||
|  | ||||
def main():
    """
    Command-line entry point.

    Processes a single Markdown file, or every ``.md`` file found under a
    directory. In directory mode ``-o`` names the folder all results are
    written to; without it each result is written next to its input.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Markdown文档标题与目录处理工具")
    parser.add_argument("input_file", help="输入Markdown文件路径")
    parser.add_argument("-o", "--output", help="输出Markdown文件路径(可选)")
    parser.add_argument("-d", "--debug", action="store_true", help="启用调试输出")

    args = parser.parse_args()
    input_path = args.input_file
    processor = MarkdownProcessor(debug=args.debug)

    if not os.path.isdir(input_path):
        # Single-file mode.
        output_path = processor.process_file(input_path, args.output)
        print(f"文件处理完成,输出路径: {output_path}")
        return

    # Directory mode: process every .md file below input_path.
    processed_suffix = "_processed.md"
    for root, _, files in os.walk(input_path):
        for file in files:
            if not file.endswith(".md"):
                continue
            if file.endswith(processed_suffix):
                # Output of an earlier run (or of this run, when the output
                # folder lies inside the input tree); processing it again
                # would produce "*_processed_processed.md".
                continue
            file_path = os.path.join(root, file)
            output_dir = args.output if args.output else root
            os.makedirs(output_dir, exist_ok=True)
            output_filename = os.path.splitext(file)[0] + processed_suffix
            output_path = os.path.join(output_dir, output_filename)
            print(f"处理文件: {file_path}")
            processor.process_file(file_path, output_path)
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
|     # Manual test code (kept for reference) | ||||
|     # processor = MarkdownProcessor(debug=True) | ||||
|     # input_path = "安徽天源.md" | ||||
|     # output_path = "安徽天源科技股份有限公司_processed.md" | ||||
|  | ||||
|     # processor.process_file(input_path, output_path) | ||||
|     # Usage: python md_processor.py input_folder/input_file -o output_folder | ||||
							
								
								
									
										16
									
								
								minio_api.py
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								minio_api.py
									
									
									
									
									
								
							| @@ -86,9 +86,17 @@ minio_client= get_minio_client() | ||||
|  | ||||
| # Bucket and object settings for the upload | ||||
| bucket_name = "my-bucket"  # replace with your bucket name | ||||
| object_name = "1.jpg"  # name the file is stored under in MinIO | ||||
| object_name = "image/1.jpg"  # name the file is stored under in MinIO; NOTE(review): this second assignment overrides the previous line - presumably the old/new lines of the diff, confirm | ||||
| file_path = "G:\\11\\ragflow_api_test\\2.jpg"  # local file path | ||||
|  | ||||
|  | ||||
| # When uploading with fput_object: | ||||
|  | ||||
| # if object_name is image\image.jpg, the uploaded object is named image\image.jpg; | ||||
|  | ||||
| # if object_name is image/image.jpg, image becomes a folder and the file is named image.jpg; | ||||
|  | ||||
|  | ||||
| try: | ||||
|     # 检查存储桶是否存在,如果不存在则创建(可选) | ||||
|     if not minio_client.bucket_exists(bucket_name): | ||||
| @@ -101,9 +109,13 @@ try: | ||||
|         object_name=object_name, | ||||
|         file_path=file_path | ||||
|     ) | ||||
|  | ||||
|     # 获取文件的预签名URL(可选) | ||||
|     res = minio_client.get_presigned_url("GET", bucket_name, object_name, expires=timedelta(days=7)) | ||||
|     #res=minio_client.share_file(bucket_name, object_name, 7) | ||||
|  | ||||
|     #res = "http://127.0.0.1:9000" + "/"+bucket_name+"/"  + object_name | ||||
|  | ||||
|      | ||||
|     print(res) | ||||
|     print(f"文件 '{file_path}' 成功上传到存储桶 '{bucket_name}' 为 '{object_name}'") | ||||
|      | ||||
|   | ||||
		Reference in New Issue
	
	Block a user