diff --git a/Generate_Report.py b/Generate_Report.py new file mode 100644 index 0000000..e2625ae --- /dev/null +++ b/Generate_Report.py @@ -0,0 +1,524 @@ +# 文档处理工具 +from tools.document_tools import ( + create_document, add_documents,add_table_and_replace, + add_table_to_document,process_images_table, +) + +# 内容处理工具 +from tools.content_tools import ( + add_picture,split_table_by_row_content, + search_and_replace,add_picture_to_table +) + +from tools.get_pictures import ( + make_Thumbnail,resize_and_reduce_quality, + get_picture_nums,find_image,collect_defect_data +) + +from tools.Get_Json import get_project_info,get_jizu_info,get_jizu_shigong_info,get_weather + +import asyncio + +from core.tables import fill_tables + +from tools.defines import * +import os, re, datetime +from pathlib import Path + +async def add_dynamic_table(output_doc, output_dir, table_num, TABLES, JIANCHA_XIANGQING_DIR, PICTURES, row, col, i, FLAG, xuhao): + """创建动态表 + + Args: + output_doc (Document): 文档对象 + output_dir (str): 输出目录 + table_num (int): 表格序号 + TABLES (list): 表格数据 + JIANCHA_XIANGQING_DIR (str): 检查详情表目录 + PICTURES (dict): 图片数据字典,键为表索引,值为图片路径列表 + row (int): 行数 + col (int): 列数 + i (int): 表格序号 + FLAG: 其他标志 + + Returns: + tuple: (i, table_num) 更新后的表格序号和表格数量 + """ + for table_idx, Table in enumerate(TABLES): + print(Table) + output_doc, message = await add_table_to_document(output_dir, JIANCHA_XIANGQING_DIR, row, col, i, Table, FLAG) + print(message) + + # 获取当前表格对应的图片 + current_table_pictures = PICTURES.get(table_idx, []) + print(f"开始处理图片列表: {current_table_pictures}") + + for picturedir in current_table_pictures: + try: + print(f"添加 {picturedir} {type(picturedir)}到表格{table_idx}") + resize_and_reduce_quality(picturedir, picturedir) + await add_picture_to_table(output_doc, output_dir, 4, 0, picturedir, i, 4.7232) + except Exception as e: + print(f"添加图片失败:{e}") + + print(await search_and_replace(output_dir, 'tupian_xuhao', f'{xuhao}')) + table_num += 1 + i += 1 + xuhao += 1 + return i, 
table_num, xuhao + +def get_year_month(date): + """根据格式化date字符串获取年月 'date': '二〇二一年十二月十日 9:00' + + Args: date (str): 日期字符串 + + Returns: 年月字符串 '二〇二一年十二月' + """ + unit_map = {'1' : '一', '2' : '二', '3' : '三', '4' : '四', '5' : '五', '6' : '六', '7' : '七', '8' : '八', '9' : '九', '0' : '〇'} + unit_map_month = {1 : '一', 2 : '二', 3 : '三', 4 : '四', 5 : '五', 6 : '六', 7 : '七', 8 : '八', 9 : '九', 10 : '十', 11 : '十一', 12 : '十二'} + year = date.split('年')[0] + month = date.split('年')[1].split('月')[0] + year = ''.join([unit_map[i] for i in year]) + month = unit_map_month[int(month)] + return f"{year}年{month}月" + +def merge_info(frontend_info, default_info): + """ + 合并前端传入的 info 和默认 info + 规则:如果前端传入的值为空(None 或空字符串),则使用默认值 + + Args: + frontend_info: 前端传入的字典 + default_info: 默认的完整字典 + Returns: + 合并后的完整字典 + """ + if not isinstance(frontend_info, dict) or frontend_info is None: + return default_info.copy() + + merged_info = {} + + for key, default_value in default_info.items(): + # 获取前端传入的值 + frontend_value = frontend_info.get(key) + + # 判断前端值是否为空(None 或空字符串) + if frontend_value is None or frontend_value == "": + merged_info[key] = default_value + else: + merged_info[key] = frontend_value + + return merged_info + + +async def generate_report(base_info, baogao_info): + #获取模板编号、模板名称 + num_to_chinese = {1 : '一', 2 : '二', 3 : '三', 4 : '四', 5 : '五', 6 : '六', 7 : '七', 8 : '八', 9 : '九', 10 : '十', 11 : '十一', 12 : '十二'} + cover_encode = "encode" + cover_project = "project" + baogao_name1 = "baogaoname1" + baogao_name2 = "baogaoname2" + company_name_yi = "company_name_yi" + cover_date = "time" + TITLE_OF_REPORT = "companyencode" + jiegou_xuhao = 'num' + + jizu_data = get_jizu_info(base_info['turbine_id']) + project_data = get_project_info(jizu_data['projectId']) + shigong_data = get_jizu_shigong_info(base_info['turbine_id']) + print(shigong_data) + try: + fengchang_name = project_data['farmName'] + Yi_company = project_data['inspectionUnit'] + yi_fuzeren = project_data['inspectionContact'] + yi_phone 
= project_data['inspectionPhone'] + fengchang_location = project_data['farmAddress'] + Jia_company = project_data['client'] + jia_fuzeren = project_data['clientContact'] + jia_phone = project_data['clientPhone'] + jizu_num = project_data['scale'] + jizu_xinghao = project_data['turbineModel'] + except Exception as e: + print(f"数据库的项目-机组基本信息获取失败:{e}") + return + + try: + baogao_date = datetime.datetime.now().strftime("%Y年%m月%d日 %H:%M") #现在的时间 + + #数据库拉取信息 + Jiancha_date = shigong_data["startTime"].replace("T", " ") #检查日期 + image_count = shigong_data['imageCount'] #从施工方案获取的图片数量,待定 + temperature = shigong_data['temperature'] #温度 + wind_speed = shigong_data['windSpeed'] #风速 + weather = get_weather(shigong_data["weatherCode"]) #天气 + + #前端信息 + baogao_info = merge_info(baogao_info, DEFAULT_BAOGAO_INFO) + key_words= re.compile('|'.join(map(re.escape, baogao_info['key_words'].split(',')))) + shengcheng_dir = baogao_info['shengcheng_dir'] + muban_dir = baogao_info['muban_dir'] + project_number = baogao_info['jizu_type'] + baogao_type = baogao_info['baogao_type'] + if muban_dir == "" or shengcheng_dir == "": + print("未配置图片/生成路径/总图片路径,请检查配置") + return + + date_year_month = get_year_month(baogao_date) + + Jiancha_renyuan = baogao_info['jiancha_renyuan'] + shebei_peizhi = baogao_info['shebei_peizhi'] + shigong_fangan = baogao_info['shigong_fangan'] + renyuan_peizhi = baogao_info['renyuan_peizhi'] + gongzuo_neirong = baogao_info['gongzuo_neirong'] + beizhu = baogao_info['beizhu'] + + Jiancha_location = baogao_info['jiancha_location'] + Jiancha_fangshi = baogao_info['jiancha_fangshi'] + Changjia = baogao_info['yepian_changjia'] + + yezhu_renyuan = baogao_info['yezhu_renyuan'] + changjia_renyuan = baogao_info['changjia_renyuan'] + data_process = baogao_info['date_process'] + baogao_bianzhi = baogao_info['baogao_bianzhi'] + baogao_shenghe = baogao_info['baogao_shenghe'] + shenghe_date = baogao_info['shenghe_date'] + + Y1_jiancha = baogao_info['Y1_jiancha_neirong'] + Y2_jiancha = 
baogao_info['Y2_jiancha_neirong'] + Y3_jiancha = baogao_info['Y3_jiancha_neirong'] + + except Exception as e: + print(f"报告基本信息获取失败:{e}") + return + + normal_picture_num = 0 + Y1 = "t" + Y2 = "t" + Y3 = "t" + + output_doc = None + head_num = 1 + ###封面创建### + cover_dirs = [os.path.join(muban_dir,"fengmian1.docx"),os.path.join(muban_dir,"fengmian.jpg"),os.path.join(muban_dir,"fengmian2.docx")] + #输出目录 + output_dir = os.path.normpath(f"{shengcheng_dir}/{fengchang_name}项目{baogao_type}{project_number}{baogao_date.split(' ')[0]}版.docx") + + version = 1 + while os.path.exists(output_dir): + if version != 1: + output_dir = output_dir.replace(f"版{version - 1}",f"版{version}") + else: + output_dir = output_dir.replace("版",f"版{version}") + version += 1 + + ifwaibu = baogao_info['waibu_jiancha'] + ifneibu = baogao_info['neibu_jiancha'] + iffanglei = baogao_info['fanglei_jiancha'] + parts = [] + if ifwaibu: + parts.append("叶片外观") + if ifneibu: + parts.append("叶片内部") + if iffanglei: + parts.append("叶片防雷") + if not parts: + print("前端未指定检查内容") + mianzhe_shengming = f"本报告仅涵盖{'、'.join(parts)}检测内容" + + #创建文档、添加封面 + print(await create_document(output_dir)) + print(add_documents(output_dir, cover_dirs[0])) + print(await add_picture(output_dir, cover_dirs[1])) + print(add_documents(output_dir, cover_dirs[2])) + print("封面创建成功") + + #更改文档信息 + print(await search_and_replace(output_dir, TITLE_OF_REPORT, project_number)) + print(await search_and_replace(output_dir, baogao_name1, baogao_type)) + print(await search_and_replace(output_dir, baogao_name2, baogao_type)) + print(await search_and_replace(output_dir, company_name_yi, Yi_company)) + print(await search_and_replace(output_dir, cover_project, fengchang_name)) + print(await search_and_replace(output_dir, cover_encode, project_number)) + print(await search_and_replace(output_dir, cover_date, date_year_month)) + print(await search_and_replace(output_dir, 'bianzhi', baogao_bianzhi)) + print(await search_and_replace(output_dir, 'shenghe', 
baogao_shenghe)) + print(await search_and_replace(output_dir, 'mianzhe_shengming', mianzhe_shengming)) + + total_table_num = 0 + #项目概况表 + print("开始添加项目概况表") + XIANG_MU_GAI_KUANG = os.path.join(muban_dir,"xiangmugaikuo.docx") + print(f"查找模板,找到模板:{XIANG_MU_GAI_KUANG}") + project_location = fengchang_location + company_name_jia = Jia_company + fuzeren = yi_fuzeren + phone_fuzeren = yi_phone + jizu_bianhao = project_number + xiangmuguige = jizu_num + Yi_company = Yi_company + XIANGMU_GAIKUO = list(list("" for i in range(5)) for j in range(5)) + XIANGMU_GAIKUO[0][1] = fengchang_name + #XIANGMU_GAIKUO[0][3]=XIANGMU_GAIKUO[0][4] = "盐城市滨海县" + XIANGMU_GAIKUO[0][3] = project_location + #XIANGMU_GAIKUO[1][1]=XIANGMU_GAIKUO[2,1]=XIANGMU_GAIKUO[3,1] = "国家电投集团滨海风力发电有限公司" + XIANGMU_GAIKUO[1][1] = company_name_jia + XIANGMU_GAIKUO[1][3] = Yi_company + XIANGMU_GAIKUO[2][3] = fuzeren + XIANGMU_GAIKUO[3][4] = phone_fuzeren + XIANGMU_GAIKUO[4][1] = jizu_xinghao + XIANGMU_GAIKUO[4][4] = xiangmuguige + print("建立表结构完毕,开始插入模板") + #添加项目概况表 + print(f"输出路径:{output_dir},模板路径:{XIANG_MU_GAI_KUANG},插入数据:{XIANGMU_GAIKUO}") + output_doc, message = await add_table_to_document(output_dir, XIANG_MU_GAI_KUANG,5,5,total_table_num,XIANGMU_GAIKUO) + print(message) + print("模板插入完毕,开始替换内容") + total_table_num += 1 + print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + head_num += 1 + + #检查方案描述 + FANGAN_JIANCHA_DIR = os.path.join(muban_dir,"checkmethod.docx") + list_to_replace = { + 'renyuan_peizhi' : renyuan_peizhi, + 'shebei_peizhi' : shebei_peizhi, + 'shigong_fangan' : shigong_fangan, + 'gongzuo_neirong' : gongzuo_neirong, + 'beizhu' : beizhu, + 'num' : num_to_chinese[head_num], + } + print(await add_table_and_replace(output_dir, FANGAN_JIANCHA_DIR, 0, list_to_replace)) + print(split_table_by_row_content(output_dir, output_dir, total_table_num)) + total_table_num += 1 + head_num += 1 + + jiancha = [] + neirong = [] + if ifwaibu: + jiancha.append("无人机外部高精度飞行") + 
neirong.append(f"{Y1}、{Y2}、{Y3}三支叶片的前缘、后缘、迎风面、背风面。") + if ifneibu: + jiancha.append("人工内部拍摄") + neirong.append(f"{Y1}、{Y2}、{Y3}三支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...") + if iffanglei: + jiancha.append("人工防雷") + neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...") + + JIANCHA_XINGXI_DIR = os.path.join(muban_dir,"checkinfo.docx") + JIANCHA_XINGXI = list(list("" for i in range(4)) for j in range(9)) + JIANCHA_XINGXI[0][1] = Jiancha_renyuan + JIANCHA_XINGXI[1][1] = Jiancha_date.split(' ')[0] + JIANCHA_XINGXI[1][3] = project_number + JIANCHA_XINGXI[2][1] = Jiancha_location + JIANCHA_XINGXI[2][3] = Jiancha_fangshi + JIANCHA_XINGXI[3][2] = Changjia + JIANCHA_XINGXI[4][1] = '机组编号:' + project_number + '机组' + JIANCHA_XINGXI[5][1] = Y1 + JIANCHA_XINGXI[6][1] = Y2 + JIANCHA_XINGXI[7][1] = Y3 + JIANCHA_XINGXI[8][0] = "本次" + "、".join(_ for _ in jiancha) + f"检查,采集叶片图片{normal_picture_num}张,内容覆盖" + ";".join(_ for _ in neirong) + # if total_picture_dir == "": + # Thumbnail_Picture = await make_Thumbnail(Picture_dir, Picture_dir)#添加图片 + # else: + # print('传入了总图片路径,获取图片数量') + # Thumbnail_Picture = await make_Thumbnail(total_picture_dir, Picture_dir)#添加图片 + # normal_picture_num = get_picture_nums(total_picture_dir) + # JIANCHA_XINGXI42 = tatong_image_path + #新建检查信息表 + output_doc, message = await add_table_to_document(output_dir, JIANCHA_XINGXI_DIR,9,4,total_table_num ,JIANCHA_XINGXI,False) + print(message) + # print(await add_picture_to_table(output_doc, output_dir, 4, 2, JIANCHA_XINGXI42, total_table_num , 1.18)) + # #添加略缩图片 + # print(await add_picture_to_table(output_doc, output_dir, 8, 0, Thumbnail_Picture, total_table_num)) + print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + head_num += 1 + total_table_num += 1 + + #添加成果递交表 + + CHENGGUO_DIJIAO_DIR = os.path.join(muban_dir,"chengguo_sub.docx") + CHENGGUO_DIJIAO = list(list("" for i in range(4)) for j in range(6)) + CHENGGUO_DIJIAO[0][1] = Jiancha_renyuan + CHENGGUO_DIJIAO[1][1] = yezhu_renyuan + 
CHENGGUO_DIJIAO[2][1] = Jiancha_date.split(' ')[0] + CHENGGUO_DIJIAO[3][1] = data_process + CHENGGUO_DIJIAO[4][1] = baogao_bianzhi + CHENGGUO_DIJIAO[5][1] = baogao_shenghe + CHENGGUO_DIJIAO[1][3] = changjia_renyuan + CHENGGUO_DIJIAO[2][3] = Jiancha_date.split(' ')[1] + CHENGGUO_DIJIAO[3][3] = baogao_date.split(' ')[0] + CHENGGUO_DIJIAO[4][3] = baogao_date.split(' ')[0] + CHENGGUO_DIJIAO[5][3] = shenghe_date.split(' ')[0] + + output_doc, message = await add_table_to_document(output_dir, CHENGGUO_DIJIAO_DIR,5,5,total_table_num,CHENGGUO_DIJIAO,True,0.04) + print(message) + print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + head_num += 1 + total_table_num += 1 + + # #检查情况汇总表(文字信息) + # try: + # search_file_list = [] + # if ifwaibu: + # search_file_list.append("外缺陷图") + # if ifneibu: + # search_file_list.append("内缺陷图") + # Y1_quexian_num, Y1_quexian_dict = collect_defect_data(Y1, Picture_dir, ifwaibu, ifneibu, search_file_list) + # Y2_quexian_num, Y2_quexian_dict = collect_defect_data(Y2, Picture_dir, ifwaibu, ifneibu, search_file_list) + # Y3_quexian_num, Y3_quexian_dict = collect_defect_data(Y3, Picture_dir, ifwaibu, ifneibu, search_file_list) + # weak_num_Y1 = f"{Y1}共发现缺陷{Y1_quexian_num}处" + # weak_num_Y2 = f"{Y2}共发现缺陷{Y2_quexian_num}处" + # weak_num_Y3 = f"{Y3}共发现缺陷{Y3_quexian_num}处" + # except Exception as e: + # print(f"缺陷图获取失败:{e}") + # return + + # #添加检查情况汇总表 + # JIANCHA_HUIZONG_DIR = os.path.join(muban_dir,"total_check.docx") + # JIANCHA_HUIZONG = list(list("" for i in range(3)) for j in range(4)) + + # JIANCHA_HUIZONG[1][0] = weak_num_Y1 + # JIANCHA_HUIZONG[2][0] = weak_num_Y2 + # JIANCHA_HUIZONG[3][0] = weak_num_Y3 + # JIANCHA_HUIZONG[1][1] = Y1_jiancha + # JIANCHA_HUIZONG[2][1] = Y2_jiancha + # JIANCHA_HUIZONG[3][1] = Y3_jiancha + # JIANCHA_HUIZONG[1][2] = "/n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y1_quexian_dict.items())]) if Y1_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷' + # JIANCHA_HUIZONG[2][2] = 
"/n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y2_quexian_dict.items())]) if Y2_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷' + # JIANCHA_HUIZONG[3][2] = "/n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y3_quexian_dict.items())]) if Y3_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷' + # output_doc, message = await add_table_to_document(output_dir, JIANCHA_HUIZONG_DIR,4,3,total_table_num,JIANCHA_HUIZONG,False,ALIGMENT='LEFT') + # print(message) + # print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + # total_table_num += 1 + # head_num += 1 + + # #主要部位图片展示表/检查内容表 + # search_file_list = ["外汇总","内汇总","防汇总"] + # picture_Y1_num, Y1_dict = collect_defect_data(Y1, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei) + # picture_Y2_num, Y2_dict = collect_defect_data(Y2, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei) + # picture_Y3_num, Y3_dict = collect_defect_data(Y3, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei) + # print(f"图片、文字数量:{picture_Y1_num} {picture_Y2_num} {picture_Y3_num}") + # JIANCHA_NEIRONG_TOTAL_NUM = picture_Y1_num+ picture_Y2_num + picture_Y3_num + # col ,row = 3, 0 + # JIANCHA_NEIRONG_PICTURES_TABLE = os.path.join(muban_dir,"check2.docx") + # JIANCHA_NEIRONG_Y1_DIR = os.path.join(muban_dir,"check_content.docx") + # JIANCHA_NEIRONG_Y1 = list(list("" for _ in range(3)) for j in range(1)) + # JIANCHA_NEIRONG_Y1[0][0] = f"叶片1:{Y1}检查内容" + # print(f"Y1标题内容:{JIANCHA_NEIRONG_Y1}") + # JIANCHA_NEIRONG_Y2_DIR = os.path.join(muban_dir,"check3.docx") + # JIANCHA_NEIRONG_Y2 = list(list("" for _ in range(3)) for j in range(1)) + # JIANCHA_NEIRONG_Y2[0][0] = f"叶片2:{Y2}检查内容" + # print(f"Y2标题内容:{JIANCHA_NEIRONG_Y2}") + # JIANCHA_NEIRONG_Y3_DIR = os.path.join(muban_dir,"check3.docx") + # JIANCHA_NEIRONG_Y3 = list(list("" for _ in range(3)) for j in range(1)) + # JIANCHA_NEIRONG_Y3[0][0] = f"叶片3:{Y3}检查内容" + # print(f"Y3标题内容:{JIANCHA_NEIRONG_Y3}") + # print(f"当前表格序号为 {total_table_num}") + # 
print(key_words) + # output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y1_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y1,True, 1) + # print(message) + # total_table_num += 1 + + # total_table_num = await process_images_table(Y1_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words) + + # output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y2_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y2,True, 1) + # print(message) + # total_table_num += 1 + + # total_table_num = await process_images_table(Y2_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words) + + # output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y3_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y3,True, 1) + # print(message) + # total_table_num += 1 + + # total_table_num = await process_images_table(Y3_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words) + # print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + # head_num += 1 + + # #缺陷详情 + # QUEXIAN_XIANGQING_DIR = os.path.join(muban_dir,"check_check.docx") + # QUEXIAN_XIANGQING_TITLE_DIR = os.path.join(muban_dir,"check_check_title.docx") + + # Y_tables = [Y1_quexian_dict,Y2_quexian_dict,Y3_quexian_dict] + # Y1_table_list = [] + # Y2_table_list = [] + # Y3_table_list = [] + # table_lists = [Y1_table_list, Y2_table_list, Y3_table_list] + + # for i, (table_list, Y_dict) in enumerate(zip(table_lists, Y_tables)): + # for image_name, image_path in Y_dict.items(): + # # 从图片名解析各个字段 + # parts = image_name.split('_') + # if len(parts) >= 8: # 确保有7个部分 + # defect_type = parts[1] + # defect_location = parts[2] + # defect_size = parts[3] + # visibility = parts[4] + # urgency = parts[5] + # severity = parts[6] + # repair_suggestion = parts[7] + + # print(f"获取第{i+1}个叶片的缺陷图: {image_path}") + + # table_list.append({ + # "QueXianLeiXing": defect_type, + # "QueXianWeiZhi": defect_location, + # "QueXianChiCun": 
defect_size, + # "WeiZongDengJi": severity, + # "Tupian_Dir": image_path, + # "visibility": visibility, + # "urgency": urgency, + # "repair_suggestion": repair_suggestion.split('.')[0], # 新增维修建议字段 + # }) + # else: + # table_list.append({ + # "QueXianLeiXing": "图片命名有误", + # "QueXianWeiZhi": "图片命名有误", + # "QueXianChiCun": "图片命名有误", + # "WeiZongDengJi": "图片命名有误", + # "Tupian_Dir": image_path, + # "visibility": "图片命名有误", + # "urgency": "图片命名有误", + # "repair_suggestion": "图片命名有误", # 新增维修建议字段 + # }) + + + # Y1_TABLES, Y1_TABLES_PICTURES = fill_tables(table_lists[0],4,5,len(table_lists[0]),Y1) + # Y2_TABLES, Y2_TABLES_PICTURES = fill_tables(table_lists[1],4,5,len(table_lists[1]),Y2) + # Y3_TABLES, Y3_TABLES_PICTURES = fill_tables(table_lists[2],4,5,len(table_lists[2]),Y3) + # print(add_documents(output_dir, QUEXIAN_XIANGQING_TITLE_DIR)) + # print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + # head_num += 1 + # table_num = 0 + # Xu_Hao = 0 + # total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y1_TABLES,QUEXIAN_XIANGQING_DIR,Y1_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao) + # total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y2_TABLES,QUEXIAN_XIANGQING_DIR,Y2_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao) + # total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y3_TABLES,QUEXIAN_XIANGQING_DIR,Y3_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao) + + + #总结 + ZONG_JIE_DIR = os.path.join(muban_dir,"result.docx") + ZONG_JIE_BEFORE = "result" + ZONG_JIE = baogao_info['baogao_zongjie'] + print(add_documents(output_dir, ZONG_JIE_DIR)) + print(await search_and_replace(output_dir, ZONG_JIE_BEFORE, ZONG_JIE)) + print(await search_and_replace(output_dir, 'company_yi', Yi_company)) + print(await search_and_replace(output_dir, 'baogao_date', baogao_date.split(' ')[0])) + print(await 
search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num])) + + + + + +def main(): + json_data1 = { + "turbine_id" : "183463dbf40d9278549a76b82b175dd9", + } + + json_data2 = { + 'shengcheng_dir': r"D:\work\Report_Generate_Server\output", + 'muban_dir': r"D:\work\Report_Generate_Server\muban", + } + asyncio.run(generate_report(json_data1,json_data2)) + print('文档生成完毕') +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/core/__pycache__/styles.cpython-313.pyc b/core/__pycache__/styles.cpython-313.pyc new file mode 100644 index 0000000..cc6140e Binary files /dev/null and b/core/__pycache__/styles.cpython-313.pyc differ diff --git a/core/__pycache__/tables.cpython-313.pyc b/core/__pycache__/tables.cpython-313.pyc new file mode 100644 index 0000000..db9d17f Binary files /dev/null and b/core/__pycache__/tables.cpython-313.pyc differ diff --git a/core/styles.py b/core/styles.py new file mode 100644 index 0000000..e1812fa --- /dev/null +++ b/core/styles.py @@ -0,0 +1,138 @@ +""" +Style-related functions for Word Document Server. +""" +from docx.shared import Pt +from docx.enum.style import WD_STYLE_TYPE + + +def ensure_heading_style(doc): + """ + Ensure Heading styles exist in the document. 
+ + Args: + doc: Document object + """ + for i in range(1, 10): # Create Heading 1 through Heading 9 + style_name = f'Heading {i}' + try: + # Try to access the style to see if it exists + style = doc.styles[style_name] + except KeyError: + # Create the style if it doesn't exist + try: + from docx.oxml.ns import qn + style = doc.styles.add_style(style_name, WD_STYLE_TYPE.PARAGRAPH) + style.font.name = '宋体(中文正文)' + style.font.size = Pt(22) # 根据需要设置字体大小 + style._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体(中文正文)') + if i == 1: + style.font.size = Pt(16) + style.font.bold = True + elif i == 2: + style.font.size = Pt(14) + style.font.bold = True + else: + style.font.size = Pt(12) + style.font.bold = True + except Exception: + # If style creation fails, we'll just use default formatting + pass + + +def ensure_table_style(doc): + """ + Ensure Table Grid style exists in the document. + + Args: + doc: Document object + """ + try: + # Try to access the style to see if it exists + style = doc.styles['Table Grid'] + except KeyError: + # If style doesn't exist, we'll handle it at usage time + pass + + +def create_style(doc, style_name, style_type, base_style=None, font_properties=None, paragraph_properties=None): + """ + Create a new style in the document. 
+ + Args: + doc: Document object + style_name: Name for the new style + style_type: Type of style (WD_STYLE_TYPE) + base_style: Optional base style to inherit from + font_properties: Dictionary of font properties (bold, italic, size, name, color) + paragraph_properties: Dictionary of paragraph properties (alignment, spacing) + + Returns: + The created style + """ + from docx.shared import Pt + + try: + # Check if style already exists + style = doc.styles.get_by_id(style_name, WD_STYLE_TYPE.PARAGRAPH) + return style + except: + # Create new style + new_style = doc.styles.add_style(style_name, style_type) + + # Set base style if specified + if base_style: + new_style.base_style = doc.styles[base_style] + + # Set font properties + if font_properties: + font = new_style.font + if 'bold' in font_properties: + font.bold = font_properties['bold'] + if 'italic' in font_properties: + font.italic = font_properties['italic'] + if 'size' in font_properties: + font.size = Pt(font_properties['size']) + if 'name' in font_properties: + font.name = font_properties['name'] + if 'color' in font_properties: + from docx.shared import RGBColor + + # Define common RGB colors + color_map = { + 'red': RGBColor(255, 0, 0), + 'blue': RGBColor(0, 0, 255), + 'green': RGBColor(0, 128, 0), + 'yellow': RGBColor(255, 255, 0), + 'black': RGBColor(0, 0, 0), + 'gray': RGBColor(128, 128, 128), + 'white': RGBColor(255, 255, 255), + 'purple': RGBColor(128, 0, 128), + 'orange': RGBColor(255, 165, 0) + } + + color_value = font_properties['color'] + try: + # Handle string color names + if isinstance(color_value, str) and color_value.lower() in color_map: + font.color.rgb = color_map[color_value.lower()] + # Handle RGBColor objects + elif hasattr(color_value, 'rgb'): + font.color.rgb = color_value + # Try to parse as RGB string + elif isinstance(color_value, str): + font.color.rgb = RGBColor.from_string(color_value) + # Use directly if it's already an RGB value + else: + font.color.rgb = color_value + 
except Exception as e: + # Fallback to black if all else fails + font.color.rgb = RGBColor(0, 0, 0) + + # Set paragraph properties + if paragraph_properties: + if 'alignment' in paragraph_properties: + new_style.paragraph_format.alignment = paragraph_properties['alignment'] + if 'spacing' in paragraph_properties: + new_style.paragraph_format.line_spacing = paragraph_properties['spacing'] + + return new_style diff --git a/core/tables.py b/core/tables.py new file mode 100644 index 0000000..abeb0b8 --- /dev/null +++ b/core/tables.py @@ -0,0 +1,283 @@ +""" +Table-related operations for Word Document Server. +""" +from docx.oxml.shared import OxmlElement, qn +from docx.oxml.ns import nsdecls +from docx.oxml import parse_xml + + +def set_cell_border(cell, **kwargs): + """ + Set cell border properties. + + Args: + cell: The cell to modify + **kwargs: Border properties (top, bottom, left, right, val, color) + """ + tc = cell._tc + tcPr = tc.get_or_add_tcPr() + + # Create border elements + for key, value in kwargs.items(): + if key in ['top', 'left', 'bottom', 'right']: + tag = 'w:{}'.format(key) + + element = OxmlElement(tag) + element.set(qn('w:val'), kwargs.get('val', 'single')) + element.set(qn('w:sz'), kwargs.get('sz', '4')) + element.set(qn('w:space'), kwargs.get('space', '0')) + element.set(qn('w:color'), kwargs.get('color', 'auto')) + + tcBorders = tcPr.first_child_found_in("w:tcBorders") + if tcBorders is None: + tcBorders = OxmlElement('w:tcBorders') + tcPr.append(tcBorders) + + tcBorders.append(element) + + +def apply_table_style(table, has_header_row=False, border_style=None, shading=None): + """ + Apply formatting to a table. 
+ + Args: + table: The table to format + has_header_row: If True, formats the first row as a header + border_style: Style for borders ('none', 'single', 'double', 'thick') + shading: 2D list of cell background colors (by row and column) + + Returns: + True if successful, False otherwise + """ + try: + # Format header row if requested + if has_header_row and table.rows: + header_row = table.rows[0] + for cell in header_row.cells: + for paragraph in cell.paragraphs: + if paragraph.runs: + for run in paragraph.runs: + run.bold = True + + # Apply border style if specified + if border_style: + val_map = { + 'none': 'nil', + 'single': 'single', + 'double': 'double', + 'thick': 'thick' + } + val = val_map.get(border_style.lower(), 'single') + + # Apply to all cells + for row in table.rows: + for cell in row.cells: + set_cell_border( + cell, + top=True, + bottom=True, + left=True, + right=True, + val=val, + color="000000" + ) + + # Apply cell shading if specified + if shading: + for i, row_colors in enumerate(shading): + if i >= len(table.rows): + break + for j, color in enumerate(row_colors): + if j >= len(table.rows[i].cells): + break + try: + # Apply shading to cell + cell = table.rows[i].cells[j] + shading_elm = parse_xml(f'') + cell._tc.get_or_add_tcPr().append(shading_elm) + except: + # Skip if color format is invalid + pass + + return True + except Exception: + return False + + + +def copy_table(source_table, target_doc, ifadjustheight=True, height = 1): + """ + Copy a table from one document to another. 
+ + Args: + source_table: The table to copy + target_doc: The document to copy the table to + + Returns: + The new table in the target document + """ + # Create a new table with the same dimensions + new_table = target_doc.add_table(rows=len(source_table.rows), cols=len(source_table.columns)) + + # Try to apply the same style + try: + if source_table.style: + new_table.style = 'Table Grid' + except: + # Fall back to default grid style + try: + new_table.style = 'Table Grid' + except: + pass + from docx.enum.table import WD_TABLE_ALIGNMENT + from docx.enum.table import WD_ALIGN_VERTICAL + from docx.shared import Pt, Inches, Cm, RGBColor + # Copy cell contents + for i, row in enumerate(source_table.rows): + for j, cell in enumerate(row.cells): + for paragraph in cell.paragraphs: + average_char_width_in_points = 6 + if paragraph.text: + new_table.cell(i,j).text = paragraph.text + new_table.cell(i,j).paragraphs[0].runs[0].font.name = "Times New Roman" #设置英文字体 + new_table.cell(i,j).paragraphs[0].runs[0].font.size = Pt(10.5) # 字体大小 + new_table.cell(i,j).paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') #设置中文字体 + new_table.cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER + new_table.cell(i,j).vertical_alignment = WD_ALIGN_VERTICAL.CENTER + """ + 待添加:如何让表格自适应大小(autofit目前不知为何没有作用) + """ + if ifadjustheight: + new_table.rows[i].height = Cm(height) + try: + new_table = merge_tables(new_table) + except Exception as e: + print(f"合并表格失败:{e}") + from docx.shared import Inches + + return new_table + + +from collections import deque + +def merge_tables(table): + """BFS遍历,将相邻且相同单元格合并 + + Args: + table: 表格,docx库的Table类型 + Returns: + 合并后的表格 + """ + if not table or len(table.rows) == 0: + return table + + rows = len(table.rows) + cols = len(table.columns) + + # 创建访问标记矩阵 + visited = [[False for _ in range(cols)] for _ in range(rows)] + + # 定义四个方向的移动:上、右、下、左 + directions = [(0, 0), (0, 1), (1, 0), (0, 0)] + + for i in range(rows): + for j 
in range(cols): + if not visited[i][j]: + current_cell = table.cell(i, j) + current_text = current_cell.text.strip() + + if not current_text: # 跳过空单元格 + visited[i][j] = True + continue + + # BFS队列 + queue = deque() + queue.append((i, j)) + visited[i][j] = True + + # 记录需要合并的单元格 + cells_to_merge = [] + + while queue: + x, y = queue.popleft() + cells_to_merge.append((x, y)) + + for dx, dy in directions: + nx, ny = x + dx, y + dy + + # 检查边界和访问状态 + if 0 <= nx < rows and 0 <= ny < cols and not visited[nx][ny]: + neighbor_cell = table.cell(nx, ny) + neighbor_text = neighbor_cell.text.strip() + + if neighbor_text == current_text: + visited[nx][ny] = True + queue.append((nx, ny)) + + # 如果有需要合并的单元格 + if len(cells_to_merge) > 1: + # 按行和列排序,确保左上角是第一个单元格 + cells_to_merge.sort() + min_row, min_col = cells_to_merge[0] + max_row, max_col = cells_to_merge[-1] + + # 清空所有待合并单元格(包括换行符) + for x, y in cells_to_merge[1:]: + cell = table.cell(x, y) + # 删除所有段落(彻底清空) + for paragraph in list(cell.paragraphs): + p = paragraph._element + p.getparent().remove(p) + # 可选:添加一个空段落防止格式问题 + cell.add_paragraph() + + # 执行合并 + if max_row > min_row or max_col > min_col: + table.cell(min_row, min_col).merge(table.cell(max_row, max_col)) + + return table + +def fill_tables(Y_table_list, row, col, Y_Table_num, Y): + """根据前端返回json块填写表格list,并实时跟进已填写表格数量 + 目前只支持固定的缺陷图的填写 + + Args: + Y_table_list (list): 前端返回的json块 + row (int): 表格行数 + col (int): 表格列数 + Y_Table_num: json块中有几个表格 + Xu_Hao: 是第几个json块 + Y: 其他参数 + + Return: + Y1_TABLES: 三维,表和对应元素 + table_index_to_images: 字典,表索引到图片路径列表的映射 + Xu_Hao:到达第几个表了 + """ + table_index_to_images = {} + Y_TABLES = [[["" for _ in range(row)] for _ in range(col)] for _ in range(Y_Table_num)] + + # 处理前端返回数据 + for l, table_dict in enumerate(Y_table_list): + if table_dict: + Y_TABLES[l][1][0] = Y + Y_TABLES[l][1][1] = table_dict["QueXianLeiXing"] + Y_TABLES[l][1][2] = table_dict["QueXianWeiZhi"] + Y_TABLES[l][1][3] = table_dict["QueXianChiCun"] + Y_TABLES[l][3][0] = 
table_dict["WeiZongDengJi"] + Y_TABLES[l][3][1] = table_dict["visibility"] + Y_TABLES[l][3][2] = table_dict["urgency"] + Y_TABLES[l][3][3] = table_dict["repair_suggestion"] + + # 获取图片路径 + image_path = table_dict['Tupian_Dir'] + if image_path: + # 确保路径是字符串形式 + if isinstance(image_path, list): + table_index_to_images[l] = image_path.copy() + else: + table_index_to_images[l] = [str(image_path)] + + return Y_TABLES, table_index_to_images + + diff --git a/muban/check2.docx b/muban/check2.docx new file mode 100644 index 0000000..64bc2f7 Binary files /dev/null and b/muban/check2.docx differ diff --git a/muban/check3.docx b/muban/check3.docx new file mode 100644 index 0000000..3a93523 Binary files /dev/null and b/muban/check3.docx differ diff --git a/muban/check_check.docx b/muban/check_check.docx new file mode 100644 index 0000000..e79852d Binary files /dev/null and b/muban/check_check.docx differ diff --git a/muban/check_check_title.docx b/muban/check_check_title.docx new file mode 100644 index 0000000..382514d Binary files /dev/null and b/muban/check_check_title.docx differ diff --git a/muban/check_content.docx b/muban/check_content.docx new file mode 100644 index 0000000..86b5452 Binary files /dev/null and b/muban/check_content.docx differ diff --git a/muban/checkinfo.docx b/muban/checkinfo.docx new file mode 100644 index 0000000..6c60b8f Binary files /dev/null and b/muban/checkinfo.docx differ diff --git a/muban/checkmethod.docx b/muban/checkmethod.docx new file mode 100644 index 0000000..b35dccc Binary files /dev/null and b/muban/checkmethod.docx differ diff --git a/muban/chengguo_sub.docx b/muban/chengguo_sub.docx new file mode 100644 index 0000000..a36e983 Binary files /dev/null and b/muban/chengguo_sub.docx differ diff --git a/muban/fengmian.jpg b/muban/fengmian.jpg new file mode 100644 index 0000000..0df9fd9 Binary files /dev/null and b/muban/fengmian.jpg differ diff --git a/muban/fengmian1.docx b/muban/fengmian1.docx new file mode 100644 index 0000000..fc0e701 
Binary files /dev/null and b/muban/fengmian1.docx differ diff --git a/muban/fengmian2.docx b/muban/fengmian2.docx new file mode 100644 index 0000000..c12fbf8 Binary files /dev/null and b/muban/fengmian2.docx differ diff --git a/muban/result.docx b/muban/result.docx new file mode 100644 index 0000000..76d7748 Binary files /dev/null and b/muban/result.docx differ diff --git a/muban/total_check.docx b/muban/total_check.docx new file mode 100644 index 0000000..ce78dbc Binary files /dev/null and b/muban/total_check.docx differ diff --git a/muban/xiangmugaikuo.docx b/muban/xiangmugaikuo.docx new file mode 100644 index 0000000..ee39cf2 Binary files /dev/null and b/muban/xiangmugaikuo.docx differ diff --git a/muban/检查方案内容.docx b/muban/检查方案内容.docx new file mode 100644 index 0000000..5a2d8d5 Binary files /dev/null and b/muban/检查方案内容.docx differ diff --git a/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版.docx b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版.docx new file mode 100644 index 0000000..44548a6 Binary files /dev/null and b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版.docx differ diff --git a/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版1.docx b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版1.docx new file mode 100644 index 0000000..44548a6 Binary files /dev/null and b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版1.docx differ diff --git a/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版2.docx b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版2.docx new file mode 100644 index 0000000..44548a6 Binary files /dev/null and b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版2.docx differ diff --git a/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2025年07月03日版1.docx b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2025年07月03日版1.docx new file mode 100644 index 0000000..44548a6 Binary files /dev/null and b/output/~$能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2025年07月03日版1.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版.docx 
b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版.docx new file mode 100644 index 0000000..ac77005 Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版1.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版1.docx new file mode 100644 index 0000000..4edbf8d Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版1.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版2.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版2.docx new file mode 100644 index 0000000..57de554 Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版2.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版3.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版3.docx new file mode 100644 index 0000000..6752839 Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版3.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版4.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版4.docx new file mode 100644 index 0000000..94f6d54 Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版4.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版5.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版5.docx new file mode 100644 index 0000000..961ad89 Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版5.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版6.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版6.docx new file mode 100644 index 0000000..f97fc83 Binary files /dev/null and b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2021年12月10日版6.docx differ diff --git a/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2025年07月03日版.docx b/output/三峡能源阿城万兴风电场项目风力发电机组叶片检查报告H3-08#2025年07月03日版.docx new file mode 100644 index 0000000..09a9062 Binary files /dev/null and 
import json

import requests

from tools.API import *

# Header set sent with every request, kept exactly as the original code sent it.
# NOTE(review): "content_type" is not the standard "Content-Type" header name;
# confirm whether the backend relies on it before renaming.
_HEADERS = {
    "content_type" : "application/x-www-form-urlencoded"
}

# Seconds to wait for the backend before giving up, so that a stalled server
# cannot block report generation indefinitely.
_REQUEST_TIMEOUT = 10


def _fetch(url, ok_label, fail_label, extract, params=None):
    """GET *url*, decode the JSON body and return ``extract(body)``.

    Args:
        url: Fully built endpoint URL.
        ok_label: Text printed before the decoded body on success.
        fail_label: Text printed before the status code or exception on failure.
        extract: Callable that picks the useful part out of the decoded body.
        params: Optional query parameters forwarded to ``requests.get``.

    Returns:
        The extracted payload, or ``None`` on a non-200 status or on any
        exception (network error, timeout, invalid JSON, missing key).
    """
    try:
        response = requests.get(
            url, headers=_HEADERS, params=params, timeout=_REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            print(f"{fail_label},状态码:{response.status_code}")
            return None
        data = json.loads(response.text)
        print(f"{ok_label}:{data}")
        return extract(data)
    except Exception as e:
        # Best effort by design: callers treat None as "no data available".
        print(f"{fail_label},异常:{e}")
        return None


def get_project_info(projectId):
    """Return the ``data`` object of the project detail endpoint, or ``None``."""
    return _fetch(
        DTURL + GETPROJECTINFO.format(projectId=projectId),
        "获取到项目的数据",
        f"获取项目{projectId}数据失败",
        lambda body: body["data"],
    )


def get_jizu_info(turbineId):
    """Return the ``data`` object of the turbine detail endpoint, or ``None``."""
    return _fetch(
        DTURL + GETJIZUINFO.format(turbineId=turbineId),
        "获取到机组的数据",
        f"获取项目{turbineId}数据失败",
        lambda body: body["data"],
    )


def get_jizu_shigong_info(turbineId):
    """Return the first row of the construction list endpoint, or ``None``."""
    # NOTE(review): requests flattens a nested dict value by iterating its keys,
    # so this is sent as "?rows=turbineId" and the id itself never reaches the
    # server. Kept as-is because the expected query format of the backend is
    # not visible here; confirm and fix against the API contract.
    params = {
        "rows" : {
            "turbineId" : turbineId
        }
    }
    return _fetch(
        DTURL + GETSHIGONGINFO,
        "获取到机组施工的数据",
        f"获取项目{turbineId}施工数据失败",
        lambda body: body["rows"][0],
        params=params,
    )


def get_weather(weatherid):
    """Return the ``data`` object of the weather-type endpoint, or ``None``."""
    return _fetch(
        DTURL + GETWEATHERINFO.format(weatherCode=weatherid),
        "获取到天气数据",
        f"获取天气{weatherid}数据失败",
        lambda body: body["data"],
    )
def _write_split_cell(cell, text, width):
    """Write *text* into *cell* and apply the report's standard cell formatting."""
    from docx.enum.table import WD_ALIGN_VERTICAL
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.oxml.shared import qn
    from docx.shared import Pt

    cell.text = text
    paragraph = cell.paragraphs[0]
    if paragraph.runs:
        run = paragraph.runs[0]
        run.font.name = "Times New Roman"
        run.font.size = Pt(10.5)
        # East-Asian glyphs use a separate font slot that font.name does not set.
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
    paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
    cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
    # A source cell without an explicit width reports None, which cannot be assigned.
    if width is not None:
        cell.width = width


