from elasticsearch import Elasticsearch from src.add_chunk_cli_pdf_img import update_positon_img_id_in_elasticsearch # 初始化 Elasticsearch 用户名elastic,密码infini_rag_flow es = Elasticsearch( [{'host': '127.0.0.1', 'port': 1200, 'scheme': 'http'}], basic_auth=('elastic', 'infini_rag_flow') ) def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id): """ 在 Elasticsearch 中更新指定文档块的 img_id。 如果img_id不存在,则增加一个新的 img_id。 :param tenant_id: 租户 ID :param doc_id: 文档 ID :param chunk_id: 文档块 ID :param new_img_id: 新的 img_id :return: 更新结果 """ # 构建索引名称 index_name = f"ragflow_{tenant_id}" # 这里需要替换为实际的索引名称生成逻辑 # 构建查询条件 query = { "bool": { "must": [ {"term": {"doc_id": doc_id}}, {"term": {"_id": chunk_id}} ] } } # 搜索目标文档 result = es.search(index=index_name, body={"query": query}) # 检查是否找到目标文档 if result['hits']['total']['value'] == 0: return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} # 获取目标文档的 ID hit = result['hits']['hits'][0] doc_id_in_es = hit['_id'] update_body = { "doc": { "img_id": new_img_id } } # 更新文档 update_result = es.update(index=index_name, id=doc_id_in_es, body=update_body) if update_result['result'] == 'updated': return {"code": 0, "message": ""} else: return {"code": 100, "message": "Failed to update img_id"} def get_index_mapping(tenant_id): """ 获取指定索引的 mapping 信息 :param tenant_id: 租户 ID :return: mapping 信息 """ index_name = f"ragflow_{tenant_id}" try: mapping = es.indices.get_mapping(index=index_name) # 将 ObjectApiResponse 转换为普通字典 mapping_dict = dict(mapping) return {"code": 0, "message": "", "data": mapping_dict} except Exception as e: return {"code": 500, "message": str(e), "data": {}} # 在主函数中调用示例 if __name__ == "__main__": # ... 现有代码 ... # 获取 mapping 信息 tenant_id = "9c73df5a3ebc11f08410c237296aa408" mapping_result = get_index_mapping(tenant_id) if mapping_result["code"] == 0: print("索引 mapping 信息:") import json # 使用 default=str 处理不能直接序列化的对象 print(json.dumps(mapping_result["data"], indent=2, ensure_ascii=False, default=str)) else: print(f"获取 mapping 失败: {mapping_result['message']}") def list_chunk_information(tenant_id, dataset_id, doc_id=None, chunk_id=None, size=1000): """ 列出指定条件下的 chunk 信息 :param tenant_id: 租户 ID :param dataset_id: 数据集 ID :param doc_id: 文档 ID(可选) :param chunk_id: 文档块 ID(可选) :param size: 返回的最大结果数 :return: chunk 信息列表 """ # 构建索引名称 index_name = f"ragflow_{tenant_id}" # 需替换为实际索引名生成逻辑 # 构建基础查询 query = {"bool": {"must": []}} # 添加可选过滤条件 if doc_id: query["bool"]["must"].append({"term": {"doc_id": doc_id}}) # 执行搜索 try: result = es.search( index=index_name, body={"query": query, "size": size}, fields= ["_id"], _source=["doc_id", "kb_id", "img_id", "content", "docnm_kwd"] # 指定返回的字段 ) for d in result['hits']['hits']: print(d['_id']) # 提取并格式化结果 chunks = [] for hit in result['hits']['hits']: source = hit['_source'] chunks.append({ 'doc_id': source.get('doc_id'), 'kb_id': source.get('kb_id'), 'img_id': source.get('img_id'), 'content': source.get('content'), 'docnm_kwd': source.get('docnm_kwd') }) return {"code": 0, "message": "", "data": chunks} except Exception as e: return {"code": 500, "message": str(e), "data": []} # 示例调用 - 列出特定文档的所有 chunks if __name__ == "__main__": try: print(es.info()) except Exception as e: print("连接失败:", e) tenant_id = "d669205e57a211f0b9e7324e7f243034" tenant_id = "9c73df5a3ebc11f08410c237296aa408" dataset_id = "0e6127da574a11f0a59c7e7439a490f8" # dataset_id = kb_id doc_id = "cbf576385bc911f08f23fedc3996e479" doc_id = "323113d8670c11f0b4255ea1d23c381a" doc_id = "5cdab2fa67cb11f0a21592edb0e63cad" # chunk_id = "f035247f7de579b0" # chunk_id = "b2d53baddbfde97c" # chunk_id = "e46a067c1edf939a" new_img_id = "10345832587311f0919f3a2728512a4b-f035247f7de579b0" #"new_img_id_12345" #new_img_id = "0e6127da574a11f0a59c7e7439a490f8-b2d53baddbfde97c" #new_img_id ="c5142bce5ac611f0ae707a8b5ba029cb-thumbnail_fb3cbc165ac611f0b5897a8b5ba029cb.png" pos= [3, 317, 397, 123, 182] # 获取 mapping 信息 tenant_id = "9c73df5a3ebc11f08410c237296aa408" mapping_result = get_index_mapping(tenant_id) if mapping_result["code"] == 0: print("索引 mapping 信息:") import json print(json.dumps(mapping_result["data"], indent=2, ensure_ascii=False)) else: print(f"获取 mapping 失败: {mapping_result['message']}") #chunk_list = list_chunk_information(tenant_id, dataset_id, doc_id=doc_id) # update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id,new_img_id) update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, pos, new_img_id) # if chunk_list["code"] == 0: # print(f"找到 {len(chunk_list['data'])} 个 chunks") # for chunk in chunk_list['data']: # print(f"Chunk ID: {chunk['kb_id']}, Img ID: {chunk['img_id']}","docnm_kwd:", chunk['docnm_kwd']) # else: # print(f"Error: {chunk_list['message']}")