完成图片的插入,典型图插入未测试

This commit is contained in:
Voge1imkafig 2025-07-24 18:00:03 +08:00
parent 165698c7a3
commit a6fa6ebf9a
22 changed files with 501 additions and 263 deletions

View File

@ -2,6 +2,7 @@
from tools.document_tools import ( from tools.document_tools import (
create_document, add_documents,add_table_and_replace, create_document, add_documents,add_table_and_replace,
add_table_to_document,process_images_table, add_table_to_document,process_images_table,
process_server_images_table
) )
# 内容处理工具 # 内容处理工具
@ -21,12 +22,13 @@ from tools.Get_Json import (
get_jizu_shigong_info,get_weather, get_jizu_shigong_info,get_weather,
get_part_picture,get_yepian_xiangqing, get_part_picture,get_yepian_xiangqing,
check_pic_url,get_full_picture_url, check_pic_url,get_full_picture_url,
get_defect_record_list get_defect_record_list,get_defect_detail
) )
from tools.dataproccess import ( from tools.dataproccess import (
caculate_work_days,add_dynamic_table, caculate_work_days,add_dynamic_table,
get_year_month,merge_info, get_year_month,merge_info,get_defect_str,
safe_get
) )
import asyncio import asyncio
@ -122,18 +124,8 @@ async def generate_report(base_info, baogao_info):
# weather = get_weather(shigong_data["weatherCode"]) #天气 不从此接口获取,待定!!! # weather = get_weather(shigong_data["weatherCode"]) #天气 不从此接口获取,待定!!!
#拉取部件、图片数据 #拉取部件、图片数据
part_data, picture_data = get_part_picture(turbine_id) part_data, picture_data, Yepians = get_part_picture(turbine_id)
#获取叶片信息
print(part_data)
Yepians = []
yepian_to_find = ["叶片1","叶片2","叶片3"]
#依次获取叶片123的信息未考虑多个同叶片的信息情况正常情况下不会发生
for name in yepian_to_find:
# 找到第一个匹配的部件并添加到列表中
for part in part_data:
if part['partName'] == name:
Yepians.append(part)
print(Yepians) print(Yepians)
Y1_info = get_yepian_xiangqing(Yepians[0]["partId"]) Y1_info = get_yepian_xiangqing(Yepians[0]["partId"])
Y_Code = [yepian["partCode"] for yepian in Yepians] Y_Code = [yepian["partCode"] for yepian in Yepians]
@ -149,6 +141,7 @@ async def generate_report(base_info, baogao_info):
beizhu = [] beizhu = []
jiancha = [] jiancha = []
neirong = [] neirong = []
neirong2 = []
#获取对应枚举字段 #获取对应枚举字段
if if_waibu: if if_waibu:
baogao_label.append("外观") baogao_label.append("外观")
@ -163,6 +156,7 @@ async def generate_report(base_info, baogao_info):
pass #待添加如果从平台传入枚举,但目前没必要,如果这种枚举标准信息库有更好的优化则可以实现 pass #待添加如果从平台传入枚举,但目前没必要,如果这种枚举标准信息库有更好的优化则可以实现
jiancha.append("无人机近距离外观检查") jiancha.append("无人机近距离外观检查")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的前缘、后缘、迎风面、背风面。") neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的前缘、后缘、迎风面、背风面。")
neirong2.append("前缘、后缘、迎风面、背风面。")
if if_neibu: if if_neibu:
baogao_label.append("内部") baogao_label.append("内部")
image_source_to_find.append(baogao_info['neibu_enum']) image_source_to_find.append(baogao_info['neibu_enum'])
@ -175,6 +169,7 @@ async def generate_report(base_info, baogao_info):
pass pass
jiancha.append("人工内部拍摄") jiancha.append("人工内部拍摄")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...") neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
neirong2.append("内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
if if_fanglei: if if_fanglei:
baogao_label.append("防雷") baogao_label.append("防雷")
image_source_to_find.append(baogao_info['fanglei_enum']) image_source_to_find.append(baogao_info['fanglei_enum'])
@ -187,20 +182,17 @@ async def generate_report(base_info, baogao_info):
pass pass
jiancha.append("人工防雷") jiancha.append("人工防雷")
neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...") neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...")
neirong2.append("轮毂至塔基导通、内部导线线阻、外部导线线阻...")
#获取缺陷图列表和典型图列表 #获取缺陷图列表和典型图列表
(defect_pictures, typical_pictures, filtered_picture_data, total_picture_num = process_picture_data(picture_data, image_source_to_find)
other_pictures, normal_picture_num) = process_picture_data(picture_data, image_source_to_find,
quexian_type, dianxing_type, other_type)
print(f"\n\n\n缺陷图片列表:{defect_pictures}\n\n\n典型图片列表:{typical_pictures}\n\n\n")
#获取过滤后所有缺陷图的id
defect_pictures_id = [pic for pic in defect_pictures]
#获取所有缺陷记录 #获取所有缺陷记录
defect_records = get_defect_record_list() defect_records = get_defect_record_list()
#获取缺陷记录中对应图片的记录 #获取缺陷记录中对应图片的记录
defect_records_with_pic, error_pic_with_no_record = get_records_with_pic(defect_records, defect_pictures_id) defect_records_with_pic, error_pic_with_no_record = get_records_with_pic(defect_records, filtered_picture_data, quexian_type)
print(f"对应缺陷图的缺陷记录列表:{defect_records_with_pic}\n 没有缺陷记录但是是缺陷图片的列表:{error_pic_with_no_record}") if len(error_pic_with_no_record) > 0:
print(f"!!!有部分缺陷图片没有对应的缺陷记录,将不会生成这些图片的缺陷表:{error_pic_with_no_record} ")
print(f"对应缺陷图的缺陷记录列表:{defect_records_with_pic}")
except Exception as e: except Exception as e:
print(f"报告基本信息获取失败:{e}") print(f"报告基本信息获取失败:{e}")
return return
@ -234,7 +226,7 @@ async def generate_report(base_info, baogao_info):
print(await add_picture(output_dir, cover_dirs[1])) print(await add_picture(output_dir, cover_dirs[1]))
print(add_documents(output_dir, cover_dirs[2])) print(add_documents(output_dir, cover_dirs[2]))
print("封面创建成功") print("封面创建成功")
#YYYY年MM月DD日 HH:MM:SS
#更改文档信息 #更改文档信息
print(await search_and_replace(output_dir, TITLE_OF_REPORT, jizu_bianhao)) print(await search_and_replace(output_dir, TITLE_OF_REPORT, jizu_bianhao))
print(await search_and_replace(output_dir, baogao_name1, baogao_name)) print(await search_and_replace(output_dir, baogao_name1, baogao_name))
@ -311,7 +303,7 @@ async def generate_report(base_info, baogao_info):
JIANCHA_XINGXI[5][1] = Y_Code[0] if len(Y_Code) > 0 else "" JIANCHA_XINGXI[5][1] = Y_Code[0] if len(Y_Code) > 0 else ""
JIANCHA_XINGXI[6][1] = Y_Code[1] if len(Y_Code) > 1 else "" JIANCHA_XINGXI[6][1] = Y_Code[1] if len(Y_Code) > 1 else ""
JIANCHA_XINGXI[7][1] = Y_Code[2] if len(Y_Code) > 2 else "" JIANCHA_XINGXI[7][1] = Y_Code[2] if len(Y_Code) > 2 else ""
JIANCHA_XINGXI[8][0] = "本次" + "".join(_ for _ in jiancha) + f"检查,采集叶片图片{normal_picture_num}张,内容覆盖" + ";".join(_ for _ in neirong) JIANCHA_XINGXI[8][0] = "本次" + "".join(_ for _ in jiancha) + f"检查,采集叶片图片{total_picture_num}张,内容覆盖" + ";".join(_ for _ in neirong)
#新建检查信息表 #新建检查信息表
output_doc, message = await add_table_to_document(output_dir, JIANCHA_XINGXI_DIR,9,4,total_table_num ,JIANCHA_XINGXI,False) output_doc, message = await add_table_to_document(output_dir, JIANCHA_XINGXI_DIR,9,4,total_table_num ,JIANCHA_XINGXI,False)
print(message) print(message)
@ -358,15 +350,20 @@ async def generate_report(base_info, baogao_info):
最后将无异常的图片入队缺陷字典同数量有异常的图片再另存 最后将无异常的图片入队缺陷字典同数量有异常的图片再另存
""" """
Y1_quexian_dict = {} Y1_quexian_list = []
Y2_quexian_dict = {} Y2_quexian_list = []
Y3_quexian_dict = {} Y3_quexian_list = []
Y1_quexian_num = Y1_quexian_dict.keys() Y1_quexian_list = defect_records_with_pic["叶片1"]["DEFECT"] if "叶片1" in defect_records_with_pic else {}
Y2_quexian_num = Y2_quexian_dict.keys() Y2_quexian_list = defect_records_with_pic["叶片2"]["DEFECT"] if "叶片2" in defect_records_with_pic else {}
Y3_quexian_num = Y3_quexian_dict.keys() Y3_quexian_list = defect_records_with_pic["叶片3"]["DEFECT"] if "叶片3" in defect_records_with_pic else {}
weak_num_Y1 = f"{Y_Code[0] if len(Y_Code) > 0 else ''}叶片共发现缺陷{Y1_quexian_num}"
weak_num_Y2 = f"{Y_Code[1] if len(Y_Code) > 1 else ''}叶片共发现缺陷{Y2_quexian_num}" Y1_quexian_num = len(Y1_quexian_list)
weak_num_Y3 = f"{Y_Code[2] if len(Y_Code) > 2 else ''}叶片共发现缺陷{Y3_quexian_num}" Y2_quexian_num = len(Y2_quexian_list)
Y3_quexian_num = len(Y3_quexian_list)
no_defect_found = "未发现明显缺陷"
weak_num_Y1 = f"{Y_Code[0] if len(Y_Code) > 0 else ''}叶片" + f"共发现缺陷{Y1_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
weak_num_Y2 = f"{Y_Code[1] if len(Y_Code) > 1 else ''}叶片" + f"共发现缺陷{Y2_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
weak_num_Y3 = f"{Y_Code[2] if len(Y_Code) > 2 else ''}叶片" + f"共发现缺陷{Y3_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
except Exception as e: except Exception as e:
print(f"缺陷图获取失败:{e}") print(f"缺陷图获取失败:{e}")
return return
@ -378,12 +375,12 @@ async def generate_report(base_info, baogao_info):
JIANCHA_HUIZONG[1][0] = weak_num_Y1 JIANCHA_HUIZONG[1][0] = weak_num_Y1
JIANCHA_HUIZONG[2][0] = weak_num_Y2 JIANCHA_HUIZONG[2][0] = weak_num_Y2
JIANCHA_HUIZONG[3][0] = weak_num_Y3 JIANCHA_HUIZONG[3][0] = weak_num_Y3
JIANCHA_HUIZONG[1][1] = "\n".join(neirong) JIANCHA_HUIZONG[1][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[2][1] = "\n".join(neirong) JIANCHA_HUIZONG[2][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[3][1] = "\n".join(neirong) JIANCHA_HUIZONG[3][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[1][2] = "\n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y1_quexian_dict.items())]) if Y1_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷' JIANCHA_HUIZONG[1][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y1_quexian_list))]) if Y1_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
JIANCHA_HUIZONG[2][2] = "\n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y2_quexian_dict.items())]) if Y2_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷' JIANCHA_HUIZONG[2][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y2_quexian_list))]) if Y2_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
JIANCHA_HUIZONG[3][2] = "\n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y3_quexian_dict.items())]) if Y3_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷' JIANCHA_HUIZONG[3][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y3_quexian_list))]) if Y3_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
output_doc, message = await add_table_to_document(output_dir, JIANCHA_HUIZONG_DIR,4,3,total_table_num,JIANCHA_HUIZONG,False,ALIGMENT='LEFT') output_doc, message = await add_table_to_document(output_dir, JIANCHA_HUIZONG_DIR,4,3,total_table_num,JIANCHA_HUIZONG,False,ALIGMENT='LEFT')
print(message) print(message)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
@ -394,61 +391,106 @@ async def generate_report(base_info, baogao_info):
#获取典型图信息 #获取典型图信息
search_file_list = ["外汇总","内汇总","防汇总"] search_file_list = ["外汇总","内汇总","防汇总"]
try: try:
picture_Y1_num, Y1_dict = collect_defect_data(Y1, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei) Y1_list = safe_get(filtered_picture_data, "叶片1", dianxing_type, default=[])
picture_Y2_num, Y2_dict = collect_defect_data(Y2, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei) Y2_list = safe_get(filtered_picture_data, "叶片2", dianxing_type, default=[])
picture_Y3_num, Y3_dict = collect_defect_data(Y3, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei) Y3_list = safe_get(filtered_picture_data, "叶片3", dianxing_type, default=[])
picture_Y1_num = len(Y1_list)
picture_Y2_num = len(Y2_list)
picture_Y3_num = len(Y3_list)
except Exception as e: except Exception as e:
print(f"获取缺陷图失败:{e}") print(f"获取缺陷图失败:{e}")
print(f"图片、文字数量:{picture_Y1_num} {picture_Y2_num} {picture_Y3_num}") print(f"图片、文字数量:{picture_Y1_num} {picture_Y2_num} {picture_Y3_num}")
JIANCHA_NEIRONG_TOTAL_NUM = picture_Y1_num+ picture_Y2_num + picture_Y3_num JIANCHA_NEIRONG_TOTAL_NUM = picture_Y1_num+ picture_Y2_num + picture_Y3_num
col ,row = 3, 0 if JIANCHA_NEIRONG_TOTAL_NUM <= 0:
JIANCHA_NEIRONG_PICTURES_TABLE = os.path.join(muban_dir,"check2.docx") print("无典型图片数据,不生成典型图表")
JIANCHA_NEIRONG_Y1_DIR = os.path.join(muban_dir,"check_content.docx") else:
JIANCHA_NEIRONG_Y1 = list(list("" for _ in range(3)) for j in range(1)) col ,row = 3, 0
JIANCHA_NEIRONG_Y1[0][0] = f"叶片1{Y1}检查内容" JIANCHA_NEIRONG_PICTURES_TABLE = os.path.join(muban_dir,"check2.docx")
print(f"Y1标题内容{JIANCHA_NEIRONG_Y1}") JIANCHA_NEIRONG_Y1_DIR = os.path.join(muban_dir,"check_content.docx")
JIANCHA_NEIRONG_Y2_DIR = os.path.join(muban_dir,"check3.docx") JIANCHA_NEIRONG_Y1 = list(list("" for _ in range(3)) for j in range(1))
JIANCHA_NEIRONG_Y2 = list(list("" for _ in range(3)) for j in range(1)) Y1_code = Y_Code[0] if len(Y_Code) > 0 else ""
JIANCHA_NEIRONG_Y2[0][0] = f"叶片2{Y2}检查内容" Y2_code = Y_Code[1] if len(Y_Code) > 1 else ""
print(f"Y2标题内容{JIANCHA_NEIRONG_Y2}") Y3_code = Y_Code[2] if len(Y_Code) > 2 else ""
JIANCHA_NEIRONG_Y3_DIR = os.path.join(muban_dir,"check3.docx") JIANCHA_NEIRONG_Y1[0][0] = f"叶片1{Y1_code}检查内容"
JIANCHA_NEIRONG_Y3 = list(list("" for _ in range(3)) for j in range(1)) print(f"Y1标题内容{JIANCHA_NEIRONG_Y1}")
JIANCHA_NEIRONG_Y3[0][0] = f"叶片3{Y3}检查内容" JIANCHA_NEIRONG_Y2_DIR = os.path.join(muban_dir,"check3.docx")
print(f"Y3标题内容{JIANCHA_NEIRONG_Y3}") JIANCHA_NEIRONG_Y2 = list(list("" for _ in range(3)) for j in range(1))
print(f"当前表格序号为 {total_table_num}") JIANCHA_NEIRONG_Y2[0][0] = f"叶片2{Y2_code}检查内容"
print(key_words) print(f"Y2标题内容{JIANCHA_NEIRONG_Y2}")
output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y1_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y1,True, 1) JIANCHA_NEIRONG_Y3_DIR = os.path.join(muban_dir,"check3.docx")
print(message) JIANCHA_NEIRONG_Y3 = list(list("" for _ in range(3)) for j in range(1))
total_table_num += 1 JIANCHA_NEIRONG_Y3[0][0] = f"叶片3{Y3_code}检查内容"
print(f"Y3标题内容{JIANCHA_NEIRONG_Y3}")
total_table_num = await process_images_table(Y1_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words) print(f"当前表格序号为 {total_table_num}")
print(key_words)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y2_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y2,True, 1) output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y1_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y1,True, 1)
print(message) print(message)
total_table_num += 1 total_table_num += 1
total_table_num = await process_images_table(Y2_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words) total_table_num = await process_server_images_table(Y1_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y3_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y3,True, 1) output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y2_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y2,True, 1)
print(message) print(message)
total_table_num += 1 total_table_num += 1
total_table_num = await process_images_table(Y3_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words) total_table_num = await process_server_images_table(Y2_list, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1 output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y3_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y3,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y3_list, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
# #缺陷详情 # #缺陷详情
# QUEXIAN_XIANGQING_DIR = os.path.join(muban_dir,"check_check.docx") QUEXIAN_XIANGQING_DIR = os.path.join(muban_dir,"check_check.docx")
# QUEXIAN_XIANGQING_TITLE_DIR = os.path.join(muban_dir,"check_check_title.docx") QUEXIAN_XIANGQING_TITLE_DIR = os.path.join(muban_dir,"check_check_title.docx")
# Y_tables = [Y1_quexian_dict,Y2_quexian_dict,Y3_quexian_dict] Y_tables = [Y1_quexian_list,Y2_quexian_list,Y3_quexian_list]
# Y1_table_list = [] Y1_table_list = []
# Y2_table_list = [] Y2_table_list = []
# Y3_table_list = [] Y3_table_list = []
# table_lists = [Y1_table_list, Y2_table_list, Y3_table_list] table_lists = [Y1_table_list, Y2_table_list, Y3_table_list]
for i, (table_list, Y_lists) in enumerate(zip(table_lists, Y_tables)):
for Y_list in Y_lists:
image_defect_record = Y_list["record"]
image_path = Y_list["imagePath"]
image_defect_record = get_defect_detail(image_defect_record["defectId"])
# 从imagedefectrecord中获取各个字段
defect_type = image_defect_record.get('defectTypeLabel', '未知缺陷类型')
defect_location = f"{image_defect_record.get('partName', '')}{image_defect_record.get('defectPosition', '')}"
# 使用axial和chordwise作为缺陷尺寸信息
axial = image_defect_record.get('axial', '')
chordwise = image_defect_record.get('chordwise', '')
defect_size = f"轴向:{axial} 弦向:{chordwise}" if axial or chordwise else ''
visibility = "暂无此项"
urgency = "暂无此项"
severity = image_defect_record.get('defectLevelLabel', '未知严重等级')
repair_suggestion = image_defect_record.get('repairIdea', '无维修建议')
print(f"获取第{i+1}个叶片的缺陷图: {image_path}")
table_list.append({
"QueXianLeiXing": defect_type,
"QueXianWeiZhi": defect_location,
"QueXianChiCun": defect_size,
"WeiZongDengJi": severity,
"Tupian_Dir": get_full_picture_url(image_path) if check_pic_url(image_path) else None,
"visibility": visibility,
"urgency": urgency,
"repair_suggestion": repair_suggestion, # 新增维修建议字段
})
# for i, (table_list, Y_dict) in enumerate(zip(table_lists, Y_tables)): # for i, (table_list, Y_dict) in enumerate(zip(table_lists, Y_tables)):
# for image_name, image_path in Y_dict.items(): # for image_defect_record, image_path in Y_dict.items():
# # 从图片名解析各个字段 # # 从图片名解析各个字段
# parts = image_name.split('_') # parts = image_name.split('_')
# if len(parts) >= 8: # 确保有7个部分 # if len(parts) >= 8: # 确保有7个部分
@ -485,17 +527,17 @@ async def generate_report(base_info, baogao_info):
# }) # })
# Y1_TABLES, Y1_TABLES_PICTURES = fill_tables(table_lists[0],4,5,len(table_lists[0]),Y1) Y1_TABLES, Y1_TABLES_PICTURES = fill_tables(table_lists[0],4,5,len(table_lists[0]),Y_Code[0] if len(Y_Code) > 0 else "")
# Y2_TABLES, Y2_TABLES_PICTURES = fill_tables(table_lists[1],4,5,len(table_lists[1]),Y2) Y2_TABLES, Y2_TABLES_PICTURES = fill_tables(table_lists[1],4,5,len(table_lists[1]),Y_Code[1] if len(Y_Code) > 1 else "")
# Y3_TABLES, Y3_TABLES_PICTURES = fill_tables(table_lists[2],4,5,len(table_lists[2]),Y3) Y3_TABLES, Y3_TABLES_PICTURES = fill_tables(table_lists[2],4,5,len(table_lists[2]),Y_Code[2] if len(Y_Code) > 2 else "")
# print(add_documents(output_dir, QUEXIAN_XIANGQING_TITLE_DIR)) print(add_documents(output_dir, QUEXIAN_XIANGQING_TITLE_DIR))
# print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
# head_num += 1 head_num += 1
# table_num = 0 table_num = 0
# Xu_Hao = 0 Xu_Hao = 0
# total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y1_TABLES,QUEXIAN_XIANGQING_DIR,Y1_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao) total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y1_TABLES,QUEXIAN_XIANGQING_DIR,Y1_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
# total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y2_TABLES,QUEXIAN_XIANGQING_DIR,Y2_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao) total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y2_TABLES,QUEXIAN_XIANGQING_DIR,Y2_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
# total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y3_TABLES,QUEXIAN_XIANGQING_DIR,Y3_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao) total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y3_TABLES,QUEXIAN_XIANGQING_DIR,Y3_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
#总结 #总结
@ -538,6 +580,7 @@ def parse_arguments():
'jiancha_renyuan': '未获取', 'jiancha_renyuan': '未获取',
"check_date" : None, "check_date" : None,
"coverurl" : None, "coverurl" : None,
"conclusion" : "未填写总结",
} }
parser.add_argument('--turbine_id', '--id', dest='turbine_id', default=default_json1['turbine_id'], parser.add_argument('--turbine_id', '--id', dest='turbine_id', default=default_json1['turbine_id'],
@ -563,6 +606,11 @@ def parse_arguments():
help='报告是否包含防雷检测,默认为True') help='报告是否包含防雷检测,默认为True')
parser.add_argument('--reportchecker', '--rc', dest='baogaoCheck', default=default_json2['baogaoCheck'], parser.add_argument('--reportchecker', '--rc', dest='baogaoCheck', default=default_json2['baogaoCheck'],
help='报告审核状态,默认为未审核') help='报告审核状态,默认为未审核')
parser.add_argument('--conclusion', '--con', dest='conclusion', default=default_json2['conclusion'],
help='总结,默认为未填写总结')
parser.add_argument('--keywords', '--kw', dest='keywords', default=default_json2['key_words'],
help='关键字,用于汇总图的名字包含缺陷时标红,匹配逻辑为正则匹配单个字则为红 后续可优化')
parser.add_argument('--coverurl', default=default_json2['coverurl'], help='封面图片url,指定coverurl则不使用模板中的封面图片')
parser.add_argument('--dianxing_enum', '--dx', dest='dianxing_enum', default=default_json2['dianxing_enum'], parser.add_argument('--dianxing_enum', '--dx', dest='dianxing_enum', default=default_json2['dianxing_enum'],
help='典型图的枚举值,默认为TYPICAL') help='典型图的枚举值,默认为TYPICAL')
parser.add_argument('--quexian_enum', '--qx', dest='quexian_enum', default=default_json2['quexian_enum'], parser.add_argument('--quexian_enum', '--qx', dest='quexian_enum', default=default_json2['quexian_enum'],
@ -575,10 +623,6 @@ def parse_arguments():
help='内部检测的枚举值,默认为in_work') help='内部检测的枚举值,默认为in_work')
parser.add_argument('--fanglei_enum', '--fb', dest='fanglei_enum', default=default_json2['fanglei_enum'], parser.add_argument('--fanglei_enum', '--fb', dest='fanglei_enum', default=default_json2['fanglei_enum'],
help='防雷检测的枚举值,默认为lightning-protection-work') help='防雷检测的枚举值,默认为lightning-protection-work')
parser.add_argument('--keywords', '--kw', dest='keywords', default=default_json2['key_words'],
help='关键字,用于汇总图的名字包含缺陷时标红,匹配逻辑为正则匹配单个字则为红 后续可优化')
parser.add_argument('--coverurl', default=default_json2['coverurl'], help='封面图片url,指定coverurl则不使用模板中的封面图片')
return parser.parse_args() return parser.parse_args()
def main(): def main():
@ -608,6 +652,7 @@ def main():
"check_date" : args.check_date, "check_date" : args.check_date,
"data_processor" : args.data_processor, "data_processor" : args.data_processor,
"coverurl" : args.coverurl, "coverurl" : args.coverurl,
"baogao_zongjie" : args.conclusion,
} }
json_data2['shengcheng_dir'] = r"/home/dtyx/桌面/yhh/Report_Generate_Server/output" json_data2['shengcheng_dir'] = r"/home/dtyx/桌面/yhh/Report_Generate_Server/output"

