#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Integrated upload workflow.

1. Upload the PDF to MinIO and obtain the RAGFlow document.
2. Split the TXT file into chunks.
3. Handle image links inside each chunk: upload the images to MinIO and
   remove the image links from the chunk text.
4. Upload each chunk and obtain its chunk_id.
5. Update img_id in Elasticsearch for that chunk_id.
"""

import os
import re
from pathlib import Path
import sys

# Add this file's directory to the Python import path.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from ragflow_sdk import RAGFlow
from minio import Minio
from minio.error import S3Error
from elasticsearch import Elasticsearch
import chunk_operations
from markdown_image2minio import get_minio_client, upload_file_to_minio

# Configuration
MINIO_BUCKET = "pdf-documents"
# Base URL used to build the object URLs reported back to the caller.
MINIO_BASE_URL = "http://127.0.0.1:9000"
ELASTICSEARCH_HOST = "192.168.107.165"
ELASTICSEARCH_PORT = 1200

# RAGFlow configuration
# NOTE(review): the API key below and the Elasticsearch credentials used in
# IntegratedUploader.__init__ are hard-coded; consider loading them from the
# environment or a config file before this is shared or deployed.
API_KEY = "ragflow-MyMjM2ODE2NThlMTExZjBiMzJlNzY5Mj"
BASE_URL = "http://127.0.0.1:8099"

# Markdown image syntax: ![alt](target) or ![alt](target "title").
# Group 1 is the target only, so an optional double-quoted title is not
# mistaken for part of the file path. The same pattern is used for both
# extraction and removal so the two can never disagree.
_IMAGE_LINK_RE = re.compile(r'!\[.*?\]\((.*?)(?:\s+"[^"]*")?\)')
# Three or more consecutive line breaks (optionally with whitespace between).
_EXTRA_BLANK_LINES_RE = re.compile(r'\n\s*\n\s*\n')


class IntegratedUploader:
    """Upload a PDF and its TXT chunks (with images) to MinIO/RAGFlow/ES."""

    def __init__(self, tenant_id, dataset_id):
        """Create the RAGFlow, MinIO and Elasticsearch clients.

        Args:
            tenant_id: Tenant identifier passed to the Elasticsearch update.
            dataset_id: ID of the RAGFlow dataset that receives the document;
                also used as the object-name prefix in MinIO.
        """
        self.tenant_id = tenant_id
        self.dataset_id = dataset_id
        self.ragflow = RAGFlow(api_key=API_KEY, base_url=BASE_URL)
        self.minio_client = get_minio_client()
        self.es = Elasticsearch(
            [{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT, 'scheme': 'http'}],
            basic_auth=('elastic', 'infini_rag_flow')
        )

        # Make sure the MinIO bucket exists.
        self._ensure_bucket_exists()

    @staticmethod
    def _object_url(object_name):
        """Return the URL reported for *object_name* in the MinIO bucket."""
        return f"{MINIO_BASE_URL}/{MINIO_BUCKET}/{object_name}"

    def _ensure_bucket_exists(self):
        """Create the MinIO bucket if it does not exist yet.

        S3 errors are printed and not re-raised (best effort).
        """
        try:
            if not self.minio_client.bucket_exists(MINIO_BUCKET):
                self.minio_client.make_bucket(MINIO_BUCKET)
                print(f"Bucket '{MINIO_BUCKET}' created")
        except S3Error as e:
            print(f"MinIO bucket error: {e}")

    def upload_pdf_to_minio(self, pdf_path):
        """Upload the PDF to MinIO and return its URL.

        Args:
            pdf_path: Path of the PDF file on disk.

        Returns:
            The URL string built from the MinIO base URL, bucket and
            ``<dataset_id>/<file name>``.

        Raises:
            FileNotFoundError: If *pdf_path* does not exist.
            Exception: Any upload error is printed and re-raised.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")

        filename = os.path.basename(pdf_path)
        object_name = f"{self.dataset_id}/{filename}"

        try:
            upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, pdf_path)
            url = self._object_url(object_name)
            print(f"PDF已上传到MinIO: {url}")
            return url
        except Exception as e:
            print(f"上传PDF失败: {e}")
            raise

    def get_or_create_document(self, pdf_path, display_name=None):
        """Return the dataset document for the PDF, uploading it if needed.

        Args:
            pdf_path: Path of the PDF file on disk.
            display_name: Document name in the dataset; defaults to the
                file name of *pdf_path*.

        Returns:
            The first document returned by ``dataset.list_documents`` for
            *display_name*.

        Raises:
            ValueError: If the configured dataset is not among the datasets
                returned by ``list_datasets()``.
            IndexError: If the document cannot be listed after the upload.
            Exception: Any other error is printed and re-raised.
        """
        if display_name is None:
            display_name = os.path.basename(pdf_path)

        try:
            # NOTE(review): list_datasets() is called with its defaults; if
            # the SDK paginates, datasets beyond the first page are not seen
            # here - confirm against the SDK.
            dataset = next(
                (ds for ds in self.ragflow.list_datasets() if ds.id == self.dataset_id),
                None,
            )
            if dataset is None:
                raise ValueError(f"数据集 {self.dataset_id} 不存在")

            # Check whether the document already exists. A failing lookup is
            # treated as "not found", but it is reported instead of being
            # silently discarded.
            try:
                documents = dataset.list_documents(name=display_name)
            except Exception as e:
                print(f"检查文档是否存在时出错,将继续上传: {e}")
                documents = None
            if documents:
                print(f"文档已存在: {display_name}")
                return documents[0]

            # Upload the PDF document.
            print(f"上传文档: {display_name}")
            with open(pdf_path, "rb") as f:
                blob = f.read()

            dataset.upload_documents([{"display_name": display_name, "blob": blob}])
            documents = dataset.list_documents(name=display_name)
            if not documents:
                # Same exception type a bare documents[0] would raise, but
                # with a message that says what actually went wrong.
                raise IndexError(f"文档上传后未找到: {display_name}")
            return documents[0]

        except Exception as e:
            print(f"获取/创建文档失败: {e}")
            raise

    def extract_images_from_chunk(self, content):
        """Return the targets of all Markdown image links in *content*.

        An optional double-quoted title (``![alt](path "title")``) is not
        part of the returned target, and surrounding whitespace is removed.

        Args:
            content: Chunk text.

        Returns:
            A list of target strings, in order of appearance.
        """
        return [target.strip() for target in _IMAGE_LINK_RE.findall(content)]

    def remove_images_from_content(self, content):
        """Return *content* with Markdown image links removed.

        Args:
            content: Chunk text.

        Returns:
            The text without ``![alt](url)`` links, with runs of blank lines
            collapsed to a single blank line and outer whitespace stripped.
        """
        # Remove the Markdown image syntax ![alt](url).
        without_images = _IMAGE_LINK_RE.sub('', content)
        # Clean up superfluous blank lines.
        collapsed = _EXTRA_BLANK_LINES_RE.sub('\n\n', without_images)
        return collapsed.strip()

    def upload_chunk_images(self, images, base_path, chunk_index):
        """Upload the images referenced by one chunk to MinIO.

        Images that are missing on disk or fail to upload are reported and
        skipped; they do not abort the remaining uploads.

        Args:
            images: Image paths as found in the chunk text.
            base_path: Path of the TXT file; relative image paths are
                resolved against its directory.
            chunk_index: Index of the chunk, used in image IDs and object
                names.

        Returns:
            A tuple ``(uploaded_images, img_ids)``: a list of dicts with the
            keys ``original_path``, ``img_id``, ``url`` and ``object_name``,
            and the list of the corresponding image IDs.
        """
        uploaded_images = []
        img_ids = []
        base_dir = os.path.dirname(base_path)

        for idx, img_path in enumerate(images):
            try:
                # Resolve relative paths against the TXT file's directory.
                if os.path.isabs(img_path):
                    img_abs_path = img_path
                else:
                    img_abs_path = os.path.join(base_dir, img_path)

                # isfile (not exists): an empty target resolves to the base
                # directory itself, which exists but cannot be uploaded.
                if not os.path.isfile(img_abs_path):
                    print(f"图片文件不存在: {img_abs_path}")
                    continue

                # Build the image ID and the object name.
                img_id = f"{self.dataset_id}-{chunk_index:04d}-{idx:03d}"
                filename = os.path.basename(img_abs_path)
                ext = os.path.splitext(filename)[1] or ".jpg"
                object_name = (
                    f"{self.dataset_id}/images/chunk_{chunk_index:04d}/img_{idx:03d}{ext}"
                )

                # Upload to MinIO.
                upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, img_abs_path)
                url = self._object_url(object_name)

                uploaded_images.append({
                    'original_path': img_path,
                    'img_id': img_id,
                    'url': url,
                    'object_name': object_name
                })
                img_ids.append(img_id)

                print(f"图片已上传: {img_path} -> {url}")

            except Exception as e:
                print(f"上传图片失败 {img_path}: {e}")

        return uploaded_images, img_ids

    def _update_chunk_img_ids(self, document, chunk_id, img_ids):
        """Write the chunk's image IDs to Elasticsearch (best effort)."""
        img_id_str = ",".join(img_ids)
        try:
            result = chunk_operations.update_img_id_in_elasticsearch(
                self.tenant_id, document.id, chunk_id, img_id_str
            )
            if result["code"] == 0:
                print(f"Elasticsearch img_id已更新: {img_id_str}")
            else:
                print(f"更新Elasticsearch失败: {result['message']}")
        except Exception as e:
            # The chunk itself was created; only the img_id update failed.
            print(f"更新Elasticsearch失败: {e}")

    def process_txt_chunks(self, txt_path, document):
        """Split the TXT file into chunks and upload them to the document.

        The file is split on blank lines. For every non-empty piece the
        referenced images are uploaded, the image links are removed from the
        text, the remaining text is added as a chunk and, if images were
        uploaded, their IDs are written to Elasticsearch.

        Args:
            txt_path: Path of the UTF-8 encoded TXT file.
            document: Document object providing ``add_chunk`` and ``id``.

        Returns:
            A list of dicts with the keys ``chunk_id``, ``content``,
            ``images`` and ``img_ids``, one per chunk that was added.

        Raises:
            FileNotFoundError: If *txt_path* does not exist.
            Exception: Errors while reading the file are printed and
                re-raised.
        """
        if not os.path.exists(txt_path):
            raise FileNotFoundError(f"TXT文件不存在: {txt_path}")

        try:
            with open(txt_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except Exception as e:
            print(f"读取TXT文件失败: {e}")
            raise

        # Split the content into paragraphs.
        chunks = content.split('\n\n')
        processed_chunks = []

        for chunk_index, chunk_content in enumerate(chunks):
            if not chunk_content.strip():
                continue

            print(f"\n处理第 {chunk_index + 1} 个chunk...")
            print(f"原始内容长度: {len(chunk_content)} 字符")

            # Extract the images.
            images = self.extract_images_from_chunk(chunk_content)
            print(f"找到 {len(images)} 张图片")

            # Upload the images and collect their img_ids.
            uploaded_images, img_ids = self.upload_chunk_images(
                images, txt_path, chunk_index
            )

            # Content with the image links removed.
            clean_content = self.remove_images_from_content(chunk_content)
            print(f"清理后内容长度: {len(clean_content)} 字符")

            if not clean_content.strip():
                print("跳过空chunk")
                continue

            # Upload the chunk. Only a failure of this step means the chunk
            # was not added.
            try:
                chunk = document.add_chunk(content=clean_content)
                chunk_id = chunk.id
            except Exception as e:
                print(f"添加chunk失败: {e}")
                continue
            print(f"Chunk已添加,ID: {chunk_id}")

            # If there are images, update img_id in Elasticsearch. This is
            # kept separate from add_chunk so that a failing update does not
            # drop an already-created chunk from the result.
            if img_ids:
                self._update_chunk_img_ids(document, chunk_id, img_ids)

            processed_chunks.append({
                'chunk_id': chunk_id,
                'content': clean_content,
                'images': uploaded_images,
                'img_ids': img_ids
            })

        return processed_chunks

    def upload_pdf_and_process_txt(self, pdf_path, txt_path):
        """Run the complete upload and processing workflow.

        Args:
            pdf_path: Path of the PDF file.
            txt_path: Path of the TXT file holding the chunk text.

        Returns:
            On success a dict with ``success`` (True), ``document_id``,
            ``pdf_url`` and ``chunks``; on failure a dict with ``success``
            (False) and ``error`` (the exception text). Exceptions are not
            propagated.
        """
        print("=== 开始整合上传流程 ===")

        try:
            # 1. Upload the PDF to MinIO.
            pdf_url = self.upload_pdf_to_minio(pdf_path)
            print(f"PDF URL: {pdf_url}")

            # 2. Get or create the document.
            document = self.get_or_create_document(pdf_path)
            print(f"文档ID: {document.id}")

            # 3. Process the TXT chunks.
            processed_chunks = self.process_txt_chunks(txt_path, document)
            print(f"处理完成,共 {len(processed_chunks)} 个chunks")

            return {
                'success': True,
                'document_id': document.id,
                'pdf_url': pdf_url,
                'chunks': processed_chunks
            }

        except Exception as e:
            print(f"整合上传失败: {e}")
            return {
                'success': False,
                'error': str(e)
            }


def main():
    """Entry point - example usage."""
    # Configuration parameters
    tenant_id = "d669205e57a211f0b9e7324e7f243034"
    dataset_id = "10345832587311f0919f3a2728512a4b"

    # File paths
    pdf_path = r"G:\11\22\规范\example.pdf"
    txt_path = r"G:\11\22\规范\example.txt"

    # Create the uploader instance.
    uploader = IntegratedUploader(tenant_id, dataset_id)

    # Run the upload and processing.
    result = uploader.upload_pdf_and_process_txt(pdf_path, txt_path)

    if result['success']:
        print("\n=== 上传成功 ===")
        print(f"文档ID: {result['document_id']}")
        print(f"PDF URL: {result['pdf_url']}")
        print(f"处理chunks数: {len(result['chunks'])}")
        for chunk in result['chunks']:
            print(f" - Chunk ID: {chunk['chunk_id']}, 图片数: {len(chunk['images'])}")
    else:
        print(f"\n上传失败: {result['error']}")


if __name__ == "__main__":
    main()