# pingtai_test/video_upload.py
#
# 302 lines
# 11 KiB
# Python

import os
import sys
import re
import requests
from datetime import datetime
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QLineEdit, QPushButton, QListWidget, QMessageBox,
QProgressBar, QFileDialog, QGroupBox, QFormLayout)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
class VideoUploadThread(QThread):
    """Worker thread that uploads a list of video files one after another.

    Signals:
        progress_updated(int, str): emitted after every processed file with
            the 1-based position of that file and a status message.
        upload_finished(int, int): emitted once when the loop ends (normally
            or after ``stop()``) with the success and failure counts.
    """

    progress_updated = pyqtSignal(int, str)
    upload_finished = pyqtSignal(int, int)

    # (connect, read) timeout in seconds handed to requests. Without a
    # timeout a stalled server would block run() forever, and with it any
    # caller that wait()s on this thread.
    DEFAULT_TIMEOUT = (10, 300)

    # Timestamp layouts recognised in file names, tried in this order.
    _TIMESTAMP_PATTERNS = (
        # e.g. VID_20250611_144614 (prefix matched case-insensitively)
        re.compile(
            r'(?:VID|VIDEO|视频)_?(\d{4})(\d{2})(\d{2})_?(\d{2})(\d{2})(\d{2})',
            re.IGNORECASE,
        ),
        # e.g. 2025-06-11 14-46-14
        re.compile(r'(\d{4})-(\d{2})-(\d{2})[ _](\d{2})-(\d{2})-(\d{2})'),
        # e.g. 20250611 144614
        re.compile(r'(\d{4})(\d{2})(\d{2})[ _](\d{2})(\d{2})(\d{2})'),
        # e.g. 2025_06_11 14_46_14
        re.compile(r'(\d{4})_(\d{2})_(\d{2})[ _](\d{2})_(\d{2})_(\d{2})'),
    )

    def __init__(self, files, params, parent=None, timeout=DEFAULT_TIMEOUT):
        """Store the work list and request settings.

        Args:
            files: iterable of local file paths to upload.
            params: dict of query parameters sent with every upload; it is
                copied per file, never mutated.
            parent: optional Qt parent object.
            timeout: value passed as ``timeout`` to ``requests.post``.
        """
        super().__init__(parent)
        self.files = files
        self.params = params
        self.timeout = timeout
        self.base_url = "http://pms.dtyx.net:9158"
        self.upload_endpoint = "/video-file-info/batch-upload"
        self.running = True

    def run(self):
        """Upload each file in turn, reporting progress through signals.

        The ``running`` flag is checked before each file, so ``stop()`` takes
        effect only once the upload currently in flight has returned.
        """
        successful_uploads = 0
        failed_uploads = 0
        for index, file_path in enumerate(self.files):
            if not self.running:
                break
            file_name = os.path.basename(file_path)
            try:
                succeeded, message = self._upload_one(file_path, file_name)
            except Exception as e:
                # Per-file boundary: one bad file (unreadable, network error,
                # non-JSON response) must not abort the rest of the batch.
                succeeded = False
                message = f"上传异常: {file_name} - {str(e)}"
            if succeeded:
                successful_uploads += 1
            else:
                failed_uploads += 1
            self.progress_updated.emit(index + 1, message)
        self.upload_finished.emit(successful_uploads, failed_uploads)

    def _upload_one(self, file_path, file_name):
        """Upload a single file and return ``(succeeded, status_message)``."""
        # Use the shooting time encoded in the file name, else the current time.
        shooting_time = self.extract_time_from_filename(file_path)
        if shooting_time is None:
            shooting_time = datetime.now()
        params = self.params.copy()
        params["shootingTime"] = shooting_time.strftime("%Y-%m-%d %H:%M:%S")

        with open(file_path, 'rb') as f:
            response = requests.post(
                url=f"{self.base_url}{self.upload_endpoint}",
                params=params,
                files={'file': (file_name, f, 'video/mp4')},
                # The token is intentionally null; requests leaves out
                # headers whose value is None.
                headers={"Authorization": None},
                timeout=self.timeout,
            )

        if response.status_code != 200:
            return False, f"上传失败: {file_name} - HTTP状态码: {response.status_code}"
        result = response.json()
        if result.get("success"):
            return True, f"上传成功: {file_name}"
        return False, f"上传失败: {file_name} - {result.get('msg', '未知错误')}"

    @staticmethod
    def _to_datetime(match):
        """Build a datetime from six captured groups, or None if out of range."""
        try:
            return datetime(*(int(part) for part in match.groups()))
        except ValueError:
            # Digits matched the layout but are not a real date/time
            # (e.g. month 13); the caller moves on to the next pattern.
            return None

    def extract_time_from_filename(self, file_path):
        """Extract a timestamp from the file name.

        Args:
            file_path: path whose base name is searched.

        Returns:
            A ``datetime`` for the first recognised pattern that yields a
            valid date and time, or ``None`` when nothing usable is found.
        """
        filename = os.path.basename(file_path)
        for pattern in self._TIMESTAMP_PATTERNS:
            match = pattern.search(filename)
            if match is None:
                continue
            parsed = self._to_datetime(match)
            if parsed is not None:
                return parsed
        return None

    def stop(self):
        """Ask the loop in ``run`` to stop before starting the next file."""
        self.running = False
