Report_Generate_Server/tools/dataproccess.py

422 lines
16 KiB
Python
Raw Normal View History

import sys, os
from tools.Get_Json import get_defect_detail
from tools.defines import *
def caculate_work_days(start_date : str, end_date : str) -> str:
"""根据起止日期计算工期
Args:
start_date (str): 格式: yyyy-mm-ddThh:mm:ss
end_date (str): 格式: yyyy-mm-ddThh:mm:ss
Returns:
str: 计算出来的总工期单位为天
"""
import datetime
if start_date == None or end_date == None:
return f"开始时间{start_date} ---- 结束时间{end_date} 日期格式错误"
start_date = datetime.datetime.strptime(start_date, '%Y-%m-%dT%H:%M:%S')
end_date = datetime.datetime.strptime(end_date, '%Y-%m-%dT%H:%M:%S')
return (end_date - start_date).days
def get_year_month(date):
"""根据格式化date字符串获取年月 'date': '二〇二一年十二月十日 9:00'
Args: date (str): 日期字符串
Returns: 年月字符串 '二〇二一年十二月'
"""
unit_map = {'1' : '', '2' : '', '3' : '', '4' : '', '5' : '', '6' : '', '7' : '', '8' : '', '9' : '', '0' : ''}
unit_map_month = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
year = date.split('')[0]
month = date.split('')[1].split('')[0]
year = ''.join([unit_map[i] for i in year])
month = unit_map_month[int(month)]
return f"{year}{month}"
def merge_info(frontend_info, default_info):
"""
合并前端传入的 info 和默认 info
新规则以default_info为基础字典用frontend_info完全覆写取两者的并集
Args:
frontend_info: 前端传入的字典
default_info: 默认的完整字典
Returns:
合并后的完整字典
"""
if not isinstance(frontend_info, dict) or frontend_info is None:
return default_info.copy()
# 先复制默认字典
merged_info = default_info.copy()
# 用前端字典完全覆写
merged_info.update(frontend_info)
return merged_info
def merge_dicts(dict1, dict2):
# 创建一个新的字典来存储合并结果
merged_dict = {}
# 遍历第一个字典
for key, value_list in dict1.items():
if key in dict2:
# 如果键在第二个字典中存在,合并两个列表
merged_dict[key] = value_list + dict2[key]
else:
# 如果键在第二个字典中不存在,直接使用第一个字典的值列表
merged_dict[key] = value_list
# 遍历第二个字典
for key, value_list in dict2.items():
if key not in dict1:
# 如果键在第一个字典中不存在,直接使用第二个字典的值列表
merged_dict[key] = value_list
return merged_dict
def get_defect_str(Y_defect_list : list[dict] ) -> list:
"""将叶片缺陷信息转换为一条条描述信息
Args:
Y_defect_list (list):[
{
'record' : {'defectId': '02d892a178a82561bb565559102c7a58', 'imageId': '41543f531be24522b7bec741b9a483a2', 'defectName': '手动添加的缺陷1', 'defectCode': None, 'partName': '叶片1', 'defectTypeLabel': '表面裂纹', 'defectType': 'bmlw', 'defectLevelLabel': '轻微缺陷', 'defectLevel': 'SLIGHT', 'defectPosition': '', 'description': '手动添加的缺陷,请填写详细描述', 'repairIdea': '建议进行进一步检查', 'labelInfo': None, 'markInfo': {'label': None, 'clsId': None, 'bbox': None, 'confidence': 0.0}}
'imagePath' : '/image/path'
},
....
]
Returns:
result (list):
[
"叶片1表面裂纹轻微缺陷位于{defectPosition},建议进行进一步检查",
],
...
"""
result = []
for item in Y_defect_list:
record = item['record']
defect_type_label = record.get('defectTypeLabel', '未知类型')
defect_level_label = record.get('defectLevelLabel', '未知等级')
defect_position = record.get('defectPosition', '未知位置')
repair_idea = record.get('repairIdea', '无建议')
defect_description = f"{defect_type_label}{defect_level_label}缺陷,位于{defect_position}{repair_idea}"
result.append(defect_description)
return result
def safe_get(data, *keys, default=None):
"""
递归安全访问嵌套字典的键如果中间键不存在则返回默认值
Args:
data (dict): 要访问的字典
*keys: 要访问的键可以是多个 "叶片1", "裂纹"
default: 如果键不存在返回的默认值默认None
Returns:
如果所有键都存在返回对应的值否则返回 default
"""
if not keys or data is None:
return data if data is not None else default
current_key = keys[0]
if isinstance(data, dict):
return safe_get(data.get(current_key), *keys[1:], default=default)
else:
return default
def is_frozen():
return hasattr(sys, 'frozen') or hasattr(sys, '_MEIPASS')
def get_resource_path(relative_path):
""" 获取资源的路径 """
if is_frozen():
base_path = sys._MEIPASS
else:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def get_defedct_info(defect_dict):
"""获取按缺陷类型分类的缺陷信息
Args:
defect_dict (dict):
{'叶片1':
{'DEFECT':[{'record':
{
'defectId': '02d892a178a82561bb565559102c7a58',
'imageId': '41543f531be24522b7bec741b9a483a2',
'defectName': '手动添加的缺陷1',
'defectCode': None,
'partName': '叶片1',
'defectTypeLabel': '表面裂纹',
'defectType': 'bmlw',
'defectLevelLabel': '轻微缺陷',
'defectLevel': 'SLIGHT',
'defectPosition': '',
'description': '手动添加的缺陷,请填写详细描述',
'repairIdea': '建议进行进一步检查',
'labelInfo': None,
'markInfo': {'label': None, 'clsId': None, 'bbox': None, 'confidence': 0.0},
'imagePath': '/static/image/temp/out-work/12bc30fb209f3af3bf530541c5b062bd/F02_机组10060_叶片叶片PS面距叶根15.5米处轴向裂纹轴向260mm弦向20mm。DJI_20250526090227_0052_Z.png'
},
...
},]
}
},
...
Returns:
result (dict):
{
'叶片1': { #按叶片分记录
'表面裂纹': { 'SLIGHT': #按类型分等级,等级从'defectLevel'获取
[ #按等级分记录
{
record:{...}
}, #一张图片一个记录
...
]
...
}
}
}
result2 (dict):
{
'叶片1': { #按叶片分记录
'表面裂纹': [ #按类型分记录
"""
result = {} #缺陷等级数量列表
result2 = {} #缺陷树状列表
for part_name, part_info in defect_dict.items():
if part_name not in result:
result[part_name] = {}
result2[part_name] = {}
for defect_type, defect_list in part_info.items():
for defect_dict in defect_list:
record = defect_dict['record']
type_label = record['defectTypeLabel']
level = record['defectLevel']
if type_label not in result[part_name]:
result2[part_name][type_label] = []
if type_label not in result[part_name]:
result[part_name][type_label] = {}
if level not in result[part_name][type_label]:
result[part_name][type_label][level] = []
result[part_name][type_label][level].append(defect_dict)
p = record['defectPosition']
t = record['defectTypeLabel']
l = record['defectLevelLabel']
d = record['description']
did = record['defectId']
iid = record['imageId']
detail = get_defect_detail(did)
a = detail.get('axial', '')
c = detail.get('chordwise', '')
area = f"轴向:{a}mm弦向{c}mm"
record_info = [f"{p}{t}{l}{d}", area, iid]
result2[part_name][type_label].append(record_info)
return result, result2
def get_defect_json(defect_info: dict, Y_code: list[str], turbine_code: str) -> dict:
"""将缺陷信息转换为表格形式的JSON格式
Args:
defect_info: 结构为 {叶片1/2/3: {缺陷类型: [{'record': {...}}, ...]}}
Y_code: 叶片编号列表 ['1001', '1002', '1003']
turbine_code: 机组编号 'G01'
Returns:
{
"type": "table",
"content": {
"rows": 总行数,
"cols": 4,
"merged_cells": [合并信息],
"cells": [表格内容]
}
}
"""
# 初始化表格
cells = []
merged_cells = []
# 默认设置
default_border = {
"top": {"style": "single", "size": "4", "color": "000000"},
"left": {"style": "single", "size": "4", "color": "000000"},
"bottom": {"style": "single", "size": "4", "color": "000000"},
"right": {"style": "single", "size": "4", "color": "000000"}
}
# 1. 计算总行数(先计算再创建表头)
total_defect_rows = sum(len(defects) for blade_defects in defect_info.values()
for defects in blade_defects.values())
total_rows = 1 + total_defect_rows # 表头 + 数据行
# 2. 创建表头行第0行
header_row = [
create_cell(0, 0, "机组号", default_border),
create_cell(0, 1, "叶片号", default_border),
create_cell(0, 2, "损坏描述", default_border),
create_cell(0, 3, "缺陷面积", default_border)
]
cells.append(header_row)
# 3. 填充数据行从第1行开始
current_data_row = 1
# 合并机组号(如果有多行数据)
if total_defect_rows > 1:
merged_cells.append({
"start_row": 1, # 第2行(0-based中的1)
"start_col": 0,
"end_row": total_rows - 1, # 最后一行(0-based)
"end_col": 0
})
# 填充机组号(仅第一行数据行)
if total_defect_rows > 0:
first_data_row = [
create_cell(1, 0, turbine_code, default_border,
is_merged=total_defect_rows>1, is_primary=True),
None, None, None
]
cells.append(first_data_row)
# 按叶片顺序填充数据
for blade_idx, (blade_key, blade_defects) in enumerate(defect_info.items(), 1):
blade_code = Y_code[blade_idx - 1]
blade_defect_count = sum(len(defects) for defects in blade_defects.values())
if blade_defect_count > 1:
merged_cells.append({
"start_row": current_data_row, # 当前行(0-based)
"start_col": 1,
"end_row": current_data_row + blade_defect_count - 1, # 结束行(0-based)
"end_col": 1
})
# 填充缺陷数据
for defect_type, records in blade_defects.items():
for record in records:
# 确保不超过总行数
if current_row >= total_rows:
break
description = record.get('record', {}).get('损坏描述', '待填写')
area = record.get('record', {}).get('缺陷面积', '待填写')
is_primary = (not merged_cells or
current_row == merged_cells[-1]["start_row"])
row_data = [
None, # 机组号(已合并)
create_cell(current_row, 1, blade_code, default_border,
is_merged=blade_defect_count>1, is_primary=is_primary),
create_cell(current_row, 2, str(description), default_border),
create_cell(current_row, 3, str(area), default_border)
]
cells.append(row_data)
current_row += 1
# 填充空单元格
for row_idx, row in enumerate(cells):
for col_idx in range(4):
if row[col_idx] is None:
row[col_idx] = create_cell(row_idx, col_idx, "", default_border)
# 最终行数检查(调试用)
print(f"调试信息: 预期行数={total_rows}, 实际行数={len(cells)}")
print(f"缺陷信息统计: 叶片数={len(defect_info)}, 总缺陷数={total_defect_rows}")
return [{
"type": "table",
"content": {
"rows": len(cells), # 使用实际行数而非计算行数
"cols": 4,
"merged_cells": merged_cells,
"cells": cells
}
}]
def create_cell(row, col, text, border, is_merged=False, is_primary=False):
"""创建标准化的单元格结构"""
return {
"row": row,
"col": col,
"is_merged": is_merged,
"alignment": "center",
"border": border,
"merge_info": {"is_primary": is_primary},
"content": [{
"alignment": "center",
"runs": [{
"text": text,
"font": {
"name": "宋体",
"size": 12,
"bold": False,
"italic": False,
"underline": False,
"color": {"r": 0, "g": 0, "b": 0}
}
}]
}]
}
from typing import Dict, List, Any
def tree_dict_to_table_data(tree: Dict) -> List[List[Any]]:
"""
将树状字典转换为二维表格数据保留原始类型
参数:
tree: 树状字典结构为dict[dict[...[list]]]
返回:
二维列表每个子列表代表一条路径元素类型与输入一致确保最内层列表被展开
"""
result = []
def traverse(node: Dict, path: List[Any]) -> None:
for key, value in node.items():
current_path = path + [key] # 保留键的原始类型
if isinstance(value, dict):
traverse(value, current_path)
elif isinstance(value, list):
for item in value:
if isinstance(item, list):
# 如果列表项本身是列表,则展开它
result.append(current_path + item)
else:
# 否则作为单个元素添加
result.append(current_path + [item])
else:
result.append(current_path + [value]) # 保留值的原始类型
traverse(tree, [])
return result
def defect_list_addtitle(list_data: List[List[str]], turbine_code: str):
"""
给缺陷列表添加标题行
"""
list_data.insert(0, DEFECT_TABLE)
for i,l1 in enumerate(list_data):
if i == 0:
continue
l1.insert(0, turbine_code)
return list_data