View File

@ -6,3 +6,5 @@ GETWEATHERINFO = "/weather-type/{weatherCode}"
GETPICTURELIST = "/image/list" GETPICTURELIST = "/image/list"
GETPARTLIST = "/part/list" GETPARTLIST = "/part/list"
GETYEPIANINFO = "/part/detail/{partId}" GETYEPIANINFO = "/part/detail/{partId}"
DEFECTRECORDLIST = "/defect/list"
DEFECTDETAIL = "/defect/detail/{defectId}"

View File

@ -1,6 +1,7 @@
import requests import requests
import json import json
from tools.API import * from tools.API import *
from tools.dataproccess import merge_dicts
def get_data(url : str, data_type : str = "data", params : dict = None ) -> dict: def get_data(url : str, data_type : str = "data", params : dict = None ) -> dict:
headers = { headers = {
@ -41,10 +42,18 @@ def get_weather(weatherid : str) -> dict:
return get_data(weatherurl, "data") return get_data(weatherurl, "data")
def get_defect_record_list() -> list[dict]: def get_defect_record_list() -> list[dict]:
url = DTURL + "/defect/list" url = DTURL + DEFECTRECORDLIST
return get_data(url) return get_data(url)
def get_part_list(turbineId : str) -> dict: def get_part_list(turbineId : str) -> dict:
"""获取对应机组所有部件信息
Args:
turbineId (str): 机组ID
Return:
list: 包含所有部件完整信息的列表
list: 包含所有部件id的列表
"""
parturl = DTURL + GETPARTLIST parturl = DTURL + GETPARTLIST
params = { params = {
"turbineId" : turbineId "turbineId" : turbineId
@ -53,53 +62,44 @@ def get_part_list(turbineId : str) -> dict:
print(f"获取到部件{result}") print(f"获取到部件{result}")
return result, [item["partId"] for item in result] return result, [item["partId"] for item in result]
def get_part_picture(turbineId : str) -> tuple[list[dict], list[dict]]: def get_part_picture(turbineId : str) -> tuple[list[dict], dict[list], list[str]]:
"""获取对应机组所有图片 """获取对应机组所有图片
Args: Args:
turbineId (str): 机组ID turbineId (str): 机组ID
Return: Return:
list: 包含所有对应机组的部件信息的列表 list: 包含所有对应机组的部件信息的列表
list: 包含所有图片信息的列表(按部件分list) dict[list]: 包含所有图片信息的列表(按部件分list)
[ Yepians: 包含所有叶片信息的列表
{
"imageId": "图片id",
"imageName": "图片名",
"imagePath": "图片路径",
"partId": "部件id",
"partName": "部件名",
"imageResolution": "字符串",
"focalDistance": "字符串/None",
"shootingTime": "字符串/None",
"cameraManufacturer": "字符串/None",
"cameraModel": "字符串/None",
"weather": "字符串/None",
"weatherLabel": "字符串/None",
"humidness": "数值/None",
"temperature": "字符串/None",
"windLevel": "数值/None",
"shootingMethod": "字符串/None",
"shootingMethodLabel": "字符串/None",
"shootingDistance": "数值/None",
"collectorName": "字符串/None",
"imageType": "None",
"imageTypeLabel": "None",
"audioList": "None",
"gps": "字符串/None"
},
...
]
""" """
picturerul = DTURL + GETPICTURELIST picturerul = DTURL + GETPICTURELIST
part_data, part_list = get_part_list(turbineId) part_data, part_list = get_part_list(turbineId)
result = [] #先按顺序找叶片123
for part_id in part_list: Yepians = []
params = { yepian_to_find = ["叶片1","叶片2","叶片3"]
"partId" : part_id, #依次获取叶片123的信息未考虑多个同叶片的信息情况正常情况下不会发生
} try:
for images in get_data(picturerul, params = params): part_result = {}
result.append(images) yepian_part_result = {}
print(f"图片数据获取成功:{result}") for name in yepian_to_find:
return part_data, result # 找到第一个匹配的部件并添加到列表中
for part in part_data:
if part['partName'] == name:
Yepians.append(part)
for image in get_data(picturerul, params = {"partId" : part["partId"]}):
if part["partName"] not in yepian_part_result:
yepian_part_result[part["partName"]] = []
yepian_part_result[part["partName"]].append(image)
elif part['partName'] not in part_result and part['partName'] not in yepian_to_find:
for image in get_data(picturerul, params = {"partId" : part["partId"]}):
if part["partName"] not in part_result:
part_result[part["partName"]] = []
part_result[part["partName"]].append(image)
except Exception as e:
print(f"获取叶片信息失败,异常:{e}")
return None, None, None
#叶片和其它部件图片入队整合
result = merge_dicts(yepian_part_result, part_result)
return part_data, result, Yepians
def get_yepian_xiangqing(yepian_id : str) -> dict: def get_yepian_xiangqing(yepian_id : str) -> dict:
url = GETYEPIANINFO.format(partId=yepian_id) url = GETYEPIANINFO.format(partId=yepian_id)
@ -137,9 +137,9 @@ def check_pic_url(pic_url: str, timeout: int = 5) -> bool:
bool: True 如果是有效图片否则 False bool: True 如果是有效图片否则 False
""" """
print(f"检查图片URL{pic_url}") print(f"检查图片URL{pic_url}")
pic_url = get_full_picture_url(pic_url)
if pic_url is None: if pic_url is None:
return False return False
pic_url = get_full_picture_url(pic_url)
try: try:
# 发起HEAD请求更快仅获取头部信息 # 发起HEAD请求更快仅获取头部信息
response = requests.head(pic_url, timeout=timeout, allow_redirects=True) response = requests.head(pic_url, timeout=timeout, allow_redirects=True)
@ -155,4 +155,16 @@ def check_pic_url(pic_url: str, timeout: int = 5) -> bool:
return True return True
except Exception as e: except Exception as e:
print(f"请求图片失败, 未知错误{e}") print(f"请求图片失败, 未知错误{e}")
return False return False
def get_defect_detail(defectId : str) -> dict:
"""获取缺陷详情
Args:
defectId (str): 缺陷ID
Returns:
dict: 缺陷详情
"""
url = DTURL + DEFECTDETAIL.format(defectId=defectId)
return get_data(url, "data")

View File

@ -307,91 +307,91 @@ async def add_table(filename: str, rows: int, cols: int, data: Optional[List[Lis
except Exception as e: except Exception as e:
return f"Failed to add table: {str(e)}" return f"Failed to add table: {str(e)}"
async def add_picture_to_table(target_doc: Document, target_filename: str, row: int, col: int, image_path: str,table_num: int = -1, width: Optional[float] = None) -> str: async def add_picture_to_table(
"""向文档中对应表格添加图片 target_doc: Document,
target_filename: str,
Args: row: int,
target_doc: 目标文档 col: int,
target_filename: 目标文档保存路径 image_path: str,
row: 表格行数 table_num: int = -1,
col: 表格列数 width: Optional[float] = None
image_path: 图片路径 ) -> str:
table_num: 表格序号默认为-1即最后一个表格 """修正版图片添加函数(解决图片不显示问题)"""
width: 图片宽度默认为None表示使用原始图片大小
"""
from PIL import Image from PIL import Image
if not os.path.exists(image_path): from io import BytesIO
return f"Image file not found: {image_path}" import requests
# Check image file size is_url = image_path.startswith(("http://", "https://"))
try: image_bytes = None
image_size = os.path.getsize(image_path) / 1024 # Size in KB
if image_size <= 0:
return f"Image file appears to be empty: {image_path} (0 KB)"
elif image_size > 9126:
# Create the output directory if it doesn't exist
output_dir = os.path.join(os.path.dirname(image_path), "压缩图片")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Define the output path for the compressed image
image_name = os.path.basename(image_path)
output_path = os.path.join(output_dir, image_name)
# Compress the image
while image_size > 9126:
print(f"压缩图片:{image_path} ({image_size:.2f} KB) -> {output_path} (9126 KB)")
with Image.open(image_path) as img:
img.save(output_path, optimize=True, quality=85)
image_size = os.path.getsize(output_path) / 1024 # Size in KB
# Update the image path to the compressed image path
image_path = output_path
except Exception as size_error:
return f"Error checking image file: {str(size_error)}"
try: try:
# 1. 获取图片数据
if is_url:
response = requests.get(image_path, timeout=30)
response.raise_for_status()
image_bytes = BytesIO(response.content)
else:
if not os.path.exists(image_path):
return f"Image not found: {image_path}"
with open(image_path, 'rb') as f:
image_bytes = BytesIO(f.read())
# 2. 准备图片数据(关键步骤)
img = Image.open(image_bytes)
final_bytes = BytesIO()
# 转换为Word兼容的最佳格式
if img.mode == 'RGBA':
img.save(final_bytes, format='PNG')
else:
img.save(final_bytes, format='JPEG', quality=85)
final_bytes.seek(0) # ⚠️ 必须重置指针!
# 3. 添加到文档
table = target_doc.tables[table_num] table = target_doc.tables[table_num]
# Add the picture to the cell
cell = table.cell(row, col) cell = table.cell(row, col)
if len(cell.text) == 1: cell.text = ""
paragraph = cell.paragraphs[-1] # 彻底清除单元格
for paragraph in cell.paragraphs:
paragraph.clear()
paragraph = cell.add_paragraph()
run = paragraph.add_run() run = paragraph.add_run()
# 添加图片(带异常捕获)
try: try:
if width: if width:
run.add_picture(image_path, width=Inches(width)) run.add_picture(final_bytes, width=Inches(width))
else: else:
run.add_picture(image_path) run.add_picture(final_bytes)
except Exception as e: except Exception:
# 如果添加图片时出现问题尝试将图片转换为PNG格式 final_bytes.seek(0) # 再次重置指针
try: if width:
print(f"正常添加失败,尝试转换图片后添加:{image_path}") run.add_picture(final_bytes, width=Inches(width))
# 打开图片 else:
img = Image.open(image_path) run.add_picture(final_bytes)
# 转换为PNG格式
temp_image_path = os.path.splitext(image_path)[0] + '.png' # 4. 设置对齐并保存
img.save(temp_image_path, 'PNG') from docx.enum.table import WD_TABLE_ALIGNMENT, WD_ALIGN_VERTICAL
paragraph.paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
# 尝试添加转换后的图片
if width:
run.add_picture(temp_image_path, width=Inches(width))
else:
run.add_picture(temp_image_path)
# 添加完成后删除转换后的图片
os.remove(temp_image_path)
except Exception as e:
# 如果转换或添加转换后的图片时出现问题,返回错误信息
return f"调用add_picture函数出现问题: {str(e)}"
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.table import WD_ALIGN_VERTICAL
cell.paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
target_doc.save(target_filename) # 确保保存到新文件(避免内存文档与文件不同步)
return f"Picture {image_path} added to table {table_num} cell ({row},{col})" temp_filename = target_filename.replace('.docx', '_temp.docx')
target_doc.save(temp_filename)
# 验证文档有效性
try:
Document(temp_filename) # 尝试读取
os.replace(temp_filename, target_filename)
except Exception:
os.remove(temp_filename)
return "Failed to generate valid document"
return "Picture added successfully"
except Exception as e: except Exception as e:
return f"Failed to add picture to table: {str(e)}" return f"Error: {str(e)}"
import requests import requests
from io import BytesIO from io import BytesIO

View File

@ -96,4 +96,77 @@ def merge_info(frontend_info, default_info):
# 用前端字典完全覆写 # 用前端字典完全覆写
merged_info.update(frontend_info) merged_info.update(frontend_info)
return merged_info return merged_info
def merge_dicts(dict1, dict2):
# 创建一个新的字典来存储合并结果
merged_dict = {}
# 遍历第一个字典
for key, value_list in dict1.items():
if key in dict2:
# 如果键在第二个字典中存在,合并两个列表
merged_dict[key] = value_list + dict2[key]
else:
# 如果键在第二个字典中不存在,直接使用第一个字典的值列表
merged_dict[key] = value_list
# 遍历第二个字典
for key, value_list in dict2.items():
if key not in dict1:
# 如果键在第一个字典中不存在,直接使用第二个字典的值列表
merged_dict[key] = value_list
return merged_dict
def get_defect_str(Y_defect_list : list[dict] ) -> list:
"""将叶片缺陷信息转换为一条条描述信息
Args:
Y_defect_list (list):[
{
'record' : {'defectId': '02d892a178a82561bb565559102c7a58', 'imageId': '41543f531be24522b7bec741b9a483a2', 'defectName': '手动添加的缺陷1', 'defectCode': None, 'partName': '叶片1', 'defectTypeLabel': '表面裂纹', 'defectType': 'bmlw', 'defectLevelLabel': '轻微缺陷', 'defectLevel': 'SLIGHT', 'defectPosition': '', 'description': '手动添加的缺陷,请填写详细描述', 'repairIdea': '建议进行进一步检查', 'labelInfo': None, 'markInfo': {'label': None, 'clsId': None, 'bbox': None, 'confidence': 0.0}}
'imagePath' : '/image/path'
},
....
]
Returns:
result (list):
[
"叶片1表面裂纹轻微缺陷位于{defectPosition},建议进行进一步检查",
],
...
"""
result = []
for item in Y_defect_list:
record = item['record']
defect_type_label = record.get('defectTypeLabel', '未知类型')
defect_level_label = record.get('defectLevelLabel', '未知等级')
defect_position = record.get('defectPosition', '未知位置')
repair_idea = record.get('repairIdea', '无建议')
defect_description = f"{defect_type_label}{defect_level_label}缺陷,位于{defect_position}{repair_idea}"
result.append(defect_description)
return result
def safe_get(data, *keys, default=None):
"""
递归安全访问嵌套字典的键如果中间键不存在则返回默认值
Args:
data (dict): 要访问的字典
*keys: 要访问的键可以是多个 "叶片1", "裂纹"
default: 如果键不存在返回的默认值默认None
Returns:
如果所有键都存在返回对应的值否则返回 default
"""
if not keys or data is None:
return data if data is not None else default
current_key = keys[0]
if isinstance(data, dict):
return safe_get(data.get(current_key), *keys[1:], default=default)
else:
return default

View File

@ -606,4 +606,74 @@ async def process_images_table(data_dict, output_dir, start_i, JIANCHA_NEIRONG_P
picture_index += 1 picture_index += 1
i += 1 i += 1
print(message) print(message)
return i # 返回最后使用的表格序号 return i # 返回最后使用的表格序号
async def process_server_images_table(data_list, image_source_list, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words=None):
"""从服务器获取的图片数据添加对应表格且填写图片名与插入图片
逻辑按来源排序添加
Args:
data_list (list): 图片数据列表每个元素为图片路径
[
{'imageId': '41543f531be24522b7bec741b9a483a2', 'imageName': 'F02_机组10060_叶片叶片PS面距叶根15.5米处轴向裂纹轴向260mm弦向20mm。DJI_20250526090227_0052_Z.png', 'imagePath': '/static/image/temp/out-work/12bc30fb209f3af3bf530541c5b062bd/F02_机组10060_叶片叶片PS面距叶根15.5米处轴向裂纹轴向260mm弦向20mm。DJI_20250526090227_0052_Z.png', 'partId': '12bc30fb209f3af3bf530541c5b062bd', 'partName': '叶片1', 'imageResolution': '8000x6000', 'focalDistance': None, 'shootingTime': None, 'cameraManufacturer': None, 'cameraModel': None, 'weather': None, 'weatherLabel': None, 'humidness': None, 'temperature': None, 'windLevel': None, 'shootingMethod': 'UAV', 'shootingMethodLabel': '无人机航拍', 'shootingDistance': None, 'collectorName': '程启谦', 'imageSource': 'out-work', 'imageSourceLabel': '外部工作', 'imageType': 'DEFECT', 'imageTypeLabel': '缺陷影像', 'audioList': None, 'preImagePath': None, 'preTreatment': False, 'projectId': None, 'gps': None},
...
]
image_source_list (list): 来源列表每个元素为来源名称
['out-work', 'in-work', 'in-field'] (来源可能会少于3个pop出没有给出对应来源的图片)
output_dir (str): 输出路径
start_i (int): 总表格数量
JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径
key_words (list): 关键字列表用于匹配图片名
Returns:
int: 最后使用的表格序号
"""
# 按来源对图片数据进行排序
sorted_data = {source: [] for source in image_source_list}
for item in data_list:
if item['imageSource'] in sorted_data:
if key_words is None or any(keyword in item['imageName'] for keyword in key_words):
sorted_data[item['imageSource']].append((item['imageName'], item['imagePath']))
# 过滤掉空的来源列表
sorted_data = {k: v for k, v in sorted_data.items() if v}
i = start_i
for source, items in sorted_data.items():
print(f"处理来源: {source}")
picture_num = len(items)
line_index = 0
picture_index = 0
for content_row in range(((picture_num + 2) // 3) * 2):
if content_row % 2 == 1:
# 文字行(从 items 取图片名)
JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列
for k in range(1): # 只有1行
for l in range(3):
if line_index >= picture_num:
break
JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名
print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}')
line_index += 1
print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}")
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words
)
i += 1
else:
# 图片行(从 items 取图片路径)
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False
)
for k in range(3):
if picture_index < picture_num:
pic_path = items[picture_index][1] # 图片路径
print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}")
print(await add_picture_to_table(output_doc, output_dir, 0, k, pic_path, i, 1.8898))
picture_index += 1
i += 1
print(message)
return i # 返回最后使用的表格序号

View File

@ -233,66 +233,102 @@ def create_thumbnail(file_path: str, size: tuple) -> Image:
print(f"图片处理有问题:{e}") print(f"图片处理有问题:{e}")
return None return None
def process_picture_data(picture_data : list[dict], def process_picture_data(picture_data : dict[list[dict]],
image_source_to_find : list[str], image_source_to_find : list[str]
quexian_type : str, ) -> tuple[dict[dict[list[dict]]],int]:
dianxing_type : str,
other_type : str
) -> tuple[list[dict], list[dict], list[dict], int]:
"""处理从数据库获取的图片数据 """处理从数据库获取的图片数据
Args: Args:
picture_data (list[dict]): 图片数据 picture_data (dict[list[dict]]: 图片数据(按部件分类返回的列表也要按部件分类)
image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷) image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷)
quexian_type (str): 缺陷类型枚举值
dianxing_type (str): 典型类型枚举值
other_type (str): 其他类型枚举值
Returns: Returns:
tuple( tuple(
defct_pictures, 缺陷图列表 filtered_picture_data, 按部件缺陷类型典型类型其他类型层层分类的图片数据
dianxing_pictures, 典型图列表
other_pictures, 其他图列表
total_num 总图片数量 total_num 总图片数量
) )
""" """
print(f"image_source_to_find: {image_source_to_find}, quexian_type: {quexian_type}, dianxing_type: {dianxing_type}, other_type: {other_type}")
# 过滤目标来源的图片数据 # 过滤目标来源的图片数据
filtered_picture_data = [pic for pic in picture_data if pic['imageSource'] in image_source_to_find] filtered_picture_data = {}
print(f"过滤来源后的图片数据数量: {len(filtered_picture_data)}") total_num = 0
# 分别择出缺陷图和典型图 for partName, values in picture_data.items():
defct_pictures = [pic for pic in filtered_picture_data if pic['imageType'] == quexian_type] total_num = total_num + len(values)
dianxing_pictures = [pic for pic in filtered_picture_data if pic['imageType'] == dianxing_type] for pic in values:
other_pictures = [pic for pic in filtered_picture_data if pic['imageType'] == other_type] if pic['imageSource'] in image_source_to_find:
if partName not in filtered_picture_data:
# 计算总图片数量 filtered_picture_data[partName] = {}
total_num = len(filtered_picture_data) if pic['imageType'] not in filtered_picture_data[partName]:
filtered_picture_data[partName][pic['imageType']] = []
# 打印数据信息 filtered_picture_data[partName][pic['imageType']].append(pic)
print(f"总图片数量: {total_num}")
print(f"缺陷图数量: {len(defct_pictures)}")
print(f"典型图数量: {len(dianxing_pictures)}")
print(f"其他图数量: {len(other_pictures)}")
return defct_pictures, dianxing_pictures, other_pictures, total_num
def get_records_with_pic(records : list[dict], pic_id : list[str]) -> tuple[list[dict], list[dict]]: return filtered_picture_data, total_num
def print_data(filtered_picture_data : dict[dict[list[dict]]]) -> None:
"""打印图片数据"""
for partName, types in filtered_picture_data.items():
print(f"Part Name: {partName}")
for imageType, pictures in types.items():
print(f" Image Type: {imageType}, Number of Pictures: {len(pictures)}")
def get_records_with_pic(records: list[dict], filtered_picture_data: dict[str, dict[str, list[dict]]], defect_enum: str) -> tuple[dict[str, dict[str, list[dict]]], dict[str, list[dict]]]:
"""获取指定图片ID的记录 """获取指定图片ID的记录
Args: Args:
records (list[dict]): 记录列表 records (list[dict]): 记录列表每条记录包含部件名缺陷类型等信息
pic_id (list[str]): 图片ID列表 filtered_picture_data (dict[str, dict[str, list[dict]]]): 图片数据最外层的字典的键是部件名内层的字典的键是缺陷类型值是图片信息列表
defect_enum (str): 指定的缺陷类型
Returns: Returns:
tuple( tuple(
records_with_pic , 包含指定图片ID的记录列表记录为键图片ID为值 records_with_defect_pic: 按部件名分类的缺陷记录映射到图片的imagePath
error_pic, 是缺陷图但找不到对应图片的记录列表记录为键图片ID为值 defect_pic_with_no_records: 对应缺陷类型的图片没有记录的部件名映射
) )
""" """
records_with_pic = {} records_with_defect_pic = {}
error_pic = {} defect_pic_with_no_records = {}
for record in records:
if record['picId'] in pic_id: # 收集所有filtered_picture_data中的imageId
records_with_pic[record] = record['picId'] all_image_ids = set()
else: for partName, types in filtered_picture_data.items():
error_pic[record] = record['picId'] defect_pictures = types.get(defect_enum, [])
for pic in defect_pictures:
return records_with_pic, error_pic all_image_ids.add(pic.get('imageId'))
# 过滤掉records中imageId不在all_image_ids中的记录
filtered_records = [record for record in records if record.get('imageId') in all_image_ids]
# 遍历每个部件
for partName, types in filtered_picture_data.items():
# 获取指定缺陷类型的图片列表
defect_pictures = types.get(defect_enum, [])
if defect_pictures:
# 初始化部件名的字典
if partName not in records_with_defect_pic:
records_with_defect_pic[partName] = {}
# 初始化缺陷类型的字典
records_with_defect_pic[partName][defect_enum] = []
# 遍历缺陷图片列表,找到对应的记录并添加到字典中
for pic in defect_pictures:
pic_id = pic.get('imageId')
matched_records = [record for record in filtered_records if record.get('imageId') == pic_id]
# 如果有匹配的记录,则添加到字典中
if matched_records:
for record in matched_records:
records_with_defect_pic[partName][defect_enum].append({
'record': record,
'imagePath': pic.get('imagePath')
})
else:
# 如果没有匹配的记录,则将图片信息添加到没有记录的字典中
if partName not in defect_pic_with_no_records:
defect_pic_with_no_records[partName] = []
defect_pic_with_no_records[partName].append({
'defect_enum': defect_enum,
'imagePath': pic.get('imagePath')
})
return records_with_defect_pic, defect_pic_with_no_records