def split_table_by_row_content(
    doc_path: str,
    output_path: str,
    table_num: int = 0
) -> str:
    """Split every data row of one table into several rows, one per text line.

    The number of parts is the number of newline-separated lines in the cell at
    row 1, column 0. Each data cell is split on newlines; a cell with fewer
    lines than parts repeats its last line. The header row (row 0) is copied
    unchanged. The rebuilt table takes the place of the original table.

    Args:
        doc_path: Path of the input Word document.
        output_path: Path the resulting document is saved to.
        table_num: Index of the table to process (0-based; negative indices
            count from the end).

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    try:
        from docx import Document

        doc = Document(doc_path)
        tables = doc.tables

        # Reject indices outside the valid range in either direction.
        if not -len(tables) <= table_num < len(tables):
            return f"文档中不存在第{table_num+1}个表格"

        table = tables[table_num]
        row_count = len(table.rows)
        col_count = len(table.columns)

        if row_count < 2:
            doc.save(output_path)
            return "表格行数少于2行,无法按照要求分行"

        split_count = max(1, len(table.cell(1, 0).text.split('\n')))

        print(f'原表格行数:{row_count},第二行第一列内容行数:{split_count},需要分割为:{split_count}部分')

        # One header row plus `split_count` rows per original data row.
        new_table = doc.add_table(rows=1 + (row_count-1)*split_count, cols=col_count)
        new_table.style = table.style
        new_table.autofit = True

        # Header row is copied as-is.
        for col_idx in range(col_count):
            source = table.cell(0, col_idx)
            _write_split_cell(new_table.cell(0, col_idx), source.text, source.width)

        # Data rows: each original row expands into `split_count` new rows.
        for orig_row_idx in range(1, row_count):
            first_new_row = 1 + (orig_row_idx-1)*split_count
            for col_idx in range(col_count):
                source = table.cell(orig_row_idx, col_idx)
                cell_lines = source.text.split('\n')
                # Pad short cells by repeating their last line.
                # NOTE(review): lines beyond `split_count` are dropped, as before.
                cell_lines += [cell_lines[-1]] * (split_count - len(cell_lines))
                for part_idx in range(split_count):
                    _write_split_cell(
                        new_table.cell(first_new_row + part_idx, col_idx),
                        cell_lines[part_idx],
                        source.width,
                    )

        # doc.add_table() appended the new table at the end of the body; move it
        # in front of the original so the replacement keeps the original position.
        table._element.addprevious(new_table._element)
        table._element.getparent().remove(table._element)

        doc.save(output_path)
        return f"第{table_num+1}个表格已成功分行处理"

    except Exception as e:
        return f"处理表格时出错: {str(e)}"
async def add_paragraph(filename: str, text: str, style: Optional[str] = None) -> str:
    """Append one paragraph to an existing Word document.

    Args:
        filename: Path of the target document.
        text: Paragraph content.
        style: Optional paragraph style name. If the style is not defined in
            the document, the 'Normal' style is used and the returned message
            says so.

    Returns:
        A status message describing the outcome.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    # Refuse early when the file cannot be written.
    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first or creating a new document."

    try:
        document = Document(filename)
        new_paragraph = document.add_paragraph(text)
        outcome = f"Paragraph added to {filename}"

        if style:
            try:
                new_paragraph.style = style
            except KeyError:
                # Unknown style: fall back to 'Normal' and report it.
                new_paragraph.style = document.styles['Normal']
                outcome = f"Style '{style}' not found, paragraph added with default style to {filename}"

        document.save(filename)
        return outcome
    except Exception as exc:
        return f"Failed to add paragraph: {str(exc)}"
