diff --git a/chunk_operations.py b/chunk_operations.py index 15e0024..7d1f209 100644 --- a/chunk_operations.py +++ b/chunk_operations.py @@ -2,7 +2,7 @@ from elasticsearch import Elasticsearch # 初始化 Elasticsearch 用户名elastic,密码infini_rag_flow es = Elasticsearch( - [{'host': '192.168.107.165', 'port': 1200, 'scheme': 'http'}], + [{'host': '127.0.0.1', 'port': 1200, 'scheme': 'http'}], basic_auth=('elastic', 'infini_rag_flow') ) @@ -118,12 +118,14 @@ if __name__ == "__main__": print("连接失败:", e) tenant_id = "d669205e57a211f0b9e7324e7f243034" - dataset_id = "10345832587311f0919f3a2728512a4b" # dataset_id = kb_id + tenant_id = "9c73df5a3ebc11f08410c237296aa408" + dataset_id = "0e6127da574a11f0a59c7e7439a490f8" # dataset_id = kb_id doc_id = "cbf576385bc911f08f23fedc3996e479" - doc_id = "4300e558609511f08b8bde4a87f78768" + doc_id = "323113d8670c11f0b4255ea1d23c381a" chunk_id = "f035247f7de579b0" # - chunk_id = "5b3445f7861ff772" # + chunk_id = "b2d53baddbfde97c" # new_img_id = "10345832587311f0919f3a2728512a4b-f035247f7de579b0" #"new_img_id_12345" + new_img_id = "0e6127da574a11f0a59c7e7439a490f8-b2d53baddbfde97c" #new_img_id ="c5142bce5ac611f0ae707a8b5ba029cb-thumbnail_fb3cbc165ac611f0b5897a8b5ba029cb.png" diff --git a/minio_api.py b/minio_api.py index 8edd852..215d698 100644 --- a/minio_api.py +++ b/minio_api.py @@ -2,7 +2,7 @@ from minio import Minio from minio.error import S3Error import os from datetime import timedelta -MINIO_HOST=os.getenv("MINIO_HOST", "127.0.0.1") +MINIO_HOST="127.0.0.1" MINIO_CONFIG = { "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}", @@ -37,35 +37,6 @@ def get_minio_client(): # 'data': upload_result['data'] # }) -def upload_files_to_server(files, parent_id=None, user_id=None): - """ - 上传文件到MinIO服务器 - :param files: 文件路径列表 - :param parent_id: 父级ID(可选) - :param user_id: 用户ID(可选) - """ - for file_path in files: - if not os.path.isfile(file_path): - print(f"文件不存在: {file_path}") - continue - - # 获取文件名 - file_name = os.path.basename(file_path) - - # 上传文件到MinIO - try: - with open(file_path, 'rb') as file_data: - minio_client.put_object ( - bucket_name=parent_id, - object_name=file_name, - data=file_data, - length=os.path.getsize(file_path) - ) - print(f"文件已上传到MinIO: {parent_id}/{file_name}") - except S3Error as e: - print(f"上传文件 '{file_name}' 失败: {e}") - except Exception as e: - print(f"发生错误: {e}") @@ -81,48 +52,55 @@ def upload_files_to_server(files, parent_id=None, user_id=None): # secure=False # 使用HTTPS(如果是本地测试且未配置SSL,可设置为False) # ) -minio_client= get_minio_client() +def upload_file2minio(bucket_name, object_name, file_path): + """上传文件到MinIO + # 通过fput_object上传时: + # 如果object_name为image\image.jpg,则上传后的名字就是image\image.jpg; -# 要上传的存储桶信息 -bucket_name = "my-bucket" # 替换为你的存储桶名称 -object_name = "image/1.jpg" # 文件在MinIO中存储的名称 -file_path = "G:\\11\\ragflow_api_test\\2.jpg" # 本地文件路径 - - -# 通过fput_object上传时: - -# 如果object_name为image\image.jpg,则上传后的名字就是image\image.jpg; - -# 如果object_name为image/image.jpg,则上传后image为文件夹,文件名为image.jpg; - - -try: - # 检查存储桶是否存在,如果不存在则创建(可选) - if not minio_client.bucket_exists(bucket_name): - minio_client.make_bucket(bucket_name) - print(f"Bucket '{bucket_name}' created") + # 如果object_name为image/image.jpg,则上传后image为文件夹,文件名为image.jpg; - # 上传文件 - minio_client.fput_object( - bucket_name=bucket_name, - object_name=object_name, - file_path=file_path - ) + """ - # 获取文件的预签名URL(可选) - res = minio_client.get_presigned_url("GET", bucket_name, object_name, expires=timedelta(days=7)) + minio_client= get_minio_client() + + try: + # 检查存储桶是否存在,如果不存在则创建(可选) + if not minio_client.bucket_exists(bucket_name): + minio_client.make_bucket(bucket_name) + print(f"Bucket '{bucket_name}' created") + + # 上传文件 + minio_client.fput_object( + bucket_name=bucket_name, + object_name=object_name, + file_path=file_path + ) + + # 获取文件的预签名URL(可选) + #res = minio_client.get_presigned_url("GET", bucket_name, object_name, expires=timedelta(days=7)) + + #res = "http://127.0.0.1:9000" + "/"+bucket_name+"/" + object_name + + + #print(res) + print(f"文件 '{file_path}' 成功上传到存储桶 '{bucket_name}' 为 '{object_name}'") + + except S3Error as exc: + print("MinIO错误:", exc) + except Exception as e: + print("发生错误:", e) + + +if __name__ == "__main__": + # 要上传的存储桶信息 + bucket_name = "my-bucket" # 替换为你的存储桶名称 + object_name = "image/1.jpg" # 文件在MinIO中存储的名称 + file_path = "G:\\11\\ragflow_api_test\\2.jpg" # 本地文件路径 + upload_file2minio(bucket_name, object_name, file_path) - #res = "http://127.0.0.1:9000" + "/"+bucket_name+"/" + object_name - print(res) - print(f"文件 '{file_path}' 成功上传到存储桶 '{bucket_name}' 为 '{object_name}'") - -except S3Error as exc: - print("MinIO错误:", exc) -except Exception as e: - print("发生错误:", e) diff --git a/src/add_chunk_cli_pdf_img.py b/src/add_chunk_cli_pdf_img.py index e4ad54b..a1f4e3f 100644 --- a/src/add_chunk_cli_pdf_img.py +++ b/src/add_chunk_cli_pdf_img.py @@ -7,10 +7,148 @@ base_url = "http://127.0.0.1:8099" ## 公司内网 -base_url = "http://192.168.107.165:8099" -api_key = "ragflow-I5ZDNjMWNhNTdlMjExZjBiOTEwMzI0ZT" +# base_url = "http://192.168.107.165:8099" +# api_key = "ragflow-I5ZDNjMWNhNTdlMjExZjBiOTEwMzI0ZT" + + +elastic_tenant_id = "9c73df5a3ebc11f08410c237296aa408" + rag_object = RAGFlow(api_key=api_key, base_url=base_url) +elastic_url = "127.0.0.1" + +from elasticsearch import Elasticsearch + +# 初始化 Elasticsearch 用户名elastic,密码infini_rag_flow +es = Elasticsearch( + [{'host': elastic_url, 'port': 1200, 'scheme': 'http'}], + basic_auth=('elastic', 'infini_rag_flow') +) + +def update_img_id_in_elasticsearch(tenant_id, doc_id, chunk_id, new_img_id): + """ + 在 Elasticsearch 中更新指定文档块的 img_id。 + 如果img_id不存在,则增加一个新的 img_id。 + + :param tenant_id: 租户 ID + :param doc_id: 文档 ID + :param chunk_id: 文档块 ID + :param new_img_id: 新的 img_id + :return: 更新结果 + + """ + # 构建索引名称 + index_name = f"ragflow_{tenant_id}" # 这里需要替换为实际的索引名称生成逻辑 + + # 构建查询条件 + query = { + "bool": { + "must": [ + {"term": {"doc_id": doc_id}}, + {"term": {"_id": chunk_id}} + ] + } + } + + # 搜索目标文档 + result = es.search(index=index_name, body={"query": query}) + + # 检查是否找到目标文档 + if result['hits']['total']['value'] == 0: + return {"code": 102, "message": f"Can't find this chunk {chunk_id}"} + + + # 获取目标文档的 ID + hit = result['hits']['hits'][0] + doc_id_in_es = hit['_id'] + + update_body = { + "doc": { + "img_id": new_img_id + } + } + + # 更新文档 + update_result = es.update(index=index_name, id=doc_id_in_es, body=update_body) + print("更新结果:", update_result) + + + + if update_result['result'] == 'updated': + return {"code": 0, "message": ""} + else: + return {"code": 100, "message": "Failed to update img_id"} + + + + + + + +from minio import Minio +from minio.error import S3Error + + +MINIO_HOST="127.0.0.1" + +MINIO_CONFIG = { + "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}", + "access_key": os.getenv("MINIO_USER", "rag_flow"), + "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"), + "secure": False +} + +def get_minio_client(): + """创建MinIO客户端""" + return Minio( + endpoint=MINIO_CONFIG["endpoint"], + access_key=MINIO_CONFIG["access_key"], + secret_key=MINIO_CONFIG["secret_key"], + secure=MINIO_CONFIG["secure"] + ) + + +def upload_file2minio(bucket_name, object_name, file_path): + """上传文件到MinIO + # 通过fput_object上传时: + + # 如果object_name为image\image.jpg,则上传后的名字就是image\image.jpg; + + # 如果object_name为image/image.jpg,则上传后image为文件夹,文件名为image.jpg; + + """ + + minio_client= get_minio_client() + + try: + # 检查存储桶是否存在,如果不存在则创建(可选) + if not minio_client.bucket_exists(bucket_name): + minio_client.make_bucket(bucket_name) + print(f"Bucket '{bucket_name}' created") + + # 上传文件 + minio_client.fput_object( + bucket_name=bucket_name, + object_name=object_name, + file_path=file_path + ) + + # 获取文件的预签名URL(可选) + #res = minio_client.get_presigned_url("GET", bucket_name, object_name, expires=timedelta(days=7)) + + #res = "http://127.0.0.1:9000" + "/"+bucket_name+"/" + object_name + + + #print(res) + print(f"文件 '{file_path}' 成功上传到存储桶 '{bucket_name}' 为 '{object_name}'") + return True + + except S3Error as exc: + print("MinIO错误:", exc) + return False + except Exception as e: + print("发生错误:", e) + return False @@ -120,17 +258,29 @@ def divid_txt_chunk_img(txt_chunk): return clean_text, image_paths -def upload_images_to_minio(image_paths, document): - """ - 上传图片到MinIO, +def extract_images_from_chunk( content): + """从chunk内容中提取图片链接""" + img_pattern = r'!\[.*?\]\((.*?)\)' + return re.findall(img_pattern, content) + +def remove_images_from_content( content): + """从内容中移除图片链接""" + # 移除markdown图片语法 ![alt](url) + content = re.sub(r'!\[.*?\]\(.*?\)', '', content) + # 清理多余的空行 + content = re.sub(r'\n\s*\n\s*\n', '\n\n', content) + return content.strip() + + + + + +def process_txt_chunks( dataset_id, document, txt_path): + """处理文本分块并添加到文档 + dataset_id = kb_id + """ - - - - -def process_txt_chunks(document, txt_path): - """处理文本分块并添加到文档""" try: with open(txt_path, 'r', encoding='utf-8') as file: file_content = file.read() @@ -138,23 +288,51 @@ def process_txt_chunks(document, txt_path): for num, txt_chunk in enumerate(file_content.split('\n\n')): if txt_chunk.strip(): print(f"处理文本块: {txt_chunk[:30]}...") - chunk = document.add_chunk(content=txt_chunk) + img_urls= extract_images_from_chunk(txt_chunk) + img_url = img_urls[0] if img_urls else None + if img_url: + print(f"检测到图片链接: {img_url}") + # 清楚图片链接 + clean_chunk = remove_images_from_content(txt_chunk) + chunk = document.add_chunk(content=clean_chunk) + + # 判断是相对路径还是绝对路径 + if not os.path.isabs(img_url): + img_abs_path = os.path.join(os.path.dirname(txt_path), img_url) + else: + img_abs_path = img_url + print(f"图片绝对路径: {img_abs_path}") + if not os.path.exists(img_abs_path): + print(f"图片未找到: {img_abs_path},跳过。") + continue + else: + if(upload_file2minio(dataset_id, chunk.id, img_abs_path)): + new_img_id = f"{dataset_id}-{chunk.id}" + print(f"图片 {img_abs_path} 已上传,新的 img_id: {new_img_id}") + + update_img_id_in_elasticsearch(elastic_tenant_id, document.id, chunk.id, new_img_id) + else: + print("未检测到图片链接,直接添加文本块。") + chunk = document.add_chunk(content=txt_chunk) print(f"第{num+1} Chunk添加成功! ID: {chunk.id}") except Exception as e: print(f"处理文本文件时出错: {txt_path},错误: {e}") + + def process_pdf_txt_pairs(pdf_dict, txt_dict, dataset): """处理PDF-TXT文件对""" for name, pdf_path in pdf_dict.items(): display_name = os.path.basename(pdf_path) document = upload_or_get_document(dataset, pdf_path, display_name) + print(f"选择的文档: {document.name},ID: {document.id}") if not document: continue txt_path = txt_dict.get(name) if txt_path: - process_txt_chunks(document, txt_path) + process_txt_chunks(dataset.id,document, txt_path) def main(): @@ -163,12 +341,12 @@ def main(): dataset.id = bucket_name chunk_id = object_name """ - file_path = "g:\\11\\22\\规范\\" - #pdf_dict, txt_dict = pair_pdf_and_txt(file_path, file_path) + file_path = "g:\\11\\22\\test\\" + pdf_dict, txt_dict = pair_pdf_and_txt(file_path, file_path) - # if not pdf_dict: - # print("未选择任何文件。") - # return + if not pdf_dict: + print("未选择任何文件。") + return dataset = select_dataset(rag_object) print(f"选择的数据集: {dataset.name}") @@ -177,7 +355,7 @@ def main(): print("未选择数据集。") return - #process_pdf_txt_pairs(pdf_dict, txt_dict, dataset) + process_pdf_txt_pairs(pdf_dict, txt_dict, dataset)