# ragflow_api_test/integrated_upload.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Integrated upload pipeline.
1. Upload the PDF to MinIO and obtain the RAGFlow document.
2. Split the companion TXT file into chunks.
3. Upload each chunk's images to MinIO and strip the image links from the chunk text.
4. Add each chunk and obtain its chunk_id.
5. Update img_id in Elasticsearch keyed by chunk_id.
"""
import os
import re
from pathlib import Path
import sys
# Add the current directory to the Python path so sibling modules import.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from ragflow_sdk import RAGFlow
from minio import Minio
from minio.error import S3Error
from elasticsearch import Elasticsearch
import chunk_operations
from markdown_image2minio import get_minio_client, upload_file_to_minio
# --- Configuration: MinIO bucket and Elasticsearch endpoint ---
MINIO_BUCKET = "pdf-documents"
ELASTICSEARCH_HOST = "192.168.107.165"
ELASTICSEARCH_PORT = 1200
# RAGFlow API credentials/endpoint.
# NOTE(review): API key is hard-coded; consider loading from the environment.
API_KEY = "ragflow-MyMjM2ODE2NThlMTExZjBiMzJlNzY5Mj"
BASE_URL = "http://127.0.0.1:8099"
class IntegratedUploader:
    """Integrated upload pipeline for a PDF and its chunked TXT companion.

    Workflow:
      1. Upload the PDF to MinIO.
      2. Get or create the matching RAGFlow document.
      3. Split the TXT into paragraph chunks, upload each chunk's images to
         MinIO and strip the markdown image links from the chunk text.
      4. Add each cleaned chunk through the RAGFlow SDK.
      5. Record the generated img_ids in Elasticsearch (via chunk_operations).
    """

    def __init__(self, tenant_id, dataset_id):
        """Build the RAGFlow, MinIO and Elasticsearch clients.

        :param tenant_id: tenant that owns the Elasticsearch index.
        :param dataset_id: RAGFlow dataset id the document belongs to.
        """
        self.tenant_id = tenant_id
        self.dataset_id = dataset_id
        self.ragflow = RAGFlow(api_key=API_KEY, base_url=BASE_URL)
        self.minio_client = get_minio_client()
        # NOTE(review): self.es is created for parity with chunk_operations
        # but is not referenced directly by this class.
        self.es = Elasticsearch(
            [{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT, 'scheme': 'http'}],
            basic_auth=('elastic', 'infini_rag_flow')
        )
        # Ensure the target MinIO bucket exists before any upload happens.
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create MINIO_BUCKET if missing; log (best-effort) on S3 errors."""
        try:
            if not self.minio_client.bucket_exists(MINIO_BUCKET):
                self.minio_client.make_bucket(MINIO_BUCKET)
                print(f"Bucket '{MINIO_BUCKET}' created")
        except S3Error as e:
            print(f"MinIO bucket error: {e}")

    def upload_pdf_to_minio(self, pdf_path):
        """Upload a PDF to MinIO and return its HTTP URL.

        :param pdf_path: local path of the PDF file.
        :raises FileNotFoundError: if pdf_path does not exist.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
        filename = os.path.basename(pdf_path)
        # Fix: the object name previously hard-coded "(unknown)" and never
        # used the computed filename, so every PDF in a dataset collided on
        # the same object. Store as <dataset_id>/<filename> instead.
        object_name = f"{self.dataset_id}/{filename}"
        try:
            upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, pdf_path)
            url = f"http://127.0.0.1:9000/{MINIO_BUCKET}/{object_name}"
            print(f"PDF已上传到MinIO: {url}")
            return url
        except Exception as e:
            print(f"上传PDF失败: {e}")
            raise

    def get_or_create_document(self, pdf_path, display_name=None):
        """Return the RAGFlow document for pdf_path, uploading it if absent.

        :param display_name: document name; defaults to the PDF basename.
        :raises ValueError: if the configured dataset does not exist.
        """
        if display_name is None:
            display_name = os.path.basename(pdf_path)
        try:
            # Locate the configured dataset among all datasets.
            dataset = next(
                (ds for ds in self.ragflow.list_datasets() if ds.id == self.dataset_id),
                None,
            )
            if not dataset:
                raise ValueError(f"数据集 {self.dataset_id} 不存在")
            # Reuse an existing document with the same display name, if any.
            try:
                documents = dataset.list_documents(name=display_name)
                if documents:
                    print(f"文档已存在: {display_name}")
                    return documents[0]
            except Exception:
                # Best-effort lookup; fall through to upload on any failure.
                pass
            # Upload the PDF as a new document, then fetch it back by name.
            print(f"上传文档: {display_name}")
            with open(pdf_path, "rb") as f:
                blob = f.read()
            dataset.upload_documents([{"display_name": display_name, "blob": blob}])
            documents = dataset.list_documents(name=display_name)
            return documents[0]
        except Exception as e:
            print(f"获取/创建文档失败: {e}")
            raise

    def extract_images_from_chunk(self, content):
        """Return the URL/path of every markdown image link in *content*."""
        img_pattern = r'!\[.*?\]\((.*?)\)'
        return re.findall(img_pattern, content)

    def remove_images_from_content(self, content):
        """Strip markdown image links and collapse the blank lines left behind."""
        # Remove the markdown image syntax ![alt](url).
        content = re.sub(r'!\[.*?\]\(.*?\)', '', content)
        # Collapse runs of blank lines into a single blank line.
        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
        return content.strip()

    def upload_chunk_images(self, images, base_path, chunk_index):
        """Upload a chunk's images to MinIO.

        :param images: image paths from the chunk (absolute, or relative to
            the directory of *base_path*).
        :param base_path: the TXT file the chunk came from.
        :param chunk_index: 0-based chunk index, used in ids and object names.
        :return: (uploaded_images, img_ids) — per-image metadata dicts and the
            generated img_id strings. Missing/failed images are skipped.
        """
        uploaded_images = []
        img_ids = []
        for idx, img_path in enumerate(images):
            try:
                # Resolve relative paths against the TXT file's directory.
                if not os.path.isabs(img_path):
                    img_abs_path = os.path.join(os.path.dirname(base_path), img_path)
                else:
                    img_abs_path = img_path
                if not os.path.exists(img_abs_path):
                    print(f"图片文件不存在: {img_abs_path}")
                    continue
                # Derive a deterministic image id and MinIO object name.
                img_id = f"{self.dataset_id}-{chunk_index:04d}-{idx:03d}"
                filename = os.path.basename(img_abs_path)
                ext = os.path.splitext(filename)[1] or ".jpg"
                object_name = f"{self.dataset_id}/images/chunk_{chunk_index:04d}/img_{idx:03d}{ext}"
                # Upload to MinIO.
                upload_file_to_minio(self.minio_client, MINIO_BUCKET, object_name, img_abs_path)
                url = f"http://127.0.0.1:9000/{MINIO_BUCKET}/{object_name}"
                uploaded_images.append({
                    'original_path': img_path,
                    'img_id': img_id,
                    'url': url,
                    'object_name': object_name
                })
                img_ids.append(img_id)
                print(f"图片已上传: {img_path} -> {url}")
            except Exception as e:
                # Best-effort: a single bad image must not abort the chunk.
                print(f"上传图片失败 {img_path}: {e}")
                continue
        return uploaded_images, img_ids

    def process_txt_chunks(self, txt_path, document):
        """Split *txt_path* into chunks, upload them and their images.

        Chunks are paragraphs separated by blank lines. For each non-empty
        chunk: images are uploaded to MinIO, image links are stripped, the
        cleaned text is added as a RAGFlow chunk, and its img_ids are written
        to Elasticsearch through chunk_operations.

        :raises FileNotFoundError: if txt_path does not exist.
        :return: list of dicts describing each successfully added chunk.
        """
        if not os.path.exists(txt_path):
            raise FileNotFoundError(f"TXT文件不存在: {txt_path}")
        try:
            with open(txt_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except Exception as e:
            print(f"读取TXT文件失败: {e}")
            raise
        # Split on blank lines: one chunk per paragraph.
        chunks = content.split('\n\n')
        processed_chunks = []
        for chunk_index, chunk_content in enumerate(chunks):
            if not chunk_content.strip():
                continue
            print(f"\n处理第 {chunk_index + 1} 个chunk...")
            print(f"原始内容长度: {len(chunk_content)} 字符")
            # Extract and upload the chunk's images.
            images = self.extract_images_from_chunk(chunk_content)
            print(f"找到 {len(images)} 张图片")
            uploaded_images, img_ids = self.upload_chunk_images(
                images, txt_path, chunk_index
            )
            # Chunk text with the image links removed.
            clean_content = self.remove_images_from_content(chunk_content)
            print(f"清理后内容长度: {len(clean_content)} 字符")
            if clean_content.strip():
                try:
                    chunk = document.add_chunk(content=clean_content)
                    chunk_id = chunk.id
                    print(f"Chunk已添加ID: {chunk_id}")
                    # If the chunk had images, link them in Elasticsearch.
                    if img_ids:
                        img_id_str = ",".join(img_ids)
                        result = chunk_operations.update_img_id_in_elasticsearch(
                            self.tenant_id, document.id, chunk_id, img_id_str
                        )
                        if result["code"] == 0:
                            print(f"Elasticsearch img_id已更新: {img_id_str}")
                        else:
                            print(f"更新Elasticsearch失败: {result['message']}")
                    processed_chunks.append({
                        'chunk_id': chunk_id,
                        'content': clean_content,
                        'images': uploaded_images,
                        'img_ids': img_ids
                    })
                except Exception as e:
                    # Skip this chunk but keep processing the rest.
                    print(f"添加chunk失败: {e}")
                    continue
            else:
                print("跳过空chunk")
        return processed_chunks

    def upload_pdf_and_process_txt(self, pdf_path, txt_path):
        """Run the full pipeline; never raises.

        :return: {'success': True, 'document_id', 'pdf_url', 'chunks'} on
            success, or {'success': False, 'error': str} on failure.
        """
        print("=== 开始整合上传流程 ===")
        try:
            # 1. Upload the PDF to MinIO.
            pdf_url = self.upload_pdf_to_minio(pdf_path)
            print(f"PDF URL: {pdf_url}")
            # 2. Get or create the RAGFlow document.
            document = self.get_or_create_document(pdf_path)
            print(f"文档ID: {document.id}")
            # 3. Process the TXT chunks.
            processed_chunks = self.process_txt_chunks(txt_path, document)
            print(f"处理完成,共 {len(processed_chunks)} 个chunks")
            return {
                'success': True,
                'document_id': document.id,
                'pdf_url': pdf_url,
                'chunks': processed_chunks
            }
        except Exception as e:
            print(f"整合上传失败: {e}")
            return {
                'success': False,
                'error': str(e)
            }
def main():
    """Demo entry point: run the integrated upload with sample credentials."""
    # Tenant / dataset configuration.
    tenant_id = "d669205e57a211f0b9e7324e7f243034"
    dataset_id = "10345832587311f0919f3a2728512a4b"
    # Input files (PDF plus its chunked TXT companion).
    pdf_path = r"G:\11\22\规范\example.pdf"
    txt_path = r"G:\11\22\规范\example.txt"
    # Build the uploader and run the whole pipeline in one go.
    result = IntegratedUploader(tenant_id, dataset_id).upload_pdf_and_process_txt(
        pdf_path, txt_path
    )
    # Guard clause: report failure and bail out early.
    if not result['success']:
        print(f"\n上传失败: {result['error']}")
        return
    print("\n=== 上传成功 ===")
    print(f"文档ID: {result['document_id']}")
    print(f"PDF URL: {result['pdf_url']}")
    print(f"处理chunks数: {len(result['chunks'])}")
    for chunk in result['chunks']:
        print(f" - Chunk ID: {chunk['chunk_id']}, 图片数: {len(chunk['images'])}")


if __name__ == "__main__":
    main()