巨二兔 发表于 2025-4-20 11:19:19

电脑自动备份插入的U盘数据

https://wwvu.lanzouq.com/icus82tvlt9c
通过网盘分享的文件:电脑自动备份插入U盘数据.exe
链接: https://pan.baidu.com/s/1fI_QTf3F6DKw7cFzMJr2Hw?pwd=52pj 提取码: 52pj

import os
import shutil
import time
import string
import win32file # 需要安装 pywin32
import logging
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import scrolledtext
import queue

# --- 配置 ---
# 备份文件存储的基础路径 (请确保这个文件夹存在,或者脚本有权限创建它)
BACKUP_DESTINATION_BASE = r"D:\USB_Backups"
# 检测时间间隔(秒)
CHECK_INTERVAL = 5
# 日志文件路径
LOG_FILE = os.path.join(BACKUP_DESTINATION_BASE, "usb_backup_log.txt")
# --- 配置结束 ---

# --- GUI 相关 ---
class TextHandler(logging.Handler):
    """自定义日志处理器,将日志记录发送到 Text 控件"""
    def __init__(self, text_widget):
      logging.Handler.__init__(self)
      self.text_widget = text_widget
      self.queue = queue.Queue()
      # 启动一个线程来处理队列中的日志消息,避免阻塞主线程
      self.thread = threading.Thread(target=self.process_queue, daemon=True)
      self.thread.start()

    def emit(self, record):
      msg = self.format(record)
      self.queue.put(msg)

    def process_queue(self):
      while True:
            try:
                msg = self.queue.get()
                if msg is None: # Sentinel value to stop the thread
                  break
                # Schedule GUI update on the main thread
                def update_widget():
                  try:
                        self.text_widget.configure(state='normal')
                        self.text_widget.insert(tk.END, msg + '\n')
                        self.text_widget.configure(state='disabled')
                        self.text_widget.yview(tk.END)
                  except tk.TclError: # Handle cases where the widget might be destroyed
                        pass
                self.text_widget.after(0, update_widget)
                self.queue.task_done()
            except Exception:
                # 处理可能的窗口已销毁等异常
                import traceback
                traceback.print_exc()
                break

    def close(self):
      self.stop_processing() # Signal the thread to stop
      # Don't join here to avoid blocking the main thread
      logging.Handler.close(self)

    def stop_processing(self):
      """Signals the processing thread to stop without waiting for it."""
      self.queue.put(None) # Send sentinel to stop the processing thread

