新增PDF和TXT文件处理功能,包括文件选择、对齐、上传和文本块处理
This commit is contained in:
		
							
								
								
									
										184
									
								
								src/add_chunk_cli_pdf_img.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										184
									
								
								src/add_chunk_cli_pdf_img.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,184 @@
 | 
				
			|||||||
 | 
					from ragflow_sdk import RAGFlow
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import re
 | 
				
			||||||
 | 
					# RAGFlow connection settings.
					#
					# The defaults below are the company-intranet values, i.e. the ones that
					# were effectively in use: an earlier "home" pair (http://127.0.0.1:8099)
					# was assigned first and then immediately overwritten, so it never took
					# effect. To target another deployment, set RAGFLOW_API_KEY and
					# RAGFLOW_BASE_URL in the environment instead of editing this file.
					# NOTE(review): the fallback API key is still a secret committed to source
					# control; consider dropping the fallback once the environment is set up.
					api_key = os.environ.get("RAGFLOW_API_KEY", "ragflow-I5ZDNjMWNhNTdlMjExZjBiOTEwMzI0ZT")
					base_url = os.environ.get("RAGFLOW_BASE_URL", "http://192.168.107.165:8099")

					# Shared client used by main(); created once at import time.
					rag_object = RAGFlow(api_key=api_key, base_url=base_url)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def choose_from_list(options, prompt):
					    """Print a numbered menu and return the entry the user selects.

					    The menu is 1-based. The user is re-prompted until a valid number is
					    entered; surrounding whitespace in the answer is ignored.

					    Args:
					        options: Non-empty sequence of entries; each is rendered with an
					            f-string when the menu is printed.
					        prompt: Text passed to ``input()`` on every attempt.

					    Returns:
					        The element of ``options`` at the chosen position.

					    Raises:
					        ValueError: If ``options`` is empty. No number could ever be
					            valid, so prompting would never terminate.
					    """
					    if not options:
					        raise ValueError("options must not be empty")

					    for idx, item in enumerate(options, start=1):
					        print(f"{idx}. {item}")

					    while True:
					        choice = input(prompt).strip()
					        # isdecimal() instead of isdigit(): isdigit() is also true for
					        # characters such as "²", which int() refuses to parse.
					        if choice.isdecimal() and 1 <= int(choice) <= len(options):
					            return options[int(choice) - 1]
					        print("输入无效,请重新输入编号。")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def select_files(file_path, file_type="pdf"):
					    """Recursively collect the files under ``file_path`` with a given extension.

					    Args:
					        file_path: Root directory; it is traversed with ``os.walk``.
					        file_type: Extension to match, compared case-insensitively. It may
					            be given with or without a leading dot ("pdf" or ".pdf").
					            Defaults to "pdf".

					    Returns:
					        A list of paths (walk root joined with the file name), in the
					        order ``os.walk`` yields them. Empty when nothing matches.
					    """
					    # Normalise the extension so ".pdf" does not become "..pdf".
					    suffix = "." + file_type.lower().lstrip(".")

					    file_list = []
					    for root, _dirs, names in os.walk(file_path):
					        file_list.extend(
					            os.path.join(root, name)
					            for name in names
					            if name.lower().endswith(suffix)
					        )
					    return file_list
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def pair_pdf_and_txt(pdf_path, txt_path):
					    """Match PDF files with the TXT files that share their base name.

					    PDFs are collected from ``pdf_path`` and TXTs from ``txt_path`` via
					    ``select_files``. Each file is keyed by its base name without the
					    extension, and only names present on both sides are kept.

					    Returns:
					        A tuple ``(pdf_dict, txt_dict)``. Both dicts have the same keys
					        (base names) and map them to the corresponding file path. A PDF
					        without a matching TXT appears in neither dict.
					    """
					    def index_by_stem(paths):
					        # Map "name" (no directory, no extension) -> full path.
					        return {os.path.splitext(os.path.basename(p))[0]: p for p in paths}

					    pdf_by_stem = index_by_stem(select_files(pdf_path, "pdf"))
					    txt_by_stem = index_by_stem(select_files(txt_path, "txt"))

					    # Iterate the PDF mapping so the result follows its ordering.
					    shared = [stem for stem in pdf_by_stem if stem in txt_by_stem]
					    pdf_dict_aligned = {stem: pdf_by_stem[stem] for stem in shared}
					    txt_dict_aligned = {stem: txt_by_stem[stem] for stem in shared}
					    return pdf_dict_aligned, txt_dict_aligned
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def select_dataset(rag_object):
					    """Ask the user to pick one of the datasets reported by ``rag_object``.

					    The names of the datasets returned by ``rag_object.list_datasets()``
					    are offered through ``choose_from_list``.

					    Returns:
					        The first dataset whose ``name`` equals the chosen name, or
					        ``None`` (after printing a notice) when no datasets exist.
					    """
					    available = rag_object.list_datasets()
					    if available:
					        names = [entry.name for entry in available]
					        picked_name = choose_from_list(names, "请选择数据集编号:")
					        matching = [entry for entry in available if entry.name == picked_name]
					        return matching[0]

					    print("没有可用的数据集。")
					    return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def _upload_pdf(dataset, pdf_path, display_name):
					    """Upload the file at ``pdf_path`` and return the resulting document.

					    Returns ``None`` (after printing the error) if reading, uploading or
					    the follow-up lookup raises.
					    """
					    try:
					        with open(pdf_path, "rb") as handle:
					            payload = handle.read()
					        dataset.upload_documents([{"display_name": display_name, "blob": payload}])
					        uploaded = dataset.list_documents(name=display_name)
					        return uploaded[0]
					    except Exception as e:
					        print(f"上传PDF失败: {pdf_path},错误: {e}")
					        return None


					def upload_or_get_document(dataset, pdf_path, display_name):
					    """Return the document named ``display_name``, uploading it if needed.

					    The dataset is first queried by name. If that lookup (or taking its
					    first result) raises anything, the file is treated as not uploaded yet
					    and is uploaded from ``pdf_path``.

					    Returns:
					        The existing or newly uploaded document, or ``None`` when the
					        upload attempt fails.
					    """
					    try:
					        existing = dataset.list_documents(name=display_name)[0]
					        print(f"文档已存在: {display_name},跳过上传。")
					        return existing
					    except Exception:
					        # Any failure above is treated as "not there yet" -> upload.
					        return _upload_pdf(dataset, pdf_path, display_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def divid_txt_chunk_img(txt_chunk):
					    """Separate Markdown image links from the plain text of a chunk.

					    Example input:
					        'Some text![figure](images/a.png "Caption")more text'

					    Args:
					        txt_chunk: Text that may contain ``![alt](target)`` image markup.

					    Returns:
					        A tuple ``(clean_text, image_paths)``:
					        clean_text: the text with every image markup removed, runs of
					            blank lines collapsed to a single blank line, and leading and
					            trailing whitespace stripped.
					        image_paths: the image targets in order of appearance. Padding
					            inside the parentheses and an optional quoted title are
					            removed; empty targets are dropped.
					    """
					    # Markdown image: ![alt](target)
					    image_pattern = r'!\[.*?\]\((.*?)\)'
					    # Optional trailing title inside the parentheses: path "Title"
					    title_pattern = r"""\s+(?:"[^"]*"|'[^']*')$"""

					    image_paths = []
					    for target in re.findall(image_pattern, txt_chunk):
					        path = re.sub(title_pattern, '', target.strip()).strip()
					        if path:
					            image_paths.append(path)

					    # Drop every image markup from the text.
					    clean_text = re.sub(image_pattern, '', txt_chunk)

					    # Collapse runs of blank lines and trim the ends.
					    clean_text = re.sub(r'\n\s*\n', '\n\n', clean_text).strip()

					    return clean_text, image_paths
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def upload_images_to_minio(image_paths, document):
					    """Placeholder for uploading images to MinIO.

					    Not implemented yet: neither argument is used and the function
					    always returns ``None``.
					    """
					    return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def process_txt_chunks(document, txt_path):
					    """Split a text file into chunks and add each chunk to ``document``.

					    The file content is split wherever two newlines follow each other.
					    Pieces that contain only whitespace are skipped; every other piece is
					    passed unchanged to ``document.add_chunk(content=...)``.

					    Args:
					        document: Object exposing ``add_chunk(content=...)`` whose result
					            has an ``id`` attribute.
					        txt_path: Path of the UTF-8 text file to read.

					    Any exception (reading the file or adding a chunk) is printed and
					    ends processing of this file; nothing is re-raised.
					    """
					    try:
					        # utf-8-sig reads plain UTF-8 as well, but drops a leading BOM so
					        # it does not end up inside the first chunk.
					        with open(txt_path, 'r', encoding='utf-8-sig') as file:
					            file_content = file.read()

					        # Filter first so the progress counter has no gaps for skipped
					        # blank pieces.
					        pieces = [piece for piece in file_content.split('\n\n') if piece.strip()]
					        for num, txt_chunk in enumerate(pieces, start=1):
					            print(f"处理文本块: {txt_chunk[:30]}...")
					            chunk = document.add_chunk(content=txt_chunk)
					            print(f"第{num} Chunk添加成功! ID: {chunk.id}")

					    except Exception as e:
					        print(f"处理文本文件时出错: {txt_path},错误: {e}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset):
					    """Upload each PDF and attach the chunks of its paired text file.

					    For every entry of ``pdf_dict`` the PDF is uploaded (or looked up)
					    under its file name. If a document is obtained and ``txt_dict`` has a
					    path under the same key, that text file is processed into chunks.
					    """
					    for stem in pdf_dict:
					        source_pdf = pdf_dict[stem]
					        document = upload_or_get_document(
					            dataset, source_pdf, os.path.basename(source_pdf)
					        )
					        if document:
					            matching_txt = txt_dict.get(stem)
					            if matching_txt:
					                process_txt_chunks(document, matching_txt)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def main(file_path="g:\\11\\22\\规范\\"):
					    """Pair PDF/TXT files in a directory and push them to a chosen dataset.

					    Args:
					        file_path: Directory searched (recursively) for both the PDF and
					            the TXT files. Defaults to the path that used to be
					            hard-coded here, so ``main()`` behaves as before.
					    """
					    pdf_dict, txt_dict = pair_pdf_and_txt(file_path, file_path)

					    if not pdf_dict:
					        print("未选择任何文件。")
					        return

					    dataset = select_dataset(rag_object)
					    if not dataset:
					        print("未选择数据集。")
					        return

					    process_pdf_txt_pairs(pdf_dict, txt_dict, dataset)


					if __name__ == "__main__":
					    import sys

					    # An optional first command-line argument overrides the default
					    # directory; with no argument the default is used.
					    main(*sys.argv[1:2])
 | 
				
			||||||
		Reference in New Issue
	
	Block a user