Compare commits
3 Commits
466fae53c9
...
c47ddad5f1
Author | SHA1 | Date | |
---|---|---|---|
c47ddad5f1 | |||
73557a272d | |||
44ef61daab |
532
src/get_pos_pdf.py
Normal file
532
src/get_pos_pdf.py
Normal file
@@ -0,0 +1,532 @@
|
||||
import requests
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
from difflib import SequenceMatcher
|
||||
from pdfminer.pdfdocument import PDFDocument
|
||||
from pdfminer.pdfpage import PDFPage
|
||||
from pdfminer.pdfparser import PDFParser
|
||||
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
|
||||
from pdfminer.converter import PDFPageAggregator
|
||||
from pdfminer.layout import LAParams, LTText, LTChar, LTAnno
|
||||
|
||||
def parse_char_layout(layout):
|
||||
"""解析页面内容,一个字母一个字母的解析"""
|
||||
# bbox:
|
||||
# x0:从页面左侧到框左边缘的距离。
|
||||
# y0:从页面底部到框的下边缘的距离。
|
||||
# x1:从页面左侧到方框右边缘的距离。
|
||||
# y1:从页面底部到框的上边缘的距离
|
||||
char_list = []
|
||||
for textbox in layout:
|
||||
if isinstance(textbox, LTText):
|
||||
for line in textbox:
|
||||
for char in line:
|
||||
# If the char is a line-break or an empty space, the word is complete
|
||||
if isinstance(char, LTAnno):
|
||||
char_info = {
|
||||
'x': char.bbox[0] if hasattr(char, 'bbox') else 0,
|
||||
'y': char.bbox[3] if hasattr(char, 'bbox') else 0,
|
||||
'char': char.get_text()
|
||||
}
|
||||
char_list.append(char_info)
|
||||
elif isinstance(char, LTChar):
|
||||
char_info = {
|
||||
'x': char.bbox[0],
|
||||
'y': char.bbox[3],
|
||||
'char': char.get_text()
|
||||
}
|
||||
char_list.append(char_info)
|
||||
return char_list
|
||||
|
||||
def normalize_text(text):
|
||||
"""标准化文本,移除多余空白字符"""
|
||||
# 将换行符、制表符等替换为空格,然后合并多个空格为一个
|
||||
import re
|
||||
normalized = re.sub(r'\s+', ' ', text.strip())
|
||||
return normalized
|
||||
|
||||
|
||||
def clean_text_for_fuzzy_match(text):
|
||||
"""清理文本用于模糊匹配,移除特殊字符,只保留字母数字和空格"""
|
||||
# 移除标点符号和特殊字符,只保留字母、数字、中文字符和空格
|
||||
cleaned = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
|
||||
# 标准化空白字符
|
||||
cleaned = re.sub(r'\s+', ' ', cleaned.strip())
|
||||
return cleaned
|
||||
def find_fuzzy_text_positions_batch(pdf_path, target_texts, similarity_threshold=0.8):
|
||||
"""
|
||||
在PDF中批量模糊查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
similarity_threshold (float): 相似度阈值 (0-1),默认0.8
|
||||
|
||||
Returns:
|
||||
dict: 以target_text为键,包含匹配文本坐标信息列表为值的字典
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果字典
|
||||
batch_results = {text: [] for text in target_texts}
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
pages_chars = []
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
pages_chars.append((page_num, char_list))
|
||||
|
||||
# 为每个目标文本进行查找
|
||||
for target_text in target_texts:
|
||||
# 清理目标文本
|
||||
cleaned_target = clean_text_for_fuzzy_match(target_text)
|
||||
target_len = len(cleaned_target)
|
||||
|
||||
if target_len == 0:
|
||||
continue
|
||||
|
||||
found_positions = []
|
||||
|
||||
# 在每一页中查找
|
||||
for page_num, char_list in pages_chars:
|
||||
# 将页面字符组合成文本
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
cleaned_page_text = clean_text_for_fuzzy_match(page_text)
|
||||
|
||||
# 滑动窗口查找相似文本
|
||||
matches = []
|
||||
for i in range(len(cleaned_page_text) - target_len + 1):
|
||||
window_text = cleaned_page_text[i:i + target_len]
|
||||
similarity = SequenceMatcher(None, cleaned_target, window_text).ratio()
|
||||
|
||||
if similarity >= similarity_threshold:
|
||||
# 找到匹配项,记录位置和相似度
|
||||
if i < len(char_list):
|
||||
matches.append({
|
||||
'start_idx': i,
|
||||
'end_idx': min(i + target_len - 1, len(char_list) - 1),
|
||||
'similarity': similarity
|
||||
})
|
||||
|
||||
# 合并相邻的匹配块
|
||||
if matches:
|
||||
# 按起始位置排序
|
||||
matches.sort(key=lambda x: x['start_idx'])
|
||||
|
||||
# 合并相邻或重叠的匹配块
|
||||
merged_matches = []
|
||||
current_match = matches[0].copy() # 创建副本
|
||||
|
||||
for i in range(1, len(matches)):
|
||||
next_match = matches[i]
|
||||
# 如果下一个匹配块与当前块相邻或重叠,则合并
|
||||
# 判断条件:下一个块的起始位置 <= 当前块的结束位置 + 一些缓冲距离
|
||||
if next_match['start_idx'] <= current_match['end_idx'] + min(target_len, 10):
|
||||
# 合并索引范围
|
||||
current_match['start_idx'] = min(current_match['start_idx'], next_match['start_idx'])
|
||||
current_match['end_idx'] = max(current_match['end_idx'], next_match['end_idx'])
|
||||
# 计算加权平均相似度
|
||||
total_length = (current_match['end_idx'] - current_match['start_idx'] + 1) + \
|
||||
(next_match['end_idx'] - next_match['start_idx'] + 1)
|
||||
current_match['similarity'] = (
|
||||
current_match['similarity'] * (current_match['end_idx'] - current_match['start_idx'] + 1) +
|
||||
next_match['similarity'] * (next_match['end_idx'] - next_match['start_idx'] + 1)
|
||||
) / total_length
|
||||
else:
|
||||
# 不相邻,保存当前块,开始新的块
|
||||
merged_matches.append(current_match)
|
||||
current_match = next_match.copy() # 创建副本
|
||||
|
||||
# 添加最后一个块
|
||||
merged_matches.append(current_match)
|
||||
|
||||
# 为每个合并后的块生成坐标信息
|
||||
for match in merged_matches:
|
||||
start_idx = match['start_idx']
|
||||
end_idx = match['end_idx']
|
||||
|
||||
if start_idx < len(char_list) and end_idx < len(char_list):
|
||||
# 获取匹配区域的所有字符
|
||||
matched_chars = char_list[start_idx:end_idx+1]
|
||||
|
||||
# 过滤掉坐标为0的字符(通常是特殊字符)
|
||||
valid_chars = [char for char in matched_chars
|
||||
if char['x'] > 0 and char['y'] > 0]
|
||||
|
||||
# 如果没有有效字符,则使用所有字符
|
||||
chars_to_use = valid_chars if valid_chars else matched_chars
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
if chars_to_use:
|
||||
# 计算边界值
|
||||
left = min([char['x'] for char in chars_to_use])
|
||||
right = max([char['x'] for char in chars_to_use])
|
||||
bottom = min([char['y'] for char in chars_to_use])
|
||||
top = max([char['y'] for char in chars_to_use])
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in chars_to_use])
|
||||
|
||||
# 只有当边界框有效时才添加结果
|
||||
if left >= 0 and right > left and top > bottom:
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
matched_text, # 添加匹配的内容
|
||||
match['similarity'] # 添加相似度信息
|
||||
]
|
||||
found_positions.append(position)
|
||||
|
||||
batch_results[target_text] = found_positions
|
||||
|
||||
return batch_results
|
||||
|
||||
def find_text_positions_batch(pdf_path, target_texts):
|
||||
"""
|
||||
在PDF中批量查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
|
||||
Returns:
|
||||
dict: 以target_text为键,包含匹配文本坐标信息列表为值的字典
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果字典
|
||||
batch_results = {text: [] for text in target_texts}
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
all_chars = [] # 存储所有页面的字符
|
||||
page_start_indices = [] # 存储每页开始的索引
|
||||
|
||||
# 处理每一页并收集所有字符
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
page_start_indices.append(len(all_chars))
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
all_chars.extend(char_list)
|
||||
|
||||
# 将所有字符组合成文本并标准化
|
||||
full_text = ''.join([char_info['char'] for char_info in all_chars])
|
||||
normalized_full_text = normalize_text(full_text)
|
||||
|
||||
# 为每个目标文本查找位置
|
||||
for target_text in target_texts:
|
||||
# 标准化目标文本
|
||||
normalized_target = normalize_text(target_text)
|
||||
|
||||
found_positions = []
|
||||
start = 0
|
||||
while True:
|
||||
pos = normalized_full_text.find(normalized_target, start)
|
||||
if pos == -1:
|
||||
break
|
||||
|
||||
# 找到匹配项,获取对应的坐标信息
|
||||
if pos < len(all_chars):
|
||||
start_char = all_chars[pos]
|
||||
end_pos = pos + len(normalized_target) - 1
|
||||
if end_pos < len(all_chars):
|
||||
end_char = all_chars[end_pos]
|
||||
# 确定在哪一页
|
||||
page_num = 1
|
||||
for i, page_start in enumerate(page_start_indices):
|
||||
if pos >= page_start:
|
||||
page_num = i + 1
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in all_chars[pos:pos+len(normalized_target)]])
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
left = min(start_char['x'], end_char['x'])
|
||||
right = max(start_char['x'], end_char['x'])
|
||||
bottom = min(start_char['y'], end_char['y'])
|
||||
top = max(start_char['y'], end_char['y'])
|
||||
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
]
|
||||
found_positions.append(position)
|
||||
|
||||
start = pos + 1
|
||||
|
||||
batch_results[target_text] = found_positions
|
||||
|
||||
return batch_results
|
||||
|
||||
def find_text_in_pdf_per_page_batch(pdf_path, target_texts):
|
||||
"""
|
||||
在PDF中逐页批量查找指定文本并返回坐标
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
|
||||
Returns:
|
||||
dict: 以target_text为键,包含匹配文本坐标信息列表为值的字典
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果字典
|
||||
batch_results = {text: [] for text in target_texts}
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
|
||||
# 将页面字符组合成文本并标准化
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
normalized_page_text = normalize_text(page_text)
|
||||
|
||||
# 为每个目标文本在当前页查找
|
||||
for target_text in target_texts:
|
||||
normalized_target = normalize_text(target_text)
|
||||
|
||||
# 在页面文本中查找目标文本
|
||||
pos = normalized_page_text.find(normalized_target)
|
||||
if pos != -1:
|
||||
# 找到匹配项,获取对应的坐标信息
|
||||
if pos < len(char_list):
|
||||
start_char = char_list[pos]
|
||||
end_pos = pos + len(normalized_target) - 1
|
||||
if end_pos < len(char_list):
|
||||
end_char = char_list[end_pos]
|
||||
|
||||
# 获取匹配的文本内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in char_list[pos:pos+len(normalized_target)]])
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
left = min(start_char['x'], end_char['x'])
|
||||
right = max(start_char['x'], end_char['x'])
|
||||
bottom = min(start_char['y'], end_char['y'])
|
||||
top = max(start_char['y'], end_char['y'])
|
||||
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
]
|
||||
batch_results[target_text].append(position)
|
||||
|
||||
return batch_results
|
||||
|
||||
def find_partial_text_positions_batch(pdf_path, target_texts, min_match_ratio=0.7):
|
||||
"""
|
||||
批量查找部分匹配的文本(适用于较长的文本)
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
min_match_ratio (float): 最小匹配比例 (0-1)
|
||||
|
||||
Returns:
|
||||
dict: 以target_text为键,包含匹配文本坐标信息列表为值的字典
|
||||
"""
|
||||
if not os.path.exists(pdf_path):
|
||||
raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")
|
||||
|
||||
# 初始化结果字典
|
||||
batch_results = {text: [] for text in target_texts}
|
||||
|
||||
# 打开本地PDF文件
|
||||
with open(pdf_path, 'rb') as fp:
|
||||
parser = PDFParser(fp)
|
||||
doc = PDFDocument(parser)
|
||||
|
||||
rsrcmgr = PDFResourceManager()
|
||||
laparams = LAParams()
|
||||
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
|
||||
interpreter = PDFPageInterpreter(rsrcmgr, device)
|
||||
|
||||
# 处理每一页
|
||||
for page_num, page in enumerate(PDFPage.create_pages(doc), 1):
|
||||
interpreter.process_page(page)
|
||||
layout = device.get_result()
|
||||
char_list = parse_char_layout(layout)
|
||||
|
||||
# 将页面字符组合成文本并标准化
|
||||
page_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
normalized_page_text = normalize_text(page_text)
|
||||
|
||||
# 为每个目标文本计算匹配
|
||||
for target_text in target_texts:
|
||||
# 将目标文本分割成关键词或短语
|
||||
normalized_target = normalize_text(target_text)
|
||||
# 提取关键词(移除常见停用词后的词)
|
||||
keywords = [word for word in normalized_target.split() if len(word) > 2]
|
||||
|
||||
if not keywords:
|
||||
keywords = normalized_target.split() # 如果没有长词,则使用所有词
|
||||
|
||||
if not keywords:
|
||||
continue
|
||||
|
||||
# 计算匹配的关键词数量
|
||||
matched_keywords = 0
|
||||
for keyword in keywords:
|
||||
if keyword in normalized_page_text:
|
||||
matched_keywords += 1
|
||||
|
||||
# 如果匹配的关键词比例超过阈值,则认为找到匹配
|
||||
if len(keywords) > 0 and (matched_keywords / len(keywords)) >= min_match_ratio:
|
||||
# 简单起见,返回页面第一个字符和最后一个字符的坐标
|
||||
if char_list:
|
||||
start_char = char_list[0]
|
||||
end_char = char_list[-1]
|
||||
match_ratio = matched_keywords / len(keywords)
|
||||
|
||||
# 获取页面文本作为匹配内容
|
||||
matched_text = ''.join([char_info['char'] for char_info in char_list])
|
||||
|
||||
# 计算边界框 (left, right, top, bottom)
|
||||
left = min(start_char['x'], end_char['x'])
|
||||
right = max(start_char['x'], end_char['x'])
|
||||
bottom = min(start_char['y'], end_char['y'])
|
||||
top = max(start_char['y'], end_char['y'])
|
||||
|
||||
position = [
|
||||
page_num,
|
||||
left, # left
|
||||
right, # right
|
||||
top, # top
|
||||
bottom, # bottom
|
||||
]
|
||||
batch_results[target_text].append(position)
|
||||
|
||||
return batch_results
|
||||
|
||||
def smart_fuzzy_find_text_batch(pdf_path, target_texts, similarity_threshold=0.8):
|
||||
"""
|
||||
智能批量模糊文本查找,结合多种方法
|
||||
|
||||
Args:
|
||||
pdf_path (str): PDF文件路径
|
||||
target_texts (list): 要查找的文本列表
|
||||
similarity_threshold (float): 相似度阈值
|
||||
|
||||
Returns:
|
||||
dict: 以target_text为键,包含匹配文本坐标信息列表为值的字典
|
||||
"""
|
||||
# 初始化结果字典
|
||||
batch_results = {text: [] for text in target_texts}
|
||||
|
||||
# 方法1: 精确匹配
|
||||
exact_results = find_text_in_pdf_per_page_batch(pdf_path, target_texts)
|
||||
|
||||
# 对于已经找到精确匹配的文本,直接使用结果
|
||||
remaining_texts = []
|
||||
for text in target_texts:
|
||||
if exact_results.get(text):
|
||||
batch_results[text] = exact_results[text]
|
||||
else:
|
||||
remaining_texts.append(text)
|
||||
|
||||
if not remaining_texts:
|
||||
return batch_results
|
||||
|
||||
# 方法2: 模糊匹配(仅对未找到精确匹配的文本)
|
||||
fuzzy_results = find_fuzzy_text_positions_batch(pdf_path, remaining_texts, similarity_threshold)
|
||||
|
||||
# 更新结果
|
||||
for text in remaining_texts:
|
||||
if fuzzy_results.get(text):
|
||||
batch_results[text] = fuzzy_results[text]
|
||||
remaining_texts = [t for t in remaining_texts if t != text] # 从剩余文本中移除
|
||||
|
||||
if not remaining_texts:
|
||||
return batch_results
|
||||
|
||||
# 方法3: 部分匹配(关键词匹配,仅对仍未找到匹配的文本)
|
||||
partial_results = find_partial_text_positions_batch(pdf_path, remaining_texts, 0.5)
|
||||
|
||||
# 更新最终结果
|
||||
for text in remaining_texts:
|
||||
if partial_results.get(text):
|
||||
batch_results[text] = partial_results[text]
|
||||
|
||||
return batch_results
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 使用本地PDF文件
|
||||
pdf_file_path = 'F:\\gitea\\ragflow_api_test\\ai协作方式.pdf' # 修改为你的PDF文件路径
|
||||
target_texts = [
|
||||
'''创建 `plan` 文件: 固化和锁定最终的"怎么做"
|
||||
• 基于 `plan` 执行: 精准驱动 AI 完成任务''',
|
||||
"其他要查找的文本1",
|
||||
"其他要查找的文本2"
|
||||
]
|
||||
|
||||
try:
|
||||
print("批量智能模糊查找:")
|
||||
batch_positions = smart_fuzzy_find_text_batch(pdf_file_path, target_texts, similarity_threshold=0.7)
|
||||
|
||||
for target_text, positions in batch_positions.items():
|
||||
print(f"\n查找文本: {target_text[:50]}{'...' if len(target_text) > 50 else ''}")
|
||||
if positions:
|
||||
print(f"找到文本在以下位置:")
|
||||
for pos in positions:
|
||||
if len(pos) >= 6: # 包含匹配内容和相似度信息
|
||||
print(f"页面: {pos[0]}, 边界框: Left({pos[1]:.2f}), Right({pos[2]:.2f}), Top({pos[3]:.2f}), Bottom({pos[4]:.2f})")
|
||||
if len(pos) >= 7: # 包含相似度信息
|
||||
print(f"相似度: {pos[6]:.2f}")
|
||||
if len(pos) >= 6: # 包含匹配内容
|
||||
print(f"匹配内容: {pos[5][:50]}{'...' if len(pos[5]) > 50 else ''}")
|
||||
print("-" * 50)
|
||||
else:
|
||||
print(f"页面: {pos[0]}, 边界框: Left({pos[1]:.2f}), Right({pos[2]:.2f}), Top({pos[3]:.2f}), Bottom({pos[4]:.2f})")
|
||||
else:
|
||||
print("未找到文本")
|
||||
|
||||
except FileNotFoundError as e:
|
||||
print(e)
|
||||
except Exception as e:
|
||||
print(f"处理PDF时出错: {e}")
|
Reference in New Issue
Block a user