新增整合上传功能,支持PDF和TXT文件的上传及处理,包括图片链接的提取与上传
This commit is contained in:
		
							
								
								
									
										297
									
								
								integrated_upload.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										297
									
								
								integrated_upload.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,297 @@ | ||||
| #!/usr/bin/env python3 | ||||
| # -*- coding: utf-8 -*- | ||||
| """ | ||||
| 整合上传功能 | ||||
| 1. 上传PDF到MinIO并获取document | ||||
| 2. 处理TXT的chunk | ||||
| 3. 处理chunk中的图片链接:上传图片到MinIO,删除chunk中的图片链接 | ||||
| 4. 上传chunk并获取chunk_id | ||||
| 5. 根据chunk_id更新Elasticsearch中的img_id | ||||
| """ | ||||
|  | ||||
| import os | ||||
| import re | ||||
| from pathlib import Path | ||||
| import sys | ||||
|  | ||||
| # 添加当前目录到Python路径 | ||||
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | ||||
|  | ||||
| from ragflow_sdk import RAGFlow | ||||
| from minio import Minio | ||||
| from minio.error import S3Error | ||||
| from elasticsearch import Elasticsearch | ||||
| import chunk_operations | ||||
| from markdown_image2minio import get_minio_client, upload_file_to_minio | ||||
|  | ||||
# MinIO / Elasticsearch configuration
MINIO_BUCKET = "pdf-documents"
ELASTICSEARCH_HOST = "192.168.107.165"
ELASTICSEARCH_PORT = 1200

