# markdown_solve/dialogue_download_change.py
#
# 373 lines
# 14 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import requests
from requests.auth import HTTPBasicAuth
import tkinter as tk
from tkinter import filedialog, messagebox
import time
import tempfile
import uuid
import shutil
# Configuration
# Default image folder name; download_images() rebinds this global for each Markdown file.
local_image_folder = 'downloaded_images'
#lsky_pro_url = 'http://192.168.107.248:18089/api/v1'
# Base URL of the Lsky Pro image-host API used by upload_image_to_lsky_pro().
lsky_pro_url = 'https://image.lqsjy.cn/api/v1'
# Path appended to lsky_pro_url to form the upload URL.
upload_endpoint = '/upload'
# Bearer token sent in the Authorization header of every upload.
# NOTE(review): this credential is hard-coded in source; consider loading it
# from an environment variable or an untracked config file instead.
token = '1|QJP2YEr9GIN52VBgmm5hCqV5DwBSvJLUKjnwcKB8'
# Ensure the local image folder exists (disabled here; download_images()
# creates the folder once it knows where the Markdown file lives).
#os.makedirs(local_image_folder, exist_ok=True)
def get_image_folder(markdown_path):
    """Return the path of the ``images`` folder beside a Markdown file.

    The folder is only named here; creating it is left to the caller.
    """
    # os.path.split()[0] is the directory that contains the Markdown file.
    parent_dir, _ = os.path.split(markdown_path)
    return os.path.join(parent_dir, 'images')
def choose_input_file():
    """Ask the user for a Markdown file and show its path in the input entry.

    The entry is left untouched when the dialog is cancelled.
    """
    selected_path = filedialog.askopenfilename(
        title="选择Markdown文件",
        filetypes=(("Markdown files", "*.md"), ("All files", "*.*")),
    )
    if not selected_path:
        return
    # Replace whatever the entry currently holds with the new selection.
    input_file_entry.delete(0, tk.END)
    input_file_entry.insert(0, selected_path)
