AI_Agent/clearance.py

425 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
from ultralytics import YOLO
import os
import json
import argparse
class Target2D:
"""二维坐标下的叶尖跟踪器"""
def __init__(self, positions=None, hub=None):
if positions is None:
positions = np.array([[-100, -100], [-200, -200], [-300, -300]])
if hub is None:
hub = np.array([400, 300])
self.positions = np.array(positions, dtype=float) # 叶尖位置
self.hub = np.array(hub, dtype=float) # 轮毂位置
def update(self, new_positions, new_hub=None):
"""更新叶尖位置,使用最近点匹配算法"""
if new_positions is None or len(new_positions) == 0:
return self.positions, []
# 确保新位置是二维数组
new_positions = np.array(new_positions, dtype=float)
if new_positions.ndim == 1:
new_positions = new_positions.reshape(1, -1)
# 更新轮毂位置
if new_hub is not None:
self.hub = np.array(new_hub, dtype=float)
# 如果还没有初始化位置,使用新位置
if self.positions.size == 0:
self.positions = new_positions
return self.positions
# 创建位置列表副本
new_pos_list = new_positions.tolist()
update_positons = self.positions.copy()
used = set()
# 对于每个新位置,找到最近的点
for i, new_pos in enumerate(new_pos_list):
# 计算所有距离
distances = np.linalg.norm(self.positions - new_pos, axis=1)
available_indices = [j for j in range(len(distances)) if j not in used]
# 找到最近点的索引
closest_idx = min(available_indices, key=lambda j: distances[j])
# 记录ID并更新位置
update_positons[closest_idx] = new_pos # new_pos
# 更新位置
self.positions = np.array(update_positons)
return self.positions
class ClearanceCalculator:
def __init__(self, model_path, scale_factor=0.02,width=544, height=960):
self.size = (width, height)
# 初始化模型配置
self.model = YOLO(model_path)
self.cap = None
self.video_path = ""
self.is_playing = False
self.scale_factor = scale_factor
self.reference_pixel = 0.0
self.reference_actual = 0.0
self.current_clearance = 0.0
self.use_camera = False
self.hub_x_range = 100 # 轮毂x坐标的左右区间像素
self.hub_y_range = 50
self.inside_region = False # 轨迹是否在轮毂区域内
self.entered_from_left = False # 轨迹是否从左侧进入
self.last_pixel_clearance = 0.0 # 保存上一次的像素净空
self.last_actual_clearance = 0.0 # 保存上一次的实际净空
# 轨迹相关变量
self.trajectories = []
# self.lowest_points = []
self.calculate_points = []
self.hub_position = None
self.last_tip_positions = []
self.frame_count = 0
self.nearest_tip_trajectory = [] # 只存储与轮毂x坐标最近的叶尖轨迹
self.traj_cnt = 0
self.tip_clearances = {
0: None, # tip1
1: None, # tip2
2: None # tip3
}
# 净空历史记录
self.clearance_history = {0: [], 1: [], 2: []}
# 初始化轨迹颜色
self.tip_colors = [
(255, 0, 0), # 红色
(0, 255, 0), # 绿色
(0, 0, 255) # 蓝色
]
# 初始化2D跟踪器
self.tracker = Target2D()
def set_hub_x_range(self, x_value, y_value=None):
"""设置轮毂x和y坐标的区间"""
self.hub_x_range = x_value
if y_value is not None:
self.hub_y_range = y_value
def load_video(self, path):
"""加载视频文件"""
self.use_camera = False
self.video_path = path
self.cap = cv2.VideoCapture(path)
return self.cap.isOpened()
def process_frame(self, frame):
"""处理视频帧并检测叶尖和轮毂"""
frame = cv2.resize(frame, self.size)
# 使用模型进行检测
results = self.model(frame, verbose=False)
# 初始化检测结果字典
detected = {'tip': [], 'hub': []}
# 解析检测结果
for result in results:
boxes = result.boxes
classes = boxes.cls
class_names = result.names
for i in range(len(boxes)):
class_id = int(classes[i])
box = boxes.xyxy[i]
x1, y1, x2, y2 = box.tolist()
class_name = class_names[class_id]
# 只关注叶尖(tip)和轮毂(hub)
if class_name in detected:
# 使用底部中心点作为位置
if class_name == 'tip':
tip_x = (x1 + x2) / 2
tip_y = y2 # 使用底部作为叶尖位置
detected[class_name].append([tip_x, tip_y])
else: # hub
hub_x = (x1 + x2) / 2
hub_y = (y1 + y2) / 2
detected[class_name].append([hub_x, hub_y])
# 限制检测数量
if len(detected['tip']) > 3:
detected['tip'] = detected['tip'][:3]
if len(detected['hub']) > 1:
detected['hub'] = detected['hub'][:1]
return frame, detected['tip'], detected['hub']
def check_trajectory_state(self, tip, hub, thresh=20):
"""检查轨迹是否进入或离开轮毂区域"""
hub_x = hub[0] if hub else None
if hub_x is None or tip is None:
return None, None
# 检查轨迹是否在轮毂区域内
inside = (hub_x - self.hub_x_range) < tip[0] < (hub_x + self.hub_x_range)
# 如果轨迹进入区域,并且是从左侧进入
if inside and not self.inside_region:
self.inside_region = True
self.entered_from_left = tip[0] >= (hub_x - self.hub_x_range)
if inside and self.inside_region and self.entered_from_left and (hub_x - thresh) < tip[0] < (hub_x + thresh):
distance, actual_distance = self.calculate_clearance()
return distance, actual_distance
# 如果轨迹离开区域,并且是从左侧进入且从右侧离开
if not inside and self.inside_region and self.entered_from_left and tip[0] > (hub_x + self.hub_x_range):
self.inside_region = False
self.traj_cnt += 1
return None, None
def update_trajectories(self, tips, hub):
"""更新叶尖轨迹只关注轮毂x和y坐标附近的叶尖"""
# 使用2D跟踪器更新位置
self.tracker.update(tips, hub)
sorted_tips = self.tracker.positions
# 如果没有轮毂位置,无法判断附近
if hub is None or len(hub) == 0:
return None, None
hub_x = hub[0] # 轮毂的x坐标
hub_y = hub[1] # 轮毂的y坐标
# 初始化轨迹
if not self.trajectories:
for _ in range(len(sorted_tips)):
self.trajectories.append([])
# 找到与轮毂x和y坐标都最近的叶尖
nearest_tip = None
min_distance = float('inf')
for tip in sorted_tips:
tip_x, tip_y = tip
# 检查是否在轮毂x坐标左右区间内
if abs(tip_x - hub_x) > self.hub_x_range + 20:
continue
# 检查是否大于轮毂y坐标加y区间
if tip_y < hub_y + self.hub_y_range:
continue
# 计算与轮毂的综合距离考虑x
distance = abs(tip_x - hub_x) # + (tip_y - hub_y) ** 2
if distance < min_distance:
min_distance = distance
nearest_tip = tip
# 更新最近的叶尖轨迹
if nearest_tip is not None:
if not self.nearest_tip_trajectory:
self.nearest_tip_trajectory = [nearest_tip]
else:
self.nearest_tip_trajectory.append(nearest_tip)
# 限制轨迹长度
if len(self.nearest_tip_trajectory) > 200:
self.nearest_tip_trajectory.pop(0)
return self.check_trajectory_state(nearest_tip, hub)
def find_closest_tip_x(self):
"""找到与轮毂x坐标最接近的叶尖x坐标的叶尖"""
if not self.nearest_tip_trajectory:
return None
hub_x = self.hub_position[0] if self.hub_position else None
if hub_x is None:
return None
trajectories = np.array(self.nearest_tip_trajectory[:][0])
distances = np.abs(trajectories - hub_x)
idx = np.argmin(distances)
tip = self.nearest_tip_trajectory[idx]
self.calculate_points = [tip] # 只保留一个最低点
return tip
def calculate_clearance(self):
"""计算叶尖到轮毂的净空"""
if not self.nearest_tip_trajectory or self.hub_position is None:
return None, None
hub_position = self.hub_position
tip_x, tip_y = self.find_closest_tip_x()
self.calculate_points = [[tip_x, tip_y]] # 只保留一个最低点
hub_x, hub_y = hub_position
distance = np.sqrt((tip_x - hub_x) ** 2 + (tip_y - hub_y) ** 2)
# 更新UI显示净空
if self.scale_factor:
actual_distance = distance * self.scale_factor
tip_idx = self.traj_cnt % 3 # 根据实际匹配逻辑
self.tip_clearances[tip_idx] = actual_distance
# 记录到历史
self.clearance_history[tip_idx].append(actual_distance)
return distance, actual_distance
def draw_annotations(self, frame):
"""在帧上绘制轨迹和标记"""
# 绘制轮毂位置
if self.hub_position:
hub_x, hub_y = self.hub_position
# 绘制轮毂区域标记
cv2.rectangle(frame,
(int(hub_x - self.hub_x_range), int(hub_y - 10)),
(int(hub_x + self.hub_x_range), int(hub_y + 10)),
(0, 255, 255), 2)
cv2.circle(frame, (int(hub_x), int(hub_y)), 8, (0, 255, 255), -1)
cv2.putText(frame, "Hub", (int(hub_x) + 15, int(hub_y)),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# 绘制最近的叶尖轨迹
if len(self.nearest_tip_trajectory) >= 2:
# 绘制轨迹线(黄色)
for j in range(1, len(self.nearest_tip_trajectory)):
start_point = (int(self.nearest_tip_trajectory[j - 1][0]),
int(self.nearest_tip_trajectory[j - 1][1]))
end_point = (int(self.nearest_tip_trajectory[j][0]),
int(self.nearest_tip_trajectory[j][1]))
cv2.line(frame, start_point, end_point, (0, 255, 255), 1)
# 绘制三个叶尖的净空值
y_offset = 30
for i in range(3):
clearance = self.tip_clearances.get(i, None)
if clearance is not None:
text = f"tip{i + 1}: {clearance:.2f} m"
else:
text = f"tip{i + 1}: -- m"
cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
y_offset += 25
# 绘制其他叶尖(不绘制轨迹)
if self.tracker.positions.size > 0:
for i, tip in enumerate(self.tracker.positions):
if not any(np.array_equal(tip, self.nearest_tip_trajectory[-1]) for p in self.nearest_tip_trajectory if
len(self.nearest_tip_trajectory) > 0):
color = self.tip_colors[i % len(self.tip_colors)]
cv2.circle(frame, (int(tip[0]), int(tip[1])), 6, color, -1)
cv2.putText(frame, f"Tip {i + 1}", (int(tip[0]) + 15, int(tip[1])),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
return frame
def process_video(self, output_dir):
"""处理整个视频并保存结果"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
last_frame = None
frame_count = 0
while True:
ret, frame = self.cap.read()
if not ret:
break
# 处理帧
processed_frame, tips, hubs = self.process_frame(frame)
# 更新轮毂位置
if hubs:
self.hub_position = hubs[0]
# 更新轨迹
if tips and self.hub_position is not None:
pixel_clearance, actual_clearance = self.update_trajectories(tips, self.hub_position)
# 绘制注释
annotated_frame = self.draw_annotations(processed_frame)
last_frame = annotated_frame.copy()
frame_count += 1
# 显示处理进度
if frame_count % 10 == 0:
print(f"已处理 {frame_count} 帧...")
# 保存最后一帧
last_frame_path = os.path.join(output_dir, "last_frame.jpg")
cv2.imwrite(last_frame_path, last_frame)
print(f"最后一帧已保存至: {last_frame_path}")
# 准备结果数据
result = {
"clearance_history": {
"tip0": self.clearance_history[0],
"tip1": self.clearance_history[1],
"tip2": self.clearance_history[2]
},
"last_frame_clearances": {
"tip0": self.tip_clearances[0],
"tip1": self.tip_clearances[1],
"tip2": self.tip_clearances[2]
},
"last_frame_image": last_frame_path
}
# 保存JSON结果
json_path = os.path.join(output_dir, "results.json")
with open(json_path, 'w') as f:
json.dump(result, f, indent=4)
print(f"结果已保存至: {json_path}")
return result
def main():
parser = argparse.ArgumentParser(description='叶片净空计算系统')
parser.add_argument('--video_path', type=str, required=True, help='输入视频路径')
parser.add_argument('--model_path', type=str, required=True, help='模型文件路径')
parser.add_argument('--output_dir', type=str, required=True, help='结果保存目录')
parser.add_argument('--scale_factor', type=float, default=0.02, help='比例尺因子 (实际距离/像素距离)')
parser.add_argument('--width', type=int, default=544, help='图像宽度(像素)')
parser.add_argument('--height', type=int, default=960, help='图像高度(像素)')
args = parser.parse_args()
calculator = ClearanceCalculator(
model_path=args.model_path,
scale_factor=args.scale_factor,
width=args.width,
height=args.height
)
if not calculator.load_video(args.video_path):
print(f"无法加载视频: {args.video_path}")
return
print(f"开始处理视频: {args.video_path}")
print(f"图像尺寸设置为: ({args.width}, {args.height})")
result = calculator.process_video(args.output_dir)
if __name__ == "__main__":
main()
"""
python cli_clearance.py \
--video test.mp4 \
--model best.pt \
--output ./results \
--scale_factor 0.02 \
--width 640 \
--height 480
"""
"""
{
"clearances": {
"0": [5.12, 5.09, 5.11],
"1": [4.98, 5.01],
"2": []
},
"last_frame_clearances": {
"tip1": 5.11,
"tip2": 5.01,
"tip3": null
}
}
"""