From 1c23d272bb937a7ed13b7bb69c44ab340e9ffb05 Mon Sep 17 00:00:00 2001 From: glowzz <24627181@qq.com> Date: Fri, 8 Aug 2025 10:38:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20Elasticsearch=20=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E9=80=BB=E8=BE=91=EF=BC=8C=E6=94=AF=E6=8C=81=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E4=BD=8D=E7=BD=AE=E6=9B=B4=E6=96=B0=EF=BC=8C=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E5=8C=B9=E9=85=8D=E7=BB=93=E6=9E=9C=E5=A4=84=E7=90=86?= =?UTF-8?q?=EF=BC=8C=E6=96=B0=E5=A2=9E=E4=BD=8D=E7=BD=AE=E6=95=B4=E6=95=B0?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/add_chunk_cli_pdf_img.py | 150 ++++++++++++++++++------------- src/find_text_in_pdf_enhanced.py | 10 ++- 2 files changed, 92 insertions(+), 68 deletions(-) diff --git a/src/add_chunk_cli_pdf_img.py b/src/add_chunk_cli_pdf_img.py index 7067dcb..18adde6 100644 --- a/src/add_chunk_cli_pdf_img.py +++ b/src/add_chunk_cli_pdf_img.py @@ -9,6 +9,8 @@ import tempfile from elasticsearch import Elasticsearch from minio import Minio from minio.error import S3Error +from find_text_in_pdf_enhanced import find_text_in_pdf +import time # from get_pos_pdf import smart_fuzzy_find_text_batch, find_text_positions_batch @@ -47,7 +49,7 @@ MINIO_CONFIG = { "secure": False } -def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position, new_img_id): +def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, positions, new_img_id): """ 在 Elasticsearch 中更新指定文档块的position and img_id。 @@ -88,29 +90,32 @@ def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position # 构建更新请求 - 只更新存在的字段 update_body = {"doc": {}} - # 只有当 new_img_id 存在时才更新 img_id + #只有当 new_img_id 存在时才更新 img_id if new_img_id is not None: update_body["doc"]["img_id"] = new_img_id # 只有当 position 存在时才更新 positions - if position is not None: - # 如果传入的是嵌套字典格式的 position - if isinstance(position, list) and all(isinstance(p, dict) for p in position): - # 将字典格式转换为整数列表格式 - formatted_positions = [] - for pos in position: - pos_list = [ - pos.get('page', 0), # 页码 - int(round(float(pos.get('x0', 0)))), # x0 - int(round(float(pos.get('x1', 0)))), # x1 - int(round(float(pos.get('y0', 0)))), # y0 - int(round(float(pos.get('y1', 0)))) # y1 - ] - formatted_positions.append(pos_list) - update_body["doc"]["positions"] = formatted_positions - # 如果已经是整数列表格式 - elif isinstance(position, list): - update_body["doc"]["positions"] = position + if positions : + + position_int = [] + + for pos in positions: + if len(pos) != 5: + continue # Skip invalid positions + + pn, left, right, top, bottom = pos + # 使用元组格式,与原始RAGFlow保持一致 + position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom))) + if position_int: + update_body["doc"]["position_int"] = position_int + update_body["doc"]["page_num_int"] = [position_int[0][0]] + update_body["doc"]["top_int"] = [position_int[0][3]] + + + + + + # 如果没有需要更新的字段,直接返回成功 if not update_body["doc"]: @@ -127,32 +132,32 @@ def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}") - # 验证更新 - verify_doc = es.get(index=index_name, id=doc_id_in_es) + # # 验证更新 + # verify_doc = es.get(index=index_name, id=doc_id_in_es) - # 检查 img_id 是否已更新(如果提供了 new_img_id) - img_id_updated = True - if new_img_id is not None: - img_id_updated = verify_doc['_source'].get('img_id') == new_img_id - if img_id_updated: - print(f"成功更新 img_id 为: {new_img_id}") - else: - print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}") + # # 检查 img_id 是否已更新(如果提供了 new_img_id) + # img_id_updated = True + # if new_img_id is not None: + # img_id_updated = verify_doc['_source'].get('img_id') == new_img_id + # if img_id_updated: + # print(f"成功更新 img_id 为: {new_img_id}") + # else: + # print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}") - # 检查 position 是否已更新(如果提供了 position) - position_updated = True - if position is not None: - position_updated = verify_doc['_source'].get('positions') == position - if position_updated: - print(f"成功更新 position 为: {position}") - else: - print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}") + # # 检查 position 是否已更新(如果提供了 position) + # position_updated = True + # if position is not None: + # position_updated = verify_doc['_source'].get('positions') == position + # if position_updated: + # print(f"成功更新 position 为: {position}") + # else: + # print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}") - # 统一返回结果 - if img_id_updated and position_updated: - return {"code": 0, "message": ""} - else: - return {"code": 100, "message": "Failed to verify update"} + # # 统一返回结果 + # if img_id_updated and position_updated: + # return {"code": 0, "message": ""} + # else: + # return {"code": 100, "message": "Failed to verify update"} except Exception as e: @@ -160,6 +165,7 @@ def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position return {"code": 101, "message": f"Error updating img_id: {str(e)}"} + def get_minio_client(): """创建MinIO客户端""" return Minio( @@ -444,43 +450,57 @@ def get_positions_from_chunk(pdf_path, chunks_info): try: # 提取所有chunk的文本内容用于批量查找 chunk_texts = [chunk_info['text'] for chunk_info in chunks_info] + print(f"批量查找文本块: {chunk_texts}") # 使用智能模糊查找获取位置信息 - batch_positions = smart_fuzzy_find_text_batch(pdf_path, chunk_texts, similarity_threshold=0.7) + matches = find_text_in_pdf( + pdf_path, + chunk_texts, + threshold=60 + ) + print(f"匹配结果: {matches}") # 将位置信息与chunks_info关联,并确保数据类型正确 for i, chunk_info in enumerate(chunks_info): - positions = batch_positions[i] if i < len(batch_positions) else [] - - # 处理位置信息 - processed_positions = [] - for pos in positions: - if isinstance(pos, dict): - # 创建新的位置字典,确保所有坐标都是整数 - processed_pos = { - 'x0': int(round(float(pos['x0']))) if pos.get('x0') is not None else 0, - 'y0': int(round(float(pos['y0']))) if pos.get('y0') is not None else 0, - 'x1': int(round(float(pos['x1']))) if pos.get('x1') is not None else 0, - 'y1': int(round(float(pos['y1']))) if pos.get('y1') is not None else 0, - 'page': int(pos['page']) if pos.get('page') is not None else 0 - } - processed_positions.append(processed_pos) - - # 更新chunk_info中的positions - chunk_info['positions'] = processed_positions + # 确保 chunk_info 包含 'positions' 键 + if 'positions' not in chunk_info: + chunk_info['positions'] = [] + + print(f"处理第 {i+1} 个chunk: {chunk_info['text']}") + print(f"更新前位置: {chunk_info['positions']}") + if isinstance(matches, list) and i < len(matches): + chunk_info['positions']=[mat['position_int'] for mat in matches[i] if 'position_int' in mat] + + # # 如果matches是列表且索引有效 + # if isinstance(matches[i], dict) and 'position_int' in matches[i]: + # chunk_info['positions'] = matches[i]['position_int'] + # print(f"更新后位置: {chunk_info['positions']}") + # else: + # chunk_info['positions'] = [] + # print(f"未找到有效位置信息,设置为空列表") + else: + chunk_info['positions'] = [] + print(f"匹配结果无效或索引越界,设置为空列表") + + # 验证更新结果 + print("最终chunks_info状态:") + for i, chunk_info in enumerate(chunks_info): + print(f" Chunk {i+1}: ID={chunk_info['id']}, Positions={chunk_info['positions']}") + return chunks_info except Exception as e: print(f"获取PDF文本位置信息时出错: {str(e)}") # 出错时为每个chunk添加空的位置信息 for chunk_info in chunks_info: - chunk_info['positions'] = [] + # 确保 chunk_info 包含 'positions' 键 + if 'positions' not in chunk_info: + chunk_info['positions'] = [] return chunks_info - def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset): """处理PDF-TXT文件对""" for name, pdf_path in pdf_dict.items(): @@ -493,6 +513,8 @@ def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset): txt_path = txt_dict.get(name) if txt_path: chunks_info=process_txt_chunks(dataset.id,document, txt_path) + + time.sleep(1) if chunks_info: chunks_info=get_positions_from_chunk(pdf_path, chunks_info) for chunk_info in chunks_info: diff --git a/src/find_text_in_pdf_enhanced.py b/src/find_text_in_pdf_enhanced.py index 1e6a188..5db1e0d 100644 --- a/src/find_text_in_pdf_enhanced.py +++ b/src/find_text_in_pdf_enhanced.py @@ -161,9 +161,10 @@ def find_text_in_pdf(pdf_path, if matched_lines: _, merged_bbox = _merge_lines(matched_lines) results.append({ - "page": p + 1, + "page": p, "bbox": merged_bbox, - "matched_text": matched_text + "matched_text": matched_text, + "position_int":[p, merged_bbox[0], merged_bbox[2], merged_bbox[1], merged_bbox[3]] }) if results: batch_results[idx].extend(results) @@ -206,6 +207,7 @@ def highlight_matches(pdf_path, matches, output_path="highlighted.pdf"): if __name__ == "__main__": pdf_path = 'e:\\2\\2024深化智慧城市发展推进城市全域数字化转型的指导意见.pdf' pdf_path = 'G:\\SynologyDrive\\大模型\\RAG\\20250805党建\\中国共产党领导干部廉洁从业若干准则.pdf' + pdf_path ="F:\\Synology_nas\\SynologyDrive\\大模型\\RAG\\20250805党建\\中国共产党领导干部廉洁从业若干准则.pdf" query = [ '''一、总体要求 以习近平新时代中国特色社会主义思想为指导,完整、准确、全面贯彻新发展理念,统筹发展和安全,充分发挥数据的基础资源和创新引擎作用,整体性重塑智慧城市技术架构、系统性变革城市管理流程、一体化推动产城深度融合,全面提升城市全域数字化转型的整体性、系统性、协同性,不断满足人民日益增长的美好生活需要,为全面建设社会主义现代化国家提供强大动力。到2027年,全国城市全域数字化转型取得明显成效,形成一批横向打通、纵向贯通、各具特色的宜居、韧性、智慧城市,有力支撑数字中国建设。到2030年,全国城市全域数字化转型全面突破,人民群众的获得感、幸福感、安全感全面提升,涌现一批数字文明时代具有全球竞争力的中国式现代化城市。''', @@ -271,7 +273,7 @@ if __name__ == "__main__": # 1. 找跨行正则匹配 matches = find_text_in_pdf( pdf_path, - query, # 你的正则 + query, threshold=60 ) @@ -284,7 +286,7 @@ if __name__ == "__main__": #highlight_matches(pdf_path, query_matches, "example_highlighted.pdf") for m in query_matches: - print(f"第 {m['page']} 页 匹配: {m['matched_text'][:50]}... 位置: {m['bbox']}") + print(f"第 {m['page']} 页 匹配: {m['matched_text'][:50]}... 位置: {m['bbox']}, 位置_int: {m['position_int']}") print("------------------")