# data-return/neibub_final.py
#
# 618 lines
# 26 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import sys
import sqlite3
import subprocess
from datetime import datetime
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QLineEdit, QPushButton, QFileDialog, QTableWidget,
QTableWidgetItem, QComboBox, QMessageBox, QHeaderView, QMenu, QDateEdit)
from PyQt5.QtCore import Qt, QDate, QThread, pyqtSignal
from PyQt5.QtGui import QIcon, QPixmap
class DirectoryScanner(QThread):
    """Worker thread that indexes a project directory into the ``photos`` table.

    Scanning runs off the GUI thread so the interface does not freeze. The
    layout walked by :meth:`run` is ``<root_dir>/<unit>/<blade dir>/<image>``.

    NOTE(review): the connection passed in is also used by the GUI thread;
    nothing here serializes access to it.
    """

    # Emitted when a run ends: (root_dir, success, message).
    scan_finished = pyqtSignal(str, bool, str)

    # File suffixes (compared case-insensitively) that count as photos.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp')

    # "<digits>-<name>(<id>)", "<digits>-<name>" or "<digits>-(<id>)".
    # Both ASCII and full-width (U+FF08 / U+FF09) parentheses are accepted.
    _BLADE_DIR_RE = re.compile(
        r'^(\d+)\s*-\s*([^(\uFF08]*)(?:\s*[(\uFF08][^)\uFF09]+[)\uFF09])?$'
    )
    # "YYYYMMDD_HHMMSS" anywhere in a file name.
    _CAPTURE_TIME_RE = re.compile(r'(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})')
    # Escape character used in LIKE patterns ('\\' would collide with
    # Windows path separators).
    _LIKE_ESCAPE = '!'

    def __init__(self, root_dir, db_conn):
        """Store the directory to scan and the SQLite connection to write to."""
        super().__init__()
        self.root_dir = root_dir
        self.db_conn = db_conn

    def run(self):
        """Re-index ``root_dir`` and report the outcome via ``scan_finished``.

        Old rows for the project are removed and the new rows inserted in a
        single transaction: if anything fails the transaction is rolled back,
        so a failed scan no longer destroys the previous index.
        """
        try:
            # Check the directory before touching the database so that a
            # missing or unmounted path does not wipe existing records.
            if not os.path.exists(self.root_dir):
                self.scan_finished.emit(self.root_dir, False, f"目录不存在: {self.root_dir}")
                return

            cursor = self.db_conn.cursor()
            self._clear_project_records(cursor)

            # Every sub-directory of the root is treated as one unit.
            unit_dirs = [d for d in os.listdir(self.root_dir)
                         if os.path.isdir(os.path.join(self.root_dir, d))]
            print(f"[DEBUG] 找到 {len(unit_dirs)} 个机组目录: {unit_dirs}")
            if not unit_dirs:
                # As before, an empty project still has its stale rows removed.
                self.db_conn.commit()
                self.scan_finished.emit(self.root_dir, False, "未找到任何机组目录")
                return

            total_photos = 0
            for unit_id in sorted(unit_dirs, key=self._unit_sort_key):
                print(f"[DEBUG] 正在扫描机组: {unit_id}")
                total_photos += self._scan_unit(cursor, unit_id)

            self.db_conn.commit()
            print(f"[DEBUG] 扫描完成,共找到 {total_photos} 张照片")
            self.scan_finished.emit(self.root_dir, True, f"目录扫描完成,共找到 {total_photos} 张照片")
        except Exception as e:
            # Thread boundary: report every failure to the GUI instead of
            # letting the thread die silently.
            try:
                self.db_conn.rollback()
            except sqlite3.Error as rollback_error:
                # E.g. the connection was closed while the scan was running.
                print(f"[ERROR] {rollback_error}")
            error_msg = f"扫描目录时出错: {str(e)}"
            print(f"[ERROR] {error_msg}")
            self.scan_finished.emit(self.root_dir, False, error_msg)

    @staticmethod
    def _unit_sort_key(unit_id):
        """Return the numbers embedded in ``unit_id`` as a list, or ``[0]``."""
        numbers = re.findall(r'\d+', unit_id)
        return [int(num) for num in numbers] if numbers else [0]

    @classmethod
    def _escape_like(cls, text):
        """Escape ``text`` so that LIKE matches it literally."""
        esc = cls._LIKE_ESCAPE
        # The escape character itself must be handled first.
        for special in (esc, '%', '_'):
            text = text.replace(special, esc + special)
        return text

    def _clear_project_records(self, cursor):
        """Delete rows stored for ``root_dir`` itself or any path beneath it.

        The pattern is escaped and anchored at a path separator, so scanning
        ``/data/proj`` does not delete the rows of ``/data/proj2`` and ``_``
        or ``%`` in a directory name are not treated as wildcards.
        """
        print(f"[DEBUG] 清理数据库中的项目路径: {self.root_dir}")
        exact = self._escape_like(self.root_dir)
        base = self._escape_like(self.root_dir.rstrip('/\\'))
        cursor.execute(
            """
            DELETE FROM photos
            WHERE project_path LIKE ? ESCAPE '!'
               OR project_path LIKE ? ESCAPE '!'
               OR project_path LIKE ? ESCAPE '!'
            """,
            (exact, base + '/%', base + '\\%'),
        )
        print(f"[DEBUG] 已删除 {cursor.rowcount} 条旧记录")

    def _scan_unit(self, cursor, unit_id):
        """Index every blade directory of one unit; return the rows inserted."""
        unit_path = os.path.join(self.root_dir, unit_id)
        inserted = 0
        for item in os.listdir(unit_path):
            item_path = os.path.join(unit_path, item)
            if not os.path.isdir(item_path):
                continue
            blade_num, inspector = self.parse_blade_info(item)
            if not blade_num:
                print(f"[DEBUG] 跳过无法解析的目录: {item}")
                continue
            print(f"[DEBUG] 处理叶片: {blade_num}, 检查员: {inspector or ''}")
            # Several directories may map to the same blade number (for
            # example one per inspector); each of them is indexed. Duplicate
            # photos are still prevented by the per-path check below.
            inserted += self._scan_blade_dir(cursor, unit_id, blade_num, inspector, item_path)
        return inserted

    def _scan_blade_dir(self, cursor, unit_id, blade_num, inspector, item_path):
        """Insert one row per new, dated image in ``item_path``; return the count."""
        inserted = 0
        for file_name in os.listdir(item_path):
            if not file_name.lower().endswith(self.IMAGE_EXTENSIONS):
                continue
            file_path = os.path.join(item_path, file_name)
            capture_time = self.parse_capture_time(file_name)
            if not capture_time:
                print(f"[DEBUG] 跳过无法解析时间的文件: {file_name}")
                continue
            # Skip photos that are already recorded under this exact path.
            cursor.execute("SELECT 1 FROM photos WHERE photo_path = ? LIMIT 1", (file_path,))
            if cursor.fetchone():
                print(f"[DEBUG] 跳过已存在的照片: {file_path}")
                continue
            cursor.execute(
                """
                INSERT INTO photos (
                    project_path, unit_id, blade_number, inspector,
                    photo_path, capture_time, file_name
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    self.root_dir, unit_id, blade_num,
                    inspector if inspector else None,  # no inspector -> NULL
                    file_path, capture_time, file_name,
                ),
            )
            inserted += 1
        return inserted

    def parse_blade_info(self, dir_name):
        """Parse a blade directory name into ``(blade_number, inspector)``.

        Accepted forms are ``<digits>-<name>(<id>)``, ``<digits>-<name>``,
        ``<digits>-(<id>)`` and plain ``<digits>``. Leading zeros are removed
        from the blade number; ``inspector`` is ``None`` when no name is
        present. Returns ``(None, None)`` for any other name.
        """
        dir_name = dir_name.strip()
        match = self._BLADE_DIR_RE.match(dir_name)
        if match:
            blade_num = match.group(1).lstrip('0') or '0'
            inspector = match.group(2).strip()
            return (blade_num, inspector if inspector else None)
        # Directory named with the blade number only.
        if re.match(r'^\d+$', dir_name):
            return (dir_name.lstrip('0') or '0', None)
        return (None, None)

    def parse_capture_time(self, filename):
        """Extract ``YYYY-MM-DD HH:MM:SS`` from a ``YYYYMMDD_HHMMSS`` stamp.

        Returns ``None`` when the file name has no such stamp or when the
        digits do not form a real date and time.
        """
        match = self._CAPTURE_TIME_RE.search(filename)
        if not match:
            return None
        try:
            # Validation only: an impossible value such as month 13 would
            # otherwise be stored and never match a date query.
            datetime.strptime(''.join(match.groups()), '%Y%m%d%H%M%S')
        except ValueError:
            return None
        year, month, day, hour, minute, second = match.groups()
        return f"{year}-{month}-{day} {hour}:{minute}:{second}"
class PhotoManager(QMainWindow):
    """Main window for indexing, filtering, previewing and opening photos.

    Photo metadata is kept in the SQLite file ``wind_turbine_photos.db``;
    directories are indexed in the background by ``DirectoryScanner``.
    """

    # Column of the result table that holds the photo path.
    PATH_COLUMN = 5

    def __init__(self):
        """Open the database, build the UI, fill the filter combos and show."""
        super().__init__()
        self.setWindowTitle("风电叶片检查照片管理系统")
        self.setWindowIcon(QIcon("wind_turbine.ico"))
        self.setGeometry(100, 100, 1200, 800)

        # Scan thread of the most recent scan; None until one is started.
        self.scanner = None

        # check_same_thread=False: the scan thread uses this connection too.
        self.db_conn = sqlite3.connect("wind_turbine_photos.db",
                                       check_same_thread=False)
        self.create_db_table()

        self.init_ui_components()

        self.load_units_to_combo()
        self.load_blades_to_combo()
        self.load_inspectors_to_combo()
        self.show()

    def create_db_table(self):
        """Create the ``photos`` table and its indexes if they do not exist."""
        cursor = self.db_conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS photos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                project_path TEXT NOT NULL,
                unit_id TEXT NOT NULL,
                blade_number TEXT,
                inspector TEXT,
                photo_path TEXT NOT NULL,
                capture_time TEXT NOT NULL,
                file_name TEXT NOT NULL,
                UNIQUE(photo_path) -- 添加唯一约束防止重复
            )
        """)
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_unit_id ON photos(unit_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_blade_number ON photos(blade_number)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_inspector ON photos(inspector)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_capture_time ON photos(capture_time)")
        self.db_conn.commit()

    def init_ui_components(self):
        """Create all widgets, arrange them and connect their signals."""
        main_widget = QWidget()
        main_layout = QVBoxLayout()

        # --- Directory selection row ---
        dir_layout = QHBoxLayout()
        self.dir_label = QLabel("项目目录:")
        self.dir_input = QLineEdit()
        self.dir_input.setPlaceholderText("请选择风电叶片检查照片根目录")
        self.browse_btn = QPushButton("浏览...")
        self.browse_btn.clicked.connect(self.browse_directory)
        self.scan_btn = QPushButton("扫描目录")
        self.scan_btn.clicked.connect(self.start_scan_directory)
        self.refresh_btn = QPushButton("强制刷新")
        self.refresh_btn.clicked.connect(self.force_refresh)
        dir_layout.addWidget(self.dir_label)
        dir_layout.addWidget(self.dir_input, stretch=1)
        dir_layout.addWidget(self.browse_btn)
        dir_layout.addWidget(self.scan_btn)
        dir_layout.addWidget(self.refresh_btn)

        # --- Filter row ---
        query_layout = QHBoxLayout()

        # Unit filter; item data "" means "all units".
        unit_layout = QVBoxLayout()
        unit_layout.addWidget(QLabel("机组编号"))
        self.unit_combo = QComboBox()
        self.unit_combo.addItem("所有机组", "")
        unit_layout.addWidget(self.unit_combo)

        # Blade filter; item data "" means "all blades".
        blade_layout = QVBoxLayout()
        blade_layout.addWidget(QLabel("叶片号"))
        self.blade_combo = QComboBox()
        self.blade_combo.addItem("所有叶片", "")
        blade_layout.addWidget(self.blade_combo)

        # Inspector filter; "" means "everyone", None means "no inspector".
        inspector_layout = QVBoxLayout()
        inspector_layout.addWidget(QLabel("检查人员"))
        self.inspector_combo = QComboBox()
        self.inspector_combo.addItem("所有人员", "")
        self.inspector_combo.addItem("(空)", None)
        inspector_layout.addWidget(self.inspector_combo)

        # Capture-date filter, defaulting to today.
        date_layout = QVBoxLayout()
        date_layout.addWidget(QLabel("拍摄日期"))
        self.date_edit = QDateEdit()
        self.date_edit.setCalendarPopup(True)
        self.date_edit.setDisplayFormat("yyyy-MM-dd")
        self.date_edit.setDate(QDate.currentDate())
        date_layout.addWidget(self.date_edit)

        query_layout.addLayout(unit_layout)
        query_layout.addLayout(blade_layout)
        query_layout.addLayout(inspector_layout)
        query_layout.addLayout(date_layout)

        self.query_btn = QPushButton("查询")
        self.query_btn.clicked.connect(self.query_photos)
        query_layout.addWidget(self.query_btn)

        # --- Main content: result table plus preview ---
        content_layout = QHBoxLayout()

        self.result_table = QTableWidget()
        self.result_table.setColumnCount(6)
        self.result_table.setHorizontalHeaderLabels(["机组编号", "叶片号", "检查人员", "拍摄时间", "文件名", "路径"])
        self.result_table.setSelectionBehavior(QTableWidget.SelectRows)
        self.result_table.setEditTriggers(QTableWidget.NoEditTriggers)
        self.result_table.doubleClicked.connect(self.open_selected_photo)

        # The first four columns fit their content; file name and path stretch.
        header = self.result_table.horizontalHeader()
        for column in range(4):
            header.setSectionResizeMode(column, QHeaderView.ResizeToContents)
        for column in (4, 5):
            header.setSectionResizeMode(column, QHeaderView.Stretch)

        self.result_table.setAlternatingRowColors(True)

        # Right-click menu on the table.
        self.result_table.setContextMenuPolicy(Qt.CustomContextMenu)
        self.result_table.customContextMenuRequested.connect(self.show_context_menu)

        # Preview pane.
        self.preview_label = QLabel()
        self.preview_label.setAlignment(Qt.AlignCenter)
        self.preview_label.setFixedSize(300, 300)
        self.preview_label.setText("照片预览区域")
        self.preview_label.setStyleSheet("border: 1px solid gray;")

        # Refresh the preview whenever the selected row changes.
        self.result_table.selectionModel().selectionChanged.connect(self.update_preview)

        content_layout.addWidget(self.result_table, stretch=3)
        content_layout.addWidget(self.preview_label, stretch=1)

        self.statusBar().showMessage("就绪")

        # --- Assemble ---
        main_layout.addLayout(dir_layout)
        main_layout.addLayout(query_layout)
        main_layout.addLayout(content_layout, stretch=1)
        main_widget.setLayout(main_layout)
        self.setCentralWidget(main_widget)

        # Connected last, once every combo box exists: changing the unit
        # narrows the blade and inspector lists.
        self.unit_combo.currentIndexChanged.connect(self.update_blades_by_unit)
        self.unit_combo.currentIndexChanged.connect(self.update_inspectors_by_unit)

    def browse_directory(self):
        """Let the user pick the project root and put it in the path field."""
        dir_path = QFileDialog.getExistingDirectory(self, "选择风电叶片检查照片根目录")
        if dir_path:
            self.dir_input.setText(dir_path)

    def start_scan_directory(self):
        """Validate the chosen path and start a background directory scan."""
        root_dir = self.dir_input.text()
        # isdir rather than exists: a regular file is not a valid project root.
        if not root_dir or not os.path.isdir(root_dir):
            QMessageBox.warning(self, "警告", "请选择有效的目录路径")
            return
        # Disabled until on_scan_finished so only one scan runs at a time.
        self.scan_btn.setEnabled(False)
        self.statusBar().showMessage("正在扫描目录,请稍候...")
        self.scanner = DirectoryScanner(root_dir, self.db_conn)
        self.scanner.scan_finished.connect(self.on_scan_finished)
        self.scanner.start()

    def on_scan_finished(self, root_dir, success, message):
        """Handle the scanner's result: refresh the views or report the error."""
        self.scan_btn.setEnabled(True)
        if success:
            self.load_units_to_combo(root_dir)
            self.load_blades_to_combo()
            self.load_inspectors_to_combo()
            self.query_photos()
            self.statusBar().showMessage(f"目录扫描完成: {root_dir}", 5000)
            QMessageBox.information(self, "完成", message)
        else:
            error_msg = f"扫描目录时出错:\n\n{message}\n\n请检查:\n1. 目录结构是否符合要求\n2. 是否有访问权限"
            QMessageBox.critical(self, "扫描错误", error_msg)
            self.statusBar().showMessage(f"扫描失败: {message}", 5000)

    def force_refresh(self):
        """Reload every filter list from the database and rerun the query."""
        project_path = self.dir_input.text()
        if project_path and os.path.exists(project_path):
            self.load_units_to_combo(project_path)
        else:
            self.load_units_to_combo()
        self.load_blades_to_combo()
        self.load_inspectors_to_combo()
        self.query_photos()
        self.statusBar().showMessage("已强制刷新所有数据", 3000)

    @staticmethod
    def _natural_sort_key(unit_id):
        """Return the numbers embedded in ``unit_id`` as a list, or ``[0]``."""
        numbers = re.findall(r'\d+', unit_id)
        return [int(num) for num in numbers] if numbers else [0]

    def load_units_to_combo(self, project_path=None):
        """Fill the unit combo, sorted by the numbers in each unit name.

        When ``project_path`` is given only units of that project are listed.
        """
        cursor = self.db_conn.cursor()
        if project_path:
            cursor.execute("""
                SELECT DISTINCT unit_id FROM photos
                WHERE unit_id != '' AND project_path = ?
            """, (project_path,))
        else:
            cursor.execute("SELECT DISTINCT unit_id FROM photos WHERE unit_id != ''")
        units = sorted((row[0] for row in cursor.fetchall()), key=self._natural_sort_key)
        self.unit_combo.clear()
        self.unit_combo.addItem("所有机组", "")
        for unit in units:
            self.unit_combo.addItem(unit, unit)

    def load_blades_to_combo(self, unit_id=None):
        """Fill the blade combo in numeric order, optionally for one unit."""
        sql = ("SELECT DISTINCT blade_number FROM photos "
               "WHERE blade_number IS NOT NULL AND blade_number != ''")
        params = []
        if unit_id:
            sql += " AND unit_id = ?"
            params.append(unit_id)
        sql += " ORDER BY CAST(blade_number AS INTEGER)"
        cursor = self.db_conn.cursor()
        cursor.execute(sql, params)
        self.blade_combo.clear()
        self.blade_combo.addItem("所有叶片", "")
        for (blade,) in cursor.fetchall():
            self.blade_combo.addItem(str(blade), str(blade))

    def load_inspectors_to_combo(self, unit_id=None):
        """Fill the inspector combo, optionally for one unit.

        The two fixed entries ("everyone" and "no inspector") are always
        present; named inspectors follow in the database's sort order.
        """
        sql = ("SELECT DISTINCT inspector FROM photos "
               "WHERE inspector IS NOT NULL AND inspector != ''")
        params = []
        if unit_id:
            sql += " AND unit_id = ?"
            params.append(unit_id)
        sql += " ORDER BY inspector"
        cursor = self.db_conn.cursor()
        cursor.execute(sql, params)
        self.inspector_combo.clear()
        self.inspector_combo.addItem("所有人员", "")
        self.inspector_combo.addItem("(空)", None)
        for (inspector,) in cursor.fetchall():
            self.inspector_combo.addItem(inspector, inspector)

    def update_blades_by_unit(self):
        """Reload the blade list for the currently selected unit."""
        self.load_blades_to_combo(self.unit_combo.currentData())

    def update_inspectors_by_unit(self):
        """Reload the inspector list for the currently selected unit."""
        self.load_inspectors_to_combo(self.unit_combo.currentData())

    def query_photos(self):
        """Run the filter query and show the matching photos in the table.

        The capture date always applies; unit, blade and inspector apply only
        when something other than the "all" entry is selected.
        """
        unit_id = self.unit_combo.currentData() or None
        blade_num = self.blade_combo.currentData()
        # "" = everyone, None = photos without inspector, otherwise a name.
        inspector = self.inspector_combo.currentData()

        conditions = ["date(capture_time) = ?"]
        params = [self.date_edit.date().toString("yyyy-MM-dd")]
        if unit_id:
            conditions.append("unit_id = ?")
            params.append(unit_id)
        if blade_num:
            conditions.append("blade_number = ?")
            params.append(str(blade_num))
        if inspector is None:
            conditions.append("inspector IS NULL")
        elif inspector != "":
            conditions.append("inspector = ?")
            params.append(inspector)

        # Only fixed fragments are concatenated; every value is bound.
        query = (
            "SELECT unit_id, blade_number, inspector, capture_time, file_name, photo_path "
            "FROM photos WHERE " + " AND ".join(conditions) +
            " ORDER BY unit_id, CAST(blade_number AS INTEGER), capture_time"
        )

        cursor = self.db_conn.cursor()
        try:
            cursor.execute(query, params)
            results = cursor.fetchall()
        except sqlite3.Error as e:
            QMessageBox.critical(self, "数据库错误", f"查询失败: {str(e)}")
            return

        # Order units numerically, as the unit combo does. The sort is
        # stable, so blade/time order from SQL is kept inside each unit.
        results.sort(key=lambda row: self._natural_sort_key(row[0]))

        self.result_table.setRowCount(len(results))
        for row_idx, row in enumerate(results):
            for col_idx, value in enumerate(row):
                item = QTableWidgetItem(str(value) if value is not None else "")
                self.result_table.setItem(row_idx, col_idx, item)
        self.statusBar().showMessage(f"找到 {len(results)} 条记录", 5000)

    def _photo_path_at(self, row):
        """Return the path stored in ``row``, or "" when there is no such cell."""
        if row < 0:
            return ""
        item = self.result_table.item(row, self.PATH_COLUMN)
        return item.text() if item is not None else ""

    @staticmethod
    def _open_with_default_app(path):
        """Open ``path`` with the platform's default handler."""
        if os.name == 'nt':
            os.startfile(path)
        elif sys.platform == 'darwin':
            subprocess.call(['open', path])
        else:
            subprocess.call(['xdg-open', path])

    def open_selected_photo(self, index):
        """Open the photo of the row identified by ``index``.

        If the stored path no longer exists, the project directory is
        searched for a file with the same name before giving up.
        """
        photo_path = self._photo_path_at(index.row())
        if not photo_path:
            QMessageBox.warning(self, "警告", "未找到文件路径")
            return

        # normpath converts "/" to "\" on Windows only; POSIX paths are
        # left usable.
        photo_path = os.path.normpath(photo_path)

        if photo_path.startswith('\\\\') and not os.path.exists(photo_path):
            QMessageBox.warning(self, "网络路径不可访问",
                                f"无法访问网络路径:\n{photo_path}\n"
                                f"请检查网络连接或路径权限")
            return

        if not os.path.exists(photo_path):
            project_dir = self.dir_input.text()
            if project_dir:
                file_name = os.path.basename(photo_path)
                relocated = None
                for root, _, files in os.walk(os.path.normpath(project_dir)):
                    if file_name in files:
                        relocated = os.path.normpath(os.path.join(root, file_name))
                        break
                if relocated is None:
                    QMessageBox.warning(self, "文件不存在",
                                        f"无法找到文件:\n原始路径: {photo_path}\n"
                                        f"在项目目录中也没有找到同名文件")
                    return
                photo_path = relocated

        if not os.path.exists(photo_path):
            QMessageBox.warning(self, "错误", f"文件不存在:\n{photo_path}")
            return

        try:
            self._open_with_default_app(photo_path)
            self.statusBar().showMessage(f"正在打开: {os.path.basename(photo_path)}", 3000)
        except Exception as e:
            # UI boundary: show the failure instead of letting the slot raise.
            QMessageBox.critical(self, "错误", f"无法打开文件:\n路径: {photo_path}\n错误: {str(e)}")

    def show_context_menu(self, position):
        """Show the table's right-click menu at ``position``."""
        menu = QMenu()
        open_action = menu.addAction("打开照片")
        open_action.triggered.connect(lambda: self.open_selected_photo(self.result_table.currentIndex()))
        show_in_folder_action = menu.addAction("在文件夹中显示")
        show_in_folder_action.triggered.connect(self.show_in_folder)
        menu.exec_(self.result_table.viewport().mapToGlobal(position))

    def show_in_folder(self):
        """Open the folder that contains the currently selected photo."""
        photo_path = self._photo_path_at(self.result_table.currentRow())
        if not photo_path:
            return
        if not os.path.exists(photo_path):
            # Previously this case did nothing at all.
            QMessageBox.warning(self, "错误", f"文件不存在:\n{photo_path}")
            return
        folder_path = os.path.dirname(photo_path)
        try:
            self._open_with_default_app(folder_path)
            self.statusBar().showMessage(f"打开文件夹: {folder_path}", 3000)
        except Exception as e:
            QMessageBox.critical(self, "错误", f"无法打开文件夹: {str(e)}")

    def update_preview(self):
        """Show a scaled preview of the first selected row's photo."""
        selected_rows = self.result_table.selectionModel().selectedRows()
        if not selected_rows:
            return
        photo_path = self._photo_path_at(selected_rows[0].row())
        if not os.path.exists(photo_path):
            self.preview_label.setText("文件不存在")
            return
        try:
            pixmap = QPixmap(photo_path)
            if pixmap.isNull():
                self.preview_label.setText("无法加载预览")
                return
            pixmap = pixmap.scaled(self.preview_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
            # setPixmap replaces any text the label was showing.
            self.preview_label.setPixmap(pixmap)
        except Exception:
            self.preview_label.setText("预览错误")

    def closeEvent(self, event):
        """Wait for a running scan, then close the database and accept."""
        scanner = getattr(self, "scanner", None)
        if scanner is not None and scanner.isRunning():
            # The window is going away: do not refresh it after the scan.
            try:
                scanner.scan_finished.disconnect(self.on_scan_finished)
            except TypeError:
                # Raised by PyQt when the slot is not connected.
                pass
            # Closing the connection under a running scan would make the
            # scan fail and destroy a QThread that is still running.
            scanner.wait()
        self.db_conn.close()
        event.accept()
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window
    # (PhotoManager shows itself in its constructor) and run the event loop.
    app = QApplication(sys.argv)
    # Keep a reference so the window is not garbage-collected while running.
    manager = PhotoManager()
    sys.exit(app.exec_())