Files
ragflow_api_test/minio_api.py

107 lines
3.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from minio import Minio
from minio.error import S3Error
import os
from datetime import timedelta
MINIO_HOST="127.0.0.1"
MINIO_CONFIG = {
"endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}",
"access_key": os.getenv("MINIO_USER", "rag_flow"),
"secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"),
"secure": False
}
def get_minio_client():
"""创建MinIO客户端"""
return Minio(
endpoint=MINIO_CONFIG["endpoint"],
access_key=MINIO_CONFIG["access_key"],
secret_key=MINIO_CONFIG["secret_key"],
secure=MINIO_CONFIG["secure"]
)
# def upload_file():
# if 'files' not in request.files:
# return jsonify({
# 'code': 400,
# 'message': '未选择文件',
# 'data': None
# }), 400
# files = request.files.getlist('files')
# upload_result = upload_files_to_server(files)
# # 返回标准格式
# return jsonify({
# 'code': 0,
# 'message': '上传成功',
# 'data': upload_result['data']
# })
# 配置MinIO客户端
# minio_client = Minio(
# endpoint="192.168.0.250:9000", # 替换为你的MinIO服务器地址例如"localhost:9000"
# access_key="jAGDJmMJ63k3MyRMmi4N", # 替换为你的Access Key
# secret_key="gH2ZOiVh9pJk8bLI8jBz6d6ABNiaTrCuFiDLWojK", # 替换为你的Secret Key
# secure=False # 使用HTTPS如果是本地测试且未配置SSL可设置为False
# )
def upload_file2minio(bucket_name, object_name, file_path):
"""上传文件到MinIO
# 通过fput_object上传时
# 如果object_name为image\image.jpg则上传后的名字就是image\image.jpg
# 如果object_name为image/image.jpg则上传后image为文件夹文件名为image.jpg
"""
minio_client= get_minio_client()
try:
# 检查存储桶是否存在,如果不存在则创建(可选)
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
print(f"Bucket '{bucket_name}' created")
# 上传文件
minio_client.fput_object(
bucket_name=bucket_name,
object_name=object_name,
file_path=file_path
)
# 获取文件的预签名URL可选
#res = minio_client.get_presigned_url("GET", bucket_name, object_name, expires=timedelta(days=7))
#res = "http://127.0.0.1:9000" + "/"+bucket_name+"/" + object_name
#print(res)
print(f"文件 '{file_path}' 成功上传到存储桶 '{bucket_name}''{object_name}'")
except S3Error as exc:
print("MinIO错误:", exc)
except Exception as e:
print("发生错误:", e)
if __name__ == "__main__":
# 要上传的存储桶信息
bucket_name = "my-bucket" # 替换为你的存储桶名称
object_name = "image/1.jpg" # 文件在MinIO中存储的名称
file_path = "G:\\11\\ragflow_api_test\\2.jpg" # 本地文件路径
upload_file2minio(bucket_name, object_name, file_path)