# RAGFlow configuration
# NOTE(review): the API key and base URL are hard-coded; consider loading
# them from the environment instead of committing the credential.
API_KEY = "ragflow-MyMjM2ODE2NThlMTExZjBiMzJlNzY5Mj"
BASE_URL = "http://127.0.0.1:8099"
|  | ||||
class IntegratedUploader:
    """Upload a PDF plus its pre-chunked TXT companion into RAGFlow.

    Pipeline:
      1. Upload the PDF to MinIO.
      2. Get or create the matching RAGFlow document.
      3. Split the TXT into paragraph chunks; for each chunk, upload the
         images it references to MinIO, strip the markdown image links,
         add the cleaned chunk through the SDK, and write the uploaded
         image ids back to Elasticsearch keyed by the new chunk id.
    """

    def __init__(self, tenant_id, dataset_id):
        # Tenant/dataset ids drive MinIO object paths and the ES updates.
        self.tenant_id = tenant_id
        self.dataset_id = dataset_id
        self.ragflow = RAGFlow(api_key=API_KEY, base_url=BASE_URL)
        self.minio_client = get_minio_client()
        self.es = Elasticsearch(
            [{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT, 'scheme': 'http'}],
            basic_auth=('elastic', 'infini_rag_flow')
        )

        # Make sure the destination bucket exists before any upload runs.
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create the MinIO bucket if it does not exist yet (best effort)."""
        try:
            if not self.minio_client.bucket_exists(MINIO_BUCKET):
                self.minio_client.make_bucket(MINIO_BUCKET)
                print(f"Bucket '{MINIO_BUCKET}' created")
        except S3Error as e:
            # Deliberately non-fatal: a later upload will surface the error.
            print(f"MinIO bucket error: {e}")

    def upload_pdf_to_minio(self, pdf_path):
        """Upload the PDF at *pdf_path* to MinIO and return its URL.

        Raises FileNotFoundError when the file is missing, and re-raises
        any upload failure after logging it.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")

        filename = os.path.basename(pdf_path)
        # BUG FIX: the object name previously hard-coded "(unknown)" and
        # never used the computed `filename`, so every PDF in a dataset
        # collided on the same MinIO object. Store under the real name.
        object_name = f"{self.dataset_id}/{filename}"

        try:
            upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, pdf_path)
            url = f"http://127.0.0.1:9000/{MINIO_BUCKET}/{object_name}"
            print(f"PDF已上传到MinIO: {url}")
            return url
        except Exception as e:
            print(f"上传PDF失败: {e}")
            raise

    def get_or_create_document(self, pdf_path, display_name=None):
        """Return the RAGFlow document for *pdf_path*, uploading it if absent.

        *display_name* defaults to the PDF's base name. Raises ValueError
        when the configured dataset id cannot be found.
        """
        if display_name is None:
            display_name = os.path.basename(pdf_path)

        try:
            # Locate the configured dataset among all visible datasets.
            datasets = self.ragflow.list_datasets()
            dataset = None
            for ds in datasets:
                if ds.id == self.dataset_id:
                    dataset = ds
                    break

            if not dataset:
                raise ValueError(f"数据集 {self.dataset_id} 不存在")

            # Reuse an already-uploaded document with the same display name.
            try:
                documents = dataset.list_documents(name=display_name)
                if documents:
                    print(f"文档已存在: {display_name}")
                    return documents[0]
            except Exception:
                # Best-effort lookup; fall through to a fresh upload.
                pass

            # Upload the PDF as a new document and fetch its record back.
            print(f"上传文档: {display_name}")
            with open(pdf_path, "rb") as f:
                blob = f.read()

            dataset.upload_documents([{"display_name": display_name, "blob": blob}])
            documents = dataset.list_documents(name=display_name)
            # Robustness: fail explicitly instead of IndexError when the
            # freshly uploaded document cannot be listed back.
            if not documents:
                raise RuntimeError(f"获取/创建文档失败: {display_name}")
            return documents[0]

        except Exception as e:
            print(f"获取/创建文档失败: {e}")
            raise

    def extract_images_from_chunk(self, content):
        """Return the link targets of all markdown images in *content*."""
        img_pattern = r'!\[.*?\]\((.*?)\)'
        return re.findall(img_pattern, content)

    def remove_images_from_content(self, content):
        """Strip markdown image links from *content* and tidy blank lines."""
        # Drop the markdown image syntax itself.
        content = re.sub(r'!\[.*?\]\(.*?\)', '', content)
        # Collapse runs of three or more (blank-ish) newlines to one blank line.
        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
        return content.strip()

    def upload_chunk_images(self, images, base_path, chunk_index):
        """Upload every image referenced by one chunk to MinIO.

        *images* holds paths exactly as written in the chunk (relative
        paths are resolved against *base_path*'s directory). Returns
        (uploaded_images, img_ids): per-image metadata dicts and the
        generated ids. Missing or failing images are skipped, not fatal.
        """
        uploaded_images = []
        img_ids = []

        for idx, img_path in enumerate(images):
            try:
                # Resolve relative links against the source file's directory.
                if not os.path.isabs(img_path):
                    img_abs_path = os.path.join(os.path.dirname(base_path), img_path)
                else:
                    img_abs_path = img_path

                if not os.path.exists(img_abs_path):
                    print(f"图片文件不存在: {img_abs_path}")
                    continue

                # Deterministic id and object key from dataset/chunk/index.
                img_id = f"{self.dataset_id}-{chunk_index:04d}-{idx:03d}"
                filename = os.path.basename(img_abs_path)
                ext = os.path.splitext(filename)[1] or ".jpg"
                object_name = f"{self.dataset_id}/images/chunk_{chunk_index:04d}/img_{idx:03d}{ext}"

                # Upload and record the public URL.
                upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, img_abs_path)
                url = f"http://127.0.0.1:9000/{MINIO_BUCKET}/{object_name}"

                uploaded_images.append({
                    'original_path': img_path,
                    'img_id': img_id,
                    'url': url,
                    'object_name': object_name
                })
                img_ids.append(img_id)

                print(f"图片已上传: {img_path} -> {url}")

            except Exception as e:
                # Skip this image but keep processing the rest.
                print(f"上传图片失败 {img_path}: {e}")
                continue

        return uploaded_images, img_ids

    def process_txt_chunks(self, txt_path, document):
        """Split the TXT at *txt_path* into chunks and ingest each one.

        Chunks are blank-line-separated paragraphs. For each non-empty
        chunk: upload referenced images, strip the image links, add the
        cleaned text to *document*, and record the image ids in ES via
        chunk_operations. Returns the list of processed-chunk dicts.
        """
        if not os.path.exists(txt_path):
            raise FileNotFoundError(f"TXT文件不存在: {txt_path}")

        try:
            with open(txt_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except Exception as e:
            print(f"读取TXT文件失败: {e}")
            raise

        # One chunk per blank-line-separated paragraph.
        chunks = content.split('\n\n')
        processed_chunks = []

        for chunk_index, chunk_content in enumerate(chunks):
            if not chunk_content.strip():
                continue

            print(f"\n处理第 {chunk_index + 1} 个chunk...")
            print(f"原始内容长度: {len(chunk_content)} 字符")

            # Find the markdown image links inside this chunk.
            images = self.extract_images_from_chunk(chunk_content)
            print(f"找到 {len(images)} 张图片")

            # Upload the images and collect their generated ids.
            uploaded_images, img_ids = self.upload_chunk_images(
                images, txt_path, chunk_index
            )

            # Remove the image links before storing the chunk text.
            clean_content = self.remove_images_from_content(chunk_content)
            print(f"清理后内容长度: {len(clean_content)} 字符")

            if clean_content.strip():
                try:
                    # Add the chunk through the SDK to obtain its id.
                    chunk = document.add_chunk(content=clean_content)
                    chunk_id = chunk.id
                    print(f"Chunk已添加,ID: {chunk_id}")

                    # Attach the image ids to the chunk record in ES.
                    if img_ids:
                        img_id_str = ",".join(img_ids)
                        result = chunk_operations.update_img_id_in_elasticsearch(
                            self.tenant_id, document.id, chunk_id, img_id_str
                        )
                        if result["code"] == 0:
                            print(f"Elasticsearch img_id已更新: {img_id_str}")
                        else:
                            print(f"更新Elasticsearch失败: {result['message']}")

                    processed_chunks.append({
                        'chunk_id': chunk_id,
                        'content': clean_content,
                        'images': uploaded_images,
                        'img_ids': img_ids
                    })

                except Exception as e:
                    # One failed chunk must not abort the whole import.
                    print(f"添加chunk失败: {e}")
                    continue
            else:
                print("跳过空chunk")

        return processed_chunks

    def upload_pdf_and_process_txt(self, pdf_path, txt_path):
        """Run the full pipeline and return a result summary dict.

        On success returns {'success': True, 'document_id', 'pdf_url',
        'chunks'}; on any failure returns {'success': False, 'error'}.
        """
        print("=== 开始整合上传流程 ===")

        try:
            # 1. Upload the PDF binary to MinIO.
            pdf_url = self.upload_pdf_to_minio(pdf_path)
            print(f"PDF URL: {pdf_url}")

            # 2. Get or create the RAGFlow document record.
            document = self.get_or_create_document(pdf_path)
            print(f"文档ID: {document.id}")

            # 3. Ingest the TXT chunks against that document.
            processed_chunks = self.process_txt_chunks(txt_path, document)
            print(f"处理完成,共 {len(processed_chunks)} 个chunks")

            return {
                'success': True,
                'document_id': document.id,
                'pdf_url': pdf_url,
                'chunks': processed_chunks
            }

        except Exception as e:
            print(f"整合上传失败: {e}")
            return {
                'success': False,
                'error': str(e)
            }
|  | ||||
def main():
    """Example entry point: upload a sample PDF and ingest its TXT chunks."""
    # Tenant and dataset identifiers for the target RAGFlow deployment.
    tenant_id = "d669205e57a211f0b9e7324e7f243034"
    dataset_id = "10345832587311f0919f3a2728512a4b"

    # Input files for this run.
    pdf_path = r"G:\11\22\规范\example.pdf"
    txt_path = r"G:\11\22\规范\example.txt"

    # Build the uploader and run the whole pipeline.
    uploader = IntegratedUploader(tenant_id, dataset_id)
    result = uploader.upload_pdf_and_process_txt(pdf_path, txt_path)

    # Guard clause: report failure and stop early.
    if not result['success']:
        print(f"\n上传失败: {result['error']}")
        return

    print("\n=== 上传成功 ===")
    print(f"文档ID: {result['document_id']}")
    print(f"PDF URL: {result['pdf_url']}")
    print(f"处理chunks数: {len(result['chunks'])}")
    for chunk in result['chunks']:
        print(f"  - Chunk ID: {chunk['chunk_id']}, 图片数: {len(chunk['images'])}")

if __name__ == "__main__":
    main()
		Reference in New Issue
	
	Block a user