def clean_filename(url):
    """Derive a local file name for an image URL.

    Resolution order:

    1. The URL text ends with a known image extension: use its last path
       component.
    2. The URL carries a hexadecimal ``fdId=`` parameter: ``<fdId>.jpg``.
    3. The URL ends with a known image extension once its query string or
       fragment is removed (``pic.png?w=100``): use that component.
    4. Otherwise a unique ``image_<timestamp>_<random>.jpg`` name, so that
       several extension-less images saved within the same second do not
       overwrite each other.

    Characters that are not allowed in Windows file names are replaced by
    ``_`` in names taken from the URL.

    Args:
        url: The image URL. Non-string values yield a fallback name.

    Returns:
        A file name (no directory part).
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')

    def _fallback_name():
        # The random suffix keeps names distinct within one second.
        return f"image_{int(time.time())}_{uuid.uuid4().hex[:8]}.jpg"

    if not isinstance(url, str):
        return _fallback_name()

    candidate = None
    if url.lower().endswith(image_extensions):
        candidate = os.path.basename(url)
    else:
        fd_id = re.search(r'fdId=([0-9a-f]+)', url)
        if fd_id:
            return f"{fd_id.group(1)}.jpg"
        # Retry the extension check without the query string / fragment.
        bare_url = re.split(r'[?#]', url, maxsplit=1)[0]
        if bare_url.lower().endswith(image_extensions):
            candidate = os.path.basename(bare_url)

    if candidate:
        # The basename may still contain '?', ':' etc. from the URL.
        candidate = re.sub(r'[\\/:*?"<>|]', '_', candidate)
    return candidate or _fallback_name()
def _download_link_url(raw_target):
    """Return the URL part of a Markdown image target, without a "title"."""
    target = raw_target.strip()
    if '"' in target:
        target = target.split(' ')[0]
    return target.strip()


def _download_absolute_url(link_url):
    """Turn a link found in the Markdown into a URL that can be fetched."""
    if link_url.startswith(('http://', 'https://')):
        return link_url
    if link_url.startswith('//'):
        # Protocol-relative URL.
        return 'https:' + link_url
    if link_url.startswith('www.soujianzhu.cn'):
        return 'https://' + link_url
    if 'www.soujianzhu.cn' in link_url:
        # Unusual form; leave it untouched as the previous code did.
        return link_url
    # Site-relative path: make sure exactly one slash joins host and path.
    if not link_url.startswith('/'):
        link_url = '/' + link_url
    return 'https://www.soujianzhu.cn' + link_url


def download_images():
    """Download the images of the selected Markdown file and localise links.

    Reads the file named in the input entry, downloads every image link into
    an ``images`` folder beside it, and writes ``<name>_local<ext>`` in which
    each successfully downloaded image link points at ``./images/<file>``.
    Links without a scheme are fetched from ``https://www.soujianzhu.cn``.

    The replacement is keyed by the link text as it appears in the Markdown
    (not the rewritten download URL), so site-relative links are replaced
    too, and only image links (``![alt](url)``) are rewritten. Failed
    downloads are reported on stdout and leave their links unchanged.

    Side effects: rebinds the module-level ``local_image_folder``, creates
    that folder, writes image files and the ``_local`` Markdown file, and
    shows a message box with the outcome.
    """
    global local_image_folder

    markdown_file_path = input_file_entry.get()
    if not markdown_file_path or not os.path.isfile(markdown_file_path):
        messagebox.showwarning("警告", "请选择有效的Markdown文件。")
        return

    # Store images next to the Markdown file.
    local_image_folder = get_image_folder(markdown_file_path)
    os.makedirs(local_image_folder, exist_ok=True)
    # The rewritten Markdown is saved beside the original, so links only need
    # the folder's own name, not its (possibly absolute) full path.
    folder_name = os.path.basename(os.path.normpath(local_image_folder))

    with open(markdown_file_path, 'r', encoding='utf-8') as file:
        markdown_content = file.read()

    link_pattern = re.compile(r'(!\[.*?\]\()(.*?)(\))')
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        "Referer": "https://www.soujianzhu.cn/",
        "Sec-Ch-Ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": "Windows"
    }

    url_mapping = {}  # link text in the Markdown -> relative local path
    success_count = 0
    for match in link_pattern.finditer(markdown_content):
        link_url = _download_link_url(match.group(2))
        if not link_url or link_url in url_mapping:
            continue
        download_url = _download_absolute_url(link_url)
        try:
            response = requests.get(download_url, headers=headers, timeout=30)
            if response.status_code != 200:
                print(f"下载失败 {download_url}: HTTP {response.status_code}")
                continue
            image_name = clean_filename(download_url)
            image_path = os.path.join(local_image_folder, image_name)
            with open(image_path, 'wb') as img_file:
                img_file.write(response.content)
        except Exception as e:
            # Best effort: one broken image must not abort the whole run.
            print(f"下载失败 {download_url}: {str(e)}")
            continue
        url_mapping[link_url] = f"./{folder_name}/{image_name}"
        success_count += 1

    if not url_mapping:
        messagebox.showwarning("警告", "没有图片被下载")
        return

    def _localize(match):
        # Swap only the URL part so an optional "title" is preserved.
        raw_target = match.group(2)
        link_url = _download_link_url(raw_target)
        local_path = url_mapping.get(link_url)
        if local_path is None:
            return match.group(0)
        return match.group(1) + raw_target.replace(link_url, local_path, 1) + match.group(3)

    new_content = link_pattern.sub(_localize, markdown_content)

    file_name, file_ext = os.path.splitext(markdown_file_path)
    new_file_path = f"{file_name}_local{file_ext}"
    with open(new_file_path, 'w', encoding='utf-8') as file:
        file.write(new_content)
    messagebox.showinfo("完成", f"成功下载 {success_count} 张图片并更新Markdown文件")
def upload_image_to_lsky_pro(image_path, retries=3, timeout=30):
    """Upload one image file to the Lsky Pro image host, with retries.

    Every failed attempt (network error, non-200 status, rejected or
    malformed response) is reported on stdout, and the function waits three
    seconds before the next attempt. It does not wait after the last one.

    Args:
        image_path: Path of the local image file to upload.
        retries: Maximum number of attempts.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        The URL of the uploaded image, or ``None`` when every attempt failed.

    Raises:
        OSError: If ``image_path`` cannot be opened. This is not retried
            because a missing or unreadable file will not fix itself.
    """
    upload_url = f'{lsky_pro_url}{upload_endpoint}'
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json'
    }
    for attempt in range(retries):
        try:
            with open(image_path, 'rb') as img_file:
                response = requests.post(
                    upload_url,
                    files={'file': img_file},
                    headers=headers,
                    timeout=timeout,
                )
        except requests.exceptions.RequestException as e:
            failure = str(e)
        else:
            if response.status_code == 200:
                try:
                    data = response.json()
                    if data['status']:
                        return data['data']['links']['url']
                    failure = f"status={data['status']!r}"
                except (ValueError, KeyError, TypeError) as e:
                    # Body was not JSON or lacked the expected fields.
                    failure = repr(e)
            else:
                failure = f"HTTP {response.status_code}"
        print(f"上传失败,重试 {attempt + 1}/{retries}: {failure}")
        if attempt + 1 < retries:
            time.sleep(3)  # back off before the next attempt
    return None
def _resolve_local_image_path(markdown_dir, image_ref):
    """Return the file path for a local image reference found in Markdown.

    Relative references are resolved against the Markdown file's directory.
    If no file exists there, the reference is returned unchanged so it is
    still tried relative to the current working directory, as before.
    """
    if os.path.isabs(image_ref):
        return image_ref
    beside_markdown = os.path.normpath(os.path.join(markdown_dir, image_ref))
    return beside_markdown if os.path.isfile(beside_markdown) else image_ref


def _upload_local_images(markdown_file_path, output_file_path):
    """Upload the local images of a Markdown file and rewrite their links.

    Image links that start with ``http://``/``https://`` or that mention
    ``image.lqsjy.cn`` are skipped. Every other link is treated as a local
    file and uploaded. ``output_file_path`` is written only when at least
    one upload succeeded.

    Returns:
        The number of distinct image links that were replaced.
    """
    with open(markdown_file_path, 'r', encoding='utf-8') as file:
        markdown_content = file.read()

    markdown_dir = os.path.dirname(os.path.abspath(markdown_file_path))
    url_mapping = {}  # link text in the Markdown -> uploaded URL
    for raw_target in re.findall(r'!\[.*?\]\((.*?)\)', markdown_content):
        # Drop an optional "title" so only the path is opened and replaced.
        url = raw_target.strip()
        if '"' in url:
            url = url.split(' ')[0]
        if not url or url in url_mapping:
            continue
        if url.startswith(('http://', 'https://')) or 'image.lqsjy.cn' in url:
            continue
        try:
            new_url = upload_image_to_lsky_pro(
                _resolve_local_image_path(markdown_dir, url)
            )
            if new_url:
                url_mapping[url] = new_url
                print(f"处理成功: {url} -> {new_url}")
            else:
                print(f"上传失败: {url}")
        except Exception as e:
            # Best effort: a missing or unreadable image must not stop the rest.
            print(f"处理出错 {url}: {str(e)}")
            continue

    if not url_mapping:
        return 0

    new_content = markdown_content
    for old_url, new_url in url_mapping.items():
        new_content = new_content.replace(old_url, new_url)
    with open(output_file_path, 'w', encoding='utf-8') as file:
        file.write(new_content)
    return len(url_mapping)


def _notify_upload_result(show_dialog, title, message):
    """Show a message box on the main thread, or print from worker threads.

    process_image_file() is run inside a ThreadPoolExecutor by the batch
    upload; Tk dialogs opened from such worker threads can fail or hang
    while the main thread is blocked waiting for them.
    """
    import threading

    if threading.current_thread() is threading.main_thread():
        show_dialog(title, message)
    else:
        print(f"{title}: {message}")


def process_image_file(markdown_file_path):
    """Upload the local images of one Markdown file, updating it in place.

    Args:
        markdown_file_path: Path of the Markdown file to process. The file
            is overwritten when at least one image was uploaded.
    """
    processed = _upload_local_images(markdown_file_path, markdown_file_path)
    if processed:
        _notify_upload_result(messagebox.showinfo, "完成", f"成功处理 {processed} 张图片")
    else:
        _notify_upload_result(messagebox.showwarning, "警告", "没有图片被处理")


def upload_images():
    """Upload the local images of the selected Markdown file.

    Reads the path from the input entry and writes the rewritten Markdown to
    ``<name>_lsky<ext>``, leaving the original file untouched.
    """
    markdown_file_path = input_file_entry.get()
    if not markdown_file_path or not os.path.isfile(markdown_file_path):
        messagebox.showwarning("警告", "请选择有效的Markdown文件。")
        return

    file_name, file_ext = os.path.splitext(markdown_file_path)
    processed = _upload_local_images(
        markdown_file_path, f"{file_name}_lsky{file_ext}"
    )
    if processed:
        messagebox.showinfo("完成", f"成功处理 {processed} 张图片")
    else:
        messagebox.showwarning("警告", "没有图片被处理")
def download_and_upload_images():
    """Re-host the remote images of the selected Markdown file.

    Each image link is downloaded into a temporary directory, uploaded with
    upload_image_to_lsky_pro(), and replaced in the Markdown by the uploaded
    URL. The Markdown file is overwritten in place when at least one image
    was re-hosted.

    Links that already mention ``image.lqsjy.cn`` are skipped, matching the
    other upload functions, and each distinct link is processed only once.
    The temporary directory is removed even when an error occurs. Any
    unexpected error is reported in an error dialog.
    """
    markdown_file_path = input_file_entry.get()
    if not markdown_file_path:
        messagebox.showerror("错误", "请选择Markdown文件")
        return
    try:
        with open(markdown_file_path, 'r', encoding='utf-8') as file:
            markdown_content = file.read()

        image_urls = re.findall(r'!\[.*?\]\((.*?)\)', markdown_content)
        if not image_urls:
            messagebox.showinfo("提示", "文件中未找到图片链接")
            return

        url_mapping = {}  # link text in the Markdown -> uploaded URL
        attempted = set()
        temp_dir = tempfile.mkdtemp()
        try:
            for raw_target in image_urls:
                # Drop an optional "title" so only the URL is fetched.
                img_url = raw_target.strip()
                if '"' in img_url:
                    img_url = img_url.split(' ')[0]
                if not img_url or img_url in attempted:
                    continue
                attempted.add(img_url)
                if 'image.lqsjy.cn' in img_url:
                    # Already on the image host; re-uploading would duplicate it.
                    continue
                try:
                    response = requests.get(img_url, timeout=10)
                    if response.status_code != 200:
                        print(f"处理图片失败: {img_url}, 错误: HTTP {response.status_code}")
                        continue
                    # Take the extension from the path only; a query string
                    # would otherwise end up inside the temp file name.
                    url_path = re.split(r'[?#]', img_url, maxsplit=1)[0]
                    file_extension = os.path.splitext(url_path.split('/')[-1])[-1]
                    if not file_extension:
                        file_extension = '.png'
                    temp_file = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{file_extension}")
                    with open(temp_file, 'wb') as f:
                        f.write(response.content)
                    new_url = upload_image_to_lsky_pro(temp_file)
                    if new_url:
                        url_mapping[img_url] = new_url
                        print(f"成功上传: {img_url} -> {new_url}")
                    else:
                        print(f"上传失败: {img_url}")
                except Exception as e:
                    # Best effort: keep going with the remaining images.
                    print(f"处理图片失败: {img_url}, 错误: {str(e)}")
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        if url_mapping:
            new_content = markdown_content
            for old_url, new_url in url_mapping.items():
                new_content = new_content.replace(old_url, new_url)
            with open(markdown_file_path, 'w', encoding='utf-8') as file:
                file.write(new_content)
            messagebox.showinfo("完成", f"成功处理 {len(url_mapping)} 张图片")
        else:
            messagebox.showwarning("警告", "没有图片被处理")
    except Exception as e:
        messagebox.showerror("错误", f"处理过程中出错: {str(e)}")
def upload_images_batch_files(file_paths, max_concurrent_uploads=5):
    """Run process_image_file() on several Markdown files in a thread pool.

    At most ``max_concurrent_uploads`` files are processed at the same time.
    A failure in one file is printed and does not stop the others.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    with ThreadPoolExecutor(max_workers=max_concurrent_uploads) as pool:
        # Remember which path each future belongs to for error reporting.
        path_by_future = {}
        for path in file_paths:
            path_by_future[pool.submit(process_image_file, path)] = path

        for finished in as_completed(path_by_future):
            source_path = path_by_future[finished]
            try:
                finished.result()
            except Exception as exc:
                print(f"处理文件 {source_path} 时出错: {str(exc)}")
