# ragflow_api_test/src/add_chunk_cli_pdf_img.py
import os
import re

from ragflow_sdk import RAGFlow
from elasticsearch import Elasticsearch
from minio import Minio
from minio.error import S3Error

## home
api_key = "ragflow-MyMjM2ODE2NThlMTExZjBiMzJlNzY5Mj"
base_url = "http://127.0.0.1:8099"
## Company intranet
# base_url = "http://192.168.107.165:8099"
# api_key = "ragflow-I5ZDNjMWNhNTdlMjExZjBiOTEwMzI0ZT"
elastic_tenant_id = "9c73df5a3ebc11f08410c237296aa408"
rag_object = RAGFlow(api_key=api_key, base_url=base_url)

elastic_url = "127.0.0.1"
# Initialize the Elasticsearch client (username: elastic, password: infini_rag_flow)
es = Elasticsearch(
    [{'host': elastic_url, 'port': 1200, 'scheme': 'http'}],
    basic_auth=('elastic', 'infini_rag_flow')
)
def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id):
    """
    Update the img_id of the specified document chunk in Elasticsearch.
    If the chunk has no img_id yet, a new img_id field is added.
    :param tenant_id: tenant ID
    :param doc_id: document ID
    :param chunk_id: chunk ID
    :param new_img_id: new img_id
    :return: update result
    """
    # Build the index name
    index_name = f"ragflow_{tenant_id}"  # Replace with the actual index-name generation logic if it differs
    # Build the query
    query = {
        "bool": {
            "must": [
                {"term": {"doc_id": doc_id}},
                {"term": {"_id": chunk_id}}
            ]
        }
    }
    # Search for the target chunk
    result = es.search(index=index_name, body={"query": query})
    # Check whether the target chunk was found
    if result['hits']['total']['value'] == 0:
        return {"code": 102, "message": f"Can't find this chunk {chunk_id}"}
    # Get the Elasticsearch document ID of the hit
    hit = result['hits']['hits'][0]
    doc_id_in_es = hit['_id']
    update_body = {
        "doc": {
            "img_id": new_img_id
        }
    }
    # Apply the partial update
    update_result = es.update(index=index_name, id=doc_id_in_es, body=update_body)
    print("Update result:", update_result)
    if update_result['result'] == 'updated':
        return {"code": 0, "message": ""}
    else:
        return {"code": 100, "message": "Failed to update img_id"}
MINIO_HOST = "127.0.0.1"
MINIO_CONFIG = {
    "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}",
    "access_key": os.getenv("MINIO_USER", "rag_flow"),
    "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"),
    "secure": False
}
def get_minio_client():
"""创建MinIO客户端"""
return Minio(
endpoint=MINIO_CONFIG["endpoint"],
access_key=MINIO_CONFIG["access_key"],
secret_key=MINIO_CONFIG["secret_key"],
secure=MINIO_CONFIG["secure"]
)
def upload_file2minio(bucket_name, object_name, file_path):
    """Upload a file to MinIO.

    When uploading with fput_object:
    - if object_name is image\image.jpg, the stored object is literally named image\image.jpg
    - if object_name is image/image.jpg, "image" becomes a folder and the object is named image.jpg
    """
    minio_client = get_minio_client()
    try:
        # Check whether the bucket exists and create it if it does not (optional)
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
            print(f"Bucket '{bucket_name}' created")
        # Upload the file
        minio_client.fput_object(
            bucket_name=bucket_name,
            object_name=object_name,
            file_path=file_path
        )
        # Optionally fetch a presigned URL for the object
        #res = minio_client.get_presigned_url("GET", bucket_name, object_name, expires=timedelta(days=7))
        #res = "http://127.0.0.1:9000" + "/"+bucket_name+"/" + object_name
        #print(res)
        print(f"File '{file_path}' uploaded to bucket '{bucket_name}' as '{object_name}'")
        return True
    except S3Error as exc:
        print("MinIO error:", exc)
        return False
    except Exception as e:
        print("Unexpected error:", e)
        return False
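# A minimal usage sketch (bucket, object, and file names are hypothetical). process_txt_chunks()
# below follows the convention dataset.id = bucket_name and chunk.id = object_name:
#
#   ok = upload_file2minio(
#       bucket_name="example_dataset_id",   # hypothetical dataset / bucket
#       object_name="example_chunk_id",     # hypothetical chunk / object
#       file_path="g:\\11\\22\\test\\IMAGE1.png",
#   )
#   print("uploaded" if ok else "upload failed")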
def choose_from_list(options, prompt):
for idx, item in enumerate(options):
print(f"{idx + 1}. {item}")
while True:
choice = input(prompt)
if choice.isdigit() and 1 <= int(choice) <= len(options):
return options[int(choice) - 1]
else:
print("输入无效,请重新输入编号。")
def select_files(file_path, file_type="pdf"):
"""
    Recursively collect all files of the given type (default: pdf) under file_path.
    Return a list of file paths.
"""
file_list = []
for root, dirs, files in os.walk(file_path):
for file in files:
if file.lower().endswith(f".{file_type.lower()}"):
file_list.append(os.path.join(root, file))
return file_list
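# For example (hypothetical directory), select_files("g:\\11\\22\\test\\", "txt") walks the tree
# recursively and returns every *.txt path it finds, e.g. ["g:\\11\\22\\test\\report.txt"].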
def pair_pdf_and_txt(pdf_path, txt_path):
"""
    Pair PDF and TXT files by base name (file name without extension).
    Returns the aligned pdf_dict and txt_dict, whose keys are base names and whose values are file paths.
    Only PDFs that have a TXT file with the same base name are kept;
    a PDF without a matching TXT appears in neither returned dict.
"""
pdf_files = select_files(pdf_path, "pdf")
txt_files = select_files(txt_path, "txt")
    # Map base file names to paths
pdf_dict = {os.path.splitext(os.path.basename(f))[0]: f for f in pdf_files}
txt_dict_all = {os.path.splitext(os.path.basename(f))[0]: f for f in txt_files}
    # Keep only PDFs that have a matching TXT
pdf_dict_aligned = {}
txt_dict_aligned = {}
for name in pdf_dict:
if name in txt_dict_all:
pdf_dict_aligned[name] = pdf_dict[name]
txt_dict_aligned[name] = txt_dict_all[name]
return pdf_dict_aligned, txt_dict_aligned
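# A minimal usage sketch (directory and file names are hypothetical): given report.pdf and
# report.txt in the same folder, both dicts share the key "report"; a PDF without a matching
# TXT file is dropped from the result.
#
#   pdfs, txts = pair_pdf_and_txt("g:\\11\\22\\test\\", "g:\\11\\22\\test\\")
#   # pdfs -> {"report": "g:\\11\\22\\test\\report.pdf"}
#   # txts -> {"report": "g:\\11\\22\\test\\report.txt"}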
def select_dataset(rag_object):
    """Let the user pick one of the available datasets."""
    datasets = rag_object.list_datasets()
    if not datasets:
        print("No datasets available.")
        return None
    dataset_names = [ds.name for ds in datasets]
    dataset_name = choose_from_list(dataset_names, "Please enter the dataset number: ")
    return [ds for ds in datasets if ds.name == dataset_name][0]
def upload_or_get_document(dataset, pdf_path, display_name):
    """Upload a document, or return it if it already exists in the dataset."""
    try:
        document = dataset.list_documents(name=display_name)[0]
        print(f"Document already exists: {display_name}, skipping upload.")
        return document
    except Exception:
        try:
            with open(pdf_path, "rb") as f:
                blob = f.read()
            dataset.upload_documents([{"display_name": display_name, "blob": blob}])
            return dataset.list_documents(name=display_name)[0]
        except Exception as e:
            print(f"Failed to upload PDF: {pdf_path}, error: {e}")
            return None
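# A minimal usage sketch (path and display name are hypothetical); the returned RAGFlow SDK
# document exposes the .id and .name attributes used below:
#
#   doc = upload_or_get_document(dataset, "g:\\11\\22\\test\\report.pdf", "report.pdf")
#   if doc is not None:
#       print(doc.id, doc.name)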
def divid_txt_chunk_img(txt_chunk):
    """Split a text chunk into image links and plain-text content.

    Example input:
    "Some text![image](path/IMAGE1.png)more text![image](path/IMAGE2.png)"
    Returns:
    clean_text: the text with all image links removed
    image_paths: the list of extracted image paths
    """
    # Regex for the Markdown image syntax: ![alt_text](path)
    pattern = r'!\[.*?\]\((.*?)\)'
    # Extract all image paths
    image_paths = re.findall(pattern, txt_chunk)
    # Strip all image markers
    clean_text = re.sub(pattern, '', txt_chunk)
    # Collapse extra blank lines and trim leading/trailing whitespace
    clean_text = re.sub(r'\n\s*\n', '\n\n', clean_text).strip()
    return clean_text, image_paths
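# A minimal sketch of what the Markdown-image regex does (the chunk text below is made up):
#
#   text, imgs = divid_txt_chunk_img("Intro ![image](figs/IMAGE1.png) details ![image](figs/IMAGE2.png)")
#   # text -> "Intro  details"   (image markers stripped, surrounding whitespace trimmed)
#   # imgs -> ["figs/IMAGE1.png", "figs/IMAGE2.png"]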
def extract_images_from_chunk(content):
    """Extract image links from chunk content."""
    img_pattern = r'!\[.*?\]\((.*?)\)'
    return re.findall(img_pattern, content)
def remove_images_from_content(content):
    """Remove image links from content."""
    # Strip the Markdown image syntax ![alt](url)
    content = re.sub(r'!\[.*?\]\(.*?\)', '', content)
    # Collapse extra blank lines
    content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
    return content.strip()
def process_txt_chunks(dataset_id, document, txt_path):
    """Split the TXT file into chunks and add them to the document.
    dataset_id = kb_id
    """
    try:
        with open(txt_path, 'r', encoding='utf-8') as file:
            file_content = file.read()
        for num, txt_chunk in enumerate(file_content.split('\n\n')):
            if txt_chunk.strip():
                print(f"Processing text chunk: {txt_chunk[:30]}...")
                img_urls = extract_images_from_chunk(txt_chunk)
                img_url = img_urls[0] if img_urls else None
                if img_url:
                    print(f"Image link detected: {img_url}")
                    # Remove the image link from the chunk text
                    clean_chunk = remove_images_from_content(txt_chunk)
                    chunk = document.add_chunk(content=clean_chunk)
                    # Resolve relative paths against the TXT file's directory
                    if not os.path.isabs(img_url):
                        img_abs_path = os.path.join(os.path.dirname(txt_path), img_url)
                    else:
                        img_abs_path = img_url
                    print(f"Absolute image path: {img_abs_path}")
                    if not os.path.exists(img_abs_path):
                        print(f"Image not found: {img_abs_path}, skipping.")
                        continue
                    else:
                        if upload_file2minio(dataset_id, chunk.id, img_abs_path):
                            new_img_id = f"{dataset_id}-{chunk.id}"
                            print(f"Image {img_abs_path} uploaded, new img_id: {new_img_id}")
                            update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id)
                else:
                    print("No image link detected, adding the text chunk as-is.")
                    chunk = document.add_chunk(content=txt_chunk)
                print(f"{num+1} Chunk added successfully! ID: {chunk.id}")
    except Exception as e:
        print(f"Error while processing text file: {txt_path}, error: {e}")
def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset):
    """Process the PDF-TXT file pairs."""
    for name, pdf_path in pdf_dict.items():
        display_name = os.path.basename(pdf_path)
        document = upload_or_get_document(dataset, pdf_path, display_name)
        if not document:
            continue
        print(f"Selected document: {document.name}, ID: {document.id}")
        txt_path = txt_dict.get(name)
        if txt_path:
            process_txt_chunks(dataset.id, document, txt_path)
def main():
    """Entry point: process the PDF and TXT file pairs.
    dataset.id = bucket_name
    chunk_id = object_name
    """
    file_path = "g:\\11\\22\\test\\"
    pdf_dict, txt_dict = pair_pdf_and_txt(file_path, file_path)
    if not pdf_dict:
        print("No files selected.")
        return
    dataset = select_dataset(rag_object)
    if not dataset:
        print("No dataset selected.")
        return
    print(f"Selected dataset: {dataset.name}")
    print(f"Selected dataset id: {dataset.id}")
    process_pdf_txt_pairs(pdf_dict, txt_dict, dataset)
if __name__ == "__main__":
main()