@@ -11,11 +11,14 @@ import shutil
# 配置信息
local_image_folder = ' downloaded_images '
# lsky_pro_url = ' http://192.168.107.248:18089/api/v1'
lsky_pro_url_local = ' http://192.168.107.248:18089/api/v1 '
lsky_pro_url = ' https://image.lqsjy.cn/api/v1 '
upload_endpoint = ' /upload '
token = ' 1|QJP2YEr9GIN52VBgmm5hCqV5DwBSvJLUKjnwcKB8 '
# 添加全局变量控制使用本地/在线图床
use_local_server = True
# 确保本地图片文件夹存在
#os.makedirs(local_image_folder, exist_ok=True)
def get_image_folder ( markdown_path ) :
@@ -127,6 +130,24 @@ def download_images():
else :
messagebox . showwarning ( " 警告 " , " 没有图片被下载 " )
# def upload_image_to_lsky_pro(image_path):
# """上传图片到Lsky Pro图床"""
# with open(image_path, 'rb') as img_file:
# files = {'file': img_file}
# headers = {
# 'Authorization': f'Bearer {token}',
# 'Accept': 'application/json'
# }
# response = requests.post(f'{lsky_pro_url}{upload_endpoint}', files=files, headers=headers)
# if response.status_code == 200:
# try:
# data = response.json()
# if data['status']:
# return data['data']['links']['url']
# except:
# pass
# return None
def upload_image_to_lsky_pro ( image_path , retries = 3 , timeout = 30 ) :
""" 上传图片到Lsky Pro图床, 增加重试机制和超时设置 """
for attempt in range ( retries ) :
@@ -137,7 +158,12 @@ def upload_image_to_lsky_pro(image_path, retries=3, timeout=30):
' Authorization ' : f ' Bearer { token } ' ,
' Accept ' : ' application/json '
}
response = requests . post ( f ' { lsky_pro_url } { upload_endpoint } ' , files = files , headers = headers , timeout = timeout )
# 根据复选框状态选择URL
current_url = lsky_pro_url_local if use_local_server else lsky_pro_url
response = requests . post ( f ' { current_url } { upload_endpoint } ' ,
files = files ,
headers = headers ,
timeout = timeout )
if response . status_code == 200 :
try :
data = response . json ( )
@@ -147,11 +173,16 @@ def upload_image_to_lsky_pro(image_path, retries=3, timeout=30):
pass
except requests . exceptions . RequestException as e :
print ( f " 上传失败,重试 { attempt + 1 } / { retries } : { str ( e ) } " )
time . sleep ( 3 ) # 重试前等待3秒
time . sleep ( 10 ) # 重试前等待3秒
return None
def process_image_file ( markdown_file_path ) :
with open ( markdown_file_path , ' r ' , encoding = ' utf-8 ' ) as file :
markdown_content = file . read ( )
@@ -164,11 +195,16 @@ def process_image_file(markdown_file_path):
try :
if not url . startswith ( ( ' http:// ' , ' https:// ' ) ) :
if ' image.lqsjy.cn ' not in url :
print ( f " 处理中: { url } " )
url = url . split ( ' ' ) [ 0 ]
# 上传到图床
if " : " in url :
new_url = upload_image_to_lsky_pro ( url )
else :
new_url = upload_image_to_lsky_pro ( os . path . dirname ( markdown_file_path ) + " // " + url )
if new_url :
url_mapping [ url ] = new_url
print ( f " 处理成功: { url } -> { new_url } " )
@@ -187,15 +223,17 @@ def process_image_file(markdown_file_path):
# 构造新的文件名
file_name , file_ext = os . path . splitext ( markdown_file_path )
#new_file_path = f"{file_name}_lsky{file_ext}"
new_file_path = f " { file_name } { file_ext } "
# 保存为新文件
with open ( new_file_path , ' w ' , encoding = ' utf-8 ' ) as file :
file . write ( new_content )
messagebox. showinfo( " 完成 " , f " 成功处理 { len ( url_mapping ) } 张图片 " )
# messagebox. showinfo("完成", f"成功处理 {len(url_mapping)} 张图片" )
print ( f " { file_name } :成功处理 { len ( url_mapping ) } 张图片 " )
else :
messagebox. showwarning( " 警告 " , " 没有图片被处理" )
# messagebox. showwarning("警告", " 没有图片被处理" )
print ( " 没有图片被处理 " )
@@ -314,18 +352,24 @@ def download_and_upload_images():
except Exception as e :
messagebox . showerror ( " 错误 " , f " 处理过程中出错: { str ( e ) } " )
def upload_images_batch_files ( file_paths , max_concurrent_uploads = 5 ) :
""" 批量上传图片,限制并发上传数量 """
from concurrent . futures import ThreadPoolExecutor , as_completed
with ThreadPoolExecutor ( max_workers = max_concurrent_uploads ) as executor :
futures = { executor . submit ( process_image_file , file_path ) : file_path for file_path in file_paths }
for future in as_completed ( future s ) :
file_path = futures [ future ]
try :
future . result ( )
except Exception as e :
pr int ( f " 处理文件 { file_path } 时出错: { str ( e ) } " )
def upload_images_batch_files ( file_path s) :
# """ 你的图片上传逻辑(需自行实现) """
for file_path in file_paths :
# 这里添加处理每个文件的逻辑
if not file_path or not os . path . isfile ( file_path ) :
messagebox . showwarn ing ( " 警告 " , " 请选择有效的Markdown文件。 " )
return
print ( f " Processing: { file_path } " )
process_image_file ( file_path )
# 示例: 读取markdown文件内容
# with open(file_path, 'r') as f:
# content = f.read()
# 添加你的图片上传逻辑...
def select_files ( ) :
root = tk . Tk ( )
@@ -346,6 +390,19 @@ def select_files():
root = tk . Tk ( )
root . title ( " Markdown图片处理工具 " )
def toggle_server ( ) :
global use_local_server
use_local_server = use_local_var . get ( )
print ( f " 使用 { ' 本地 ' if use_local_server else ' 在线 ' } 服务器 " )
# 添加复选框
use_local_var = tk . BooleanVar ( value = True ) # 默认使用本地服务器
use_local_checkbox = tk . Checkbutton ( root ,
text = " 使用本地服务器 " ,
variable = use_local_var ,
command = toggle_server )
use_local_checkbox . grid ( row = 5 , column = 0 , columnspan = 3 , pady = 5 )
# 文件路径显示
tk . Label ( root , text = " Input File: " ) . grid ( row = 0 , column = 0 , padx = 10 , pady = 10 )
input_file_entry = tk . Entry ( root , width = 50 )
@@ -356,9 +413,9 @@ tk.Button(root, text="Browse", command=choose_input_file).grid(row=0, column=2,
download_button = tk . Button ( root , text = " 下载图片 " , command = download_images )
download_button . grid ( row = 1 , column = 0 , columnspan = 3 , pady = 5 )
# 上传图片按钮
upload_button = tk . Button ( root , text = " 上传图片 " , command= upload_images )
upload_button. grid ( row = 2 , column = 0 , columnspan = 3 , pady = 5 )
# # 上传图片按钮
# upload_button = tk.Button(root, text="上传图片", command= upload_images)
# upload_button.grid(row=2, column=0, columnspan=3, pady=5 )
# 上传图片按钮_batch_files
# upload_button = tk.Button(root, text="上传图片_批量", command=upload_images_batch_files)