425 lines
15 KiB
Python
425 lines
15 KiB
Python
|
import cv2
|
|||
|
import numpy as np
|
|||
|
from ultralytics import YOLO
|
|||
|
import os
|
|||
|
import json
|
|||
|
import argparse
|
|||
|
|
|||
|
|
|||
|
class Target2D:
|
|||
|
"""二维坐标下的叶尖跟踪器"""
|
|||
|
|
|||
|
def __init__(self, positions=None, hub=None):
|
|||
|
if positions is None:
|
|||
|
positions = np.array([[-100, -100], [-200, -200], [-300, -300]])
|
|||
|
if hub is None:
|
|||
|
hub = np.array([400, 300])
|
|||
|
|
|||
|
self.positions = np.array(positions, dtype=float) # 叶尖位置
|
|||
|
self.hub = np.array(hub, dtype=float) # 轮毂位置
|
|||
|
|
|||
|
def update(self, new_positions, new_hub=None):
|
|||
|
"""更新叶尖位置,使用最近点匹配算法"""
|
|||
|
if new_positions is None or len(new_positions) == 0:
|
|||
|
return self.positions, []
|
|||
|
|
|||
|
# 确保新位置是二维数组
|
|||
|
new_positions = np.array(new_positions, dtype=float)
|
|||
|
if new_positions.ndim == 1:
|
|||
|
new_positions = new_positions.reshape(1, -1)
|
|||
|
|
|||
|
# 更新轮毂位置
|
|||
|
if new_hub is not None:
|
|||
|
self.hub = np.array(new_hub, dtype=float)
|
|||
|
|
|||
|
# 如果还没有初始化位置,使用新位置
|
|||
|
if self.positions.size == 0:
|
|||
|
self.positions = new_positions
|
|||
|
return self.positions
|
|||
|
# 创建位置列表副本
|
|||
|
new_pos_list = new_positions.tolist()
|
|||
|
update_positons = self.positions.copy()
|
|||
|
used = set()
|
|||
|
# 对于每个新位置,找到最近的点
|
|||
|
for i, new_pos in enumerate(new_pos_list):
|
|||
|
# 计算所有距离
|
|||
|
distances = np.linalg.norm(self.positions - new_pos, axis=1)
|
|||
|
available_indices = [j for j in range(len(distances)) if j not in used]
|
|||
|
# 找到最近点的索引
|
|||
|
closest_idx = min(available_indices, key=lambda j: distances[j])
|
|||
|
|
|||
|
# 记录ID并更新位置
|
|||
|
update_positons[closest_idx] = new_pos # new_pos
|
|||
|
|
|||
|
# 更新位置
|
|||
|
self.positions = np.array(update_positons)
|
|||
|
|
|||
|
return self.positions
|
|||
|
|
|||
|
|
|||
|
class ClearanceCalculator:
|
|||
|
def __init__(self, model_path, scale_factor=0.02,width=544, height=960):
|
|||
|
self.size = (width, height)
|
|||
|
# 初始化模型配置
|
|||
|
self.model = YOLO(model_path)
|
|||
|
self.cap = None
|
|||
|
self.video_path = ""
|
|||
|
self.is_playing = False
|
|||
|
self.scale_factor = scale_factor
|
|||
|
self.reference_pixel = 0.0
|
|||
|
self.reference_actual = 0.0
|
|||
|
self.current_clearance = 0.0
|
|||
|
self.use_camera = False
|
|||
|
self.hub_x_range = 100 # 轮毂x坐标的左右区间(像素)
|
|||
|
self.hub_y_range = 50
|
|||
|
self.inside_region = False # 轨迹是否在轮毂区域内
|
|||
|
self.entered_from_left = False # 轨迹是否从左侧进入
|
|||
|
self.last_pixel_clearance = 0.0 # 保存上一次的像素净空
|
|||
|
self.last_actual_clearance = 0.0 # 保存上一次的实际净空
|
|||
|
# 轨迹相关变量
|
|||
|
self.trajectories = []
|
|||
|
# self.lowest_points = []
|
|||
|
self.calculate_points = []
|
|||
|
self.hub_position = None
|
|||
|
self.last_tip_positions = []
|
|||
|
self.frame_count = 0
|
|||
|
self.nearest_tip_trajectory = [] # 只存储与轮毂x坐标最近的叶尖轨迹
|
|||
|
self.traj_cnt = 0
|
|||
|
self.tip_clearances = {
|
|||
|
0: None, # tip1
|
|||
|
1: None, # tip2
|
|||
|
2: None # tip3
|
|||
|
}
|
|||
|
# 净空历史记录
|
|||
|
self.clearance_history = {0: [], 1: [], 2: []}
|
|||
|
|
|||
|
# 初始化轨迹颜色
|
|||
|
self.tip_colors = [
|
|||
|
(255, 0, 0), # 红色
|
|||
|
(0, 255, 0), # 绿色
|
|||
|
(0, 0, 255) # 蓝色
|
|||
|
]
|
|||
|
|
|||
|
# 初始化2D跟踪器
|
|||
|
self.tracker = Target2D()
|
|||
|
|
|||
|
def set_hub_x_range(self, x_value, y_value=None):
|
|||
|
"""设置轮毂x和y坐标的区间"""
|
|||
|
self.hub_x_range = x_value
|
|||
|
if y_value is not None:
|
|||
|
self.hub_y_range = y_value
|
|||
|
|
|||
|
def load_video(self, path):
|
|||
|
"""加载视频文件"""
|
|||
|
self.use_camera = False
|
|||
|
self.video_path = path
|
|||
|
self.cap = cv2.VideoCapture(path)
|
|||
|
return self.cap.isOpened()
|
|||
|
|
|||
|
def process_frame(self, frame):
|
|||
|
"""处理视频帧并检测叶尖和轮毂"""
|
|||
|
frame = cv2.resize(frame, self.size)
|
|||
|
# 使用模型进行检测
|
|||
|
results = self.model(frame, verbose=False)
|
|||
|
|
|||
|
# 初始化检测结果字典
|
|||
|
detected = {'tip': [], 'hub': []}
|
|||
|
|
|||
|
# 解析检测结果
|
|||
|
for result in results:
|
|||
|
boxes = result.boxes
|
|||
|
classes = boxes.cls
|
|||
|
class_names = result.names
|
|||
|
|
|||
|
for i in range(len(boxes)):
|
|||
|
class_id = int(classes[i])
|
|||
|
box = boxes.xyxy[i]
|
|||
|
x1, y1, x2, y2 = box.tolist()
|
|||
|
class_name = class_names[class_id]
|
|||
|
|
|||
|
# 只关注叶尖(tip)和轮毂(hub)
|
|||
|
if class_name in detected:
|
|||
|
# 使用底部中心点作为位置
|
|||
|
if class_name == 'tip':
|
|||
|
tip_x = (x1 + x2) / 2
|
|||
|
tip_y = y2 # 使用底部作为叶尖位置
|
|||
|
detected[class_name].append([tip_x, tip_y])
|
|||
|
else: # hub
|
|||
|
hub_x = (x1 + x2) / 2
|
|||
|
hub_y = (y1 + y2) / 2
|
|||
|
detected[class_name].append([hub_x, hub_y])
|
|||
|
|
|||
|
# 限制检测数量
|
|||
|
if len(detected['tip']) > 3:
|
|||
|
detected['tip'] = detected['tip'][:3]
|
|||
|
if len(detected['hub']) > 1:
|
|||
|
detected['hub'] = detected['hub'][:1]
|
|||
|
|
|||
|
return frame, detected['tip'], detected['hub']
|
|||
|
|
|||
|
def check_trajectory_state(self, tip, hub, thresh=20):
|
|||
|
"""检查轨迹是否进入或离开轮毂区域"""
|
|||
|
hub_x = hub[0] if hub else None
|
|||
|
if hub_x is None or tip is None:
|
|||
|
return None, None
|
|||
|
|
|||
|
# 检查轨迹是否在轮毂区域内
|
|||
|
inside = (hub_x - self.hub_x_range) < tip[0] < (hub_x + self.hub_x_range)
|
|||
|
# 如果轨迹进入区域,并且是从左侧进入
|
|||
|
if inside and not self.inside_region:
|
|||
|
self.inside_region = True
|
|||
|
self.entered_from_left = tip[0] >= (hub_x - self.hub_x_range)
|
|||
|
|
|||
|
if inside and self.inside_region and self.entered_from_left and (hub_x - thresh) < tip[0] < (hub_x + thresh):
|
|||
|
distance, actual_distance = self.calculate_clearance()
|
|||
|
return distance, actual_distance
|
|||
|
# 如果轨迹离开区域,并且是从左侧进入且从右侧离开
|
|||
|
if not inside and self.inside_region and self.entered_from_left and tip[0] > (hub_x + self.hub_x_range):
|
|||
|
self.inside_region = False
|
|||
|
self.traj_cnt += 1
|
|||
|
return None, None
|
|||
|
|
|||
|
def update_trajectories(self, tips, hub):
|
|||
|
"""更新叶尖轨迹,只关注轮毂x和y坐标附近的叶尖"""
|
|||
|
# 使用2D跟踪器更新位置
|
|||
|
self.tracker.update(tips, hub)
|
|||
|
sorted_tips = self.tracker.positions
|
|||
|
|
|||
|
# 如果没有轮毂位置,无法判断附近
|
|||
|
if hub is None or len(hub) == 0:
|
|||
|
return None, None
|
|||
|
|
|||
|
hub_x = hub[0] # 轮毂的x坐标
|
|||
|
hub_y = hub[1] # 轮毂的y坐标
|
|||
|
|
|||
|
# 初始化轨迹
|
|||
|
if not self.trajectories:
|
|||
|
for _ in range(len(sorted_tips)):
|
|||
|
self.trajectories.append([])
|
|||
|
|
|||
|
# 找到与轮毂x和y坐标都最近的叶尖
|
|||
|
nearest_tip = None
|
|||
|
min_distance = float('inf')
|
|||
|
|
|||
|
for tip in sorted_tips:
|
|||
|
tip_x, tip_y = tip
|
|||
|
# 检查是否在轮毂x坐标左右区间内
|
|||
|
if abs(tip_x - hub_x) > self.hub_x_range + 20:
|
|||
|
continue
|
|||
|
# 检查是否大于轮毂y坐标加y区间
|
|||
|
if tip_y < hub_y + self.hub_y_range:
|
|||
|
continue
|
|||
|
|
|||
|
# 计算与轮毂的综合距离(考虑x)
|
|||
|
distance = abs(tip_x - hub_x) # + (tip_y - hub_y) ** 2
|
|||
|
if distance < min_distance:
|
|||
|
min_distance = distance
|
|||
|
nearest_tip = tip
|
|||
|
|
|||
|
# 更新最近的叶尖轨迹
|
|||
|
if nearest_tip is not None:
|
|||
|
if not self.nearest_tip_trajectory:
|
|||
|
self.nearest_tip_trajectory = [nearest_tip]
|
|||
|
else:
|
|||
|
self.nearest_tip_trajectory.append(nearest_tip)
|
|||
|
|
|||
|
# 限制轨迹长度
|
|||
|
if len(self.nearest_tip_trajectory) > 200:
|
|||
|
self.nearest_tip_trajectory.pop(0)
|
|||
|
|
|||
|
return self.check_trajectory_state(nearest_tip, hub)
|
|||
|
|
|||
|
def find_closest_tip_x(self):
|
|||
|
"""找到与轮毂x坐标最接近的叶尖x坐标的叶尖"""
|
|||
|
if not self.nearest_tip_trajectory:
|
|||
|
return None
|
|||
|
hub_x = self.hub_position[0] if self.hub_position else None
|
|||
|
if hub_x is None:
|
|||
|
return None
|
|||
|
trajectories = np.array(self.nearest_tip_trajectory[:][0])
|
|||
|
distances = np.abs(trajectories - hub_x)
|
|||
|
idx = np.argmin(distances)
|
|||
|
tip = self.nearest_tip_trajectory[idx]
|
|||
|
self.calculate_points = [tip] # 只保留一个最低点
|
|||
|
return tip
|
|||
|
|
|||
|
def calculate_clearance(self):
|
|||
|
"""计算叶尖到轮毂的净空"""
|
|||
|
if not self.nearest_tip_trajectory or self.hub_position is None:
|
|||
|
return None, None
|
|||
|
|
|||
|
hub_position = self.hub_position
|
|||
|
tip_x, tip_y = self.find_closest_tip_x()
|
|||
|
self.calculate_points = [[tip_x, tip_y]] # 只保留一个最低点
|
|||
|
hub_x, hub_y = hub_position
|
|||
|
distance = np.sqrt((tip_x - hub_x) ** 2 + (tip_y - hub_y) ** 2)
|
|||
|
|
|||
|
# 更新UI显示净空
|
|||
|
if self.scale_factor:
|
|||
|
actual_distance = distance * self.scale_factor
|
|||
|
tip_idx = self.traj_cnt % 3 # 根据实际匹配逻辑
|
|||
|
self.tip_clearances[tip_idx] = actual_distance
|
|||
|
# 记录到历史
|
|||
|
self.clearance_history[tip_idx].append(actual_distance)
|
|||
|
return distance, actual_distance
|
|||
|
|
|||
|
def draw_annotations(self, frame):
|
|||
|
"""在帧上绘制轨迹和标记"""
|
|||
|
# 绘制轮毂位置
|
|||
|
if self.hub_position:
|
|||
|
hub_x, hub_y = self.hub_position
|
|||
|
# 绘制轮毂区域标记
|
|||
|
cv2.rectangle(frame,
|
|||
|
(int(hub_x - self.hub_x_range), int(hub_y - 10)),
|
|||
|
(int(hub_x + self.hub_x_range), int(hub_y + 10)),
|
|||
|
(0, 255, 255), 2)
|
|||
|
cv2.circle(frame, (int(hub_x), int(hub_y)), 8, (0, 255, 255), -1)
|
|||
|
cv2.putText(frame, "Hub", (int(hub_x) + 15, int(hub_y)),
|
|||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
|||
|
|
|||
|
# 绘制最近的叶尖轨迹
|
|||
|
if len(self.nearest_tip_trajectory) >= 2:
|
|||
|
# 绘制轨迹线(黄色)
|
|||
|
for j in range(1, len(self.nearest_tip_trajectory)):
|
|||
|
start_point = (int(self.nearest_tip_trajectory[j - 1][0]),
|
|||
|
int(self.nearest_tip_trajectory[j - 1][1]))
|
|||
|
end_point = (int(self.nearest_tip_trajectory[j][0]),
|
|||
|
int(self.nearest_tip_trajectory[j][1]))
|
|||
|
cv2.line(frame, start_point, end_point, (0, 255, 255), 1)
|
|||
|
# 绘制三个叶尖的净空值
|
|||
|
y_offset = 30
|
|||
|
for i in range(3):
|
|||
|
clearance = self.tip_clearances.get(i, None)
|
|||
|
if clearance is not None:
|
|||
|
text = f"tip{i + 1}: {clearance:.2f} m"
|
|||
|
else:
|
|||
|
text = f"tip{i + 1}: -- m"
|
|||
|
cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
|
|||
|
y_offset += 25
|
|||
|
|
|||
|
# 绘制其他叶尖(不绘制轨迹)
|
|||
|
if self.tracker.positions.size > 0:
|
|||
|
for i, tip in enumerate(self.tracker.positions):
|
|||
|
if not any(np.array_equal(tip, self.nearest_tip_trajectory[-1]) for p in self.nearest_tip_trajectory if
|
|||
|
len(self.nearest_tip_trajectory) > 0):
|
|||
|
color = self.tip_colors[i % len(self.tip_colors)]
|
|||
|
cv2.circle(frame, (int(tip[0]), int(tip[1])), 6, color, -1)
|
|||
|
cv2.putText(frame, f"Tip {i + 1}", (int(tip[0]) + 15, int(tip[1])),
|
|||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
|
|||
|
|
|||
|
return frame
|
|||
|
|
|||
|
def process_video(self, output_dir):
|
|||
|
"""处理整个视频并保存结果"""
|
|||
|
if not os.path.exists(output_dir):
|
|||
|
os.makedirs(output_dir)
|
|||
|
|
|||
|
last_frame = None
|
|||
|
frame_count = 0
|
|||
|
|
|||
|
while True:
|
|||
|
ret, frame = self.cap.read()
|
|||
|
if not ret:
|
|||
|
break
|
|||
|
|
|||
|
# 处理帧
|
|||
|
processed_frame, tips, hubs = self.process_frame(frame)
|
|||
|
|
|||
|
# 更新轮毂位置
|
|||
|
if hubs:
|
|||
|
self.hub_position = hubs[0]
|
|||
|
|
|||
|
# 更新轨迹
|
|||
|
if tips and self.hub_position is not None:
|
|||
|
pixel_clearance, actual_clearance = self.update_trajectories(tips, self.hub_position)
|
|||
|
|
|||
|
# 绘制注释
|
|||
|
annotated_frame = self.draw_annotations(processed_frame)
|
|||
|
last_frame = annotated_frame.copy()
|
|||
|
frame_count += 1
|
|||
|
|
|||
|
# 显示处理进度
|
|||
|
if frame_count % 10 == 0:
|
|||
|
print(f"已处理 {frame_count} 帧...")
|
|||
|
|
|||
|
# 保存最后一帧
|
|||
|
last_frame_path = os.path.join(output_dir, "last_frame.jpg")
|
|||
|
cv2.imwrite(last_frame_path, last_frame)
|
|||
|
print(f"最后一帧已保存至: {last_frame_path}")
|
|||
|
|
|||
|
# 准备结果数据
|
|||
|
result = {
|
|||
|
"clearance_history": {
|
|||
|
"tip0": self.clearance_history[0],
|
|||
|
"tip1": self.clearance_history[1],
|
|||
|
"tip2": self.clearance_history[2]
|
|||
|
},
|
|||
|
"last_frame_clearances": {
|
|||
|
"tip0": self.tip_clearances[0],
|
|||
|
"tip1": self.tip_clearances[1],
|
|||
|
"tip2": self.tip_clearances[2]
|
|||
|
},
|
|||
|
"last_frame_image": last_frame_path
|
|||
|
}
|
|||
|
|
|||
|
# 保存JSON结果
|
|||
|
json_path = os.path.join(output_dir, "results.json")
|
|||
|
with open(json_path, 'w') as f:
|
|||
|
json.dump(result, f, indent=4)
|
|||
|
print(f"结果已保存至: {json_path}")
|
|||
|
|
|||
|
return result
|
|||
|
|
|||
|
|
|||
|
def main():
|
|||
|
parser = argparse.ArgumentParser(description='叶片净空计算系统')
|
|||
|
parser.add_argument('--video_path', type=str, required=True, help='输入视频路径')
|
|||
|
parser.add_argument('--model_path', type=str, required=True, help='模型文件路径')
|
|||
|
parser.add_argument('--output_dir', type=str, required=True, help='结果保存目录')
|
|||
|
parser.add_argument('--scale_factor', type=float, default=0.02, help='比例尺因子 (实际距离/像素距离)')
|
|||
|
parser.add_argument('--width', type=int, default=544, help='图像宽度(像素)')
|
|||
|
parser.add_argument('--height', type=int, default=960, help='图像高度(像素)')
|
|||
|
args = parser.parse_args()
|
|||
|
|
|||
|
calculator = ClearanceCalculator(
|
|||
|
model_path=args.model_path,
|
|||
|
scale_factor=args.scale_factor,
|
|||
|
width=args.width,
|
|||
|
height=args.height
|
|||
|
)
|
|||
|
|
|||
|
if not calculator.load_video(args.video_path):
|
|||
|
print(f"无法加载视频: {args.video_path}")
|
|||
|
return
|
|||
|
|
|||
|
print(f"开始处理视频: {args.video_path}")
|
|||
|
print(f"图像尺寸设置为: ({args.width}, {args.height})")
|
|||
|
|
|||
|
result = calculator.process_video(args.output_dir)
|
|||
|
|
|||
|
|
|||
|
if __name__ == "__main__":
|
|||
|
main()
|
|||
|
"""
|
|||
|
python cli_clearance.py \
|
|||
|
--video test.mp4 \
|
|||
|
--model best.pt \
|
|||
|
--output ./results \
|
|||
|
--scale_factor 0.02 \
|
|||
|
--width 640 \
|
|||
|
--height 480
|
|||
|
"""
|
|||
|
"""
|
|||
|
{
|
|||
|
"clearances": {
|
|||
|
"0": [5.12, 5.09, 5.11],
|
|||
|
"1": [4.98, 5.01],
|
|||
|
"2": []
|
|||
|
},
|
|||
|
"last_frame_clearances": {
|
|||
|
"tip1": 5.11,
|
|||
|
"tip2": 5.01,
|
|||
|
"tip3": null
|
|||
|
}
|
|||
|
}
|
|||
|
"""
|