diff --git a/chunk_pos.py b/chunk_pos.py new file mode 100644 index 0000000..e4d433a --- /dev/null +++ b/chunk_pos.py @@ -0,0 +1,256 @@ +from elasticsearch import Elasticsearch +#from src.add_chunk_cli_pdf_img import update_positon_img_id_in_elasticsearch +# 初始化 Elasticsearch 用户名elastic,密码infini_rag_flow + +from dotenv import load_dotenv # 新增 +import os +import json +# 加载 .env 文件中的环境变量 +load_dotenv() + + + + +# 初始化 Elasticsearch +es = Elasticsearch( + [{ + 'host': os.getenv("ELASTIC_HOST"), + 'port': int(os.getenv("ELASTIC_PORT")), + 'scheme': 'http' + }], + basic_auth=( + os.getenv("ELASTIC_USERNAME"), + os.getenv("ELASTIC_PASSWORD") + ) +) + + +def get_index_mapping(tenant_id): + """ + 获取指定索引的 mapping 信息 + + :param tenant_id: 租户 ID + :return: mapping 信息 + """ + index_name = f"ragflow_{tenant_id}" + + try: + mapping = es.indices.get_mapping(index=index_name) + # 将 ObjectApiResponse 转换为普通字典 + mapping_dict = dict(mapping) + return {"code": 0, "message": "", "data": mapping_dict} + except Exception as e: + return {"code": 500, "message": str(e), "data": {}} + +def update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, positions): + """ + 在 Elasticsearch 中更新指定文档块的position and img_id。 + + :param tenant_id: 租户 ID + :param doc_id: 文档 ID + :param chunk_id: 文档块 ID + :param new_img_id: 新的 img_id + :param position: 位置信息 + :return: 更新结果 + """ + if not positions: + return + + position_int = [] + + for pos in positions: + if len(pos) != 5: + continue # Skip invalid positions + + pn, left, right, top, bottom = pos + # 使用元组格式,与原始RAGFlow保持一致 + position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom))) + + if position_int: # Only add if we have valid positions + # 仅添加精确位置信息,不修改排序字段 + + # 构建索引名称 + index_name = f"ragflow_{tenant_id}" + + # 构建查询条件 + query = { + "bool": { + "must": [ + {"term": {"doc_id": doc_id}}, + {"term": {"_id": chunk_id}} + ] + } + } + + # 搜索目标文档 + result = es.search(index=index_name, body={"query": query}) + + # 检查是否找到目标文档 + if result['hits']['total']['value'] == 0: + print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}") + return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} + + # 获取目标文档的 ID + hit = result['hits']['hits'][0] + doc_id_in_es = hit['_id'] + + # 构建更新请求 - 只更新存在的字段 + update_body = {"doc": {}} + update_body["doc"]["position_int"] = position_int + update_body["doc"]["page_num_int"] = [position_int[0][0]] + update_body["doc"]["top_int"] = [position_int[0][3]] + + + + + + # 更新文档 + update_result = es.update( + index=index_name, + id=doc_id_in_es, + body=update_body, + refresh=True # 确保更新立即可见 + ) + + print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}") + + + + + + + +def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position, new_img_id): + """ + 在 Elasticsearch 中更新指定文档块的position and img_id。 + + :param tenant_id: 租户 ID + :param doc_id: 文档 ID + :param chunk_id: 文档块 ID + :param new_img_id: 新的 img_id + :param position: 位置信息 + :return: 更新结果 + """ + try: + + # 构建索引名称 + index_name = f"ragflow_{tenant_id}" + + # 构建查询条件 + query = { + "bool": { + "must": [ + {"term": {"doc_id": doc_id}}, + {"term": {"_id": chunk_id}} + ] + } + } + + # 搜索目标文档 + result = es.search(index=index_name, body={"query": query}) + + # 检查是否找到目标文档 + if result['hits']['total']['value'] == 0: + print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}") + return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} + + # 获取目标文档的 ID + hit = result['hits']['hits'][0] + doc_id_in_es = hit['_id'] + + # 构建更新请求 - 只更新存在的字段 + update_body = {"doc": {}} + + #只有当 new_img_id 存在时才更新 img_id + if new_img_id is not None: + update_body["doc"]["img_id"] = new_img_id + + # 只有当 position 存在时才更新 positions + if position is not None: + + update_body["doc"]["positions"] = position + + + # 如果没有需要更新的字段,直接返回成功 + if not update_body["doc"]: + print("没有需要更新的字段") + return {"code": 0, "message": "No fields to update"} + + # 更新文档 + update_result = es.update( + index=index_name, + id=doc_id_in_es, + body=update_body, + refresh=True # 确保更新立即可见 + ) + + print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}") + + # 验证更新 + verify_doc = es.get(index=index_name, id=doc_id_in_es) + + # 检查 img_id 是否已更新(如果提供了 new_img_id) + img_id_updated = True + if new_img_id is not None: + img_id_updated = verify_doc['_source'].get('img_id') == new_img_id + if img_id_updated: + print(f"成功更新 img_id 为: {new_img_id}") + else: + print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}") + + # 检查 position 是否已更新(如果提供了 position) + position_updated = True + if position is not None: + position_updated = verify_doc['_source'].get('positions') == position + if position_updated: + print(f"成功更新 position 为: {position}") + else: + print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}") + + # 统一返回结果 + if img_id_updated and position_updated: + return {"code": 0, "message": ""} + else: + return {"code": 100, "message": "Failed to verify update"} + + + except Exception as e: + print(f"更新 Elasticsearch 时发生错误: {str(e)}") + return {"code": 101, "message": f"Error updating img_id: {str(e)}"} + + + + + + +# 示例调用 - 列出特定文档的所有 chunks +if __name__ == "__main__": + try: + print(es.info()) + except Exception as e: + print("连接失败:", e) + + +# 单位电脑 + tenant_id = "d669205e57a211f0b9e7324e7f243034" + new_img_id ="10345832587311f0919f3a2728512a4b-bd04866cd05337281" + doc_id="ea8d75966df811f0925ac6e8db75f472" + chunk_id="4a4927560a7e6d80" + # 添加以下代码来检查索引映射 + # mapping_result = get_index_mapping(tenant_id) + # print("Positions field mapping:", mapping_result["data"][f"ragflow_{tenant_id}"]["mappings"]["properties"]["positions"]) + + + + + # 左,右 --> + #上, 下| 上面最小,下面最大 + + + + pos = [[4, 0, 100, 200, 510]] + #pos_string = json.dumps(pos) # 转换为 JSON 字符串 + update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, pos) + + + #update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, pos, "")