class App(tk.Tk):
    def __init__(self):
      super().__init__()
      self.title("USB 自动备份")
      self.geometry("600x400")

      self.log_text = scrolledtext.ScrolledText(self, state='disabled', wrap=tk.WORD)
      self.log_text.pack(expand=True, fill='both', padx=10, pady=5)

      self.status_label = tk.Label(self, text="状态: 初始化中...", anchor='w')
      self.status_label.pack(fill='x', padx=10, pady=2)

      self.exit_button = tk.Button(self, text="退出", command=self.quit_app)
      self.exit_button.pack(pady=5)

      self.backup_thread = None
      self.running = True
      self.protocol("WM_DELETE_WINDOW", self.quit_app)

      self.configure_logging()

    def configure_logging(self):
      # 日志配置前先确保备份目录存在
      if not os.path.exists(BACKUP_DESTINATION_BASE):
            try:
                os.makedirs(BACKUP_DESTINATION_BASE)
            except Exception as e:
                # 如果无法创建目录,在GUI中显示错误
                self.update_status(f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}")
                self.log_text.configure(state='normal')
                self.log_text.insert(tk.END, f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}\n")
                self.log_text.configure(state='disabled')
                return

      log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

      # 文件处理器
      file_handler = logging.FileHandler(LOG_FILE)
      file_handler.setFormatter(log_formatter)

      # GUI 文本框处理器
      self.text_handler = TextHandler(self.log_text)
      self.text_handler.setFormatter(log_formatter)

      # 获取根 logger 并添加处理器
      root_logger = logging.getLogger()
      root_logger.setLevel(logging.INFO)
      # 清除可能存在的默认处理器(例如 basicConfig 创建的 StreamHandler)
      if root_logger.hasHandlers():
            root_logger.handlers.clear()
      root_logger.addHandler(file_handler)
      root_logger.addHandler(self.text_handler)
      # 添加一个 StreamHandler 以便在控制台也看到日志(调试用)
      # stream_handler = logging.StreamHandler()
      # stream_handler.setFormatter(log_formatter)
      # root_logger.addHandler(stream_handler)

    def update_status(self, message):
      # 使用 self.after 将 GUI 更新调度回主线程
      self.after(0, lambda: self.status_label.config(text=f"状态: {message}"))

    def start_backup_monitor(self):
      self.backup_thread = threading.Thread(target=run_backup_monitor, args=(self,), daemon=True)
      self.backup_thread.start()

    def quit_app(self):
      logging.info("收到退出信号,程序即将关闭。")
      self.running = False # Signal the backup thread to stop

      # Signal the logger thread to stop processing new messages
      if hasattr(self, 'text_handler'):
            self.text_handler.stop_processing()

      # Give the backup thread a short time to finish
      if self.backup_thread and self.backup_thread.is_alive():
            try:
                self.backup_thread.join(timeout=1.0) # Wait max 1 second
                if self.backup_thread.is_alive():
                  logging.warning("备份线程未能在1秒内停止,将强制关闭窗口。")
            except Exception as e:
                logging.error(f"等待备份线程时出错: {e}")

      # Close the main window. Daemon threads will be terminated.
      self.destroy()
      # os._exit(0) # Avoid force exit, let the application close naturally

# --- 核心备份逻辑 (从旧 main 函数提取) ---
def get_available_drives():
    """获取当前所有可用的驱动器盘符"""
    drives = []
    bitmask = win32file.GetLogicalDrives()
    for letter in string.ascii_uppercase:
      if bitmask & 1:
            drives.append(letter)
      bitmask >>= 1
    return set(drives)

def is_removable_drive(drive_letter):
    """判断指定盘符是否是可移动驱动器 (U盘通常是这个类型)"""
    drive_path = f"{drive_letter}:\\"
    try:
      # DRIVE_REMOVABLE 的类型代码是 2
      return win32file.GetDriveTypeW(drive_path) == win32file.DRIVE_REMOVABLE
    except Exception as e:
      # logging.error(f"检查驱动器 {drive_path} 类型时出错: {e}") # 可能在驱动器刚插入时发生
      return False

def should_skip_file(src, dst):
    """判断是否需要跳过备份(增量备份逻辑)"""
    if not os.path.exists(dst):
      return False
    try:
      src_stat = os.stat(src)
      dst_stat = os.stat(dst)
      # 文件大小和修改时间都相同则跳过
      return src_stat.st_size == dst_stat.st_size and int(src_stat.st_mtime) == int(dst_stat.st_mtime)
    except Exception:
      return False

def copy_file_with_log(src, dst):
    try:
      file_size = os.path.getsize(src)
      # 超过128MB的大文件采用分块复制
      if file_size > 128 * 1024 * 1024:
            chunk_size = 16 * 1024 * 1024# 16MB
            with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
                while True:
                  chunk = fsrc.read(chunk_size)
                  if not chunk:
                        break
                  fdst.write(chunk)
            # 尝试复制亓数据,如果失败则记录但不中断
            try:
                shutil.copystat(src, dst)
            except Exception as e_stat:
                logging.warning(f"无法复制亓数据 {src} -> {dst}: {e_stat}")
            logging.info(f"分块复制大文件: {src} -> {dst}")
      else:
            shutil.copy2(src, dst)
            logging.info(f"已复制: {src} -> {dst}")
    except Exception as e:
      logging.error(f"复制文件 {src} 时出错: {e}")

