Files
ragflow_api_test/chunk_pos.py

257 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from elasticsearch import Elasticsearch
#from src.add_chunk_cli_pdf_img import update_positon_img_id_in_elasticsearch
# 初始化 Elasticsearch 用户名elastic密码infini_rag_flow
from dotenv import load_dotenv # 新增
import os
import json
# 加载 .env 文件中的环境变量
load_dotenv()
# 初始化 Elasticsearch
es = Elasticsearch(
[{
'host': os.getenv("ELASTIC_HOST"),
'port': int(os.getenv("ELASTIC_PORT")),
'scheme': 'http'
}],
basic_auth=(
os.getenv("ELASTIC_USERNAME"),
os.getenv("ELASTIC_PASSWORD")
)
)
def get_index_mapping(tenant_id):
"""
获取指定索引的 mapping 信息
:param tenant_id: 租户 ID
:return: mapping 信息
"""
index_name = f"ragflow_{tenant_id}"
try:
mapping = es.indices.get_mapping(index=index_name)
# 将 ObjectApiResponse 转换为普通字典
mapping_dict = dict(mapping)
return {"code": 0, "message": "", "data": mapping_dict}
except Exception as e:
return {"code": 500, "message": str(e), "data": {}}
def update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, positions):
"""
在 Elasticsearch 中更新指定文档块的position and img_id。
:param tenant_id: 租户 ID
:param doc_id: 文档 ID
:param chunk_id: 文档块 ID
:param new_img_id: 新的 img_id
:param position: 位置信息
:return: 更新结果
"""
if not positions:
return
position_int = []
for pos in positions:
if len(pos) != 5:
continue # Skip invalid positions
pn, left, right, top, bottom = pos
# 使用元组格式与原始RAGFlow保持一致
position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom)))
if position_int: # Only add if we have valid positions
# 仅添加精确位置信息,不修改排序字段
# 构建索引名称
index_name = f"ragflow_{tenant_id}"
# 构建查询条件
query = {
"bool": {
"must": [
{"term": {"doc_id": doc_id}},
{"term": {"_id": chunk_id}}
]
}
}
# 搜索目标文档
result = es.search(index=index_name, body={"query": query})
# 检查是否找到目标文档
if result['hits']['total']['value'] == 0:
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
# 获取目标文档的 ID
hit = result['hits']['hits'][0]
doc_id_in_es = hit['_id']
# 构建更新请求 - 只更新存在的字段
update_body = {"doc": {}}
update_body["doc"]["position_int"] = position_int
update_body["doc"]["page_num_int"] = [position_int[0][0]]
update_body["doc"]["top_int"] = [position_int[0][3]]
# 更新文档
update_result = es.update(
index=index_name,
id=doc_id_in_es,
body=update_body,
refresh=True # 确保更新立即可见
)
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position, new_img_id):
"""
在 Elasticsearch 中更新指定文档块的position and img_id。
:param tenant_id: 租户 ID
:param doc_id: 文档 ID
:param chunk_id: 文档块 ID
:param new_img_id: 新的 img_id
:param position: 位置信息
:return: 更新结果
"""
try:
# 构建索引名称
index_name = f"ragflow_{tenant_id}"
# 构建查询条件
query = {
"bool": {
"must": [
{"term": {"doc_id": doc_id}},
{"term": {"_id": chunk_id}}
]
}
}
# 搜索目标文档
result = es.search(index=index_name, body={"query": query})
# 检查是否找到目标文档
if result['hits']['total']['value'] == 0:
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
# 获取目标文档的 ID
hit = result['hits']['hits'][0]
doc_id_in_es = hit['_id']
# 构建更新请求 - 只更新存在的字段
update_body = {"doc": {}}
#只有当 new_img_id 存在时才更新 img_id
if new_img_id is not None:
update_body["doc"]["img_id"] = new_img_id
# 只有当 position 存在时才更新 positions
if position is not None:
update_body["doc"]["positions"] = position
# 如果没有需要更新的字段,直接返回成功
if not update_body["doc"]:
print("没有需要更新的字段")
return {"code": 0, "message": "No fields to update"}
# 更新文档
update_result = es.update(
index=index_name,
id=doc_id_in_es,
body=update_body,
refresh=True # 确保更新立即可见
)
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
# 验证更新
verify_doc = es.get(index=index_name, id=doc_id_in_es)
# 检查 img_id 是否已更新(如果提供了 new_img_id
img_id_updated = True
if new_img_id is not None:
img_id_updated = verify_doc['_source'].get('img_id') == new_img_id
if img_id_updated:
print(f"成功更新 img_id 为: {new_img_id}")
else:
print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}")
# 检查 position 是否已更新(如果提供了 position
position_updated = True
if position is not None:
position_updated = verify_doc['_source'].get('positions') == position
if position_updated:
print(f"成功更新 position 为: {position}")
else:
print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}")
# 统一返回结果
if img_id_updated and position_updated:
return {"code": 0, "message": ""}
else:
return {"code": 100, "message": "Failed to verify update"}
except Exception as e:
print(f"更新 Elasticsearch 时发生错误: {str(e)}")
return {"code": 101, "message": f"Error updating img_id: {str(e)}"}
# 示例调用 - 列出特定文档的所有 chunks
if __name__ == "__main__":
try:
print(es.info())
except Exception as e:
print("连接失败:", e)
# 单位电脑
tenant_id = "d669205e57a211f0b9e7324e7f243034"
new_img_id ="10345832587311f0919f3a2728512a4b-bd04866cd05337281"
doc_id="ea8d75966df811f0925ac6e8db75f472"
chunk_id="4a4927560a7e6d80"
# 添加以下代码来检查索引映射
# mapping_result = get_index_mapping(tenant_id)
# print("Positions field mapping:", mapping_result["data"][f"ragflow_{tenant_id}"]["mappings"]["properties"]["positions"])
# 左,右 -->
#上, 下| 上面最小,下面最大
pos = [[4, 0, 100, 200, 510]]
#pos_string = json.dumps(pos) # 转换为 JSON 字符串
update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, pos)
#update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, pos, "")