实现pdf-img-chunk完整功能,从环境变量加载配置,新增网络图片下载功能,并优化文本块处理逻辑

This commit is contained in:
2025-07-23 17:13:38 +08:00
parent a0872e5eac
commit d8f2a26ecc

View File

@@ -1,103 +1,118 @@
import os
import re
import tempfile

import requests
from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from minio import Minio
from minio.error import S3Error
from ragflow_sdk import RAGFlow

# Load variables from the .env file before any setting is read, so that
# credentials live in the environment instead of in source control.
load_dotenv()

# RAGFlow connection settings. These are deployment-specific (the API key is
# a secret), so they are read from the environment without baked-in defaults.
api_key = os.getenv("RAGFLOW_API_KEY")
base_url = os.getenv("RAGFLOW_BASE_URL")
elastic_tenant_id = os.getenv("ELASTIC_TENANT_ID")

# RAGFlow client used by the rest of the script.
rag_object = RAGFlow(api_key=api_key, base_url=base_url)

# Elasticsearch host. The module-level name `elastic_url` is kept from the
# previous revision; the default is the address that revision hard-coded.
elastic_url = os.getenv("ELASTIC_HOST", "127.0.0.1")

# Elasticsearch client. Username and password must come from the environment.
es = Elasticsearch(
    [{
        'host': elastic_url,
        # Fall back to the previously hard-coded port: int(None) would raise
        # TypeError at import time when ELASTIC_PORT is not set.
        'port': int(os.getenv("ELASTIC_PORT", "1200")),
        'scheme': 'http'
    }],
    basic_auth=(
        os.getenv("ELASTIC_USERNAME"),
        os.getenv("ELASTIC_PASSWORD")
    )
)

# MinIO connection settings. Host and port default to a local instance so the
# endpoint never renders as "None:None" when the variables are missing.
MINIO_CONFIG = {
    "endpoint": (
        f"{os.getenv('MINIO_HOST', '127.0.0.1')}:"
        f"{os.getenv('MINIO_PORT', '9000')}"
    ),
    "access_key": os.getenv("MINIO_USER"),
    "secret_key": os.getenv("MINIO_PASSWORD"),
    "secure": False
}
def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id):
    """Set ``img_id`` on a single chunk document stored in Elasticsearch.

    The chunk is looked up in the index ``ragflow_<tenant_id>`` by matching
    both ``doc_id`` and ``_id``. The field is added when it does not exist
    yet. After the update the document is read back to confirm the new value.

    :param tenant_id: tenant ID, used to build the index name
    :param doc_id: document ID the chunk belongs to
    :param chunk_id: chunk ID (the Elasticsearch ``_id``)
    :param new_img_id: value to store in ``img_id``
    :return: dict with ``code`` and ``message``; code 0 on success, 102 when
        the chunk is not found, 100 when the read-back value does not match,
        101 when any exception is raised along the way
    """
    try:
        index_name = f"ragflow_{tenant_id}"

        # Match the chunk by its parent document and its own _id.
        query = {
            "bool": {
                "must": [
                    {"term": {"doc_id": doc_id}},
                    {"term": {"_id": chunk_id}}
                ]
            }
        }
        result = es.search(index=index_name, body={"query": query})

        if result['hits']['total']['value'] == 0:
            print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
            return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}

        doc_id_in_es = result['hits']['hits'][0]['_id']

        # Partial update: only img_id is written, other fields are untouched.
        update_body = {
            "doc": {
                "img_id": new_img_id
            }
        }
        update_result = es.update(
            index=index_name,
            id=doc_id_in_es,
            body=update_body,
            refresh=True  # make the change visible to the read-back below
        )
        print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")

        # Read the document back and compare, rather than trusting the
        # update response alone.
        verify_doc = es.get(index=index_name, id=doc_id_in_es)
        current_img_id = verify_doc['_source'].get('img_id')
        if current_img_id == new_img_id:
            print(f"成功更新 img_id 为: {new_img_id}")
            return {"code": 0, "message": ""}

        print(f"更新验证失败,当前 img_id: {current_img_id}")
        return {"code": 100, "message": "Failed to verify img_id update"}

    except Exception as e:
        # Best-effort boundary: report the failure to the caller as a result
        # code instead of raising.
        print(f"更新 Elasticsearch 时发生错误: {str(e)}")
        return {"code": 101, "message": f"Error updating img_id: {str(e)}"}
from minio import Minio
from minio.error import S3Error
# MinIO host, taken from the environment like the port and credentials
# below. The default is the address that used to be hard-coded here.
MINIO_HOST = os.getenv("MINIO_HOST", "127.0.0.1")