def threaded_copytree(src, dst, skip_exts=None, skip_dirs=None, max_workers=8):
    """线程池递归复制目录,支持增量备份和跳过指定类型,限制最大线程数"""
    if skip_exts is None:
      skip_exts = ['.tmp', '.log', '.sys']
    if skip_dirs is None:
      skip_dirs = ['$RECYCLE.BIN', 'System Volume Information']
    if not os.path.exists(dst):
      try:
            os.makedirs(dst)
      except Exception as e_mkdir:
            logging.error(f"创建目录 {dst} 失败: {e_mkdir}")
            return #无法创建目标目录,则无法继续复制
    tasks = []
    small_files = []
    try:
      with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for item in os.listdir(src):
                s = os.path.join(src, item)
                d = os.path.join(dst, item)
                try:
                  if os.path.isdir(s):
                        if item in skip_dirs:
                            logging.info(f"跳过目录: {s}")
                            continue
                        # 递归调用也放入线程池
                        tasks.append(executor.submit(threaded_copytree, s, d, skip_exts, skip_dirs, max_workers))
                  else:
                        ext = os.path.splitext(item).lower()
                        if ext in skip_exts:
                            logging.info(f"跳过文件: {s}")
                            continue
                        if should_skip_file(s, d):
                            # logging.debug(f"跳过未变更文件: {s}") # 改为 debug 级别
                            continue
                        # 小于16MB的小文件批量处理
                        if os.path.getsize(s) < 16 * 1024 * 1024:
                            small_files.append((s, d))
                        else:
                            tasks.append(executor.submit(copy_file_with_log, s, d))
                except PermissionError:
                  logging.warning(f"无权限访问: {s},跳过")
                except FileNotFoundError:
                  logging.warning(f"文件或目录不存在(可能在扫描时被移除): {s},跳过")
                except Exception as e_item:
                  logging.error(f"处理 {s} 时出错: {e_item}")

            # 批量提交小文件任务,减少线程调度开销
            batch_size = 16
            for i in range(0, len(small_files), batch_size):
                batch = small_files
                tasks.append(executor.submit(batch_copy_files, batch))

            # 等待所有任务完成
            for future in as_completed(tasks):
                try:
                  future.result() # 获取结果以暴露异常
                except Exception as e_future:
                  logging.error(f"线程池任务出错: {e_future}")
    except PermissionError:
      logging.error(f"无权限访问源目录: {src}")
    except FileNotFoundError:
      logging.error(f"源目录不存在: {src}")
    except Exception as e_pool:
      logging.error(f"处理目录 {src} 时线程池出错: {e_pool}")

def batch_copy_files(file_pairs):
    for src, dst in file_pairs:
      copy_file_with_log(src, dst)

def backup_usb_drive(drive_letter, app_instance):
    """执行U盘备份(多线程+增量),并更新GUI状态"""
    source_drive = f"{drive_letter}:\\"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    destination_folder = os.path.join(BACKUP_DESTINATION_BASE, f"Backup_{drive_letter}_{timestamp}")

    logging.info(f"检测到U盘: {source_drive}")
    app_instance.update_status(f"检测到U盘: {drive_letter}:\\,准备备份...")
    logging.info(f"开始备份到: {destination_folder}")
    app_instance.update_status(f"开始备份 {drive_letter}:\\ 到 {destination_folder}")

    start_time = time.time()
    try:
      threaded_copytree(source_drive, destination_folder, max_workers=16)
      end_time = time.time()
      duration = end_time - start_time
      logging.info(f"成功完成备份: {source_drive} -> {destination_folder} (耗时: {duration:.2f} 秒)")
      app_instance.update_status(f"备份完成: {drive_letter}:\\ (耗时: {duration:.2f} 秒)")
    except FileNotFoundError:
      logging.error(f"错误:源驱动器 {source_drive} 不存在或无法访问。")
      app_instance.update_status(f"错误: 无法访问 {drive_letter}:\\")
    except PermissionError:
      logging.error(f"错误:没有权限读取 {source_drive} 或写入 {destination_folder}。请检查权限设置。")
      app_instance.update_status(f"错误: 权限不足 {drive_letter}:\\ 或目标文件夹")
    except Exception as e:
      logging.error(f"备份U盘 {source_drive} 时发生未知错误: {e}")
      app_instance.update_status(f"错误: 备份 {drive_letter}:\\ 时发生未知错误")
    finally:
      # 短暂显示完成/错误状态后,恢复到空闲状态
      app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入..."))

