Merge branch 'main' of https://git.lqsjy.cn/glowz/ragflow_api_test
This commit is contained in:
		
							
								
								
									
										157
									
								
								chunk_operations.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										157
									
								
								chunk_operations.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,157 @@ | ||||
| from elasticsearch import Elasticsearch | ||||
|  | ||||
| # 初始化 Elasticsearch   用户名elastic,密码infini_rag_flow | ||||
| es = Elasticsearch( | ||||
|     [{'host': '192.168.107.165', 'port': 1200, 'scheme': 'http'}], | ||||
|     basic_auth=('elastic', 'infini_rag_flow') | ||||
| ) | ||||
|  | ||||
| def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id): | ||||
|     """ | ||||
|     在 Elasticsearch 中更新指定文档块的 img_id。 | ||||
|     如果img_id不存在,则增加一个新的 img_id。 | ||||
|  | ||||
|     :param tenant_id: 租户 ID | ||||
|     :param dataset_id: 数据集 ID | ||||
|     :param doc_id: 文档 ID | ||||
|     :param chunk_id: 文档块 ID | ||||
|     :param new_img_id: 新的 img_id | ||||
|     :return: 更新结果 | ||||
|  | ||||
|     """ | ||||
|     # 构建索引名称 | ||||
|     index_name = f"ragflow_{tenant_id}"  # 这里需要替换为实际的索引名称生成逻辑 | ||||
|  | ||||
|     # 构建查询条件 | ||||
|     query = { | ||||
|         "bool": { | ||||
|             "must": [ | ||||
|                 {"term": {"doc_id": doc_id}}, | ||||
|                 {"term": {"_id": chunk_id}} | ||||
|             ] | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     # 搜索目标文档 | ||||
|     result = es.search(index=index_name, body={"query": query}) | ||||
|  | ||||
|     # 检查是否找到目标文档 | ||||
|     if result['hits']['total']['value'] == 0: | ||||
|         return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} | ||||
|      | ||||
|  | ||||
|     # 获取目标文档的 ID | ||||
|     hit = result['hits']['hits'][0] | ||||
|     doc_id_in_es = hit['_id'] | ||||
|     # print(doc_id_in_es) | ||||
|     #print(hit) | ||||
|     #print(len(hit['_source']['img_id'])) | ||||
|     # image_id = hit['_source'].get('img_id', None) | ||||
|  | ||||
|     # if (image_id): | ||||
|     #     mapping = es.indices.get_mapping(index=index_name) | ||||
|     #     print(mapping) | ||||
|  | ||||
|     # else: | ||||
|     #     # img_id 不存在,添加新的 img_id | ||||
|     #     # 获取索引的映射 | ||||
|     #     mapping = es.indices.get_mapping(index=index_name) | ||||
|     #     mapping[index_name]['mappings']['properties']['img_id'] = {'type': 'text'} | ||||
|     #     es.indices.put_mapping(index="my_index", body=mapping) | ||||
|     #     print(mapping) | ||||
|  | ||||
|      | ||||
|     #         # 构建更新请求 | ||||
|     update_body = { | ||||
|         "doc": { | ||||
|             "img_id": new_img_id | ||||
|         } | ||||
|         } | ||||
|  | ||||
|     # 更新文档 | ||||
|     update_result = es.update(index=index_name, id=doc_id_in_es, body=update_body) | ||||
|      | ||||
|  | ||||
|     if update_result['result'] == 'updated': | ||||
|         return {"code": 0, "message": ""} | ||||
|     else: | ||||
|         return {"code": 100, "message": "Failed to update img_id"} | ||||
|      | ||||
|  | ||||
| def list_chunk_information(tenant_id, dataset_id, doc_id=None, chunk_id=None, size=1000): | ||||
|     """ | ||||
|     列出指定条件下的 chunk 信息 | ||||
|  | ||||
|     :param tenant_id: 租户 ID | ||||
|     :param dataset_id: 数据集 ID | ||||
|     :param doc_id: 文档 ID(可选) | ||||
|     :param chunk_id: 文档块 ID(可选) | ||||
|     :param size: 返回的最大结果数 | ||||
|     :return: chunk 信息列表 | ||||
|     """ | ||||
|     # 构建索引名称 | ||||
|     index_name = f"ragflow_{tenant_id}"  # 需替换为实际索引名生成逻辑 | ||||
|      | ||||
|     # 构建基础查询 | ||||
|     query = {"bool": {"must": []}} | ||||
|      | ||||
|     # 添加可选过滤条件 | ||||
|     if doc_id: | ||||
|         query["bool"]["must"].append({"term": {"doc_id": doc_id}}) | ||||
|  | ||||
|      | ||||
|     # 执行搜索 | ||||
|     try: | ||||
|         result = es.search( | ||||
|             index=index_name, | ||||
|             body={"query": query, "size": size}, | ||||
|             fields= ["_id"], | ||||
|             _source=["doc_id", "kb_id", "img_id", "content", "docnm_kwd"]  # 指定返回的字段 | ||||
|         ) | ||||
|         for d in result['hits']['hits']: | ||||
|             print(d['_id']) | ||||
|              | ||||
|          | ||||
|         # 提取并格式化结果 | ||||
|         chunks = [] | ||||
|         for hit in result['hits']['hits']: | ||||
|             source = hit['_source'] | ||||
|             chunks.append({ | ||||
|                 'doc_id': source.get('doc_id'), | ||||
|                 'kb_id': source.get('kb_id'), | ||||
|                 'img_id': source.get('img_id'), | ||||
|                 'content': source.get('content'), | ||||
|                 'docnm_kwd': source.get('docnm_kwd') | ||||
|             }) | ||||
|              | ||||
|         return {"code": 0, "message": "", "data": chunks} | ||||
|      | ||||
|     except Exception as e: | ||||
|         return {"code": 500, "message": str(e), "data": []} | ||||
|  | ||||
| # 示例调用 - 列出特定文档的所有 chunks | ||||
| if __name__ == "__main__": | ||||
|     try: | ||||
|         print(es.info()) | ||||
|     except Exception as e: | ||||
|         print("连接失败:", e) | ||||
|  | ||||
|     tenant_id = "d669205e57a211f0b9e7324e7f243034" | ||||
|     dataset_id = "10345832587311f0919f3a2728512a4b"  #  dataset_id = kb_id | ||||
|     doc_id = "cbf576385bc911f08f23fedc3996e479" | ||||
|     doc_id = "4300e558609511f08b8bde4a87f78768"   | ||||
|     chunk_id = "f035247f7de579b0"  #   | ||||
|     chunk_id = "5b3445f7861ff772"  #  | ||||
|     new_img_id =    "10345832587311f0919f3a2728512a4b-f035247f7de579b0" #"new_img_id_12345" | ||||
|     #new_img_id ="c5142bce5ac611f0ae707a8b5ba029cb-thumbnail_fb3cbc165ac611f0b5897a8b5ba029cb.png" | ||||
|  | ||||
|  | ||||
|  | ||||
|     #chunk_list = list_chunk_information(tenant_id, dataset_id, doc_id=doc_id) | ||||
|     update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id,new_img_id) | ||||
|     # if chunk_list["code"] == 0: | ||||
|     #     print(f"找到 {len(chunk_list['data'])} 个 chunks") | ||||
|     #     for chunk in chunk_list['data']: | ||||
|     #         print(f"Chunk ID: {chunk['kb_id']}, Img ID: {chunk['img_id']}","docnm_kwd:", chunk['docnm_kwd']) | ||||
|     # else: | ||||
|     #     print(f"Error: {chunk_list['message']}") | ||||
		Reference in New Issue
	
	Block a user