class VideoUploaderApp(QMainWindow):
    """Main window: collects upload parameters and files, drives the worker."""

    def __init__(self):
        """Set up window title and geometry, then build the widgets."""
        super().__init__()
        self.setWindowTitle("防雷工作视频批量上传工具")
        self.setGeometry(100, 100, 800, 600)
        # Reference to the current worker; kept so the thread object is not
        # dropped while it may still be running.
        self.upload_thread = None
        self.init_ui()

    def init_ui(self):
        """Create all widgets and assemble the window layout."""
        main_widget = QWidget()
        main_layout = QVBoxLayout()

        # Upload parameter form
        params_group = QGroupBox("上传参数设置")
        params_layout = QFormLayout()
        self.part_id_input = QLineEdit("81d2bcdf0db9b0e986b61f986c5d520d")
        self.location_input = QLineEdit()
        self.test_point_input = QLineEdit()
        self.worker_id_input = QLineEdit()
        params_layout.addRow(QLabel("部件ID:"), self.part_id_input)
        params_layout.addRow(QLabel("拍摄地点:"), self.location_input)
        params_layout.addRow(QLabel("测试点:"), self.test_point_input)
        params_layout.addRow(QLabel("作业人员ID:"), self.worker_id_input)
        params_group.setLayout(params_layout)

        # File selection area
        file_group = QGroupBox("视频文件选择")
        file_layout = QVBoxLayout()
        self.file_list = QListWidget()
        self.select_btn = QPushButton("选择视频文件")
        self.select_btn.clicked.connect(self.select_videos)
        self.clear_btn = QPushButton("清空列表")
        self.clear_btn.clicked.connect(self.clear_file_list)
        file_layout.addWidget(self.file_list)
        file_btn_layout = QHBoxLayout()
        file_btn_layout.addWidget(self.select_btn)
        file_btn_layout.addWidget(self.clear_btn)
        file_layout.addLayout(file_btn_layout)
        file_group.setLayout(file_layout)

        # Start / stop controls
        control_layout = QHBoxLayout()
        self.upload_btn = QPushButton("开始上传")
        self.upload_btn.clicked.connect(self.start_upload)
        self.stop_btn = QPushButton("停止上传")
        self.stop_btn.clicked.connect(self.stop_upload)
        self.stop_btn.setEnabled(False)
        control_layout.addWidget(self.upload_btn)
        control_layout.addWidget(self.stop_btn)

        # Progress display
        self.progress_bar = QProgressBar()
        self.progress_bar.setAlignment(Qt.AlignCenter)
        self.status_label = QLabel("准备就绪")
        self.status_label.setAlignment(Qt.AlignCenter)

        # Assemble the main layout
        main_layout.addWidget(params_group)
        main_layout.addWidget(file_group)
        main_layout.addLayout(control_layout)
        main_layout.addWidget(self.progress_bar)
        main_layout.addWidget(self.status_label)
        main_widget.setLayout(main_layout)
        self.setCentralWidget(main_widget)

    def select_videos(self):
        """Open a multi-file dialog and append the chosen paths to the list."""
        file_dialog = QFileDialog()
        file_dialog.setFileMode(QFileDialog.ExistingFiles)
        file_dialog.setNameFilter("视频文件 (*.mp4 *.avi *.mov *.mkv)")
        if file_dialog.exec_():
            file_paths = file_dialog.selectedFiles()
            for file_path in file_paths:
                self.file_list.addItem(file_path)
            self.status_label.setText(f"已选择 {len(file_paths)} 个视频文件")

    def clear_file_list(self):
        """Remove every entry from the file list."""
        self.file_list.clear()
        self.status_label.setText("文件列表已清空")

    def start_upload(self):
        """Validate the inputs and start a worker thread for the listed files."""
        # Never replace a worker that is still running: dropping the last
        # reference to a running QThread can destroy it mid-run.
        if self.upload_thread and self.upload_thread.isRunning():
            QMessageBox.warning(self, "警告", "上传正在进行中,请等待当前上传结束!")
            return

        if self.file_list.count() == 0:
            QMessageBox.warning(self, "警告", "请先选择要上传的视频文件!")
            return

        # Empty optional fields become None.
        params = {
            "partId": self.part_id_input.text().strip(),
            "qualified": 1,  # qualified by default
            "location": self.location_input.text().strip() or None,
            "testPoint": self.test_point_input.text().strip() or None,
            "workerUserId": self.worker_id_input.text().strip() or None
        }

        # The part ID is mandatory.
        if not params["partId"]:
            QMessageBox.warning(self, "警告", "部件ID不能为空!")
            return

        file_paths = [self.file_list.item(i).text() for i in range(self.file_list.count())]

        # Put the UI into "uploading" state.
        self.upload_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)
        self.progress_bar.setMaximum(len(file_paths))
        self.progress_bar.setValue(0)
        self.status_label.setText("开始上传...")

        # Create and start the worker.
        self.upload_thread = VideoUploadThread(file_paths, params)
        self.upload_thread.progress_updated.connect(self.update_progress)
        self.upload_thread.upload_finished.connect(self.upload_finished)
        self.upload_thread.start()

    def stop_upload(self):
        """Request that the running upload stops after the current file.

        The start button is deliberately left disabled here: the worker only
        checks its stop flag between files, so it is still running when this
        returns. ``upload_finished`` re-enables the button once the worker
        has actually ended.
        """
        self.stop_btn.setEnabled(False)
        if self.upload_thread and self.upload_thread.isRunning():
            self.upload_thread.stop()
            self.status_label.setText("上传已停止")
        else:
            # No active worker: nothing to wait for, restore the idle state.
            self.status_label.setText("上传已停止")
            self.upload_btn.setEnabled(True)

    def update_progress(self, value, message):
        """Show the worker's latest progress value and status message."""
        self.progress_bar.setValue(value)
        self.status_label.setText(message)

    def upload_finished(self, success_count, fail_count):
        """Restore the idle UI state and report the final counts."""
        self.upload_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        msg = f"上传完成! 成功: {success_count} 个, 失败: {fail_count}"
        self.status_label.setText(msg)
        QMessageBox.information(self, "上传完成", msg)
        # Reset the progress bar once the dialog has been dismissed.
        self.progress_bar.reset()

    def closeEvent(self, event):
        """Confirm before closing while an upload is running.

        On confirmation the worker is asked to stop and this method blocks
        until it has ended, so the thread is not destroyed while running.
        """
        if self.upload_thread and self.upload_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认退出',
                '上传正在进行中,确定要退出吗?',
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.No
            )
            if reply == QMessageBox.Yes:
                self.upload_thread.stop()
                self.upload_thread.wait()
                event.accept()
            else:
                event.ignore()
        else:
            event.accept()
# Script entry point: create the Qt application, show the main window and
# hand the event loop's return code to the interpreter as the exit status.
if __name__ == "__main__":
    qt_app = QApplication(sys.argv)
    main_window = VideoUploaderApp()
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)