import cv2 import numpy as np from ultralytics import YOLO import os import json import argparse class Target2D: """二维坐标下的叶尖跟踪器""" def __init__(self, positions=None, hub=None): if positions is None: positions = np.array([[-100, -100], [-200, -200], [-300, -300]]) if hub is None: hub = np.array([400, 300]) self.positions = np.array(positions, dtype=float) # 叶尖位置 self.hub = np.array(hub, dtype=float) # 轮毂位置 def update(self, new_positions, new_hub=None): """更新叶尖位置,使用最近点匹配算法""" if new_positions is None or len(new_positions) == 0: return self.positions, [] # 确保新位置是二维数组 new_positions = np.array(new_positions, dtype=float) if new_positions.ndim == 1: new_positions = new_positions.reshape(1, -1) # 更新轮毂位置 if new_hub is not None: self.hub = np.array(new_hub, dtype=float) # 如果还没有初始化位置,使用新位置 if self.positions.size == 0: self.positions = new_positions return self.positions # 创建位置列表副本 new_pos_list = new_positions.tolist() update_positons = self.positions.copy() used = set() # 对于每个新位置,找到最近的点 for i, new_pos in enumerate(new_pos_list): # 计算所有距离 distances = np.linalg.norm(self.positions - new_pos, axis=1) available_indices = [j for j in range(len(distances)) if j not in used] # 找到最近点的索引 closest_idx = min(available_indices, key=lambda j: distances[j]) # 记录ID并更新位置 update_positons[closest_idx] = new_pos # new_pos # 更新位置 self.positions = np.array(update_positons) return self.positions class ClearanceCalculator: def __init__(self, model_path, scale_factor=0.02,width=544, height=960): self.size = (width, height) # 初始化模型配置 self.model = YOLO(model_path) self.cap = None self.video_path = "" self.is_playing = False self.scale_factor = scale_factor self.reference_pixel = 0.0 self.reference_actual = 0.0 self.current_clearance = 0.0 self.use_camera = False self.hub_x_range = 100 # 轮毂x坐标的左右区间(像素) self.hub_y_range = 50 self.inside_region = False # 轨迹是否在轮毂区域内 self.entered_from_left = False # 轨迹是否从左侧进入 self.last_pixel_clearance = 0.0 # 保存上一次的像素净空 self.last_actual_clearance = 0.0 # 保存上一次的实际净空 # 轨迹相关变量 self.trajectories = [] # self.lowest_points = [] self.calculate_points = [] self.hub_position = None self.last_tip_positions = [] self.frame_count = 0 self.nearest_tip_trajectory = [] # 只存储与轮毂x坐标最近的叶尖轨迹 self.traj_cnt = 0 self.tip_clearances = { 0: None, # tip1 1: None, # tip2 2: None # tip3 } # 净空历史记录 self.clearance_history = {0: [], 1: [], 2: []} # 初始化轨迹颜色 self.tip_colors = [ (255, 0, 0), # 红色 (0, 255, 0), # 绿色 (0, 0, 255) # 蓝色 ] # 初始化2D跟踪器 self.tracker = Target2D() def set_hub_x_range(self, x_value, y_value=None): """设置轮毂x和y坐标的区间""" self.hub_x_range = x_value if y_value is not None: self.hub_y_range = y_value def load_video(self, path): """加载视频文件""" self.use_camera = False self.video_path = path self.cap = cv2.VideoCapture(path) return self.cap.isOpened() def process_frame(self, frame): """处理视频帧并检测叶尖和轮毂""" frame = cv2.resize(frame, self.size) # 使用模型进行检测 results = self.model(frame, verbose=False) # 初始化检测结果字典 detected = {'tip': [], 'hub': []} # 解析检测结果 for result in results: boxes = result.boxes classes = boxes.cls class_names = result.names for i in range(len(boxes)): class_id = int(classes[i]) box = boxes.xyxy[i] x1, y1, x2, y2 = box.tolist() class_name = class_names[class_id] # 只关注叶尖(tip)和轮毂(hub) if class_name in detected: # 使用底部中心点作为位置 if class_name == 'tip': tip_x = (x1 + x2) / 2 tip_y = y2 # 使用底部作为叶尖位置 detected[class_name].append([tip_x, tip_y]) else: # hub hub_x = (x1 + x2) / 2 hub_y = (y1 + y2) / 2 detected[class_name].append([hub_x, hub_y]) # 限制检测数量 if len(detected['tip']) > 3: detected['tip'] = detected['tip'][:3] if len(detected['hub']) > 1: detected['hub'] = detected['hub'][:1] return frame, detected['tip'], detected['hub'] def check_trajectory_state(self, tip, hub, thresh=20): """检查轨迹是否进入或离开轮毂区域""" hub_x = hub[0] if hub else None if hub_x is None or tip is None: return None, None # 检查轨迹是否在轮毂区域内 inside = (hub_x - self.hub_x_range) < tip[0] < (hub_x + self.hub_x_range) # 如果轨迹进入区域,并且是从左侧进入 if inside and not self.inside_region: self.inside_region = True self.entered_from_left = tip[0] >= (hub_x - self.hub_x_range) if inside and self.inside_region and self.entered_from_left and (hub_x - thresh) < tip[0] < (hub_x + thresh): distance, actual_distance = self.calculate_clearance() return distance, actual_distance # 如果轨迹离开区域,并且是从左侧进入且从右侧离开 if not inside and self.inside_region and self.entered_from_left and tip[0] > (hub_x + self.hub_x_range): self.inside_region = False self.traj_cnt += 1 return None, None def update_trajectories(self, tips, hub): """更新叶尖轨迹,只关注轮毂x和y坐标附近的叶尖""" # 使用2D跟踪器更新位置 self.tracker.update(tips, hub) sorted_tips = self.tracker.positions # 如果没有轮毂位置,无法判断附近 if hub is None or len(hub) == 0: return None, None hub_x = hub[0] # 轮毂的x坐标 hub_y = hub[1] # 轮毂的y坐标 # 初始化轨迹 if not self.trajectories: for _ in range(len(sorted_tips)): self.trajectories.append([]) # 找到与轮毂x和y坐标都最近的叶尖 nearest_tip = None min_distance = float('inf') for tip in sorted_tips: tip_x, tip_y = tip # 检查是否在轮毂x坐标左右区间内 if abs(tip_x - hub_x) > self.hub_x_range + 20: continue # 检查是否大于轮毂y坐标加y区间 if tip_y < hub_y + self.hub_y_range: continue # 计算与轮毂的综合距离(考虑x) distance = abs(tip_x - hub_x) # + (tip_y - hub_y) ** 2 if distance < min_distance: min_distance = distance nearest_tip = tip # 更新最近的叶尖轨迹 if nearest_tip is not None: if not self.nearest_tip_trajectory: self.nearest_tip_trajectory = [nearest_tip] else: self.nearest_tip_trajectory.append(nearest_tip) # 限制轨迹长度 if len(self.nearest_tip_trajectory) > 200: self.nearest_tip_trajectory.pop(0) return self.check_trajectory_state(nearest_tip, hub) def find_closest_tip_x(self): """找到与轮毂x坐标最接近的叶尖x坐标的叶尖""" if not self.nearest_tip_trajectory: return None hub_x = self.hub_position[0] if self.hub_position else None if hub_x is None: return None trajectories = np.array(self.nearest_tip_trajectory[:][0]) distances = np.abs(trajectories - hub_x) idx = np.argmin(distances) tip = self.nearest_tip_trajectory[idx] self.calculate_points = [tip] # 只保留一个最低点 return tip def calculate_clearance(self): """计算叶尖到轮毂的净空""" if not self.nearest_tip_trajectory or self.hub_position is None: return None, None hub_position = self.hub_position tip_x, tip_y = self.find_closest_tip_x() self.calculate_points = [[tip_x, tip_y]] # 只保留一个最低点 hub_x, hub_y = hub_position distance = np.sqrt((tip_x - hub_x) ** 2 + (tip_y - hub_y) ** 2) # 更新UI显示净空 if self.scale_factor: actual_distance = distance * self.scale_factor tip_idx = self.traj_cnt % 3 # 根据实际匹配逻辑 self.tip_clearances[tip_idx] = actual_distance # 记录到历史 self.clearance_history[tip_idx].append(actual_distance) return distance, actual_distance def draw_annotations(self, frame): """在帧上绘制轨迹和标记""" # 绘制轮毂位置 if self.hub_position: hub_x, hub_y = self.hub_position # 绘制轮毂区域标记 cv2.rectangle(frame, (int(hub_x - self.hub_x_range), int(hub_y - 10)), (int(hub_x + self.hub_x_range), int(hub_y + 10)), (0, 255, 255), 2) cv2.circle(frame, (int(hub_x), int(hub_y)), 8, (0, 255, 255), -1) cv2.putText(frame, "Hub", (int(hub_x) + 15, int(hub_y)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) # 绘制最近的叶尖轨迹 if len(self.nearest_tip_trajectory) >= 2: # 绘制轨迹线(黄色) for j in range(1, len(self.nearest_tip_trajectory)): start_point = (int(self.nearest_tip_trajectory[j - 1][0]), int(self.nearest_tip_trajectory[j - 1][1])) end_point = (int(self.nearest_tip_trajectory[j][0]), int(self.nearest_tip_trajectory[j][1])) cv2.line(frame, start_point, end_point, (0, 255, 255), 1) # 绘制三个叶尖的净空值 y_offset = 30 for i in range(3): clearance = self.tip_clearances.get(i, None) if clearance is not None: text = f"tip{i + 1}: {clearance:.2f} m" else: text = f"tip{i + 1}: -- m" cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) y_offset += 25 # 绘制其他叶尖(不绘制轨迹) if self.tracker.positions.size > 0: for i, tip in enumerate(self.tracker.positions): if not any(np.array_equal(tip, self.nearest_tip_trajectory[-1]) for p in self.nearest_tip_trajectory if len(self.nearest_tip_trajectory) > 0): color = self.tip_colors[i % len(self.tip_colors)] cv2.circle(frame, (int(tip[0]), int(tip[1])), 6, color, -1) cv2.putText(frame, f"Tip {i + 1}", (int(tip[0]) + 15, int(tip[1])), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) return frame def process_video(self, output_dir): """处理整个视频并保存结果""" if not os.path.exists(output_dir): os.makedirs(output_dir) last_frame = None frame_count = 0 while True: ret, frame = self.cap.read() if not ret: break # 处理帧 processed_frame, tips, hubs = self.process_frame(frame) # 更新轮毂位置 if hubs: self.hub_position = hubs[0] # 更新轨迹 if tips and self.hub_position is not None: pixel_clearance, actual_clearance = self.update_trajectories(tips, self.hub_position) # 绘制注释 annotated_frame = self.draw_annotations(processed_frame) last_frame = annotated_frame.copy() frame_count += 1 # 显示处理进度 if frame_count % 10 == 0: print(f"已处理 {frame_count} 帧...") # 保存最后一帧 last_frame_path = os.path.join(output_dir, "last_frame.jpg") cv2.imwrite(last_frame_path, last_frame) print(f"最后一帧已保存至: {last_frame_path}") # 准备结果数据 result = { "clearance_history": { "tip0": self.clearance_history[0], "tip1": self.clearance_history[1], "tip2": self.clearance_history[2] }, "last_frame_clearances": { "tip0": self.tip_clearances[0], "tip1": self.tip_clearances[1], "tip2": self.tip_clearances[2] }, "last_frame_image": last_frame_path } # 保存JSON结果 json_path = os.path.join(output_dir, "results.json") with open(json_path, 'w') as f: json.dump(result, f, indent=4) print(f"结果已保存至: {json_path}") return result def main(): parser = argparse.ArgumentParser(description='叶片净空计算系统') parser.add_argument('--video_path', type=str, required=True, help='输入视频路径') parser.add_argument('--model_path', type=str, required=True, help='模型文件路径') parser.add_argument('--output_dir', type=str, required=True, help='结果保存目录') parser.add_argument('--scale_factor', type=float, default=0.02, help='比例尺因子 (实际距离/像素距离)') parser.add_argument('--width', type=int, default=544, help='图像宽度(像素)') parser.add_argument('--height', type=int, default=960, help='图像高度(像素)') args = parser.parse_args() calculator = ClearanceCalculator( model_path=args.model_path, scale_factor=args.scale_factor, width=args.width, height=args.height ) if not calculator.load_video(args.video_path): print(f"无法加载视频: {args.video_path}") return print(f"开始处理视频: {args.video_path}") print(f"图像尺寸设置为: ({args.width}, {args.height})") result = calculator.process_video(args.output_dir) if __name__ == "__main__": main() """ python cli_clearance.py \ --video test.mp4 \ --model best.pt \ --output ./results \ --scale_factor 0.02 \ --width 640 \ --height 480 """ """ { "clearances": { "0": [5.12, 5.09, 5.11], "1": [4.98, 5.01], "2": [] }, "last_frame_clearances": { "tip1": 5.11, "tip2": 5.01, "tip3": null } } """