From bd3b95395af95a5eb5396f2632302265430cf2d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E5=BE=B7=E8=B6=85?= <13143889+he-dechao@user.noreply.gitee.com> Date: Fri, 1 Aug 2025 17:42:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=80=E7=A9=BA=E6=A3=80=E6=B5=8B=E4=B8=8E?= =?UTF-8?q?=E6=8A=A5=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clearance.py | 425 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 425 insertions(+) create mode 100644 clearance.py diff --git a/clearance.py b/clearance.py new file mode 100644 index 0000000..de13fca --- /dev/null +++ b/clearance.py @@ -0,0 +1,425 @@ +import cv2 +import numpy as np +from ultralytics import YOLO +import os +import json +import argparse + + +class Target2D: + """二维坐标下的叶尖跟踪器""" + + def __init__(self, positions=None, hub=None): + if positions is None: + positions = np.array([[-100, -100], [-200, -200], [-300, -300]]) + if hub is None: + hub = np.array([400, 300]) + + self.positions = np.array(positions, dtype=float) # 叶尖位置 + self.hub = np.array(hub, dtype=float) # 轮毂位置 + + def update(self, new_positions, new_hub=None): + """更新叶尖位置,使用最近点匹配算法""" + if new_positions is None or len(new_positions) == 0: + return self.positions, [] + + # 确保新位置是二维数组 + new_positions = np.array(new_positions, dtype=float) + if new_positions.ndim == 1: + new_positions = new_positions.reshape(1, -1) + + # 更新轮毂位置 + if new_hub is not None: + self.hub = np.array(new_hub, dtype=float) + + # 如果还没有初始化位置,使用新位置 + if self.positions.size == 0: + self.positions = new_positions + return self.positions + # 创建位置列表副本 + new_pos_list = new_positions.tolist() + update_positons = self.positions.copy() + used = set() + # 对于每个新位置,找到最近的点 + for i, new_pos in enumerate(new_pos_list): + # 计算所有距离 + distances = np.linalg.norm(self.positions - new_pos, axis=1) + available_indices = [j for j in range(len(distances)) if j not in used] + # 找到最近点的索引 + closest_idx = min(available_indices, key=lambda j: distances[j]) + + # 记录ID并更新位置 + update_positons[closest_idx] = new_pos # new_pos + + # 更新位置 + self.positions = np.array(update_positons) + + return self.positions + + +class ClearanceCalculator: + def __init__(self, model_path, scale_factor=0.02,width=544, height=960): + self.size = (width, height) + # 初始化模型配置 + self.model = YOLO(model_path) + self.cap = None + self.video_path = "" + self.is_playing = False + self.scale_factor = scale_factor + self.reference_pixel = 0.0 + self.reference_actual = 0.0 + self.current_clearance = 0.0 + self.use_camera = False + self.hub_x_range = 100 # 轮毂x坐标的左右区间(像素) + self.hub_y_range = 50 + self.inside_region = False # 轨迹是否在轮毂区域内 + self.entered_from_left = False # 轨迹是否从左侧进入 + self.last_pixel_clearance = 0.0 # 保存上一次的像素净空 + self.last_actual_clearance = 0.0 # 保存上一次的实际净空 + # 轨迹相关变量 + self.trajectories = [] + # self.lowest_points = [] + self.calculate_points = [] + self.hub_position = None + self.last_tip_positions = [] + self.frame_count = 0 + self.nearest_tip_trajectory = [] # 只存储与轮毂x坐标最近的叶尖轨迹 + self.traj_cnt = 0 + self.tip_clearances = { + 0: None, # tip1 + 1: None, # tip2 + 2: None # tip3 + } + # 净空历史记录 + self.clearance_history = {0: [], 1: [], 2: []} + + # 初始化轨迹颜色 + self.tip_colors = [ + (255, 0, 0), # 红色 + (0, 255, 0), # 绿色 + (0, 0, 255) # 蓝色 + ] + + # 初始化2D跟踪器 + self.tracker = Target2D() + + def set_hub_x_range(self, x_value, y_value=None): + """设置轮毂x和y坐标的区间""" + self.hub_x_range = x_value + if y_value is not None: + self.hub_y_range = y_value + + def load_video(self, path): + """加载视频文件""" + self.use_camera = False + self.video_path = path + self.cap = cv2.VideoCapture(path) + return self.cap.isOpened() + + def process_frame(self, frame): + """处理视频帧并检测叶尖和轮毂""" + frame = cv2.resize(frame, self.size) + # 使用模型进行检测 + results = self.model(frame, verbose=False) + + # 初始化检测结果字典 + detected = {'tip': [], 'hub': []} + + # 解析检测结果 + for result in results: + boxes = result.boxes + classes = boxes.cls + class_names = result.names + + for i in range(len(boxes)): + class_id = int(classes[i]) + box = boxes.xyxy[i] + x1, y1, x2, y2 = box.tolist() + class_name = class_names[class_id] + + # 只关注叶尖(tip)和轮毂(hub) + if class_name in detected: + # 使用底部中心点作为位置 + if class_name == 'tip': + tip_x = (x1 + x2) / 2 + tip_y = y2 # 使用底部作为叶尖位置 + detected[class_name].append([tip_x, tip_y]) + else: # hub + hub_x = (x1 + x2) / 2 + hub_y = (y1 + y2) / 2 + detected[class_name].append([hub_x, hub_y]) + + # 限制检测数量 + if len(detected['tip']) > 3: + detected['tip'] = detected['tip'][:3] + if len(detected['hub']) > 1: + detected['hub'] = detected['hub'][:1] + + return frame, detected['tip'], detected['hub'] + + def check_trajectory_state(self, tip, hub, thresh=20): + """检查轨迹是否进入或离开轮毂区域""" + hub_x = hub[0] if hub else None + if hub_x is None or tip is None: + return None, None + + # 检查轨迹是否在轮毂区域内 + inside = (hub_x - self.hub_x_range) < tip[0] < (hub_x + self.hub_x_range) + # 如果轨迹进入区域,并且是从左侧进入 + if inside and not self.inside_region: + self.inside_region = True + self.entered_from_left = tip[0] >= (hub_x - self.hub_x_range) + + if inside and self.inside_region and self.entered_from_left and (hub_x - thresh) < tip[0] < (hub_x + thresh): + distance, actual_distance = self.calculate_clearance() + return distance, actual_distance + # 如果轨迹离开区域,并且是从左侧进入且从右侧离开 + if not inside and self.inside_region and self.entered_from_left and tip[0] > (hub_x + self.hub_x_range): + self.inside_region = False + self.traj_cnt += 1 + return None, None + + def update_trajectories(self, tips, hub): + """更新叶尖轨迹,只关注轮毂x和y坐标附近的叶尖""" + # 使用2D跟踪器更新位置 + self.tracker.update(tips, hub) + sorted_tips = self.tracker.positions + + # 如果没有轮毂位置,无法判断附近 + if hub is None or len(hub) == 0: + return None, None + + hub_x = hub[0] # 轮毂的x坐标 + hub_y = hub[1] # 轮毂的y坐标 + + # 初始化轨迹 + if not self.trajectories: + for _ in range(len(sorted_tips)): + self.trajectories.append([]) + + # 找到与轮毂x和y坐标都最近的叶尖 + nearest_tip = None + min_distance = float('inf') + + for tip in sorted_tips: + tip_x, tip_y = tip + # 检查是否在轮毂x坐标左右区间内 + if abs(tip_x - hub_x) > self.hub_x_range + 20: + continue + # 检查是否大于轮毂y坐标加y区间 + if tip_y < hub_y + self.hub_y_range: + continue + + # 计算与轮毂的综合距离(考虑x) + distance = abs(tip_x - hub_x) # + (tip_y - hub_y) ** 2 + if distance < min_distance: + min_distance = distance + nearest_tip = tip + + # 更新最近的叶尖轨迹 + if nearest_tip is not None: + if not self.nearest_tip_trajectory: + self.nearest_tip_trajectory = [nearest_tip] + else: + self.nearest_tip_trajectory.append(nearest_tip) + + # 限制轨迹长度 + if len(self.nearest_tip_trajectory) > 200: + self.nearest_tip_trajectory.pop(0) + + return self.check_trajectory_state(nearest_tip, hub) + + def find_closest_tip_x(self): + """找到与轮毂x坐标最接近的叶尖x坐标的叶尖""" + if not self.nearest_tip_trajectory: + return None + hub_x = self.hub_position[0] if self.hub_position else None + if hub_x is None: + return None + trajectories = np.array(self.nearest_tip_trajectory[:][0]) + distances = np.abs(trajectories - hub_x) + idx = np.argmin(distances) + tip = self.nearest_tip_trajectory[idx] + self.calculate_points = [tip] # 只保留一个最低点 + return tip + + def calculate_clearance(self): + """计算叶尖到轮毂的净空""" + if not self.nearest_tip_trajectory or self.hub_position is None: + return None, None + + hub_position = self.hub_position + tip_x, tip_y = self.find_closest_tip_x() + self.calculate_points = [[tip_x, tip_y]] # 只保留一个最低点 + hub_x, hub_y = hub_position + distance = np.sqrt((tip_x - hub_x) ** 2 + (tip_y - hub_y) ** 2) + + # 更新UI显示净空 + if self.scale_factor: + actual_distance = distance * self.scale_factor + tip_idx = self.traj_cnt % 3 # 根据实际匹配逻辑 + self.tip_clearances[tip_idx] = actual_distance + # 记录到历史 + self.clearance_history[tip_idx].append(actual_distance) + return distance, actual_distance + + def draw_annotations(self, frame): + """在帧上绘制轨迹和标记""" + # 绘制轮毂位置 + if self.hub_position: + hub_x, hub_y = self.hub_position + # 绘制轮毂区域标记 + cv2.rectangle(frame, + (int(hub_x - self.hub_x_range), int(hub_y - 10)), + (int(hub_x + self.hub_x_range), int(hub_y + 10)), + (0, 255, 255), 2) + cv2.circle(frame, (int(hub_x), int(hub_y)), 8, (0, 255, 255), -1) + cv2.putText(frame, "Hub", (int(hub_x) + 15, int(hub_y)), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + # 绘制最近的叶尖轨迹 + if len(self.nearest_tip_trajectory) >= 2: + # 绘制轨迹线(黄色) + for j in range(1, len(self.nearest_tip_trajectory)): + start_point = (int(self.nearest_tip_trajectory[j - 1][0]), + int(self.nearest_tip_trajectory[j - 1][1])) + end_point = (int(self.nearest_tip_trajectory[j][0]), + int(self.nearest_tip_trajectory[j][1])) + cv2.line(frame, start_point, end_point, (0, 255, 255), 1) + # 绘制三个叶尖的净空值 + y_offset = 30 + for i in range(3): + clearance = self.tip_clearances.get(i, None) + if clearance is not None: + text = f"tip{i + 1}: {clearance:.2f} m" + else: + text = f"tip{i + 1}: -- m" + cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) + y_offset += 25 + + # 绘制其他叶尖(不绘制轨迹) + if self.tracker.positions.size > 0: + for i, tip in enumerate(self.tracker.positions): + if not any(np.array_equal(tip, self.nearest_tip_trajectory[-1]) for p in self.nearest_tip_trajectory if + len(self.nearest_tip_trajectory) > 0): + color = self.tip_colors[i % len(self.tip_colors)] + cv2.circle(frame, (int(tip[0]), int(tip[1])), 6, color, -1) + cv2.putText(frame, f"Tip {i + 1}", (int(tip[0]) + 15, int(tip[1])), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) + + return frame + + def process_video(self, output_dir): + """处理整个视频并保存结果""" + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + last_frame = None + frame_count = 0 + + while True: + ret, frame = self.cap.read() + if not ret: + break + + # 处理帧 + processed_frame, tips, hubs = self.process_frame(frame) + + # 更新轮毂位置 + if hubs: + self.hub_position = hubs[0] + + # 更新轨迹 + if tips and self.hub_position is not None: + pixel_clearance, actual_clearance = self.update_trajectories(tips, self.hub_position) + + # 绘制注释 + annotated_frame = self.draw_annotations(processed_frame) + last_frame = annotated_frame.copy() + frame_count += 1 + + # 显示处理进度 + if frame_count % 10 == 0: + print(f"已处理 {frame_count} 帧...") + + # 保存最后一帧 + last_frame_path = os.path.join(output_dir, "last_frame.jpg") + cv2.imwrite(last_frame_path, last_frame) + print(f"最后一帧已保存至: {last_frame_path}") + + # 准备结果数据 + result = { + "clearance_history": { + "tip0": self.clearance_history[0], + "tip1": self.clearance_history[1], + "tip2": self.clearance_history[2] + }, + "last_frame_clearances": { + "tip0": self.tip_clearances[0], + "tip1": self.tip_clearances[1], + "tip2": self.tip_clearances[2] + }, + "last_frame_image": last_frame_path + } + + # 保存JSON结果 + json_path = os.path.join(output_dir, "results.json") + with open(json_path, 'w') as f: + json.dump(result, f, indent=4) + print(f"结果已保存至: {json_path}") + + return result + + +def main(): + parser = argparse.ArgumentParser(description='叶片净空计算系统') + parser.add_argument('--video_path', type=str, required=True, help='输入视频路径') + parser.add_argument('--model_path', type=str, required=True, help='模型文件路径') + parser.add_argument('--output_dir', type=str, required=True, help='结果保存目录') + parser.add_argument('--scale_factor', type=float, default=0.02, help='比例尺因子 (实际距离/像素距离)') + parser.add_argument('--width', type=int, default=544, help='图像宽度(像素)') + parser.add_argument('--height', type=int, default=960, help='图像高度(像素)') + args = parser.parse_args() + + calculator = ClearanceCalculator( + model_path=args.model_path, + scale_factor=args.scale_factor, + width=args.width, + height=args.height + ) + + if not calculator.load_video(args.video_path): + print(f"无法加载视频: {args.video_path}") + return + + print(f"开始处理视频: {args.video_path}") + print(f"图像尺寸设置为: ({args.width}, {args.height})") + + result = calculator.process_video(args.output_dir) + + +if __name__ == "__main__": + main() + """ + python cli_clearance.py \ + --video test.mp4 \ + --model best.pt \ + --output ./results \ + --scale_factor 0.02 \ + --width 640 \ + --height 480 + """ + """ + { + "clearances": { + "0": [5.12, 5.09, 5.11], + "1": [4.98, 5.01], + "2": [] + }, + "last_frame_clearances": { + "tip1": 5.11, + "tip2": 5.01, + "tip3": null + } +} + """ \ No newline at end of file