# Usage: copy this code into a new Python file and run it.
import json
import os
import queue
import re
import threading
import time
import tkinter as tk
from datetime import datetime
from tkinter import filedialog, messagebox, scrolledtext, ttk
from urllib.parse import parse_qs, urlparse

import requests
class JDInvoiceDownloader:
    """Tkinter application that batch-downloads e-invoice PDFs.

    The user picks a target folder and pastes "name / link" line pairs.  Each
    link is resolved (short links are followed to their final URL), a PDF URL
    is located, and the file is saved as ``<name>.pdf`` in the chosen folder.

    Downloads run on a background thread.  Tkinter widgets are only touched
    from the thread that runs the Tk event loop: the worker hands every UI
    update over through ``self._ui_queue`` (see ``_run_on_ui``).  This assumes
    the Tk main loop runs on the interpreter's main thread, as ``main()`` does.
    """

    USER_AGENT = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    )
    # URLs containing one of these are always fetched to follow redirects.
    SHORT_LINK_DOMAINS = ('3.cn', 'jd.com', 't.cn', 'dwz.cn', 'suo.im', 'tinyurl.com')
    # URLs containing one of these are used as-is, without a resolving request.
    INVOICE_URL_KEYWORDS = ('invoice', 'fapiao', 'bill', 'receipt', '.pdf')
    MAX_DOWNLOAD_ATTEMPTS = 3
    # Interval (ms) at which the Tk thread drains queued UI updates.
    UI_POLL_MS = 50
    LOG_ICONS = {
        "info": "ℹ️",
        "success": "✅",
        "warning": "⚠️",
        "error": "❌",
    }
    # Example text shown in the input box while it has no user content.
    PLACEHOLDER_TEXT = (
        "请按以下格式输入发票信息(支持多种发票平台):\n"
        "得力打印纸\n"
        "【京东】https://3.cn/2o 「京东电子发票」\n"
        "鼠标垫、采集卡\n"
        "【天猫】https://example.com/invoice/123456\n"
        "办公用品\n"
        "【淘宝】https://invoice.taobao.com/pdf/download?id=789\n"
        "支持格式:京东、天猫、淘宝、拼多多等各大电商平台发票链接\n"
        "每个发票信息占两行,发票名称在第一行,链接在第二行"
    )

    def __init__(self, root):
        """Build the whole UI inside *root* (a ``tk.Tk`` instance)."""
        self.root = root
        self.root.title("京东电子发票下载工具")

        # Fixed initial size, centred on the screen.
        window_width = 1000
        window_height = 700
        screen_width = root.winfo_screenwidth()
        screen_height = root.winfo_screenheight()
        center_x = int(screen_width / 2 - window_width / 2)
        center_y = int(screen_height / 2 - window_height / 2)
        self.root.geometry(f'{window_width}x{window_height}+{center_x}+{center_y}')
        self.root.resizable(True, True)
        self.root.configure(bg='#f5f5f5')

        # Colour theme shared by all widgets.
        self.colors = {
            'primary': '#1976D2',
            'primary_dark': '#1565C0',
            'secondary': '#FF9800',
            'success': '#4CAF50',
            'warning': '#FF9800',
            'error': '#F44336',
            'background': '#f5f5f5',
            'surface': '#ffffff',
            'text_primary': '#212121',
            'text_secondary': '#757575',
            'border': '#e0e0e0',
        }

        self.setup_styles()

        # Folder the PDFs are written to; empty until the user picks one.
        self.save_path = ""
        # Callables queued by the worker thread for execution on the Tk thread.
        self._ui_queue = queue.Queue()
        # The currently running download thread, if any.
        self._worker = None

        self.create_widgets()
        self.root.after(self.UI_POLL_MS, self._process_ui_queue)

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def setup_styles(self):
        """Register the custom ttk styles used by the widgets."""
        style = ttk.Style()
        style.theme_use('clam')

        style.configure('Primary.TButton',
                        background=self.colors['primary'],
                        foreground='white',
                        borderwidth=0,
                        focuscolor='none',
                        padding=(20, 10))
        style.map('Primary.TButton',
                  background=[('active', self.colors['primary_dark']),
                              ('pressed', self.colors['primary_dark'])])

        style.configure('Secondary.TButton',
                        background=self.colors['secondary'],
                        foreground='white',
                        borderwidth=0,
                        focuscolor='none',
                        padding=(15, 8))
        style.map('Secondary.TButton',
                  background=[('active', '#F57C00'),
                              ('pressed', '#F57C00')])

        style.configure('Success.TButton',
                        background=self.colors['success'],
                        foreground='white',
                        borderwidth=0,
                        focuscolor='none',
                        padding=(15, 8))

        style.configure('Custom.Horizontal.TProgressbar',
                        background=self.colors['primary'],
                        troughcolor=self.colors['border'],
                        borderwidth=0,
                        lightcolor=self.colors['primary'],
                        darkcolor=self.colors['primary'])

        style.configure('Custom.TEntry',
                        fieldbackground=self.colors['surface'],
                        borderwidth=1,
                        relief='solid',
                        padding=10)

    def create_widgets(self):
        """Lay out the header, the two content columns and the button row."""
        fill = (tk.W, tk.E, tk.N, tk.S)

        self.create_header(self.root)

        main_container = ttk.Frame(self.root, padding="20")
        main_container.pack(fill=tk.BOTH, expand=True)
        main_container.columnconfigure(0, weight=1)
        main_container.columnconfigure(1, weight=1)
        # Row 0 holds the two content columns, so it is the row that must
        # absorb extra height (the weight used to sit on an empty row 2,
        # which left the content at its minimum size).
        main_container.rowconfigure(0, weight=1)

        left_frame = ttk.Frame(main_container)
        left_frame.grid(row=0, column=0, sticky=fill, padx=(0, 10))
        left_frame.columnconfigure(0, weight=1)
        left_frame.rowconfigure(1, weight=1)

        right_frame = ttk.Frame(main_container)
        right_frame.grid(row=0, column=1, sticky=fill, padx=(10, 0))
        right_frame.columnconfigure(0, weight=1)
        right_frame.rowconfigure(1, weight=1)

        self.create_path_card(left_frame)
        self.create_input_card(left_frame)
        self.create_progress_card(right_frame)
        self.create_log_card(right_frame)

        # The button row spans both columns.
        button_container = ttk.Frame(main_container)
        button_container.grid(row=1, column=0, columnspan=2, pady=20)
        self.create_action_buttons(button_container)

    def create_header(self, parent):
        """Create the coloured title banner at the top of the window."""
        header_frame = tk.Frame(parent, bg=self.colors['primary'], height=80)
        header_frame.pack(fill=tk.X, pady=(0, 20))
        # Keep the fixed height instead of shrinking to fit the labels.
        header_frame.pack_propagate(False)

        title_label = tk.Label(header_frame,
                               text="📄 电子发票下载工具",
                               font=('Microsoft YaHei UI', 18, 'bold'),
                               fg='white',
                               bg=self.colors['primary'])
        title_label.pack(expand=True)

        subtitle_label = tk.Label(header_frame,
                                  text="批量下载京东电子发票,简单高效",
                                  font=('Microsoft YaHei UI', 10),
                                  fg='white',
                                  bg=self.colors['primary'])
        subtitle_label.pack()

    def create_path_card(self, parent):
        """Create the card with the read-only save path and its chooser button."""
        path_card = ttk.LabelFrame(parent, text="📁 保存路径设置", padding="15")
        path_card.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 15))
        path_card.columnconfigure(0, weight=1)

        self.path_var = tk.StringVar()
        self.path_entry = ttk.Entry(path_card,
                                    textvariable=self.path_var,
                                    state="readonly",
                                    style='Custom.TEntry',
                                    font=('Microsoft YaHei UI', 10))
        self.path_entry.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=(0, 10))

        select_btn = ttk.Button(path_card,
                                text="🗂️ 选择文件夹",
                                command=self.select_folder,
                                style='Primary.TButton')
        select_btn.grid(row=0, column=1)

    def create_input_card(self, parent):
        """Create the card holding the multi-line invoice input box."""
        fill = (tk.W, tk.E, tk.N, tk.S)

        input_card = ttk.LabelFrame(parent, text="📝 发票信息输入", padding="15")
        input_card.grid(row=1, column=0, sticky=fill, pady=(15, 0))
        input_card.columnconfigure(0, weight=1)
        input_card.rowconfigure(1, weight=1)

        tip_label = tk.Label(input_card,
                             text="💡 输入格式:每个发票占两行,第一行为发票名称,第二行为京东链接",
                             font=('Microsoft YaHei UI', 9),
                             fg=self.colors['text_secondary'],
                             bg=self.colors['surface'])
        tip_label.grid(row=0, column=0, sticky=tk.W, pady=(0, 10))

        # A 1px frame around the text box acts as its border.
        text_frame = tk.Frame(input_card,
                              bg=self.colors['border'],
                              bd=1,
                              relief='solid')
        text_frame.grid(row=1, column=0, sticky=fill)
        text_frame.columnconfigure(0, weight=1)
        text_frame.rowconfigure(0, weight=1)

        self.text_input = scrolledtext.ScrolledText(text_frame,
                                                    height=15,
                                                    font=('Consolas', 10),
                                                    bg=self.colors['surface'],
                                                    fg=self.colors['text_primary'],
                                                    bd=0,
                                                    padx=15,
                                                    pady=15,
                                                    wrap=tk.WORD)
        self.text_input.grid(row=0, column=0, sticky=fill, padx=1, pady=1)

        # Start with the example text; it is cleared on first focus.
        self.text_input.insert("1.0", self.PLACEHOLDER_TEXT)
        self.text_input.bind("<FocusIn>", self.clear_placeholder)
        self.text_input.bind("<FocusOut>", self.add_placeholder)
        self.placeholder_active = True

    def create_action_buttons(self, parent):
        """Create the download / clear / exit buttons."""
        button_frame = tk.Frame(parent, bg=self.colors['background'])
        button_frame.pack(fill=tk.X, pady=15)

        btn_container = tk.Frame(button_frame, bg=self.colors['background'])
        btn_container.pack()

        download_btn = ttk.Button(btn_container,
                                  text="🚀 开始下载",
                                  command=self.start_download,
                                  style='Primary.TButton')
        download_btn.pack(side=tk.LEFT, padx=5)

        clear_btn = ttk.Button(btn_container,
                               text="🗑️ 清空输入",
                               command=self.clear_input,
                               style='Secondary.TButton')
        clear_btn.pack(side=tk.LEFT, padx=5)

        exit_btn = ttk.Button(btn_container,
                              text="❌ 退出程序",
                              command=self.root.quit,
                              style='Secondary.TButton')
        exit_btn.pack(side=tk.LEFT, padx=5)

    def create_progress_card(self, parent):
        """Create the card with the overall/current progress bars and status."""
        progress_card = ttk.LabelFrame(parent, text="📊 下载进度", padding="15")
        progress_card.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 15))
        progress_card.columnconfigure(0, weight=1)

        overall_label = tk.Label(progress_card,
                                 text="📈 总体进度",
                                 font=('Microsoft YaHei UI', 10, 'bold'),
                                 fg=self.colors['text_primary'],
                                 bg=self.colors['surface'])
        overall_label.grid(row=0, column=0, sticky=tk.W, pady=(0, 5))
        # Use the progress bar style registered in setup_styles().
        self.overall_progress = ttk.Progressbar(progress_card,
                                                mode='determinate',
                                                style='Custom.Horizontal.TProgressbar')
        self.overall_progress.grid(row=1, column=0, sticky=(tk.W, tk.E), pady=(0, 15))

        current_label = tk.Label(progress_card,
                                 text="📄 当前文件",
                                 font=('Microsoft YaHei UI', 10, 'bold'),
                                 fg=self.colors['text_primary'],
                                 bg=self.colors['surface'])
        current_label.grid(row=2, column=0, sticky=tk.W, pady=(0, 5))
        self.current_progress = ttk.Progressbar(progress_card,
                                                mode='determinate',
                                                style='Custom.Horizontal.TProgressbar')
        self.current_progress.grid(row=3, column=0, sticky=(tk.W, tk.E), pady=(0, 15))

        status_frame = tk.Frame(progress_card, bg=self.colors['surface'])
        status_frame.grid(row=4, column=0, sticky=(tk.W, tk.E))
        status_icon = tk.Label(status_frame,
                               text="ℹ️",
                               font=('Microsoft YaHei UI', 12),
                               bg=self.colors['surface'])
        status_icon.pack(side=tk.LEFT)
        self.status_var = tk.StringVar(value="就绪")
        self.status_label = tk.Label(status_frame,
                                     textvariable=self.status_var,
                                     font=('Microsoft YaHei UI', 10),
                                     fg=self.colors['text_primary'],
                                     bg=self.colors['surface'])
        self.status_label.pack(side=tk.LEFT, padx=(5, 0))

    def create_log_card(self, parent):
        """Create the card with the dark, colour-tagged log view."""
        fill = (tk.W, tk.E, tk.N, tk.S)

        log_card = ttk.LabelFrame(parent, text="📋 下载日志", padding="15")
        log_card.grid(row=1, column=0, sticky=fill, pady=(15, 0))
        log_card.columnconfigure(0, weight=1)
        log_card.rowconfigure(0, weight=1)

        log_container = tk.Frame(log_card, bg=self.colors['border'], bd=1, relief='solid')
        log_container.grid(row=0, column=0, sticky=fill)
        log_container.columnconfigure(0, weight=1)
        log_container.rowconfigure(0, weight=1)

        self.log_text = scrolledtext.ScrolledText(log_container,
                                                  height=12,
                                                  font=('Consolas', 9),
                                                  bg='#1e1e1e',
                                                  fg='#ffffff',
                                                  bd=0,
                                                  padx=10,
                                                  pady=10,
                                                  wrap=tk.WORD)
        self.log_text.grid(row=0, column=0, sticky=fill, padx=1, pady=1)

        # One text tag per log level; log_message() uses the level as the tag.
        self.log_text.tag_configure("info", foreground="#00bcd4")
        self.log_text.tag_configure("success", foreground="#4caf50")
        self.log_text.tag_configure("warning", foreground="#ff9800")
        self.log_text.tag_configure("error", foreground="#f44336")

    # ------------------------------------------------------------------
    # Hand-over of UI work to the Tk thread
    # ------------------------------------------------------------------
    def _run_on_ui(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` on the Tk thread.

        Called from the main thread it executes immediately; called from any
        other thread it only enqueues the call, which ``_process_ui_queue``
        executes later.  Tkinter must not be driven from a second thread.
        """
        if threading.current_thread() is threading.main_thread():
            func(*args, **kwargs)
        else:
            self._ui_queue.put((func, args, kwargs))

    def _process_ui_queue(self):
        """Execute all queued UI calls, then reschedule itself."""
        try:
            while True:
                try:
                    func, args, kwargs = self._ui_queue.get_nowait()
                except queue.Empty:
                    break
                func(*args, **kwargs)
        finally:
            # Reschedule even if a queued call raised, so updates keep flowing.
            self.root.after(self.UI_POLL_MS, self._process_ui_queue)

    def _append_log(self, formatted_message, level):
        """Insert one formatted line into the log widget (Tk thread only)."""
        self.log_text.insert(tk.END, formatted_message, level)
        self.log_text.see(tk.END)
        self.root.update_idletasks()

    def _apply_progress(self, overall, current):
        """Write the given values into the progress bars (Tk thread only)."""
        if overall is not None:
            self.overall_progress['value'] = overall
        if current is not None:
            self.current_progress['value'] = current
        self.root.update_idletasks()

    def _set_progress(self, overall=None, current=None):
        """Update either progress bar (0-100); ``None`` leaves a bar unchanged."""
        self._run_on_ui(self._apply_progress, overall, current)

    def _set_status(self, text):
        """Update the status line."""
        self._run_on_ui(self.status_var.set, text)

    # ------------------------------------------------------------------
    # Simple UI callbacks
    # ------------------------------------------------------------------
    def clear_placeholder(self, event):
        """<FocusIn> handler: remove the example text on first focus."""
        if self.placeholder_active:
            self.text_input.delete("1.0", tk.END)
            self.placeholder_active = False

    def add_placeholder(self, event):
        """<FocusOut> handler: restore the example text if the box is empty."""
        if not self.text_input.get("1.0", tk.END).strip():
            self.text_input.insert("1.0", self.PLACEHOLDER_TEXT)
            self.placeholder_active = True

    def select_folder(self):
        """Ask the user for the target folder and remember it."""
        folder = filedialog.askdirectory(title="选择保存文件夹")
        if folder:
            self.save_path = folder
            self.path_var.set(folder)
            self.log_message(f"✅ 已选择保存路径: {folder}", "success")

    def clear_input(self):
        """Empty the input box and show the example text again."""
        self.text_input.delete("1.0", tk.END)
        self.placeholder_active = False
        self.add_placeholder(None)
        self.log_message("🗑️ 已清空输入内容", "info")

    def log_message(self, message, level="info"):
        """Append a timestamped line to the log view.

        *level* is one of ``info``, ``success``, ``warning`` or ``error`` and
        selects both the icon and the colour tag.  Safe to call from the
        download thread.
        """
        timestamp = datetime.now().strftime("%H:%M:%S")
        icon = self.LOG_ICONS.get(level, "ℹ️")
        formatted_message = f"[{timestamp}] {icon} {message}\n"
        self._run_on_ui(self._append_log, formatted_message, level)

    # ------------------------------------------------------------------
    # URL heuristics
    # ------------------------------------------------------------------
    def is_direct_pdf_url(self, url):
        """Return True if *url* looks like it points straight at a PDF."""
        url_lower = url.lower()
        return (url_lower.endswith('.pdf')
                or '.pdf?' in url_lower
                or 'digital-invoice' in url_lower
                or ('invoice' in url_lower and 'pdf' in url_lower)
                or ('fapiao' in url_lower and 'pdf' in url_lower))

    def find_pdf_link_from_page(self, html_content, base_url):
        """Search page source for a PDF/invoice link.

        Patterns are tried from most to least specific; the first match that
        passes ``is_valid_pdf_link`` is returned as an absolute URL.  Returns
        ``None`` when nothing matches.
        """
        from urllib.parse import urljoin

        patterns = [
            # Plain href links
            r'href=["\']([^"\']*\.pdf[^"\']*)["\']',
            # PDF links containing an invoice keyword
            r'href=["\']([^"\']*invoice[^"\']*\.pdf[^"\']*)["\']',
            r'href=["\']([^"\']*fapiao[^"\']*\.pdf[^"\']*)["\']',
            # PDF URLs inside JavaScript strings
            r'"(https?://[^"]*\.pdf[^"]*)"',
            r"'(https?://[^']*\.pdf[^']*)'",
            # Download buttons
            r'onclick="[^"]*"[^>]*href=["\']([^"\']*\.pdf[^"\']*)["\']',
            r'data-url=["\']([^"\']*\.pdf[^"\']*)["\']',
            # API endpoints
            r'"(https?://[^"]*api[^"]*pdf[^"]*)"',
            r'"(https?://[^"]*download[^"]*pdf[^"]*)"',
            # Generic invoice-like URLs
            r'"(https?://[^"]*invoice[^"]*)"',
            r'"(https?://[^"]*fapiao[^"]*)"',
            r'"(https?://[^"]*bill[^"]*)"',
            r'"(https?://[^"]*receipt[^"]*)"'
        ]
        for pattern in patterns:
            for match in re.findall(pattern, html_content, re.IGNORECASE):
                if not match:
                    continue
                # URLs scraped from HTML attributes carry "&amp;" for "&";
                # requesting them verbatim would break the query string.
                candidate = match.replace('&amp;', '&')
                if not candidate.startswith('http'):
                    candidate = urljoin(base_url, candidate)
                if self.is_valid_pdf_link(candidate):
                    return candidate
        return None

    def is_valid_pdf_link(self, url):
        """Return True if *url* is an http(s) URL without markup characters."""
        if not isinstance(url, str) or not url.startswith('http'):
            return False
        return not any(char in url for char in ('<', '>', '"', "'"))

    # ------------------------------------------------------------------
    # Input parsing
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_invoice_text(content):
        """Turn pasted text into ``[{'name': ..., 'url': ...}, ...]``.

        Blank lines are ignored.  A line followed by a line containing an
        http(s) URL forms one invoice; a line whose successor has no URL is
        skipped and the successor is tried as the next name.
        """
        lines = [line.strip() for line in content.split('\n') if line.strip()]
        invoices = []
        i = 0
        while i + 1 < len(lines):
            # First URL in the line, whichever scheme it uses.
            found = re.search(r'https?://\S+', lines[i + 1])
            if found:
                invoices.append({'name': lines[i], 'url': found.group(0)})
                i += 2
            else:
                i += 1
        return invoices

    def parse_input(self):
        """Return the invoices typed into the input box (empty list if none).

        Reads the Text widget, so it must be called on the Tk thread.
        """
        if self.placeholder_active:
            return []
        content = self.text_input.get("1.0", tk.END).strip()
        if not content:
            return []
        return self._parse_invoice_text(content)

    # ------------------------------------------------------------------
    # Network
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_redirects(session, url):
        """Follow redirects from *url* and return the final URL.

        ``stream=True`` avoids downloading the body, which is not needed.
        """
        with session.get(url, allow_redirects=True, timeout=15, stream=True) as response:
            return response.url

    def extract_real_url(self, short_url):
        """Resolve *short_url* to the URL it finally redirects to.

        URLs that already look like invoice links are returned unchanged.  On
        any failure the error is logged and *short_url* is returned.
        """
        try:
            with requests.Session() as session:
                session.headers.update({'User-Agent': self.USER_AGENT})

                if any(domain in short_url for domain in self.SHORT_LINK_DOMAINS):
                    final_url = self._resolve_redirects(session, short_url)
                    self.log_message(f"🔗 短链接解析: {short_url} -> {final_url}", "info")
                    return final_url

                if any(keyword in short_url for keyword in self.INVOICE_URL_KEYWORDS):
                    self.log_message(f"🔗 检测到发票链接: {short_url}", "info")
                    return short_url

                # Unknown link type: still follow redirects.
                final_url = self._resolve_redirects(session, short_url)
                if final_url != short_url:
                    self.log_message(f"🔗 链接重定向: {short_url} -> {final_url}", "info")
                return final_url
        except Exception as e:
            # Best effort: fall back to the original link.
            self.log_message(f"❌ 提取真实链接失败: {str(e)}", "error")
            return short_url

    def _locate_pdf_url(self, session, real_url):
        """Return the URL to download the PDF from, or ``None`` if not found."""
        if self.is_direct_pdf_url(real_url):
            self.log_message("📄 检测到直接PDF链接", "info")
            return real_url
        try:
            with session.get(real_url, timeout=30, stream=True) as response:
                response.raise_for_status()
                # Check the type first: a PDF body must not be decoded and
                # pattern-matched as if it were HTML.
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' in content_type:
                    self.log_message("📄 响应内容为PDF格式", "info")
                    return real_url
                pdf_url = self.find_pdf_link_from_page(response.text, real_url)
            if pdf_url:
                self.log_message("🔍 从页面找到PDF链接", "info")
            return pdf_url
        except Exception as e:
            # If the page cannot be read, try the link itself as a PDF.
            self.log_message(f"⚠️ 页面访问失败,尝试直接下载: {str(e)}", "warning")
            return real_url

    def download_invoice_pdf(self, invoice_info, index, total):
        """Download one invoice and advance the overall progress bar.

        *invoice_info* is a dict with ``name`` and ``url``; *index* is its
        0-based position among *total* invoices.  Returns True on success,
        False otherwise (failures are logged, never raised).
        """
        name = invoice_info.get('name', '')
        try:
            url = invoice_info['url']
            self._set_status(f"正在处理: {name}")
            self.log_message(f"🚀 开始下载发票: {name}", "info")

            real_url = self.extract_real_url(url)

            with requests.Session() as session:
                # Accept-Encoding is deliberately left to requests: it only
                # advertises encodings it can actually decode.
                session.headers.update({
                    'User-Agent': self.USER_AGENT,
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                })
                pdf_url = self._locate_pdf_url(session, real_url)
                if pdf_url:
                    return self.download_pdf_file(pdf_url, name, session)

            self.log_message(f"❌ 未找到PDF下载链接: {name}", "error")
            return False
        except Exception as e:
            self.log_message(f"❌ 下载失败 {name}: {str(e)}", "error")
            return False
        finally:
            overall = ((index + 1) / total) * 100 if total else 100
            self._set_progress(overall=overall, current=0)

    def download_all_invoices(self, invoices=None):
        """Download every invoice and report the result in a dialog.

        *invoices* is the list produced by ``parse_input``; when omitted the
        input box is read here (only do that on the Tk thread).  Intended to
        run on the worker thread started by ``start_download``.
        """
        try:
            if invoices is None:
                invoices = self.parse_input()
            if not invoices:
                self._run_on_ui(messagebox.showwarning, "警告", "请输入有效的发票信息")
                return
            if not self.save_path:
                self._run_on_ui(messagebox.showwarning, "警告", "请选择保存路径")
                return

            total = len(invoices)
            self.log_message(f"🚀 开始批量下载 {total} 个发票", "info")
            self._set_progress(overall=0, current=0)

            success_count = 0
            for i, invoice in enumerate(invoices):
                if self.download_invoice_pdf(invoice, i, total):
                    success_count += 1
                # Pause between invoices to avoid hammering the servers.
                if i < total - 1:
                    time.sleep(1)

            self._set_status("下载完成")
            self.log_message(f"🎉 批量下载完成! 成功: {success_count}/{total}", "success")
            result_msg = (f"下载完成!\n\n✅ 成功: {success_count} 个\n"
                          f"❌ 失败: {total - success_count} 个\n"
                          f"📁 保存位置: {self.save_path}")
            self._run_on_ui(messagebox.showinfo, "下载完成", result_msg)
        except Exception as e:
            self.log_message(f"❌ 下载过程出错: {str(e)}", "error")
            self._run_on_ui(messagebox.showerror, "错误", f"下载过程出错: {str(e)}")

    # ------------------------------------------------------------------
    # File download
    # ------------------------------------------------------------------
    def _unique_filepath(self, safe_name):
        """Return a not-yet-existing ``<safe_name>[_N].pdf`` path in save_path."""
        filepath = os.path.join(self.save_path, f"{safe_name}.pdf")
        counter = 1
        while os.path.exists(filepath):
            filepath = os.path.join(self.save_path, f"{safe_name}_{counter}.pdf")
            counter += 1
        return filepath

    @staticmethod
    def _discard_file(filepath):
        """Delete a partial/empty download; missing files are ignored."""
        if filepath:
            try:
                os.remove(filepath)
            except OSError:
                # Best-effort cleanup only.
                pass

    def _write_chunks(self, filepath, first_chunk, chunks, total_size):
        """Write *first_chunk* plus the remaining *chunks* to *filepath*.

        Updates the current-file progress bar when *total_size* is known.
        """
        def all_chunks():
            yield first_chunk
            yield from chunks

        downloaded = 0
        with open(filepath, 'wb') as f:
            for chunk in all_chunks():
                if not chunk:
                    continue
                f.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    # Content-Length counts encoded bytes, so clamp at 100.
                    percent = min(100.0, (downloaded / total_size) * 100)
                    self._set_progress(current=percent)

    def download_pdf_file(self, pdf_url, name, session):
        """Download *pdf_url* into ``save_path`` as ``<name>.pdf``.

        Makes up to ``MAX_DOWNLOAD_ATTEMPTS`` attempts.  A response that does
        not look like a PDF is retried and, on the last attempt, saved anyway
        with a warning.  Returns True if a non-empty file was written, False
        otherwise; errors are logged, never raised.
        """
        try:
            self.log_message("⬇️ 开始下载PDF文件...", "info")

            # Accept-Encoding is deliberately not overridden: advertising
            # "br" without a brotli decoder yields undecoded, corrupt files.
            session.headers.update({
                'User-Agent': self.USER_AGENT,
                'Accept': 'application/pdf,application/octet-stream,*/*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Connection': 'keep-alive',
                'Referer': pdf_url,
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'same-origin'
            })

            # Replace characters that are illegal in Windows file names.
            safe_name = re.sub(r'[<>:"/\\|?*]', '_', name)
            last_attempt = self.MAX_DOWNLOAD_ATTEMPTS - 1

            for attempt in range(self.MAX_DOWNLOAD_ATTEMPTS):
                filepath = None
                try:
                    with session.get(pdf_url, stream=True, timeout=30,
                                     allow_redirects=True) as pdf_response:
                        pdf_response.raise_for_status()

                        content_type = pdf_response.headers.get('content-type', '').lower()
                        content_length = pdf_response.headers.get('content-length', '0')
                        self.log_message(f"📄 响应类型: {content_type}, 大小: {content_length} 字节", "info")

                        # Peek at the first chunk for validation but keep it,
                        # so the saved file still starts with these bytes.
                        chunks = pdf_response.iter_content(chunk_size=8192)
                        first_chunk = next(chunks, b'')

                        if not self.is_valid_pdf_response(pdf_response, first_chunk):
                            if attempt < last_attempt:
                                self.log_message(f"⚠️ 第{attempt+1}次尝试失败,重试中...", "warning")
                                pdf_response.close()
                                time.sleep(2)
                                continue
                            self.log_message("⚠️ 响应可能不是PDF文件,但仍尝试下载", "warning")

                        filepath = self._unique_filepath(safe_name)
                        total_size = int(content_length) if content_length.isdigit() else 0
                        self._write_chunks(filepath, first_chunk, chunks, total_size)
                except requests.exceptions.RequestException as e:
                    # Do not leave a truncated file behind.
                    self._discard_file(filepath)
                    if attempt >= last_attempt:
                        raise
                    self.log_message(f"🔄 网络错误,第{attempt+1}次重试: {str(e)}", "warning")
                    time.sleep(3)
                    continue

                if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
                    filename = os.path.basename(filepath)
                    if self.verify_pdf_file(filepath):
                        size_mb = os.path.getsize(filepath) / (1024 * 1024)
                        self.log_message(f"✅ 下载完成: {filename} ({size_mb:.2f} MB)", "success")
                    else:
                        # Still counted as success; the user decides.
                        self.log_message(f"⚠️ 文件下载完成但可能不是有效的PDF格式: {filename}", "warning")
                    return True

                # Empty result: remove it so a retry can reuse the same name.
                self._discard_file(filepath)
                if attempt < last_attempt:
                    self.log_message(f"❌ 第{attempt+1}次下载失败,重试中...", "error")
                    time.sleep(2)
                    continue
                self.log_message("❌ 下载失败: 文件为空或不存在", "error")
                return False

            return False
        except Exception as e:
            self.log_message(f"❌ 下载过程出错: {str(e)}", "error")
            return False

    def is_valid_pdf_response(self, response, first_chunk=None):
        """Return True if *response* looks like a PDF download.

        A ``pdf`` or ``octet-stream`` Content-Type is accepted outright;
        otherwise the body must start with the ``%PDF`` magic number.

        *first_chunk* is the already-read start of the body.  When it is
        omitted, the first 10 bytes are read from the stream here, which
        consumes them: the caller can then no longer save the complete body.
        """
        content_type = response.headers.get('content-type', '').lower()
        if any(pdf_type in content_type for pdf_type in ('pdf', 'octet-stream')):
            return True

        if first_chunk is None:
            try:
                first_chunk = next(response.iter_content(chunk_size=10), b'')
            except requests.exceptions.RequestException:
                return False
        return first_chunk.startswith(b'%PDF')

    def verify_pdf_file(self, filepath):
        """Return True if the file at *filepath* starts with ``%PDF``."""
        try:
            with open(filepath, 'rb') as f:
                return f.read(10).startswith(b'%PDF')
        except OSError:
            return False

    def start_download(self):
        """Button handler: start the batch download on a background thread.

        The input box is read here, on the Tk thread, and the parsed list is
        handed to the worker.  A second click while a download is running is
        ignored with a log message.
        """
        worker = getattr(self, '_worker', None)
        if worker is not None and worker.is_alive():
            self.log_message("⚠️ 下载正在进行中,请等待当前任务完成", "warning")
            return

        invoices = self.parse_input()
        self._worker = threading.Thread(target=self.download_all_invoices,
                                        args=(invoices,),
                                        daemon=True)
        self._worker.start()
def main():
    """Create the Tk root window, build the downloader UI and run the event loop."""
    root = tk.Tk()
    # The instance stays alive through the widget callbacks bound to it.
    JDInvoiceDownloader(root)
    root.mainloop()


if __name__ == "__main__":
    main()