PyQt5与数据库交互:SQLite/MySQL持久化存储(整合下载工具历史记录)

寒烟似雪
1月18日发布 /正在检测是否收录...

第17篇:PyQt5与数据库交互:SQLite/MySQL持久化存储(整合下载工具历史记录)

哈喽~ 欢迎来到PyQt5系列的第17篇!前面我们开发的多线程下载工具功能很完整,但有一个致命缺点——关闭程序后,所有下载任务记录都会丢失。想要实现任务记录的永久保存、下次启动自动加载,就必须掌握 PyQt5与数据库的交互

今天我们就来学习如何在PyQt5中操作数据库,重点讲解轻量级的SQLite(无需额外安装,开箱即用)和主流的MySQL(适合多用户/远程场景),并将数据库功能整合到下载工具中,实现下载任务历史记录的持久化存储。全程搭配完整可运行代码,新手也能轻松上手!

一、核心概念:为什么需要数据库?

在桌面应用开发中,数据库的核心作用是数据持久化——将内存中的临时数据保存到硬盘,程序重启后数据不丢失。对于下载工具来说,数据库可以存储:

  • 历史下载任务的链接、保存路径、下载进度、状态;
  • 用户的个性化设置(如默认下载路径、主题偏好);
  • 下载文件的MD5值、文件大小等元信息。

PyQt5操作数据库的两种方式

方式工具库优点缺点适用场景
原生库操作sqlite3(Python内置)、pymysql(MySQL第三方库)语法简单,灵活度高,学习成本低需手动处理数据库连接、事务、异常中小型桌面应用
Qt数据库模块QSqlDatabaseQSqlQuery与PyQt5深度集成,支持信号与槽,适合UI联动语法稍复杂,需熟悉Qt的数据库API大型/复杂Qt应用

本文选择原生库操作(新手友好),重点讲解SQLite和MySQL的核心用法。

二、实战1:SQLite数据库操作(Python内置,零配置)

SQLite是一款嵌入式关系型数据库,无需安装服务端,数据存储在单个文件中,非常适合桌面应用。Python内置sqlite3库,直接导入即可使用。

1. 核心步骤:连接数据库→创建表→增删改查

import sqlite3
import os

class SQLiteManager:
    def __init__(self, db_path="download_history.db"):
        """初始化数据库连接"""
        self.db_path = db_path
        self.conn = None  # 数据库连接对象
        self.cursor = None  # 游标对象,用于执行SQL
        self.connect()  # 初始化时自动连接
        self.create_table()  # 初始化时自动创建表

    def connect(self):
        """连接SQLite数据库"""
        try:
            # 连接数据库(文件不存在则自动创建)
            self.conn = sqlite3.connect(self.db_path)
            # 设置游标,用于执行SQL语句
            self.cursor = self.conn.cursor()
            # 解决中文乱码问题
            self.cursor.execute("PRAGMA encoding='UTF-8'")
            print(f"成功连接SQLite数据库:{self.db_path}")
        except Exception as e:
            print(f"数据库连接失败:{str(e)}")

    def create_table(self):
        """创建下载任务历史表"""
        create_sql = """
        CREATE TABLE IF NOT EXISTS download_tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL,
            url TEXT NOT NULL,
            save_path TEXT NOT NULL,
            progress INTEGER DEFAULT 0,
            size TEXT DEFAULT '0 B/未知',
            speed TEXT DEFAULT '0 B/s',
            status TEXT DEFAULT '等待中',
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        try:
            self.cursor.execute(create_sql)
            self.conn.commit()  # 提交事务
            print("成功创建download_tasks表")
        except Exception as e:
            self.conn.rollback()  # 出错时回滚
            print(f"创建表失败:{str(e)}")

    def add_task(self, task_id, url, save_path, progress=0, size="0 B/未知", speed="0 B/s", status="等待中"):
        """添加下载任务到数据库"""
        insert_sql = """
        INSERT INTO download_tasks (task_id, url, save_path, progress, size, speed, status)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        try:
            self.cursor.execute(insert_sql, (task_id, url, save_path, progress, size, speed, status))
            self.conn.commit()
            print(f"成功添加任务:{task_id}")
            return True
        except Exception as e:
            self.conn.rollback()
            print(f"添加任务失败:{str(e)}")
            return False

    def update_task(self, task_id, **kwargs):
        """更新任务信息(支持动态更新字段)"""
        # kwargs示例:{"progress": 50, "status": "下载中"}
        fields = []
        values = []
        for k, v in kwargs.items():
            fields.append(f"{k}=?")
            values.append(v)
        values.append(task_id)  # WHERE条件的值

        update_sql = f"""
        UPDATE download_tasks
        SET {', '.join(fields)}
        WHERE task_id=?
        """
        try:
            self.cursor.execute(update_sql, values)
            self.conn.commit()
            print(f"成功更新任务:{task_id}")
            return True
        except Exception as e:
            self.conn.rollback()
            print(f"更新任务失败:{str(e)}")
            return False

    def get_all_tasks(self):
        """获取所有下载任务"""
        select_sql = "SELECT * FROM download_tasks ORDER BY create_time DESC"
        try:
            self.cursor.execute(select_sql)
            # 获取字段名(用于构造字典)
            columns = [desc[0] for desc in self.cursor.description]
            # 将查询结果转换为字典列表(更易使用)
            tasks = []
            for row in self.cursor.fetchall():
                task = dict(zip(columns, row))
                tasks.append(task)
            return tasks
        except Exception as e:
            print(f"查询任务失败:{str(e)}")
            return []

    def delete_task(self, task_id):
        """删除指定任务"""
        delete_sql = "DELETE FROM download_tasks WHERE task_id=?"
        try:
            self.cursor.execute(delete_sql, (task_id,))
            self.conn.commit()
            print(f"成功删除任务:{task_id}")
            return True
        except Exception as e:
            self.conn.rollback()
            print(f"删除任务失败:{str(e)}")
            return False

    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()
            print("数据库连接已关闭")