async def add_picture_to_table(target_doc: Document, target_filename: str, row: int, col: int, image_path: str,table_num: int = -1, width: Optional[float] = None) -> str:
    """Insert a picture into one cell of a table and save the document.

    Images larger than 9126 KB are first re-encoded into a "压缩图片" folder next
    to the source image. If inserting the image fails, it is converted to a
    temporary PNG and the insertion is retried once.

    Args:
        target_doc: Document object that holds the table.
        target_filename: Path the document is saved to.
        row: Row index of the target cell.
        col: Column index of the target cell.
        image_path: Path of the image file.
        table_num: Table index; defaults to -1, the last table.
        width: Picture width in inches; ``None`` keeps the native size.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    import tempfile

    from PIL import Image

    max_size_kb = 9126
    start_quality = 85
    min_quality = 25
    quality_step = 10

    if not os.path.exists(image_path):
        return f"Image file not found: {image_path}"

    # Check image file size and compress oversized images.
    try:
        image_size = os.path.getsize(image_path) / 1024  # Size in KB
        if image_size <= 0:
            return f"Image file appears to be empty: {image_path} (0 KB)"
        elif image_size > max_size_kb:
            output_dir = os.path.join(os.path.dirname(image_path), "压缩图片")
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, os.path.basename(image_path))

            # Lower the quality on every pass and stop at a floor. The original
            # loop re-saved at a constant quality, so it never ended when the
            # result stayed above the limit.
            quality = start_quality
            while True:
                print(f"压缩图片:{image_path} ({image_size:.2f} KB) -> {output_path} (9126 KB)")
                with Image.open(image_path) as img:
                    img.save(output_path, optimize=True, quality=quality)
                image_size = os.path.getsize(output_path) / 1024  # Size in KB
                if image_size <= max_size_kb or quality <= min_quality:
                    break
                quality -= quality_step

            # Best effort: use the compressed file even if it is still large.
            image_path = output_path
    except Exception as size_error:
        return f"Error checking image file: {str(size_error)}"

    try:
        from docx.enum.table import WD_ALIGN_VERTICAL
        from docx.enum.text import WD_ALIGN_PARAGRAPH

        table = target_doc.tables[table_num]
        cell = table.cell(row, col)
        # A single-character cell text is treated as a placeholder and cleared.
        if len(cell.text) == 1:
            cell.text = ""
        run = cell.paragraphs[-1].add_run()

        def insert(path):
            if width:
                run.add_picture(path, width=Inches(width))
            else:
                run.add_picture(path)

        try:
            insert(image_path)
        except Exception:
            # Retry with a PNG copy. The copy goes to a fresh temporary file so
            # that a source image that is already ".png" is never overwritten
            # or deleted.
            print(f"正常添加失败,尝试转换图片后添加:{image_path}")
            fd, temp_image_path = tempfile.mkstemp(suffix=".png")
            os.close(fd)
            try:
                with Image.open(image_path) as img:
                    img.save(temp_image_path, 'PNG')
                insert(temp_image_path)
            except Exception as e:
                return f"调用add_picture函数出现问题: {str(e)}"
            finally:
                os.remove(temp_image_path)

        cell.paragraphs[0].paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER

        target_doc.save(target_filename)
        return f"Picture {image_path} added to table {table_num} cell ({row},{col})"
    except Exception as e:
        return f"Failed to add picture to table: {str(e)}"
async def add_page_break(filename: str) -> str:
    """Append a page break to an existing Word document.

    Args:
        filename: Path of the target document.

    Returns:
        A status message describing the outcome.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    # Refuse early when the file cannot be written.
    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        document.add_page_break()
        document.save(filename)
    except Exception as exc:
        return f"Failed to add page break: {str(exc)}"
    else:
        return f"Page break added to {filename}."
