Compare commits
	
		
			2 Commits
		
	
	
		
			19133b203a
			...
			b4769d2ec1
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b4769d2ec1 | |||
| 4c1e031bb5 | 
							
								
								
									
										297
									
								
								integrated_upload.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										297
									
								
								integrated_upload.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,297 @@
 | 
				
			|||||||
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					# -*- coding: utf-8 -*-
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					整合上传功能
 | 
				
			||||||
 | 
					1. 上传PDF到MinIO并获取document
 | 
				
			||||||
 | 
					2. 处理TXT的chunk
 | 
				
			||||||
 | 
					3. 处理chunk中的图片链接:上传图片到MinIO,删除chunk中的图片链接
 | 
				
			||||||
 | 
					4. 上传chunk并获取chunk_id
 | 
				
			||||||
 | 
					5. 根据chunk_id更新Elasticsearch中的img_id
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import re
 | 
				
			||||||
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					import sys
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# 添加当前目录到Python路径
 | 
				
			||||||
 | 
					sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from ragflow_sdk import RAGFlow
 | 
				
			||||||
 | 
					from minio import Minio
 | 
				
			||||||
 | 
					from minio.error import S3Error
 | 
				
			||||||
 | 
					from elasticsearch import Elasticsearch
 | 
				
			||||||
 | 
					import chunk_operations
 | 
				
			||||||
 | 
					from markdown_image2minio import get_minio_client, upload_file_to_minio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# 配置
 | 
				
			||||||
 | 
					MINIO_BUCKET = "pdf-documents"
 | 
				
			||||||
 | 
					ELASTICSEARCH_HOST = "192.168.107.165"
 | 
				
			||||||
 | 
					ELASTICSEARCH_PORT = 1200
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# RAGFlow配置
 | 
				
			||||||
 | 
					API_KEY = "ragflow-MyMjM2ODE2NThlMTExZjBiMzJlNzY5Mj"
 | 
				
			||||||
 | 
					BASE_URL = "http://127.0.0.1:8099"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class IntegratedUploader:
 | 
				
			||||||
 | 
					    def __init__(self, tenant_id, dataset_id):
 | 
				
			||||||
 | 
					        self.tenant_id = tenant_id
 | 
				
			||||||
 | 
					        self.dataset_id = dataset_id
 | 
				
			||||||
 | 
					        self.ragflow = RAGFlow(api_key=API_KEY, base_url=BASE_URL)
 | 
				
			||||||
 | 
					        self.minio_client = get_minio_client()
 | 
				
			||||||
 | 
					        self.es = Elasticsearch(
 | 
				
			||||||
 | 
					            [{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT, 'scheme': 'http'}],
 | 
				
			||||||
 | 
					            basic_auth=('elastic', 'infini_rag_flow')
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # 确保MinIO bucket存在
 | 
				
			||||||
 | 
					        self._ensure_bucket_exists()
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def _ensure_bucket_exists(self):
 | 
				
			||||||
 | 
					        """确保MinIO bucket存在"""
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            if not self.minio_client.bucket_exists(MINIO_BUCKET):
 | 
				
			||||||
 | 
					                self.minio_client.make_bucket(MINIO_BUCKET)
 | 
				
			||||||
 | 
					                print(f"Bucket '{MINIO_BUCKET}' created")
 | 
				
			||||||
 | 
					        except S3Error as e:
 | 
				
			||||||
 | 
					            print(f"MinIO bucket error: {e}")
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def upload_pdf_to_minio(self, pdf_path):
 | 
				
			||||||
 | 
					        """上传PDF到MinIO并返回URL"""
 | 
				
			||||||
 | 
					        if not os.path.exists(pdf_path):
 | 
				
			||||||
 | 
					            raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        filename = os.path.basename(pdf_path)
 | 
				
			||||||
 | 
					        object_name = f"{self.dataset_id}/{filename}"
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, pdf_path)
 | 
				
			||||||
 | 
					            url = f"http://127.0.0.1:9000/{MINIO_BUCKET}/{object_name}"
 | 
				
			||||||
 | 
					            print(f"PDF已上传到MinIO: {url}")
 | 
				
			||||||
 | 
					            return url
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            print(f"上传PDF失败: {e}")
 | 
				
			||||||
 | 
					            raise
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def get_or_create_document(self, pdf_path, display_name=None):
 | 
				
			||||||
 | 
					        """获取或创建文档对象"""
 | 
				
			||||||
 | 
					        if display_name is None:
 | 
				
			||||||
 | 
					            display_name = os.path.basename(pdf_path)
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            datasets = self.ragflow.list_datasets()
 | 
				
			||||||
 | 
					            dataset = None
 | 
				
			||||||
 | 
					            for ds in datasets:
 | 
				
			||||||
 | 
					                if ds.id == self.dataset_id:
 | 
				
			||||||
 | 
					                    dataset = ds
 | 
				
			||||||
 | 
					                    break
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            if not dataset:
 | 
				
			||||||
 | 
					                raise ValueError(f"数据集 {self.dataset_id} 不存在")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 检查文档是否已存在
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                documents = dataset.list_documents(name=display_name)
 | 
				
			||||||
 | 
					                if documents:
 | 
				
			||||||
 | 
					                    print(f"文档已存在: {display_name}")
 | 
				
			||||||
 | 
					                    return documents[0]
 | 
				
			||||||
 | 
					            except Exception:
 | 
				
			||||||
 | 
					                pass
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 上传PDF文档
 | 
				
			||||||
 | 
					            print(f"上传文档: {display_name}")
 | 
				
			||||||
 | 
					            with open(pdf_path, "rb") as f:
 | 
				
			||||||
 | 
					                blob = f.read()
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            dataset.upload_documents([{"display_name": display_name, "blob": blob}])
 | 
				
			||||||
 | 
					            documents = dataset.list_documents(name=display_name)
 | 
				
			||||||
 | 
					            return documents[0]
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            print(f"获取/创建文档失败: {e}")
 | 
				
			||||||
 | 
					            raise
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def extract_images_from_chunk(self, content):
 | 
				
			||||||
 | 
					        """从chunk内容中提取图片链接"""
 | 
				
			||||||
 | 
					        img_pattern = r'!\[.*?\]\((.*?)\)'
 | 
				
			||||||
 | 
					        return re.findall(img_pattern, content)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def remove_images_from_content(self, content):
 | 
				
			||||||
 | 
					        """从内容中移除图片链接"""
 | 
				
			||||||
 | 
					        # 移除markdown图片语法 
 | 
				
			||||||
 | 
					        content = re.sub(r'!\[.*?\]\(.*?\)', '', content)
 | 
				
			||||||
 | 
					        # 清理多余的空行
 | 
				
			||||||
 | 
					        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
 | 
				
			||||||
 | 
					        return content.strip()
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def upload_chunk_images(self, images, base_path, chunk_index):
 | 
				
			||||||
 | 
					        """上传chunk中的图片到MinIO"""
 | 
				
			||||||
 | 
					        uploaded_images = []
 | 
				
			||||||
 | 
					        img_ids = []
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        for idx, img_path in enumerate(images):
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                # 处理相对路径
 | 
				
			||||||
 | 
					                if not os.path.isabs(img_path):
 | 
				
			||||||
 | 
					                    img_abs_path = os.path.join(os.path.dirname(base_path), img_path)
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    img_abs_path = img_path
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					                if not os.path.exists(img_abs_path):
 | 
				
			||||||
 | 
					                    print(f"图片文件不存在: {img_abs_path}")
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					                # 生成图片ID和对象名称
 | 
				
			||||||
 | 
					                img_id = f"{self.dataset_id}-{chunk_index:04d}-{idx:03d}"
 | 
				
			||||||
 | 
					                filename = os.path.basename(img_abs_path)
 | 
				
			||||||
 | 
					                ext = os.path.splitext(filename)[1] or ".jpg"
 | 
				
			||||||
 | 
					                object_name = f"{self.dataset_id}/images/chunk_{chunk_index:04d}/img_{idx:03d}{ext}"
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					                # 上传到MinIO
 | 
				
			||||||
 | 
					                upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, img_abs_path)
 | 
				
			||||||
 | 
					                url = f"http://127.0.0.1:9000/{MINIO_BUCKET}/{object_name}"
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					                uploaded_images.append({
 | 
				
			||||||
 | 
					                    'original_path': img_path,
 | 
				
			||||||
 | 
					                    'img_id': img_id,
 | 
				
			||||||
 | 
					                    'url': url,
 | 
				
			||||||
 | 
					                    'object_name': object_name
 | 
				
			||||||
 | 
					                })
 | 
				
			||||||
 | 
					                img_ids.append(img_id)
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					                print(f"图片已上传: {img_path} -> {url}")
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					            except Exception as e:
 | 
				
			||||||
 | 
					                print(f"上传图片失败 {img_path}: {e}")
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        return uploaded_images, img_ids
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def process_txt_chunks(self, txt_path, document):
 | 
				
			||||||
 | 
					        """处理TXT文件中的chunks"""
 | 
				
			||||||
 | 
					        if not os.path.exists(txt_path):
 | 
				
			||||||
 | 
					            raise FileNotFoundError(f"TXT文件不存在: {txt_path}")
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            with open(txt_path, 'r', encoding='utf-8') as f:
 | 
				
			||||||
 | 
					                content = f.read()
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            print(f"读取TXT文件失败: {e}")
 | 
				
			||||||
 | 
					            raise
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # 按段落分割内容
 | 
				
			||||||
 | 
					        chunks = content.split('\n\n')
 | 
				
			||||||
 | 
					        processed_chunks = []
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        for chunk_index, chunk_content in enumerate(chunks):
 | 
				
			||||||
 | 
					            if not chunk_content.strip():
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            print(f"\n处理第 {chunk_index + 1} 个chunk...")
 | 
				
			||||||
 | 
					            print(f"原始内容长度: {len(chunk_content)} 字符")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 提取图片
 | 
				
			||||||
 | 
					            images = self.extract_images_from_chunk(chunk_content)
 | 
				
			||||||
 | 
					            print(f"找到 {len(images)} 张图片")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 上传图片并获取img_ids
 | 
				
			||||||
 | 
					            uploaded_images, img_ids = self.upload_chunk_images(
 | 
				
			||||||
 | 
					                images, txt_path, chunk_index
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 移除图片链接后的内容
 | 
				
			||||||
 | 
					            clean_content = self.remove_images_from_content(chunk_content)
 | 
				
			||||||
 | 
					            print(f"清理后内容长度: {len(clean_content)} 字符")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            if clean_content.strip():
 | 
				
			||||||
 | 
					                try:
 | 
				
			||||||
 | 
					                    # 上传chunk
 | 
				
			||||||
 | 
					                    chunk = document.add_chunk(content=clean_content)
 | 
				
			||||||
 | 
					                    chunk_id = chunk.id
 | 
				
			||||||
 | 
					                    print(f"Chunk已添加,ID: {chunk_id}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    # 如果有图片,更新Elasticsearch中的img_id
 | 
				
			||||||
 | 
					                    if img_ids:
 | 
				
			||||||
 | 
					                        img_id_str = ",".join(img_ids)
 | 
				
			||||||
 | 
					                        result = chunk_operations.update_img_id_in_elasticsearch(
 | 
				
			||||||
 | 
					                            self.tenant_id, document.id, chunk_id, img_id_str
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
 | 
					                        if result["code"] == 0:
 | 
				
			||||||
 | 
					                            print(f"Elasticsearch img_id已更新: {img_id_str}")
 | 
				
			||||||
 | 
					                        else:
 | 
				
			||||||
 | 
					                            print(f"更新Elasticsearch失败: {result['message']}")
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    processed_chunks.append({
 | 
				
			||||||
 | 
					                        'chunk_id': chunk_id,
 | 
				
			||||||
 | 
					                        'content': clean_content,
 | 
				
			||||||
 | 
					                        'images': uploaded_images,
 | 
				
			||||||
 | 
					                        'img_ids': img_ids
 | 
				
			||||||
 | 
					                    })
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                except Exception as e:
 | 
				
			||||||
 | 
					                    print(f"添加chunk失败: {e}")
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                print("跳过空chunk")
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        return processed_chunks
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def upload_pdf_and_process_txt(self, pdf_path, txt_path):
 | 
				
			||||||
 | 
					        """完整的上传和处理流程"""
 | 
				
			||||||
 | 
					        print("=== 开始整合上传流程 ===")
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            # 1. 上传PDF到MinIO
 | 
				
			||||||
 | 
					            pdf_url = self.upload_pdf_to_minio(pdf_path)
 | 
				
			||||||
 | 
					            print(f"PDF URL: {pdf_url}")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 2. 获取或创建文档
 | 
				
			||||||
 | 
					            document = self.get_or_create_document(pdf_path)
 | 
				
			||||||
 | 
					            print(f"文档ID: {document.id}")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 3. 处理TXT chunks
 | 
				
			||||||
 | 
					            processed_chunks = self.process_txt_chunks(txt_path, document)
 | 
				
			||||||
 | 
					            print(f"处理完成,共 {len(processed_chunks)} 个chunks")
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            return {
 | 
				
			||||||
 | 
					                'success': True,
 | 
				
			||||||
 | 
					                'document_id': document.id,
 | 
				
			||||||
 | 
					                'pdf_url': pdf_url,
 | 
				
			||||||
 | 
					                'chunks': processed_chunks
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            print(f"整合上传失败: {e}")
 | 
				
			||||||
 | 
					            return {
 | 
				
			||||||
 | 
					                'success': False,
 | 
				
			||||||
 | 
					                'error': str(e)
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def main():
 | 
				
			||||||
 | 
					    """主函数 - 示例用法"""
 | 
				
			||||||
 | 
					    # 配置参数
 | 
				
			||||||
 | 
					    tenant_id = "d669205e57a211f0b9e7324e7f243034"
 | 
				
			||||||
 | 
					    dataset_id = "10345832587311f0919f3a2728512a4b"
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    # 文件路径
 | 
				
			||||||
 | 
					    pdf_path = r"G:\11\22\规范\example.pdf"
 | 
				
			||||||
 | 
					    txt_path = r"G:\11\22\规范\example.txt"
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    # 创建上传器实例
 | 
				
			||||||
 | 
					    uploader = IntegratedUploader(tenant_id, dataset_id)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    # 执行上传和处理
 | 
				
			||||||
 | 
					    result = uploader.upload_pdf_and_process_txt(pdf_path, txt_path)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    if result['success']:
 | 
				
			||||||
 | 
					        print("\n=== 上传成功 ===")
 | 
				
			||||||
 | 
					        print(f"文档ID: {result['document_id']}")
 | 
				
			||||||
 | 
					        print(f"PDF URL: {result['pdf_url']}")
 | 
				
			||||||
 | 
					        print(f"处理chunks数: {len(result['chunks'])}")
 | 
				
			||||||
 | 
					        for chunk in result['chunks']:
 | 
				
			||||||
 | 
					            print(f"  - Chunk ID: {chunk['chunk_id']}, 图片数: {len(chunk['images'])}")
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        print(f"\n上传失败: {result['error']}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == "__main__":
 | 
				
			||||||
 | 
					    main()
 | 
				
			||||||
		Reference in New Issue
	
	Block a user