# -------------------------- 测试代码 --------------------------
if __name__ == "__main__":
    db = SQLiteManager()
    # 添加测试任务
    db.add_task(
        task_id="task_001",
        url="https://www.python.org/static/img/python-logo.png",
        save_path="python.png",
        progress=100,
        size="10 KB/10 KB",
        speed="2 KB/s",
        status="已完成"
    )
    # 更新任务
    db.update_task("task_001", progress=50, status="已暂停")
    # 查询所有任务
    tasks = db.get_all_tasks()
    for task in tasks:
        print(task)
    # 删除任务
    # db.delete_task("task_001")
    # 关闭连接
    db.close()

2. 核心知识点解析

  • 参数化查询:使用?作为占位符,避免SQL注入攻击(绝对不要用字符串拼接SQL!);
  • 事务管理commit()提交事务(执行增删改后必须调用),rollback()出错时回滚;
  • 结果转换:将查询结果转换为字典列表,比元组更易读取字段值;
  • 中文乱码:执行PRAGMA encoding='UTF-8'确保中文正常存储。

三、实战2:MySQL数据库操作(主流关系型数据库)

MySQL是一款开源的关系型数据库,适合多用户、远程访问的场景。使用前需安装:

  1. 安装MySQL服务端(官网下载);
  2. 安装Python驱动:pip install pymysql

1. 核心步骤:连接→建表→增删改查(与SQLite类似)

import pymysql