def run_backup_monitor(app_instance):
    """后台监控线程的主函数"""
    logging.info("U盘自动备份程序启动...")
    logging.info(f"备份将存储在: {BACKUP_DESTINATION_BASE}")
    app_instance.update_status("启动成功,等待U盘插入...")

    # 检查备份目录是否已成功创建(在 App 初始化时完成)
    if not os.path.exists(BACKUP_DESTINATION_BASE):
      logging.error(f"无法启动监控:备份目录 {BACKUP_DESTINATION_BASE} 不存在且无法创建。")
      app_instance.update_status(f"错误: 备份目录不存在且无法创建")
      return

    try:
      known_drives = get_available_drives()
      logging.info(f"当前已知驱动器: {sorted(list(known_drives))}")
    except Exception as e_init_drives:
      logging.error(f"初始化获取驱动器列表失败: {e_init_drives}")
      app_instance.update_status(f"错误: 获取驱动器列表失败")
      known_drives = set()

    while app_instance.running:
      try:
            app_instance.update_status("正在检测驱动器...")
            current_drives = get_available_drives()
            new_drives = current_drives - known_drives
            removed_drives = known_drives - current_drives

            if new_drives:
                logging.info(f"检测到新驱动器: {sorted(list(new_drives))}")
                for drive in new_drives:
                  if not app_instance.running: break # Check flag before potentially long operation
                  # 稍作等待,确保驱动器已准备好
                  logging.info(f"等待驱动器 {drive}: 准备就绪...")
                  app_instance.update_status(f"检测到新驱动器 {drive}:,等待准备就绪...")
                  time.sleep(3) # 增加等待时间
                  if not app_instance.running: break
                  try:
                        if is_removable_drive(drive):
                            backup_usb_drive(drive, app_instance)
                        else:
                            logging.info(f"驱动器 {drive}: 不是可移动驱动器,跳过备份。")
                            app_instance.update_status(f"驱动器 {drive}: 非U盘,跳过")
                            # 短暂显示后恢复空闲
                            app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
                  except Exception as e_check:
                         logging.error(f"检查或备份驱动器 {drive}: 时出错: {e_check}")
                         app_instance.update_status(f"错误: 处理驱动器 {drive}: 时出错")
                         app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)

            if removed_drives:
                logging.info(f"检测到驱动器移除: {sorted(list(removed_drives))}")
                # Optionally update status for removed drives
                # app_instance.update_status(f"驱动器 {','.join(sorted(list(removed_drives)))} 已移除")
                # app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)

            # 更新已知驱动器列表
            known_drives = current_drives

            # 在循环末尾更新状态为空闲(如果没有正在进行的草作)
            if not new_drives and app_instance.status_label.cget("text").startswith("状态: 正在检测驱动器"):
               app_instance.update_status("空闲,等待U盘插入...")

            # 等待指定间隔,并允许提前退出
            interval_counter = 0
            while app_instance.running and interval_counter < CHECK_INTERVAL:
                time.sleep(1)
                interval_counter += 1
            if not app_instance.running:
                break

      except Exception as e:
            logging.error(f"主循环发生错误: {e}")
            app_instance.update_status(f"错误: {e}")
            # 防止因临时错误导致程序崩溃,稍等后继续,并允许提前退出
            error_wait_counter = 0
            while app_instance.running and error_wait_counter < CHECK_INTERVAL * 2:
               time.sleep(1)
               error_wait_counter += 1
            if not app_instance.running:
                break

    logging.info("后台监控线程已停止。")
    app_instance.update_status("程序已停止")

if __name__ == "__main__":
    app = App()
    app.start_backup_monitor()
    app.mainloop()https://pic.siuth.cn/i/2025/680467b058cb8da5c8b8038a.png
页: [1]
查看完整版本: 电脑自动备份插入的U盘数据