新增 chunk_pos.py 文件,集成 Elasticsearch 功能,支持文档块位置和图像 ID 更新,优化索引映射获取逻辑

This commit is contained in:
2025-08-07 17:04:20 +08:00
parent e5ac523bd9
commit c1d66237e6

256
chunk_pos.py Normal file
View File

@@ -0,0 +1,256 @@
from elasticsearch import Elasticsearch
#from src.add_chunk_cli_pdf_img import update_positon_img_id_in_elasticsearch
# 初始化 Elasticsearch 用户名elastic密码infini_rag_flow
from dotenv import load_dotenv # 新增
import os
import json
# 加载 .env 文件中的环境变量
load_dotenv()
# 初始化 Elasticsearch
es = Elasticsearch(
[{
'host': os.getenv("ELASTIC_HOST"),
'port': int(os.getenv("ELASTIC_PORT")),
'scheme': 'http'
}],
basic_auth=(
os.getenv("ELASTIC_USERNAME"),
os.getenv("ELASTIC_PASSWORD")
)
)
def get_index_mapping(tenant_id):
"""
获取指定索引的 mapping 信息
:param tenant_id: 租户 ID
:return: mapping 信息
"""
index_name = f"ragflow_{tenant_id}"
try:
mapping = es.indices.get_mapping(index=index_name)
# 将 ObjectApiResponse 转换为普通字典
mapping_dict = dict(mapping)
return {"code": 0, "message": "", "data": mapping_dict}
except Exception as e:
return {"code": 500, "message": str(e), "data": {}}
def update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, positions):
"""
在 Elasticsearch 中更新指定文档块的position and img_id。
:param tenant_id: 租户 ID
:param doc_id: 文档 ID
:param chunk_id: 文档块 ID
:param new_img_id: 新的 img_id
:param position: 位置信息
:return: 更新结果
"""
if not positions:
return
position_int = []
for pos in positions:
if len(pos) != 5:
continue # Skip invalid positions
pn, left, right, top, bottom = pos
# 使用元组格式与原始RAGFlow保持一致
position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom)))
if position_int: # Only add if we have valid positions
# 仅添加精确位置信息,不修改排序字段
# 构建索引名称
index_name = f"ragflow_{tenant_id}"
# 构建查询条件
query = {
"bool": {
"must": [
{"term": {"doc_id": doc_id}},
{"term": {"_id": chunk_id}}
]
}
}
# 搜索目标文档
result = es.search(index=index_name, body={"query": query})
# 检查是否找到目标文档
if result['hits']['total']['value'] == 0:
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
# 获取目标文档的 ID
hit = result['hits']['hits'][0]
doc_id_in_es = hit['_id']
# 构建更新请求 - 只更新存在的字段
update_body = {"doc": {}}
update_body["doc"]["position_int"] = position_int
update_body["doc"]["page_num_int"] = [position_int[0][0]]
update_body["doc"]["top_int"] = [position_int[0][3]]
# 更新文档
update_result = es.update(
index=index_name,
id=doc_id_in_es,
body=update_body,
refresh=True # 确保更新立即可见
)
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position, new_img_id):
"""
在 Elasticsearch 中更新指定文档块的position and img_id。
:param tenant_id: 租户 ID
:param doc_id: 文档 ID
:param chunk_id: 文档块 ID
:param new_img_id: 新的 img_id
:param position: 位置信息
:return: 更新结果
"""
try:
# 构建索引名称
index_name = f"ragflow_{tenant_id}"
# 构建查询条件
query = {
"bool": {
"must": [
{"term": {"doc_id": doc_id}},
{"term": {"_id": chunk_id}}
]
}
}
# 搜索目标文档
result = es.search(index=index_name, body={"query": query})
# 检查是否找到目标文档
if result['hits']['total']['value'] == 0:
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
# 获取目标文档的 ID
hit = result['hits']['hits'][0]
doc_id_in_es = hit['_id']
# 构建更新请求 - 只更新存在的字段
update_body = {"doc": {}}
#只有当 new_img_id 存在时才更新 img_id
if new_img_id is not None:
update_body["doc"]["img_id"] = new_img_id
# 只有当 position 存在时才更新 positions
if position is not None:
update_body["doc"]["positions"] = position
# 如果没有需要更新的字段,直接返回成功
if not update_body["doc"]:
print("没有需要更新的字段")
return {"code": 0, "message": "No fields to update"}
# 更新文档
update_result = es.update(
index=index_name,
id=doc_id_in_es,
body=update_body,
refresh=True # 确保更新立即可见
)
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
# 验证更新
verify_doc = es.get(index=index_name, id=doc_id_in_es)
# 检查 img_id 是否已更新(如果提供了 new_img_id
img_id_updated = True
if new_img_id is not None:
img_id_updated = verify_doc['_source'].get('img_id') == new_img_id
if img_id_updated:
print(f"成功更新 img_id 为: {new_img_id}")
else:
print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}")
# 检查 position 是否已更新(如果提供了 position
position_updated = True
if position is not None:
position_updated = verify_doc['_source'].get('positions') == position
if position_updated:
print(f"成功更新 position 为: {position}")
else:
print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}")
# 统一返回结果
if img_id_updated and position_updated:
return {"code": 0, "message": ""}
else:
return {"code": 100, "message": "Failed to verify update"}
except Exception as e:
print(f"更新 Elasticsearch 时发生错误: {str(e)}")
return {"code": 101, "message": f"Error updating img_id: {str(e)}"}
# 示例调用 - 列出特定文档的所有 chunks
if __name__ == "__main__":
try:
print(es.info())
except Exception as e:
print("连接失败:", e)
# 单位电脑
tenant_id = "d669205e57a211f0b9e7324e7f243034"
new_img_id ="10345832587311f0919f3a2728512a4b-bd04866cd05337281"
doc_id="ea8d75966df811f0925ac6e8db75f472"
chunk_id="4a4927560a7e6d80"
# 添加以下代码来检查索引映射
# mapping_result = get_index_mapping(tenant_id)
# print("Positions field mapping:", mapping_result["data"][f"ragflow_{tenant_id}"]["mappings"]["properties"]["positions"])
# 左,右 -->
#上, 下| 上面最小,下面最大
pos = [[4, 0, 100, 200, 510]]
#pos_string = json.dumps(pos) # 转换为 JSON 字符串
update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, pos)
#update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, pos, "")