新增批量更新Elasticsearch文档的功能,优化process_pdf_txt_pairs函数以提高处理效率

This commit is contained in:
2025-08-08 17:20:45 +08:00
parent 1c23d272bb
commit 51f24ced05

View File

@@ -12,7 +12,7 @@ from minio.error import S3Error
from find_text_in_pdf_enhanced import find_text_in_pdf
import time
# from get_pos_pdf import smart_fuzzy_find_text_batch, find_text_positions_batch
from dotenv import load_dotenv # 新增
@@ -49,6 +49,148 @@ MINIO_CONFIG = {
"secure": False
}
from elasticsearch.helpers import bulk
def bulk_update_elasticsearch(tenant_id, updates):
"""
批量更新Elasticsearch中的文档
:param tenant_id: 租户ID
:param updates: 更新信息列表每个元素包含doc_id, chunk_id, positions, new_img_id
:return: 更新结果
"""
try:
index_name = f"ragflow_{tenant_id}"
# 构建批量操作列表
actions = []
for update_info in updates:
doc_id = update_info['doc_id']
chunk_id = update_info['chunk_id']
positions = update_info.get('positions', [])
new_img_id = update_info.get('new_img_id')
# 构建查询条件来找到文档
query = {
"bool": {
"must": [
{"term": {"doc_id": doc_id}},
{"term": {"_id": chunk_id}}
]
}
}
# 搜索目标文档
result = es.search(index=index_name, body={"query": query})
# 检查是否找到目标文档
if result['hits']['total']['value'] == 0:
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
continue
# 获取目标文档的 ID
hit = result['hits']['hits'][0]
doc_id_in_es = hit['_id']
# 构建更新请求 - 只更新存在的字段
doc_update = {}
# 只有当 new_img_id 存在时才更新 img_id
if new_img_id is not None:
doc_update["img_id"] = new_img_id
# 只有当 positions 存在时才更新 positions
if positions:
position_int = []
for pos in positions:
if len(pos) != 5:
continue # Skip invalid positions
pn, left, right, top, bottom = pos
# 使用元组格式与原始RAGFlow保持一致
position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom)))
if position_int:
doc_update["position_int"] = position_int
doc_update["page_num_int"] = [position_int[0][0]]
doc_update["top_int"] = [position_int[0][3]]
# 如果没有需要更新的字段,跳过
if not doc_update:
print(f"没有需要更新的字段 for chunk {chunk_id}")
continue
# 添加到批量操作列表
action = {
"_op_type": "update",
"_index": index_name,
"_id": doc_id_in_es,
"doc": doc_update
}
actions.append(action)
# 执行批量更新
if actions:
results = bulk(es, actions, refresh=True)
print(f"批量更新完成,成功处理 {results[0]} 个操作")
return {"code": 0, "message": f"Successfully updated {results[0]} documents"}
else:
print("没有需要执行的更新操作")
return {"code": 0, "message": "No updates to perform"}
except Exception as e:
print(f"批量更新 Elasticsearch 时发生错误: {str(e)}")
return {"code": 101, "message": f"Error in bulk update: {str(e)}"}
# 修改 process_pdf_txt_pairs 函数以使用批量更新
def process_pdf_txt_pairs_bulk(pdf_dict, txt_dict, dataset):
"""处理PDF-TXT文件对使用批量更新提高效率"""
# 收集所有需要更新的信息
all_updates = []
for name, pdf_path in pdf_dict.items():
display_name = os.path.basename(pdf_path)
document = upload_or_get_document(dataset, pdf_path, display_name)
print(f"选择的文档: {document.name}ID: {document.id}")
if not document:
continue
txt_path = txt_dict.get(name)
if txt_path:
chunks_info = process_txt_chunks(dataset.id, document, txt_path)
time.sleep(1) # 等待chunk处理完成
if chunks_info:
chunks_info = get_positions_from_chunk(pdf_path, chunks_info)
# 收集更新信息而不是立即更新
for chunk_info in chunks_info:
print(f"Chunk ID: {chunk_info['id']}, Text: {chunk_info['text'][:30]}..., Has Image: {chunk_info['has_image']}, Positions: {chunk_info['positions']}")
update_info = {
'doc_id': document.id,
'chunk_id': chunk_info['id'],
'positions': chunk_info['positions']
}
if chunk_info['has_image']:
# 如果有图片准备更新img_id
update_info['new_img_id'] = f"{dataset.id}-{chunk_info['id']}"
# 如果没有图片new_img_id为None不会更新img_id字段
all_updates.append(update_info)
# 执行批量更新
if all_updates:
result = bulk_update_elasticsearch(elastic_tenant_id, all_updates)
print(f"批量更新结果: {result}")
def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, positions, new_img_id):
"""
在 Elasticsearch 中更新指定文档块的position and img_id。
@@ -109,12 +251,7 @@ def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position
if position_int:
update_body["doc"]["position_int"] = position_int
update_body["doc"]["page_num_int"] = [position_int[0][0]]
update_body["doc"]["top_int"] = [position_int[0][3]]
update_body["doc"]["top_int"] = [position_int[0][3]]
# 如果没有需要更新的字段,直接返回成功
@@ -132,32 +269,7 @@ def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
# # 验证更新
# verify_doc = es.get(index=index_name, id=doc_id_in_es)
# # 检查 img_id 是否已更新(如果提供了 new_img_id
# img_id_updated = True
# if new_img_id is not None:
# img_id_updated = verify_doc['_source'].get('img_id') == new_img_id
# if img_id_updated:
# print(f"成功更新 img_id 为: {new_img_id}")
# else:
# print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}")
# # 检查 position 是否已更新(如果提供了 position
# position_updated = True
# if position is not None:
# position_updated = verify_doc['_source'].get('positions') == position
# if position_updated:
# print(f"成功更新 position 为: {position}")
# else:
# print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}")
# # 统一返回结果
# if img_id_updated and position_updated:
# return {"code": 0, "message": ""}
# else:
# return {"code": 100, "message": "Failed to verify update"}
except Exception as e:
@@ -529,7 +641,6 @@ def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset):
def main():
"""主函数处理PDF和TXT文件对
dataset.id = bucket_name
@@ -550,8 +661,8 @@ def main():
print("未选择数据集。")
return
process_pdf_txt_pairs(pdf_dict, txt_dict, dataset)
# 使用批量处理函数替代原来的处理函数
process_pdf_txt_pairs_bulk(pdf_dict, txt_dict, dataset)