diff --git a/src/add_chunk_cli_pdf_img.py b/src/add_chunk_cli_pdf_img.py index a1f4e3f..9dbcef5 100644 --- a/src/add_chunk_cli_pdf_img.py +++ b/src/add_chunk_cli_pdf_img.py @@ -1,103 +1,118 @@ from ragflow_sdk import RAGFlow import os import re -## home -api_key = "ragflow-MyMjM2ODE2NThlMTExZjBiMzJlNzY5Mj" -base_url = "http://127.0.0.1:8099" - - -## 公司内网 -# base_url = "http://192.168.107.165:8099" -# api_key = "ragflow-I5ZDNjMWNhNTdlMjExZjBiOTEwMzI0ZT" - - -elastic_tenant_id = "9c73df5a3ebc11f08410c237296aa408" - -rag_object = RAGFlow(api_key=api_key, base_url=base_url) - -elastic_url = "127.0.0.1" +# 在文件顶部添加新依赖 +import requests +#from urllib.parse import urlparse +import tempfile from elasticsearch import Elasticsearch +from minio import Minio +from minio.error import S3Error -# 初始化 Elasticsearch 用户名elastic,密码infini_rag_flow + +from dotenv import load_dotenv # 新增 +# 加载 .env 文件中的环境变量 +load_dotenv() + + +# 从环境变量初始化配置 +api_key = os.getenv("RAGFLOW_API_KEY") +base_url = os.getenv("RAGFLOW_BASE_URL") +elastic_tenant_id = os.getenv("ELASTIC_TENANT_ID") + +# 初始化 RAGFlow +rag_object = RAGFlow(api_key=api_key, base_url=base_url) + +# 初始化 Elasticsearch es = Elasticsearch( - [{'host': elastic_url, 'port': 1200, 'scheme': 'http'}], - basic_auth=('elastic', 'infini_rag_flow') + [{ + 'host': os.getenv("ELASTIC_HOST"), + 'port': int(os.getenv("ELASTIC_PORT")), + 'scheme': 'http' + }], + basic_auth=( + os.getenv("ELASTIC_USERNAME"), + os.getenv("ELASTIC_PASSWORD") + ) ) +# MinIO 配置 +MINIO_CONFIG = { + "endpoint": f"{os.getenv('MINIO_HOST')}:{os.getenv('MINIO_PORT')}", + "access_key": os.getenv("MINIO_USER"), + "secret_key": os.getenv("MINIO_PASSWORD"), + "secure": False +} + def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id): """ 在 Elasticsearch 中更新指定文档块的 img_id。 - 如果img_id不存在,则增加一个新的 img_id。 - + :param tenant_id: 租户 ID :param doc_id: 文档 ID :param chunk_id: 文档块 ID :param new_img_id: 新的 img_id :return: 更新结果 - """ - # 构建索引名称 - index_name = f"ragflow_{tenant_id}" # 这里需要替换为实际的索引名称生成逻辑 + try: + # 构建索引名称 + index_name = f"ragflow_{tenant_id}" - # 构建查询条件 - query = { - "bool": { - "must": [ - {"term": {"doc_id": doc_id}}, - {"term": {"_id": chunk_id}} - ] - } - } - - # 搜索目标文档 - result = es.search(index=index_name, body={"query": query}) - - # 检查是否找到目标文档 - if result['hits']['total']['value'] == 0: - return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} - - - # 获取目标文档的 ID - hit = result['hits']['hits'][0] - doc_id_in_es = hit['_id'] - - update_body = { - "doc": { - "img_id": new_img_id - } + # 构建查询条件 + query = { + "bool": { + "must": [ + {"term": {"doc_id": doc_id}}, + {"term": {"_id": chunk_id}} + ] + } } - # 更新文档 - update_result = es.update(index=index_name, id=doc_id_in_es, body=update_body) - print("更新结果:", update_result) - - + # 搜索目标文档 + result = es.search(index=index_name, body={"query": query}) - if update_result['result'] == 'updated': - return {"code": 0, "message": ""} - else: - return {"code": 100, "message": "Failed to update img_id"} + # 检查是否找到目标文档 + if result['hits']['total']['value'] == 0: + print(f"在 Elasticsearch 中未找到文档: index={index_name}, doc_id={doc_id}, chunk_id={chunk_id}") + return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} + + # 获取目标文档的 ID + hit = result['hits']['hits'][0] + doc_id_in_es = hit['_id'] + + # 构建更新请求 + update_body = { + "doc": { + "img_id": new_img_id + } + } + + # 更新文档 + update_result = es.update( + index=index_name, + id=doc_id_in_es, + body=update_body, + refresh=True # 确保更新立即可见 + ) + + print(f"Elasticsearch 更新结果: index={index_name}, id={doc_id_in_es}, result={update_result}") + + # 验证更新 + verify_doc = es.get(index=index_name, id=doc_id_in_es) + if verify_doc['_source'].get('img_id') == new_img_id: + print(f"成功更新 img_id 为: {new_img_id}") + return {"code": 0, "message": ""} + else: + print(f"更新验证失败,当前 img_id: {verify_doc['_source'].get('img_id')}") + return {"code": 100, "message": "Failed to verify img_id update"} + + except Exception as e: + print(f"更新 Elasticsearch 时发生错误: {str(e)}") + return {"code": 101, "message": f"Error updating img_id: {str(e)}"} - - - - -from minio import Minio -from minio.error import S3Error - - -MINIO_HOST="127.0.0.1" - -MINIO_CONFIG = { - "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}", - "access_key": os.getenv("MINIO_USER", "rag_flow"), - "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"), - "secure": False -} - def get_minio_client(): """创建MinIO客户端""" return Minio( @@ -275,52 +290,76 @@ def remove_images_from_content( content): -def process_txt_chunks( dataset_id, document, txt_path): - """处理文本分块并添加到文档 - dataset_id = kb_id - - - """ +# 修改 process_txt_chunks 函数中的图片处理逻辑 +def process_txt_chunks(dataset_id, document, txt_path): try: with open(txt_path, 'r', encoding='utf-8') as file: file_content = file.read() - + img_chunk_ids = [] for num, txt_chunk in enumerate(file_content.split('\n\n')): if txt_chunk.strip(): print(f"处理文本块: {txt_chunk[:30]}...") - img_urls= extract_images_from_chunk(txt_chunk) + img_urls = extract_images_from_chunk(txt_chunk) img_url = img_urls[0] if img_urls else None + if img_url: print(f"检测到图片链接: {img_url}") - # 清楚图片链接 clean_chunk = remove_images_from_content(txt_chunk) chunk = document.add_chunk(content=clean_chunk) - # 判断是相对路径还是绝对路径 - if not os.path.isabs(img_url): - img_abs_path = os.path.join(os.path.dirname(txt_path), img_url) + # 判断是否为网络图片 (新增逻辑) + if img_url.startswith(('http://', 'https://')): + # 下载网络图片到临时文件 + try: + response = requests.get(img_url) + response.raise_for_status() + + # 创建临时文件 + with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file: + tmp_file.write(response.content) + tmp_path = tmp_file.name + + # 上传临时文件 + if upload_file2minio(dataset_id, chunk.id, tmp_path): + img_chunk_ids.append(chunk.id) + # new_img_id = f"{dataset_id}-{chunk.id}" + # print(f"网络图片 {img_url} 已下载并上传,新的 img_id: {new_img_id}") + # update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id) + + # 删除临时文件 + os.unlink(tmp_path) + except Exception as e: + print(f"下载网络图片失败: {e}") else: - img_abs_path = img_url + # 处理本地图片 (原逻辑) + if not os.path.isabs(img_url): + img_abs_path = os.path.join(os.path.dirname(txt_path), img_url) + else: + img_abs_path = img_url + print(f"图片绝对路径: {img_abs_path}") - if not os.path.exists(img_abs_path): - print(f"图片未找到: {img_abs_path},跳过。") - continue - else: - if(upload_file2minio(dataset_id, chunk.id, img_abs_path)): - new_img_id = f"{dataset_id}-{chunk.id}" - print(f"图片 {img_abs_path} 已上传,新的 img_id: {new_img_id}") - - update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id) + if os.path.exists(img_abs_path): + if upload_file2minio(dataset_id, chunk.id, img_abs_path): + img_chunk_ids.append(chunk.id) + # new_img_id = f"{dataset_id}-{chunk.id}" + # print(f"图片 {img_abs_path} 已上传,新的 img_id: {new_img_id}") + # update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id) + else: + print(f"图片未找到: {img_abs_path},跳过。") else: print("未检测到图片链接,直接添加文本块。") chunk = document.add_chunk(content=txt_chunk) print(f"第{num+1} Chunk添加成功! ID: {chunk.id}") + for img_chunk_id in img_chunk_ids: + update_img_id_in_elasticsearch(elastic_tenant_id, document.id, img_chunk_id, f"{dataset_id}-{img_chunk_id}") except Exception as e: print(f"处理文本文件时出错: {txt_path},错误: {e}") + + def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset): """处理PDF-TXT文件对""" for name, pdf_path in pdf_dict.items(): @@ -341,7 +380,7 @@ def main(): dataset.id = bucket_name chunk_id = object_name """ - file_path = "g:\\11\\22\\test\\" + file_path = "F:\\Synology_nas\\SynologyDrive\\大模型\\厦门市城市道路开口设置指引DB3502T 141-2024\\" pdf_dict, txt_dict = pair_pdf_and_txt(file_path, file_path) if not pdf_dict: