净空检测与报告
This commit is contained in:
parent
1cd2676814
commit
bd3b95395a
|
@ -0,0 +1,425 @@
|
|||
import cv2
|
||||
import numpy as np
|
||||
from ultralytics import YOLO
|
||||
import os
|
||||
import json
|
||||
import argparse
|
||||
|
||||
|
||||
class Target2D:
|
||||
"""二维坐标下的叶尖跟踪器"""
|
||||
|
||||
def __init__(self, positions=None, hub=None):
|
||||
if positions is None:
|
||||
positions = np.array([[-100, -100], [-200, -200], [-300, -300]])
|
||||
if hub is None:
|
||||
hub = np.array([400, 300])
|
||||
|
||||
self.positions = np.array(positions, dtype=float) # 叶尖位置
|
||||
self.hub = np.array(hub, dtype=float) # 轮毂位置
|
||||
|
||||
def update(self, new_positions, new_hub=None):
|
||||
"""更新叶尖位置,使用最近点匹配算法"""
|
||||
if new_positions is None or len(new_positions) == 0:
|
||||
return self.positions, []
|
||||
|
||||
# 确保新位置是二维数组
|
||||
new_positions = np.array(new_positions, dtype=float)
|
||||
if new_positions.ndim == 1:
|
||||
new_positions = new_positions.reshape(1, -1)
|
||||
|
||||
# 更新轮毂位置
|
||||
if new_hub is not None:
|
||||
self.hub = np.array(new_hub, dtype=float)
|
||||
|
||||
# 如果还没有初始化位置,使用新位置
|
||||
if self.positions.size == 0:
|
||||
self.positions = new_positions
|
||||
return self.positions
|
||||
# 创建位置列表副本
|
||||
new_pos_list = new_positions.tolist()
|
||||
update_positons = self.positions.copy()
|
||||
used = set()
|
||||
# 对于每个新位置,找到最近的点
|
||||
for i, new_pos in enumerate(new_pos_list):
|
||||
# 计算所有距离
|
||||
distances = np.linalg.norm(self.positions - new_pos, axis=1)
|
||||
available_indices = [j for j in range(len(distances)) if j not in used]
|
||||
# 找到最近点的索引
|
||||
closest_idx = min(available_indices, key=lambda j: distances[j])
|
||||
|
||||
# 记录ID并更新位置
|
||||
update_positons[closest_idx] = new_pos # new_pos
|
||||
|
||||
# 更新位置
|
||||
self.positions = np.array(update_positons)
|
||||
|
||||
return self.positions
|
||||
|
||||
|
||||
class ClearanceCalculator:
|
||||
def __init__(self, model_path, scale_factor=0.02,width=544, height=960):
|
||||
self.size = (width, height)
|
||||
# 初始化模型配置
|
||||
self.model = YOLO(model_path)
|
||||
self.cap = None
|
||||
self.video_path = ""
|
||||
self.is_playing = False
|
||||
self.scale_factor = scale_factor
|
||||
self.reference_pixel = 0.0
|
||||
self.reference_actual = 0.0
|
||||
self.current_clearance = 0.0
|
||||
self.use_camera = False
|
||||
self.hub_x_range = 100 # 轮毂x坐标的左右区间(像素)
|
||||
self.hub_y_range = 50
|
||||
self.inside_region = False # 轨迹是否在轮毂区域内
|
||||
self.entered_from_left = False # 轨迹是否从左侧进入
|
||||
self.last_pixel_clearance = 0.0 # 保存上一次的像素净空
|
||||
self.last_actual_clearance = 0.0 # 保存上一次的实际净空
|
||||
# 轨迹相关变量
|
||||
self.trajectories = []
|
||||
# self.lowest_points = []
|
||||
self.calculate_points = []
|
||||
self.hub_position = None
|
||||
self.last_tip_positions = []
|
||||
self.frame_count = 0
|
||||
self.nearest_tip_trajectory = [] # 只存储与轮毂x坐标最近的叶尖轨迹
|
||||
self.traj_cnt = 0
|
||||
self.tip_clearances = {
|
||||
0: None, # tip1
|
||||
1: None, # tip2
|
||||
2: None # tip3
|
||||
}
|
||||
# 净空历史记录
|
||||
self.clearance_history = {0: [], 1: [], 2: []}
|
||||
|
||||
# 初始化轨迹颜色
|
||||
self.tip_colors = [
|
||||
(255, 0, 0), # 红色
|
||||
(0, 255, 0), # 绿色
|
||||
(0, 0, 255) # 蓝色
|
||||
]
|
||||
|
||||
# 初始化2D跟踪器
|
||||
self.tracker = Target2D()
|
||||
|
||||
def set_hub_x_range(self, x_value, y_value=None):
|
||||
"""设置轮毂x和y坐标的区间"""
|
||||
self.hub_x_range = x_value
|
||||
if y_value is not None:
|
||||
self.hub_y_range = y_value
|
||||
|
||||
def load_video(self, path):
|
||||
"""加载视频文件"""
|
||||
self.use_camera = False
|
||||
self.video_path = path
|
||||
self.cap = cv2.VideoCapture(path)
|
||||
return self.cap.isOpened()
|
||||
|
||||
def process_frame(self, frame):
|
||||
"""处理视频帧并检测叶尖和轮毂"""
|
||||
frame = cv2.resize(frame, self.size)
|
||||
# 使用模型进行检测
|
||||
results = self.model(frame, verbose=False)
|
||||
|
||||
# 初始化检测结果字典
|
||||
detected = {'tip': [], 'hub': []}
|
||||
|
||||
# 解析检测结果
|
||||
for result in results:
|
||||
boxes = result.boxes
|
||||
classes = boxes.cls
|
||||
class_names = result.names
|
||||
|
||||
for i in range(len(boxes)):
|
||||
class_id = int(classes[i])
|
||||
box = boxes.xyxy[i]
|
||||
x1, y1, x2, y2 = box.tolist()
|
||||
class_name = class_names[class_id]
|
||||
|
||||
# 只关注叶尖(tip)和轮毂(hub)
|
||||
if class_name in detected:
|
||||
# 使用底部中心点作为位置
|
||||
if class_name == 'tip':
|
||||
tip_x = (x1 + x2) / 2
|
||||
tip_y = y2 # 使用底部作为叶尖位置
|
||||
detected[class_name].append([tip_x, tip_y])
|
||||
else: # hub
|
||||
hub_x = (x1 + x2) / 2
|
||||
hub_y = (y1 + y2) / 2
|
||||
detected[class_name].append([hub_x, hub_y])
|
||||
|
||||
# 限制检测数量
|
||||
if len(detected['tip']) > 3:
|
||||
detected['tip'] = detected['tip'][:3]
|
||||
if len(detected['hub']) > 1:
|
||||
detected['hub'] = detected['hub'][:1]
|
||||
|
||||
return frame, detected['tip'], detected['hub']
|
||||
|
||||
def check_trajectory_state(self, tip, hub, thresh=20):
|
||||
"""检查轨迹是否进入或离开轮毂区域"""
|
||||
hub_x = hub[0] if hub else None
|
||||
if hub_x is None or tip is None:
|
||||
return None, None
|
||||
|
||||
# 检查轨迹是否在轮毂区域内
|
||||
inside = (hub_x - self.hub_x_range) < tip[0] < (hub_x + self.hub_x_range)
|
||||
# 如果轨迹进入区域,并且是从左侧进入
|
||||
if inside and not self.inside_region:
|
||||
self.inside_region = True
|
||||
self.entered_from_left = tip[0] >= (hub_x - self.hub_x_range)
|
||||
|
||||
if inside and self.inside_region and self.entered_from_left and (hub_x - thresh) < tip[0] < (hub_x + thresh):
|
||||
distance, actual_distance = self.calculate_clearance()
|
||||
return distance, actual_distance
|
||||
# 如果轨迹离开区域,并且是从左侧进入且从右侧离开
|
||||
if not inside and self.inside_region and self.entered_from_left and tip[0] > (hub_x + self.hub_x_range):
|
||||
self.inside_region = False
|
||||
self.traj_cnt += 1
|
||||
return None, None
|
||||
|
||||
def update_trajectories(self, tips, hub):
|
||||
"""更新叶尖轨迹,只关注轮毂x和y坐标附近的叶尖"""
|
||||
# 使用2D跟踪器更新位置
|
||||
self.tracker.update(tips, hub)
|
||||
sorted_tips = self.tracker.positions
|
||||
|
||||
# 如果没有轮毂位置,无法判断附近
|
||||
if hub is None or len(hub) == 0:
|
||||
return None, None
|
||||
|
||||
hub_x = hub[0] # 轮毂的x坐标
|
||||
hub_y = hub[1] # 轮毂的y坐标
|
||||
|
||||
# 初始化轨迹
|
||||
if not self.trajectories:
|
||||
for _ in range(len(sorted_tips)):
|
||||
self.trajectories.append([])
|
||||
|
||||
# 找到与轮毂x和y坐标都最近的叶尖
|
||||
nearest_tip = None
|
||||
min_distance = float('inf')
|
||||
|
||||
for tip in sorted_tips:
|
||||
tip_x, tip_y = tip
|
||||
# 检查是否在轮毂x坐标左右区间内
|
||||
if abs(tip_x - hub_x) > self.hub_x_range + 20:
|
||||
continue
|
||||
# 检查是否大于轮毂y坐标加y区间
|
||||
if tip_y < hub_y + self.hub_y_range:
|
||||
continue
|
||||
|
||||
# 计算与轮毂的综合距离(考虑x)
|
||||
distance = abs(tip_x - hub_x) # + (tip_y - hub_y) ** 2
|
||||
if distance < min_distance:
|
||||
min_distance = distance
|
||||
nearest_tip = tip
|
||||
|
||||
# 更新最近的叶尖轨迹
|
||||
if nearest_tip is not None:
|
||||
if not self.nearest_tip_trajectory:
|
||||
self.nearest_tip_trajectory = [nearest_tip]
|
||||
else:
|
||||
self.nearest_tip_trajectory.append(nearest_tip)
|
||||
|
||||
# 限制轨迹长度
|
||||
if len(self.nearest_tip_trajectory) > 200:
|
||||
self.nearest_tip_trajectory.pop(0)
|
||||
|
||||
return self.check_trajectory_state(nearest_tip, hub)
|
||||
|
||||
def find_closest_tip_x(self):
|
||||
"""找到与轮毂x坐标最接近的叶尖x坐标的叶尖"""
|
||||
if not self.nearest_tip_trajectory:
|
||||
return None
|
||||
hub_x = self.hub_position[0] if self.hub_position else None
|
||||
if hub_x is None:
|
||||
return None
|
||||
trajectories = np.array(self.nearest_tip_trajectory[:][0])
|
||||
distances = np.abs(trajectories - hub_x)
|
||||
idx = np.argmin(distances)
|
||||
tip = self.nearest_tip_trajectory[idx]
|
||||
self.calculate_points = [tip] # 只保留一个最低点
|
||||
return tip
|
||||
|
||||
def calculate_clearance(self):
|
||||
"""计算叶尖到轮毂的净空"""
|
||||
if not self.nearest_tip_trajectory or self.hub_position is None:
|
||||
return None, None
|
||||
|
||||
hub_position = self.hub_position
|
||||
tip_x, tip_y = self.find_closest_tip_x()
|
||||
self.calculate_points = [[tip_x, tip_y]] # 只保留一个最低点
|
||||
hub_x, hub_y = hub_position
|
||||
distance = np.sqrt((tip_x - hub_x) ** 2 + (tip_y - hub_y) ** 2)
|
||||
|
||||
# 更新UI显示净空
|
||||
if self.scale_factor:
|
||||
actual_distance = distance * self.scale_factor
|
||||
tip_idx = self.traj_cnt % 3 # 根据实际匹配逻辑
|
||||
self.tip_clearances[tip_idx] = actual_distance
|
||||
# 记录到历史
|
||||
self.clearance_history[tip_idx].append(actual_distance)
|
||||
return distance, actual_distance
|
||||
|
||||
def draw_annotations(self, frame):
|
||||
"""在帧上绘制轨迹和标记"""
|
||||
# 绘制轮毂位置
|
||||
if self.hub_position:
|
||||
hub_x, hub_y = self.hub_position
|
||||
# 绘制轮毂区域标记
|
||||
cv2.rectangle(frame,
|
||||
(int(hub_x - self.hub_x_range), int(hub_y - 10)),
|
||||
(int(hub_x + self.hub_x_range), int(hub_y + 10)),
|
||||
(0, 255, 255), 2)
|
||||
cv2.circle(frame, (int(hub_x), int(hub_y)), 8, (0, 255, 255), -1)
|
||||
cv2.putText(frame, "Hub", (int(hub_x) + 15, int(hub_y)),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||||
|
||||
# 绘制最近的叶尖轨迹
|
||||
if len(self.nearest_tip_trajectory) >= 2:
|
||||
# 绘制轨迹线(黄色)
|
||||
for j in range(1, len(self.nearest_tip_trajectory)):
|
||||
start_point = (int(self.nearest_tip_trajectory[j - 1][0]),
|
||||
int(self.nearest_tip_trajectory[j - 1][1]))
|
||||
end_point = (int(self.nearest_tip_trajectory[j][0]),
|
||||
int(self.nearest_tip_trajectory[j][1]))
|
||||
cv2.line(frame, start_point, end_point, (0, 255, 255), 1)
|
||||
# 绘制三个叶尖的净空值
|
||||
y_offset = 30
|
||||
for i in range(3):
|
||||
clearance = self.tip_clearances.get(i, None)
|
||||
if clearance is not None:
|
||||
text = f"tip{i + 1}: {clearance:.2f} m"
|
||||
else:
|
||||
text = f"tip{i + 1}: -- m"
|
||||
cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
|
||||
y_offset += 25
|
||||
|
||||
# 绘制其他叶尖(不绘制轨迹)
|
||||
if self.tracker.positions.size > 0:
|
||||
for i, tip in enumerate(self.tracker.positions):
|
||||
if not any(np.array_equal(tip, self.nearest_tip_trajectory[-1]) for p in self.nearest_tip_trajectory if
|
||||
len(self.nearest_tip_trajectory) > 0):
|
||||
color = self.tip_colors[i % len(self.tip_colors)]
|
||||
cv2.circle(frame, (int(tip[0]), int(tip[1])), 6, color, -1)
|
||||
cv2.putText(frame, f"Tip {i + 1}", (int(tip[0]) + 15, int(tip[1])),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
|
||||
|
||||
return frame
|
||||
|
||||
def process_video(self, output_dir):
|
||||
"""处理整个视频并保存结果"""
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
last_frame = None
|
||||
frame_count = 0
|
||||
|
||||
while True:
|
||||
ret, frame = self.cap.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
# 处理帧
|
||||
processed_frame, tips, hubs = self.process_frame(frame)
|
||||
|
||||
# 更新轮毂位置
|
||||
if hubs:
|
||||
self.hub_position = hubs[0]
|
||||
|
||||
# 更新轨迹
|
||||
if tips and self.hub_position is not None:
|
||||
pixel_clearance, actual_clearance = self.update_trajectories(tips, self.hub_position)
|
||||
|
||||
# 绘制注释
|
||||
annotated_frame = self.draw_annotations(processed_frame)
|
||||
last_frame = annotated_frame.copy()
|
||||
frame_count += 1
|
||||
|
||||
# 显示处理进度
|
||||
if frame_count % 10 == 0:
|
||||
print(f"已处理 {frame_count} 帧...")
|
||||
|
||||
# 保存最后一帧
|
||||
last_frame_path = os.path.join(output_dir, "last_frame.jpg")
|
||||
cv2.imwrite(last_frame_path, last_frame)
|
||||
print(f"最后一帧已保存至: {last_frame_path}")
|
||||
|
||||
# 准备结果数据
|
||||
result = {
|
||||
"clearance_history": {
|
||||
"tip0": self.clearance_history[0],
|
||||
"tip1": self.clearance_history[1],
|
||||
"tip2": self.clearance_history[2]
|
||||
},
|
||||
"last_frame_clearances": {
|
||||
"tip0": self.tip_clearances[0],
|
||||
"tip1": self.tip_clearances[1],
|
||||
"tip2": self.tip_clearances[2]
|
||||
},
|
||||
"last_frame_image": last_frame_path
|
||||
}
|
||||
|
||||
# 保存JSON结果
|
||||
json_path = os.path.join(output_dir, "results.json")
|
||||
with open(json_path, 'w') as f:
|
||||
json.dump(result, f, indent=4)
|
||||
print(f"结果已保存至: {json_path}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='叶片净空计算系统')
|
||||
parser.add_argument('--video_path', type=str, required=True, help='输入视频路径')
|
||||
parser.add_argument('--model_path', type=str, required=True, help='模型文件路径')
|
||||
parser.add_argument('--output_dir', type=str, required=True, help='结果保存目录')
|
||||
parser.add_argument('--scale_factor', type=float, default=0.02, help='比例尺因子 (实际距离/像素距离)')
|
||||
parser.add_argument('--width', type=int, default=544, help='图像宽度(像素)')
|
||||
parser.add_argument('--height', type=int, default=960, help='图像高度(像素)')
|
||||
args = parser.parse_args()
|
||||
|
||||
calculator = ClearanceCalculator(
|
||||
model_path=args.model_path,
|
||||
scale_factor=args.scale_factor,
|
||||
width=args.width,
|
||||
height=args.height
|
||||
)
|
||||
|
||||
if not calculator.load_video(args.video_path):
|
||||
print(f"无法加载视频: {args.video_path}")
|
||||
return
|
||||
|
||||
print(f"开始处理视频: {args.video_path}")
|
||||
print(f"图像尺寸设置为: ({args.width}, {args.height})")
|
||||
|
||||
result = calculator.process_video(args.output_dir)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
"""
|
||||
python cli_clearance.py \
|
||||
--video test.mp4 \
|
||||
--model best.pt \
|
||||
--output ./results \
|
||||
--scale_factor 0.02 \
|
||||
--width 640 \
|
||||
--height 480
|
||||
"""
|
||||
"""
|
||||
{
|
||||
"clearances": {
|
||||
"0": [5.12, 5.09, 5.11],
|
||||
"1": [4.98, 5.01],
|
||||
"2": []
|
||||
},
|
||||
"last_frame_clearances": {
|
||||
"tip1": 5.11,
|
||||
"tip2": 5.01,
|
||||
"tip3": null
|
||||
}
|
||||
}
|
||||
"""
|
Loading…
Reference in New Issue