async def delete_paragraph(filename: str, paragraph_index: int) -> str:
    """Remove one paragraph from a Word document by its index.

    Args:
        filename: Path to the Word document.
        paragraph_index: Zero-based position of the paragraph to remove.

    Returns:
        A status message describing the outcome.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    # Refuse early when the file cannot be written.
    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        paragraphs = document.paragraphs
        total = len(paragraphs)

        # Validate paragraph index
        if paragraph_index < 0 or paragraph_index >= total:
            return f"Invalid paragraph index. Document has {total} paragraphs (0-{total-1})."

        # Detach the paragraph's XML element from its parent node.
        element = paragraphs[paragraph_index]._p
        element.getparent().remove(element)

        document.save(filename)
        return f"Paragraph at index {paragraph_index} deleted successfully."
    except Exception as exc:
        return f"Failed to delete paragraph: {str(exc)}"
+ except Exception as e: + return f"Failed to search and replace: {str(e)}" + diff --git a/tools/defines.py b/tools/defines.py new file mode 100644 index 0000000..9d1d3b0 --- /dev/null +++ b/tools/defines.py @@ -0,0 +1,176 @@ +""" +缺陷图目录格式: + +缺陷图期望格式 _隔开 + +外部内部命名格式都如下: +图片名:xuhao_缺陷类型_缺陷位置_缺陷尺寸_可见程度_紧急程度_危重等级_维修建议 +例:涂层损伤_叶片ps面距叶根3m处_缺陷尺寸弦向100mm,轴向800mm_轻微_紧急_重要_建议打磨维修 +每个的选项:见我发的图 + +防雷: +例:轮毂至塔基导通阻值_169mΩ +缺陷例:轮毂至塔基未导通 #即标明未导通即可 +""" + +DEFAULT_BAOGAO_INFO = { + #目录 + 'picture_dir': "", #图片存放地址 为了报告美观,希望总的汇总图片数量为3的倍数 + 'shengcheng_dir': "", #报告生成的路径 + 'muban_dir': "", #文档模板存放路径 + 'total_picture_dir': "", #用于生成略缩图的路径 + + 'key_words': '缺,损,裂,脱,污', #关键字,用于汇总图的名字包含缺陷时标红,匹配逻辑为正则匹配单个字则为红 后续可优化 + #当前检查报告基本内容 + 'jizu_type': 'H3-08#', #检查的机组编号 + 'jiancha_date': '2021年12月10日 9.00', #检查叶片日期 注意空格分开 小时分钟间隔为. + 'baogao_date': '2021年12月10日 9.00', #生成报告时间 注意空格分开 + 'baogao_type': "风力发电机组叶片检查报告", #报告类型 + #检查方案 + 'beizhu': '无', + 'renyuan_peizhi': '''2人;主检飞手1人,副检抄表1人 +3人,轮毂机舱作业1人,揽风绳作业1人,无人设备操作员及抄表1人 +1人;抄表人员1人,检测人员1人,监护1人。 +1人;主检飞手1人 +2人;轮毂作业检查2人''', + 'gongzuo_neirong': '''无人机叶片防雷导通测 +无人吊篮叶片导通测试(含机舱设备、) +风机基础、办公楼、变电站防雷接地检测及浪涌保护器测试 +无人机叶片外观巡检 +叶片内部检查''', + 'shigong_fangan': '无', + 'shebei_peizhi': '''1四轴电阻无人机1套,电子微欧计1台,视频记录手机1台 +无人吊篮系统1套(爬绳器+接触平台)、电子微欧计1套,视频记录手机1台,对讲机2台 +接地电阻测试仪1套、SPD测试仪1套、对讲机2个、 +1、大疆无人机1台(M350rtk,M300rtk,M30T,M30,精灵4PRO)2、大疆精灵4PRO+索尼A7R2机身+索尼200-600mm镜头/适马150-600mm镜头 +1、人工检查:照明设备2套,视频记录手机2台,含氧量监测仪1台,电动扳手2套,卷尺1个。2、爬壁机器人检查:无人作业校车+视频图传1套,照明设备2套,含氧量监测仪1台,电动扳手2套,卷尺1个。''', + 'jiancha_renyuan': '张三', + #检查信息 + 'waibu_jiancha': 'True', #是否包含外部检查 + 'neibu_jiancha': 'True', #是否包含内部检查 + 'fanglei_jiancha': 'True', #是否包含防雷检查 #注:防雷检测占不存放缺陷图 + 'jiancha_location': '叶片外部外观', #检查内容文字 + 'jiancha_fangshi': '作业无人机近距离外观检查', #检查方式文字 + 'yepian_changjia': '株洲时代新材料科技股份有限公司', #叶片厂家信息 + #报告处理信息 + 'yezhu_renyuan': '李四', #业主(人员) + 'changjia_renyuan': '王五', #厂家(人员) + 'date_process': '生成报告人员', #数据处理人员 吴总希望获取前端用户执行生成人员为这个人 + 'baogao_bianzhi': '生成报告人员', #报告编制人员 吴总希望获取前端用户执行生成人员为这个人 + 
'baogao_shenghe': '待填写(人员)', #报告审核人员 + 'shenghe_date': '待填写(日期)', #报告审核日期 + #检查情况汇总表(文字信息) 前端根据是否包含对应部分检查自行确定检查内容,这里默认全部包含 + 'Y1_jiancha_neirong': '''1.叶片前缘、后缘、PS面、SS面 +2.叶片内部导雷卡、腹板、透光、人孔盖版、叶根盖板... +3.轮毂至塔基导通、内部导线线阻、外部导线线阻...''', + 'Y2_jiancha_neirong': '''1.叶片前缘、后缘、PS面、SS面 +2.叶片内部导雷卡、腹板、透光、人孔盖版、叶根盖板... +3.轮毂至塔基导通、内部导线线阻、外部导线线阻...''', + 'Y3_jiancha_neirong': '''1.叶片前缘、后缘、PS面、SS面 +2.叶片内部导雷卡、腹板、透光、人孔盖版、叶根盖板... +3.轮毂至塔基导通、内部导线线阻、外部导线线阻...''', + #报告总结 + 'baogao_zongjie': '''1、因海上风电叶片运行环境恶劣、空气盐碱度高,叶片前缘合模缝区域及PS面(迎风面)涂层易受腐蚀,建议定期观察维护。 +2、经无人机近距离外观检查发现H3-08#机位Y200220AF叶片PS面距叶根20m处发现一处裂纹,损伤长度轴向3m,该缺陷经我方判定为严重缺陷,建议尽快结安排对该机组停机并结合其他检查手段(如人工打磨)进一步勘查并决定维修处置方案,防止风险进一步升级。 +3、经无人机近距离外观检查未发现H3-08#机位Y200249AF、Y200250AF叶片有明显影响机组正常运行的外部缺陷。 +''', #报告总结文字 +} + +DEFAULT_BASE_INFO = { #项目基本信息 + #项目概况 + 'jituan_jianxie': '甲方集团', + 'jia_Company': '甲方公司名', + 'jizu_num': '项目规格(台)', + 'fengchang_name': '风场名称', + 'fengchang_location': '风场位置', + 'jizu_xinghao': '机组型号', #机组的型号 + #乙方信息 + 'yi_Company': '乙方公司名', + 'fuzeren': '甲方负责人(吴明洲)', + 'phone_fuzeren': '联系电话:18807109269 ', +} +oneproject = { + "status": 200, + "data": { + "projectId": "96e0debf78187300f144d7f3450a2477", + "projectName": "三峡能源阿城万兴风电场防雷通道检测项目", + "coverUrl": "", + "farmName": "三峡能源阿城万兴风电场", + "farmAddress": "哈尔滨市阿城区", + "client": "辽宁信达检测有限公司", + "clientContact": "李经理", + "clientPhone": "13504783720", + "inspectionUnit": "武汉市迪特影像科技有限公司", + "inspectionContact": "吴名州", + "inspectionPhone": "18807109269", + "scale": "", + "turbineModel": "", + "constructorIds": "5709ccfece2685090ff700a3469f2539,a76d78f1325deda1790a12bdad4aad4e", + "auditorId": "ca37c4337df8673a5c045b6c25acf74a", + "qualityOfficerId": "862e027910c2562d2b67d88ec33d77ba", + "projectManagerId": "fbaa9e0aecf2ce287138c38a4b654085", + "constructionTeamLeaderId": None, + "status": 0, + "startDate": None, + "endDate": None, + "constructorName": None, + "auditorName": "李四", + "qualityOfficerName": "辛奇", + "projectManagerName": "张三", + "constructionTeamLeaderName": None, + 
"statusLabel": "待施工" + }, + "msg": "", + "code": 200, + "success": True + } +onejizu = { + "status": 200, + "data": [ + { + "turbineId": "183463dbf40d9278549a76b82b175dd9", + "projectId": "96e0debf78187300f144d7f3450a2477", + "projectName": "三峡能源阿城万兴风电场防雷通道检测项目", + "turbineName": "一期012号", + "turbineCode": "00000", + "turbineDesc": "一期012号,全新设备", + "turbineManufacturer": "", + "turbineModel": "", + "turbineCoverUrl": "" + } + ], + "msg": "", + "code": 200, + "success": True +} +yepian = { + "status": 200, + "data": [ + { + "partId": "12bc30fb209f3af3bf530541c5b062bc", + "projectId": "96e0debf78187300f144d7f3450a2477", + "projectName": "三峡能源阿城万兴风电场防雷通道检测项目", + "turbineId": "183463dbf40d9278549a76b82b175dd9", + "turbineName": "一期012号", + "partName": "叶片2", + "partCode": "0001", + "partType": "VANE-2", + "partTypeLabel": "叶片2" + }, + { + "partId": "12bc30fb209f3af3bf530541c5b062bd", + "projectId": "96e0debf78187300f144d7f3450a2477", + "projectName": "三峡能源阿城万兴风电场防雷通道检测项目", + "turbineId": "183463dbf40d9278549a76b82b175dd9", + "turbineName": "一期012号", + "partName": "叶片1", + "partCode": "0000", + "partType": "VANE-1", + "partTypeLabel": "叶片1" + } + ], + "msg": "", + "code": 200, + "success": True +} + diff --git a/tools/document_tools.py b/tools/document_tools.py new file mode 100644 index 0000000..89b4b8f --- /dev/null +++ b/tools/document_tools.py @@ -0,0 +1,609 @@ +""" +Document creation and manipulation tools for Word Document Server. 
+""" +import os +import json, re +from typing import Dict, List, Optional, Any +from docx import Document + +from utils.file_utils import check_file_writeable, ensure_docx_extension, create_document_copy +from utils.document_utils import get_document_properties, extract_document_text, get_document_structure +from core.styles import ensure_heading_style, ensure_table_style +from docx.oxml.shared import qn +from docx.oxml import OxmlElement +from tools.content_tools import search_and_replace,add_picture_to_table + +async def create_document(filename: str, title: Optional[str] = None, author: Optional[str] = None) -> str: + """创建一个包含可选元数据的新Word文档。 + + 参数: + filename: 要创建的文档名称(带或不带.docx扩展名) + title: 可选标题 + author: 可选作者 + """ + filename = ensure_docx_extension(filename) + + # Check if file is writeable + is_writeable, error_message = check_file_writeable(filename) + if not is_writeable: + return f"Cannot create document: {error_message}" + + try: + doc = Document() + + # Set properties if provided + if title: + doc.core_properties.title = title + if author: + doc.core_properties.author = author + + # Ensure necessary styles exist + ensure_heading_style(doc) + ensure_table_style(doc) + # 更改纸张大小为A4 + from docx.shared import Mm, Inches + sections = doc.sections + for section in sections: + section.page_height = Mm(297) + section.page_width = Mm(210) + section.left_margin = Inches(0.94) + section.right_margin = Inches(0.94) + # Save the document + doc.save(filename) + + return f"Document {filename} created successfully" + except Exception as e: + return f"Failed to create document: {str(e)}" + + +async def get_document_info(filename: str) -> str: + """获得文档信息 + + Args: + filename: 目标文档 + """ + filename = ensure_docx_extension(filename) + + if not os.path.exists(filename): + return f"Document {filename} does not exist" + + try: + properties = get_document_properties(filename) + return json.dumps(properties, indent=2) + except Exception as e: + return f"Failed to get document 
info: {str(e)}" + + +async def get_document_text(filename: str) -> str: + """获得文档的所有文本 + + Args: + filename: 目标文档 + """ + filename = ensure_docx_extension(filename) + + return extract_document_text(filename) + + +async def get_document_outline(filename: str) -> str: + """获得文档的所有结构信息 + + Args: + filename: 目标文档 + """ + filename = ensure_docx_extension(filename) + + structure = get_document_structure(filename) + return json.dumps(structure, indent=2) + + +async def list_available_documents(directory: str = ".") -> str: + """列出目录下所有Word文档 + + Args: + directory: 目录 + """ + try: + if not os.path.exists(directory): + return f"Directory {directory} does not exist" + + docx_files = [f for f in os.listdir(directory) if f.endswith('.docx')] + + if not docx_files: + return f"No Word documents found in {directory}" + + result = f"Found {len(docx_files)} Word documents in {directory}:\n" + for file in docx_files: + file_path = os.path.join(directory, file) + size = os.path.getsize(file_path) / 1024 # KB + result += f"- {file} ({size:.2f} KB)\n" + + return result + except Exception as e: + return f"Failed to list documents: {str(e)}" + + +async def copy_document(source_filename: str, destination_filename: Optional[str] = None) -> str: + """创建文档的副本 + + Args: + source_filename: 源文档路径 + destination_filename: 目标文档路径,为空则为当前目录 + """ + source_filename = ensure_docx_extension(source_filename) + + if destination_filename: + destination_filename = ensure_docx_extension(destination_filename) + + success, message, new_path = create_document_copy(source_filename, destination_filename) + if success: + return message + else: + return f"Failed to copy document: {message}" + +def add_documents(target_filename: str, source_filename: str) -> str: + """将源文档(文本)添加到目标文档尾部 + Args: + target_doc: 目标文档 + source_filename: 源文档路径 + """ + target_doc = Document(target_filename) + source_filename = ensure_docx_extension(source_filename) + source_doc = Document(source_filename) + for source_paragraph in 
source_doc.paragraphs: + new_paragraph = target_doc.add_paragraph(source_paragraph.text) + new_paragraph.style = target_doc.styles['Normal'] # Default style + + #获取合并等样式2025427 + new_paragraph.alignment = source_paragraph.alignment + print(f"Source paragraph alignment: {source_paragraph.alignment}") + + # Try to match the style if possible + try: + if source_paragraph.style and source_paragraph.style.name in target_doc.styles: + new_paragraph.style = target_doc.styles[source_paragraph.style.name] + except Exception as e: + print(f"Failed to apply style: {e}") + + # Copy run formatting + for i, run in enumerate(source_paragraph.runs): + if i < len(new_paragraph.runs): + new_run = new_paragraph.runs[i] + # Copy basic formatting + new_run.bold = run.bold + new_run.italic = run.italic + new_run.underline = run.underline + #添加同时合并字体2025427 + new_run.font.name = run.font.name + rPr = new_run.element.get_or_add_rPr() + rFonts = rPr.get_or_add_rFonts() + # 检查 run.font.name 是否为 None + if run.font.name is None: + # 设置默认的中文字体名称 + run.font.name = '宋体 (中文正文)' # 或者使用其他你喜欢的中文字体 + rFonts.set(qn('w:eastAsia'), run.font.name) + new_run.font.color.rgb = run.font.color.rgb + + # Font size if specified + if run.font.size: + new_run.font.size = run.font.size + target_doc.save(target_filename) + return f"{target_filename}添加{source_filename}成功" + + + +def write_table(target_filename: str, rows: int, cols: int, table_num: int, data: Optional[List[List[str]]] = None, ifadjustheight: Optional[bool] = True, height: Optional[float] = 1, key_words: re.Pattern[str] = None, ALIGMENT: Optional[str] = 'CENTER') -> Document: + """填写word文档里的表格,返回填写后的文档 + + Args: + target_filename: 目标文档路径 + rows: 表格行数 + cols: 表格列数 + table_num: 表格序号 + data: 表格数据,二维列表,每个单元格为字符串 + ifadjustheight: bool,为真则表格行高自动调整 + """ + target_filename = ensure_docx_extension(target_filename) + # Check if target file is writeable + is_writeable, error_message = check_file_writeable(target_filename) + if not is_writeable: + return 
f"Cannot create target document: {error_message}" + + try: + target_filename = ensure_docx_extension(target_filename) + target_doc = Document(target_filename) + except Exception as e: + print(f"获取{target_filename}失败:{str(e)}") + + # Try to set the table style + try: + target_doc.tables[table_num].style = 'Table Grid' + except KeyError as k: + pass + except Exception as e: + print(f"{target_doc}最后一个表格更改样式失败: {str(e)}") + + print("开始写入表格") + from docx.enum.table import WD_TABLE_ALIGNMENT + from docx.enum.table import WD_ALIGN_VERTICAL + from docx.shared import Pt, Inches, Cm, RGBColor + try: + if data: + for i, row_data in enumerate(data): + if i >= rows + 1: + break + for j, cell_text in enumerate(row_data): + if j >= cols + 1: + break + if str(cell_text) == "": continue + print(f"在[{i},{j}]处写入{str(cell_text)}") + target_doc.tables[table_num].cell(i,j).text = str(cell_text) + print(key_words, cell_text) + if key_words and key_words.search(str(cell_text)): + print(f'{cell_text}包含关键之,已置红') + target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0) + target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.name = "Times New Roman" #设置英文字体 + target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.size = Pt(10.5) # 字体大小 + target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') #设置中文字体 + if ALIGMENT == 'CENTER': + target_doc.tables[table_num].cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER + elif ALIGMENT == 'LEFT': + target_doc.tables[table_num].cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.LEFT + target_doc.tables[table_num].cell(i,j).vertical_alignment = WD_ALIGN_VERTICAL.CENTER + if ifadjustheight: + target_doc.tables[table_num].rows[i].height = Cm(height) + except Exception as e: + print(f"写入{target_filename}tables.cell({i},{j})失败:{str(e)}") + print("表格写入完成") + return target_doc + +def 
set_document_para(target_doc: Document) -> Document: + """设置文档的段落格式 + """ + paragraphs_to_remove = [] + for i, paragraph in enumerate(target_doc.paragraphs): + if i <= 11: + continue + if not paragraph.text.strip(): + paragraphs_to_remove.append(paragraph) + + for paragraph in paragraphs_to_remove: + p = paragraph._element + p.getparent().remove(p) + + return target_doc + + +async def add_table_to_document(target_filename: str, source_filename: str, rows: int, cols: int, table_num: int, data: Optional[List[List[str]]] = None, ifadjustheight: Optional[bool] = True, height: Optional[float] = 1, key_words: re.Pattern[str] = None, ALIGMENT: Optional[str] = 'CENTER') -> str: + """复制源文件中的文字与表格(先文字后表格格式)到目标文档 + Args: + target_filename: 目标文档路径 + source_doc: 源文档路径 + rows: 表格行数 + cols: 表格列数 + table_num: 表格序号 + data: 表格数据,二维列表,每个单元格为字符串 + ifadjustheight: bool,为真则表格行高自动调整 + key_words: list, 关键字 + """ + target_filename = ensure_docx_extension(target_filename) + source_filename = ensure_docx_extension(source_filename) + source_doc = Document(source_filename) + + target_doc = Document(target_filename) + try: + # Copy all paragraphs + for paragraph in source_doc.paragraphs: + # Create a new paragraph with the same text and style + new_paragraph = target_doc.add_paragraph(paragraph.text) + new_paragraph.style = target_doc.styles['Normal'] # Default style + #获取合并等样式2025427 + new_paragraph.alignment = paragraph.alignment + + # 复制段落分页属性 + new_paragraph.paragraph_format.page_break_before = paragraph.paragraph_format.page_break_before + # Try to match the style if possible + try: + if paragraph.style and paragraph.style.name in target_doc.styles: + new_paragraph.style = target_doc.styles[paragraph.style.name] + except: + pass + + + # Copy run formatting + for i, run in enumerate(paragraph.runs): + if i < len(new_paragraph.runs): + new_run = new_paragraph.runs[i] + # Copy basic formatting + new_run.bold = run.bold + new_run.italic = run.italic + new_run.underline = run.underline + 
#添加同时合并字体2025427 + new_run.font.name = run.font.name + rPr = new_run.element.get_or_add_rPr() + rFonts = rPr.get_or_add_rFonts() + # 检查 run.font.name 是否为 None + if run.font.name is None: + # 设置默认的中文字体名称 + run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体 + rFonts.set(qn('w:eastAsia'), run.font.name) + new_run.font.color.rgb = run.font.color.rgb + + + # Font size if specified + if run.font.size: + new_run.font.size = run.font.size + + # 复制分页符(处理w:br标签) + for element in run._element: + if element.tag.endswith('br'): + br_type = element.get(qn('type'), '') + if br_type == 'page': + new_br = OxmlElement('w:br') + new_br.set(qn('type'), 'page') + new_run._element.append(new_br) + + except Exception as e: + print(f"添加表格前文章失败:{str(e)}") + + try:# Copy all tables + from core.tables import copy_table + copy_table(source_doc.tables[0], target_doc, ifadjustheight, height) + except Exception as e: + print(f"添加表格失败:{str(e)}") + print(f"{target_doc}写入表格{source_doc.tables[0]}成功") + target_doc = set_document_para(target_doc) + target_doc.save(target_filename) + target_doc = Document(target_filename) + try: + target_doc = write_table(target_filename, rows, cols, table_num, data, ifadjustheight, height, key_words, ALIGMENT) + except Exception as e: + print(f"{target_filename}写入{data}失败:{str(e)}") + target_doc.save(target_filename) + return target_doc,f"{target_filename}添加表格{source_doc}成功" + +async def add_table_and_replace(target_filename: str, source_filename: str, ifadjustheight: Optional[bool] = True, list_to_replace: dict = {}, height: Optional[float] = 1): + """复制源文件中的文字与表格(先文字后表格格式)到目标文档 + Args: + target_filename: 目标文档路径 + source_doc: 源文档路径 + ifadjustheight: bool,为真则表格行高自动调整 + list_to_replace: dict, 待替换内容和替换内容 + """ + target_filename = ensure_docx_extension(target_filename) + source_filename = ensure_docx_extension(source_filename) + source_doc = Document(source_filename) + + target_doc = Document(target_filename) + try: + # Copy all paragraphs + for paragraph in 
source_doc.paragraphs: + # Create a new paragraph with the same text and style + new_paragraph = target_doc.add_paragraph(paragraph.text) + new_paragraph.style = target_doc.styles['Normal'] # Default style + #获取合并等样式2025427 + new_paragraph.alignment = paragraph.alignment + + # 复制段落分页属性 + new_paragraph.paragraph_format.page_break_before = paragraph.paragraph_format.page_break_before + # Try to match the style if possible + try: + if paragraph.style and paragraph.style.name in target_doc.styles: + new_paragraph.style = target_doc.styles[paragraph.style.name] + except: + pass + # Copy run formatting + for i, run in enumerate(paragraph.runs): + if i < len(new_paragraph.runs): + new_run = new_paragraph.runs[i] + # Copy basic formatting + new_run.bold = run.bold + new_run.italic = run.italic + new_run.underline = run.underline + #添加同时合并字体2025427 + new_run.font.name = run.font.name + rPr = new_run.element.get_or_add_rPr() + rFonts = rPr.get_or_add_rFonts() + # 检查 run.font.name 是否为 None + if run.font.name is None: + # 设置默认的中文字体名称 + run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体 + rFonts.set(qn('w:eastAsia'), run.font.name) + new_run.font.color.rgb = run.font.color.rgb + + + # Font size if specified + if run.font.size: + new_run.font.size = run.font.size + + # 复制分页符(处理w:br标签) + for element in run._element: + if element.tag.endswith('br'): + br_type = element.get(qn('type'), '') + if br_type == 'page': + new_br = OxmlElement('w:br') + new_br.set(qn('type'), 'page') + new_run._element.append(new_br) + except Exception as e: + print(f"添加表格前文章失败:{str(e)}") + try:# Copy all tables + from core.tables import copy_table + copy_table(source_doc.tables[0], target_doc, ifadjustheight, height) + target_doc.save(target_filename) + except Exception as e: + print(f"添加表格失败:{str(e)}") + for find_text, replace_text in list_to_replace.items(): + print(await search_and_replace(target_filename, find_text, replace_text)) + +async def merge_documents(target_filename: str, source_filenames: 
List[str], add_page_breaks: bool = True) -> str: + """合并文档(文本) 表格会添加到最后 + + Args: + target_filename: 合并后文档路径 + source_filenames: 源文档路径(列表) + add_page_breaks: bool,为真则每个源文档中间加入分页符 + """ + from core.tables import copy_table + + target_filename = ensure_docx_extension(target_filename) + + # Check if target file is writeable + is_writeable, error_message = check_file_writeable(target_filename) + if not is_writeable: + return f"Cannot create target document: {error_message}" + + # Validate all source documents exist + missing_files = [] + for filename in source_filenames: + doc_filename = ensure_docx_extension(filename) + if not os.path.exists(doc_filename): + missing_files.append(doc_filename) + + if missing_files: + return f"Cannot merge documents. The following source files do not exist: {', '.join(missing_files)}" + + try: + # Create a new document for the merged result + target_doc = Document() + + # Process each source document + for i, filename in enumerate(source_filenames): + doc_filename = ensure_docx_extension(filename) + source_doc = Document(doc_filename) + + # Add page break between documents (except before the first one) + if add_page_breaks and i > 0: + target_doc.add_page_break() + + # Copy all paragraphs + for paragraph in source_doc.paragraphs: + # Create a new paragraph with the same text and style + new_paragraph = target_doc.add_paragraph(paragraph.text) + new_paragraph.style = target_doc.styles['Normal'] # Default style + #获取合并等样式2025427 + new_paragraph.alignment = paragraph.alignment + + # Try to match the style if possible + try: + if paragraph.style and paragraph.style.name in target_doc.styles: + new_paragraph.style = target_doc.styles[paragraph.style.name] + except: + pass + + + # Copy run formatting + for i, run in enumerate(paragraph.runs): + if i < len(new_paragraph.runs): + new_run = new_paragraph.runs[i] + # Copy basic formatting + new_run.bold = run.bold + new_run.italic = run.italic + new_run.underline = run.underline + 
#添加同时合并字体2025427 + new_run.font.name = run.font.name + rPr = new_run.element.get_or_add_rPr() + rFonts = rPr.get_or_add_rFonts() + # 检查 run.font.name 是否为 None + if run.font.name is None: + # 设置默认的中文字体名称 + run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体 + rFonts.set(qn('w:eastAsia'), run.font.name) + new_run.font.color.rgb = run.font.color.rgb + + + # Font size if specified + if run.font.size: + new_run.font.size = run.font.size + + # Copy all tables + for table in source_doc.tables: + copy_table(table, target_doc) + + # Save the merged document + target_doc.save(target_filename) + return f"Successfully merged {len(source_filenames)} documents into {target_filename}" + except Exception as e: + return f"Failed to merge documents: {str(e)}" + + +async def right_align_last_three_para(target_filename: str) -> str: + """右对齐最后三个段落 + + Args: + target_filename: 目标文档路径 + """ + target_filename = ensure_docx_extension(target_filename) + + # Check if target file is writeable + is_writeable, error_message = check_file_writeable(target_filename) + if not is_writeable: + return f"Cannot right align paragraphs: {error_message}" + + try: + # Open the target document + target_doc = Document(target_filename) + + # Get the last three paragraphs + paragraphs = target_doc.paragraphs[-3:] + + # Set the alignment of each paragraph to right + from docx.enum.text import WD_ALIGN_PARAGRAPH + for paragraph in paragraphs: + paragraph.alignment = WD_ALIGN_PARAGRAPH.RIGHT + + # Save the modified document + target_doc.save(target_filename) + return f"Successfully right aligned the last three paragraphs in {target_filename}" + except Exception as e: + return f"Failed to right align paragraphs: {str(e)}" + +async def process_images_table(data_dict, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words = None): + """添加对应表格且填写图片名与插入图片 + + Args: + data_dict (dict): dict内容,图片:图片路径 + output_dir (str): 输出路径 + start_i (int): 总表格数量 + JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径 + + Returns: + int: 
最后使用的表格序号 + """ + items = list(data_dict.items()) + picture_num = len(items) + line_index = 0 + picture_index = 0 + i = start_i + + for content_row in range(((picture_num + 2) // 3) * 2): + if content_row % 2 == 1: + # 文字行(从 items 取图片名) + JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列 + for k in range(1): # 只有1行 + for l in range(3): + if line_index >= picture_num: + break + JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名 + print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}') + line_index += 1 + print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}") + print(f"当前表格序号为 {i}") + output_doc, message = await add_table_to_document( + output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words + ) + i += 1 + else: + # 图片行(从 items 取图片路径) + print(f"当前表格序号为 {i}") + output_doc, message = await add_table_to_document( + output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False + ) + for k in range(3): + if picture_index < picture_num: + pic_path = items[picture_index][1] # 图片路径 + print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}") + print(await add_picture_to_table(output_doc, output_dir, 0, k, pic_path, i, 1.8898)) + picture_index += 1 + i += 1 + print(message) + return i # 返回最后使用的表格序号 \ No newline at end of file diff --git a/tools/esay_docx_func.py b/tools/esay_docx_func.py new file mode 100644 index 0000000..322c5b9 --- /dev/null +++ b/tools/esay_docx_func.py @@ -0,0 +1,182 @@ +from content_tools import add_picture_to_table, search_and_replace +from get_pictures import resize_and_reduce_quality +from document_tools import add_table_to_document + +def fill_tables(Y_table_list, row, col, Y_Table_num, Y): + """根据前端返回json块填写表格list,并实时跟进已填写表格数量 + 目前只支持固定的缺陷图的填写 + + Args: + Y_table_list (list): 前端返回的json块 + row (int): 表格行数 + col (int): 表格列数 + Y_Table_num: json块中有几个表格 + Xu_Hao: 是第几个json块 + Y: 其他参数 + + Return: + Y1_TABLES: 三维,表和对应元素 + table_index_to_images: 字典,表索引到图片路径列表的映射 + Xu_Hao:到达第几个表了 + """ + table_index_to_images 
= {} + Y_TABLES = [[["" for _ in range(row)] for _ in range(col)] for _ in range(Y_Table_num)] + + # 处理前端返回数据 + for l, table_dict in enumerate(Y_table_list): + if table_dict: + Y_TABLES[l][1][0] = Y + Y_TABLES[l][1][1] = table_dict["QueXianLeiXing"] + Y_TABLES[l][1][2] = table_dict["QueXianWeiZhi"] + Y_TABLES[l][1][3] = table_dict["QueXianChiCun"] + Y_TABLES[l][3][0] = table_dict["WeiZongDengJi"] + Y_TABLES[l][3][1] = table_dict["visibility"] + Y_TABLES[l][3][2] = table_dict["urgency"] + Y_TABLES[l][3][3] = table_dict["repair_suggestion"] + + # 获取图片路径 + image_path = table_dict['Tupian_Dir'] + if image_path: + # 确保路径是字符串形式 + if isinstance(image_path, list): + table_index_to_images[l] = image_path.copy() + else: + table_index_to_images[l] = [str(image_path)] + + return Y_TABLES, table_index_to_images + +async def process_images_table(data_dict, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words = None): + """添加对应表格且填写图片名与插入图片 + + Args: + data_dict (dict): dict内容,图片:图片路径 + output_dir (str): 输出路径 + start_i (int): 总表格数量 + JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径 + + Returns: + int: 最后使用的表格序号 + """ + items = list(data_dict.items()) + picture_num = len(items) + line_index = 0 + picture_index = 0 + i = start_i + + for content_row in range(((picture_num + 2) // 3) * 2): + if content_row % 2 == 1: + # 文字行(从 items 取图片名) + JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列 + for k in range(1): # 只有1行 + for l in range(3): + if line_index >= picture_num: + break + JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名 + print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}') + line_index += 1 + print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}") + print(f"当前表格序号为 {i}") + output_doc, message = await add_table_to_document( + output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words + ) + i += 1 + else: + # 图片行(从 items 取图片路径) + print(f"当前表格序号为 {i}") + output_doc, message = await add_table_to_document( + 
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False + ) + for k in range(3): + if picture_index < picture_num: + pic_path = items[picture_index][1] # 图片路径 + print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}") + print(await add_picture_to_table(output_doc, output_dir, 0, k, pic_path, i, 1.8898)) + picture_index += 1 + i += 1 + print(message) + return i # 返回最后使用的表格序号 + +async def add_dynamic_table(output_doc, output_dir, table_num, TABLES, JIANCHA_XIANGQING_DIR, PICTURES, row, col, i, FLAG, xuhao): + """创建动态表 + + Args: + output_doc (Document): 文档对象 + output_dir (str): 输出目录 + table_num (int): 表格序号 + TABLES (list): 表格数据 + JIANCHA_XIANGQING_DIR (str): 检查详情表目录 + PICTURES (dict): 图片数据字典,键为表索引,值为图片路径列表 + row (int): 行数 + col (int): 列数 + i (int): 表格序号 + FLAG: 其他标志 + + Returns: + tuple: (i, table_num) 更新后的表格序号和表格数量 + """ + for table_idx, Table in enumerate(TABLES): + print(Table) + output_doc, message = await add_table_to_document(output_dir, JIANCHA_XIANGQING_DIR, row, col, i, Table, FLAG) + print(message) + + # 获取当前表格对应的图片 + current_table_pictures = PICTURES.get(table_idx, []) + print(f"开始处理图片列表: {current_table_pictures}") + + for picturedir in current_table_pictures: + try: + print(f"添加 {picturedir} {type(picturedir)}到表格{table_idx}") + resize_and_reduce_quality(picturedir, picturedir) + await add_picture_to_table(output_doc, output_dir, 4, 0, picturedir, i, 4.7232) + except Exception as e: + print(f"添加图片失败:{e}") + + print(await search_and_replace(output_dir, 'tupian_xuhao', f'{xuhao}')) + table_num += 1 + i += 1 + xuhao += 1 + return i, table_num, xuhao + +def get_year_month(date): + """根据格式化date字符串获取年月 'date': '二〇二一年十二月十日 9:00' + + Args: date (str): 日期字符串 + + Returns: 年月字符串 '二〇二一年十二月' + """ + unit_map = {'1' : '一', '2' : '二', '3' : '三', '4' : '四', '5' : '五', '6' : '六', '7' : '七', '8' : '八', '9' : '九', '0' : '〇'} + unit_map_month = {1 : '一', 2 : '二', 3 : '三', 4 : '四', 5 : '五', 6 : '六', 7 : '七', 8 : '八', 9 : '九', 10 : '十', 11 : '十一', 12 : '十二'} + year = 
date.split('年')[0] + month = date.split('年')[1].split('月')[0] + year = ''.join([unit_map[i] for i in year]) + month = unit_map_month[int(month)] + return f"{year}年{month}月" + +def merge_info(frontend_info, default_info): + """ + 合并前端传入的 info 和默认 info + 规则:如果前端传入的值为空(None 或空字符串),则使用默认值 + + Args: + frontend_info: 前端传入的字典 + default_info: 默认的完整字典 + Returns: + 合并后的完整字典 + """ + if not isinstance(frontend_info, dict) or frontend_info is None: + return default_info.copy() + + merged_info = {} + + for key, default_value in default_info.items(): + # 获取前端传入的值 + frontend_value = frontend_info.get(key) + + # 判断前端值是否为空(None 或空字符串) + if frontend_value is None or frontend_value == "": + merged_info[key] = default_value + else: + merged_info[key] = frontend_value + + return merged_info + diff --git a/tools/get_pictures.py b/tools/get_pictures.py new file mode 100644 index 0000000..897d05d --- /dev/null +++ b/tools/get_pictures.py @@ -0,0 +1,234 @@ +import os +import math +from PIL import Image +from concurrent.futures import ThreadPoolExecutor + +def resize_and_reduce_quality(image_path, output_path, target_width = None): + try: + # 检查图片文件大小 + if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB + print("图片文件大小小于10MB,不进行调整") + return image_path + + + # 打开图片 + with Image.open(image_path) as img: + # 计算新的高度以保持宽高比 + if target_width is None: + target_width = img.width + aspect_ratio = img.height / img.width + new_height = int(target_width * aspect_ratio) + + # 调整图片大小 + img_resized = img.resize((target_width, new_height), Image.LANCZOS) + + # 降低图片质量 + quality = 70 # 质量从1(最差)到95(最好),可以根据需要调整 + img_resized.save(output_path, quality=quality) + + return output_path + except Exception as e: + return f"调整图片大小和质量时出现问题: {str(e)}" + +def get_picture_nums(source_path: str) -> int: + picture_count = 0 + for root, dirs, files in os.walk(source_path): + for file in files: + if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'): + picture_count += 1 + 
return picture_count + +def collect_defect_data( + Y: str, + picture_dir: str, + search_file_list: list = [], +) -> tuple[int, dict]: + """ + 收集指定年份的缺陷图片数据,并根据布尔值决定是否扫描特定类型的缺陷图。 + + Args: + Y: 叶片号,如 "Y1"、"Y2"、"Y3" + picture_dir: 图片根目录 + search_file_list (list, optional): 要搜索的文件列表.规定为3个元素 + + Returns: + (缺陷图片总数, 缺陷图片文件名字典) + """ + + total_num = 0 + result_dict = {} + + try: + for defect_type in search_file_list: + dir_path = os.path.join(picture_dir, Y, defect_type) + num, img_dict = get_picture_nums_and_image_with_name(dir_path) + total_num += num + result_dict.update(img_dict) + except Exception as e: + print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}") + + return total_num, result_dict + +def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]: + """ + 获取指定目录下图片的数量,并返回每个图片的路径和名称(字典) + + Args: + source_path (str): 要搜索的目录路径 + + Returns: + tuple: 包含两个元素的元组 + picture_count (int): 图片数量 + image_with_name (dict): 图片路径和名称的字典,格式为 {图片名称: 图片完整路径} + """ + picture_count = 0 + image_with_name = {} + name_list = [] + + for root, dirs, files in os.walk(source_path): + for file in files: + if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'): + picture_count += 1 + image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file) + + return picture_count, image_with_name + + +def find_image(directory, image_name): + """ + 在指定目录中查找指定名称的图片文件 + + 参数: + directory (str): 要搜索的目录路径 + image_name (str): 要查找的图片文件名(可带扩展名或不带) + + 返回: + str: 找到的图片完整路径,如果未找到则返回None + """ + # 支持的图片扩展名列表 + image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'] + + # 遍历目录中的所有文件 + for root, dirs, files in os.walk(directory): + for file in files: + # 获取文件名和扩展名 + filename, ext = os.path.splitext(file) + + # 检查是否匹配图片名称(带或不带扩展名) + if (file.lower() == image_name.lower() or + filename.lower() == image_name.lower() and ext.lower() in image_extensions): + return os.path.join(root, file) + + return None + +async def 
make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str: + """获取目录下所有图片,将所有图片合并制作略缩图并保存 + + Args: + source_path: 源目录 + output_path: 输出目录 + size: 合并后的略缩图总大小 (宽度, 高度) + """ + print("略缩图处理中") + + try: + if not os.path.exists(output_path): + print(f"无输出目录,创建中,输出目录为:{output_path}") + os.makedirs(output_path) + except Exception as e: + print(f"输出目录有问题:{e}") + return "" + #如果存在merged_thumbnail.jpg文件,则直接返回该文件路径 + if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')): + print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片") + """ + 此处可预留接口,询问用户是否重新生成一份略缩图 + """ + + return os.path.join(output_path,'merged_thumbnail.jpg') + print("目录中无略缩图,合并略缩图中") + # 获取源目录下所有的图片文件 + try: + image_files = [] + for root, dirs, files in os.walk(source_path): + for file in files: + if file.lower().endswith(('.jpg', '.jpeg', '.png')): + image_files.append(os.path.join(root, file)) + except Exception as e: + print(f"递归获取图片失败,原因:{e}") + + if not image_files: + print("源目录中没有找到图片文件") + return "" + + # 计算每个缩略图的大小 + num_images = len(image_files) + target_width, target_height = size + + # 计算最佳的缩略图排列方式 + # 先尝试计算每行可以放多少个缩略图 + aspect_ratio = target_width / target_height + cols = math.ceil(math.sqrt(num_images * aspect_ratio)) + rows = math.ceil(num_images / cols) + + # 计算单个缩略图的大小 + thumb_width = target_width // cols + thumb_height = target_height // rows + + # 创建线程池处理图片 + with ThreadPoolExecutor() as executor: + thumbnails = list(executor.map( + lambda file: create_thumbnail(file, (thumb_width, thumb_height)), + image_files + )) + + # 过滤掉 None 值 + thumbnails = [thumb for thumb in thumbnails if thumb is not None] + + if not thumbnails: + print("没有成功创建任何略缩图") + return "" + + # 计算实际需要的行数和列数 + actual_cols = min(len(thumbnails), cols) + actual_rows = math.ceil(len(thumbnails) / actual_cols) + + # 创建合并后的图像 + merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height)) + + # 粘贴缩略图 + for index, thumb in 
enumerate(thumbnails): + row = index // actual_cols + col = index % actual_cols + merged_image.paste(thumb, (col * thumb_width, row * thumb_height)) + + # 如果最终尺寸不完全匹配,调整大小 + if merged_image.size != size: + merged_image = merged_image.resize(size, Image.LANCZOS) + + # 保存合并后的略缩图 + merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg') + merged_image.save(merged_thumbnail_path) + + print(f"合并后的略缩图已保存到:{merged_thumbnail_path}") + return merged_thumbnail_path + +def create_thumbnail(file_path: str, size: tuple) -> Image: + """创建单个图片的略缩图 + + Args: + file_path: 图片文件路径 + size: 缩略图大小 + """ + try: + with Image.open(file_path) as img: + # 保持原始宽高比 + img.thumbnail(size, Image.LANCZOS) + + # 创建新图像确保尺寸一致 + new_img = Image.new('RGB', size) + new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2)) + return new_img + except Exception as e: + print(f"图片处理有问题:{e}") + return None \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..6a2fa1b --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,8 @@ +""" +Utility functions for the Word Document Server. + +This package contains utility modules for file operations and document handling. 
+""" + +from utils.file_utils import check_file_writeable, create_document_copy, ensure_docx_extension +from utils.document_utils import get_document_properties, extract_document_text, get_document_structure, find_paragraph_by_text, find_and_replace_text diff --git a/utils/__pycache__/__init__.cpython-312.pyc b/utils/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..002b2f0 Binary files /dev/null and b/utils/__pycache__/__init__.cpython-312.pyc differ diff --git a/utils/__pycache__/__init__.cpython-313.pyc b/utils/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..91cf4d8 Binary files /dev/null and b/utils/__pycache__/__init__.cpython-313.pyc differ diff --git a/utils/__pycache__/document_utils.cpython-312.pyc b/utils/__pycache__/document_utils.cpython-312.pyc new file mode 100644 index 0000000..010bc3b Binary files /dev/null and b/utils/__pycache__/document_utils.cpython-312.pyc differ diff --git a/utils/__pycache__/document_utils.cpython-313.pyc b/utils/__pycache__/document_utils.cpython-313.pyc new file mode 100644 index 0000000..f1d9d5a Binary files /dev/null and b/utils/__pycache__/document_utils.cpython-313.pyc differ diff --git a/utils/__pycache__/file_utils.cpython-312.pyc b/utils/__pycache__/file_utils.cpython-312.pyc new file mode 100644 index 0000000..43b01d6 Binary files /dev/null and b/utils/__pycache__/file_utils.cpython-312.pyc differ diff --git a/utils/__pycache__/file_utils.cpython-313.pyc b/utils/__pycache__/file_utils.cpython-313.pyc new file mode 100644 index 0000000..7f2bbbb Binary files /dev/null and b/utils/__pycache__/file_utils.cpython-313.pyc differ diff --git a/utils/document_utils.py b/utils/document_utils.py new file mode 100644 index 0000000..d8a1e5b --- /dev/null +++ b/utils/document_utils.py @@ -0,0 +1,167 @@ +""" +Document utility functions for Word Document Server. 
def get_document_properties(doc_path: str) -> Dict[str, Any]:
    """Return core metadata and simple size statistics of a Word document.

    Args:
        doc_path: Path to the document file.

    Returns:
        A dict with the core properties ``title``, ``author``, ``subject``,
        ``keywords``, ``created``, ``modified``, ``last_modified_by`` and
        ``revision`` plus the counts ``page_count``, ``word_count``,
        ``paragraph_count`` and ``table_count``.  If the file is missing or
        cannot be read, a dict with a single ``"error"`` key is returned
        instead of raising.
    """
    import os
    if not os.path.exists(doc_path):
        return {"error": f"Document {doc_path} does not exist"}

    try:
        doc = Document(doc_path)
        core_props = doc.core_properties

        return {
            "title": core_props.title or "",
            "author": core_props.author or "",
            "subject": core_props.subject or "",
            "keywords": core_props.keywords or "",
            "created": str(core_props.created) if core_props.created else "",
            "modified": str(core_props.modified) if core_props.modified else "",
            "last_modified_by": core_props.last_modified_by or "",
            "revision": core_props.revision or 0,
            # NOTE: this is the number of sections, not rendered pages; the
            # key name is kept for backward compatibility with callers.
            "page_count": len(doc.sections),
            # Whitespace-separated tokens of body paragraphs only.
            "word_count": sum(len(paragraph.text.split()) for paragraph in doc.paragraphs),
            "paragraph_count": len(doc.paragraphs),
            "table_count": len(doc.tables)
        }
    except Exception as e:
        return {"error": f"Failed to get document properties: {str(e)}"}


def extract_document_text(doc_path: str) -> str:
    """Extract the text of all body paragraphs and table cells.

    Body paragraphs come first, followed by the paragraphs of every table
    cell (table by table, row by row); entries are joined with newlines.

    Args:
        doc_path: Path to the document file.

    Returns:
        The extracted text.  On failure a human-readable error message is
        returned as the string instead of raising.
    """
    import os
    if not os.path.exists(doc_path):
        return f"Document {doc_path} does not exist"

    try:
        doc = Document(doc_path)
        text = [paragraph.text for paragraph in doc.paragraphs]

        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    text.extend(paragraph.text for paragraph in cell.paragraphs)

        return "\n".join(text)
    except Exception as e:
        return f"Failed to extract text: {str(e)}"


def get_document_structure(doc_path: str) -> Dict[str, Any]:
    """Summarise the paragraphs and tables of a Word document.

    Args:
        doc_path: Path to the document file.

    Returns:
        A dict with two lists:

        * ``paragraphs`` - one entry per paragraph with its ``index``, the
          first 100 characters of its ``text`` (``"..."`` appended when
          truncated) and its ``style`` name (``"Normal"`` when unset).
        * ``tables`` - one entry per table with its ``index``, ``rows``,
          ``columns`` and a ``preview`` of at most 3x3 cells, each cut to
          20 characters.

        On failure a dict with a single ``"error"`` key is returned.
    """
    import os
    if not os.path.exists(doc_path):
        return {"error": f"Document {doc_path} does not exist"}

    try:
        doc = Document(doc_path)
        structure = {
            "paragraphs": [],
            "tables": []
        }

        # Paragraph overview.
        for i, para in enumerate(doc.paragraphs):
            structure["paragraphs"].append({
                "index": i,
                "text": para.text[:100] + ("..." if len(para.text) > 100 else ""),
                "style": para.style.name if para.style else "Normal"
            })

        # Table overview with a small preview of the top-left corner.
        for i, table in enumerate(doc.tables):
            table_data = {
                "index": i,
                "rows": len(table.rows),
                "columns": len(table.columns),
                "preview": []
            }

            max_rows = min(3, len(table.rows))
            for row_idx in range(max_rows):
                row_data = []
                max_cols = min(3, len(table.columns))
                for col_idx in range(max_cols):
                    try:
                        cell_text = table.cell(row_idx, col_idx).text
                        row_data.append(cell_text[:20] + ("..." if len(cell_text) > 20 else ""))
                    except IndexError:
                        # Keep the preview rectangular even if a cell is missing.
                        row_data.append("N/A")
                table_data["preview"].append(row_data)

            structure["tables"].append(table_data)

        return structure
    except Exception as e:
        return {"error": f"Failed to get document structure: {str(e)}"}


def find_paragraph_by_text(doc, text, partial_match=False):
    """Find body paragraphs whose text matches ``text``.

    Args:
        doc: Document object exposing a ``paragraphs`` sequence.
        text: Text to search for.
        partial_match: If True, match paragraphs that contain ``text``;
            if False, match only paragraphs whose text equals it exactly.

    Returns:
        List of indices (into ``doc.paragraphs``) of the matching paragraphs.
    """
    if partial_match:
        return [i for i, para in enumerate(doc.paragraphs) if text in para.text]
    return [i for i, para in enumerate(doc.paragraphs) if para.text == text]


def _replace_in_paragraphs(paragraphs, old_text, new_text):
    """Replace ``old_text`` in the runs of ``paragraphs``; return the occurrence count."""
    replaced = 0
    for para in paragraphs:
        if old_text not in para.text:
            continue
        for run in para.runs:
            hits = run.text.count(old_text)
            if hits:
                run.text = run.text.replace(old_text, new_text)
                replaced += hits
    return replaced


def find_and_replace_text(doc, old_text, new_text):
    """Find and replace text in body paragraphs and table cells.

    Replacement is done run by run so character formatting is preserved.
    NOTE: an occurrence that is split across several runs of a paragraph is
    therefore not replaced.

    Args:
        doc: Document object exposing ``paragraphs`` and ``tables``.
        old_text: Text to find.  An empty string matches nothing.
        new_text: Text to replace with.

    Returns:
        Number of replacements made (every replaced occurrence is counted,
        including several occurrences inside the same run).
    """
    # str.replace("", x) would insert x between every character.
    if not old_text:
        return 0

    count = _replace_in_paragraphs(doc.paragraphs, old_text, new_text)

    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                count += _replace_in_paragraphs(cell.paragraphs, old_text, new_text)

    return count
def get_paragraph_text(doc_path: str, paragraph_index: int) -> Dict[str, Any]:
    """Get the text and style of one body paragraph of a Word document.

    Args:
        doc_path: Path to the Word document.
        paragraph_index: Index of the paragraph to extract (0-based).

    Returns:
        Dictionary with ``index``, ``text``, ``style`` (``"Normal"`` when the
        paragraph has no style) and ``is_heading`` (True when the style name
        starts with ``"Heading"``).  If the file is missing, the index is out
        of range or reading fails, a dict with a single ``"error"`` key is
        returned instead of raising.
    """
    import os
    if not os.path.exists(doc_path):
        return {"error": f"Document {doc_path} does not exist"}

    try:
        doc = Document(doc_path)

        # Reject negative and too-large indices explicitly so that a negative
        # index does not silently address paragraphs from the end.
        if paragraph_index < 0 or paragraph_index >= len(doc.paragraphs):
            return {"error": f"Invalid paragraph index: {paragraph_index}. Document has {len(doc.paragraphs)} paragraphs."}

        paragraph = doc.paragraphs[paragraph_index]

        return {
            "index": paragraph_index,
            "text": paragraph.text,
            "style": paragraph.style.name if paragraph.style else "Normal",
            "is_heading": paragraph.style.name.startswith("Heading") if paragraph.style else False
        }
    except Exception as e:
        return {"error": f"Failed to get paragraph text: {str(e)}"}


def _word_matches(word, needle):
    """Return True if ``word`` equals ``needle``, ignoring surrounding ASCII punctuation."""
    import string
    return word == needle or word.strip(string.punctuation) == needle


def _find_occurrences(text, query, match_case, whole_word):
    """Return match positions of ``query`` in ``text``.

    Positions are whitespace-separated word indices when ``whole_word`` is
    True, otherwise character offsets of non-overlapping substring matches.
    """
    if match_case:
        haystack, needle = text, query
    else:
        haystack, needle = text.lower(), query.lower()

    if whole_word:
        return [idx for idx, word in enumerate(haystack.split()) if _word_matches(word, needle)]

    positions = []
    pos = haystack.find(needle)
    while pos != -1:
        positions.append(pos)
        # Continue after the current match so matches never overlap.
        pos = haystack.find(needle, pos + len(needle))
    return positions


def find_text(doc_path: str, text_to_find: str, match_case: bool = True, whole_word: bool = False) -> Dict[str, Any]:
    """Find all occurrences of specific text in a Word document.

    Body paragraphs are searched first, then the paragraphs of every table
    cell.

    Args:
        doc_path: Path to the Word document.
        text_to_find: Text to search for; must not be empty.
        match_case: Whether to perform a case-sensitive search.
        whole_word: Whether to match whole (whitespace-separated) words only.
            Punctuation attached to a word is ignored, so ``"end"`` matches
            ``"end."``.

    Returns:
        Dictionary with ``query``, ``match_case``, ``whole_word``,
        ``occurrences`` and ``total_count``.  Each occurrence holds either a
        ``paragraph_index`` (body paragraphs) or a ``location`` string (table
        cells), a ``position`` and a ``context`` (first 100 characters of the
        paragraph).  ``position`` is a character offset for substring
        searches and a word index for whole-word searches.  On failure a dict
        with a single ``"error"`` key is returned instead of raising.
    """
    import os
    if not os.path.exists(doc_path):
        return {"error": f"Document {doc_path} does not exist"}

    if not text_to_find:
        return {"error": "Search text cannot be empty"}

    try:
        doc = Document(doc_path)
        occurrences = []

        def collect(para, where):
            # Record every match of one paragraph, tagged with its location.
            text = para.text
            context = text[:100] + ("..." if len(text) > 100 else "")
            for position in _find_occurrences(text, text_to_find, match_case, whole_word):
                occurrences.append({**where, "position": position, "context": context})

        # Search in paragraphs
        for i, para in enumerate(doc.paragraphs):
            collect(para, {"paragraph_index": i})

        # Search in tables
        for table_idx, table in enumerate(doc.tables):
            for row_idx, row in enumerate(table.rows):
                for col_idx, cell in enumerate(row.cells):
                    location = f"Table {table_idx}, Row {row_idx}, Column {col_idx}"
                    for para in cell.paragraphs:
                        collect(para, {"location": location})

        return {
            "query": text_to_find,
            "match_case": match_case,
            "whole_word": whole_word,
            "occurrences": occurrences,
            "total_count": len(occurrences)
        }
    except Exception as e:
        return {"error": f"Failed to search for text: {str(e)}"}
def check_file_writeable(filepath: str) -> Tuple[bool, str]:
    """Check if a file can be written to.

    For a path that does not exist yet, the parent directory must exist and
    be writeable.  For an existing path, the permission bits are checked and
    the file is then opened in append mode to detect other failures (for
    example a lock held by another program).

    Args:
        filepath: Path to the file.

    Returns:
        Tuple of (is_writeable, error_message); the message is empty when the
        file is writeable.
    """
    # If file doesn't exist, check if directory is writeable
    if not os.path.exists(filepath):
        # A bare filename has no directory part: fall back to the current one.
        directory = os.path.dirname(filepath) or '.'
        if not os.path.exists(directory):
            return False, f"Directory {directory} does not exist"
        if not os.access(directory, os.W_OK):
            return False, f"Directory {directory} is not writeable"
        return True, ""

    # If file exists, check if it's writeable
    if not os.access(filepath, os.W_OK):
        return False, f"File {filepath} is not writeable (permission denied)"

    # Append mode leaves the existing content untouched.
    try:
        with open(filepath, 'a'):
            pass
        return True, ""
    except IOError as e:
        return False, f"File {filepath} is not writeable: {str(e)}"
    except Exception as e:
        return False, f"Unknown error checking file permissions: {str(e)}"


def create_document_copy(source_path: str, dest_path: Optional[str] = None) -> Tuple[bool, str, Optional[str]]:
    """Create a copy of a document.

    Args:
        source_path: Path to the source document.
        dest_path: Optional path for the new document.  If not provided,
            ``_copy`` is inserted before the source file's extension
            (``report.docx`` -> ``report_copy.docx``).

    Returns:
        Tuple of (success, message, new_filepath); ``new_filepath`` is None
        when the copy failed.
    """
    if not os.path.exists(source_path):
        return False, f"Source document {source_path} does not exist", None

    if not dest_path:
        # Generate a new filename if not provided
        base, ext = os.path.splitext(source_path)
        dest_path = f"{base}_copy{ext}"

    try:
        # copy2 copies the content together with the file metadata.
        shutil.copy2(source_path, dest_path)
        return True, f"Document copied to {dest_path}", dest_path
    except Exception as e:
        return False, f"Failed to copy document: {str(e)}", None


def ensure_docx_extension(filename: str) -> str:
    """Ensure filename has a .docx extension.

    The check is case-insensitive, so a name that already ends in ``.DOCX``
    or ``.Docx`` is returned unchanged instead of getting a second extension.

    Args:
        filename: The filename to check.

    Returns:
        ``filename`` itself if it already ends with ``.docx`` (in any letter
        case), otherwise ``filename`` with ``.docx`` appended.
    """
    if filename.lower().endswith('.docx'):
        return filename
    return filename + '.docx'