class MySQLManager:
    def __init__(self, host="localhost", port=3306, user="root", password="your_password", db="download_tool"):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        self.conn = None
        self.cursor = None
        self.connect()
        self.create_table()

    def connect(self):
        """连接MySQL数据库"""
        try:
            self.conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.db,
                charset="utf8mb4"  # 支持emoji等特殊字符
            )
            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)  # 直接返回字典格式
            print(f"成功连接MySQL数据库:{self.db}")
        except Exception as e:
            print(f"数据库连接失败:{str(e)}")

    def create_table(self):
        """创建下载任务表"""
        create_sql = """
        CREATE TABLE IF NOT EXISTS download_tasks (
            id INT AUTO_INCREMENT PRIMARY KEY,
            task_id VARCHAR(50) NOT NULL UNIQUE,
            url TEXT NOT NULL,
            save_path TEXT NOT NULL,
            progress INT DEFAULT 0,
            size VARCHAR(50) DEFAULT '0 B/未知',
            speed VARCHAR(50) DEFAULT '0 B/s',
            status VARCHAR(20) DEFAULT '等待中',
            create_time DATETIME DEFAULT CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        try:
            self.cursor.execute(create_sql)
            self.conn.commit()
            print("成功创建download_tasks表")
        except Exception as e:
            self.conn.rollback()
            print(f"创建表失败:{str(e)}")

    def add_task(self, task_id, url, save_path, progress=0, size="0 B/未知", speed="0 B/s", status="等待中"):
        """添加任务"""
        insert_sql = """
        INSERT INTO download_tasks (task_id, url, save_path, progress, size, speed, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        try:
            self.cursor.execute(insert_sql, (task_id, url, save_path, progress, size, speed, status))
            self.conn.commit()
            return True
        except Exception as e:
            self.conn.rollback()
            print(f"添加任务失败:{str(e)}")
            return False

    def update_task(self, task_id, **kwargs):
        """更新任务"""
        fields = []
        values = []
        for k, v in kwargs.items():
            fields.append(f"{k}=%s")
            values.append(v)
        values.append(task_id)

        update_sql = f"""
        UPDATE download_tasks
        SET {', '.join(fields)}
        WHERE task_id=%s
        """
        try:
            self.cursor.execute(update_sql, values)
            self.conn.commit()
            return True
        except Exception as e:
            self.conn.rollback()
            print(f"更新任务失败:{str(e)}")
            return False

    def get_all_tasks(self):
        """获取所有任务"""
        select_sql = "SELECT * FROM download_tasks ORDER BY create_time DESC"
        try:
            self.cursor.execute(select_sql)
            return self.cursor.fetchall()  # 直接返回字典列表
        except Exception as e:
            print(f"查询任务失败:{str(e)}")
            return []

    def close(self):
        """关闭连接"""
        if self.conn:
            self.conn.close()
            print("MySQL连接已关闭")

# -------------------------- 测试代码 --------------------------
if __name__ == "__main__":
    # 注意:替换为你的MySQL账号密码
    db = MySQLManager(user="root", password="123456", db="download_tool")
    db.add_task("task_002", "https://www.baidu.com", "baidu.html", status="已完成")
    print(db.get_all_tasks())
    db.close()

2. SQLite vs MySQL 核心区别

对比项SQLiteMySQL
占位符?%s
游标返回格式需手动转换为字典可通过DictCursor直接返回字典
字符集PRAGMA encoding='UTF-8'连接时指定charset='utf8mb4'
事务自动提交(增删改需手动commit默认自动提交(可关闭)
适用场景单机桌面应用多用户/远程服务器应用

四、终极实战:整合数据库到多线程下载工具

我们将SQLite数据库整合到第16篇的美化版下载工具中,实现任务记录持久化

  1. 启动程序时自动加载历史任务到表格;
  2. 添加新任务时自动保存到数据库;
  3. 下载进度更新时自动同步到数据库;
  4. 关闭程序时自动关闭数据库连接。

完整整合版代码

import sys
import time
import requests
import sqlite3
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
    QTableWidget, QTableWidgetItem, QLineEdit, QPushButton,
    QFileDialog, QMessageBox, QHeaderView
)
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QTimer
from PyQt5.QtGui import QColor, QFont, QPainter, QBrush, QLinearGradient

# -------------------------- 1. 下载线程类 --------------------------
class DownloadThread(QThread):
    progress_signal = pyqtSignal(int, str, str)
    status_signal = pyqtSignal(str)
    finish_signal = pyqtSignal(str)  # 传递task_id

    def __init__(self, task_id, url, save_path):
        super().__init__()
        self.task_id = task_id
        self.url = url
        self.save_path = save_path
        self.is_paused = False
        self.is_canceled = False
        self.chunk_size = 1024 * 1024
        self.downloaded_size = 0
        self.total_size = 0

    def run(self):
        try:
            headers = {}
            if self.downloaded_size > 0:
                headers["Range"] = f"bytes={self.downloaded_size}-"

            response = requests.get(self.url, headers=headers, stream=True, timeout=15)
            self.total_size = int(response.headers.get("content-length", 0)) + self.downloaded_size

            with open(self.save_path, "ab") as f:
                self.status_signal.emit("下载中")
                start_time = time.time()

                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    while self.is_paused:
                        time.sleep(0.1)
                        if self.is_canceled:
                            self.status_signal.emit("已取消")
                            self.finish_signal.emit(self.task_id)
                            return

                    if self.is_canceled:
                        self.status_signal.emit("已取消")
                        self.finish_signal.emit(self.task_id)
                        return

                    f.write(chunk)
                    self.downloaded_size += len(chunk)

                    progress = int((self.downloaded_size / self.total_size) * 100) if self.total_size > 0 else 0
                    downloaded_str = self.format_size(self.downloaded_size)
                    total_str = self.format_size(self.total_size)
                    speed_str = self.calculate_speed(self.downloaded_size, start_time)

                    self.progress_signal.emit(progress, f"{downloaded_str}/{total_str}", speed_str)

            if self.downloaded_size >= self.total_size and not self.is_canceled:
                self.status_signal.emit("已完成")
            elif self.is_canceled:
                self.status_signal.emit("已取消")
            else:
                self.status_signal.emit("已暂停")
            self.finish_signal.emit(self.task_id)

        except requests.exceptions.RequestException as e:
            self.status_signal.emit(f"失败:{str(e)}")
            self.finish_signal.emit(self.task_id)
        except Exception as e:
            self.status_signal.emit(f"失败:{str(e)}")
            self.finish_signal.emit(self.task_id)

    def pause(self):
        self.is_paused = not self.is_paused
        status = "已暂停" if self.is_paused else "下载中"
        self.status_signal.emit(status)

    def cancel(self):
        self.is_canceled = True
        self.is_paused = False

    def format_size(self, size):
        units = ["B", "KB", "MB", "GB"]
        index = 0
        while size >= 1024 and index < len(units) - 1:
            size /= 1024
            index += 1
        return f"{size:.2f} {units[index]}"

    def calculate_speed(self, downloaded_size, start_time):
        elapsed_time = time.time() - start_time
        if elapsed_time <= 0:
            return "0 B/s"
        speed = downloaded_size / elapsed_time
        return self.format_size(speed) + "/s"

# -------------------------- 2. 数据库管理类 --------------------------
class DBManager:
    def __init__(self, db_path="download_history.db"):
        self.db_path = db_path
        self.conn = None
        self.cursor = None
        self.connect()
        self.create_table()

    def connect(self):
        try:
            self.conn = sqlite3.connect(self.db_path)
            self.cursor = self.conn.cursor()
            self.cursor.execute("PRAGMA encoding='UTF-8'")
        except Exception as e:
            QMessageBox.critical(None, "数据库错误", f"连接失败:{str(e)}")

    def create_table(self):
        create_sql = """
        CREATE TABLE IF NOT EXISTS download_tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL UNIQUE,
            url TEXT NOT NULL,
            save_path TEXT NOT NULL,
            progress INTEGER DEFAULT 0,
            size TEXT DEFAULT '0 B/未知',
            speed TEXT DEFAULT '0 B/s',
            status TEXT DEFAULT '等待中',
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        try:
            self.cursor.execute(create_sql)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            QMessageBox.critical(None, "数据库错误", f"创建表失败:{str(e)}")

    def add_task(self, task_id, url, save_path, progress=0, size="0 B/未知", speed="0 B/s", status="等待中"):
        insert_sql = """
        INSERT OR IGNORE INTO download_tasks (task_id, url, save_path, progress, size, speed, status)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        try:
            self.cursor.execute(insert_sql, (task_id, url, save_path, progress, size, speed, status))
            self.conn.commit()
            return True
        except Exception as e:
            self.conn.rollback()
            QMessageBox.warning(None, "添加失败", f"任务已存在或数据库错误:{str(e)}")
            return False

    def update_task(self, task_id, **kwargs):
        fields = []
        values = []
        for k, v in kwargs.items():
            fields.append(f"{k}=?")
            values.append(v)
        values.append(task_id)

        update_sql = f"UPDATE download_tasks SET {', '.join(fields)} WHERE task_id=?"
        try:
            self.cursor.execute(update_sql, values)
            self.conn.commit()
            return True
        except Exception as e:
            self.conn.rollback()
            QMessageBox.warning(None, "更新失败", f"{str(e)}")
            return False

    def get_all_tasks(self):
        select_sql = "SELECT * FROM download_tasks ORDER BY create_time DESC"
        try:
            self.cursor.execute(select_sql)
            columns = [desc[0] for desc in self.cursor.description]
            tasks = []
            for row in self.cursor.fetchall():
                tasks.append(dict(zip(columns, row)))
            return tasks
        except Exception as e:
            QMessageBox.warning(None, "查询失败", f"{str(e)}")
            return []

    def delete_task(self, task_id):
        delete_sql = "DELETE FROM download_tasks WHERE task_id=?"
        try:
            self.cursor.execute(delete_sql, (task_id,))
            self.conn.commit()
            return True
        except Exception as e:
            self.conn.rollback()
            QMessageBox.warning(None, "删除失败", f"{str(e)}")
            return False

    def close(self):
        if self.conn:
            self.conn.close()

# -------------------------- 3. 主窗口类 --------------------------
class DownloaderWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.init_ui()
        # 初始化数据库
        self.db = DBManager()
        # 任务存储:{task_id: {"thread": 线程实例, ...}}
        self.download_tasks = {}
        self.current_task_id = 0
        # 加载历史任务
        self.load_history_tasks()
        # 加载QSS样式
        self.load_qss()

    def init_ui(self):
        self.setWindowTitle("多线程下载工具(带数据库持久化)")
        self.resize(900, 600)
        self.setMinimumSize(800, 500)
        self.setAttribute(Qt.WA_TranslucentBackground)
        self.setWindowFlags(Qt.Window | Qt.WindowMinimizeButtonHint | Qt.WindowCloseButtonHint)

        central_widget = QWidget()
        central_widget.setObjectName("centralWidget")
        self.setCentralWidget(central_widget)

        main_layout = QVBoxLayout(central_widget)
        main_layout.setSpacing(15)
        main_layout.setContentsMargins(20, 20, 20, 20)

        # 顶部任务添加区
        add_task_layout = QHBoxLayout()
        self.url_edit = QLineEdit()
        self.url_edit.setPlaceholderText("请输入下载链接")
        self.url_edit.setObjectName("urlEdit")

        self.path_edit = QLineEdit()
        self.path_edit.setPlaceholderText("请选择保存路径")
        self.path_edit.setObjectName("pathEdit")

        self.browse_btn = QPushButton("浏览")
        self.add_btn = QPushButton("添加任务")
        self.start_all_btn = QPushButton("开始所有")
        self.clear_btn = QPushButton("清空历史")  # 新增清空按钮
        for btn in [self.browse_btn, self.add_btn, self.start_all_btn, self.clear_btn]:
            btn.setFixedSize(80, 35)
            btn.setObjectName("funcBtn")

        add_task_layout.addWidget(self.url_edit)
        add_task_layout.addWidget(self.path_edit)
        add_task_layout.addWidget(self.browse_btn)
        add_task_layout.addWidget(self.add_btn)
        add_task_layout.addWidget(self.start_all_btn)
        add_task_layout.addWidget(self.clear_btn)

        # 中部任务列表
        self.task_table = QTableWidget()
        self.task_table.setColumnCount(9)  # 新增task_id列(隐藏)
        self.task_table.setHorizontalHeaderLabels([
            "任务ID", "链接", "保存路径", "进度", "大小", "速度", "状态", "操作", "隐藏ID"
        ])
        self.task_table.setObjectName("taskTable")
        self.task_table.horizontalHeader().setStretchLastSection(True)
        self.task_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
        self.task_table.verticalHeader().setVisible(False)
        self.task_table.setEditTriggers(QTableWidget.NoEditTriggers)
        self.task_table.setColumnHidden(8, True)  # 隐藏task_id列

        # 底部状态栏
        self.status_label = QLabel("就绪:已加载历史任务")
        self.status_label.setObjectName("statusLabel")

        main_layout.addLayout(add_task_layout)
        main_layout.addWidget(self.task_table)
        main_layout.addWidget(self.status_label)

        # 绑定信号
        self.browse_btn.clicked.connect(self.choose_save_path)
        self.add_btn.clicked.connect(self.add_download_task)
        self.start_all_btn.clicked.connect(self.start_all_tasks)
        self.clear_btn.clicked.connect(self.clear_all_tasks)

    def load_history_tasks(self):
        """加载历史任务到表格"""
        tasks = self.db.get_all_tasks()
        for task in tasks:
            row = self.task_table.rowCount()
            self.task_table.insertRow(row)
            # 填充数据
            self.task_table.setItem(row, 0, QTableWidgetItem(str(row + 1)))
            self.task_table.setItem(row, 1, QTableWidgetItem(task["url"]))
            self.task_table.setItem(row, 2, QTableWidgetItem(task["save_path"]))
            self.task_table.setItem(row, 3, QTableWidgetItem(f"{task['progress']}%"))
            self.task_table.setItem(row, 4, QTableWidgetItem(task["size"]))
            self.task_table.setItem(row, 5, QTableWidgetItem(task["speed"]))
            self.task_table.setItem(row, 6, QTableWidgetItem(task["status"]))
            # 隐藏的task_id列
            self.task_table.setItem(row, 8, QTableWidgetItem(task["task_id"]))

            # 添加操作按钮
            btn_layout = QHBoxLayout()
            start_btn = QPushButton("开始")
            pause_btn = QPushButton("暂停")
            cancel_btn = QPushButton("取消")
            del_btn = QPushButton("删除")  # 新增删除按钮
            for btn in [start_btn, pause_btn, cancel_btn, del_btn]:
                btn.setFixedSize(50, 30)
            btn_layout.addWidget(start_btn)
            btn_layout.addWidget(pause_btn)
            btn_layout.addWidget(cancel_btn)
            btn_layout.addWidget(del_btn)
            btn_widget = QWidget()
            btn_widget.setLayout(btn_layout)
            self.task_table.setCellWidget(row, 7, btn_widget)

            # 绑定按钮信号
            task_id = task["task_id"]
            start_btn.clicked.connect(lambda checked, tid=task_id: self.start_single_task(tid))
            pause_btn.clicked.connect(lambda checked, tid=task_id: self.pause_single_task(tid))
            cancel_btn.clicked.connect(lambda checked, tid=task_id: self.cancel_single_task(tid))
            del_btn.clicked.connect(lambda checked, tid=task_id: self.delete_single_task(tid))

            # 存储任务信息
            self.download_tasks[task_id] = {
                "thread": None,
                "url": task["url"],
                "save_path": task["save_path"],
                "start_btn": start_btn,
                "pause_btn": pause_btn,
                "cancel_btn": cancel_btn
            }

    def add_download_task(self):
        url = self.url_edit.text().strip()
        save_path = self.path_edit.text().strip()
        if not url or not save_path:
            QMessageBox.warning(self, "提示", "请输入链接和保存路径!")
            return

        # 生成唯一task_id
        task_id = f"task_{int(time.time() * 1000)}_{self.current_task_id}"
        self.current_task_id += 1

        # 添加到数据库
        if self.db.add_task(task_id, url, save_path):
            # 添加到表格
            row = self.task_table.rowCount()
            self.task_table.insertRow(row)
            self.task_table.setItem(row, 0, QTableWidgetItem(str(row + 1)))
            self.task_table.setItem(row, 1, QTableWidgetItem(url))
            self.task_table.setItem(row, 2, QTableWidgetItem(save_path))
            self.task_table.setItem(row, 3, QTableWidgetItem("0%"))
            self.task_table.setItem(row, 4, QTableWidgetItem("0 B/未知"))
            self.task_table.setItem(row, 5, QTableWidgetItem("0 B/s"))
            self.task_table.setItem(row, 6, QTableWidgetItem("等待中"))
            self.task_table.setItem(row, 8, QTableWidgetItem(task_id))

            # 添加操作按钮
            btn_layout = QHBoxLayout()
            start_btn = QPushButton("开始")
            pause_btn = QPushButton("暂停")
            cancel_btn = QPushButton("取消")
            del_btn = QPushButton("删除")
            for btn in [start_btn, pause_btn, cancel_btn, del_btn]:
                btn.setFixedSize(50, 30)
            btn_layout.addWidget(start_btn)
            btn_layout.addWidget(pause_btn)
            btn_layout.addWidget(cancel_btn)
            btn_layout.addWidget(del_btn)
            btn_widget = QWidget()
            btn_widget.setLayout(btn_layout)
            self.task_table.setCellWidget(row, 7, btn_widget)

            # 绑定信号
            start_btn.clicked.connect(lambda checked, tid=task_id: self.start_single_task(tid))
            pause_btn.clicked.connect(lambda checked, tid=task_id: self.pause_single_task(tid))
            cancel_btn.clicked.connect(lambda checked, tid=task_id: self.cancel_single_task(tid))
            del_btn.clicked.connect(lambda checked, tid=task_id: self.delete_single_task(tid))

            # 存储任务
            self.download_tasks[task_id] = {
                "thread": None,
                "url": url,
                "save_path": save_path,
                "start_btn": start_btn,
                "pause_btn": pause_btn,
                "cancel_btn": cancel_btn
            }

            self.status_label.setText(f"已添加任务:{task_id}")
            self.url_edit.clear()
            self.path_edit.clear()

    def start_single_task(self, task_id):
        task = self.download_tasks.get(task_id)
        if not task:
            return

        if task["thread"] is None:
            task["thread"] = DownloadThread(task_id, task["url"], task["save_path"])
            # 绑定信号
            task["thread"].progress_signal.connect(lambda p, s, sp, tid=task_id: self.update_task_progress(tid, p, s, sp))
            task["thread"].status_signal.connect(lambda st, tid=task_id: self.update_task_status(tid, st))
            task["thread"].finish_signal.connect(lambda tid=task_id: self.on_task_finished(tid))

        task["thread"].start()
        task["start_btn"].setEnabled(False)
        task["pause_btn"].setEnabled(True)
        task["cancel_btn"].setEnabled(True)
        self.status_label.setText(f"任务 {task_id} 开始下载...")

    def pause_single_task(self, task_id):
        task = self.download_tasks.get(task_id)
        if task and task["thread"] and task["thread"].isRunning():
            task["thread"].pause()

    def cancel_single_task(self, task_id):
        task = self.download_tasks.get(task_id)
        if task and task["thread"]:
            task["thread"].cancel()
            task["start_btn"].setEnabled(False)
            task["pause_btn"].setEnabled(False)

    def delete_single_task(self, task_id):
        if QMessageBox.question(self, "确认删除", f"确定删除任务 {task_id} 吗?", QMessageBox.Yes | QMessageBox.No) == QMessageBox.Yes:
            # 从数据库删除
            if self.db.delete_task(task_id):
                # 从表格删除
                for row in range(self.task_table.rowCount()):
                    item = self.task_table.item(row, 8)
                    if item and item.text() == task_id:
                        self.task_table.removeRow(row)
                        break
                # 从内存删除
                del self.download_tasks[task_id]
                self.status_label.setText(f"已删除任务 {task_id}")

    def clear_all_tasks(self):
        if QMessageBox.question(self, "确认清空", "确定清空所有历史任务吗?", QMessageBox.Yes | QMessageBox.No) == QMessageBox.Yes:
            # 清空表格
            self.task_table.setRowCount(0)
            # 清空内存
            self.download_tasks.clear()
            # 清空数据库
            for task in self.db.get_all_tasks():
                self.db.delete_task(task["task_id"])
            self.status_label.setText("已清空所有历史任务")

    def update_task_progress(self, task_id, progress, size_str, speed_str):
        # 更新数据库
        self.db.update_task(task_id, progress=progress, size=size_str, speed=speed_str)
        # 更新表格
        for row in range(self.task_table.rowCount()):
            item = self.task_table.item(row, 8)
            if item and item.text() == task_id:
                self.task_table.setItem(row, 3, QTableWidgetItem(f"{progress}%"))
                self.task_table.setItem(row, 4, QTableWidgetItem(size_str))
                self.task_table.setItem(row, 5, QTableWidgetItem(speed_str))
                break

    def update_task_status(self, task_id, status):
        # 更新数据库
        self.db.update_task(task_id, status=status)
        # 更新表格
        for row in range(self.task_table.rowCount()):
            item = self.task_table.item(row, 8)
            if item and item.text() == task_id:
                self.task_table.setItem(row, 6, QTableWidgetItem(status))
                # 更新状态文字颜色
                status_item = self.task_table.item(row, 6)
                if status == "下载中":
                    status_item.setForeground(QColor(64, 158, 255))
                elif status == "已完成":
                    status_item.setForeground(QColor(103, 194, 58))
                elif status == "已暂停":
                    status_item.setForeground(QColor(230, 162, 60))
                elif status == "已取消":
                    status_item.setForeground(QColor(144, 147, 153))
                else:
                    status_item.setForeground(QColor(245, 108, 108))
                break

    def on_task_finished(self, task_id):
        task = self.download_tasks.get(task_id)
        if task:
            task["start_btn"].setEnabled(False)
            task["pause_btn"].setEnabled(False)
            self.status_label.setText(f"任务 {task_id} 已完成!")

    def choose_save_path(self):
        file_path, _ = QFileDialog.getSaveFileName(self, "选择保存路径", "", "All Files (*.*)")
        if file_path:
            self.path_edit.setText(file_path)

    def load_qss(self):
        qss = """
            #centralWidget { background-color: transparent; }
            QLineEdit {
                border: 2px solid #e0e6ed; border-radius: 8px;
                padding: 8px 12px; font-size: 14px; color: #2c3e50; background-color: white;
            }
            QLineEdit:focus { border-color: #409eff; outline: none; }
            QLineEdit::placeholder { color: #909399; }
            #funcBtn {
                background-color: #409eff; color: white; border: none;
                border-radius: 8px; font-size: 14px;
            }
            #funcBtn:hover { background-color: #3390e7; }
            #funcBtn:pressed { background-color: #2680dc; }
            #taskTable {
                background-color: white; border: none; border-radius: 8px;
                gridline-color: #e0e6ed; font-size: 13px;
            }
            #taskTable QHeaderView::section {
                background-color: #409eff; color: white; border: none;
                padding: 10px; text-align: center; font-weight: bold; font-size: 14px;
            }
            #taskTable::item:alternate { background-color: #f8fafc; }
            #taskTable::item:selected { background-color: #e6f7ff; color: #2c3e50; }
            #taskTable::item:hover { background-color: #f0f8ff; }
            QPushButton[text="开始"] { background-color: #67c23a; color: white; border: none; border-radius: 4px; padding: 4px 8px; }
            QPushButton[text="暂停"] { background-color: #e6a23c; color: white; border: none; border-radius: 4px; padding: 4px 8px; }
            QPushButton[text="取消"] { background-color: #f56c6c; color: white; border: none; border-radius: 4px; padding: 4px 8px; }
            QPushButton[text="删除"] { background-color: #909399; color: white; border: none; border-radius: 4px; padding: 4px 8px; }
            #statusLabel { color: #606266; font-size: 12px; padding: 5px 0; }
        """
        self.setStyleSheet(qss)

    def paintEvent(self, event):
        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing)
        gradient = QLinearGradient(0, 0, self.width(), self.height())
        gradient.setColorAt(0, QColor(245, 247, 250))
        gradient.setColorAt(1, QColor(230, 235, 240))
        painter.setBrush(QBrush(gradient))
        painter.setPen(Qt.NoPen)
        painter.drawRoundedRect(self.rect(), 15, 15)

    def closeEvent(self, event):
        reply = QMessageBox.question(self, "关闭确认", "确定关闭吗?正在下载的任务将被取消!", QMessageBox.Yes | QMessageBox.No)
        if reply == QMessageBox.Yes:
            # 停止所有线程
            for task in self.download_tasks.values():
                if task["thread"] and task["thread"].isRunning():
                    task["thread"].cancel()
                    task["thread"].wait()
            # 关闭数据库
            self.db.close()
            event.accept()
        else:
            event.ignore()