# MinIO connection settings. Every value can be overridden through the
# environment; the defaults are the ones this block already shipped with.
MINIO_CONFIG = {
    "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}",
    "access_key": os.getenv("MINIO_USER", "rag_flow"),
    "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"),
    "secure": False
}
def get_minio_client():
"""创建MinIO客户端"""
return Minio(
@@ -275,52 +290,76 @@ def remove_images_from_content( content):
def _download_image_to_temp_file(img_url, timeout=30):
    """Download *img_url* into a temporary ``.jpg`` file and return its path.

    The caller is responsible for deleting the returned file.
    """
    # A timeout keeps a stalled server from blocking the whole import run.
    response = requests.get(img_url, timeout=timeout)
    response.raise_for_status()
    with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file:
        tmp_file.write(response.content)
        return tmp_file.name


def process_txt_chunks(dataset_id, document, txt_path):
    """Split a text file into chunks, add them to *document*, attach images.

    The file is split on blank lines. Each non-empty piece becomes one chunk.
    When a piece contains image links, the first link is used: the links are
    stripped from the chunk text and the image (downloaded for http/https
    URLs, otherwise read from disk, relative paths being resolved against the
    directory of *txt_path*) is uploaded via ``upload_file2minio``. After all
    chunks are added, ``img_id`` is written to Elasticsearch for every chunk
    whose upload returned a truthy value.

    :param dataset_id: dataset (knowledge base) ID; also the prefix of img_id
    :param document: document object providing ``add_chunk`` and ``id``
    :param txt_path: path of the UTF-8 text file to process
    :return: None; any error is printed rather than raised
    """
    try:
        with open(txt_path, 'r', encoding='utf-8') as file:
            file_content = file.read()

        # IDs of chunks whose image upload succeeded; their img_id is
        # written in one pass once every chunk has been added.
        img_chunk_ids = []

        for num, txt_chunk in enumerate(file_content.split('\n\n')):
            if not txt_chunk.strip():
                continue

            print(f"处理文本块: {txt_chunk[:30]}...")
            img_urls = extract_images_from_chunk(txt_chunk)
            img_url = img_urls[0] if img_urls else None

            if img_url:
                print(f"检测到图片链接: {img_url}")
                # Strip the image links from the text before storing it.
                clean_chunk = remove_images_from_content(txt_chunk)
                chunk = document.add_chunk(content=clean_chunk)

                if img_url.startswith(('http://', 'https://')):
                    # Remote image: download to a temporary file, upload it,
                    # and always remove the temporary file afterwards.
                    try:
                        tmp_path = _download_image_to_temp_file(img_url)
                        try:
                            if upload_file2minio(dataset_id, chunk.id, tmp_path):
                                img_chunk_ids.append(chunk.id)
                        finally:
                            os.unlink(tmp_path)
                    except Exception as e:
                        print(f"下载网络图片失败: {e}")
                else:
                    # Local image: relative paths are relative to the text file.
                    if os.path.isabs(img_url):
                        img_abs_path = img_url
                    else:
                        img_abs_path = os.path.join(os.path.dirname(txt_path), img_url)
                    print(f"图片绝对路径: {img_abs_path}")

                    if os.path.exists(img_abs_path):
                        if upload_file2minio(dataset_id, chunk.id, img_abs_path):
                            img_chunk_ids.append(chunk.id)
                    else:
                        print(f"图片未找到: {img_abs_path},跳过。")
            else:
                print("未检测到图片链接,直接添加文本块。")
                chunk = document.add_chunk(content=txt_chunk)

            print(f"{num+1} Chunk添加成功! ID: {chunk.id}")

        # Write img_id (format "<dataset_id>-<chunk_id>") for uploaded images.
        for img_chunk_id in img_chunk_ids:
            update_img_id_in_elasticsearch(elastic_tenant_id, document.id, img_chunk_id, f"{dataset_id}-{img_chunk_id}")

    except Exception as e:
        print(f"处理文本文件时出错: {txt_path},错误: {e}")
def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset):
"""处理PDF-TXT文件对"""
for name, pdf_path in pdf_dict.items():
@@ -341,7 +380,7 @@ def main():
dataset.id = bucket_name
chunk_id = object_name
"""
file_path = "g:\\11\\22\\test\\"
file_path = "F:\\Synology_nas\\SynologyDrive\\大模型\\厦门市城市道路开口设置指引DB3502T 141-2024\\"
pdf_dict, txt_dict = pair_pdf_and_txt(file_path, file_path)
if not pdf_dict: