增加金风的生成,优化代码,目前能生成金风的静态内容

This commit is contained in:
Voge1imkafig 2025-07-29 18:01:15 +08:00
parent 98866d8d94
commit a3c430ff91
51 changed files with 1573 additions and 1424 deletions

567
Dt_report.py Normal file
View File

@ -0,0 +1,567 @@
# 文档处理工具
from tools.document_tools import (
create_document, add_documents,add_table_and_replace,
add_table_to_document,add_dynamic_table,
process_server_images_table,add_header
)
# 内容处理工具
from tools.content_tools import (
add_picture,split_table_by_row_content,
search_and_replace
)
from tools.get_pictures import (
process_picture_data,get_records_with_pic
)
from tools.Get_Json import (
get_project_info,get_jizu_info,
get_jizu_shigong_info,get_defect_detail,
get_part_picture,get_yepian_xiangqing,
check_pic_url,get_full_picture_url,
get_defect_record_list
)
from tools.dataproccess import (
caculate_work_days,get_year_month,
merge_info,get_defect_str,
safe_get,get_resource_path
)
from core.tables import fill_tables
from tools.defines import *
import os, re, datetime
async def generate_dt_report(base_info, baogao_info):
#获取模板编号、模板名称
num_to_chinese = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
cover_encode = "encode"
cover_project = "project"
baogao_name1 = "baogaoname1"
baogao_name2 = "baogaoname2"
company_name_yi = "company_name_yi"
cover_date = "time"
TITLE_OF_REPORT = "companyencode"
jiegou_xuhao = 'num'
print(f"获取到参数:基本信息:{base_info}\n\n报告信息:{baogao_info}")
try:
base_info = merge_info(base_info, DEFAULT_BASE_INFO)
turbine_id = base_info['turbine_id']
jizu_data = get_jizu_info(turbine_id)
project_data = get_project_info(jizu_data['projectId'])
shigong_data = get_jizu_shigong_info(turbine_id)
fengchang_name = project_data['farmName']
Yi_company = project_data['inspectionUnit']
yi_fuzeren = project_data['inspectionContact']
yi_phone = project_data['inspectionPhone']
fengchang_location = project_data['farmAddress']
Jia_company = project_data['client']
jia_fuzeren = project_data['clientContact']
jia_phone = project_data['clientPhone']
jizu_num = project_data['scale']
jizu_xinghao = project_data['turbineModel']
project_name = project_data['projectName']
jizu_bianhao = jizu_data["turbineName"]
start_date = project_data['startDate']
end_date = project_data['endDate']
cover_url = project_data['coverUrl']
gongqi = caculate_work_days(start_date, end_date)
except Exception as e:
print(f"数据库的项目-机组基本信息获取失败:{e}")
return
try:
baogao_date = datetime.datetime.now().strftime("%Y年%m月%d%H:%M") #现在的时间
date_year_month = get_year_month(baogao_date)
#前端信息
baogao_info = merge_info(baogao_info, DEFAULT_BAOGAO_INFO)
key_words= re.compile('|'.join(map(re.escape, baogao_info['key_words'].split(',')))) #关键字
shengcheng_dir = baogao_info['shengcheng_dir'] #路径
if shengcheng_dir == "":
print("未配置生成路径,请检查配置")
return
if_waibu = baogao_info["if_waibu"]
if_neibu = baogao_info["if_neibu"]
if_fanglei = baogao_info["if_fanglei"]
quexian_type = baogao_info['quexian_enum']
dianxing_type = baogao_info['dianxing_enum']
other_type = baogao_info["other_enum"]
jiancha_renyuan = baogao_info['jiancha_renyuan'] #检查人员,目前是从命令行参数获取,需要完善
check_date = baogao_info['check_date']
if check_date == None:
check_date = "未获取"
baogao_bianzhi = baogao_info["userName"]
baogao_shenghe = baogao_info["baogaoCheck"]
Jiancha_date = baogao_info["check_date"]
data_processor = baogao_info["data_processor"]
coverurl = baogao_info["coverurl"]
#数据库拉取信息
# Jiancha_date = shigong_data["startTime"].replace("T", " ") #检查日期
# image_count = shigong_data['imageCount'] #从施工方案获取的图片数量,待定!!!
# temperature = shigong_data['temperature'] #温度
# wind_speed = shigong_data['windSpeed'] #风速
# weather = get_weather(shigong_data["weatherCode"]) #天气 不从此接口获取,待定!!!
#拉取部件、图片数据
part_data, picture_data, Yepians = get_part_picture(turbine_id)
print(Yepians)
Y1_info = get_yepian_xiangqing(Yepians[0]["partId"])
Y_Code = [yepian["partCode"] for yepian in Yepians]
partManufacturer = Y1_info["partManufacturer"]
print(f"找到叶片号{Y_Code},厂商{partManufacturer}")
image_source_to_find = []
baogao_label = []
renyuan_peizhi = []
gongzuo_neirong = []
shigong_fangan = []
shebei_peizhi = []
beizhu = []
jiancha = []
neirong = []
neirong2 = []
#获取对应枚举字段
if if_waibu:
baogao_label.append("外观")
image_source_to_find.append(baogao_info['waibu_enum'])
if baogao_info["shigong_fangan"] == None:
print("未传入施工方案,使用已有枚举")
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.WAIBU.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.WAIBU.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.WAIBU.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.WAIBU.SHIGONG_FANGAN)
else:
pass #待添加如果从平台传入枚举,但目前没必要,如果这种枚举标准信息库有更好的优化则可以实现
jiancha.append("无人机近距离外观检查")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的前缘、后缘、迎风面、背风面。")
neirong2.append("前缘、后缘、迎风面、背风面。")
if if_neibu:
baogao_label.append("内部")
image_source_to_find.append(baogao_info['neibu_enum'])
if baogao_info["shigong_fangan"] == None:
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.NEIBU.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.NEIBU.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.NEIBU.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.NEIBU.SHIGONG_FANGAN)
else:
pass
jiancha.append("人工内部拍摄")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
neirong2.append("内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
if if_fanglei:
baogao_label.append("防雷")
image_source_to_find.append(baogao_info['fanglei_enum'])
if baogao_info["shigong_fangan"] == None:
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.SHIGONG_FANGAN)
else:
pass
jiancha.append("人工防雷")
neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...")
neirong2.append("轮毂至塔基导通、内部导线线阻、外部导线线阻...")
#获取缺陷图列表和典型图列表
filtered_picture_data, total_picture_num = process_picture_data(picture_data, image_source_to_find)
#获取所有缺陷记录
defect_records = get_defect_record_list()
#获取缺陷记录中对应图片的记录
defect_records_with_pic, error_pic_with_no_record = get_records_with_pic(defect_records, filtered_picture_data, quexian_type)
if len(error_pic_with_no_record) > 0:
print(f"!!!有部分缺陷图片没有对应的缺陷记录,将不会生成这些图片的缺陷表:{error_pic_with_no_record} ")
print(f"对应缺陷图的缺陷记录列表:{defect_records_with_pic}")
except Exception as e:
print(f"报告基本信息获取失败:{e}")
return
#检查参数合法性
if not if_fanglei or not if_neibu or not if_waibu:
print("请至少选择一种检查项目")
return
if not os.path.exists(shengcheng_dir):
print(f"生成路径{shengcheng_dir}不存在")
return
output_doc = None
head_num = 1
###封面创建###
cover_dirs = [get_resource_path("muban/fengmian1.docx"),get_resource_path("muban/fengmian.jpg"),get_resource_path("muban/fengmian2.docx")]
#输出目录
baogao_name = "叶片" + "".join(baogao_label) + "检查报告"
output_dir = os.path.normpath(f"{shengcheng_dir}/{project_name}项目{baogao_name}{jizu_bianhao}{baogao_date.split(' ')[0]}版.docx")
version = 1
while os.path.exists(output_dir):
if version != 1:
output_dir = output_dir.replace(f"{version - 1}",f"{version}")
else:
output_dir = output_dir.replace("",f"{version}")
version += 1
mianzhe_shengming = f"本报告仅涵盖{''.join(baogao_label)}检测内容"
print(await create_document(output_dir))
if baogao_info["if_docx_fengmian"] :
#创建文档、添加封面
print(add_documents(output_dir, cover_dirs[0]))
if check_pic_url(coverurl): #手动导入封面图片测试用
print(add_picture(output_dir, get_full_picture_url(coverurl), width = 6.41, height = 4))
elif check_pic_url(cover_url):
print(add_picture(output_dir, get_full_picture_url(cover_url), width = 6.41, height = 4))
else:
print(add_picture(output_dir, cover_dirs[1]))
print(add_documents(output_dir, cover_dirs[2]))
print("封面创建成功")
#YYYY年MM月DD日 HH:MM:SS
#更改文档信息
print(search_and_replace(output_dir, TITLE_OF_REPORT, jizu_bianhao))
print(search_and_replace(output_dir, baogao_name1, baogao_name))
print(search_and_replace(output_dir, company_name_yi, Yi_company))
print(search_and_replace(output_dir, cover_project, fengchang_name))
print(search_and_replace(output_dir, cover_encode, jizu_bianhao))
print(search_and_replace(output_dir, cover_date, date_year_month))
print(search_and_replace(output_dir, 'bianzhi', baogao_bianzhi))
print(search_and_replace(output_dir, 'shenghe', baogao_shenghe))
print(search_and_replace(output_dir, 'mianzhe_shengming', mianzhe_shengming))
add_header(output_dir, TEMPLATE_HEADER.DT_HEADER.ENUM)
total_table_num = 0
if baogao_info["if_docx_project_overview"]:
#项目概况表
print("开始添加项目概况表")
XIANG_MU_GAI_KUANG = get_resource_path("muban/xiangmugaikuo.docx")
print(f"查找模板,找到模板:{XIANG_MU_GAI_KUANG}")
project_location = fengchang_location
company_name_jia = Jia_company
fuzeren = yi_fuzeren
phone_fuzeren = yi_phone
xiangmuguige = jizu_num
Yi_company = Yi_company
XIANGMU_GAIKUO = list(list("" for i in range(6)) for j in range(5))
XIANGMU_GAIKUO[0][1] = fengchang_name
XIANGMU_GAIKUO[0][4] = project_location
XIANGMU_GAIKUO[1][1] = company_name_jia
XIANGMU_GAIKUO[1][4] = Yi_company
XIANGMU_GAIKUO[2][1] = jia_fuzeren
XIANGMU_GAIKUO[2][4] = fuzeren
XIANGMU_GAIKUO[3][2] = jia_phone
XIANGMU_GAIKUO[3][5] = phone_fuzeren
XIANGMU_GAIKUO[4][1] = jizu_xinghao
XIANGMU_GAIKUO[4][3] = xiangmuguige
XIANGMU_GAIKUO[4][5] = gongqi
print("建立表结构完毕,开始插入模板")
#添加项目概况表
print(f"输出路径:{output_dir},模板路径:{XIANG_MU_GAI_KUANG},插入数据:{XIANGMU_GAIKUO}")
output_doc, message = add_table_to_document(output_dir, XIANG_MU_GAI_KUANG,5,5,total_table_num,XIANGMU_GAIKUO)
print(message)
print("模板插入完毕,开始替换内容")
total_table_num += 1
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
print(search_and_replace(output_dir, TITLE_OF_REPORT, jizu_bianhao))
print(search_and_replace(output_dir, baogao_name2, baogao_name))
head_num += 1
if baogao_info['if_docx_inspection_method']:
#检查方案描述
FANGAN_JIANCHA_DIR = get_resource_path("muban/checkmethod.docx")
list_to_replace = {
'renyuan_peizhi' : "\n".join(renyuan_peizhi),
'shebei_peizhi' : "\n".join(shebei_peizhi),
'shigong_fangan' : "\n".join(shigong_fangan),
'gongzuo_neirong' : "\n".join(gongzuo_neirong),
'beizhu' : beizhu,
'num' : num_to_chinese[head_num],
}
print(add_table_and_replace(output_dir, FANGAN_JIANCHA_DIR, 0, list_to_replace))
print(split_table_by_row_content(output_dir, output_dir, total_table_num))
total_table_num += 1
head_num += 1
if baogao_info['if_docx_inspection_info']:
#检查信息
JIANCHA_XINGXI_DIR = get_resource_path("muban/checkinfo.docx")
JIANCHA_XINGXI = list(list("" for i in range(4)) for j in range(9))
JIANCHA_XINGXI[0][1] = jiancha_renyuan
try:
JIANCHA_XINGXI[1][1] = Jiancha_date.split('T')[0]
except:
JIANCHA_XINGXI[1][1] = "格式不对或无数据"
JIANCHA_XINGXI[1][3] = jizu_bianhao
JIANCHA_XINGXI[2][1] = "风力发电机组" + baogao_name
JIANCHA_XINGXI[2][3] = "".join(jiancha)
JIANCHA_XINGXI[3][2] = partManufacturer
JIANCHA_XINGXI[4][1] = '叶片型号:' + jizu_xinghao
JIANCHA_XINGXI[5][1] = Y_Code[0] if len(Y_Code) > 0 else ""
JIANCHA_XINGXI[6][1] = Y_Code[1] if len(Y_Code) > 1 else ""
JIANCHA_XINGXI[7][1] = Y_Code[2] if len(Y_Code) > 2 else ""
JIANCHA_XINGXI[8][0] = "本次" + "".join(_ for _ in jiancha) + f"检查,采集叶片图片{total_picture_num}张,内容覆盖" + ";".join(_ for _ in neirong)
#新建检查信息表
output_doc, message = add_table_to_document(output_dir, JIANCHA_XINGXI_DIR,9,4,total_table_num ,JIANCHA_XINGXI,False)
print(message)
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
total_table_num += 1
if baogao_info['if_docx_chengguo_sub']:
# 添加成果递交表
CHENGGUO_DIJIAO_DIR = get_resource_path("muban/chengguo_sub.docx")
CHENGGUO_DIJIAO = list(list("" for i in range(4)) for j in range(6))
CHENGGUO_DIJIAO[0][1] = jiancha_renyuan
CHENGGUO_DIJIAO[1][1] = jia_fuzeren
try:
CHENGGUO_DIJIAO[2][1] = Jiancha_date.split('T')[0]
except:
CHENGGUO_DIJIAO[2][1] = "格式不对或无数据"
CHENGGUO_DIJIAO[3][1] = data_processor
CHENGGUO_DIJIAO[4][1] = baogao_bianzhi
CHENGGUO_DIJIAO[5][1] = baogao_shenghe
try:
CHENGGUO_DIJIAO[2][3] = Jiancha_date.split('T')[1]
except:
CHENGGUO_DIJIAO[2][3] = "格式不对或无数据"
CHENGGUO_DIJIAO[3][3] = baogao_date.split(' ')[0]
CHENGGUO_DIJIAO[4][3] = baogao_date.split(' ')[0]
CHENGGUO_DIJIAO[5][3] = "未审核"
output_doc, message = add_table_to_document(output_dir, CHENGGUO_DIJIAO_DIR,5,5,total_table_num,CHENGGUO_DIJIAO,True,0.04)
print(message)
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
total_table_num += 1
if baogao_info['if_docx_inspection_text'] and if_waibu:
#检查情况汇总表(文字信息)
try:
#获取缺陷信息
"""
需要获取
Y1Y2Y3叶片的缺陷数量缺陷字典{描述图片路径}
和数据库连接需要的新增异常处理
目前逻辑为找到图片后找缺陷类型的图片通过图片id查找对应缺陷记录
如果没有找到要返回对应异常即已选出标注缺陷图但未填写对应缺陷信息的异常返回
最后将无异常的图片入队缺陷字典同数量有异常的图片再另存
"""
Y1_quexian_list = []
Y2_quexian_list = []
Y3_quexian_list = []
Y1_quexian_list = defect_records_with_pic["叶片1"]["DEFECT"] if "叶片1" in defect_records_with_pic else {}
Y2_quexian_list = defect_records_with_pic["叶片2"]["DEFECT"] if "叶片2" in defect_records_with_pic else {}
Y3_quexian_list = defect_records_with_pic["叶片3"]["DEFECT"] if "叶片3" in defect_records_with_pic else {}
Y1_quexian_num = len(Y1_quexian_list)
Y2_quexian_num = len(Y2_quexian_list)
Y3_quexian_num = len(Y3_quexian_list)
no_defect_found = "未发现明显缺陷"
weak_num_Y1 = f"{Y_Code[0] if len(Y_Code) > 0 else ''}叶片" + f"共发现缺陷{Y1_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
weak_num_Y2 = f"{Y_Code[1] if len(Y_Code) > 1 else ''}叶片" + f"共发现缺陷{Y2_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
weak_num_Y3 = f"{Y_Code[2] if len(Y_Code) > 2 else ''}叶片" + f"共发现缺陷{Y3_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
except Exception as e:
print(f"缺陷图获取失败:{e}")
return
#添加检查情况汇总表
JIANCHA_HUIZONG_DIR = get_resource_path("muban/total_check.docx")
JIANCHA_HUIZONG = list(list("" for i in range(3)) for j in range(4))
JIANCHA_HUIZONG[1][0] = weak_num_Y1
JIANCHA_HUIZONG[2][0] = weak_num_Y2
JIANCHA_HUIZONG[3][0] = weak_num_Y3
JIANCHA_HUIZONG[1][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[2][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[3][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[1][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y1_quexian_list))]) if Y1_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
JIANCHA_HUIZONG[2][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y2_quexian_list))]) if Y2_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
JIANCHA_HUIZONG[3][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y3_quexian_list))]) if Y3_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
output_doc, message = add_table_to_document(output_dir, JIANCHA_HUIZONG_DIR,4,3,total_table_num,JIANCHA_HUIZONG,False,ALIGMENT='LEFT')
print(message)
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
total_table_num += 1
head_num += 1
if baogao_info['if_docx_inspection_picture']:
#主要部位图片展示表/检查内容表
#获取典型图信息
try:
Y1_list = safe_get(filtered_picture_data, "叶片1", dianxing_type, default=[])
Y2_list = safe_get(filtered_picture_data, "叶片2", dianxing_type, default=[])
Y3_list = safe_get(filtered_picture_data, "叶片3", dianxing_type, default=[])
picture_Y1_num = len(Y1_list)
picture_Y2_num = len(Y2_list)
picture_Y3_num = len(Y3_list)
except Exception as e:
print(f"获取缺陷图失败:{e}")
print(f"图片、文字数量:{picture_Y1_num} {picture_Y2_num} {picture_Y3_num}")
JIANCHA_NEIRONG_TOTAL_NUM = picture_Y1_num+ picture_Y2_num + picture_Y3_num
if JIANCHA_NEIRONG_TOTAL_NUM <= 0:
print("无典型图片数据,无法生成典型图表")
else:
col ,row = 3, 0
JIANCHA_NEIRONG_PICTURES_TABLE = get_resource_path("muban/check2.docx")
JIANCHA_NEIRONG_Y1_DIR = get_resource_path("muban/check_content.docx")
JIANCHA_NEIRONG_Y1 = list(list("" for _ in range(3)) for j in range(1))
Y1_code = Y_Code[0] if len(Y_Code) > 0 else ""
Y2_code = Y_Code[1] if len(Y_Code) > 1 else ""
Y3_code = Y_Code[2] if len(Y_Code) > 2 else ""
JIANCHA_NEIRONG_Y1[0][0] = f"叶片1{Y1_code}检查内容"
print(f"Y1标题内容{JIANCHA_NEIRONG_Y1}")
JIANCHA_NEIRONG_Y2_DIR = get_resource_path("muban/check3.docx")
JIANCHA_NEIRONG_Y2 = list(list("" for _ in range(3)) for j in range(1))
JIANCHA_NEIRONG_Y2[0][0] = f"叶片2{Y2_code}检查内容"
print(f"Y2标题内容{JIANCHA_NEIRONG_Y2}")
JIANCHA_NEIRONG_Y3_DIR = get_resource_path("muban/check3.docx")
JIANCHA_NEIRONG_Y3 = list(list("" for _ in range(3)) for j in range(1))
JIANCHA_NEIRONG_Y3[0][0] = f"叶片3{Y3_code}检查内容"
print(f"Y3标题内容{JIANCHA_NEIRONG_Y3}")
print(f"当前表格序号为 {total_table_num}")
print(key_words)
output_doc, message = add_table_to_document(output_dir, JIANCHA_NEIRONG_Y1_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y1,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y1_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
output_doc, message = add_table_to_document(output_dir, JIANCHA_NEIRONG_Y2_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y2,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y2_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
output_doc, message = add_table_to_document(output_dir, JIANCHA_NEIRONG_Y3_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y3,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y3_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
if baogao_info['if_docx_defect_picture'] and if_waibu:
# #缺陷详情
QUEXIAN_XIANGQING_DIR = get_resource_path("muban/check_check.docx")
QUEXIAN_XIANGQING_TITLE_DIR = get_resource_path("muban/check_check_title.docx")
Y_tables = [Y1_quexian_list,Y2_quexian_list,Y3_quexian_list]
Y1_table_list = []
Y2_table_list = []
Y3_table_list = []
table_lists = [Y1_table_list, Y2_table_list, Y3_table_list]
for i, (table_list, Y_lists) in enumerate(zip(table_lists, Y_tables)):
for Y_list in Y_lists:
image_defect_record = Y_list["record"]
image_path = Y_list["imagePath"]
image_defect_record = get_defect_detail(image_defect_record["defectId"])
# 从imagedefectrecord中获取各个字段
defect_type = image_defect_record.get('defectTypeLabel', '未知缺陷类型')
defect_location = f"{image_defect_record.get('partName', '')}{image_defect_record.get('defectPosition', '')}"
# 使用axial和chordwise作为缺陷尺寸信息
axial = image_defect_record.get('axial', '')
chordwise = image_defect_record.get('chordwise', '')
defect_size = f"轴向:{axial} 弦向:{chordwise}" if axial or chordwise else ''
visibility = "暂无此项"
urgency = "暂无此项"
severity = image_defect_record.get('defectLevelLabel', '未知严重等级')
repair_suggestion = image_defect_record.get('repairIdea', '无维修建议')
print(f"获取第{i + 1}个叶片的缺陷图: {image_path}")
table_list.append({
"QueXianLeiXing": defect_type,
"QueXianWeiZhi": defect_location,
"QueXianChiCun": defect_size,
"WeiZongDengJi": severity,
"Tupian_Dir": get_full_picture_url(image_path) if check_pic_url(image_path) else None,
"visibility": visibility,
"urgency": urgency,
"repair_suggestion": repair_suggestion, # 新增维修建议字段
})
# for i, (table_list, Y_dict) in enumerate(zip(table_lists, Y_tables)):
# for image_defect_record, image_path in Y_dict.items():
# # 从图片名解析各个字段
# parts = image_name.split('_')
# if len(parts) >= 8: # 确保有7个部分
# defect_type = parts[1]
# defect_location = parts[2]
# defect_size = parts[3]
# visibility = parts[4]
# urgency = parts[5]
# severity = parts[6]
# repair_suggestion = parts[7]
# print(f"获取第{i+1}个叶片的缺陷图: {image_path}")
# table_list.append({
# "QueXianLeiXing": defect_type,
# "QueXianWeiZhi": defect_location,
# "QueXianChiCun": defect_size,
# "WeiZongDengJi": severity,
# "Tupian_Dir": image_path,
# "visibility": visibility,
# "urgency": urgency,
# "repair_suggestion": repair_suggestion.split('.')[0], # 新增维修建议字段
# })
# else:
# table_list.append({
# "QueXianLeiXing": "图片命名有误",
# "QueXianWeiZhi": "图片命名有误",
# "QueXianChiCun": "图片命名有误",
# "WeiZongDengJi": "图片命名有误",
# "Tupian_Dir": image_path,
# "visibility": "图片命名有误",
# "urgency": "图片命名有误",
# "repair_suggestion": "图片命名有误", # 新增维修建议字段
# })
Y1_TABLES, Y1_TABLES_PICTURES = fill_tables(table_lists[0],4,5,len(table_lists[0]),Y_Code[0] if len(Y_Code) > 0 else "")
Y2_TABLES, Y2_TABLES_PICTURES = fill_tables(table_lists[1],4,5,len(table_lists[1]),Y_Code[1] if len(Y_Code) > 1 else "")
Y3_TABLES, Y3_TABLES_PICTURES = fill_tables(table_lists[2],4,5,len(table_lists[2]),Y_Code[2] if len(Y_Code) > 2 else "")
print(add_documents(output_dir, QUEXIAN_XIANGQING_TITLE_DIR))
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
table_num = 0
Xu_Hao = 0
total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y1_TABLES,QUEXIAN_XIANGQING_DIR,Y1_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y2_TABLES,QUEXIAN_XIANGQING_DIR,Y2_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y3_TABLES,QUEXIAN_XIANGQING_DIR,Y3_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
if baogao_info['if_docx_conclusion']:
#总结
ZONG_JIE_DIR = get_resource_path("muban/result.docx")
ZONG_JIE_BEFORE = "result"
ZONG_JIE = baogao_info['conclusion']
print(add_documents(output_dir, ZONG_JIE_DIR))
print(search_and_replace(output_dir, ZONG_JIE_BEFORE, ZONG_JIE))
print(search_and_replace(output_dir, 'company_yi', Yi_company))
print(search_and_replace(output_dir, 'baogao_date', baogao_date.split(' ')[0]))
print(search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))

View File

@ -1,569 +1,9 @@
# 文档处理工具
from tools.document_tools import (
create_document, add_documents,add_table_and_replace,
add_table_to_document,add_dynamic_table,
process_server_images_table
)
# 内容处理工具
from tools.content_tools import (
add_picture,split_table_by_row_content,
search_and_replace
)
from tools.get_pictures import (
process_picture_data,get_records_with_pic
)
from tools.Get_Json import (
get_project_info,get_jizu_info,
get_jizu_shigong_info,get_defect_detail,
get_part_picture,get_yepian_xiangqing,
check_pic_url,get_full_picture_url,
get_defect_record_list
)
from tools.dataproccess import (
caculate_work_days,get_year_month,
merge_info,get_defect_str,
safe_get,get_resource_path
)
import asyncio
from core.tables import fill_tables
from tools.defines import *
import os, re, datetime
from pathlib import Path
async def generate_report(base_info, baogao_info):
#获取模板编号、模板名称
num_to_chinese = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
cover_encode = "encode"
cover_project = "project"
baogao_name1 = "baogaoname1"
baogao_name2 = "baogaoname2"
company_name_yi = "company_name_yi"
cover_date = "time"
TITLE_OF_REPORT = "companyencode"
jiegou_xuhao = 'num'
print(f"获取到参数:基本信息:{base_info}\n\n报告信息:{baogao_info}")
try:
base_info = merge_info(base_info, DEFAULT_BASE_INFO)
turbine_id = base_info['turbine_id']
jizu_data = get_jizu_info(turbine_id)
project_data = get_project_info(jizu_data['projectId'])
shigong_data = get_jizu_shigong_info(turbine_id)
fengchang_name = project_data['farmName']
Yi_company = project_data['inspectionUnit']
yi_fuzeren = project_data['inspectionContact']
yi_phone = project_data['inspectionPhone']
fengchang_location = project_data['farmAddress']
Jia_company = project_data['client']
jia_fuzeren = project_data['clientContact']
jia_phone = project_data['clientPhone']
jizu_num = project_data['scale']
jizu_xinghao = project_data['turbineModel']
project_name = project_data['projectName']
jizu_bianhao = jizu_data["turbineName"]
start_date = project_data['startDate']
end_date = project_data['endDate']
cover_url = project_data['coverUrl']
gongqi = caculate_work_days(start_date, end_date)
except Exception as e:
print(f"数据库的项目-机组基本信息获取失败:{e}")
return
try:
baogao_date = datetime.datetime.now().strftime("%Y年%m月%d%H:%M") #现在的时间
date_year_month = get_year_month(baogao_date)
#前端信息
baogao_info = merge_info(baogao_info, DEFAULT_BAOGAO_INFO)
key_words= re.compile('|'.join(map(re.escape, baogao_info['key_words'].split(',')))) #关键字
shengcheng_dir = baogao_info['shengcheng_dir'] #路径
if shengcheng_dir == "":
print("未配置生成路径,请检查配置")
return
if_waibu = baogao_info["if_waibu"]
if_neibu = baogao_info["if_neibu"]
if_fanglei = baogao_info["if_fanglei"]
quexian_type = baogao_info['quexian_enum']
dianxing_type = baogao_info['dianxing_enum']
other_type = baogao_info["other_enum"]
jiancha_renyuan = baogao_info['jiancha_renyuan'] #检查人员,目前是从命令行参数获取,需要完善
check_date = baogao_info['check_date']
if check_date == None:
check_date = "未获取"
baogao_bianzhi = baogao_info["userName"]
baogao_shenghe = baogao_info["baogaoCheck"]
Jiancha_date = baogao_info["check_date"]
data_processor = baogao_info["data_processor"]
coverurl = baogao_info["coverurl"]
#数据库拉取信息
# Jiancha_date = shigong_data["startTime"].replace("T", " ") #检查日期
# image_count = shigong_data['imageCount'] #从施工方案获取的图片数量,待定!!!
# temperature = shigong_data['temperature'] #温度
# wind_speed = shigong_data['windSpeed'] #风速
# weather = get_weather(shigong_data["weatherCode"]) #天气 不从此接口获取,待定!!!
#拉取部件、图片数据
part_data, picture_data, Yepians = get_part_picture(turbine_id)
print(Yepians)
Y1_info = get_yepian_xiangqing(Yepians[0]["partId"])
Y_Code = [yepian["partCode"] for yepian in Yepians]
partManufacturer = Y1_info["partManufacturer"]
print(f"找到叶片号{Y_Code},厂商{partManufacturer}")
image_source_to_find = []
baogao_label = []
renyuan_peizhi = []
gongzuo_neirong = []
shigong_fangan = []
shebei_peizhi = []
beizhu = []
jiancha = []
neirong = []
neirong2 = []
#获取对应枚举字段
if if_waibu:
baogao_label.append("外观")
image_source_to_find.append(baogao_info['waibu_enum'])
if baogao_info["shigong_fangan"] == None:
print("未传入施工方案,使用已有枚举")
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.WAIBU.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.WAIBU.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.WAIBU.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.WAIBU.SHIGONG_FANGAN)
else:
pass #待添加如果从平台传入枚举,但目前没必要,如果这种枚举标准信息库有更好的优化则可以实现
jiancha.append("无人机近距离外观检查")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的前缘、后缘、迎风面、背风面。")
neirong2.append("前缘、后缘、迎风面、背风面。")
if if_neibu:
baogao_label.append("内部")
image_source_to_find.append(baogao_info['neibu_enum'])
if baogao_info["shigong_fangan"] == None:
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.NEIBU.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.NEIBU.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.NEIBU.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.NEIBU.SHIGONG_FANGAN)
else:
pass
jiancha.append("人工内部拍摄")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
neirong2.append("内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
if if_fanglei:
baogao_label.append("防雷")
image_source_to_find.append(baogao_info['fanglei_enum'])
if baogao_info["shigong_fangan"] == None:
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.SHIGONG_FANGAN)
else:
pass
jiancha.append("人工防雷")
neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...")
neirong2.append("轮毂至塔基导通、内部导线线阻、外部导线线阻...")
#获取缺陷图列表和典型图列表
filtered_picture_data, total_picture_num = process_picture_data(picture_data, image_source_to_find)
#获取所有缺陷记录
defect_records = get_defect_record_list()
#获取缺陷记录中对应图片的记录
defect_records_with_pic, error_pic_with_no_record = get_records_with_pic(defect_records, filtered_picture_data, quexian_type)
if len(error_pic_with_no_record) > 0:
print(f"!!!有部分缺陷图片没有对应的缺陷记录,将不会生成这些图片的缺陷表:{error_pic_with_no_record} ")
print(f"对应缺陷图的缺陷记录列表:{defect_records_with_pic}")
except Exception as e:
print(f"报告基本信息获取失败:{e}")
return
#检查参数合法性
if not if_fanglei or not if_neibu or not if_waibu:
print("请至少选择一种检查项目")
return
if not os.path.exists(shengcheng_dir):
print(f"生成路径{shengcheng_dir}不存在")
return
output_doc = None
head_num = 1
###封面创建###
cover_dirs = [get_resource_path("muban/fengmian1.docx"),get_resource_path("muban/fengmian.jpg"),get_resource_path("muban/fengmian2.docx")]
#输出目录
baogao_name = "叶片" + "".join(baogao_label) + "检查报告"
output_dir = os.path.normpath(f"{shengcheng_dir}/{project_name}项目{baogao_name}{jizu_bianhao}{baogao_date.split(' ')[0]}版.docx")
version = 1
while os.path.exists(output_dir):
if version != 1:
output_dir = output_dir.replace(f"{version - 1}",f"{version}")
else:
output_dir = output_dir.replace("",f"{version}")
version += 1
mianzhe_shengming = f"本报告仅涵盖{''.join(baogao_label)}检测内容"
print(await create_document(output_dir))
if baogao_info["if_docx_fengmian"] :
#创建文档、添加封面
print(add_documents(output_dir, cover_dirs[0]))
if check_pic_url(coverurl): #手动导入封面图片测试用
print(await add_picture(output_dir, get_full_picture_url(coverurl), width = 6.41, height = 4))
elif check_pic_url(cover_url):
print(await add_picture(output_dir, get_full_picture_url(cover_url), width = 6.41, height = 4))
else:
print(await add_picture(output_dir, cover_dirs[1]))
print(add_documents(output_dir, cover_dirs[2]))
print("封面创建成功")
#YYYY年MM月DD日 HH:MM:SS
#更改文档信息
print(await search_and_replace(output_dir, TITLE_OF_REPORT, jizu_bianhao))
print(await search_and_replace(output_dir, baogao_name1, baogao_name))
print(await search_and_replace(output_dir, baogao_name2, baogao_name))
print(await search_and_replace(output_dir, company_name_yi, Yi_company))
print(await search_and_replace(output_dir, cover_project, fengchang_name))
print(await search_and_replace(output_dir, cover_encode, jizu_bianhao))
print(await search_and_replace(output_dir, cover_date, date_year_month))
print(await search_and_replace(output_dir, 'bianzhi', baogao_bianzhi))
print(await search_and_replace(output_dir, 'shenghe', baogao_shenghe))
print(await search_and_replace(output_dir, 'mianzhe_shengming', mianzhe_shengming))
total_table_num = 0
if baogao_info["if_docx_project_overview"]:
#项目概况表
print("开始添加项目概况表")
XIANG_MU_GAI_KUANG = get_resource_path("muban/xiangmugaikuo.docx")
print(f"查找模板,找到模板:{XIANG_MU_GAI_KUANG}")
project_location = fengchang_location
company_name_jia = Jia_company
fuzeren = yi_fuzeren
phone_fuzeren = yi_phone
xiangmuguige = jizu_num
Yi_company = Yi_company
XIANGMU_GAIKUO = list(list("" for i in range(6)) for j in range(5))
XIANGMU_GAIKUO[0][1] = fengchang_name
XIANGMU_GAIKUO[0][4] = project_location
XIANGMU_GAIKUO[1][1] = company_name_jia
XIANGMU_GAIKUO[1][4] = Yi_company
XIANGMU_GAIKUO[2][1] = jia_fuzeren
XIANGMU_GAIKUO[2][4] = fuzeren
XIANGMU_GAIKUO[3][2] = jia_phone
XIANGMU_GAIKUO[3][5] = phone_fuzeren
XIANGMU_GAIKUO[4][1] = jizu_xinghao
XIANGMU_GAIKUO[4][3] = xiangmuguige
XIANGMU_GAIKUO[4][5] = gongqi
print("建立表结构完毕,开始插入模板")
#添加项目概况表
print(f"输出路径:{output_dir},模板路径:{XIANG_MU_GAI_KUANG},插入数据:{XIANGMU_GAIKUO}")
output_doc, message = await add_table_to_document(output_dir, XIANG_MU_GAI_KUANG,5,5,total_table_num,XIANGMU_GAIKUO)
print(message)
print("模板插入完毕,开始替换内容")
total_table_num += 1
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
if baogao_info['if_docx_inspection_method']:
#检查方案描述
FANGAN_JIANCHA_DIR = get_resource_path("muban/checkmethod.docx")
list_to_replace = {
'renyuan_peizhi' : "\n".join(renyuan_peizhi),
'shebei_peizhi' : "\n".join(shebei_peizhi),
'shigong_fangan' : "\n".join(shigong_fangan),
'gongzuo_neirong' : "\n".join(gongzuo_neirong),
'beizhu' : beizhu,
'num' : num_to_chinese[head_num],
}
print(await add_table_and_replace(output_dir, FANGAN_JIANCHA_DIR, 0, list_to_replace))
print(split_table_by_row_content(output_dir, output_dir, total_table_num))
total_table_num += 1
head_num += 1
if baogao_info['if_docx_inspection_info']:
#检查信息
JIANCHA_XINGXI_DIR = get_resource_path("muban/checkinfo.docx")
JIANCHA_XINGXI = list(list("" for i in range(4)) for j in range(9))
JIANCHA_XINGXI[0][1] = jiancha_renyuan
try:
JIANCHA_XINGXI[1][1] = Jiancha_date.split('T')[0]
except:
JIANCHA_XINGXI[1][1] = "格式不对或无数据"
JIANCHA_XINGXI[1][3] = jizu_bianhao
JIANCHA_XINGXI[2][1] = "风力发电机组" + baogao_name
JIANCHA_XINGXI[2][3] = "".join(jiancha)
JIANCHA_XINGXI[3][2] = partManufacturer
JIANCHA_XINGXI[4][1] = '叶片型号:' + jizu_xinghao
JIANCHA_XINGXI[5][1] = Y_Code[0] if len(Y_Code) > 0 else ""
JIANCHA_XINGXI[6][1] = Y_Code[1] if len(Y_Code) > 1 else ""
JIANCHA_XINGXI[7][1] = Y_Code[2] if len(Y_Code) > 2 else ""
JIANCHA_XINGXI[8][0] = "本次" + "".join(_ for _ in jiancha) + f"检查,采集叶片图片{total_picture_num}张,内容覆盖" + ";".join(_ for _ in neirong)
#新建检查信息表
output_doc, message = await add_table_to_document(output_dir, JIANCHA_XINGXI_DIR,9,4,total_table_num ,JIANCHA_XINGXI,False)
print(message)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
total_table_num += 1
if baogao_info['if_docx_chengguo_sub']:
# 添加成果递交表
CHENGGUO_DIJIAO_DIR = get_resource_path("muban/chengguo_sub.docx")
CHENGGUO_DIJIAO = list(list("" for i in range(4)) for j in range(6))
CHENGGUO_DIJIAO[0][1] = jiancha_renyuan
CHENGGUO_DIJIAO[1][1] = jia_fuzeren
try:
CHENGGUO_DIJIAO[2][1] = Jiancha_date.split('T')[0]
except:
CHENGGUO_DIJIAO[2][1] = "格式不对或无数据"
CHENGGUO_DIJIAO[3][1] = data_processor
CHENGGUO_DIJIAO[4][1] = baogao_bianzhi
CHENGGUO_DIJIAO[5][1] = baogao_shenghe
try:
CHENGGUO_DIJIAO[2][3] = Jiancha_date.split('T')[1]
except:
CHENGGUO_DIJIAO[2][3] = "格式不对或无数据"
CHENGGUO_DIJIAO[3][3] = baogao_date.split(' ')[0]
CHENGGUO_DIJIAO[4][3] = baogao_date.split(' ')[0]
CHENGGUO_DIJIAO[5][3] = "未审核"
output_doc, message = await add_table_to_document(output_dir, CHENGGUO_DIJIAO_DIR,5,5,total_table_num,CHENGGUO_DIJIAO,True,0.04)
print(message)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
total_table_num += 1
if baogao_info['if_docx_inspection_text'] and if_waibu:
#检查情况汇总表(文字信息)
try:
#获取缺陷信息
"""
需要获取
Y1Y2Y3叶片的缺陷数量缺陷字典{描述图片路径}
和数据库连接需要的新增异常处理
目前逻辑为找到图片后找缺陷类型的图片通过图片id查找对应缺陷记录
如果没有找到要返回对应异常即已选出标注缺陷图但未填写对应缺陷信息的异常返回
最后将无异常的图片入队缺陷字典同数量有异常的图片再另存
"""
Y1_quexian_list = []
Y2_quexian_list = []
Y3_quexian_list = []
Y1_quexian_list = defect_records_with_pic["叶片1"]["DEFECT"] if "叶片1" in defect_records_with_pic else {}
Y2_quexian_list = defect_records_with_pic["叶片2"]["DEFECT"] if "叶片2" in defect_records_with_pic else {}
Y3_quexian_list = defect_records_with_pic["叶片3"]["DEFECT"] if "叶片3" in defect_records_with_pic else {}
Y1_quexian_num = len(Y1_quexian_list)
Y2_quexian_num = len(Y2_quexian_list)
Y3_quexian_num = len(Y3_quexian_list)
no_defect_found = "未发现明显缺陷"
weak_num_Y1 = f"{Y_Code[0] if len(Y_Code) > 0 else ''}叶片" + f"共发现缺陷{Y1_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
weak_num_Y2 = f"{Y_Code[1] if len(Y_Code) > 1 else ''}叶片" + f"共发现缺陷{Y2_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
weak_num_Y3 = f"{Y_Code[2] if len(Y_Code) > 2 else ''}叶片" + f"共发现缺陷{Y3_quexian_num}" if Y1_quexian_num > 0 else no_defect_found
except Exception as e:
print(f"缺陷图获取失败:{e}")
return
#添加检查情况汇总表
JIANCHA_HUIZONG_DIR = get_resource_path("muban/total_check.docx")
JIANCHA_HUIZONG = list(list("" for i in range(3)) for j in range(4))
JIANCHA_HUIZONG[1][0] = weak_num_Y1
JIANCHA_HUIZONG[2][0] = weak_num_Y2
JIANCHA_HUIZONG[3][0] = weak_num_Y3
JIANCHA_HUIZONG[1][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[2][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[3][1] = "\n".join(neirong2)
JIANCHA_HUIZONG[1][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y1_quexian_list))]) if Y1_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
JIANCHA_HUIZONG[2][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y2_quexian_list))]) if Y2_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
JIANCHA_HUIZONG[3][2] = "\n".join([f"{i+1}.{s}" for i, s in enumerate(get_defect_str(Y3_quexian_list))]) if Y3_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
output_doc, message = await add_table_to_document(output_dir, JIANCHA_HUIZONG_DIR,4,3,total_table_num,JIANCHA_HUIZONG,False,ALIGMENT='LEFT')
print(message)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
total_table_num += 1
head_num += 1
if baogao_info['if_docx_inspection_picture']:
#主要部位图片展示表/检查内容表
#获取典型图信息
try:
Y1_list = safe_get(filtered_picture_data, "叶片1", dianxing_type, default=[])
Y2_list = safe_get(filtered_picture_data, "叶片2", dianxing_type, default=[])
Y3_list = safe_get(filtered_picture_data, "叶片3", dianxing_type, default=[])
picture_Y1_num = len(Y1_list)
picture_Y2_num = len(Y2_list)
picture_Y3_num = len(Y3_list)
except Exception as e:
print(f"获取缺陷图失败:{e}")
print(f"图片、文字数量:{picture_Y1_num} {picture_Y2_num} {picture_Y3_num}")
JIANCHA_NEIRONG_TOTAL_NUM = picture_Y1_num+ picture_Y2_num + picture_Y3_num
if JIANCHA_NEIRONG_TOTAL_NUM <= 0:
print("无典型图片数据,无法生成典型图表")
else:
col ,row = 3, 0
JIANCHA_NEIRONG_PICTURES_TABLE = get_resource_path("muban/check2.docx")
JIANCHA_NEIRONG_Y1_DIR = get_resource_path("muban/check_content.docx")
JIANCHA_NEIRONG_Y1 = list(list("" for _ in range(3)) for j in range(1))
Y1_code = Y_Code[0] if len(Y_Code) > 0 else ""
Y2_code = Y_Code[1] if len(Y_Code) > 1 else ""
Y3_code = Y_Code[2] if len(Y_Code) > 2 else ""
JIANCHA_NEIRONG_Y1[0][0] = f"叶片1{Y1_code}检查内容"
print(f"Y1标题内容{JIANCHA_NEIRONG_Y1}")
JIANCHA_NEIRONG_Y2_DIR = get_resource_path("muban/check3.docx")
JIANCHA_NEIRONG_Y2 = list(list("" for _ in range(3)) for j in range(1))
JIANCHA_NEIRONG_Y2[0][0] = f"叶片2{Y2_code}检查内容"
print(f"Y2标题内容{JIANCHA_NEIRONG_Y2}")
JIANCHA_NEIRONG_Y3_DIR = get_resource_path("muban/check3.docx")
JIANCHA_NEIRONG_Y3 = list(list("" for _ in range(3)) for j in range(1))
JIANCHA_NEIRONG_Y3[0][0] = f"叶片3{Y3_code}检查内容"
print(f"Y3标题内容{JIANCHA_NEIRONG_Y3}")
print(f"当前表格序号为 {total_table_num}")
print(key_words)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y1_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y1,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y1_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y2_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y2,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y2_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y3_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y3,True, 1)
print(message)
total_table_num += 1
total_table_num = await process_server_images_table(Y3_list, image_source_to_find, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
if baogao_info['if_docx_defect_picture'] and if_waibu:
# #缺陷详情
QUEXIAN_XIANGQING_DIR = get_resource_path("muban/check_check.docx")
QUEXIAN_XIANGQING_TITLE_DIR = get_resource_path("muban/check_check_title.docx")
Y_tables = [Y1_quexian_list,Y2_quexian_list,Y3_quexian_list]
Y1_table_list = []
Y2_table_list = []
Y3_table_list = []
table_lists = [Y1_table_list, Y2_table_list, Y3_table_list]
for i, (table_list, Y_lists) in enumerate(zip(table_lists, Y_tables)):
for Y_list in Y_lists:
image_defect_record = Y_list["record"]
image_path = Y_list["imagePath"]
image_defect_record = get_defect_detail(image_defect_record["defectId"])
# 从imagedefectrecord中获取各个字段
defect_type = image_defect_record.get('defectTypeLabel', '未知缺陷类型')
defect_location = f"{image_defect_record.get('partName', '')}{image_defect_record.get('defectPosition', '')}"
# 使用axial和chordwise作为缺陷尺寸信息
axial = image_defect_record.get('axial', '')
chordwise = image_defect_record.get('chordwise', '')
defect_size = f"轴向:{axial} 弦向:{chordwise}" if axial or chordwise else ''
visibility = "暂无此项"
urgency = "暂无此项"
severity = image_defect_record.get('defectLevelLabel', '未知严重等级')
repair_suggestion = image_defect_record.get('repairIdea', '无维修建议')
print(f"获取第{i + 1}个叶片的缺陷图: {image_path}")
table_list.append({
"QueXianLeiXing": defect_type,
"QueXianWeiZhi": defect_location,
"QueXianChiCun": defect_size,
"WeiZongDengJi": severity,
"Tupian_Dir": get_full_picture_url(image_path) if check_pic_url(image_path) else None,
"visibility": visibility,
"urgency": urgency,
"repair_suggestion": repair_suggestion, # 新增维修建议字段
})
# for i, (table_list, Y_dict) in enumerate(zip(table_lists, Y_tables)):
# for image_defect_record, image_path in Y_dict.items():
# # 从图片名解析各个字段
# parts = image_name.split('_')
# if len(parts) >= 8: # 确保有7个部分
# defect_type = parts[1]
# defect_location = parts[2]
# defect_size = parts[3]
# visibility = parts[4]
# urgency = parts[5]
# severity = parts[6]
# repair_suggestion = parts[7]
# print(f"获取第{i+1}个叶片的缺陷图: {image_path}")
# table_list.append({
# "QueXianLeiXing": defect_type,
# "QueXianWeiZhi": defect_location,
# "QueXianChiCun": defect_size,
# "WeiZongDengJi": severity,
# "Tupian_Dir": image_path,
# "visibility": visibility,
# "urgency": urgency,
# "repair_suggestion": repair_suggestion.split('.')[0], # 新增维修建议字段
# })
# else:
# table_list.append({
# "QueXianLeiXing": "图片命名有误",
# "QueXianWeiZhi": "图片命名有误",
# "QueXianChiCun": "图片命名有误",
# "WeiZongDengJi": "图片命名有误",
# "Tupian_Dir": image_path,
# "visibility": "图片命名有误",
# "urgency": "图片命名有误",
# "repair_suggestion": "图片命名有误", # 新增维修建议字段
# })
Y1_TABLES, Y1_TABLES_PICTURES = fill_tables(table_lists[0],4,5,len(table_lists[0]),Y_Code[0] if len(Y_Code) > 0 else "")
Y2_TABLES, Y2_TABLES_PICTURES = fill_tables(table_lists[1],4,5,len(table_lists[1]),Y_Code[1] if len(Y_Code) > 1 else "")
Y3_TABLES, Y3_TABLES_PICTURES = fill_tables(table_lists[2],4,5,len(table_lists[2]),Y_Code[2] if len(Y_Code) > 2 else "")
print(add_documents(output_dir, QUEXIAN_XIANGQING_TITLE_DIR))
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
table_num = 0
Xu_Hao = 0
total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y1_TABLES,QUEXIAN_XIANGQING_DIR,Y1_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y2_TABLES,QUEXIAN_XIANGQING_DIR,Y2_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y3_TABLES,QUEXIAN_XIANGQING_DIR,Y3_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
if baogao_info['if_docx_conclusion']:
#总结
ZONG_JIE_DIR = get_resource_path("muban/result.docx")
ZONG_JIE_BEFORE = "result"
ZONG_JIE = baogao_info['conclusion']
print(add_documents(output_dir, ZONG_JIE_DIR))
print(await search_and_replace(output_dir, ZONG_JIE_BEFORE, ZONG_JIE))
print(await search_and_replace(output_dir, 'company_yi', Yi_company))
print(await search_and_replace(output_dir, 'baogao_date', baogao_date.split(' ')[0]))
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
from tools.argtool import (load_config, parse_arguments, get_default_config,
merge_configs)
from Dt_report import generate_dt_report
from Jf_report import generate_jf_report
def main():
args = parse_arguments()
@ -586,7 +26,12 @@ def main():
Path(merged['json2']['shengcheng_dir']).mkdir(parents=True, exist_ok=True)
asyncio.run(generate_report(merged['json1'], merged['json2']))
if merged['json2']['choose_template'] == 'DT':
asyncio.run(generate_dt_report(merged['json1'], merged['json2']))
elif merged['json2']['choose_template'] == 'JF':
asyncio.run(generate_jf_report(merged['json1'], merged['json2']))
else:
print('指定了不存在的,请检查配置文件')
print('文档生成完毕')
if __name__ == '__main__':

284
Jf_report.py Normal file
View File

@ -0,0 +1,284 @@
# 文档处理工具
from tools.document_tools import (
create_document, add_documents,add_table_and_replace,
add_table_to_document,add_dynamic_table,
process_server_images_table,add_header,add_landscape_section,
merge_documents,add_table,add_defect_info_table
)
# 内容处理工具
from tools.content_tools import (
add_picture,split_table_by_row_content,
search_and_replace
)
from tools.get_pictures import (
process_picture_data,get_records_with_pic
)
from tools.Get_Json import (
get_project_info,get_jizu_info,
get_jizu_shigong_info,get_defect_detail,
get_part_picture,get_yepian_xiangqing,
check_pic_url,get_full_picture_url,
get_defect_record_list
)
from tools.dataproccess import (
caculate_work_days,get_year_month,
merge_info,get_defect_str,get_defect_json,
safe_get,get_resource_path,get_defedct_info,
)
from tools.json_to_docx import json_to_docx
from core.tables import fill_tables
from tools.defines import *
import os, re, datetime
async def generate_jf_report(base_info, baogao_info):
#获取模板编号、模板名称
num_to_chinese = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
cover_encode = "encode"
cover_project = "project"
baogao_name1 = "baogaoname1"
baogao_name2 = "baogaoname2"
company_name_yi = "company_name_yi"
cover_date = "time"
TITLE_OF_REPORT = "companyencode"
jiegou_xuhao = 'num'
print(f"获取到参数:基本信息:{base_info}\n\n报告信息:{baogao_info}")
try:
base_info = merge_info(base_info, DEFAULT_BASE_INFO)
turbine_id = base_info['turbine_id']
jizu_data = get_jizu_info(turbine_id)
project_data = get_project_info(jizu_data['projectId'])
shigong_data = get_jizu_shigong_info(turbine_id)
fengchang_name = project_data['farmName']
Yi_company = project_data['inspectionUnit']
yi_fuzeren = project_data['inspectionContact']
yi_phone = project_data['inspectionPhone']
fengchang_location = project_data['farmAddress']
Jia_company = project_data['client']
jia_fuzeren = project_data['clientContact']
jia_phone = project_data['clientPhone']
jizu_num = project_data['scale']
jizu_xinghao = project_data['turbineModel']
jizu_manufacture = project_data.get('turbineManufacturer', "")
project_name = project_data['projectName']
jizu_bianhao = jizu_data["turbineName"]
start_date = project_data['startDate']
end_date = project_data['endDate']
cover_url = project_data['coverUrl']
gongqi = caculate_work_days(start_date, end_date)
except Exception as e:
print(f"数据库的项目-机组基本信息获取失败:{e}")
return
try:
baogao_date = datetime.datetime.now().strftime("%Y年%m月%d%H:%M") #现在的时间
date_year_month = get_year_month(baogao_date)
#前端信息
baogao_info = merge_info(baogao_info, DEFAULT_BAOGAO_INFO)
key_words= re.compile('|'.join(map(re.escape, baogao_info['key_words'].split(',')))) #关键字
shengcheng_dir = baogao_info['shengcheng_dir'] #路径
if shengcheng_dir == "":
print("未配置生成路径,请检查配置")
return
if_waibu = baogao_info["if_waibu"]
if_neibu = baogao_info["if_neibu"]
if_fanglei = baogao_info["if_fanglei"]
quexian_type = baogao_info['quexian_enum']
dianxing_type = baogao_info['dianxing_enum']
other_type = baogao_info["other_enum"]
jiancha_renyuan = baogao_info['jiancha_renyuan'] #检查人员,目前是从命令行参数获取,需要完善
check_date = baogao_info['check_date']
if check_date == None:
check_date = "未获取"
baogao_bianzhi = baogao_info["userName"]
baogao_shenghe = baogao_info["baogaoCheck"]
Jiancha_date = baogao_info["check_date"]
data_processor = baogao_info["data_processor"]
coverurl = baogao_info["coverurl"]
#数据库拉取信息
# Jiancha_date = shigong_data["startTime"].replace("T", " ") #检查日期
# image_count = shigong_data['imageCount'] #从施工方案获取的图片数量,待定!!!
# temperature = shigong_data['temperature'] #温度
# wind_speed = shigong_data['windSpeed'] #风速
# weather = get_weather(shigong_data["weatherCode"]) #天气 不从此接口获取,待定!!!
#拉取部件、图片数据
part_data, picture_data, Yepians = get_part_picture(turbine_id)
print(Yepians)
Y1_info = get_yepian_xiangqing(Yepians[0]["partId"])
Y_Code = [yepian["partCode"] for yepian in Yepians]
partType = Y1_info["partType"]
partManufacturer = Y1_info["partManufacturer"]
print(f"找到叶片号{Y_Code},厂商{partManufacturer}")
image_source_to_find = []
baogao_label = []
renyuan_peizhi = []
gongzuo_neirong = []
shigong_fangan = []
shebei_peizhi = []
beizhu = []
jiancha = []
neirong = []
neirong2 = []
#获取对应枚举字段
if if_waibu:
baogao_label.append("外观")
image_source_to_find.append(baogao_info['waibu_enum'])
if baogao_info["shigong_fangan"] == None:
print("未传入施工方案,使用已有枚举")
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.WAIBU.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.WAIBU.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.WAIBU.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.WAIBU.SHIGONG_FANGAN)
else:
pass #待添加如果从平台传入枚举,但目前没必要,如果这种枚举标准信息库有更好的优化则可以实现
jiancha.append("无人机外观检查")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的前缘、后缘、迎风面、背风面。")
neirong2.append("前缘、后缘、迎风面、背风面。")
if if_neibu:
baogao_label.append("内部")
image_source_to_find.append(baogao_info['neibu_enum'])
if baogao_info["shigong_fangan"] == None:
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.NEIBU.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.NEIBU.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.NEIBU.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.NEIBU.SHIGONG_FANGAN)
else:
pass
jiancha.append("叶片内部检查")
neirong.append(f"".join(Y_Code) + f"{len(Y_Code)}支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
neirong2.append("内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
if if_fanglei:
baogao_label.append("防雷")
image_source_to_find.append(baogao_info['fanglei_enum'])
if baogao_info["shigong_fangan"] == None:
renyuan_peizhi.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.RENYUAN_PEIZHI)
gongzuo_neirong.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.GONGZUO_NEIRONG)
shebei_peizhi.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.SHEBEI_PEIZHI)
shigong_fangan.append(SHIGONG_FANGAN_ENUM.FANGLEI.YEPIAN.SHIGONG_FANGAN)
else:
pass
jiancha.append("叶片导通测试")
neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...")
neirong2.append("轮毂至塔基导通、内部导线线阻、外部导线线阻...")
#获取缺陷图列表和典型图列表
filtered_picture_data, total_picture_num = process_picture_data(picture_data, image_source_to_find)
#获取所有缺陷记录
defect_records = get_defect_record_list()
#获取缺陷记录中对应图片的记录
defect_records_with_pic, error_pic_with_no_record = get_records_with_pic(defect_records, filtered_picture_data, quexian_type)
if len(error_pic_with_no_record) > 0:
print(f"!!!有部分缺陷图片没有对应的缺陷记录,将不会生成这些图片的缺陷表:{error_pic_with_no_record} ")
print(f"对应缺陷图的缺陷记录列表:{defect_records_with_pic}")
except Exception as e:
print(f"报告基本信息获取失败:{e}")
return
#检查参数合法性
if not if_fanglei or not if_neibu or not if_waibu:
print("请至少选择一种检查项目")
return
if not os.path.exists(shengcheng_dir):
print(f"生成路径{shengcheng_dir}不存在")
return
baogao_name = "叶片" + "".join(baogao_label) + "检查报告"
output_dir = os.path.normpath(f"{shengcheng_dir}/{project_name}项目{baogao_name}{jizu_bianhao}{baogao_date.split(' ')[0]}版.docx")
version = 1
while os.path.exists(output_dir):
if version != 1:
output_dir = output_dir.replace(f"{version - 1}",f"{version}")
else:
output_dir = output_dir.replace("",f"{version}")
version += 1
add_list = [
get_resource_path(MUBAN_DIR + '/jf_fengmian.png'),
get_resource_path(MUBAN_DIR + '/jf_fengmian1.png'),
get_resource_path(MUBAN_DIR + '/jinfeng_fengmian_name.docx'),
get_resource_path(MUBAN_DIR + '/jf_fengmian2.png'),
get_resource_path(MUBAN_DIR + '/jinfeng_fengmian_renyuan.docx'),
get_resource_path(MUBAN_DIR + '/jf_fengmian2.png'),
get_resource_path(MUBAN_DIR + '/jinfeng_fengmian_riqi.docx'),
get_resource_path(MUBAN_DIR + '/jf_fengmian1.png'),
get_resource_path(MUBAN_DIR + '/jinfeng_fengmian_luokuan.docx'),
get_resource_path(MUBAN_DIR + '/jf_fengmian3.png'),
]
print(await create_document(output_dir, section_args={
"page_height" : JF_HEIGHT,
"page_width" : JF_WIDTH,
"left_margin" : JF_L_MARGIN,
"right_margin" : JF_R_MARGIN,
"top_margin" : JF_T_MARGIN,
"bottom_margin" : JF_B_MARGIN,
}))
merge_documents(output_dir, add_list)
add_header(output_dir, TEMPLATE_HEADER.JINFENG_HEADER.ENUM)
print(add_documents(output_dir, get_resource_path(MUBAN_DIR + '/jf_table_title.docx')))
total_table_num = 0
print(add_table(output_dir, get_resource_path(MUBAN_DIR + '/jinfeng_table.docx')))
total_picture_num += 1
list_to_replace = {
'jia_company_name' : Jia_company,
'fengchang_name' : fengchang_name,
'jizu_hao' : jizu_bianhao,
'bianzhi_renyuan': baogao_bianzhi,
'bianzhi_riqi': baogao_date,
'fengchang_name' : fengchang_name,
'jizu_hao' : jizu_bianhao,
'fengchang_location' : fengchang_location,
'kehu_company' : Yi_company,
'xiangmuguige' : jizu_num,
'kehu_fuzeren' : yi_fuzeren,
'yezhu_phone' : yi_phone,
'shigong_company' : JINFENG_COMPANY,
'shigong_date' : "未填写",#shigong_date,
'shigong_fuzeren' : "未填写",
'shigong_phone' : "未填写",
'zhengji_changjia' : jizu_manufacture,
'yepian_changjia' : partManufacturer,
'jizu_type' : jizu_xinghao,
'yepian_type' : partType,
'jiancha_fangshi' : ''.join(jiancha),
'jiancha_renyuan' : jiancha_renyuan
}
for find_text, replace_text in list_to_replace.items():
print(search_and_replace(output_dir, find_text, replace_text))
print(f"静态内容生成完毕,开始生成动态内容")
defect_info = get_defedct_info(defect_records_with_pic)
total_picture_num = add_defect_info_table(output_dir, defect_info, get_resource_path(MUBAN_DIR + '/quexian_liebiao.docx'), total_table_num)
# defect_json = get_defect_json(defect_info, Y_Code, jizu_bianhao)
# print(f"{defect_json} {type(defect_json)}")
# doc = json_to_docx(defect_json)
# doc.save(output_dir)

Binary file not shown.

Binary file not shown.

View File

@ -7,7 +7,8 @@ a = Analysis(
pathex=[],
binaries=[],
datas=[('./muban/*.docx', 'muban'),
('./muban/*.jpg', 'muban')],
('./muban/*.jpg', 'muban'),
('./muban/*.png', 'muban')],
hiddenimports=[],
hookspath=[],
hooksconfig={},

View File

@ -103,8 +103,6 @@ def apply_table_style(table, has_header_row=False, border_style=None, shading=No
except Exception:
return False
def copy_table(source_table, target_doc, ifadjustheight=True, height = 1):
"""
Copy a table from one document to another.
@ -118,7 +116,6 @@ def copy_table(source_table, target_doc, ifadjustheight=True, height = 1):
"""
# Create a new table with the same dimensions
new_table = target_doc.add_table(rows=len(source_table.rows), cols=len(source_table.columns))
# Try to apply the same style
try:
if source_table.style:
@ -130,8 +127,9 @@ def copy_table(source_table, target_doc, ifadjustheight=True, height = 1):
except:
pass
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.table import WD_ALIGN_VERTICAL, WD_ROW_HEIGHT_RULE
from docx.shared import Pt, Inches, Cm, RGBColor
# Copy cell contents
for i, row in enumerate(source_table.rows):
for j, cell in enumerate(row.cells):
@ -144,18 +142,21 @@ def copy_table(source_table, target_doc, ifadjustheight=True, height = 1):
new_table.cell(i,j).paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') #设置中文字体
new_table.cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
new_table.cell(i,j).vertical_alignment = WD_ALIGN_VERTICAL.CENTER
#new_table.cell(i,j).width = cell.width
"""
待添加如何让表格自适应大小autofit目前不知为何没有作用
"""
if ifadjustheight:
new_table.rows[i].height = Cm(height)
#new_table.rows[i].height = row.height
#new_table.rows[i].height_rule = WD_ROW_HEIGHT_RULE.EXACTLY
if not ifadjustheight:
new_table.auto_fit = True
try:
new_table = merge_tables(new_table)
except Exception as e:
print(f"合并表格失败:{e}")
from docx.shared import Inches
return new_table
return target_doc
from collections import deque

BIN
muban/dt_header.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.1 KiB

Binary file not shown.

Binary file not shown.

BIN
muban/jf_fengmian.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

BIN
muban/jf_fengmian1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 197 B

BIN
muban/jf_fengmian2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 138 B

BIN
muban/jf_fengmian3.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

BIN
muban/jf_header.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.6 KiB

BIN
muban/jf_hengxiang.doc Normal file

Binary file not shown.

BIN
muban/jf_table_title.docx Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
muban/jinfeng_table.docx Normal file

Binary file not shown.

BIN
muban/quexian_liebiao.docx Normal file

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB

Binary file not shown.

20
tools/Spiretool.py Normal file
View File

@ -0,0 +1,20 @@
from spire.doc import *
from spire.doc.common import *
# 加载第一个文档
doc1 = Document()
doc1.LoadFromFile("./muban/jingfeng_1.docx")
# 加载第二个文档
doc2 = Document()
doc2.LoadFromFile("./muban/jf_hengxiang.doc")
# 遍历 doc2 的所有节(使用 GetSections()
for i in range(doc2.Sections.Count):
section = doc2.Sections.get_Item(i) # 使用 GetItem 获取 Section
doc1.Sections.Add(section.Clone()) # 克隆并添加到 doc1
# 保存合并后的文档
doc1.SaveToFile("./output/MergedDocument.docx", FileFormat.Docx2019)
doc1.Close()
doc2.Close()

Binary file not shown.

Binary file not shown.

View File

@ -41,6 +41,8 @@ def parse_arguments():
help='封面图片 URL')
parser.add_argument('--conclusion', dest='conclusion',
help='报告总结')
parser.add_argument('--choose_template', '--ct', dest='choose_template',
help='选择模板,默认为迪特模板,目前支持枚举参数DTJF')
return parser.parse_args()
def merge_configs(default_cfg: Dict[str, Any],
@ -60,7 +62,8 @@ def merge_configs(default_cfg: Dict[str, Any],
merged['json2']['shengcheng_dir'] = cli.output_dir
for key in ('if_waibu', 'if_neibu', 'if_fanglei', 'userName',
'baogaoCheck', 'key_words', 'data_processor',
'jiancha_renyuan', 'check_date', 'coverurl', 'conclusion'):
'jiancha_renyuan', 'check_date', 'coverurl', 'conclusion'
'choose_template'):
val = getattr(cli, key, None)
if val is not None:
merged['json2'][key] = val
@ -101,6 +104,6 @@ def get_default_config() -> Dict[str, Dict[str, Any]]:
"if_docx_inspection_picture": True,
"if_docx_defect_picture": True,
"if_docx_conclusion": True,
"choose_template" : "JF"
}
}

