电脑自动备份插入的U盘数据
https://wwvu.lanzouq.com/icus82tvlt9c通过网盘分享的文件:电脑自动备份插入U盘数据.exe
链接: https://pan.baidu.com/s/1fI_QTf3F6DKw7cFzMJr2Hw?pwd=52pj 提取码: 52pj
import os
import shutil
import time
import string
import win32file # 需要安装 pywin32
import logging
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import scrolledtext
import queue
# --- 配置 ---
# 备份文件存储的基础路径 (请确保这个文件夹存在,或者脚本有权限创建它)
BACKUP_DESTINATION_BASE = r"D:\USB_Backups"
# 检测时间间隔(秒)
CHECK_INTERVAL = 5
# 日志文件路径
LOG_FILE = os.path.join(BACKUP_DESTINATION_BASE, "usb_backup_log.txt")
# --- 配置结束 ---
# --- GUI 相关 ---
class TextHandler(logging.Handler):
"""自定义日志处理器,将日志记录发送到 Text 控件"""
def __init__(self, text_widget):
logging.Handler.__init__(self)
self.text_widget = text_widget
self.queue = queue.Queue()
# 启动一个线程来处理队列中的日志消息,避免阻塞主线程
self.thread = threading.Thread(target=self.process_queue, daemon=True)
self.thread.start()
def emit(self, record):
msg = self.format(record)
self.queue.put(msg)
def process_queue(self):
while True:
try:
msg = self.queue.get()
if msg is None: # Sentinel value to stop the thread
break
# Schedule GUI update on the main thread
def update_widget():
try:
self.text_widget.configure(state='normal')
self.text_widget.insert(tk.END, msg + '\n')
self.text_widget.configure(state='disabled')
self.text_widget.yview(tk.END)
except tk.TclError: # Handle cases where the widget might be destroyed
pass
self.text_widget.after(0, update_widget)
self.queue.task_done()
except Exception:
# 处理可能的窗口已销毁等异常
import traceback
traceback.print_exc()
break
def close(self):
self.stop_processing() # Signal the thread to stop
# Don't join here to avoid blocking the main thread
logging.Handler.close(self)
def stop_processing(self):
"""Signals the processing thread to stop without waiting for it."""
self.queue.put(None) # Send sentinel to stop the processing thread
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("USB 自动备份")
self.geometry("600x400")
self.log_text = scrolledtext.ScrolledText(self, state='disabled', wrap=tk.WORD)
self.log_text.pack(expand=True, fill='both', padx=10, pady=5)
self.status_label = tk.Label(self, text="状态: 初始化中...", anchor='w')
self.status_label.pack(fill='x', padx=10, pady=2)
self.exit_button = tk.Button(self, text="退出", command=self.quit_app)
self.exit_button.pack(pady=5)
self.backup_thread = None
self.running = True
self.protocol("WM_DELETE_WINDOW", self.quit_app)
self.configure_logging()
def configure_logging(self):
# 日志配置前先确保备份目录存在
if not os.path.exists(BACKUP_DESTINATION_BASE):
try:
os.makedirs(BACKUP_DESTINATION_BASE)
except Exception as e:
# 如果无法创建目录,在GUI中显示错误
self.update_status(f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}")
self.log_text.configure(state='normal')
self.log_text.insert(tk.END, f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}\n")
self.log_text.configure(state='disabled')
return
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# 文件处理器
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(log_formatter)
# GUI 文本框处理器
self.text_handler = TextHandler(self.log_text)
self.text_handler.setFormatter(log_formatter)
# 获取根 logger 并添加处理器
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# 清除可能存在的默认处理器(例如 basicConfig 创建的 StreamHandler)
if root_logger.hasHandlers():
root_logger.handlers.clear()
root_logger.addHandler(file_handler)
root_logger.addHandler(self.text_handler)
# 添加一个 StreamHandler 以便在控制台也看到日志(调试用)
# stream_handler = logging.StreamHandler()
# stream_handler.setFormatter(log_formatter)
# root_logger.addHandler(stream_handler)
def update_status(self, message):
# 使用 self.after 将 GUI 更新调度回主线程
self.after(0, lambda: self.status_label.config(text=f"状态: {message}"))
def start_backup_monitor(self):
self.backup_thread = threading.Thread(target=run_backup_monitor, args=(self,), daemon=True)
self.backup_thread.start()
def quit_app(self):
logging.info("收到退出信号,程序即将关闭。")
self.running = False # Signal the backup thread to stop
# Signal the logger thread to stop processing new messages
if hasattr(self, 'text_handler'):
self.text_handler.stop_processing()
# Give the backup thread a short time to finish
if self.backup_thread and self.backup_thread.is_alive():
try:
self.backup_thread.join(timeout=1.0) # Wait max 1 second
if self.backup_thread.is_alive():
logging.warning("备份线程未能在1秒内停止,将强制关闭窗口。")
except Exception as e:
logging.error(f"等待备份线程时出错: {e}")
# Close the main window. Daemon threads will be terminated.
self.destroy()
# os._exit(0) # Avoid force exit, let the application close naturally
# --- 核心备份逻辑 (从旧 main 函数提取) ---
def get_available_drives():
"""获取当前所有可用的驱动器盘符"""
drives = []
bitmask = win32file.GetLogicalDrives()
for letter in string.ascii_uppercase:
if bitmask & 1:
drives.append(letter)
bitmask >>= 1
return set(drives)
def is_removable_drive(drive_letter):
"""判断指定盘符是否是可移动驱动器 (U盘通常是这个类型)"""
drive_path = f"{drive_letter}:\\"
try:
# DRIVE_REMOVABLE 的类型代码是 2
return win32file.GetDriveTypeW(drive_path) == win32file.DRIVE_REMOVABLE
except Exception as e:
# logging.error(f"检查驱动器 {drive_path} 类型时出错: {e}") # 可能在驱动器刚插入时发生
return False
def should_skip_file(src, dst):
"""判断是否需要跳过备份(增量备份逻辑)"""
if not os.path.exists(dst):
return False
try:
src_stat = os.stat(src)
dst_stat = os.stat(dst)
# 文件大小和修改时间都相同则跳过
return src_stat.st_size == dst_stat.st_size and int(src_stat.st_mtime) == int(dst_stat.st_mtime)
except Exception:
return False
def copy_file_with_log(src, dst):
try:
file_size = os.path.getsize(src)
# 超过128MB的大文件采用分块复制
if file_size > 128 * 1024 * 1024:
chunk_size = 16 * 1024 * 1024# 16MB
with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
while True:
chunk = fsrc.read(chunk_size)
if not chunk:
break
fdst.write(chunk)
# 尝试复制亓数据,如果失败则记录但不中断
try:
shutil.copystat(src, dst)
except Exception as e_stat:
logging.warning(f"无法复制亓数据 {src} -> {dst}: {e_stat}")
logging.info(f"分块复制大文件: {src} -> {dst}")
else:
shutil.copy2(src, dst)
logging.info(f"已复制: {src} -> {dst}")
except Exception as e:
logging.error(f"复制文件 {src} 时出错: {e}")
def threaded_copytree(src, dst, skip_exts=None, skip_dirs=None, max_workers=8):
"""线程池递归复制目录,支持增量备份和跳过指定类型,限制最大线程数"""
if skip_exts is None:
skip_exts = ['.tmp', '.log', '.sys']
if skip_dirs is None:
skip_dirs = ['$RECYCLE.BIN', 'System Volume Information']
if not os.path.exists(dst):
try:
os.makedirs(dst)
except Exception as e_mkdir:
logging.error(f"创建目录 {dst} 失败: {e_mkdir}")
return #无法创建目标目录,则无法继续复制
tasks = []
small_files = []
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
try:
if os.path.isdir(s):
if item in skip_dirs:
logging.info(f"跳过目录: {s}")
continue
# 递归调用也放入线程池
tasks.append(executor.submit(threaded_copytree, s, d, skip_exts, skip_dirs, max_workers))
else:
ext = os.path.splitext(item).lower()
if ext in skip_exts:
logging.info(f"跳过文件: {s}")
continue
if should_skip_file(s, d):
# logging.debug(f"跳过未变更文件: {s}") # 改为 debug 级别
continue
# 小于16MB的小文件批量处理
if os.path.getsize(s) < 16 * 1024 * 1024:
small_files.append((s, d))
else:
tasks.append(executor.submit(copy_file_with_log, s, d))
except PermissionError:
logging.warning(f"无权限访问: {s},跳过")
except FileNotFoundError:
logging.warning(f"文件或目录不存在(可能在扫描时被移除): {s},跳过")
except Exception as e_item:
logging.error(f"处理 {s} 时出错: {e_item}")
# 批量提交小文件任务,减少线程调度开销
batch_size = 16
for i in range(0, len(small_files), batch_size):
batch = small_files
tasks.append(executor.submit(batch_copy_files, batch))
# 等待所有任务完成
for future in as_completed(tasks):
try:
future.result() # 获取结果以暴露异常
except Exception as e_future:
logging.error(f"线程池任务出错: {e_future}")
except PermissionError:
logging.error(f"无权限访问源目录: {src}")
except FileNotFoundError:
logging.error(f"源目录不存在: {src}")
except Exception as e_pool:
logging.error(f"处理目录 {src} 时线程池出错: {e_pool}")
def batch_copy_files(file_pairs):
for src, dst in file_pairs:
copy_file_with_log(src, dst)
def backup_usb_drive(drive_letter, app_instance):
"""执行U盘备份(多线程+增量),并更新GUI状态"""
source_drive = f"{drive_letter}:\\"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
destination_folder = os.path.join(BACKUP_DESTINATION_BASE, f"Backup_{drive_letter}_{timestamp}")
logging.info(f"检测到U盘: {source_drive}")
app_instance.update_status(f"检测到U盘: {drive_letter}:\\,准备备份...")
logging.info(f"开始备份到: {destination_folder}")
app_instance.update_status(f"开始备份 {drive_letter}:\\ 到 {destination_folder}")
start_time = time.time()
try:
threaded_copytree(source_drive, destination_folder, max_workers=16)
end_time = time.time()
duration = end_time - start_time
logging.info(f"成功完成备份: {source_drive} -> {destination_folder} (耗时: {duration:.2f} 秒)")
app_instance.update_status(f"备份完成: {drive_letter}:\\ (耗时: {duration:.2f} 秒)")
except FileNotFoundError:
logging.error(f"错误:源驱动器 {source_drive} 不存在或无法访问。")
app_instance.update_status(f"错误: 无法访问 {drive_letter}:\\")
except PermissionError:
logging.error(f"错误:没有权限读取 {source_drive} 或写入 {destination_folder}。请检查权限设置。")
app_instance.update_status(f"错误: 权限不足 {drive_letter}:\\ 或目标文件夹")
except Exception as e:
logging.error(f"备份U盘 {source_drive} 时发生未知错误: {e}")
app_instance.update_status(f"错误: 备份 {drive_letter}:\\ 时发生未知错误")
finally:
# 短暂显示完成/错误状态后,恢复到空闲状态
app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入..."))
def run_backup_monitor(app_instance):
"""后台监控线程的主函数"""
logging.info("U盘自动备份程序启动...")
logging.info(f"备份将存储在: {BACKUP_DESTINATION_BASE}")
app_instance.update_status("启动成功,等待U盘插入...")
# 检查备份目录是否已成功创建(在 App 初始化时完成)
if not os.path.exists(BACKUP_DESTINATION_BASE):
logging.error(f"无法启动监控:备份目录 {BACKUP_DESTINATION_BASE} 不存在且无法创建。")
app_instance.update_status(f"错误: 备份目录不存在且无法创建")
return
try:
known_drives = get_available_drives()
logging.info(f"当前已知驱动器: {sorted(list(known_drives))}")
except Exception as e_init_drives:
logging.error(f"初始化获取驱动器列表失败: {e_init_drives}")
app_instance.update_status(f"错误: 获取驱动器列表失败")
known_drives = set()
while app_instance.running:
try:
app_instance.update_status("正在检测驱动器...")
current_drives = get_available_drives()
new_drives = current_drives - known_drives
removed_drives = known_drives - current_drives
if new_drives:
logging.info(f"检测到新驱动器: {sorted(list(new_drives))}")
for drive in new_drives:
if not app_instance.running: break # Check flag before potentially long operation
# 稍作等待,确保驱动器已准备好
logging.info(f"等待驱动器 {drive}: 准备就绪...")
app_instance.update_status(f"检测到新驱动器 {drive}:,等待准备就绪...")
time.sleep(3) # 增加等待时间
if not app_instance.running: break
try:
if is_removable_drive(drive):
backup_usb_drive(drive, app_instance)
else:
logging.info(f"驱动器 {drive}: 不是可移动驱动器,跳过备份。")
app_instance.update_status(f"驱动器 {drive}: 非U盘,跳过")
# 短暂显示后恢复空闲
app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
except Exception as e_check:
logging.error(f"检查或备份驱动器 {drive}: 时出错: {e_check}")
app_instance.update_status(f"错误: 处理驱动器 {drive}: 时出错")
app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
if removed_drives:
logging.info(f"检测到驱动器移除: {sorted(list(removed_drives))}")
# Optionally update status for removed drives
# app_instance.update_status(f"驱动器 {','.join(sorted(list(removed_drives)))} 已移除")
# app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
# 更新已知驱动器列表
known_drives = current_drives
# 在循环末尾更新状态为空闲(如果没有正在进行的草作)
if not new_drives and app_instance.status_label.cget("text").startswith("状态: 正在检测驱动器"):
app_instance.update_status("空闲,等待U盘插入...")
# 等待指定间隔,并允许提前退出
interval_counter = 0
while app_instance.running and interval_counter < CHECK_INTERVAL:
time.sleep(1)
interval_counter += 1
if not app_instance.running:
break
except Exception as e:
logging.error(f"主循环发生错误: {e}")
app_instance.update_status(f"错误: {e}")
# 防止因临时错误导致程序崩溃,稍等后继续,并允许提前退出
error_wait_counter = 0
while app_instance.running and error_wait_counter < CHECK_INTERVAL * 2:
time.sleep(1)
error_wait_counter += 1
if not app_instance.running:
break
logging.info("后台监控线程已停止。")
app_instance.update_status("程序已停止")
if __name__ == "__main__":
app = App()
app.start_backup_monitor()
app.mainloop()https://pic.siuth.cn/i/2025/680467b058cb8da5c8b8038a.png
页:
[1]