# -------------------------- 程序入口 --------------------------
if __name__ == "__main__":
    app = QApplication(sys.argv)
    app.setStyle("Fusion")
    window = DownloaderWindow()
    window.show()
    sys.exit(app.exec_())

mkj2moje.png

整合核心亮点

  1. 任务持久化:启动程序时自动从数据库加载历史任务,添加/更新/删除任务时同步到数据库;
  2. 唯一任务ID:用时间戳+计数器生成唯一task_id,避免任务重复;
  3. 线程-数据库联动:下载进度更新时,通过信号同步更新数据库和表格;
  4. 安全关闭:窗口关闭时,先停止所有下载线程,再关闭数据库连接,避免数据丢失。

五、数据库交互常见问题排查

1. 中文乱码

  • SQLite:执行PRAGMA encoding='UTF-8'
  • MySQL:连接时指定charset='utf8mb4',表字符集设为utf8mb4

2. 多线程操作数据库冲突

  • 问题:多个下载线程同时更新数据库,导致锁表或数据错乱;
  • 解决方案:使用数据库锁(sqlite3timeout参数),或在主线程统一处理数据库操作。

3. 任务重复添加

  • 解决方案:SQLite使用INSERT OR IGNORE,MySQL使用INSERT ... ON DUPLICATE KEY UPDATE,确保task_id唯一。