View File

@ -396,8 +396,10 @@ async def add_picture_to_table(
import requests
from io import BytesIO
from PIL import Image
from docx.enum.text import WD_ALIGN_PARAGRAPH
async def add_picture(filename: str, image_path: str, width: Optional[float] = None, height: Optional[float] = None) -> str:
def add_picture(filename: str, image_path: str, width: Optional[float] = None, height: Optional[float] = None, is_center: Optional[bool] = False) -> str:
"""添加一个图片到文档中(支持本地路径或 URL
Args:
@ -416,7 +418,8 @@ async def add_picture(filename: str, image_path: str, width: Optional[float] = N
try:
doc = Document(abs_filename)
para = doc.add_paragraph()
run = para.add_run()
# 处理 URL 图片
if is_url:
try:
@ -430,10 +433,11 @@ async def add_picture(filename: str, image_path: str, width: Optional[float] = N
# 添加到文档
if width:
doc.add_picture(image_bytes, width=Inches(width), height=Inches(height))
run.add_picture(image_bytes, width=Inches(width), height=Inches(height))
else:
doc.add_picture(image_bytes)
run.add_picture(image_bytes)
if is_center:
para.alignment = WD_ALIGN_PARAGRAPH.CENTER
doc.save(abs_filename)
return f"Picture from URL {image_path} added to {filename}"
except Exception as url_error:
@ -456,9 +460,11 @@ async def add_picture(filename: str, image_path: str, width: Optional[float] = N
# 添加到文档
try:
if width:
doc.add_picture(abs_image_path, width=Inches(width), height=Inches(height))
run.add_picture(abs_image_path, width=Inches(width), height=Inches(height))
else:
doc.add_picture(abs_image_path)
run.add_picture(abs_image_path)
if is_center:
para.alignment = WD_ALIGN_PARAGRAPH.CENTER
doc.save(abs_filename)
return f"Picture {image_path} added to {filename}"
except Exception as inner_error:
@ -619,7 +625,7 @@ async def delete_paragraph(filename: str, paragraph_index: int) -> str:
return f"Failed to delete paragraph: {str(e)}"
async def search_and_replace(filename: str, find_text: str, replace_text: str) -> str:
def search_and_replace(filename: str, find_text: str, replace_text: str) -> str:
"""替换所有find_text为replace_text
Args:

View File

@ -130,13 +130,226 @@ def safe_get(data, *keys, default=None):
else:
return default
def is_frozen():
return hasattr(sys, 'frozen') or hasattr(sys, '_MEIPASS')
def get_resource_path(relative_path):
""" 获取打包后资源的绝对路径 """
try:
# PyInstaller创建的临时文件夹
""" 获取资源的路径 """
if is_frozen():
base_path = sys._MEIPASS
except AttributeError:
# 正常开发环境
else:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def get_defedct_info(defect_dict):
"""获取按缺陷类型分类的缺陷信息
Args:
defect_dict (dict):
{'叶片1':
{'DEFECT':[{'record':
{
'defectId': '02d892a178a82561bb565559102c7a58',
'imageId': '41543f531be24522b7bec741b9a483a2',
'defectName': '手动添加的缺陷1',
'defectCode': None,
'partName': '叶片1',
'defectTypeLabel': '表面裂纹',
'defectType': 'bmlw',
'defectLevelLabel': '轻微缺陷',
'defectLevel': 'SLIGHT',
'defectPosition': '',
'description': '手动添加的缺陷,请填写详细描述',
'repairIdea': '建议进行进一步检查',
'labelInfo': None,
'markInfo': {'label': None, 'clsId': None, 'bbox': None, 'confidence': 0.0},
'imagePath': '/static/image/temp/out-work/12bc30fb209f3af3bf530541c5b062bd/F02_机组10060_叶片叶片PS面距叶根15.5米处轴向裂纹轴向260mm弦向20mm。DJI_20250526090227_0052_Z.png'
},
...
},]
}
},
...
Returns:
result (dict):
{
'叶片1': { #按叶片分记录
'表面裂纹': { 'SLIGHT': #按类型分等级,等级从'defectLevel'获取
[ #按等级分记录
{
record:{...}
}, #一张图片一个记录
...
]
...
}
}
"""
result = {}
for part_name, part_info in defect_dict.items():
if part_name not in result:
result[part_name] = {}
for defect_type, defect_list in part_info.items():
for defect_dict in defect_list:
record = defect_dict['record']
type_label = record['defectTypeLabel']
level = record['defectLevel']
if type_label not in result[part_name]:
result[part_name][type_label] = {}
if level not in result[part_name][type_label]:
result[part_name][type_label][level] = []
result[part_name][type_label][level].append(defect_dict)
return result
def get_defect_json(defect_info: dict, Y_code: list[str], turbine_code: str) -> dict:
"""将缺陷信息转换为表格形式的JSON格式
Args:
defect_info: 结构为 {叶片1/2/3: {缺陷类型: [{'record': {...}}, ...]}}
Y_code: 叶片编号列表 ['1001', '1002', '1003']
turbine_code: 机组编号 'G01'
Returns:
{
"type": "table",
"content": {
"rows": 总行数,
"cols": 4,
"merged_cells": [合并信息],
"cells": [表格内容]
}
}
"""
# 初始化表格
cells = []
merged_cells = []
# 默认设置
default_border = {
"top": {"style": "single", "size": "4", "color": "000000"},
"left": {"style": "single", "size": "4", "color": "000000"},
"bottom": {"style": "single", "size": "4", "color": "000000"},
"right": {"style": "single", "size": "4", "color": "000000"}
}
# 1. 计算总行数(先计算再创建表头)
total_defect_rows = sum(len(defects) for blade_defects in defect_info.values()
for defects in blade_defects.values())
total_rows = 1 + total_defect_rows # 表头 + 数据行
# 2. 创建表头行第0行
header_row = [
create_cell(0, 0, "机组号", default_border),
create_cell(0, 1, "叶片号", default_border),
create_cell(0, 2, "损坏描述", default_border),
create_cell(0, 3, "缺陷面积", default_border)
]
cells.append(header_row)
# 3. 填充数据行从第1行开始
current_data_row = 1
# 合并机组号(如果有多行数据)
if total_defect_rows > 1:
merged_cells.append({
"start_row": 1, # 第2行(0-based中的1)
"start_col": 0,
"end_row": total_rows - 1, # 最后一行(0-based)
"end_col": 0
})
# 填充机组号(仅第一行数据行)
if total_defect_rows > 0:
first_data_row = [
create_cell(1, 0, turbine_code, default_border,
is_merged=total_defect_rows>1, is_primary=True),
None, None, None
]
cells.append(first_data_row)
# 按叶片顺序填充数据
for blade_idx, (blade_key, blade_defects) in enumerate(defect_info.items(), 1):
blade_code = Y_code[blade_idx - 1]
blade_defect_count = sum(len(defects) for defects in blade_defects.values())
if blade_defect_count > 1:
merged_cells.append({
"start_row": current_data_row, # 当前行(0-based)
"start_col": 1,
"end_row": current_data_row + blade_defect_count - 1, # 结束行(0-based)
"end_col": 1
})
# 填充缺陷数据
for defect_type, records in blade_defects.items():
for record in records:
# 确保不超过总行数
if current_row >= total_rows:
break
description = record.get('record', {}).get('损坏描述', '待填写')
area = record.get('record', {}).get('缺陷面积', '待填写')
is_primary = (not merged_cells or
current_row == merged_cells[-1]["start_row"])
row_data = [
None, # 机组号(已合并)
create_cell(current_row, 1, blade_code, default_border,
is_merged=blade_defect_count>1, is_primary=is_primary),
create_cell(current_row, 2, str(description), default_border),
create_cell(current_row, 3, str(area), default_border)
]
cells.append(row_data)
current_row += 1
# 填充空单元格
for row_idx, row in enumerate(cells):
for col_idx in range(4):
if row[col_idx] is None:
row[col_idx] = create_cell(row_idx, col_idx, "", default_border)
# 最终行数检查(调试用)
print(f"调试信息: 预期行数={total_rows}, 实际行数={len(cells)}")
print(f"缺陷信息统计: 叶片数={len(defect_info)}, 总缺陷数={total_defect_rows}")
return [{
"type": "table",
"content": {
"rows": len(cells), # 使用实际行数而非计算行数
"cols": 4,
"merged_cells": merged_cells,
"cells": cells
}
}]
def create_cell(row, col, text, border, is_merged=False, is_primary=False):
"""创建标准化的单元格结构"""
return {
"row": row,
"col": col,
"is_merged": is_merged,
"alignment": "center",
"border": border,
"merge_info": {"is_primary": is_primary},
"content": [{
"alignment": "center",
"runs": [{
"text": text,
"font": {
"name": "宋体",
"size": 12,
"bold": False,
"italic": False,
"underline": False,
"color": {"r": 0, "g": 0, "b": 0}
}
}]
}]
}

View File

@ -1,21 +1,7 @@
"""
缺陷图目录格式
缺陷图期望格式 _隔开
外部内部命名格式都如下
图片名xuhao_缺陷类型_缺陷位置_缺陷尺寸_可见程度_紧急程度_危重等级_维修建议
涂层损伤_叶片ps面距叶根3m处_缺陷尺寸弦向100mm轴向800mm_轻微_紧急_重要_建议打磨维修
每个的选项见我发的图
防雷
轮毂至塔基导通阻值_169mΩ
缺陷例轮毂至塔基未导通 #即标明未导通即可
"""
DEFAULT_BASE_INFO = { #项目基本信息
"turbine_id" : None,
}
DEFAULT_BAOGAO_INFO = {
#目录
'shengcheng_dir': "", #报告生成的路径
@ -38,11 +24,6 @@ DEFAULT_BAOGAO_INFO = {
"check_date" : None,
}
class JIANCHA_ENUM :
class WAIBU:
PART = "无人机外部高精度飞行"
#NEIRONG =
class SHIGONG_FANGAN_ENUM :
class WAIBU:
GONGZUO_NEIRONG = "无人机叶片外观巡检"
@ -72,87 +53,55 @@ class SHIGONG_FANGAN_ENUM :
SHIGONG_FANGAN = ""
FEISHOURENYUAN_PEIZHI = "1人主检飞手1人"
LUNGUZUOYERENYUAN_PEIZHI = "2人轮毂作业检查2人"
oneproject = {
"status": 200,
"data": {
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"coverUrl": "",
"farmName": "三峡能源阿城万兴风电场",
"farmAddress": "哈尔滨市阿城区",
"client": "辽宁信达检测有限公司",
"clientContact": "李经理",
"clientPhone": "13504783720",
"inspectionUnit": "武汉市迪特影像科技有限公司",
"inspectionContact": "吴名州",
"inspectionPhone": "18807109269",
"scale": "",
"turbineModel": "",
"constructorIds": "5709ccfece2685090ff700a3469f2539,a76d78f1325deda1790a12bdad4aad4e",
"auditorId": "ca37c4337df8673a5c045b6c25acf74a",
"qualityOfficerId": "862e027910c2562d2b67d88ec33d77ba",
"projectManagerId": "fbaa9e0aecf2ce287138c38a4b654085",
"constructionTeamLeaderId": "",
"status": 0,
"startDate": "",
"endDate": "",
"constructorName": "",
"auditorName": "李四",
"qualityOfficerName": "辛奇",
"projectManagerName": "张三",
"constructionTeamLeaderName": "",
"statusLabel": "待施工"
from docx.oxml.ns import qn
from docx.shared import Pt
class TEMPLATE_HEADER:
class JINFENG_HEADER:
ENUM = 'JF'
PIC_DIR = './muban/jf_header.png'
PARA =' Q/GW 3LC-FJFW21-2022-BY14'
QN = qn('w:eastAsia')
FONT = 'Arial'
PT = Pt(9)
class DT_HEADER:
ENUM = 'DT'
PIC_DIR = './muban/dt_header.png'
PARA = ''
QN = qn('w:eastAsia')
FONT = '宋体(中文正文)'
PT = Pt(9)
MUBAN_DIR = 'muban'
JF_HEIGHT = 297
JF_WIDTH = 210
JF_L_MARGIN = 20
JF_R_MARGIN = 20
JF_T_MARGIN = 10
JF_B_MARGIN = 10
JINFENG_COMPANY = "金风科技股份有限公司"
#json表格样式宏定义
STYLE_CONFIG = {
"alignment": "center",
"font": {
"name": "宋体",
"size": 9,
"bold": False,
},
"msg": "",
"code": 200,
"success": True
}
onejizu = {
"status": 200,
"data": [
{
"turbineId": "183463dbf40d9278549a76b82b175dd9",
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"turbineName": "一期012号",
"turbineCode": "00000",
"turbineDesc": "一期012号全新设备",
"turbineManufacturer": "",
"turbineModel": "",
"turbineCoverUrl": ""
}
],
"msg": "",
"code": 200,
"success": True
}
yepian = {
"status": 200,
"data": [
{
"partId": "12bc30fb209f3af3bf530541c5b062bc",
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"turbineId": "183463dbf40d9278549a76b82b175dd9",
"turbineName": "一期012号",
"partName": "叶片2",
"partCode": "0001",
"partType": "VANE-2",
"partTypeLabel": "叶片2"
"border": {
"top": {"style": "single", "size": "4", "color": "000000"},
"bottom": {"style": "single", "size": "4", "color": "000000"},
"left": {"style": "single", "size": "4", "color": "000000"},
"right": {"style": "single", "size": "4", "color": "000000"}
},
{
"partId": "12bc30fb209f3af3bf530541c5b062bd",
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"turbineId": "183463dbf40d9278549a76b82b175dd9",
"turbineName": "一期012号",
"partName": "叶片1",
"partCode": "0000",
"partType": "VANE-1",
"partTypeLabel": "叶片1"
"shading": {
"color": "FFFFFF"
}
],
"msg": "",
"code": 200,
"success": True
}

View File

@ -1,729 +0,0 @@
"""
Document creation and manipulation tools for Word Document Server.
"""
import os
import json, re
from typing import Dict, List, Optional, Any
from docx import Document
from utils.file_utils import check_file_writeable, ensure_docx_extension, create_document_copy
from utils.document_utils import get_document_properties, extract_document_text, get_document_structure
from core.styles import ensure_heading_style, ensure_table_style
from docx.oxml.shared import qn
from docx.oxml import OxmlElement
from tools.content_tools import search_and_replace,add_picture_to_table
from tools.Get_Json import get_full_picture_url
from tools.get_pictures import resize_and_reduce_quality
async def create_document(filename: str, title: Optional[str] = None, author: Optional[str] = None) -> str:
"""创建一个包含可选元数据的新Word文档。
参数:
filename: 要创建的文档名称带或不带.docx扩展名
title: 可选标题
author: 可选作者
"""
filename = ensure_docx_extension(filename)
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
return f"Cannot create document: {error_message}"
try:
doc = Document()
# Set properties if provided
if title:
doc.core_properties.title = title
if author:
doc.core_properties.author = author
# Ensure necessary styles exist
ensure_heading_style(doc)
ensure_table_style(doc)
# 更改纸张大小为A4
from docx.shared import Mm, Inches
sections = doc.sections
for section in sections:
section.page_height = Mm(297)
section.page_width = Mm(210)
section.left_margin = Inches(0.94)
section.right_margin = Inches(0.94)
# Save the document
doc.save(filename)
return f"Document {filename} created successfully"
except Exception as e:
return f"Failed to create document: {str(e)}"
async def get_document_info(filename: str) -> str:
"""获得文档信息
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
try:
properties = get_document_properties(filename)
return json.dumps(properties, indent=2)
except Exception as e:
return f"Failed to get document info: {str(e)}"
async def get_document_text(filename: str) -> str:
"""获得文档的所有文本
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
return extract_document_text(filename)
async def get_document_outline(filename: str) -> str:
"""获得文档的所有结构信息
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
structure = get_document_structure(filename)
return json.dumps(structure, indent=2)
async def list_available_documents(directory: str = ".") -> str:
"""列出目录下所有Word文档
Args:
directory: 目录
"""
try:
if not os.path.exists(directory):
return f"Directory {directory} does not exist"
docx_files = [f for f in os.listdir(directory) if f.endswith('.docx')]
if not docx_files:
return f"No Word documents found in {directory}"
result = f"Found {len(docx_files)} Word documents in {directory}:\n"
for file in docx_files:
file_path = os.path.join(directory, file)
size = os.path.getsize(file_path) / 1024 # KB
result += f"- {file} ({size:.2f} KB)\n"
return result
except Exception as e:
return f"Failed to list documents: {str(e)}"
async def copy_document(source_filename: str, destination_filename: Optional[str] = None) -> str:
"""创建文档的副本
Args:
source_filename: 源文档路径
destination_filename: 目标文档路径为空则为当前目录
"""
source_filename = ensure_docx_extension(source_filename)
if destination_filename:
destination_filename = ensure_docx_extension(destination_filename)
success, message, new_path = create_document_copy(source_filename, destination_filename)
if success:
return message
else:
return f"Failed to copy document: {message}"
def add_documents(target_filename: str, source_filename: str) -> str:
"""将源文档(文本)添加到目标文档尾部
Args:
target_doc: 目标文档
source_filename: 源文档路径
"""
target_doc = Document(target_filename)
source_filename = ensure_docx_extension(source_filename)
source_doc = Document(source_filename)
for source_paragraph in source_doc.paragraphs:
new_paragraph = target_doc.add_paragraph(source_paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = source_paragraph.alignment
print(f"Source paragraph alignment: {source_paragraph.alignment}")
# Try to match the style if possible
try:
if source_paragraph.style and source_paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[source_paragraph.style.name]
except Exception as e:
print(f"Failed to apply style: {e}")
# Copy run formatting
for i, run in enumerate(source_paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体 (中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
target_doc.save(target_filename)
return f"{target_filename}添加{source_filename}成功"
def write_table(target_filename: str, rows: int, cols: int, table_num: int, data: Optional[List[List[str]]] = None, ifadjustheight: Optional[bool] = True, height: Optional[float] = 1, key_words: re.Pattern[str] = None, ALIGMENT: Optional[str] = 'CENTER') -> Document:
"""填写word文档里的表格返回填写后的文档
Args:
target_filename: 目标文档路径
rows: 表格行数
cols: 表格列数
table_num: 表格序号
data: 表格数据二维列表每个单元格为字符串
ifadjustheight: bool为真则表格行高自动调整
"""
target_filename = ensure_docx_extension(target_filename)
# Check if target file is writeable
is_writeable, error_message = check_file_writeable(target_filename)
if not is_writeable:
return f"Cannot create target document: {error_message}"
try:
target_filename = ensure_docx_extension(target_filename)
target_doc = Document(target_filename)
except Exception as e:
print(f"获取{target_filename}失败:{str(e)}")
# Try to set the table style
try:
target_doc.tables[table_num].style = 'Table Grid'
except KeyError as k:
pass
except Exception as e:
print(f"{target_doc}最后一个表格更改样式失败: {str(e)}")
print("开始写入表格")
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.shared import Pt, Inches, Cm, RGBColor
try:
if data:
for i, row_data in enumerate(data):
if i >= rows + 1:
break
for j, cell_text in enumerate(row_data):
if j >= cols + 1:
break
if str(cell_text) == "": continue
print(f"在[{i},{j}]处写入{str(cell_text)}")
target_doc.tables[table_num].cell(i,j).text = str(cell_text)
print(key_words, cell_text)
if key_words and key_words.search(str(cell_text)):
print(f'{cell_text}包含关键之,已置红')
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0)
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.name = "Times New Roman" #设置英文字体
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.size = Pt(10.5) # 字体大小
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') #设置中文字体
if ALIGMENT == 'CENTER':
target_doc.tables[table_num].cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
elif ALIGMENT == 'LEFT':
target_doc.tables[table_num].cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.LEFT
target_doc.tables[table_num].cell(i,j).vertical_alignment = WD_ALIGN_VERTICAL.CENTER
if ifadjustheight:
target_doc.tables[table_num].rows[i].height = Cm(height)
except Exception as e:
print(f"写入{target_filename}tables.cell({i},{j})失败:{str(e)}")
print("表格写入完成")
return target_doc
def set_document_para(target_doc: Document) -> Document:
"""设置文档的段落格式
"""
paragraphs_to_remove = []
for i, paragraph in enumerate(target_doc.paragraphs):
if i <= 11:
continue
if not paragraph.text.strip():
paragraphs_to_remove.append(paragraph)
for paragraph in paragraphs_to_remove:
p = paragraph._element
p.getparent().remove(p)
return target_doc
async def add_table_to_document(target_filename: str, source_filename: str, rows: int, cols: int, table_num: int, data: Optional[List[List[str]]] = None, ifadjustheight: Optional[bool] = True, height: Optional[float] = 1, key_words: re.Pattern[str] = None, ALIGMENT: Optional[str] = 'CENTER') -> str:
"""复制源文件中的文字与表格(先文字后表格格式)到目标文档
Args:
target_filename: 目标文档路径
source_doc: 源文档路径
rows: 表格行数
cols: 表格列数
table_num: 表格序号
data: 表格数据二维列表每个单元格为字符串
ifadjustheight: bool为真则表格行高自动调整
key_words: list, 关键字
"""
target_filename = ensure_docx_extension(target_filename)
source_filename = ensure_docx_extension(source_filename)
source_doc = Document(source_filename)
target_doc = Document(target_filename)
try:
# Copy all paragraphs
for paragraph in source_doc.paragraphs:
# Create a new paragraph with the same text and style
new_paragraph = target_doc.add_paragraph(paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = paragraph.alignment
# 复制段落分页属性
new_paragraph.paragraph_format.page_break_before = paragraph.paragraph_format.page_break_before
# Try to match the style if possible
try:
if paragraph.style and paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[paragraph.style.name]
except:
pass
# Copy run formatting
for i, run in enumerate(paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
# 复制分页符处理w:br标签
for element in run._element:
if element.tag.endswith('br'):
br_type = element.get(qn('type'), '')
if br_type == 'page':
new_br = OxmlElement('w:br')
new_br.set(qn('type'), 'page')
new_run._element.append(new_br)
except Exception as e:
print(f"添加表格前文章失败:{str(e)}")
try:# Copy all tables
from core.tables import copy_table
copy_table(source_doc.tables[0], target_doc, ifadjustheight, height)
except Exception as e:
print(f"添加表格失败:{str(e)}")
print(f"{target_doc}写入表格{source_doc.tables[0]}成功")
target_doc = set_document_para(target_doc)
target_doc.save(target_filename)
target_doc = Document(target_filename)
try:
target_doc = write_table(target_filename, rows, cols, table_num, data, ifadjustheight, height, key_words, ALIGMENT)
except Exception as e:
print(f"{target_filename}写入{data}失败:{str(e)}")
target_doc.save(target_filename)
return target_doc,f"{target_filename}添加表格{source_doc}成功"
async def add_table_and_replace(target_filename: str, source_filename: str, ifadjustheight: Optional[bool] = True, list_to_replace: dict = {}, height: Optional[float] = 1):
"""复制源文件中的文字与表格(先文字后表格格式)到目标文档
Args:
target_filename: 目标文档路径
source_doc: 源文档路径
ifadjustheight: bool为真则表格行高自动调整
list_to_replace: dict, 待替换内容和替换内容
"""
target_filename = ensure_docx_extension(target_filename)
source_filename = ensure_docx_extension(source_filename)
source_doc = Document(source_filename)
target_doc = Document(target_filename)
try:
# Copy all paragraphs
for paragraph in source_doc.paragraphs:
# Create a new paragraph with the same text and style
new_paragraph = target_doc.add_paragraph(paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = paragraph.alignment
# 复制段落分页属性
new_paragraph.paragraph_format.page_break_before = paragraph.paragraph_format.page_break_before
# Try to match the style if possible
try:
if paragraph.style and paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[paragraph.style.name]
except:
pass
# Copy run formatting
for i, run in enumerate(paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
# 复制分页符处理w:br标签
for element in run._element:
if element.tag.endswith('br'):
br_type = element.get(qn('type'), '')
if br_type == 'page':
new_br = OxmlElement('w:br')
new_br.set(qn('type'), 'page')
new_run._element.append(new_br)
except Exception as e:
print(f"添加表格前文章失败:{str(e)}")
try:# Copy all tables
from core.tables import copy_table
copy_table(source_doc.tables[0], target_doc, ifadjustheight, height)
target_doc.save(target_filename)
except Exception as e:
print(f"添加表格失败:{str(e)}")
for find_text, replace_text in list_to_replace.items():
print(await search_and_replace(target_filename, find_text, replace_text))
async def merge_documents(target_filename: str, source_filenames: List[str], add_page_breaks: bool = True) -> str:
"""合并文档(文本) 表格会添加到最后
Args:
target_filename: 合并后文档路径
source_filenames: 源文档路径列表
add_page_breaks: bool为真则每个源文档中间加入分页符
"""
from core.tables import copy_table
target_filename = ensure_docx_extension(target_filename)
# Check if target file is writeable
is_writeable, error_message = check_file_writeable(target_filename)
if not is_writeable:
return f"Cannot create target document: {error_message}"
# Validate all source documents exist
missing_files = []
for filename in source_filenames:
doc_filename = ensure_docx_extension(filename)
if not os.path.exists(doc_filename):
missing_files.append(doc_filename)
if missing_files:
return f"Cannot merge documents. The following source files do not exist: {', '.join(missing_files)}"
try:
# Create a new document for the merged result
target_doc = Document()
# Process each source document
for i, filename in enumerate(source_filenames):
doc_filename = ensure_docx_extension(filename)
source_doc = Document(doc_filename)
# Add page break between documents (except before the first one)
if add_page_breaks and i > 0:
target_doc.add_page_break()
# Copy all paragraphs
for paragraph in source_doc.paragraphs:
# Create a new paragraph with the same text and style
new_paragraph = target_doc.add_paragraph(paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = paragraph.alignment
# Try to match the style if possible
try:
if paragraph.style and paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[paragraph.style.name]
except:
pass
# Copy run formatting
for i, run in enumerate(paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
# Copy all tables
for table in source_doc.tables:
copy_table(table, target_doc)
# Save the merged document
target_doc.save(target_filename)
return f"Successfully merged {len(source_filenames)} documents into {target_filename}"
except Exception as e:
return f"Failed to merge documents: {str(e)}"
async def right_align_last_three_para(target_filename: str) -> str:
"""右对齐最后三个段落
Args:
target_filename: 目标文档路径
"""
target_filename = ensure_docx_extension(target_filename)
# Check if target file is writeable
is_writeable, error_message = check_file_writeable(target_filename)
if not is_writeable:
return f"Cannot right align paragraphs: {error_message}"
try:
# Open the target document
target_doc = Document(target_filename)
# Get the last three paragraphs
paragraphs = target_doc.paragraphs[-3:]
# Set the alignment of each paragraph to right
from docx.enum.text import WD_ALIGN_PARAGRAPH
for paragraph in paragraphs:
paragraph.alignment = WD_ALIGN_PARAGRAPH.RIGHT
# Save the modified document
target_doc.save(target_filename)
return f"Successfully right aligned the last three paragraphs in {target_filename}"
except Exception as e:
return f"Failed to right align paragraphs: {str(e)}"
async def add_dynamic_table(output_doc, output_dir, table_num, TABLES, JIANCHA_XIANGQING_DIR, PICTURES, row, col, i, FLAG, xuhao):
"""创建动态表
Args:
output_doc (Document): 文档对象
output_dir (str): 输出目录
table_num (int): 表格序号
TABLES (list): 表格数据
JIANCHA_XIANGQING_DIR (str): 检查详情表目录
PICTURES (dict): 图片数据字典键为表索引值为图片路径列表
row (int): 行数
col (int): 列数
i (int): 表格序号
FLAG: 其他标志
Returns:
tuple: (i, table_num) 更新后的表格序号和表格数量
"""
for table_idx, Table in enumerate(TABLES):
print(Table)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_XIANGQING_DIR, row, col, i, Table, FLAG)
print(message)
# 获取当前表格对应的图片
current_table_pictures = PICTURES.get(table_idx, [])
print(f"开始处理图片列表: {current_table_pictures}")
for picturedir in current_table_pictures:
if picturedir is None:
print(f"图片路径为空,跳过")
continue
try:
print(f"添加 {picturedir} {type(picturedir)}到表格{table_idx}")
resize_and_reduce_quality(picturedir, picturedir)
await add_picture_to_table(output_doc, output_dir, 4, 0, picturedir, i, 4.7232)
except Exception as e:
print(f"添加图片失败:{e}")
print(await search_and_replace(output_dir, 'tupian_xuhao', f'{xuhao}'))
table_num += 1
i += 1
xuhao += 1
return i, table_num, xuhao
async def process_images_table(data_dict, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words = None):
"""添加对应表格且填写图片名与插入图片
Args:
data_dict (dict): dict内容图片:图片路径
output_dir (str): 输出路径
start_i (int): 总表格数量
JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径
Returns:
int: 最后使用的表格序号
"""
items = list(data_dict.items())
picture_num = len(items)
line_index = 0
picture_index = 0
i = start_i
for content_row in range(((picture_num + 2) // 3) * 2):
if content_row % 2 == 1:
# 文字行(从 items 取图片名)
JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列
for k in range(1): # 只有1行
for l in range(3):
if line_index >= picture_num:
break
JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名
print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}')
line_index += 1
print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}")
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words
)
i += 1
else:
# 图片行(从 items 取图片路径)
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False
)
for k in range(3):
if picture_index < picture_num:
pic_path = items[picture_index][1] # 图片路径
print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}")
print(await add_picture_to_table(output_doc, output_dir, 0, k, pic_path, i, 1.8898))
picture_index += 1
i += 1
print(message)
return i # 返回最后使用的表格序号
async def process_server_images_table(data_list, image_source_list, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words=None):
"""从服务器获取的图片数据添加对应表格且填写图片名与插入图片
逻辑按来源排序添加
Args:
data_list (list): 图片数据列表每个元素为图片路径
[
{'imageId': '41543f531be24522b7bec741b9a483a2', 'imageName': 'F02_机组10060_叶片叶片PS面距叶根15.5米处轴向裂纹轴向260mm弦向20mm。DJI_20250526090227_0052_Z.png', 'imagePath': '/static/image/temp/out-work/12bc30fb209f3af3bf530541c5b062bd/F02_机组10060_叶片叶片PS面距叶根15.5米处轴向裂纹轴向260mm弦向20mm。DJI_20250526090227_0052_Z.png', 'partId': '12bc30fb209f3af3bf530541c5b062bd', 'partName': '叶片1', 'imageResolution': '8000x6000', 'focalDistance': None, 'shootingTime': None, 'cameraManufacturer': None, 'cameraModel': None, 'weather': None, 'weatherLabel': None, 'humidness': None, 'temperature': None, 'windLevel': None, 'shootingMethod': 'UAV', 'shootingMethodLabel': '无人机航拍', 'shootingDistance': None, 'collectorName': '程启谦', 'imageSource': 'out-work', 'imageSourceLabel': '外部工作', 'imageType': 'DEFECT', 'imageTypeLabel': '缺陷影像', 'audioList': None, 'preImagePath': None, 'preTreatment': False, 'projectId': None, 'gps': None},
...
]
image_source_list (list): 来源列表每个元素为来源名称
['out-work', 'in-work', 'in-field'] (来源可能会少于3个pop出没有给出对应来源的图片)
output_dir (str): 输出路径
start_i (int): 总表格数量
JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径
key_words (re)
Returns:
int: 最后使用的表格序号
"""
# 按来源对图片数据进行分组和排序
sorted_data = {source: [] for source in image_source_list}
# 将图片数据按来源分组
for item in data_list:
source = item['imageSource']
if source in sorted_data:
# 存储图片名和图片路径的元组与非server版本保持一致
sorted_data[source].append((item['imageTypeLabel'], item['imagePath']))
i = start_i
# 按照image_source_list的顺序处理每个来源的图片
for source in image_source_list:
items = sorted_data[source]
if not items:
continue # 如果该来源没有图片则跳过
print(f"处理来源: {source}")
picture_num = len(items)
line_index = 0
picture_index = 0
for content_row in range(((picture_num + 2) // 3) * 2):
if content_row % 2 == 1:
# 文字行(从 items 取图片名)
JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列
for k in range(1): # 只有1行
for l in range(3):
if line_index >= picture_num:
break
JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名
print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}')
line_index += 1
print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}")
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words
)
i += 1
else:
# 图片行(从 items 取图片路径)
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False
)
for k in range(3):
if picture_index < picture_num:
pic_path = items[picture_index][1] # 图片路径
print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}")
print(await add_picture_to_table(output_doc, output_dir, 0, k, get_full_picture_url(pic_path), i, 1.8898))
picture_index += 1
i += 1
print(message)
return i # 返回最后使用的表格序号

View File

@ -1,7 +1,8 @@
from content_tools import add_picture_to_table, search_and_replace
from content_tools import add_picture_to_table, search_and_replace,add_picture
from get_pictures import resize_and_reduce_quality
from document_tools import add_table_to_document
from document_tools import add_table_to_document,add_documents
from docx import Document
from typing import List
def fill_tables(Y_table_list, row, col, Y_Table_num, Y):
"""根据前端返回json块填写表格list并实时跟进已填写表格数量
目前只支持固定的缺陷图的填写
@ -179,4 +180,3 @@ def merge_info(frontend_info, default_info):
merged_info[key] = frontend_value
return merged_info

View File

@ -2,6 +2,7 @@ import os
import math
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
from tools.dataproccess import get_resource_path
def resize_and_reduce_quality(image_path, output_path, target_width = None):
try:
@ -332,3 +333,9 @@ def get_records_with_pic(records: list[dict], filtered_picture_data: dict[str, d
return records_with_defect_pic, defect_pic_with_no_records
def get_template_pic(pic_name : str):
"""获取模板图片路径"""
return get_resource_path(pic_name)

375
tools/json_to_docx.py Normal file
View File

@ -0,0 +1,375 @@
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.shared import qn, OxmlElement
import json
def json_to_docx(json_data):
print(f"\n开始转换JSON到DOCX文档")
doc = Document()
total_elements = len(json_data)
print(f"文档包含 {total_elements} 个元素(段落和表格)")
for i, element in enumerate(json_data, 1):
print(f"\n处理元素 {i}/{total_elements}: ", end="")
if element["type"] == "text":
print(f"段落 (长度: {len(element['content']['runs'])}个runs)")
add_paragraph_from_json(doc, element["content"])
elif element["type"] == "table":
rows = element["content"]["rows"]
cols = element["content"]["cols"]
merges = len(element["content"].get("merged_cells", []))
print(f"表格 ({rows}行×{cols}列, 包含 {merges} 个合并单元格)")
add_table_from_json(doc, element["content"], element.get("bold", False))
return doc
def add_paragraph_from_json(doc, para_json):
paragraph = doc.add_paragraph()
print(f" 添加段落 (对齐: {para_json['alignment']})")
# 设置段落对齐方式
alignment_map = {
"left": WD_ALIGN_PARAGRAPH.LEFT,
"center": WD_ALIGN_PARAGRAPH.CENTER,
"right": WD_ALIGN_PARAGRAPH.RIGHT,
"justify": WD_ALIGN_PARAGRAPH.JUSTIFY
}
paragraph.alignment = alignment_map.get(para_json["alignment"], WD_ALIGN_PARAGRAPH.LEFT)
# 添加文本运行(runs)
for run_idx, run_json in enumerate(para_json["runs"], 1):
run = paragraph.add_run(run_json["text"])
try:
if run_json["has_page_break"]:
import docx
run.add_break(docx.enum.text.WD_BREAK.PAGE)
except:
pass
font = run.font
print(f" 添加run {run_idx}: '{run_json['text']}' "
f"(字体: {run_json['font']['name']}, 大小: {run_json['font']['size']}, "
f"加粗: {run_json['font']['bold']}, 斜体: {run_json['font']['italic']})")
# 设置字体样式
if run_json["font"]["name"]:
font.name = run_json["font"]["name"]
run.element.rPr.rFonts.set(qn('w:eastAsia'), run_json["font"]["name"])
if run_json["font"]["size"]:
font.size = Pt(run_json["font"]["size"])
font.bold = run_json["font"]["bold"]
font.italic = run_json["font"]["italic"]
font.underline = run_json["font"]["underline"]
# 设置字体颜色
if run_json["font"]["color"]:
color = run_json["font"]["color"]
font.color.rgb = RGBColor(color["r"], color["g"], color["b"])
print(f" 设置颜色: RGB({color['r']}, {color['g']}, {color['b']})")
def add_table_from_json(doc, table_json, bold=False):
print(f" 创建表格: {table_json['rows']}× {table_json['cols']}")
table = doc.add_table(rows=table_json["rows"], cols=table_json["cols"])
# 设置表格样式为无网格线(我们将自定义边框)
table.style = 'Table Grid'
# 设置列宽
if "col_widths" in table_json and any(table_json["col_widths"]):
print(" 设置列宽...")
for col_idx, width in enumerate(table_json["col_widths"]):
if width is not None:
# 将英寸转换为Twips1英寸=1440 Twips
twips_width = int(width * 1440)
for cell in table.columns[col_idx].cells:
tc = cell._tc
tcPr = tc.get_or_add_tcPr()
tcW = tcPr.first_child_found_in("w:tcW")
if tcW is None:
tcW = OxmlElement('w:tcW')
tcPr.append(tcW)
tcW.set(qn('w:w'), str(twips_width))
tcW.set(qn('w:type'), 'dxa') # 使用绝对单位
# 设置行高
if "row_heights" in table_json and any(table_json["row_heights"]):
print(" 设置行高...")
for row_idx, height in enumerate(table_json["row_heights"]):
if height is not None:
# 将英寸转换为Twips1英寸=1440 Twips
twips_height = int(height * 1440)
tr = table.rows[row_idx]._tr
trPr = tr.get_or_add_trPr()
trHeight = OxmlElement('w:trHeight')
trHeight.set(qn('w:val'), str(twips_height))
trHeight.set(qn('w:hRule'), 'atLeast') # 或'exact'表示固定高度
trPr.append(trHeight)
# 处理合并单元格
for merge_idx, merge_info in enumerate(table_json.get("merged_cells", []), 1):
start_row = merge_info["start_row"]
start_col = merge_info["start_col"]
end_row = merge_info["end_row"]
end_col = merge_info["end_col"]
print(f" 合并单元格 #{merge_idx}: 从({start_row},{start_col})到({end_row},{end_col})")
start_cell = table.cell(start_row, start_col)
end_cell = table.cell(end_row, end_col)
start_cell.merge(end_cell)
# 填充表格内容
for row_idx, row_data in enumerate(table_json["cells"]):
for col_idx, cell_data in enumerate(row_data):
# 跳过被合并的非主单元格
if cell_data["is_merged"] and not cell_data["merge_info"]["is_primary"]:
print(f" 跳过被合并的单元格({row_idx},{col_idx})")
continue
cell = table.cell(cell_data["row"], cell_data["col"])
print(f" 处理单元格({row_idx},{col_idx}) - 对齐: {cell_data['alignment']}")
format_cell(cell, cell_data) # 统一设置单元格格式
def format_cell(cell, cell_data):
"""设置单元格完整格式"""
# 清空原有内容
for p in cell.paragraphs:
p._element.getparent().remove(p._element)
# 添加内容
for para in cell_data["content"]:
add_paragraph_from_json(cell, para)
# 设置对齐方式
set_cell_alignment(cell, cell_data)
# 设置边框
set_cell_border(cell, cell_data["border"])
# 设置背景色
if cell_data.get("shading"):
set_cell_shading(cell, cell_data["shading"])
# 设置边距
if cell_data.get("margins"):
set_cell_margins(cell, cell_data["margins"])
def set_cell_alignment(cell, cell_data):
"""设置单元格对齐(水平和垂直)"""
# 水平对齐
if cell.paragraphs:
align_map = {
"left": WD_ALIGN_PARAGRAPH.LEFT,
"center": WD_ALIGN_PARAGRAPH.CENTER,
"right": WD_ALIGN_PARAGRAPH.RIGHT,
"justify": WD_ALIGN_PARAGRAPH.JUSTIFY
}
cell.paragraphs[0].alignment = align_map.get(cell_data["alignment"], WD_ALIGN_PARAGRAPH.LEFT)
# 垂直对齐设置
tcPr = cell._tc.get_or_add_tcPr()
vAlign = OxmlElement('w:vAlign')
align_value = cell_data.get('vertical_align', 'top')
print(f" 设置垂直对齐: {align_value}")
# 确保使用有效的对齐值
valid_alignments = ['top', 'center', 'bottom']
if align_value not in valid_alignments:
align_value = 'top' # 默认值
vAlign.set(qn('w:val'), align_value)
tcPr.append(vAlign)
def set_cell_shading(cell, shading):
"""设置单元格背景色"""
tcPr = cell._tc.get_or_add_tcPr()
shd = OxmlElement('w:shd')
shd.set(qn('w:fill'), shading["color"])
if shading.get("theme"):
shd.set(qn('w:themeColor'), shading["theme"])
tcPr.append(shd)
def set_cell_margins(cell, margins):
"""设置单元格边距"""
tcPr = cell._tc.get_or_add_tcPr()
tcMar = OxmlElement('w:tcMar')
for side, margin in margins.items():
side_el = OxmlElement(f'w:{side}')
side_el.set(qn('w:w'), margin["w"])
side_el.set(qn('w:type'), margin["type"])
tcMar.append(side_el)
tcPr.append(tcMar)
def set_cell_border(cell, border_data):
"""
设置单元格边框
:param cell: 单元格对象
:param border_data: 边框数据
"""
tc = cell._tc
tcPr = tc.get_or_add_tcPr()
# 检查是否存在边框元素,不存在则创建
tcBorders = tcPr.first_child_found_in("w:tcBorders")
if tcBorders is None:
tcBorders = OxmlElement('w:tcBorders')
tcPr.append(tcBorders)
# 设置各边边框
for side in ['top', 'left', 'bottom', 'right']:
if side in border_data:
border = border_data[side]
border_el = OxmlElement(f'w:{side}')
border_el.set(qn('w:val'), border.get('style', 'single'))
border_el.set(qn('w:sz'), str(border.get('size', 4)))
border_el.set(qn('w:color'), border.get('color', '000000'))
tcBorders.append(border_el)
# 使用示例
if __name__ == "__main__":
# 假设我们已经有了之前生成的JSON数据
input_json = "output.json"
output_path = "restored.docx"
print(f"{input_json} 读取JSON数据...")
with open(input_json, "r", encoding="utf-8") as f:
json_data = json.load(f)
# 将JSON转换回DOCX
json_to_docx(json_data, output_path)
from typing import List, Dict, Any
def list_to_json_with_merges(
table_data: List[List[str]],
style_config: Dict[str, Any] = None,
detect_merges: bool = True # 新增控制参数
) -> Dict[str, Any]:
"""
将二维列表转换为表格JSON可选是否合并相邻相同单元格
参数:
table_data: 二维字符串列表表示的表格数据
style_config: 包含样式配置的字典可选
detect_merges: 是否检测并合并相邻相同单元格默认为True
返回:
符合表格JSON结构的字典
"""
if not table_data or not table_data[0]:
return {"type": "table", "content": {"rows": 0, "cols": 0, "cells": [], "merged_cells": []}}
rows = len(table_data)
cols = len(table_data[0])
result = {
"type": "table",
"content": {
"rows": rows,
"cols": cols,
"merged_cells": [],
"cells": [[None for _ in range(cols)] for _ in range(rows)]
}
}
for col in range(cols):
start_row = 0
while start_row < rows:
current_value = table_data[start_row][col]
end_row = start_row
# 只有开启合并检测时才查找可合并区域
if detect_merges:
while end_row + 1 < rows and table_data[end_row + 1][col] == current_value:
end_row += 1
# 处理单元格(区分合并/非合并模式)
if detect_merges and end_row > start_row: # 合并模式
merge_info = {
"start_row": start_row,
"start_col": col,
"end_row": end_row,
"end_col": col
}
result["content"]["merged_cells"].append(merge_info)
for row in range(start_row, end_row + 1):
cell_data = create_cell_data(
row=row,
col=col,
value=current_value,
style_config=style_config,
is_merged=True,
is_primary=(row == start_row),
merge_range=merge_info
)
result["content"]["cells"][row][col] = cell_data
else: # 非合并模式或无需合并的单单元格
cell_data = create_cell_data(
row=start_row,
col=col,
value=current_value,
style_config=style_config,
is_merged=False
)
result["content"]["cells"][start_row][col] = cell_data
start_row = end_row + 1
return [result]
def create_cell_data(
row: int,
col: int,
value: str,
style_config: Dict[str, Any],
is_merged: bool = False,
is_primary: bool = False,
merge_range: Dict[str, int] = None
) -> Dict[str, Any]:
"""创建标准化单元格数据"""
cell = {
"row": row,
"col": col,
"is_merged": is_merged,
"content": create_cell_content(value, style_config),
"alignment": style_config.get("alignment", "left") if style_config else "left",
"border": style_config.get("border", {}) if style_config else {},
"shading": style_config.get("shading", {}) if style_config else {},
"margins": style_config.get("margins", {}) if style_config else {}
}
if is_merged:
cell["merge_info"] = {
"is_primary": is_primary,
"start_row": merge_range["start_row"],
"start_col": merge_range["start_col"],
"end_row": merge_range["end_row"],
"end_col": merge_range["end_col"]
}
return cell
def create_cell_content(text: str, style_config: Dict[str, Any] = None) -> List[Dict]:
"""创建单元格内容结构"""
font_config = style_config.get("font", {}) if style_config else {}
return [{
"alignment": style_config.get("alignment", "left") if style_config else "left",
"runs": [{
"text": text,
"font": {
"name": font_config.get("name", "Calibri"),
"size": font_config.get("size", 11),
"bold": font_config.get("bold", False),
"italic": font_config.get("italic", False),
"underline": font_config.get("underline", False),
"color": font_config.get("color", {"r": 0, "g": 0, "b": 0})
},
"has_page_break": False
}]
}]

View File

@ -165,3 +165,10 @@ def find_and_replace_text(doc, old_text, new_text):
count += 1
return count
def clear_header(section):
for para in section.header.paragraphs:
para.clear()