Compare commits
11 commits: 466fae53c9...main

SHA1: 51f24ced05, 1c23d272bb, c1d66237e6, e5ac523bd9, ec30b8d78a, c8f96ee41e, 020de8da5d, 657e3cb9e5, c47ddad5f1, 73557a272d, 44ef61daab
@@ -1,5 +1,5 @@
from elasticsearch import Elasticsearch

from src.add_chunk_cli_pdf_img import update_positon_img_id_in_elasticsearch
# Initialize Elasticsearch (username elastic, password infini_rag_flow)
es = Elasticsearch(
    [{'host': '127.0.0.1', 'port': 1200, 'scheme': 'http'}],
@@ -58,6 +58,39 @@ def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id):
    else:
        return {"code": 100, "message": "Failed to update img_id"}

def get_index_mapping(tenant_id):
    """
    Get the mapping of the given index.

    :param tenant_id: tenant ID
    :return: mapping information
    """
    index_name = f"ragflow_{tenant_id}"

    try:
        mapping = es.indices.get_mapping(index=index_name)
        # Convert the ObjectApiResponse into a plain dict
        mapping_dict = dict(mapping)
        return {"code": 0, "message": "", "data": mapping_dict}
    except Exception as e:
        return {"code": 500, "message": str(e), "data": {}}

# Example call in the main block
if __name__ == "__main__":
    # ... existing code ...

    # Fetch the mapping
    tenant_id = "9c73df5a3ebc11f08410c237296aa408"
    mapping_result = get_index_mapping(tenant_id)
    if mapping_result["code"] == 0:
        print("Index mapping:")
        import json
        # Use default=str for objects that cannot be serialized directly
        print(json.dumps(mapping_result["data"], indent=2, ensure_ascii=False, default=str))
    else:
        print(f"Failed to get mapping: {mapping_result['message']}")


def list_chunk_information(tenant_id, dataset_id, doc_id=None, chunk_id=None, size=1000):
    """
@@ -121,17 +154,33 @@ if __name__ == "__main__":
    tenant_id = "9c73df5a3ebc11f08410c237296aa408"
    dataset_id = "0e6127da574a11f0a59c7e7439a490f8"  # dataset_id = kb_id
    doc_id = "cbf576385bc911f08f23fedc3996e479"
    doc_id = "323113d8670c11f0b4255ea1d23c381a"
    doc_id = "323113d8670c11f0b4255ea1d23c381a"
    doc_id = "5cdab2fa67cb11f0a21592edb0e63cad"
    chunk_id = "f035247f7de579b0"
    chunk_id = "b2d53baddbfde97c"
    chunk_id = "e46a067c1edf939a"
    new_img_id = "10345832587311f0919f3a2728512a4b-f035247f7de579b0"  # "new_img_id_12345"
    new_img_id = "0e6127da574a11f0a59c7e7439a490f8-b2d53baddbfde97c"
    #new_img_id = "0e6127da574a11f0a59c7e7439a490f8-b2d53baddbfde97c"
    #new_img_id = "c5142bce5ac611f0ae707a8b5ba029cb-thumbnail_fb3cbc165ac611f0b5897a8b5ba029cb.png"
    pos = [3, 317, 397, 123, 182]

    # Fetch the mapping
    tenant_id = "9c73df5a3ebc11f08410c237296aa408"
    mapping_result = get_index_mapping(tenant_id)
    if mapping_result["code"] == 0:
        print("Index mapping:")
        import json
        print(json.dumps(mapping_result["data"], indent=2, ensure_ascii=False))
    else:
        print(f"Failed to get mapping: {mapping_result['message']}")

    #chunk_list = list_chunk_information(tenant_id, dataset_id, doc_id=doc_id)
    update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id)
    # update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id)
    update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, pos, new_img_id)
    # if chunk_list["code"] == 0:
    #     print(f"Found {len(chunk_list['data'])} chunks")
    #     for chunk in chunk_list['data']:
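For reference, the position convention used throughout these changes is a five-element list [page_index, left, right, top, bottom] with a 0-based page index; the update helpers convert it into the 1-based position_int tuples stored by RAGFlow. A minimal sketch of that conversion, assuming that convention (the helper name below is illustrative and not part of the diff):

def to_position_fields(positions):
    """Illustrative helper: convert [page, left, right, top, bottom] boxes into the ES fields used above."""
    position_int = []
    for pos in positions:
        if len(pos) != 5:
            continue  # skip malformed entries
        pn, left, right, top, bottom = pos
        # RAGFlow stores 1-based page numbers as integer tuples
        position_int.append((int(pn) + 1, int(left), int(right), int(top), int(bottom)))
    if not position_int:
        return {}
    return {
        "position_int": position_int,
        "page_num_int": [position_int[0][0]],
        "top_int": [position_int[0][3]],
    }

# Example: a box on page 4 (0-based) yields page_num_int == [5]
# to_position_fields([[4, 0, 100, 200, 510]])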
chunk_pos.py (new file, 256 lines)
@@ -0,0 +1,256 @@
from elasticsearch import Elasticsearch
|
||||
#from src.add_chunk_cli_pdf_img import update_positon_img_id_in_elasticsearch
|
||||
# 初始化 Elasticsearch 用户名elastic,密码infini_rag_flow
|
||||
|
||||
from dotenv import load_dotenv # 新增
|
||||
import os
|
||||
import json
|
||||
# 加载 .env 文件中的环境变量
|
||||
load_dotenv()
|
||||
|
||||
|
||||
|
||||
|
||||
# 初始化 Elasticsearch
|
||||
es = Elasticsearch(
|
||||
[{
|
||||
'host': os.getenv("ELASTIC_HOST"),
|
||||
'port': int(os.getenv("ELASTIC_PORT")),
|
||||
'scheme': 'http'
|
||||
}],
|
||||
basic_auth=(
|
||||
os.getenv("ELASTIC_USERNAME"),
|
||||
os.getenv("ELASTIC_PASSWORD")
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def get_index_mapping(tenant_id):
|
||||
"""
|
||||
获取指定索引的 mapping 信息
|
||||
|
||||
:param tenant_id: 租户 ID
|
||||
:return: mapping 信息
|
||||
"""
|
||||
index_name = f"ragflow_{tenant_id}"
|
||||
|
||||
try:
|
||||
mapping = es.indices.get_mapping(index=index_name)
|
||||
# 将 ObjectApiResponse 转换为普通字典
|
||||
mapping_dict = dict(mapping)
|
||||
return {"code": 0, "message": "", "data": mapping_dict}
|
||||
except Exception as e:
|
||||
return {"code": 500, "message": str(e), "data": {}}
|
||||
|
||||
def update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, positions):
|
||||
"""
|
||||
在 Elasticsearch 中更新指定文档块的position and img_id。
|
||||
|
||||
:param tenant_id: 租户 ID
|
||||
:param doc_id: 文档 ID
|
||||
:param chunk_id: 文档块 ID
|
||||
:param new_img_id: 新的 img_id
|
||||
:param position: 位置信息
|
||||
:return: 更新结果
|
||||
"""
|
||||
if not positions:
|
||||
return
|
||||
|
||||
position_int = []
|
||||
|
||||
for pos in positions:
|
||||
if len(pos) != 5:
|
||||
continue # Skip invalid positions
|
||||
|
||||
pn, left, right, top, bottom = pos
|
||||
# 使用元组格式,与原始RAGFlow保持一致
|
||||
position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom)))
|
||||
|
||||
if position_int: # Only add if we have valid positions
|
||||
# 仅添加精确位置信息,不修改排序字段
|
||||
|
||||
# 构建索引名称
|
||||
index_name = f"ragflow_{tenant_id}"
|
||||
|
||||
# 构建查询条件
|
||||
query = {
|
||||
"bool": {
|
||||
"must": [
|
||||
{"term": {"doc_id": doc_id}},
|
||||
{"term": {"_id": chunk_id}}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
# 搜索目标文档
|
||||
result = es.search(index=index_name, body={"query": query})
|
||||
|
||||
# 检查是否找到目标文档
|
||||
if result['hits']['total']['value'] == 0:
|
||||
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
|
||||
return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
|
||||
|
||||
# 获取目标文档的 ID
|
||||
hit = result['hits']['hits'][0]
|
||||
doc_id_in_es = hit['_id']
|
||||
|
||||
# 构建更新请求 - 只更新存在的字段
|
||||
update_body = {"doc": {}}
|
||||
update_body["doc"]["position_int"] = position_int
|
||||
update_body["doc"]["page_num_int"] = [position_int[0][0]]
|
||||
update_body["doc"]["top_int"] = [position_int[0][3]]
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# 更新文档
|
||||
update_result = es.update(
|
||||
index=index_name,
|
||||
id=doc_id_in_es,
|
||||
body=update_body,
|
||||
refresh=True # 确保更新立即可见
|
||||
)
|
||||
|
||||
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, position, new_img_id):
|
||||
"""
|
||||
在 Elasticsearch 中更新指定文档块的position and img_id。
|
||||
|
||||
:param tenant_id: 租户 ID
|
||||
:param doc_id: 文档 ID
|
||||
:param chunk_id: 文档块 ID
|
||||
:param new_img_id: 新的 img_id
|
||||
:param position: 位置信息
|
||||
:return: 更新结果
|
||||
"""
|
||||
try:
|
||||
|
||||
# 构建索引名称
|
||||
index_name = f"ragflow_{tenant_id}"
|
||||
|
||||
# 构建查询条件
|
||||
query = {
|
||||
"bool": {
|
||||
"must": [
|
||||
{"term": {"doc_id": doc_id}},
|
||||
{"term": {"_id": chunk_id}}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
# 搜索目标文档
|
||||
result = es.search(index=index_name, body={"query": query})
|
||||
|
||||
# 检查是否找到目标文档
|
||||
if result['hits']['total']['value'] == 0:
|
||||
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
|
||||
return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
|
||||
|
||||
# 获取目标文档的 ID
|
||||
hit = result['hits']['hits'][0]
|
||||
doc_id_in_es = hit['_id']
|
||||
|
||||
# 构建更新请求 - 只更新存在的字段
|
||||
update_body = {"doc": {}}
|
||||
|
||||
#只有当 new_img_id 存在时才更新 img_id
|
||||
if new_img_id is not None:
|
||||
update_body["doc"]["img_id"] = new_img_id
|
||||
|
||||
# 只有当 position 存在时才更新 positions
|
||||
if position is not None:
|
||||
|
||||
update_body["doc"]["positions"] = position
|
||||
|
||||
|
||||
# 如果没有需要更新的字段,直接返回成功
|
||||
if not update_body["doc"]:
|
||||
print("没有需要更新的字段")
|
||||
return {"code": 0, "message": "No fields to update"}
|
||||
|
||||
# 更新文档
|
||||
update_result = es.update(
|
||||
index=index_name,
|
||||
id=doc_id_in_es,
|
||||
body=update_body,
|
||||
refresh=True # 确保更新立即可见
|
||||
)
|
||||
|
||||
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
|
||||
|
||||
# 验证更新
|
||||
verify_doc = es.get(index=index_name, id=doc_id_in_es)
|
||||
|
||||
# 检查 img_id 是否已更新(如果提供了 new_img_id)
|
||||
img_id_updated = True
|
||||
if new_img_id is not None:
|
||||
img_id_updated = verify_doc['_source'].get('img_id') == new_img_id
|
||||
if img_id_updated:
|
||||
print(f"成功更新 img_id 为: {new_img_id}")
|
||||
else:
|
||||
print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}")
|
||||
|
||||
# 检查 position 是否已更新(如果提供了 position)
|
||||
position_updated = True
|
||||
if position is not None:
|
||||
position_updated = verify_doc['_source'].get('positions') == position
|
||||
if position_updated:
|
||||
print(f"成功更新 position 为: {position}")
|
||||
else:
|
||||
print(f"更新验证失败,当前 position: {verify_doc['_source'].get('positions')}")
|
||||
|
||||
# 统一返回结果
|
||||
if img_id_updated and position_updated:
|
||||
return {"code": 0, "message": ""}
|
||||
else:
|
||||
return {"code": 100, "message": "Failed to verify update"}
|
||||
|
||||
|
||||
except Exception as e:
|
||||
print(f"更新 Elasticsearch 时发生错误: {str(e)}")
|
||||
return {"code": 101, "message": f"Error updating img_id: {str(e)}"}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# 示例调用 - 列出特定文档的所有 chunks
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
print(es.info())
|
||||
except Exception as e:
|
||||
print("连接失败:", e)
|
||||
|
||||
|
||||
# 单位电脑
|
||||
tenant_id = "d669205e57a211f0b9e7324e7f243034"
|
||||
new_img_id ="10345832587311f0919f3a2728512a4b-bd04866cd05337281"
|
||||
doc_id="ea8d75966df811f0925ac6e8db75f472"
|
||||
chunk_id="4a4927560a7e6d80"
|
||||
# 添加以下代码来检查索引映射
|
||||
# mapping_result = get_index_mapping(tenant_id)
|
||||
# print("Positions field mapping:", mapping_result["data"][f"ragflow_{tenant_id}"]["mappings"]["properties"]["positions"])
|
||||
|
||||
|
||||
|
||||
|
||||
# 左,右 -->
|
||||
#上, 下| 上面最小,下面最大
|
||||
|
||||
|
||||
|
||||
pos = [[4, 0, 100, 200, 510]]
|
||||
#pos_string = json.dumps(pos) # 转换为 JSON 字符串
|
||||
update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, pos)
|
||||
|
||||
|
||||
#update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, pos, "")
|
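A small, hedged sketch of how the mapping check that is commented out above might be used to guard the position update; it assumes the get_index_mapping and update_positon_in_elasticsearch functions defined in this file:

def position_field_exists(tenant_id):
    # True when the index mapping already declares a position_int field
    result = get_index_mapping(tenant_id)
    if result["code"] != 0:
        return False
    mappings = result["data"].get(f"ragflow_{tenant_id}", {}).get("mappings", {})
    return "position_int" in mappings.get("properties", {})

# if position_field_exists(tenant_id):
#     update_positon_in_elasticsearch(tenant_id, doc_id, chunk_id, pos)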
@@ -9,6 +9,10 @@ import tempfile
|
||||
from elasticsearch import Elasticsearch
|
||||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
from find_text_in_pdf_enhanced import find_text_in_pdf
|
||||
import time
|
||||
|
||||
|
||||
|
||||
|
||||
from dotenv import load_dotenv # 新增
|
||||
@@ -45,17 +49,161 @@ MINIO_CONFIG = {
|
||||
"secure": False
|
||||
}
|
||||
|
||||
def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id):
|
||||
from elasticsearch.helpers import bulk
|
||||
|
||||
def bulk_update_elasticsearch(tenant_id, updates):
|
||||
"""
|
||||
在 Elasticsearch 中更新指定文档块的 img_id。
|
||||
批量更新Elasticsearch中的文档
|
||||
|
||||
:param tenant_id: 租户ID
|
||||
:param updates: 更新信息列表,每个元素包含doc_id, chunk_id, positions, new_img_id
|
||||
:return: 更新结果
|
||||
"""
|
||||
try:
|
||||
index_name = f"ragflow_{tenant_id}"
|
||||
|
||||
# 构建批量操作列表
|
||||
actions = []
|
||||
|
||||
for update_info in updates:
|
||||
doc_id = update_info['doc_id']
|
||||
chunk_id = update_info['chunk_id']
|
||||
positions = update_info.get('positions', [])
|
||||
new_img_id = update_info.get('new_img_id')
|
||||
|
||||
# 构建查询条件来找到文档
|
||||
query = {
|
||||
"bool": {
|
||||
"must": [
|
||||
{"term": {"doc_id": doc_id}},
|
||||
{"term": {"_id": chunk_id}}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
# 搜索目标文档
|
||||
result = es.search(index=index_name, body={"query": query})
|
||||
|
||||
# 检查是否找到目标文档
|
||||
if result['hits']['total']['value'] == 0:
|
||||
print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}")
|
||||
continue
|
||||
|
||||
# 获取目标文档的 ID
|
||||
hit = result['hits']['hits'][0]
|
||||
doc_id_in_es = hit['_id']
|
||||
|
||||
# 构建更新请求 - 只更新存在的字段
|
||||
doc_update = {}
|
||||
|
||||
# 只有当 new_img_id 存在时才更新 img_id
|
||||
if new_img_id is not None:
|
||||
doc_update["img_id"] = new_img_id
|
||||
|
||||
# 只有当 positions 存在时才更新 positions
|
||||
if positions:
|
||||
position_int = []
|
||||
|
||||
for pos in positions:
|
||||
if len(pos) != 5:
|
||||
continue # Skip invalid positions
|
||||
|
||||
pn, left, right, top, bottom = pos
|
||||
# 使用元组格式,与原始RAGFlow保持一致
|
||||
position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom)))
|
||||
|
||||
if position_int:
|
||||
doc_update["position_int"] = position_int
|
||||
doc_update["page_num_int"] = [position_int[0][0]]
|
||||
doc_update["top_int"] = [position_int[0][3]]
|
||||
|
||||
# 如果没有需要更新的字段,跳过
|
||||
if not doc_update:
|
||||
print(f"没有需要更新的字段 for chunk {chunk_id}")
|
||||
continue
|
||||
|
||||
# 添加到批量操作列表
|
||||
action = {
|
||||
"_op_type": "update",
|
||||
"_index": index_name,
|
||||
"_id": doc_id_in_es,
|
||||
"doc": doc_update
|
||||
}
|
||||
actions.append(action)
|
||||
|
||||
# 执行批量更新
|
||||
if actions:
|
||||
results = bulk(es, actions, refresh=True)
|
||||
print(f"批量更新完成,成功处理 {results[0]} 个操作")
|
||||
return {"code": 0, "message": f"Successfully updated {results[0]} documents"}
|
||||
else:
|
||||
print("没有需要执行的更新操作")
|
||||
return {"code": 0, "message": "No updates to perform"}
|
||||
|
||||
except Exception as e:
|
||||
print(f"批量更新 Elasticsearch 时发生错误: {str(e)}")
|
||||
return {"code": 101, "message": f"Error in bulk update: {str(e)}"}
|
||||
|
||||
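For orientation, the updates argument consumed by bulk_update_elasticsearch above is a list of plain dicts; a minimal sketch of the expected shape (the IDs below are placeholders, not real values):

updates = [
    {
        "doc_id": "<doc-id>",
        "chunk_id": "<chunk-id>",
        "positions": [[0, 10, 200, 30, 60]],  # [page, left, right, top, bottom], 0-based page
        "new_img_id": "<dataset-id>-<chunk-id>",
    },
    {
        "doc_id": "<doc-id>",
        "chunk_id": "<another-chunk-id>",
        "positions": [[1, 15, 220, 40, 80]],
        # no new_img_id: only the position fields are updated for this chunk
    },
]
# bulk_update_elasticsearch(tenant_id, updates)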
# 修改 process_pdf_txt_pairs 函数以使用批量更新
|
||||
def process_pdf_txt_pairs_bulk(pdf_dict, txt_dict, dataset):
|
||||
"""处理PDF-TXT文件对,使用批量更新提高效率"""
|
||||
# 收集所有需要更新的信息
|
||||
all_updates = []
|
||||
|
||||
for name, pdf_path in pdf_dict.items():
|
||||
display_name = os.path.basename(pdf_path)
|
||||
document = upload_or_get_document(dataset, pdf_path, display_name)
|
||||
print(f"选择的文档: {document.name},ID: {document.id}")
|
||||
if not document:
|
||||
continue
|
||||
|
||||
txt_path = txt_dict.get(name)
|
||||
if txt_path:
|
||||
chunks_info = process_txt_chunks(dataset.id, document, txt_path)
|
||||
|
||||
time.sleep(1) # 等待chunk处理完成
|
||||
if chunks_info:
|
||||
chunks_info = get_positions_from_chunk(pdf_path, chunks_info)
|
||||
|
||||
# 收集更新信息而不是立即更新
|
||||
for chunk_info in chunks_info:
|
||||
print(f"Chunk ID: {chunk_info['id']}, Text: {chunk_info['text'][:30]}..., Has Image: {chunk_info['has_image']}, Positions: {chunk_info['positions']}")
|
||||
|
||||
update_info = {
|
||||
'doc_id': document.id,
|
||||
'chunk_id': chunk_info['id'],
|
||||
'positions': chunk_info['positions']
|
||||
}
|
||||
|
||||
if chunk_info['has_image']:
|
||||
# 如果有图片,准备更新img_id
|
||||
update_info['new_img_id'] = f"{dataset.id}-{chunk_info['id']}"
|
||||
# 如果没有图片,new_img_id为None,不会更新img_id字段
|
||||
|
||||
all_updates.append(update_info)
|
||||
|
||||
# 执行批量更新
|
||||
if all_updates:
|
||||
result = bulk_update_elasticsearch(elastic_tenant_id, all_updates)
|
||||
print(f"批量更新结果: {result}")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def update_positon_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, positions, new_img_id):
|
||||
"""
|
||||
在 Elasticsearch 中更新指定文档块的position and img_id。
|
||||
|
||||
:param tenant_id: 租户 ID
|
||||
:param doc_id: 文档 ID
|
||||
:param chunk_id: 文档块 ID
|
||||
:param new_img_id: 新的 img_id
|
||||
:param position: 位置信息
|
||||
:return: 更新结果
|
||||
"""
|
||||
try:
|
||||
|
||||
# 构建索引名称
|
||||
index_name = f"ragflow_{tenant_id}"
|
||||
|
||||
@@ -81,12 +229,35 @@ def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id):
|
||||
hit = result['hits']['hits'][0]
|
||||
doc_id_in_es = hit['_id']
|
||||
|
||||
# 构建更新请求
|
||||
update_body = {
|
||||
"doc": {
|
||||
"img_id": new_img_id
|
||||
}
|
||||
}
|
||||
# 构建更新请求 - 只更新存在的字段
|
||||
update_body = {"doc": {}}
|
||||
|
||||
#只有当 new_img_id 存在时才更新 img_id
|
||||
if new_img_id is not None:
|
||||
update_body["doc"]["img_id"] = new_img_id
|
||||
|
||||
# 只有当 position 存在时才更新 positions
|
||||
if positions :
|
||||
|
||||
position_int = []
|
||||
|
||||
for pos in positions:
|
||||
if len(pos) != 5:
|
||||
continue # Skip invalid positions
|
||||
|
||||
pn, left, right, top, bottom = pos
|
||||
# 使用元组格式,与原始RAGFlow保持一致
|
||||
position_int.append((int(pn + 1), int(left), int(right), int(top), int(bottom)))
|
||||
if position_int:
|
||||
update_body["doc"]["position_int"] = position_int
|
||||
update_body["doc"]["page_num_int"] = [position_int[0][0]]
|
||||
update_body["doc"]["top_int"] = [position_int[0][3]]
|
||||
|
||||
|
||||
# 如果没有需要更新的字段,直接返回成功
|
||||
if not update_body["doc"]:
|
||||
print("没有需要更新的字段")
|
||||
return {"code": 0, "message": "No fields to update"}
|
||||
|
||||
# 更新文档
|
||||
update_result = es.update(
|
||||
@@ -98,14 +269,8 @@ def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id):
|
||||
|
||||
print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}")
|
||||
|
||||
# 验证更新
|
||||
verify_doc = es.get(index=index_name, id=doc_id_in_es)
|
||||
if verify_doc['_source'].get('img_id') == new_img_id:
|
||||
print(f"成功更新 img_id 为: {new_img_id}")
|
||||
return {"code": 0, "message": ""}
|
||||
else:
|
||||
print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}")
|
||||
return {"code": 100, "message": "Failed to verify img_id update"}
|
||||
|
||||
|
||||
|
||||
except Exception as e:
|
||||
print(f"更新 Elasticsearch 时发生错误: {str(e)}")
|
||||
@@ -295,7 +460,10 @@ def process_txt_chunks(dataset_id, document, txt_path):
|
||||
try:
|
||||
with open(txt_path, 'r', encoding='utf-8') as file:
|
||||
file_content = file.read()
|
||||
img_chunk_ids = []
|
||||
|
||||
# 使用字典列表替代三个独立的列表
|
||||
chunks_info = []
|
||||
|
||||
for num, txt_chunk in enumerate(file_content.split('\n\n')):
|
||||
if txt_chunk.strip():
|
||||
print(f"处理文本块: {txt_chunk[:30]}...")
|
||||
@@ -307,6 +475,16 @@ def process_txt_chunks(dataset_id, document, txt_path):
|
||||
clean_chunk = remove_images_from_content(txt_chunk)
|
||||
chunk = document.add_chunk(content=clean_chunk)
|
||||
|
||||
# 初始化chunk信息
|
||||
chunk_info = {
|
||||
'id': chunk.id,
|
||||
'text': chunk.content,
|
||||
'has_image': False, # 默认为False
|
||||
'img_url': img_url
|
||||
}
|
||||
|
||||
upload_success = False
|
||||
|
||||
# 判断是否为网络图片 (新增逻辑)
|
||||
if img_url.startswith(('http://', 'https://')):
|
||||
# 下载网络图片到临时文件
|
||||
@@ -321,10 +499,10 @@ def process_txt_chunks(dataset_id, document, txt_path):
|
||||
|
||||
# 上传临时文件
|
||||
if upload_file2minio(dataset_id, chunk.id, tmp_path):
|
||||
img_chunk_ids.append(chunk.id)
|
||||
# new_img_id = f"{dataset_id}-{chunk.id}"
|
||||
# print(f"网络图片 {img_url} 已下载并上传,新的 img_id: {new_img_id}")
|
||||
# update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id)
|
||||
upload_success = True
|
||||
new_img_id = f"{dataset_id}-{chunk.id}"
|
||||
print(f"网络图片 {img_url} 已下载并上传,新的 img_id: {new_img_id}")
|
||||
# update_positon_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, [], new_img_id)
|
||||
|
||||
# 删除临时文件
|
||||
os.unlink(tmp_path)
|
||||
@@ -340,23 +518,98 @@ def process_txt_chunks(dataset_id, document, txt_path):
|
||||
print(f"图片绝对路径: {img_abs_path}")
|
||||
if os.path.exists(img_abs_path):
|
||||
if upload_file2minio(dataset_id, chunk.id, img_abs_path):
|
||||
img_chunk_ids.append(chunk.id)
|
||||
# new_img_id = f"{dataset_id}-{chunk.id}"
|
||||
# print(f"图片 {img_abs_path} 已上传,新的 img_id: {new_img_id}")
|
||||
# update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id)
|
||||
upload_success = True
|
||||
new_img_id = f"{dataset_id}-{chunk.id}"
|
||||
print(f"图片 {img_abs_path} 已上传,新的 img_id: {new_img_id}")
|
||||
#update_positon_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, [], new_img_id)
|
||||
else:
|
||||
print(f"图片未找到: {img_abs_path},跳过。")
|
||||
|
||||
# 只有上传成功后才设置has_image为True
|
||||
if upload_success:
|
||||
chunk_info['has_image'] = True
|
||||
|
||||
chunks_info.append(chunk_info)
|
||||
else:
|
||||
print("未检测到图片链接,直接添加文本块。")
|
||||
chunk = document.add_chunk(content=txt_chunk)
|
||||
# 为无图片的chunk添加信息
|
||||
chunk_info = {
|
||||
'id': chunk.id,
|
||||
'text': chunk.content,
|
||||
'has_image': False,
|
||||
'img_url': None
|
||||
}
|
||||
chunks_info.append(chunk_info)
|
||||
|
||||
print(f"第{num+1} Chunk添加成功! ID: {chunk.id}")
|
||||
for img_chunk_id in img_chunk_ids:
|
||||
update_img_id_in_elasticsearch(elastic_tenant_id, document.id, img_chunk_id, f"{dataset_id}-{img_chunk_id}")
|
||||
|
||||
return chunks_info # 返回chunks_info
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理文本文件时出错: {txt_path},错误: {e}")
|
||||
return [] # 出错时返回空列表
|
||||
|
||||
|
||||
def get_positions_from_chunk(pdf_path, chunks_info):
|
||||
"""
|
||||
从PDF中获取文本块的位置信息
|
||||
|
||||
:param pdf_path: PDF文件路径
|
||||
:param chunks_info: 文本块信息列表,每个元素包含'id'和'text'键
|
||||
:return: 包含位置信息的列表
|
||||
"""
|
||||
try:
|
||||
# 提取所有chunk的文本内容用于批量查找
|
||||
chunk_texts = [chunk_info['text'] for chunk_info in chunks_info]
|
||||
print(f"批量查找文本块: {chunk_texts}")
|
||||
|
||||
# 使用智能模糊查找获取位置信息
|
||||
matches = find_text_in_pdf(
|
||||
pdf_path,
|
||||
chunk_texts,
|
||||
threshold=60
|
||||
)
|
||||
print(f"匹配结果: {matches}")
|
||||
|
||||
# 将位置信息与chunks_info关联,并确保数据类型正确
|
||||
for i, chunk_info in enumerate(chunks_info):
|
||||
# 确保 chunk_info 包含 'positions' 键
|
||||
if 'positions' not in chunk_info:
|
||||
chunk_info['positions'] = []
|
||||
|
||||
print(f"处理第 {i+1} 个chunk: {chunk_info['text']}")
|
||||
print(f"更新前位置: {chunk_info['positions']}")
|
||||
|
||||
if isinstance(matches, list) and i < len(matches):
|
||||
chunk_info['positions']=[mat['position_int'] for mat in matches[i] if 'position_int' in mat]
|
||||
|
||||
# # 如果matches是列表且索引有效
|
||||
# if isinstance(matches[i], dict) and 'position_int' in matches[i]:
|
||||
# chunk_info['positions'] = matches[i]['position_int']
|
||||
# print(f"更新后位置: {chunk_info['positions']}")
|
||||
# else:
|
||||
# chunk_info['positions'] = []
|
||||
# print(f"未找到有效位置信息,设置为空列表")
|
||||
else:
|
||||
chunk_info['positions'] = []
|
||||
print(f"匹配结果无效或索引越界,设置为空列表")
|
||||
|
||||
# 验证更新结果
|
||||
print("最终chunks_info状态:")
|
||||
for i, chunk_info in enumerate(chunks_info):
|
||||
print(f" Chunk {i+1}: ID={chunk_info['id']}, Positions={chunk_info['positions']}")
|
||||
|
||||
return chunks_info
|
||||
|
||||
except Exception as e:
|
||||
print(f"获取PDF文本位置信息时出错: {str(e)}")
|
||||
# 出错时为每个chunk添加空的位置信息
|
||||
for chunk_info in chunks_info:
|
||||
# 确保 chunk_info 包含 'positions' 键
|
||||
if 'positions' not in chunk_info:
|
||||
chunk_info['positions'] = []
|
||||
return chunks_info
|
||||
|
||||
|
||||
|
||||
@@ -371,10 +624,23 @@ def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset):
|
||||
|
||||
txt_path = txt_dict.get(name)
|
||||
if txt_path:
|
||||
process_txt_chunks(dataset.id,document, txt_path)
|
||||
chunks_info=process_txt_chunks(dataset.id,document, txt_path)
|
||||
|
||||
time.sleep(1)
|
||||
if chunks_info:
|
||||
chunks_info=get_positions_from_chunk(pdf_path, chunks_info)
|
||||
for chunk_info in chunks_info:
|
||||
print(f"Chunk ID: {chunk_info['id']}, Text: {chunk_info['text'][:30]}..., Has Image: {chunk_info['has_image']}, Positions: {chunk_info['positions']}")
|
||||
if chunk_info['has_image']:
|
||||
# 如果有图片,更新Elasticsearch中的img_id
|
||||
update_positon_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk_info['id'], chunk_info['positions'], f"{dataset.id}-{chunk_info['id']}")
|
||||
else:
|
||||
# 如果没有图片,仍然更新位置信息
|
||||
update_positon_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk_info['id'], chunk_info['positions'], None)
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
"""主函数,处理PDF和TXT文件对
|
||||
|
||||
dataset.id = bucket_name
|
||||
@@ -395,8 +661,8 @@ def main():
|
||||
print("未选择数据集。")
|
||||
return
|
||||
|
||||
process_pdf_txt_pairs(pdf_dict, txt_dict, dataset)
|
||||
|
||||
# 使用批量处理函数替代原来的处理函数
|
||||
process_pdf_txt_pairs_bulk(pdf_dict, txt_dict, dataset)
|
||||
|
||||
|
||||
|
||||
|
src/find_text_in_pdf_enhanced.py (new file, 298 lines)
@@ -0,0 +1,298 @@
import fitz # pymupdf
|
||||
import regex # 支持多行正则
|
||||
from rapidfuzz import fuzz
|
||||
import re
|
||||
def normalize_text(text):
|
||||
"""标准化文本,移除多余空白字符"""
|
||||
# 将换行符、制表符等替换为空格,然后合并多个空格为一个
|
||||
import re
|
||||
normalized = re.sub(r'\s+', ' ', text.strip())
|
||||
return normalized
|
||||
|
||||
|
||||
def clean_text_for_fuzzy_match(text):
|
||||
"""清理文本用于模糊匹配,移除特殊字符,只保留字母数字和空格"""
|
||||
# 移除标点符号和特殊字符,只保留字母、数字、中文字符和空格
|
||||
cleaned = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
|
||||
# 标准化空白字符
|
||||
cleaned = re.sub(r'\s+', ' ', cleaned.strip())
|
||||
return cleaned
|
||||
def _merge_lines(lines):
|
||||
"""
|
||||
把多行文本合并成一段,同时记录每行 bbox 的并集。
|
||||
lines: list of (text, bbox)
|
||||
return: (merged_text, merged_bbox)
|
||||
"""
|
||||
if not lines:
|
||||
return "", None
|
||||
texts, bboxes = zip(*lines)
|
||||
merged_text = "\n".join(texts)
|
||||
|
||||
# 合并 bbox:取所有 bbox 的最小 x0,y0 和最大 x1,y1
|
||||
x0 = min(b[0] for b in bboxes)
|
||||
y0 = min(b[1] for b in bboxes)
|
||||
x1 = max(b[2] for b in bboxes)
|
||||
y1 = max(b[3] for b in bboxes)
|
||||
# 修改:将坐标转换为整数
|
||||
return merged_text, (int(x0), int(y0), int(x1), int(y1))
|
||||
|
||||
def _collect_lines(page):
|
||||
"""
|
||||
把一页的所有行按阅读顺序收集起来。
|
||||
return: list of (text, bbox)
|
||||
"""
|
||||
lines = []
|
||||
blocks = page.get_text("dict")["blocks"]
|
||||
for blk in blocks:
|
||||
if "lines" not in blk:
|
||||
continue
|
||||
for line in blk["lines"]:
|
||||
line_text = "".join(span["text"] for span in line["spans"])
|
||||
# 行级 bbox
|
||||
x0 = min(span["bbox"][0] for span in line["spans"])
|
||||
y0 = min(span["bbox"][1] for span in line["spans"])
|
||||
x1 = max(span["bbox"][2] for span in line["spans"])
|
||||
y1 = max(span["bbox"][3] for span in line["spans"])
|
||||
# 修改:将坐标转换为整数
|
||||
lines.append((line_text, (int(x0), int(y0), int(x1), int(y1))))
|
||||
return lines
|
||||
|
||||
def find_text_in_pdf(pdf_path,
|
||||
query, # 修改为支持list类型
|
||||
use_regex=False,
|
||||
threshold=80, # rapidfuzz 默认 0~100
|
||||
page_range=None,
|
||||
preprocess=True): # 添加预处理选项
|
||||
"""
|
||||
高级查找函数
|
||||
query: 正则表达式字符串 或 普通字符串,或它们的列表
|
||||
preprocess: 是否对文本进行预处理以提高模糊匹配准确性
|
||||
返回: list[dict] 每个 dict 含 page, bbox, matched_text
|
||||
"""
|
||||
# 处理单个查询字符串的情况
|
||||
if isinstance(query, str):
|
||||
queries = [query]
|
||||
else:
|
||||
queries = query # 假设已经是列表
|
||||
# 初始化结果列表
|
||||
batch_results = [[] for _ in queries]
|
||||
|
||||
doc = fitz.open(pdf_path)
|
||||
pages = range(len(doc)) if page_range is None else range(page_range[0]-1, page_range[1])
|
||||
|
||||
for p in pages:
|
||||
page = doc.load_page(p)
|
||||
lines = _collect_lines(page) # [(text, bbox), ...]
|
||||
if not lines:
|
||||
continue
|
||||
|
||||
full_text, _ = _merge_lines(lines) # 整页纯文本
|
||||
|
||||
# 如果启用预处理,则对整页文本进行预处理
|
||||
processed_full_text = full_text
|
||||
if preprocess and not use_regex:
|
||||
processed_full_text = clean_text_for_fuzzy_match(full_text)
|
||||
|
||||
# 一次性计算所有查询的匹配结果
|
||||
for idx ,q in enumerate(queries):
|
||||
positions = [] # 记录匹配区间在 full_text 中的起止字符索引
|
||||
results = []
|
||||
if use_regex:
|
||||
# regex 支持 (?s) 使 . 匹配换行
|
||||
pattern = regex.compile(q)
|
||||
for match in pattern.finditer(full_text):
|
||||
positions.append((match.start(), match.end(), match.group()))
|
||||
else:
|
||||
# 模糊匹配:滑动窗口(整页 vs 查询)
|
||||
# 修改:支持多个匹配结果并计算相似度分数
|
||||
potential_matches = []
|
||||
query_text = q
|
||||
# 如果启用预处理,则对查询文本也进行预处理
|
||||
if preprocess:
|
||||
query_text = clean_text_for_fuzzy_match(q)
|
||||
score = fuzz.partial_ratio(processed_full_text, query_text, score_cutoff=threshold)
|
||||
if score >= threshold:
|
||||
# 这里简单返回整页;如需精确定位,可再做二次对齐
|
||||
positions.append((0, len(full_text), full_text))
|
||||
|
||||
# query_len = len(query_text)
|
||||
# text_len = len(processed_full_text)
|
||||
|
||||
# # 优化:只在合理范围内进行滑动窗口匹配
|
||||
# # 添加早期终止机制,一旦找到足够高的匹配就停止搜索
|
||||
# best_score = 0
|
||||
# for i in range(text_len - query_len + 1):
|
||||
# window_text = processed_full_text[i:i + query_len]
|
||||
# # 优化:只处理非空文本
|
||||
# if window_text.strip():
|
||||
# # 优化:使用更快速的相似度计算方法
|
||||
# score = fuzz.partial_ratio(query_text, window_text)
|
||||
# if score >= threshold:
|
||||
# # 优化:记录当前最佳分数
|
||||
# if score > best_score:
|
||||
# best_score = score
|
||||
# potential_matches.append((i, i + query_len, window_text, score))
|
||||
# # 优化:如果找到非常高分的匹配,可以提前终止
|
||||
# if score >= 95: # 如果匹配度已经很高,可以提前结束
|
||||
# break
|
||||
|
||||
# 如果找到了潜在匹配,按分数排序并只取最高分的匹配
|
||||
# if potential_matches:
|
||||
# # 按分数降序排序
|
||||
# potential_matches.sort(key=lambda x: x[3], reverse=True)
|
||||
# # 只取分数最高的匹配
|
||||
# best_match = potential_matches[0]
|
||||
# positions.append((best_match[0], best_match[1], best_match[2]))
|
||||
|
||||
# 将字符区间映射回行
|
||||
for start, end, matched_text in positions:
|
||||
# 计算每一行在 full_text 中的起止字符偏移
|
||||
offset = 0
|
||||
matched_lines = []
|
||||
for text, bbox in lines:
|
||||
line_start = offset
|
||||
line_end = offset + len(text)
|
||||
# 检查该行是否与匹配区间有重叠 - 更严格的条件
|
||||
if line_start < end and line_end > start:
|
||||
matched_lines.append((text, bbox))
|
||||
# 修正:正确计算偏移量,包括换行符
|
||||
offset += len(text) + 1 # 加上换行符的长度
|
||||
# 修正:只有当确实匹配到文本时才添加结果
|
||||
if matched_lines:
|
||||
_, merged_bbox = _merge_lines(matched_lines)
|
||||
results.append({
|
||||
"page": p,
|
||||
"bbox": merged_bbox,
|
||||
"matched_text": matched_text,
|
||||
"position_int":[p, merged_bbox[0], merged_bbox[2], merged_bbox[1], merged_bbox[3]]
|
||||
})
|
||||
if results:
|
||||
batch_results[idx].extend(results)
|
||||
doc.close()
|
||||
return batch_results
|
||||
|
||||
def highlight_matches(pdf_path, matches, output_path="highlighted.pdf"):
|
||||
"""
|
||||
把 matches 里的 bbox 用黄色高亮写入新 PDF
|
||||
matches: find_text_in_pdf(...) 的返回值
|
||||
"""
|
||||
doc = fitz.open(pdf_path)
|
||||
for m in matches:
|
||||
page = doc.load_page(m["page"])  # "page" is already a 0-based index, so no -1 adjustment
|
||||
# 修改:确保坐标为整数(虽然已经是整数了,但为了保险起见)
|
||||
bbox = m["bbox"]
|
||||
rect = fitz.Rect(int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))
|
||||
page.add_highlight_annot(rect) # 黄色高亮
|
||||
doc.save(output_path)
|
||||
doc.close()
|
||||
print(f"已保存高亮 PDF:{output_path}")
|
||||
|
||||
|
||||
|
||||
# ----------------- DEMO -----------------
|
||||
# if __name__ == "__main__":
|
||||
# pdf_path = "example.pdf"
|
||||
# # 例1:正则跨行匹配
|
||||
# query_regex = r"条款\s*\d+\.?\s*[\s\S]*?责任限制"
|
||||
# res = find_text_in_pdf(pdf_path, query_regex, use_regex=True)
|
||||
# for r in res:
|
||||
# print(r)
|
||||
|
||||
# # 例2:模糊匹配一句话
|
||||
# res2 = find_text_in_pdf(pdf_path, "这是一段可能不完全一样的文本", threshold=75)
|
||||
# for r in res2:
|
||||
# print(r)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pdf_path = 'e:\\2\\2024深化智慧城市发展推进城市全域数字化转型的指导意见.pdf'
|
||||
pdf_path = 'G:\\SynologyDrive\\大模型\\RAG\\20250805党建\\中国共产党领导干部廉洁从业若干准则.pdf'
|
||||
pdf_path ="F:\\Synology_nas\\SynologyDrive\\大模型\\RAG\\20250805党建\\中国共产党领导干部廉洁从业若干准则.pdf"
|
||||
query = [
|
||||
'''一、总体要求
|
||||
以习近平新时代中国特色社会主义思想为指导,完整、准确、全面贯彻新发展理念,统筹发展和安全,充分发挥数据的基础资源和创新引擎作用,整体性重塑智慧城市技术架构、系统性变革城市管理流程、一体化推动产城深度融合,全面提升城市全域数字化转型的整体性、系统性、协同性,不断满足人民日益增长的美好生活需要,为全面建设社会主义现代化国家提供强大动力。到2027年,全国城市全域数字化转型取得明显成效,形成一批横向打通、纵向贯通、各具特色的宜居、韧性、智慧城市,有力支撑数字中国建设。到2030年,全国城市全域数字化转型全面突破,人民群众的获得感、幸福感、安全感全面提升,涌现一批数字文明时代具有全球竞争力的中国式现代化城市。''',
|
||||
'''二、全领域推进城市数字化转型
|
||||
(一)建立城市数字化共性基础。构建统一规划、统一架构、统一标准、统一运维的城市运行和治理智能中枢,打造线上线下联动、服务管理协同的城市共性支撑平台,构建开放兼容、共性赋能、安全可靠的综合性基础环境,推进算法、模型等数字资源一体集成部署,探索建立共性组件、模块等共享协作机制。鼓励发展基于人工智能等技术的智能分析、智能调度、智能监管、辅助决策,全面支撑赋能城市数字化转型场景建设与发展。鼓励有条件的地方推进城市信息模型、时空大数据、国土空间基础信息、实景三维中国等基础平台功能整合、协同发展、应用赋能,为城市数字化转型提供统一的时空框架,因地制宜有序探索推进数字孪生城市建设,推动虚实共生、仿真推演、迭代优化的数字孪生场景落地。
|
||||
(二)培育壮大城市数字经济。深入推进数字技术与一二三产业深度融合,鼓励平台企业构建多层次产业互联网服务平台。因地制宜发展智慧农业,加快工业互联网规模化应用,推动金融、物流等生产性服务业和商贸、文旅、康养等生活性服务业数字化转型,提升“上云用数赋智”水平。深化数字化转型促进中心建设,促进城市数字化转型和大中小企业融合创新协同发展。因地制宜发展新兴数字产业,加强大数据、人工智能、区块链、先进计算、未来网络、卫星遥感、三维建模等关键数字技术在城市场景中集成应用,加快技术创新成果转化,打造具有国际竞争力的数字产业集群。培育壮大数据产业,发展一批数据商和第三方专业服务机构,提高数据要素应用支撑与服务能力。''',
|
||||
"""(三)促进新型产城融合发展。创新生产空间和生活空间融合的数字化场景,加强城市空间开发利用大数据分析,推进数字化赋能郊区新城,实现城市多中心、网络化、组团式发展。推动城市“数字更新”,加快街区、商圈等城市微单元基础设施智能化升级,探索利用数字技术创新应用场景,激发产城融合服务能级与数字活力。深化城市场景开放促进以城带产,提升产业聚合力。加速创新资源共享助力以产促城,发展虚拟园区和跨区域协同创新平台,增强城市数字经济就业吸附力。"""
|
||||
]
|
||||
|
||||
query = ["""执政党的党风关系党的生死存亡。坚决惩治和有效预防腐败,是党必须始终抓好
|
||||
的重大政治任务。党员领导干部廉洁从政是坚持以邓小平理论和“三个代表"重要思想为
|
||||
指导,深入贯彻落实科学发展观,全面贯彻党的路线方针政策的重要保障;是新时期
|
||||
从严治党,不断加强党的执政能力建设和先进性建设的重要内容;是推进改革开放和
|
||||
社会主义现代化建设的基本要求;是正确行使权力、履行职责的重要基础。
|
||||
党员领导干部必须具有共产主义远大理想和中国特色社会主义坚定信念,践行社
|
||||
会主义核心价值体系;必须坚持全心全意为人民服务的宗旨,立党为公、执政为民;
|
||||
必须在党员和人民群众中发挥表率作用,自重、自省、自警、自励;必须模范遵守党
|
||||
纪国法,清正廉洁,忠于职守,正确行使权力,始终保持职务行为的廉洁性;必须弘
|
||||
扬党的优良作风,求真务实,艰苦奋斗,密切联系群众。
|
||||
促进党员领导干部廉洁从政,必须坚持标本兼治、综合治理、惩防并举、注重预
|
||||
防的方针,按照建立健全惩治和预防腐败体系的要求,加强教育,健全制度,强化监
|
||||
督,深化改革,严肃纪律,坚持自律和他律相结合。
|
||||
""",
|
||||
"""第三条 禁止违反公共财物管理和使用的规定,假公济私、化公为私。不准有下
|
||||
列行为:
|
||||
(一)用公款报销或者支付应由个人负担的费用;
|
||||
(二)违反规定借用公款、公物或者将公款、公物借给他人;
|
||||
(三)私存私放公款;
|
||||
(四)用公款旅游或者变相用公款旅游;
|
||||
(五)用公款参与高消费娱乐、健身活动和获取各种形式的俱乐部会员资格;
|
||||
(六)违反规定用公款购买商业保险,缴纳住房公积金,滥发津贴、补贴、奖金
|
||||
等;
|
||||
(七)非法占有公共财物,或者以象征性地支付钱款等方式非法占有公共财物;
|
||||
(八)挪用或者拆借社会保障基金、住房公积金等公共资金或者其他财政资金。
|
||||
""",
|
||||
"""
|
||||
第六条禁止讲排场、比阔气、挥霍公款、铺张浪费。不准有下列行为:
|
||||
(一)在公务活动中提供或者接受超过规定标准的接待,或者超过规定标准报销
|
||||
招待费、差旅费等相关费用;
|
||||
(二)违反规定决定或者批准兴建、装修办公楼、培训中心等楼堂馆所,超标准
|
||||
配备、使用办公用房和办公用品;
|
||||
(三)擅自用公款包租、占用客房供个人使用;
|
||||
(四)违反规定配备、购买、更换、装饰或者使用小汽车;
|
||||
(五)违反规定决定或者批准用公款或者通过摊派方式举办各类庆典活动。
|
||||
第七条禁止违反规定干预和插手市场经济活动,谋取私利。不准有下列行为:
|
||||
(一)干预和插手建设工程项目承发包、土地使用权出让、政府采购、房地产开
|
||||
发与经营、矿产资源开发利用、中介机构服务等市场经济活动;
|
||||
""",
|
||||
"""第七条禁止违反规定干预和插手市场经济活动,谋取私利。不准有下列行为:
|
||||
(一)干预和插手建设工程项目承发包、土地使用权出让、政府采购、房地产开
|
||||
发与经营、矿产资源开发利用、中介机构服务等市场经济活动;
|
||||
(二)干预和插手国有企业重组改制、兼并、破产、产权交易、清产核资、资产
|
||||
评估、资产转让、重大项目投资以及其他重大经营活动等事项;
|
||||
(三)干预和插手批办各类行政许可和资金借贷等事项;
|
||||
四)干预和插手经济纠纷;
|
||||
(五)干预和插手农村集体资金、资产和资源的使用、分配、承包、租赁等事
|
||||
项。
|
||||
"""
|
||||
]
|
||||
|
||||
|
||||
|
||||
# 1. 找跨行正则匹配
|
||||
matches = find_text_in_pdf(
|
||||
pdf_path,
|
||||
query,
|
||||
threshold=60
|
||||
|
||||
)
|
||||
# 修改:正确处理二维列表结果
|
||||
# print(matches)
|
||||
# print("------------------")
|
||||
for idx,query_matches in enumerate(matches):
|
||||
print(f"第 {idx} 个查询结果:")
|
||||
print(query_matches)
|
||||
|
||||
#highlight_matches(pdf_path, query_matches, "example_highlighted.pdf")
|
||||
for m in query_matches:
|
||||
print(f"第 {m['page']} 页 匹配: {m['matched_text'][:50]}... 位置: {m['bbox']}, 位置_int: {m['position_int']}")
|
||||
print("------------------")
|
||||
|
||||
|
||||
|
||||
|
||||
# 2. 高亮并保存
|
||||
# 修改:展平二维列表用于高亮
|
||||
# flattened_matches = [match for query_matches in matches for match in query_matches]
|
||||
# highlight_matches(pdf_path, flattened_matches, "example_highlighted.pdf")
|
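As a quick reference for callers, a hedged usage sketch of the batch lookup above (the PDF path and query strings are placeholders): find_text_in_pdf returns one list of matches per query, each match carrying page, bbox and position_int.

from find_text_in_pdf_enhanced import find_text_in_pdf

results = find_text_in_pdf("example.pdf", ["text to locate", "another passage"], threshold=60)
for query_idx, matches in enumerate(results):
    for m in matches:
        # position_int is [page, left, right, top, bottom]; page is the 0-based index used above
        print(query_idx, m["page"], m["bbox"], m["position_int"])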
src/get_pos_pdf.py (new file, 695 lines)
@@ -0,0 +1,695 @@
import requests
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
from difflib import SequenceMatcher
|
||||
from pdfminer.pdfdocument import PDFDocument
|
||||
from pdfminer.pdfpage import PDFPage
|
||||
from pdfminer.pdfparser import PDFParser
|
||||
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
|
||||
from pdfminer.converter import PDFPageAggregator
|
||||
from pdfminer.layout import LAParams, LTText, LTChar, LTAnno
|
||||
|
||||
def parse_char_layout(layout):
|
||||
"""解析页面内容,一个字母一个字母的解析"""
|
||||
# bbox:
|
||||
# x0:从页面左侧到框左边缘的距离。
|
||||
# y0:从页面底部到框的下边缘的距离。
|
||||
# x1:从页面左侧到方框右边缘的距离。
|
||||
# y1:从页面底部到框的上边缘的距离
|
||||
char_list = []
|
||||
for textbox in layout:
|
||||
if isinstance(textbox, LTText):
|
||||
for line in textbox:
|
||||
for char in line:
|
||||
# If the char is a line-break or an empty space, the word is complete
|
||||
if isinstance(char, LTAnno):
|
||||
char_info = {
|
||||
'x': char.bbox[0] if hasattr(char, 'bbox') else 0,
|
||||
'y': char.bbox[3] if hasattr(char, 'bbox') else 0,
|
||||
'char': char.get_text()
|
||||
}
|
||||
char_list.append(char_info)
|
||||
elif isinstance(char, LTChar):
|
||||
char_info = {
|
||||
'x': char.bbox[0],
|
||||
'y': char.bbox[3],
|
||||
'char': char.get_text()
|
||||
}
|
||||
char_list.append(char_info)
|
||||
return char_list
|
||||
|
||||
def normalize_text(text):
|
||||
"""标准化文本,移除多余空白字符"""
|
||||
# 将换行符、制表符等替换为空格,然后合并多个空格为一个
|
||||
import re
|
||||
normalized = re.sub(r'\s+', ' ', text.strip())
|
||||
return normalized
|
||||
|
||||
|
||||
def clean_text_for_fuzzy_match(text):
|
||||
"""清理文本用于模糊匹配,移除特殊字符,只保留字母数字和空格"""
|
||||
# 移除标点符号和特殊字符,只保留字母、数字、中文字符和空格
|
||||
cleaned = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
|
||||
# 标准化空白字符
|
||||
cleaned = re.sub(r'\s+', ' ', cleaned.strip())
|
||||
return cleaned
|
||||
def find_fuzzy_text_positions_batch(pdf_path, target_texts, similarity_threshold=0.8):
|
||||
"""
|
||||
在PDF中批量模糊查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
similarity_threshold (float): 相似度阈值 (0-1),默认0.8
|
||||
|
||||
Returns:
|
||||
list: 每个元素是一个列表,包含匹配文本坐标信息
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果列表
|
||||
batch_results = [[] for _ in target_texts]
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
pages_chars = []
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
pages_chars.append((page_num, char_list))
|
||||
|
||||
# 预处理所有页面的文本
|
||||
pages_cleaned_text = []
|
||||
for page_num, char_list in pages_chars:
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
cleaned_page_text = clean_text_for_fuzzy_match(page_text)
|
||||
pages_cleaned_text.append((page_num, cleaned_page_text, char_list))
|
||||
|
||||
# 为每个目标文本进行查找
|
||||
for idx, target_text in enumerate(target_texts):
|
||||
# 清理目标文本
|
||||
cleaned_target = clean_text_for_fuzzy_match(target_text)
|
||||
target_len = len(cleaned_target)
|
||||
|
||||
if target_len == 0:
|
||||
continue
|
||||
|
||||
found_positions = []
|
||||
|
||||
# 在每一页中查找
|
||||
for page_num, cleaned_page_text, char_list in pages_cleaned_text:
|
||||
# 滑动窗口查找相似文本
|
||||
matches = []
|
||||
for i in range(len(cleaned_page_text) - target_len + 1):
|
||||
window_text = cleaned_page_text[i:i + target_len]
|
||||
similarity = SequenceMatcher(None, cleaned_target, window_text).ratio()
|
||||
|
||||
if similarity >= similarity_threshold:
|
||||
# 找到匹配项,记录位置和相似度
|
||||
if i < len(char_list):
|
||||
matches.append({
|
||||
'start_idx': i,
|
||||
'end_idx': min(i + target_len - 1, len(char_list) - 1),
|
||||
'similarity': similarity
|
||||
})
|
||||
|
||||
# 合并相邻的匹配块
|
||||
if matches:
|
||||
# 按起始位置排序
|
||||
matches.sort(key=lambda x: x['start_idx'])
|
||||
|
||||
# 合并相邻或重叠的匹配块
|
||||
merged_matches = []
|
||||
current_match = matches[0].copy() # 创建副本
|
||||
|
||||
for i in range(1, len(matches)):
|
||||
next_match = matches[i]
|
||||
# 如果下一个匹配块与当前块相邻或重叠,则合并
|
||||
# 判断条件:下一个块的起始位置 <= 当前块的结束位置 + 一些缓冲距离
|
||||
if next_match['start_idx'] <= current_match['end_idx'] + min(target_len, 10):
|
||||
# 合并索引范围
|
||||
current_match['start_idx'] = min(current_match['start_idx'], next_match['start_idx'])
|
||||
current_match['end_idx'] = max(current_match['end_idx'], next_match['end_idx'])
|
||||
# 计算加权平均相似度
|
||||
total_length = (current_match['end_idx'] - current_match['start_idx'] + 1) + \
|
||||
(next_match['end_idx'] - next_match['start_idx'] + 1)
|
||||
current_match['similarity'] = (
|
||||
current_match['similarity'] * (current_match['end_idx'] - current_match['start_idx'] + 1) +
|
||||
next_match['similarity'] * (next_match['end_idx'] - next_match['start_idx'] + 1)
|
||||
) / total_length
|
||||
else:
|
||||
# 不相邻,保存当前块,开始新的块
|
||||
merged_matches.append(current_match)
|
||||
current_match = next_match.copy() # 创建副本
|
||||
|
||||
# 添加最后一个块
|
||||
merged_matches.append(current_match)
|
||||
|
||||
# 为每个合并后的块生成坐标信息
|
||||
for match in merged_matches:
|
||||
start_idx = match['start_idx']
|
||||
end_idx = match['end_idx']
|
||||
|
||||
if start_idx < len(char_list) and end_idx < len(char_list):
|
||||
# 获取匹配区域的所有字符
|
||||
matched_chars = char_list[start_idx:end_idx+1]
|
||||
|
||||
# 过滤掉坐标为0的字符(通常是特殊字符)
|
||||
valid_chars = [char for char in matched_chars
|
||||
if char['x'] > 0 and char['y'] > 0]
|
||||
|
||||
# 如果没有有效字符,则使用所有字符
|
||||
chars_to_use = valid_chars if valid_chars else matched_chars
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
if chars_to_use:
|
||||
# 计算边界值
|
||||
left = min([char['x'] for char in chars_to_use])
|
||||
right = max([char['x'] for char in chars_to_use])
|
||||
bottom = min([char['y'] for char in chars_to_use])
|
||||
top = max([char['y'] for char in chars_to_use])
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in chars_to_use])
|
||||
|
||||
# 只有当边界框有效时才添加结果
|
||||
if left >= 0 and right > left and top > bottom:
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
matched_text, # 添加匹配的内容
|
||||
match['similarity'] # 添加相似度信息
|
||||
]
|
||||
found_positions.append(position)
|
||||
|
||||
batch_results[idx] = found_positions
|
||||
|
||||
return batch_results
|
||||
"""
|
||||
在PDF中批量模糊查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
similarity_threshold (float): 相似度阈值 (0-1),默认0.8
|
||||
|
||||
Returns:
|
||||
dict: 以target_text为键,包含匹配文本坐标信息列表为值的字典
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果字典
|
||||
batch_results = {text: [] for text in target_texts}
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
pages_chars = []
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
pages_chars.append((page_num, char_list))
|
||||
|
||||
# 预处理所有页面的文本
|
||||
pages_cleaned_text = []
|
||||
for page_num, char_list in pages_chars:
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
cleaned_page_text = clean_text_for_fuzzy_match(page_text)
|
||||
pages_cleaned_text.append((page_num, cleaned_page_text, char_list))
|
||||
|
||||
# 为每个目标文本进行查找
|
||||
for target_text in target_texts:
|
||||
# 清理目标文本
|
||||
cleaned_target = clean_text_for_fuzzy_match(target_text)
|
||||
target_len = len(cleaned_target)
|
||||
|
||||
if target_len == 0:
|
||||
continue
|
||||
|
||||
found_positions = []
|
||||
|
||||
# 在每一页中查找
|
||||
for page_num, cleaned_page_text, char_list in pages_cleaned_text:
|
||||
# 滑动窗口查找相似文本
|
||||
matches = []
|
||||
for i in range(len(cleaned_page_text) - target_len + 1):
|
||||
window_text = cleaned_page_text[i:i + target_len]
|
||||
similarity = SequenceMatcher(None, cleaned_target, window_text).ratio()
|
||||
|
||||
if similarity >= similarity_threshold:
|
||||
# 找到匹配项,记录位置和相似度
|
||||
if i < len(char_list):
|
||||
matches.append({
|
||||
'start_idx': i,
|
||||
'end_idx': min(i + target_len - 1, len(char_list) - 1),
|
||||
'similarity': similarity
|
||||
})
|
||||
|
||||
# 合并相邻的匹配块
|
||||
if matches:
|
||||
# 按起始位置排序
|
||||
matches.sort(key=lambda x: x['start_idx'])
|
||||
|
||||
# 合并相邻或重叠的匹配块
|
||||
merged_matches = []
|
||||
current_match = matches[0].copy() # 创建副本
|
||||
|
||||
for i in range(1, len(matches)):
|
||||
next_match = matches[i]
|
||||
# 如果下一个匹配块与当前块相邻或重叠,则合并
|
||||
# 判断条件:下一个块的起始位置 <= 当前块的结束位置 + 一些缓冲距离
|
||||
if next_match['start_idx'] <= current_match['end_idx'] + min(target_len, 10):
|
||||
# 合并索引范围
|
||||
current_match['start_idx'] = min(current_match['start_idx'], next_match['start_idx'])
|
||||
current_match['end_idx'] = max(current_match['end_idx'], next_match['end_idx'])
|
||||
# 计算加权平均相似度
|
||||
total_length = (current_match['end_idx'] - current_match['start_idx'] + 1) + \
|
||||
(next_match['end_idx'] - next_match['start_idx'] + 1)
|
||||
current_match['similarity'] = (
|
||||
current_match['similarity'] * (current_match['end_idx'] - current_match['start_idx'] + 1) +
|
||||
next_match['similarity'] * (next_match['end_idx'] - next_match['start_idx'] + 1)
|
||||
) / total_length
|
||||
else:
|
||||
# 不相邻,保存当前块,开始新的块
|
||||
merged_matches.append(current_match)
|
||||
current_match = next_match.copy() # 创建副本
|
||||
|
||||
# 添加最后一个块
|
||||
merged_matches.append(current_match)
|
||||
|
||||
# 为每个合并后的块生成坐标信息
|
||||
for match in merged_matches:
|
||||
start_idx = match['start_idx']
|
||||
end_idx = match['end_idx']
|
||||
|
||||
if start_idx < len(char_list) and end_idx < len(char_list):
|
||||
# 获取匹配区域的所有字符
|
||||
matched_chars = char_list[start_idx:end_idx+1]
|
||||
|
||||
# 过滤掉坐标为0的字符(通常是特殊字符)
|
||||
valid_chars = [char for char in matched_chars
|
||||
if char['x'] > 0 and char['y'] > 0]
|
||||
|
||||
# 如果没有有效字符,则使用所有字符
|
||||
chars_to_use = valid_chars if valid_chars else matched_chars
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
if chars_to_use:
|
||||
# 计算边界值
|
||||
left = min([char['x'] for char in chars_to_use])
|
||||
right = max([char['x'] for char in chars_to_use])
|
||||
bottom = min([char['y'] for char in chars_to_use])
|
||||
top = max([char['y'] for char in chars_to_use])
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in chars_to_use])
|
||||
|
||||
# 只有当边界框有效时才添加结果
|
||||
if left >= 0 and right > left and top > bottom:
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
matched_text, # 添加匹配的内容
|
||||
match['similarity'] # 添加相似度信息
|
||||
]
|
||||
found_positions.append(position)
|
||||
|
||||
batch_results[target_text] = found_positions
|
||||
|
||||
return batch_results
|
||||
def find_text_positions_batch(pdf_path, target_texts):
|
||||
"""
|
||||
在PDF中批量查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
|
||||
Returns:
|
||||
list: 每个元素是一个列表,包含匹配文本坐标信息
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果列表
|
||||
batch_results = [[] for _ in target_texts]
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
all_chars = [] # 存储所有页面的字符
|
||||
page_start_indices = [] # 存储每页开始的索引
|
||||
|
||||
# 处理每一页并收集所有字符
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
page_start_indices.append(len(all_chars))
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
all_chars.extend(char_list)
|
||||
|
||||
# 将所有字符组合成文本并标准化
|
||||
full_text = ''.join([char_info['char'] for char_info in all_chars])
|
||||
normalized_full_text = normalize_text(full_text)
|
||||
|
||||
# 为每个目标文本查找位置
|
||||
for idx, target_text in enumerate(target_texts):
|
||||
# 标准化目标文本
|
||||
normalized_target = normalize_text(target_text)
|
||||
|
||||
found_positions = []
|
||||
start = 0
|
||||
while True:
|
||||
pos = normalized_full_text.find(normalized_target, start)
|
||||
if pos == -1:
|
||||
break
|
||||
|
||||
# 找到匹配项,获取对应的坐标信息
|
||||
if pos < len(all_chars):
|
||||
start_char = all_chars[pos]
|
||||
end_pos = pos + len(normalized_target) - 1
|
||||
if end_pos < len(all_chars):
|
||||
end_char = all_chars[end_pos]
|
||||
# 确定在哪一页
|
||||
page_num = 1
|
||||
for i, page_start in enumerate(page_start_indices):
|
||||
if pos >= page_start:
|
||||
page_num = i + 1
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in all_chars[pos:pos+len(normalized_target)]])
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
left = min(start_char['x'], end_char['x'])
|
||||
right = max(start_char['x'], end_char['x'])
|
||||
bottom = min(start_char['y'], end_char['y'])
|
||||
top = max(start_char['y'], end_char['y'])
|
||||
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
]
|
||||
found_positions.append(position)
|
||||
|
||||
start = pos + 1
|
||||
|
||||
batch_results[idx] = found_positions
|
||||
|
||||
return batch_results
|
||||
|
||||
def find_text_in_pdf_per_page_batch(pdf_path, target_texts):
|
||||
"""
|
||||
在PDF中逐页批量查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
|
||||
Returns:
|
||||
list: 每个元素是一个列表,包含匹配文本坐标信息
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果列表
|
||||
batch_results = [[] for _ in target_texts]
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
|
||||
# 将页面字符组合成文本并标准化
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
normalized_page_text = normalize_text(page_text)
|
||||
|
||||
# 预处理所有目标文本
|
||||
normalized_targets = [normalize_text(text) for text in target_texts]
|
||||
|
||||
# 为每个目标文本在当前页查找
|
||||
for idx, normalized_target in enumerate(normalized_targets):
|
||||
# 在页面文本中查找目标文本
|
||||
pos = normalized_page_text.find(normalized_target)
|
||||
if pos != -1:
|
||||
# 找到匹配项,获取对应的坐标信息
|
||||
if pos < len(char_list):
|
||||
start_char = char_list[pos]
|
||||
end_pos = pos + len(normalized_target) - 1
|
||||
if end_pos < len(char_list):
|
||||
end_char = char_list[end_pos]
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in char_list[pos:pos+len(normalized_target)]])
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
left = min(start_char['x'], end_char['x'])
|
||||
right = max(start_char['x'], end_char['x'])
|
||||
bottom = min(start_char['y'], end_char['y'])
|
||||
top = max(start_char['y'], end_char['y'])
|
||||
|
||||
position = [
|
||||
int(page_num),
|
||||
int(left), # left
|
||||
int(right), # right
|
||||
int(top), # top
|
||||
int(bottom), # bottom
|
||||
]
|
||||
batch_results[idx].append(position)
|
||||
|
||||
return batch_results
|
||||
|
||||
def find_partial_text_positions_batch(pdf_path, target_texts, min_match_ratio=0.7):
|
||||
"""
|
||||
批量查找部分匹配的文本(适用于较长的文本)
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
min_match_ratio (float): 最小匹配比例 (0-1)
|
||||
|
||||
Returns:
|
||||
list: 每个元素是一个列表,包含匹配文本坐标信息
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果列表
|
||||
batch_results = [[] for _ in target_texts]
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
|
||||
# 将页面字符组合成文本并标准化
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
normalized_page_text = normalize_text(page_text)
|
||||
|
||||
# 预处理所有目标文本
|
||||
normalized_targets = []
|
||||
keywords_list = []
|
||||
for target_text in target_texts:
|
||||
normalized_target = normalize_text(target_text)
|
||||
# 提取关键词(移除常见停用词后的词)
|
||||
keywords = [word for word in normalized_target.split() if len(word) > 2]
|
||||
|
||||
if not keywords:
|
||||
keywords = normalized_target.split() # 如果没有长词,则使用所有词
|
||||
|
||||
normalized_targets.append(normalized_target)
|
||||
keywords_list.append(keywords if keywords else [])
|
||||
|
||||
# 为每个目标文本计算匹配
|
||||
for idx, (normalized_target, keywords) in enumerate(zip(normalized_targets, keywords_list)):
|
||||
if not keywords:
|
||||
continue
|
||||
|
||||
# 计算匹配的关键词数量
|
||||
matched_keywords = 0
|
||||
for keyword in keywords:
|
||||
if keyword in normalized_page_text:
|
||||
matched_keywords += 1
|
||||
|
||||
# 如果匹配的关键词比例超过阈值,则认为找到匹配
|
||||
if len(keywords) > 0 and (matched_keywords / len(keywords)) >= min_match_ratio:
|
||||
# 简单起见,返回页面第一个字符和最后一个字符的坐标
|
||||
if char_list:
|
||||
start_char = char_list[0]
|
||||
end_char = char_list[-1]
|
||||
match_ratio = matched_keywords / len(keywords)
|
||||
|
||||
# 获取页面文本作为匹配内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
left = min(start_char['x'], end_char['x'])
|
||||
right = max(start_char['x'], end_char['x'])
|
||||
bottom = min(start_char['y'], end_char['y'])
|
||||
top = max(start_char['y'], end_char['y'])
|
||||
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
]
|
||||
batch_results[idx].append(position)
|
||||
|
||||
return batch_results
|
||||
|
||||
def smart_fuzzy_find_text_batch(pdf_path, target_texts, similarity_threshold=0.8):
|
||||
"""
|
||||
智能批量模糊文本查找,结合多种方法
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
similarity_threshold (float): 相似度阈值
|
||||
|
||||
Returns:
|
||||
list: 每个元素是一个列表,包含匹配文本坐标信息
|
||||
"""
|
||||
# 初始化结果列表
|
||||
batch_results = [[] for _ in target_texts]
|
||||
|
||||
# 方法1: 精确匹配
|
||||
exact_results = find_text_in_pdf_per_page_batch(pdf_path, target_texts)
|
||||
|
||||
# 对于已经找到精确匹配的文本,直接使用结果
|
||||
remaining_indices = []
|
||||
for idx, results in enumerate(exact_results):
|
||||
if results:
|
||||
batch_results[idx] = results
|
||||
else:
|
||||
remaining_indices.append(idx)
|
||||
|
||||
if not remaining_indices:
|
||||
return batch_results
|
||||
|
||||
# 构建剩余文本列表
|
||||
remaining_texts = [target_texts[idx] for idx in remaining_indices]
|
||||
|
||||
# 方法2: 模糊匹配(仅对未找到精确匹配的文本)
|
||||
fuzzy_results = find_fuzzy_text_positions_batch(pdf_path, remaining_texts, similarity_threshold)
|
||||
|
||||
# 更新结果
|
||||
for i, idx in enumerate(remaining_indices):
|
||||
if fuzzy_results[i]:
|
||||
batch_results[idx] = fuzzy_results[i]
|
||||
remaining_indices = [ri for ri in remaining_indices if ri != idx] # 从剩余索引中移除
|
||||
|
||||
if not remaining_indices:
|
||||
return batch_results
|
||||
|
||||
# 构建剩余文本列表
|
||||
remaining_texts = [target_texts[idx] for idx in remaining_indices]
|
||||
|
||||
# 方法3: 部分匹配(关键词匹配,仅对仍未找到匹配的文本)
|
||||
partial_results = find_partial_text_positions_batch(pdf_path, remaining_texts, 0.5)
|
||||
|
||||
# 更新最终结果
|
||||
for i, idx in enumerate(remaining_indices):
|
||||
if partial_results[i]:
|
||||
batch_results[idx] = partial_results[i]
|
||||
|
||||
return batch_results
|
||||
if __name__ == '__main__':
|
||||
# 使用本地PDF文件
|
||||
pdf_file_path = 'F:\\2\\2024深化智慧城市发展推进城市全域数字化转型的指导意见.pdf' # 修改为你的PDF文件路径
|
||||
target_texts = [
|
||||
'''一、总体要求
|
||||
以习近平新时代中国特色社会主义思想为指导,完整、准确、全面贯彻新发展理念,统筹发展和安全,充分发挥数据的基础资源和创新引擎作用,整体性重塑智慧城市技术架构、系统性变革城市管理流程、一体化推动产城深度融合,全面提升城市全域数字化转型的整体性、系统性、协同性,不断满足人民日益增长的美好生活需要,为全面建设社会主义现代化国家提供强大动力。到2027年,全国城市全域数字化转型取得明显成效,形成一批横向打通、纵向贯通、各具特色的宜居、韧性、智慧城市,有力支撑数字中国建设。到2030年,全国城市全域数字化转型全面突破,人民群众的获得感、幸福感、安全感全面提升,涌现一批数字文明时代具有全球竞争力的中国式现代化城市。''',
|
||||
'''二、全领域推进城市数字化转型
|
||||
(一)建立城市数字化共性基础。构建统一规划、统一架构、统一标准、统一运维的城市运行和治理智能中枢,打造线上线下联动、服务管理协同的城市共性支撑平台,构建开放兼容、共性赋能、安全可靠的综合性基础环境,推进算法、模型等数字资源一体集成部署,探索建立共性组件、模块等共享协作机制。鼓励发展基于人工智能等技术的智能分析、智能调度、智能监管、辅助决策,全面支撑赋能城市数字化转型场景建设与发展。鼓励有条件的地方推进城市信息模型、时空大数据、国土空间基础信息、实景三维中国等基础平台功能整合、协同发展、应用赋能,为城市数字化转型提供统一的时空框架,因地制宜有序探索推进数字孪生城市建设,推动虚实共生、仿真推演、迭代优化的数字孪生场景落地。
|
||||
(二)培育壮大城市数字经济。深入推进数字技术与一二三产业深度融合,鼓励平台企业构建多层次产业互联网服务平台。因地制宜发展智慧农业,加快工业互联网规模化应用,推动金融、物流等生产性服务业和商贸、文旅、康养等生活性服务业数字化转型,提升“上云用数赋智”水平。深化数字化转型促进中心建设,促进城市数字化转型和大中小企业融合创新协同发展。因地制宜发展新兴数字产业,加强大数据、人工智能、区块链、先进计算、未来网络、卫星遥感、三维建模等关键数字技术在城市场景中集成应用,加快技术创新成果转化,打造具有国际竞争力的数字产业集群。培育壮大数据产业,发展一批数据商和第三方专业服务机构,提高数据要素应用支撑与服务能力。''',
|
||||
"""(三)促进新型产城融合发展。创新生产空间和生活空间融合的数字化场景,加强城市空间开发利用大数据分析,推进数字化赋能郊区新城,实现城市多中心、网络化、组团式发展。推动城市“数字更新”,加快街区、商圈等城市微单元基础设施智能化升级,探索利用数字技术创新应用场景,激发产城融合服务能级与数字活力。深化城市场景开放促进以城带产,提升产业聚合力。加速创新资源共享助力以产促城,发展虚拟园区和跨区域协同创新平台,增强城市数字经济就业吸附力。"""
|
||||
]
|
||||
|
||||
try:
|
||||
print("批量智能模糊查找:")
|
||||
batch_positions = smart_fuzzy_find_text_batch(pdf_file_path, target_texts, similarity_threshold=0.7)
|
||||
|
||||
# 现在 batch_positions 是一个列表,需要使用 enumerate 来同时获取索引和位置信息
|
||||
for idx, positions in enumerate(batch_positions):
|
||||
target_text = target_texts[idx]
|
||||
print(f"\n查找文本: {target_text[:50]}{'...' if len(target_text) > 50 else ''}")
|
||||
if positions:
|
||||
print(f"找到文本在以下位置:")
|
||||
for pos in positions:
|
||||
if len(pos) >= 6: # 包含匹配内容和相似度信息
|
||||
print(f"页面: {pos[0]}, 边界框: Left({pos[1]:.2f}), Right({pos[2]:.2f}), Top({pos[3]:.2f}), Bottom({pos[4]:.2f})")
|
||||
if len(pos) >= 7: # 包含相似度信息
|
||||
print(f"相似度: {pos[6]:.2f}")
|
||||
if len(pos) >= 6: # 包含匹配内容
|
||||
print(f"匹配内容: {pos[5][:50]}{'...' if len(pos[5]) > 50 else ''}")
|
||||
print("-" * 50)
|
||||
else:
|
||||
print(f"页面: {pos[0]}, 边界框: Left({pos[1]:.2f}), Right({pos[2]:.2f}), Top({pos[3]:.2f}), Bottom({pos[4]:.2f})")
|
||||
else:
|
||||
print("未找到文本")
|
||||
|
||||
except FileNotFoundError as e:
|
||||
print(e)
|
||||
except Exception as e:
|
||||
print(f"处理PDF时出错: {e}")
|
src/get_pos_pdf_.py (new file, 53 lines)
@@ -0,0 +1,53 @@
import fitz # PyMuPDF
|
||||
import difflib
|
||||
|
||||
def find_text_in_pdf_detailed(pdf_path, query_text, threshold=0.8):
|
||||
"""
|
||||
在PDF中详细查找文本,按块和行查找。
|
||||
"""
|
||||
results = []
|
||||
doc = fitz.open(pdf_path)
|
||||
|
||||
# 清理查询文本
|
||||
cleaned_query = ' '.join(query_text.split())
|
||||
print(f"查找文本: {cleaned_query[:100]}...")
|
||||
|
||||
for page_num in range(len(doc)):
|
||||
page = doc.load_page(page_num)
|
||||
blocks = page.get_text("dict")["blocks"]
|
||||
|
||||
for block in blocks:
|
||||
if "lines" not in block:
|
||||
continue
|
||||
|
||||
# 组合整个块的文本
|
||||
block_text = ""
|
||||
for line in block["lines"]:
|
||||
for span in line["spans"]:
|
||||
block_text += span["text"]
|
||||
|
||||
if block_text.strip():
|
||||
similarity = difflib.SequenceMatcher(None, cleaned_query.strip(), block_text.strip()).ratio()
|
||||
if similarity >= threshold:
|
||||
# 使用块的边界框
|
||||
bbox = block["bbox"] if "bbox" in block else None
|
||||
if bbox:
|
||||
results.append((page_num + 1, bbox))
|
||||
print(f"第 {page_num + 1} 页块匹配,相似度: {similarity:.2f}")
|
||||
elif similarity >= 0.1: # 调试输出
|
||||
print(f"第 {page_num + 1} 页块相似度: {similarity:.2f}")
|
||||
|
||||
doc.close()
|
||||
return results
|
||||
|
||||
# 示例用法
|
||||
if __name__ == "__main__":
|
||||
pdf_path = 'F:\\2\\2024深化智慧城市发展推进城市全域数字化转型的指导意见.pdf'
|
||||
query = '''一、总体要求
|
||||
以习近平新时代中国特色社会主义思想为指导,完'''
|
||||
|
||||
print("开始详细查找...")
|
||||
matches = find_text_in_pdf_detailed(pdf_path, query, threshold=0.3)
|
||||
print(f"找到 {len(matches)} 个匹配项")
|
||||
for page, bbox in matches:
|
||||
print(f"在第 {page} 页找到匹配,位置:{bbox}")
|