4. 数据库文件权限不足

  • 问题:无法创建或写入数据库文件;
  • 解决方案:将数据库文件保存到用户目录(如C:/Users/用户名/Documents),避免系统盘根目录。

六、进阶拓展方向

  1. 支持MySQL远程连接:将DBManager改为支持SQLite/MySQL切换,满足不同场景需求;
  2. 任务备份与恢复:导出数据库为SQL文件,支持一键恢复;
  3. 数据加密:对敏感字段(如用户密码)进行加密存储;
  4. 分页加载历史任务:当任务数量过多时,实现分页查询,提升界面加载速度。

总结

  1. 数据库核心作用:实现桌面应用的数据持久化,程序重启后数据不丢失;
  2. SQLite vs MySQL:SQLite适合单机应用,MySQL适合多用户/远程场景;
  3. 整合关键:将数据库操作与UI逻辑分离,通过信号与槽实现线程-数据库-界面的联动;
  4. 安全要点:使用参数化查询避免SQL注入,合理管理事务,安全关闭数据库连接。

下一章我们将学习PyQt5打包发布——如何将写好的Python程序打包成exe可执行文件,分发给用户直接运行,无需安装Python环境!如果在数据库整合中遇到问题,欢迎在评论区留言讨论~

© 版权声明
THE END
喜欢就支持一下吧
点赞 0 分享 收藏
评论 抢沙发
OωO
取消
SSL