def select_files():
    """Let the user pick several Markdown files and upload their images.

    A hidden helper window is created for the file dialog and destroyed as
    soon as the dialog closes, so repeated calls do not accumulate hidden Tk
    instances. The chosen files are passed to upload_images_batch_files().
    A note is printed when nothing was selected.
    """
    picker_root = tk.Tk()
    picker_root.withdraw()  # keep the helper window invisible
    try:
        file_paths = filedialog.askopenfilenames(
            title='选择Markdown文件',
            filetypes=[('Markdown Files', '*.md')]
        )
    finally:
        picker_root.destroy()

    if file_paths:
        upload_images_batch_files(file_paths)
    else:
        print("未选择文件")
# Create the main window
root = tk.Tk()
root.title("Markdown图片处理工具")
# Row 0: label, entry showing the selected Markdown path, and a Browse button.
# input_file_entry is read as a global by the button callbacks.
tk.Label(root, text="Input File:").grid(row=0, column=0, padx=10, pady=10)
input_file_entry = tk.Entry(root, width=50)
input_file_entry.grid(row=0, column=1, padx=10, pady=10)
tk.Button(root, text="Browse", command=choose_input_file).grid(row=0, column=2, padx=10, pady=10)
# Row 1: download-images button
download_button = tk.Button(root, text="下载图片", command=download_images)
download_button.grid(row=1, column=0, columnspan=3, pady=5)
# Row 2: upload-images button
upload_button = tk.Button(root, text="上传图片", command=upload_images)
upload_button.grid(row=2, column=0, columnspan=3, pady=5)
# Row 3: batch upload button (opens its own multi-file dialog via select_files)
# upload_button = tk.Button(root, text="上传图片_批量", command=upload_images_batch_files)
# upload_button.grid(row=3, column=0, columnspan=3, pady=5)
tk.Button(root, text="上传图片_批量", command=select_files).grid(row=3, column=1, padx=10, pady=10)
# Row 4: download-then-upload button
download_upload_button = tk.Button(root, text="下载并上传图片", command=download_and_upload_images)
download_upload_button.grid(row=4, column=0, columnspan=3, pady=5)
# Run the Tk main loop (blocks until the window is closed)
root.mainloop()