完成部分数据库的数据拉取代码

This commit is contained in:
Voge1imkafig 2025-07-03 17:50:42 +08:00
parent 6a40b5d5fe
commit 28a8cd0965
56 changed files with 3292 additions and 0 deletions

524
Generate_Report.py Normal file
View File

@ -0,0 +1,524 @@
# 文档处理工具
from tools.document_tools import (
create_document, add_documents,add_table_and_replace,
add_table_to_document,process_images_table,
)
# 内容处理工具
from tools.content_tools import (
add_picture,split_table_by_row_content,
search_and_replace,add_picture_to_table
)
from tools.get_pictures import (
make_Thumbnail,resize_and_reduce_quality,
get_picture_nums,find_image,collect_defect_data
)
from tools.Get_Json import get_project_info,get_jizu_info,get_jizu_shigong_info,get_weather
import asyncio
from core.tables import fill_tables
from tools.defines import *
import os, re, datetime
from pathlib import Path
async def add_dynamic_table(output_doc, output_dir, table_num, TABLES, JIANCHA_XIANGQING_DIR, PICTURES, row, col, i, FLAG, xuhao):
"""创建动态表
Args:
output_doc (Document): 文档对象
output_dir (str): 输出目录
table_num (int): 表格序号
TABLES (list): 表格数据
JIANCHA_XIANGQING_DIR (str): 检查详情表目录
PICTURES (dict): 图片数据字典键为表索引值为图片路径列表
row (int): 行数
col (int): 列数
i (int): 表格序号
FLAG: 其他标志
Returns:
tuple: (i, table_num) 更新后的表格序号和表格数量
"""
for table_idx, Table in enumerate(TABLES):
print(Table)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_XIANGQING_DIR, row, col, i, Table, FLAG)
print(message)
# 获取当前表格对应的图片
current_table_pictures = PICTURES.get(table_idx, [])
print(f"开始处理图片列表: {current_table_pictures}")
for picturedir in current_table_pictures:
try:
print(f"添加 {picturedir} {type(picturedir)}到表格{table_idx}")
resize_and_reduce_quality(picturedir, picturedir)
await add_picture_to_table(output_doc, output_dir, 4, 0, picturedir, i, 4.7232)
except Exception as e:
print(f"添加图片失败:{e}")
print(await search_and_replace(output_dir, 'tupian_xuhao', f'{xuhao}'))
table_num += 1
i += 1
xuhao += 1
return i, table_num, xuhao
def get_year_month(date):
"""根据格式化date字符串获取年月 'date': '二〇二一年十二月十日 9:00'
Args: date (str): 日期字符串
Returns: 年月字符串 '二〇二一年十二月'
"""
unit_map = {'1' : '', '2' : '', '3' : '', '4' : '', '5' : '', '6' : '', '7' : '', '8' : '', '9' : '', '0' : ''}
unit_map_month = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
year = date.split('')[0]
month = date.split('')[1].split('')[0]
year = ''.join([unit_map[i] for i in year])
month = unit_map_month[int(month)]
return f"{year}{month}"
def merge_info(frontend_info, default_info):
"""
合并前端传入的 info 和默认 info
规则如果前端传入的值为空None 或空字符串则使用默认值
Args:
frontend_info: 前端传入的字典
default_info: 默认的完整字典
Returns:
合并后的完整字典
"""
if not isinstance(frontend_info, dict) or frontend_info is None:
return default_info.copy()
merged_info = {}
for key, default_value in default_info.items():
# 获取前端传入的值
frontend_value = frontend_info.get(key)
# 判断前端值是否为空None 或空字符串)
if frontend_value is None or frontend_value == "":
merged_info[key] = default_value
else:
merged_info[key] = frontend_value
return merged_info
async def generate_report(base_info, baogao_info):
#获取模板编号、模板名称
num_to_chinese = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
cover_encode = "encode"
cover_project = "project"
baogao_name1 = "baogaoname1"
baogao_name2 = "baogaoname2"
company_name_yi = "company_name_yi"
cover_date = "time"
TITLE_OF_REPORT = "companyencode"
jiegou_xuhao = 'num'
jizu_data = get_jizu_info(base_info['turbine_id'])
project_data = get_project_info(jizu_data['projectId'])
shigong_data = get_jizu_shigong_info(base_info['turbine_id'])
print(shigong_data)
try:
fengchang_name = project_data['farmName']
Yi_company = project_data['inspectionUnit']
yi_fuzeren = project_data['inspectionContact']
yi_phone = project_data['inspectionPhone']
fengchang_location = project_data['farmAddress']
Jia_company = project_data['client']
jia_fuzeren = project_data['clientContact']
jia_phone = project_data['clientPhone']
jizu_num = project_data['scale']
jizu_xinghao = project_data['turbineModel']
except Exception as e:
print(f"数据库的项目-机组基本信息获取失败:{e}")
return
try:
baogao_date = datetime.datetime.now().strftime("%Y年%m月%d%H:%M") #现在的时间
#数据库拉取信息
Jiancha_date = shigong_data["startTime"].replace("T", " ") #检查日期
image_count = shigong_data['imageCount'] #从施工方案获取的图片数量,待定
temperature = shigong_data['temperature'] #温度
wind_speed = shigong_data['windSpeed'] #风速
weather = get_weather(shigong_data["weatherCode"]) #天气
#前端信息
baogao_info = merge_info(baogao_info, DEFAULT_BAOGAO_INFO)
key_words= re.compile('|'.join(map(re.escape, baogao_info['key_words'].split(','))))
shengcheng_dir = baogao_info['shengcheng_dir']
muban_dir = baogao_info['muban_dir']
project_number = baogao_info['jizu_type']
baogao_type = baogao_info['baogao_type']
if muban_dir == "" or shengcheng_dir == "":
print("未配置图片/生成路径/总图片路径,请检查配置")
return
date_year_month = get_year_month(baogao_date)
Jiancha_renyuan = baogao_info['jiancha_renyuan']
shebei_peizhi = baogao_info['shebei_peizhi']
shigong_fangan = baogao_info['shigong_fangan']
renyuan_peizhi = baogao_info['renyuan_peizhi']
gongzuo_neirong = baogao_info['gongzuo_neirong']
beizhu = baogao_info['beizhu']
Jiancha_location = baogao_info['jiancha_location']
Jiancha_fangshi = baogao_info['jiancha_fangshi']
Changjia = baogao_info['yepian_changjia']
yezhu_renyuan = baogao_info['yezhu_renyuan']
changjia_renyuan = baogao_info['changjia_renyuan']
data_process = baogao_info['date_process']
baogao_bianzhi = baogao_info['baogao_bianzhi']
baogao_shenghe = baogao_info['baogao_shenghe']
shenghe_date = baogao_info['shenghe_date']
Y1_jiancha = baogao_info['Y1_jiancha_neirong']
Y2_jiancha = baogao_info['Y2_jiancha_neirong']
Y3_jiancha = baogao_info['Y3_jiancha_neirong']
except Exception as e:
print(f"报告基本信息获取失败:{e}")
return
normal_picture_num = 0
Y1 = "t"
Y2 = "t"
Y3 = "t"
output_doc = None
head_num = 1
###封面创建###
cover_dirs = [os.path.join(muban_dir,"fengmian1.docx"),os.path.join(muban_dir,"fengmian.jpg"),os.path.join(muban_dir,"fengmian2.docx")]
#输出目录
output_dir = os.path.normpath(f"{shengcheng_dir}/{fengchang_name}项目{baogao_type}{project_number}{baogao_date.split(' ')[0]}版.docx")
version = 1
while os.path.exists(output_dir):
if version != 1:
output_dir = output_dir.replace(f"{version - 1}",f"{version}")
else:
output_dir = output_dir.replace("",f"{version}")
version += 1
ifwaibu = baogao_info['waibu_jiancha']
ifneibu = baogao_info['neibu_jiancha']
iffanglei = baogao_info['fanglei_jiancha']
parts = []
if ifwaibu:
parts.append("叶片外观")
if ifneibu:
parts.append("叶片内部")
if iffanglei:
parts.append("叶片防雷")
if not parts:
print("前端未指定检查内容")
mianzhe_shengming = f"本报告仅涵盖{''.join(parts)}检测内容"
#创建文档、添加封面
print(await create_document(output_dir))
print(add_documents(output_dir, cover_dirs[0]))
print(await add_picture(output_dir, cover_dirs[1]))
print(add_documents(output_dir, cover_dirs[2]))
print("封面创建成功")
#更改文档信息
print(await search_and_replace(output_dir, TITLE_OF_REPORT, project_number))
print(await search_and_replace(output_dir, baogao_name1, baogao_type))
print(await search_and_replace(output_dir, baogao_name2, baogao_type))
print(await search_and_replace(output_dir, company_name_yi, Yi_company))
print(await search_and_replace(output_dir, cover_project, fengchang_name))
print(await search_and_replace(output_dir, cover_encode, project_number))
print(await search_and_replace(output_dir, cover_date, date_year_month))
print(await search_and_replace(output_dir, 'bianzhi', baogao_bianzhi))
print(await search_and_replace(output_dir, 'shenghe', baogao_shenghe))
print(await search_and_replace(output_dir, 'mianzhe_shengming', mianzhe_shengming))
total_table_num = 0
#项目概况表
print("开始添加项目概况表")
XIANG_MU_GAI_KUANG = os.path.join(muban_dir,"xiangmugaikuo.docx")
print(f"查找模板,找到模板:{XIANG_MU_GAI_KUANG}")
project_location = fengchang_location
company_name_jia = Jia_company
fuzeren = yi_fuzeren
phone_fuzeren = yi_phone
jizu_bianhao = project_number
xiangmuguige = jizu_num
Yi_company = Yi_company
XIANGMU_GAIKUO = list(list("" for i in range(5)) for j in range(5))
XIANGMU_GAIKUO[0][1] = fengchang_name
#XIANGMU_GAIKUO[0][3]=XIANGMU_GAIKUO[0][4] = "盐城市滨海县"
XIANGMU_GAIKUO[0][3] = project_location
#XIANGMU_GAIKUO[1][1]=XIANGMU_GAIKUO[2,1]=XIANGMU_GAIKUO[3,1] = "国家电投集团滨海风力发电有限公司"
XIANGMU_GAIKUO[1][1] = company_name_jia
XIANGMU_GAIKUO[1][3] = Yi_company
XIANGMU_GAIKUO[2][3] = fuzeren
XIANGMU_GAIKUO[3][4] = phone_fuzeren
XIANGMU_GAIKUO[4][1] = jizu_xinghao
XIANGMU_GAIKUO[4][4] = xiangmuguige
print("建立表结构完毕,开始插入模板")
#添加项目概况表
print(f"输出路径:{output_dir},模板路径:{XIANG_MU_GAI_KUANG},插入数据:{XIANGMU_GAIKUO}")
output_doc, message = await add_table_to_document(output_dir, XIANG_MU_GAI_KUANG,5,5,total_table_num,XIANGMU_GAIKUO)
print(message)
print("模板插入完毕,开始替换内容")
total_table_num += 1
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
#检查方案描述
FANGAN_JIANCHA_DIR = os.path.join(muban_dir,"checkmethod.docx")
list_to_replace = {
'renyuan_peizhi' : renyuan_peizhi,
'shebei_peizhi' : shebei_peizhi,
'shigong_fangan' : shigong_fangan,
'gongzuo_neirong' : gongzuo_neirong,
'beizhu' : beizhu,
'num' : num_to_chinese[head_num],
}
print(await add_table_and_replace(output_dir, FANGAN_JIANCHA_DIR, 0, list_to_replace))
print(split_table_by_row_content(output_dir, output_dir, total_table_num))
total_table_num += 1
head_num += 1
jiancha = []
neirong = []
if ifwaibu:
jiancha.append("无人机外部高精度飞行")
neirong.append(f"{Y1}{Y2}{Y3}三支叶片的前缘、后缘、迎风面、背风面。")
if ifneibu:
jiancha.append("人工内部拍摄")
neirong.append(f"{Y1}{Y2}{Y3}三支叶片的内部导雷卡、腹板、透光、人孔盖版、叶根盖板...")
if iffanglei:
jiancha.append("人工防雷")
neirong.append(f"轮毂至塔基导通、内部导线线阻、外部导线线阻...")
JIANCHA_XINGXI_DIR = os.path.join(muban_dir,"checkinfo.docx")
JIANCHA_XINGXI = list(list("" for i in range(4)) for j in range(9))
JIANCHA_XINGXI[0][1] = Jiancha_renyuan
JIANCHA_XINGXI[1][1] = Jiancha_date.split(' ')[0]
JIANCHA_XINGXI[1][3] = project_number
JIANCHA_XINGXI[2][1] = Jiancha_location
JIANCHA_XINGXI[2][3] = Jiancha_fangshi
JIANCHA_XINGXI[3][2] = Changjia
JIANCHA_XINGXI[4][1] = '机组编号:' + project_number + '机组'
JIANCHA_XINGXI[5][1] = Y1
JIANCHA_XINGXI[6][1] = Y2
JIANCHA_XINGXI[7][1] = Y3
JIANCHA_XINGXI[8][0] = "本次" + "".join(_ for _ in jiancha) + f"检查,采集叶片图片{normal_picture_num}张,内容覆盖" + ";".join(_ for _ in neirong)
# if total_picture_dir == "":
# Thumbnail_Picture = await make_Thumbnail(Picture_dir, Picture_dir)#添加图片
# else:
# print('传入了总图片路径,获取图片数量')
# Thumbnail_Picture = await make_Thumbnail(total_picture_dir, Picture_dir)#添加图片
# normal_picture_num = get_picture_nums(total_picture_dir)
# JIANCHA_XINGXI42 = tatong_image_path
#新建检查信息表
output_doc, message = await add_table_to_document(output_dir, JIANCHA_XINGXI_DIR,9,4,total_table_num ,JIANCHA_XINGXI,False)
print(message)
# print(await add_picture_to_table(output_doc, output_dir, 4, 2, JIANCHA_XINGXI42, total_table_num , 1.18))
# #添加略缩图片
# print(await add_picture_to_table(output_doc, output_dir, 8, 0, Thumbnail_Picture, total_table_num))
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
total_table_num += 1
#添加成果递交表
CHENGGUO_DIJIAO_DIR = os.path.join(muban_dir,"chengguo_sub.docx")
CHENGGUO_DIJIAO = list(list("" for i in range(4)) for j in range(6))
CHENGGUO_DIJIAO[0][1] = Jiancha_renyuan
CHENGGUO_DIJIAO[1][1] = yezhu_renyuan
CHENGGUO_DIJIAO[2][1] = Jiancha_date.split(' ')[0]
CHENGGUO_DIJIAO[3][1] = data_process
CHENGGUO_DIJIAO[4][1] = baogao_bianzhi
CHENGGUO_DIJIAO[5][1] = baogao_shenghe
CHENGGUO_DIJIAO[1][3] = changjia_renyuan
CHENGGUO_DIJIAO[2][3] = Jiancha_date.split(' ')[1]
CHENGGUO_DIJIAO[3][3] = baogao_date.split(' ')[0]
CHENGGUO_DIJIAO[4][3] = baogao_date.split(' ')[0]
CHENGGUO_DIJIAO[5][3] = shenghe_date.split(' ')[0]
output_doc, message = await add_table_to_document(output_dir, CHENGGUO_DIJIAO_DIR,5,5,total_table_num,CHENGGUO_DIJIAO,True,0.04)
print(message)
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
head_num += 1
total_table_num += 1
# #检查情况汇总表(文字信息)
# try:
# search_file_list = []
# if ifwaibu:
# search_file_list.append("外缺陷图")
# if ifneibu:
# search_file_list.append("内缺陷图")
# Y1_quexian_num, Y1_quexian_dict = collect_defect_data(Y1, Picture_dir, ifwaibu, ifneibu, search_file_list)
# Y2_quexian_num, Y2_quexian_dict = collect_defect_data(Y2, Picture_dir, ifwaibu, ifneibu, search_file_list)
# Y3_quexian_num, Y3_quexian_dict = collect_defect_data(Y3, Picture_dir, ifwaibu, ifneibu, search_file_list)
# weak_num_Y1 = f"{Y1}共发现缺陷{Y1_quexian_num}处"
# weak_num_Y2 = f"{Y2}共发现缺陷{Y2_quexian_num}处"
# weak_num_Y3 = f"{Y3}共发现缺陷{Y3_quexian_num}处"
# except Exception as e:
# print(f"缺陷图获取失败:{e}")
# return
# #添加检查情况汇总表
# JIANCHA_HUIZONG_DIR = os.path.join(muban_dir,"total_check.docx")
# JIANCHA_HUIZONG = list(list("" for i in range(3)) for j in range(4))
# JIANCHA_HUIZONG[1][0] = weak_num_Y1
# JIANCHA_HUIZONG[2][0] = weak_num_Y2
# JIANCHA_HUIZONG[3][0] = weak_num_Y3
# JIANCHA_HUIZONG[1][1] = Y1_jiancha
# JIANCHA_HUIZONG[2][1] = Y2_jiancha
# JIANCHA_HUIZONG[3][1] = Y3_jiancha
# JIANCHA_HUIZONG[1][2] = "/n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y1_quexian_dict.items())]) if Y1_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
# JIANCHA_HUIZONG[2][2] = "/n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y2_quexian_dict.items())]) if Y2_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
# JIANCHA_HUIZONG[3][2] = "/n".join([f"{i+1}.{name}" for i, (name, path) in enumerate(Y3_quexian_dict.items())]) if Y3_quexian_num else '未发现明显影响风力发电机组正常运行的缺陷'
# output_doc, message = await add_table_to_document(output_dir, JIANCHA_HUIZONG_DIR,4,3,total_table_num,JIANCHA_HUIZONG,False,ALIGMENT='LEFT')
# print(message)
# print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
# total_table_num += 1
# head_num += 1
# #主要部位图片展示表/检查内容表
# search_file_list = ["外汇总","内汇总","防汇总"]
# picture_Y1_num, Y1_dict = collect_defect_data(Y1, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei)
# picture_Y2_num, Y2_dict = collect_defect_data(Y2, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei)
# picture_Y3_num, Y3_dict = collect_defect_data(Y3, Picture_dir, ifwaibu, ifneibu, search_file_list, iffanglei)
# print(f"图片、文字数量:{picture_Y1_num} {picture_Y2_num} {picture_Y3_num}")
# JIANCHA_NEIRONG_TOTAL_NUM = picture_Y1_num+ picture_Y2_num + picture_Y3_num
# col ,row = 3, 0
# JIANCHA_NEIRONG_PICTURES_TABLE = os.path.join(muban_dir,"check2.docx")
# JIANCHA_NEIRONG_Y1_DIR = os.path.join(muban_dir,"check_content.docx")
# JIANCHA_NEIRONG_Y1 = list(list("" for _ in range(3)) for j in range(1))
# JIANCHA_NEIRONG_Y1[0][0] = f"叶片1{Y1}检查内容"
# print(f"Y1标题内容{JIANCHA_NEIRONG_Y1}")
# JIANCHA_NEIRONG_Y2_DIR = os.path.join(muban_dir,"check3.docx")
# JIANCHA_NEIRONG_Y2 = list(list("" for _ in range(3)) for j in range(1))
# JIANCHA_NEIRONG_Y2[0][0] = f"叶片2{Y2}检查内容"
# print(f"Y2标题内容{JIANCHA_NEIRONG_Y2}")
# JIANCHA_NEIRONG_Y3_DIR = os.path.join(muban_dir,"check3.docx")
# JIANCHA_NEIRONG_Y3 = list(list("" for _ in range(3)) for j in range(1))
# JIANCHA_NEIRONG_Y3[0][0] = f"叶片3{Y3}检查内容"
# print(f"Y3标题内容{JIANCHA_NEIRONG_Y3}")
# print(f"当前表格序号为 {total_table_num}")
# print(key_words)
# output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y1_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y1,True, 1)
# print(message)
# total_table_num += 1
# total_table_num = await process_images_table(Y1_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
# output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y2_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y2,True, 1)
# print(message)
# total_table_num += 1
# total_table_num = await process_images_table(Y2_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
# output_doc, message = await add_table_to_document(output_dir, JIANCHA_NEIRONG_Y3_DIR,1,3,total_table_num,JIANCHA_NEIRONG_Y3,True, 1)
# print(message)
# total_table_num += 1
# total_table_num = await process_images_table(Y3_dict, output_dir, total_table_num, JIANCHA_NEIRONG_PICTURES_TABLE, key_words)
# print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
# head_num += 1
# #缺陷详情
# QUEXIAN_XIANGQING_DIR = os.path.join(muban_dir,"check_check.docx")
# QUEXIAN_XIANGQING_TITLE_DIR = os.path.join(muban_dir,"check_check_title.docx")
# Y_tables = [Y1_quexian_dict,Y2_quexian_dict,Y3_quexian_dict]
# Y1_table_list = []
# Y2_table_list = []
# Y3_table_list = []
# table_lists = [Y1_table_list, Y2_table_list, Y3_table_list]
# for i, (table_list, Y_dict) in enumerate(zip(table_lists, Y_tables)):
# for image_name, image_path in Y_dict.items():
# # 从图片名解析各个字段
# parts = image_name.split('_')
# if len(parts) >= 8: # 确保有7个部分
# defect_type = parts[1]
# defect_location = parts[2]
# defect_size = parts[3]
# visibility = parts[4]
# urgency = parts[5]
# severity = parts[6]
# repair_suggestion = parts[7]
# print(f"获取第{i+1}个叶片的缺陷图: {image_path}")
# table_list.append({
# "QueXianLeiXing": defect_type,
# "QueXianWeiZhi": defect_location,
# "QueXianChiCun": defect_size,
# "WeiZongDengJi": severity,
# "Tupian_Dir": image_path,
# "visibility": visibility,
# "urgency": urgency,
# "repair_suggestion": repair_suggestion.split('.')[0], # 新增维修建议字段
# })
# else:
# table_list.append({
# "QueXianLeiXing": "图片命名有误",
# "QueXianWeiZhi": "图片命名有误",
# "QueXianChiCun": "图片命名有误",
# "WeiZongDengJi": "图片命名有误",
# "Tupian_Dir": image_path,
# "visibility": "图片命名有误",
# "urgency": "图片命名有误",
# "repair_suggestion": "图片命名有误", # 新增维修建议字段
# })
# Y1_TABLES, Y1_TABLES_PICTURES = fill_tables(table_lists[0],4,5,len(table_lists[0]),Y1)
# Y2_TABLES, Y2_TABLES_PICTURES = fill_tables(table_lists[1],4,5,len(table_lists[1]),Y2)
# Y3_TABLES, Y3_TABLES_PICTURES = fill_tables(table_lists[2],4,5,len(table_lists[2]),Y3)
# print(add_documents(output_dir, QUEXIAN_XIANGQING_TITLE_DIR))
# print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
# head_num += 1
# table_num = 0
# Xu_Hao = 0
# total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y1_TABLES,QUEXIAN_XIANGQING_DIR,Y1_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
# total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y2_TABLES,QUEXIAN_XIANGQING_DIR,Y2_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
# total_table_num,table_num,Xu_Hao = await add_dynamic_table(output_doc,output_dir,table_num,Y3_TABLES,QUEXIAN_XIANGQING_DIR,Y3_TABLES_PICTURES,4,5,total_table_num,False,xuhao=Xu_Hao)
#总结
ZONG_JIE_DIR = os.path.join(muban_dir,"result.docx")
ZONG_JIE_BEFORE = "result"
ZONG_JIE = baogao_info['baogao_zongjie']
print(add_documents(output_dir, ZONG_JIE_DIR))
print(await search_and_replace(output_dir, ZONG_JIE_BEFORE, ZONG_JIE))
print(await search_and_replace(output_dir, 'company_yi', Yi_company))
print(await search_and_replace(output_dir, 'baogao_date', baogao_date.split(' ')[0]))
print(await search_and_replace(output_dir, jiegou_xuhao, num_to_chinese[head_num]))
def main():
json_data1 = {
"turbine_id" : "183463dbf40d9278549a76b82b175dd9",
}
json_data2 = {
'shengcheng_dir': r"D:\work\Report_Generate_Server\output",
'muban_dir': r"D:\work\Report_Generate_Server\muban",
}
asyncio.run(generate_report(json_data1,json_data2))
print('文档生成完毕')
if __name__ == '__main__':
main()

Binary file not shown.

Binary file not shown.

138
core/styles.py Normal file
View File

@ -0,0 +1,138 @@
"""
Style-related functions for Word Document Server.
"""
from docx.shared import Pt
from docx.enum.style import WD_STYLE_TYPE
def ensure_heading_style(doc):
"""
Ensure Heading styles exist in the document.
Args:
doc: Document object
"""
for i in range(1, 10): # Create Heading 1 through Heading 9
style_name = f'Heading {i}'
try:
# Try to access the style to see if it exists
style = doc.styles[style_name]
except KeyError:
# Create the style if it doesn't exist
try:
from docx.oxml.ns import qn
style = doc.styles.add_style(style_name, WD_STYLE_TYPE.PARAGRAPH)
style.font.name = '宋体(中文正文)'
style.font.size = Pt(22) # 根据需要设置字体大小
style._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体(中文正文)')
if i == 1:
style.font.size = Pt(16)
style.font.bold = True
elif i == 2:
style.font.size = Pt(14)
style.font.bold = True
else:
style.font.size = Pt(12)
style.font.bold = True
except Exception:
# If style creation fails, we'll just use default formatting
pass
def ensure_table_style(doc):
"""
Ensure Table Grid style exists in the document.
Args:
doc: Document object
"""
try:
# Try to access the style to see if it exists
style = doc.styles['Table Grid']
except KeyError:
# If style doesn't exist, we'll handle it at usage time
pass
def create_style(doc, style_name, style_type, base_style=None, font_properties=None, paragraph_properties=None):
"""
Create a new style in the document.
Args:
doc: Document object
style_name: Name for the new style
style_type: Type of style (WD_STYLE_TYPE)
base_style: Optional base style to inherit from
font_properties: Dictionary of font properties (bold, italic, size, name, color)
paragraph_properties: Dictionary of paragraph properties (alignment, spacing)
Returns:
The created style
"""
from docx.shared import Pt
try:
# Check if style already exists
style = doc.styles.get_by_id(style_name, WD_STYLE_TYPE.PARAGRAPH)
return style
except:
# Create new style
new_style = doc.styles.add_style(style_name, style_type)
# Set base style if specified
if base_style:
new_style.base_style = doc.styles[base_style]
# Set font properties
if font_properties:
font = new_style.font
if 'bold' in font_properties:
font.bold = font_properties['bold']
if 'italic' in font_properties:
font.italic = font_properties['italic']
if 'size' in font_properties:
font.size = Pt(font_properties['size'])
if 'name' in font_properties:
font.name = font_properties['name']
if 'color' in font_properties:
from docx.shared import RGBColor
# Define common RGB colors
color_map = {
'red': RGBColor(255, 0, 0),
'blue': RGBColor(0, 0, 255),
'green': RGBColor(0, 128, 0),
'yellow': RGBColor(255, 255, 0),
'black': RGBColor(0, 0, 0),
'gray': RGBColor(128, 128, 128),
'white': RGBColor(255, 255, 255),
'purple': RGBColor(128, 0, 128),
'orange': RGBColor(255, 165, 0)
}
color_value = font_properties['color']
try:
# Handle string color names
if isinstance(color_value, str) and color_value.lower() in color_map:
font.color.rgb = color_map[color_value.lower()]
# Handle RGBColor objects
elif hasattr(color_value, 'rgb'):
font.color.rgb = color_value
# Try to parse as RGB string
elif isinstance(color_value, str):
font.color.rgb = RGBColor.from_string(color_value)
# Use directly if it's already an RGB value
else:
font.color.rgb = color_value
except Exception as e:
# Fallback to black if all else fails
font.color.rgb = RGBColor(0, 0, 0)
# Set paragraph properties
if paragraph_properties:
if 'alignment' in paragraph_properties:
new_style.paragraph_format.alignment = paragraph_properties['alignment']
if 'spacing' in paragraph_properties:
new_style.paragraph_format.line_spacing = paragraph_properties['spacing']
return new_style

283
core/tables.py Normal file
View File

@ -0,0 +1,283 @@
"""
Table-related operations for Word Document Server.
"""
from docx.oxml.shared import OxmlElement, qn
from docx.oxml.ns import nsdecls
from docx.oxml import parse_xml
def set_cell_border(cell, **kwargs):
"""
Set cell border properties.
Args:
cell: The cell to modify
**kwargs: Border properties (top, bottom, left, right, val, color)
"""
tc = cell._tc
tcPr = tc.get_or_add_tcPr()
# Create border elements
for key, value in kwargs.items():
if key in ['top', 'left', 'bottom', 'right']:
tag = 'w:{}'.format(key)
element = OxmlElement(tag)
element.set(qn('w:val'), kwargs.get('val', 'single'))
element.set(qn('w:sz'), kwargs.get('sz', '4'))
element.set(qn('w:space'), kwargs.get('space', '0'))
element.set(qn('w:color'), kwargs.get('color', 'auto'))
tcBorders = tcPr.first_child_found_in("w:tcBorders")
if tcBorders is None:
tcBorders = OxmlElement('w:tcBorders')
tcPr.append(tcBorders)
tcBorders.append(element)
def apply_table_style(table, has_header_row=False, border_style=None, shading=None):
"""
Apply formatting to a table.
Args:
table: The table to format
has_header_row: If True, formats the first row as a header
border_style: Style for borders ('none', 'single', 'double', 'thick')
shading: 2D list of cell background colors (by row and column)
Returns:
True if successful, False otherwise
"""
try:
# Format header row if requested
if has_header_row and table.rows:
header_row = table.rows[0]
for cell in header_row.cells:
for paragraph in cell.paragraphs:
if paragraph.runs:
for run in paragraph.runs:
run.bold = True
# Apply border style if specified
if border_style:
val_map = {
'none': 'nil',
'single': 'single',
'double': 'double',
'thick': 'thick'
}
val = val_map.get(border_style.lower(), 'single')
# Apply to all cells
for row in table.rows:
for cell in row.cells:
set_cell_border(
cell,
top=True,
bottom=True,
left=True,
right=True,
val=val,
color="000000"
)
# Apply cell shading if specified
if shading:
for i, row_colors in enumerate(shading):
if i >= len(table.rows):
break
for j, color in enumerate(row_colors):
if j >= len(table.rows[i].cells):
break
try:
# Apply shading to cell
cell = table.rows[i].cells[j]
shading_elm = parse_xml(f'<w:shd {nsdecls("w")} w:fill="{color}"/>')
cell._tc.get_or_add_tcPr().append(shading_elm)
except:
# Skip if color format is invalid
pass
return True
except Exception:
return False
def copy_table(source_table, target_doc, ifadjustheight=True, height = 1):
"""
Copy a table from one document to another.
Args:
source_table: The table to copy
target_doc: The document to copy the table to
Returns:
The new table in the target document
"""
# Create a new table with the same dimensions
new_table = target_doc.add_table(rows=len(source_table.rows), cols=len(source_table.columns))
# Try to apply the same style
try:
if source_table.style:
new_table.style = 'Table Grid'
except:
# Fall back to default grid style
try:
new_table.style = 'Table Grid'
except:
pass
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.shared import Pt, Inches, Cm, RGBColor
# Copy cell contents
for i, row in enumerate(source_table.rows):
for j, cell in enumerate(row.cells):
for paragraph in cell.paragraphs:
average_char_width_in_points = 6
if paragraph.text:
new_table.cell(i,j).text = paragraph.text
new_table.cell(i,j).paragraphs[0].runs[0].font.name = "Times New Roman" #设置英文字体
new_table.cell(i,j).paragraphs[0].runs[0].font.size = Pt(10.5) # 字体大小
new_table.cell(i,j).paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') #设置中文字体
new_table.cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
new_table.cell(i,j).vertical_alignment = WD_ALIGN_VERTICAL.CENTER
"""
待添加如何让表格自适应大小autofit目前不知为何没有作用
"""
if ifadjustheight:
new_table.rows[i].height = Cm(height)
try:
new_table = merge_tables(new_table)
except Exception as e:
print(f"合并表格失败:{e}")
from docx.shared import Inches
return new_table
from collections import deque
def merge_tables(table):
"""BFS遍历将相邻且相同单元格合并
Args:
table: 表格docx库的Table类型
Returns:
合并后的表格
"""
if not table or len(table.rows) == 0:
return table
rows = len(table.rows)
cols = len(table.columns)
# 创建访问标记矩阵
visited = [[False for _ in range(cols)] for _ in range(rows)]
# 定义四个方向的移动:上、右、下、左
directions = [(0, 0), (0, 1), (1, 0), (0, 0)]
for i in range(rows):
for j in range(cols):
if not visited[i][j]:
current_cell = table.cell(i, j)
current_text = current_cell.text.strip()
if not current_text: # 跳过空单元格
visited[i][j] = True
continue
# BFS队列
queue = deque()
queue.append((i, j))
visited[i][j] = True
# 记录需要合并的单元格
cells_to_merge = []
while queue:
x, y = queue.popleft()
cells_to_merge.append((x, y))
for dx, dy in directions:
nx, ny = x + dx, y + dy
# 检查边界和访问状态
if 0 <= nx < rows and 0 <= ny < cols and not visited[nx][ny]:
neighbor_cell = table.cell(nx, ny)
neighbor_text = neighbor_cell.text.strip()
if neighbor_text == current_text:
visited[nx][ny] = True
queue.append((nx, ny))
# 如果有需要合并的单元格
if len(cells_to_merge) > 1:
# 按行和列排序,确保左上角是第一个单元格
cells_to_merge.sort()
min_row, min_col = cells_to_merge[0]
max_row, max_col = cells_to_merge[-1]
# 清空所有待合并单元格(包括换行符)
for x, y in cells_to_merge[1:]:
cell = table.cell(x, y)
# 删除所有段落(彻底清空)
for paragraph in list(cell.paragraphs):
p = paragraph._element
p.getparent().remove(p)
# 可选:添加一个空段落防止格式问题
cell.add_paragraph()
# 执行合并
if max_row > min_row or max_col > min_col:
table.cell(min_row, min_col).merge(table.cell(max_row, max_col))
return table
def fill_tables(Y_table_list, row, col, Y_Table_num, Y):
"""根据前端返回json块填写表格list并实时跟进已填写表格数量
目前只支持固定的缺陷图的填写
Args:
Y_table_list (list): 前端返回的json块
row (int): 表格行数
col (int): 表格列数
Y_Table_num: json块中有几个表格
Xu_Hao: 是第几个json块
Y: 其他参数
Return:
Y1_TABLES: 三维表和对应元素
table_index_to_images: 字典表索引到图片路径列表的映射
Xu_Hao到达第几个表了
"""
table_index_to_images = {}
Y_TABLES = [[["" for _ in range(row)] for _ in range(col)] for _ in range(Y_Table_num)]
# 处理前端返回数据
for l, table_dict in enumerate(Y_table_list):
if table_dict:
Y_TABLES[l][1][0] = Y
Y_TABLES[l][1][1] = table_dict["QueXianLeiXing"]
Y_TABLES[l][1][2] = table_dict["QueXianWeiZhi"]
Y_TABLES[l][1][3] = table_dict["QueXianChiCun"]
Y_TABLES[l][3][0] = table_dict["WeiZongDengJi"]
Y_TABLES[l][3][1] = table_dict["visibility"]
Y_TABLES[l][3][2] = table_dict["urgency"]
Y_TABLES[l][3][3] = table_dict["repair_suggestion"]
# 获取图片路径
image_path = table_dict['Tupian_Dir']
if image_path:
# 确保路径是字符串形式
if isinstance(image_path, list):
table_index_to_images[l] = image_path.copy()
else:
table_index_to_images[l] = [str(image_path)]
return Y_TABLES, table_index_to_images

BIN
muban/check2.docx Normal file

Binary file not shown.

BIN
muban/check3.docx Normal file

Binary file not shown.

BIN
muban/check_check.docx Normal file

Binary file not shown.

Binary file not shown.

BIN
muban/check_content.docx Normal file

Binary file not shown.

BIN
muban/checkinfo.docx Normal file

Binary file not shown.

BIN
muban/checkmethod.docx Normal file

Binary file not shown.

BIN
muban/chengguo_sub.docx Normal file

Binary file not shown.

BIN
muban/fengmian.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

BIN
muban/fengmian1.docx Normal file

Binary file not shown.

BIN
muban/fengmian2.docx Normal file

Binary file not shown.

BIN
muban/result.docx Normal file

Binary file not shown.

BIN
muban/total_check.docx Normal file

Binary file not shown.

BIN
muban/xiangmugaikuo.docx Normal file

Binary file not shown.

Binary file not shown.

5
tools/API.py Normal file
View File

@ -0,0 +1,5 @@
DTURL = "http://pms.dtyx.net:9158"
GETPROJECTINFO = "/project/detail/{projectId}"
GETJIZUINFO = "/turbine/detail/{turbineId}"
GETSHIGONGINFO = "/t-construction/list"
GETWEATHERINFO = "/weather-type/{weatherCode}"

79
tools/Get_Json.py Normal file
View File

@ -0,0 +1,79 @@
import requests
import json
from tools.API import *
def get_project_info(projectId):
projecturl = DTURL + GETPROJECTINFO.format(projectId=projectId)
headers = {
"content_type" : "application/x-www-form-urlencoded"
}
try:
response = requests.get(projecturl, headers=headers)
if response.status_code == 200:
data = json.loads(response.text)
print(f"获取到项目的数据:{data}")
return data["data"]
else:
print(f"获取项目{projectId}数据失败,状态码:{response.status_code}")
return None
except Exception as e:
print(f"获取项目{projectId}数据失败,异常:{e}")
return None
def get_jizu_info(turbineId):
jizuurl = DTURL + GETJIZUINFO.format(turbineId=turbineId)
headers = {
"content_type" : "application/x-www-form-urlencoded"
}
try:
response = requests.get(jizuurl, headers=headers)
if response.status_code == 200:
data = json.loads(response.text)
print(f"获取到机组的数据:{data}")
return data["data"]
else:
print(f"获取项目{turbineId}数据失败,状态码:{response.status_code}")
return None
except Exception as e:
print(f"获取项目{turbineId}数据失败,异常:{e}")
return None
def get_jizu_shigong_info(turbineId):
jizuurl = DTURL + GETSHIGONGINFO
headers = {
"content_type" : "application/x-www-form-urlencoded"
}
params = {
"rows" : {
"turbineId" : turbineId
}
}
try:
response = requests.get(jizuurl, headers=headers, params=params)
if response.status_code == 200:
data = json.loads(response.text)
print(f"获取到机组施工的数据:{data}")
return data["rows"][0]
else:
print(f"获取项目{turbineId}施工数据失败,状态码:{response.status_code}")
return None
except Exception as e:
print(f"获取项目{turbineId}施工数据失败,异常:{e}")
return None
def get_weather(weatherid):
weatherurl = DTURL + GETWEATHERINFO.format(weatherCode=weatherid)
headers = {
"content_type" : "application/x-www-form-urlencoded"
}
try:
response = requests.get(weatherurl, headers=headers)
if response.status_code == 200:
data = json.loads(response.text)
print(f"获取到天气数据:{data}")
return data["data"]
else:
print(f"获取天气{weatherid}数据失败,状态码:{response.status_code}")
return None
except Exception as e:
print(f"获取天气{weatherid}数据失败,异常:{e}")
return None

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

637
tools/content_tools.py Normal file
View File

@ -0,0 +1,637 @@
"""
Content tools for Word Document Server.
These tools add various types of content to Word documents,
including headings, paragraphs, tables, images, and page breaks.
"""
import os
from typing import List, Optional, Dict, Any
from docx import Document
from docx.shared import Inches, Pt
from docx.oxml.shared import qn
from utils.file_utils import check_file_writeable, ensure_docx_extension
from utils.document_utils import find_and_replace_text
from core.styles import ensure_heading_style, ensure_table_style
def split_table_by_row_content(
doc_path: str,
output_path: str,
table_num: int = 0
) -> str:
"""
根据表格第二行第一列内容的行数对指定表格进行分行处理
并将每列内容按相同行数分割不足则重复
参数:
doc_path: 输入Word文档路径
output_path: 输出Word文档路径
table_num: 要处理的表格序号(从0开始)
"""
try:
from docx import Document
from docx.shared import Pt
from docx.oxml.shared import qn
from docx.enum.table import WD_TABLE_ALIGNMENT, WD_ALIGN_VERTICAL
# 打开文档
doc = Document(doc_path)
# 检查表格是否存在
if len(doc.tables) <= table_num:
return f"文档中不存在第{table_num+1}个表格"
# 获取指定表格
table = doc.tables[table_num]
# 获取表格行数和列数
row_count = len(table.rows)
col_count = len(table.columns)
# 如果表格行数小于2无法处理
if row_count < 2:
doc.save(output_path)
return "表格行数少于2行无法按照要求分行"
# 获取第二行第一列的文本内容
second_row_first_cell = table.cell(1, 0)
second_row_text = second_row_first_cell.text
# 计算第二行第一列文本的行数(按换行符分割)
lines_in_second_row = len(second_row_text.split('\n'))
# 如果行数为0设置为1至少分为1部分
split_count = max(1, lines_in_second_row)
print(f'原表格行数:{row_count},第二行第一列内容行数:{split_count},需要分割为:{split_count}部分')
# 创建新表格来替代原表格(分割后的表格)
# 新表格的行数 = 标题行(1) + 原数据行数 × 分割部分数
new_table = doc.add_table(rows=1 + (row_count-1)*split_count, cols=col_count)
# 设置表格样式
new_table.style = table.style
new_table.autofit = True
# 1. 处理标题行(第一行)保持不变
for col_idx in range(col_count):
orig_cell = table.cell(0, col_idx)
new_cell = new_table.cell(0, col_idx)
# 复制内容并设置格式
new_cell.text = orig_cell.text
if orig_cell.paragraphs:
# 设置格式
new_cell.paragraphs[0].runs[0].font.name = "Times New Roman"
new_cell.paragraphs[0].runs[0].font.size = Pt(10.5)
new_cell.paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
new_cell.paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
new_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
new_cell.width = orig_cell.width
# 2. 处理数据行(从第二行开始)
for orig_row_idx in range(1, row_count): # 遍历原表格的每一行数据
for col_idx in range(col_count): # 遍历每一列
orig_cell = table.cell(orig_row_idx, col_idx)
cell_text = orig_cell.text
# 分割当前单元格内容
cell_lines = cell_text.split('\n')
cell_line_count = len(cell_lines)
# 如果内容行数不足分割数,则重复最后一行
if cell_line_count < split_count:
cell_lines += [cell_lines[-1]] * (split_count - cell_line_count)
# 在新表格中对应的位置写入分割后的内容
for part_idx in range(split_count):
# 计算新表格中的行位置
new_row_idx = 1 + (orig_row_idx-1)*split_count + part_idx
# 获取新单元格
new_cell = new_table.cell(new_row_idx, col_idx)
# 写入分割后的内容
line_text = cell_lines[part_idx] if part_idx < len(cell_lines) else cell_lines[-1]
new_cell.text = line_text
# 设置格式
if new_cell.paragraphs:
new_cell.paragraphs[0].runs[0].font.name = "Times New Roman"
new_cell.paragraphs[0].runs[0].font.size = Pt(10.5)
new_cell.paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋')
new_cell.paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
new_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
# 复制单元格宽度
new_cell.width = orig_cell.width
# 删除原表格
table._element.getparent().remove(table._element)
# 保存文档
doc.save(output_path)
return f"{table_num+1}个表格已成功分行处理"
except Exception as e:
return f"处理表格时出错: {str(e)}"
async def add_heading(filename: str, text: str, level: int = 1) -> str:
"""对文档增加标题
Args:
filename: 目标文档路径
text: 标题文本
level: 标题级别1为最高级
"""
filename = ensure_docx_extension(filename)
# Ensure level is converted to integer
try:
level = int(level)
except (ValueError, TypeError):
return "Invalid parameter: level must be an integer between 1 and 9"
# Validate level range
if level < 1 or level > 9:
return f"Invalid heading level: {level}. Level must be between 1 and 9."
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
# Suggest creating a copy
return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."
try:
doc = Document(filename)
# Ensure heading styles exist
ensure_heading_style(doc)
# Try to add heading with style
try:
heading = doc.add_heading(text, level=level)
doc.save(filename)
return f"Heading '{text}' (level {level}) added to {filename}"
except Exception as style_error:
# If style-based approach fails, use direct formatting
paragraph = doc.add_paragraph(text)
paragraph.style = doc.styles['Normal']
run = paragraph.runs[0]
run.bold = True
rPr = run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
from docx.oxml.shared import qn
rFonts.set(qn('w:eastAsia'), '宋体(中文正文)')
# Adjust size based on heading level
if level == 1:
run.font.size = Pt(12)
elif level == 2:
run.font.size = Pt(14)
else:
run.font.size = Pt(12)
doc.save(filename)
return f"Heading '{text}' added to {filename} with direct formatting (style not available)"
except Exception as e:
return f"Failed to add heading: {str(e)}"
async def add_paragraph(filename: str, text: str, style: Optional[str] = None) -> str:
"""对文档添加一个段落(一行)
Args:
filename: 目标文档路径
text: 段落内容
style: 段落样式可选
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
# Suggest creating a copy
return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."
try:
doc = Document(filename)
paragraph = doc.add_paragraph(text)
if style:
try:
paragraph.style = style
except KeyError:
# Style doesn't exist, use normal and report it
paragraph.style = doc.styles['Normal']
# Copy run formatting
# for i, run in enumerate(paragraph.runs):
# if i < len(paragraph.runs):
# new_run = paragraph.runs[i]
# # Copy basic formatting
# new_run.bold = run.bold
# new_run.italic = run.italic
# new_run.underline = run.underline
# #添加同时合并字体2025427
# new_run.font.name = run.font.name
# rPr = new_run.element.get_or_add_rPr()
# rFonts = rPr.get_or_add_rFonts()
# # 检查 run.font.name 是否为 None
# if run.font.name is None:
# # 设置默认的中文字体名称
# run.font.name = '宋体 (中文正文)' # 或者使用其他你喜欢的中文字体
# rFonts.set(qn('w:eastAsia'), run.font.name)
# new_run.font.color.rgb = run.font.color.rgb
# # Font size if specified
# if run.font.size:
# new_run.font.size = run.font.size
doc.save(filename)
return f"Style '{style}' not found, paragraph added with default style to {filename}"
doc.save(filename)
return f"Paragraph added to {filename}"
except Exception as e:
return f"Failed to add paragraph: {str(e)}"
async def add_table(filename: str, rows: int, cols: int, data: Optional[List[List[str]]] = None) -> str:
"""对文档添加一个表格
Args:
filename: 目标文档路径
rows: 表格行数
cols: 表格列数
data: 二维数组列表每一项为单元格内容默认为空
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
# Suggest creating a copy
return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."
try:
doc = Document(filename)
table = doc.add_table(rows=rows, cols=cols)
# Try to set the table style
try:
table.style = 'Table Grid'
except KeyError:
# If style doesn't exist, add basic borders
pass
# Fill table with data if provided
if data:
for i, row_data in enumerate(data):
if i >= rows:
break
for j, cell_text in enumerate(row_data):
if j >= cols:
break
table.cell(i, j).text = str(cell_text)
doc.save(filename)
return f"Table ({rows}x{cols}) added to {filename}"
except Exception as e:
return f"Failed to add table: {str(e)}"
async def add_picture_to_table(target_doc: Document, target_filename: str, row: int, col: int, image_path: str,table_num: int = -1, width: Optional[float] = None) -> str:
"""向文档中对应表格添加图片
Args:
target_doc: 目标文档
target_filename: 目标文档保存路径
row: 表格行数
col: 表格列数
image_path: 图片路径
table_num: 表格序号默认为-1即最后一个表格
width: 图片宽度默认为None表示使用原始图片大小
"""
from PIL import Image
if not os.path.exists(image_path):
return f"Image file not found: {image_path}"
# Check image file size
try:
image_size = os.path.getsize(image_path) / 1024 # Size in KB
if image_size <= 0:
return f"Image file appears to be empty: {image_path} (0 KB)"
elif image_size > 9126:
# Create the output directory if it doesn't exist
output_dir = os.path.join(os.path.dirname(image_path), "压缩图片")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Define the output path for the compressed image
image_name = os.path.basename(image_path)
output_path = os.path.join(output_dir, image_name)
# Compress the image
while image_size > 9126:
print(f"压缩图片:{image_path} ({image_size:.2f} KB) -> {output_path} (9126 KB)")
with Image.open(image_path) as img:
img.save(output_path, optimize=True, quality=85)
image_size = os.path.getsize(output_path) / 1024 # Size in KB
# Update the image path to the compressed image path
image_path = output_path
except Exception as size_error:
return f"Error checking image file: {str(size_error)}"
try:
table = target_doc.tables[table_num]
# Add the picture to the cell
cell = table.cell(row, col)
if len(cell.text) == 1: cell.text = ""
paragraph = cell.paragraphs[-1]
run = paragraph.add_run()
try:
if width:
run.add_picture(image_path, width=Inches(width))
else:
run.add_picture(image_path)
except Exception as e:
# 如果添加图片时出现问题尝试将图片转换为PNG格式
try:
print(f"正常添加失败,尝试转换图片后添加:{image_path}")
# 打开图片
img = Image.open(image_path)
# 转换为PNG格式
temp_image_path = os.path.splitext(image_path)[0] + '.png'
img.save(temp_image_path, 'PNG')
# 尝试添加转换后的图片
if width:
run.add_picture(temp_image_path, width=Inches(width))
else:
run.add_picture(temp_image_path)
# 添加完成后删除转换后的图片
os.remove(temp_image_path)
except Exception as e:
# 如果转换或添加转换后的图片时出现问题,返回错误信息
return f"调用add_picture函数出现问题: {str(e)}"
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.table import WD_ALIGN_VERTICAL
cell.paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
target_doc.save(target_filename)
return f"Picture {image_path} added to table {table_num} cell ({row},{col})"
except Exception as e:
return f"Failed to add picture to table: {str(e)}"
async def add_picture(filename: str, image_path: str, width: Optional[float] = None) -> str:
"""添加一个图片到文档中
Args:
filename: 文档路径
image_path: 图片路径
width: 图片大小
"""
filename = ensure_docx_extension(filename)
# Validate document existence
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Get absolute paths for better diagnostics
abs_filename = os.path.abspath(filename)
abs_image_path = os.path.abspath(image_path)
# Validate image existence with improved error message
if not os.path.exists(abs_image_path):
return f"Image file not found: {abs_image_path}"
# Check image file size
try:
image_size = os.path.getsize(abs_image_path) / 1024 # Size in KB
if image_size <= 0:
return f"Image file appears to be empty: {abs_image_path} (0 KB)"
except Exception as size_error:
return f"Error checking image file: {str(size_error)}"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(abs_filename)
if not is_writeable:
return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."
try:
doc = Document(abs_filename)
# Additional diagnostic info
diagnostic = f"Attempting to add image ({abs_image_path}, {image_size:.2f} KB) to document ({abs_filename})"
try:
if width:
doc.add_picture(abs_image_path, width=Inches(width))
else:
doc.add_picture(abs_image_path)
doc.save(abs_filename)
return f"Picture {image_path} added to {filename}"
except Exception as inner_error:
# More detailed error for the specific operation
error_type = type(inner_error).__name__
error_msg = str(inner_error)
return f"Failed to add picture: {error_type} - {error_msg or 'No error details available'}\nDiagnostic info: {diagnostic}"
except Exception as outer_error:
# Fallback error handling
error_type = type(outer_error).__name__
error_msg = str(outer_error)
return f"Document processing error: {error_type} - {error_msg or 'No error details available'}"
async def add_page_break(filename: str) -> str:
"""增加分页符
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
return f"Cannot modify document: {error_message}. Consider creating a copy first."
try:
doc = Document(filename)
doc.add_page_break()
doc.save(filename)
return f"Page break added to {filename}."
except Exception as e:
return f"Failed to add page break: {str(e)}"
async def add_table_of_contents(filename: str, title: str = "Table of Contents", max_level: int = 3) -> str:
"""根据标题样式向Word文档添加目录。
参数:
filename: Word文档的路径
title: 可自行选择的一个标题
max_level: 要包含的最大标题级别1-9
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
return f"Cannot modify document: {error_message}. Consider creating a copy first."
try:
# Ensure max_level is within valid range
max_level = max(1, min(max_level, 9))
doc = Document(filename)
# Collect headings and their positions
headings = []
for i, paragraph in enumerate(doc.paragraphs):
# Check if paragraph style is a heading
if paragraph.style and paragraph.style.name.startswith('Heading '):
try:
# Extract heading level from style name
level = int(paragraph.style.name.split(' ')[1])
if level <= max_level:
headings.append({
'level': level,
'text': paragraph.text,
'position': i
})
except (ValueError, IndexError):
# Skip if heading level can't be determined
pass
if not headings:
return f"No headings found in document {filename}. Table of contents not created."
# Create a new document with the TOC
toc_doc = Document()
# Add title
if title:
toc_doc.add_heading(title, level=1)
# Add TOC entries
for heading in headings:
# Indent based on level (using tab characters)
indent = ' ' * (heading['level'] - 1)
toc_doc.add_paragraph(f"{indent}{heading['text']}")
# Add page break
toc_doc.add_page_break()
# Get content from original document
for paragraph in doc.paragraphs:
p = toc_doc.add_paragraph(paragraph.text)
# Copy style if possible
try:
if paragraph.style:
p.style = paragraph.style.name
except:
pass
# Copy tables
for table in doc.tables:
# Create a new table with the same dimensions
new_table = toc_doc.add_table(rows=len(table.rows), cols=len(table.columns))
# Copy cell contents
for i, row in enumerate(table.rows):
for j, cell in enumerate(row.cells):
for paragraph in cell.paragraphs:
new_table.cell(i, j).text = paragraph.text
# Save the new document with TOC
toc_doc.save(filename)
return f"Table of contents with {len(headings)} entries added to {filename}"
except Exception as e:
return f"Failed to add table of contents: {str(e)}"
async def delete_paragraph(filename: str, paragraph_index: int) -> str:
"""通过行索引从文档中删除一段
Args:
filename: Path to the Word document
paragraph_index: 段落位置第几行
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
return f"Cannot modify document: {error_message}. Consider creating a copy first."
try:
doc = Document(filename)
# Validate paragraph index
if paragraph_index < 0 or paragraph_index >= len(doc.paragraphs):
return f"Invalid paragraph index. Document has {len(doc.paragraphs)} paragraphs (0-{len(doc.paragraphs)-1})."
# Delete the paragraph (by removing its content and setting it empty)
# Note: python-docx doesn't support true paragraph deletion, this is a workaround
paragraph = doc.paragraphs[paragraph_index]
p = paragraph._p
p.getparent().remove(p)
doc.save(filename)
return f"Paragraph at index {paragraph_index} deleted successfully."
except Exception as e:
return f"Failed to delete paragraph: {str(e)}"
async def search_and_replace(filename: str, find_text: str, replace_text: str) -> str:
"""替换所有find_text为replace_text
Args:
filename: Path to the Word document
find_text: Text to search for
replace_text: Text to replace with
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
return f"Cannot modify document: {error_message}. Consider creating a copy first."
try:
doc = Document(filename)
# Perform find and replace
count = find_and_replace_text(doc, find_text, replace_text)
if count > 0:
doc.save(filename)
return f"Replaced {count} occurrence(s) of '{find_text}' with '{replace_text}'."
else:
return f"No occurrences of '{find_text}' found."
except Exception as e:
return f"Failed to search and replace: {str(e)}"

176
tools/defines.py Normal file
View File

@ -0,0 +1,176 @@
"""
缺陷图目录格式
缺陷图期望格式 _隔开
外部内部命名格式都如下
图片名xuhao_缺陷类型_缺陷位置_缺陷尺寸_可见程度_紧急程度_危重等级_维修建议
涂层损伤_叶片ps面距叶根3m处_缺陷尺寸弦向100mm轴向800mm_轻微_紧急_重要_建议打磨维修
每个的选项见我发的图
防雷
轮毂至塔基导通阻值_169mΩ
缺陷例轮毂至塔基未导通 #即标明未导通即可
"""
DEFAULT_BAOGAO_INFO = {
#目录
'picture_dir': "", #图片存放地址 为了报告美观希望总的汇总图片数量为3的倍数
'shengcheng_dir': "", #报告生成的路径
'muban_dir': "", #文档模板存放路径
'total_picture_dir': "", #用于生成略缩图的路径
'key_words': '缺,损,裂,脱,污', #关键字,用于汇总图的名字包含缺陷时标红,匹配逻辑为正则匹配单个字则为红 后续可优化
#当前检查报告基本内容
'jizu_type': 'H3-08#', #检查的机组编号
'jiancha_date': '2021年12月10日 9.00', #检查叶片日期 注意空格分开 小时分钟间隔为.
'baogao_date': '2021年12月10日 9.00', #生成报告时间 注意空格分开
'baogao_type': "风力发电机组叶片检查报告", #报告类型
#检查方案
'beizhu': '',
'renyuan_peizhi': '''2人主检飞手1人副检抄表1人
3轮毂机舱作业1人揽风绳作业1人无人设备操作员及抄表1人
1抄表人员1人检测人员1人监护1人
1主检飞手1人
2轮毂作业检查2人''',
'gongzuo_neirong': '''无人机叶片防雷导通测
无人吊篮叶片导通测试含机舱设备
风机基础办公楼变电站防雷接地检测及浪涌保护器测试
无人机叶片外观巡检
叶片内部检查''',
'shigong_fangan': '',
'shebei_peizhi': '''1四轴电阻无人机1套电子微欧计1台视频记录手机1台
无人吊篮系统1套爬绳器+接触平台电子微欧计1套视频记录手机1台对讲机2台
接地电阻测试仪1套SPD测试仪1套对讲机2个
1大疆无人机1台M350rtkM300rtkM30TM30精灵4PRO2大疆精灵4PRO+索尼A7R2机身+索尼200-600mm镜头/适马150-600mm镜头
1人工检查照明设备2套视频记录手机2台含氧量监测仪1台电动扳手2套卷尺1个2爬壁机器人检查无人作业校车+视频图传1套照明设备2套含氧量监测仪1台电动扳手2套卷尺1个''',
'jiancha_renyuan': '张三',
#检查信息
'waibu_jiancha': 'True', #是否包含外部检查
'neibu_jiancha': 'True', #是否包含内部检查
'fanglei_jiancha': 'True', #是否包含防雷检查 #注:防雷检测占不存放缺陷图
'jiancha_location': '叶片外部外观', #检查内容文字
'jiancha_fangshi': '作业无人机近距离外观检查', #检查方式文字
'yepian_changjia': '株洲时代新材料科技股份有限公司', #叶片厂家信息
#报告处理信息
'yezhu_renyuan': '李四', #业主(人员)
'changjia_renyuan': '王五', #厂家(人员)
'date_process': '生成报告人员', #数据处理人员 吴总希望获取前端用户执行生成人员为这个人
'baogao_bianzhi': '生成报告人员', #报告编制人员 吴总希望获取前端用户执行生成人员为这个人
'baogao_shenghe': '待填写(人员)', #报告审核人员
'shenghe_date': '待填写(日期)', #报告审核日期
#检查情况汇总表(文字信息) 前端根据是否包含对应部分检查自行确定检查内容,这里默认全部包含
'Y1_jiancha_neirong': '''1.叶片前缘、后缘、PS面、SS面
2.叶片内部导雷卡腹板透光人孔盖版叶根盖板...
3.轮毂至塔基导通内部导线线阻外部导线线阻...''',
'Y2_jiancha_neirong': '''1.叶片前缘、后缘、PS面、SS面
2.叶片内部导雷卡腹板透光人孔盖版叶根盖板...
3.轮毂至塔基导通内部导线线阻外部导线线阻...''',
'Y3_jiancha_neirong': '''1.叶片前缘、后缘、PS面、SS面
2.叶片内部导雷卡腹板透光人孔盖版叶根盖板...
3.轮毂至塔基导通内部导线线阻外部导线线阻...''',
#报告总结
'baogao_zongjie': '''1、因海上风电叶片运行环境恶劣、空气盐碱度高叶片前缘合模缝区域及PS面迎风面涂层易受腐蚀建议定期观察维护。
2经无人机近距离外观检查发现H3-08#机位Y200220AF叶片PS面距叶根20m处发现一处裂纹损伤长度轴向3m该缺陷经我方判定为严重缺陷建议尽快结安排对该机组停机并结合其他检查手段如人工打磨进一步勘查并决定维修处置方案防止风险进一步升级。
3经无人机近距离外观检查未发现H3-08#机位Y200249AF、Y200250AF叶片有明显影响机组正常运行的外部缺陷。
''', #报告总结文字
}
DEFAULT_BASE_INFO = { #项目基本信息
#项目概况
'jituan_jianxie': '甲方集团',
'jia_Company': '甲方公司名',
'jizu_num': '项目规格(台)',
'fengchang_name': '风场名称',
'fengchang_location': '风场位置',
'jizu_xinghao': '机组型号', #机组的型号
#乙方信息
'yi_Company': '乙方公司名',
'fuzeren': '甲方负责人(吴明洲)',
'phone_fuzeren': '联系电话18807109269 ',
}
oneproject = {
"status": 200,
"data": {
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"coverUrl": "",
"farmName": "三峡能源阿城万兴风电场",
"farmAddress": "哈尔滨市阿城区",
"client": "辽宁信达检测有限公司",
"clientContact": "李经理",
"clientPhone": "13504783720",
"inspectionUnit": "武汉市迪特影像科技有限公司",
"inspectionContact": "吴名州",
"inspectionPhone": "18807109269",
"scale": "",
"turbineModel": "",
"constructorIds": "5709ccfece2685090ff700a3469f2539,a76d78f1325deda1790a12bdad4aad4e",
"auditorId": "ca37c4337df8673a5c045b6c25acf74a",
"qualityOfficerId": "862e027910c2562d2b67d88ec33d77ba",
"projectManagerId": "fbaa9e0aecf2ce287138c38a4b654085",
"constructionTeamLeaderId": None,
"status": 0,
"startDate": None,
"endDate": None,
"constructorName": None,
"auditorName": "李四",
"qualityOfficerName": "辛奇",
"projectManagerName": "张三",
"constructionTeamLeaderName": None,
"statusLabel": "待施工"
},
"msg": "",
"code": 200,
"success": True
}
onejizu = {
"status": 200,
"data": [
{
"turbineId": "183463dbf40d9278549a76b82b175dd9",
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"turbineName": "一期012号",
"turbineCode": "00000",
"turbineDesc": "一期012号全新设备",
"turbineManufacturer": "",
"turbineModel": "",
"turbineCoverUrl": ""
}
],
"msg": "",
"code": 200,
"success": True
}
yepian = {
"status": 200,
"data": [
{
"partId": "12bc30fb209f3af3bf530541c5b062bc",
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"turbineId": "183463dbf40d9278549a76b82b175dd9",
"turbineName": "一期012号",
"partName": "叶片2",
"partCode": "0001",
"partType": "VANE-2",
"partTypeLabel": "叶片2"
},
{
"partId": "12bc30fb209f3af3bf530541c5b062bd",
"projectId": "96e0debf78187300f144d7f3450a2477",
"projectName": "三峡能源阿城万兴风电场防雷通道检测项目",
"turbineId": "183463dbf40d9278549a76b82b175dd9",
"turbineName": "一期012号",
"partName": "叶片1",
"partCode": "0000",
"partType": "VANE-1",
"partTypeLabel": "叶片1"
}
],
"msg": "",
"code": 200,
"success": True
}

609
tools/document_tools.py Normal file
View File

@ -0,0 +1,609 @@
"""
Document creation and manipulation tools for Word Document Server.
"""
import os
import json, re
from typing import Dict, List, Optional, Any
from docx import Document
from utils.file_utils import check_file_writeable, ensure_docx_extension, create_document_copy
from utils.document_utils import get_document_properties, extract_document_text, get_document_structure
from core.styles import ensure_heading_style, ensure_table_style
from docx.oxml.shared import qn
from docx.oxml import OxmlElement
from tools.content_tools import search_and_replace,add_picture_to_table
async def create_document(filename: str, title: Optional[str] = None, author: Optional[str] = None) -> str:
"""创建一个包含可选元数据的新Word文档。
参数:
filename: 要创建的文档名称带或不带.docx扩展名
title: 可选标题
author: 可选作者
"""
filename = ensure_docx_extension(filename)
# Check if file is writeable
is_writeable, error_message = check_file_writeable(filename)
if not is_writeable:
return f"Cannot create document: {error_message}"
try:
doc = Document()
# Set properties if provided
if title:
doc.core_properties.title = title
if author:
doc.core_properties.author = author
# Ensure necessary styles exist
ensure_heading_style(doc)
ensure_table_style(doc)
# 更改纸张大小为A4
from docx.shared import Mm, Inches
sections = doc.sections
for section in sections:
section.page_height = Mm(297)
section.page_width = Mm(210)
section.left_margin = Inches(0.94)
section.right_margin = Inches(0.94)
# Save the document
doc.save(filename)
return f"Document {filename} created successfully"
except Exception as e:
return f"Failed to create document: {str(e)}"
async def get_document_info(filename: str) -> str:
"""获得文档信息
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
if not os.path.exists(filename):
return f"Document {filename} does not exist"
try:
properties = get_document_properties(filename)
return json.dumps(properties, indent=2)
except Exception as e:
return f"Failed to get document info: {str(e)}"
async def get_document_text(filename: str) -> str:
"""获得文档的所有文本
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
return extract_document_text(filename)
async def get_document_outline(filename: str) -> str:
"""获得文档的所有结构信息
Args:
filename: 目标文档
"""
filename = ensure_docx_extension(filename)
structure = get_document_structure(filename)
return json.dumps(structure, indent=2)
async def list_available_documents(directory: str = ".") -> str:
"""列出目录下所有Word文档
Args:
directory: 目录
"""
try:
if not os.path.exists(directory):
return f"Directory {directory} does not exist"
docx_files = [f for f in os.listdir(directory) if f.endswith('.docx')]
if not docx_files:
return f"No Word documents found in {directory}"
result = f"Found {len(docx_files)} Word documents in {directory}:\n"
for file in docx_files:
file_path = os.path.join(directory, file)
size = os.path.getsize(file_path) / 1024 # KB
result += f"- {file} ({size:.2f} KB)\n"
return result
except Exception as e:
return f"Failed to list documents: {str(e)}"
async def copy_document(source_filename: str, destination_filename: Optional[str] = None) -> str:
"""创建文档的副本
Args:
source_filename: 源文档路径
destination_filename: 目标文档路径为空则为当前目录
"""
source_filename = ensure_docx_extension(source_filename)
if destination_filename:
destination_filename = ensure_docx_extension(destination_filename)
success, message, new_path = create_document_copy(source_filename, destination_filename)
if success:
return message
else:
return f"Failed to copy document: {message}"
def add_documents(target_filename: str, source_filename: str) -> str:
"""将源文档(文本)添加到目标文档尾部
Args:
target_doc: 目标文档
source_filename: 源文档路径
"""
target_doc = Document(target_filename)
source_filename = ensure_docx_extension(source_filename)
source_doc = Document(source_filename)
for source_paragraph in source_doc.paragraphs:
new_paragraph = target_doc.add_paragraph(source_paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = source_paragraph.alignment
print(f"Source paragraph alignment: {source_paragraph.alignment}")
# Try to match the style if possible
try:
if source_paragraph.style and source_paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[source_paragraph.style.name]
except Exception as e:
print(f"Failed to apply style: {e}")
# Copy run formatting
for i, run in enumerate(source_paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体 (中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
target_doc.save(target_filename)
return f"{target_filename}添加{source_filename}成功"
def write_table(target_filename: str, rows: int, cols: int, table_num: int, data: Optional[List[List[str]]] = None, ifadjustheight: Optional[bool] = True, height: Optional[float] = 1, key_words: re.Pattern[str] = None, ALIGMENT: Optional[str] = 'CENTER') -> Document:
"""填写word文档里的表格返回填写后的文档
Args:
target_filename: 目标文档路径
rows: 表格行数
cols: 表格列数
table_num: 表格序号
data: 表格数据二维列表每个单元格为字符串
ifadjustheight: bool为真则表格行高自动调整
"""
target_filename = ensure_docx_extension(target_filename)
# Check if target file is writeable
is_writeable, error_message = check_file_writeable(target_filename)
if not is_writeable:
return f"Cannot create target document: {error_message}"
try:
target_filename = ensure_docx_extension(target_filename)
target_doc = Document(target_filename)
except Exception as e:
print(f"获取{target_filename}失败:{str(e)}")
# Try to set the table style
try:
target_doc.tables[table_num].style = 'Table Grid'
except KeyError as k:
pass
except Exception as e:
print(f"{target_doc}最后一个表格更改样式失败: {str(e)}")
print("开始写入表格")
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.shared import Pt, Inches, Cm, RGBColor
try:
if data:
for i, row_data in enumerate(data):
if i >= rows + 1:
break
for j, cell_text in enumerate(row_data):
if j >= cols + 1:
break
if str(cell_text) == "": continue
print(f"在[{i},{j}]处写入{str(cell_text)}")
target_doc.tables[table_num].cell(i,j).text = str(cell_text)
print(key_words, cell_text)
if key_words and key_words.search(str(cell_text)):
print(f'{cell_text}包含关键之,已置红')
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0)
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.name = "Times New Roman" #设置英文字体
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0].font.size = Pt(10.5) # 字体大小
target_doc.tables[table_num].cell(i,j).paragraphs[0].runs[0]._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') #设置中文字体
if ALIGMENT == 'CENTER':
target_doc.tables[table_num].cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.CENTER
elif ALIGMENT == 'LEFT':
target_doc.tables[table_num].cell(i,j).paragraphs[0].paragraph_format.alignment = WD_TABLE_ALIGNMENT.LEFT
target_doc.tables[table_num].cell(i,j).vertical_alignment = WD_ALIGN_VERTICAL.CENTER
if ifadjustheight:
target_doc.tables[table_num].rows[i].height = Cm(height)
except Exception as e:
print(f"写入{target_filename}tables.cell({i},{j})失败:{str(e)}")
print("表格写入完成")
return target_doc
def set_document_para(target_doc: Document) -> Document:
"""设置文档的段落格式
"""
paragraphs_to_remove = []
for i, paragraph in enumerate(target_doc.paragraphs):
if i <= 11:
continue
if not paragraph.text.strip():
paragraphs_to_remove.append(paragraph)
for paragraph in paragraphs_to_remove:
p = paragraph._element
p.getparent().remove(p)
return target_doc
async def add_table_to_document(target_filename: str, source_filename: str, rows: int, cols: int, table_num: int, data: Optional[List[List[str]]] = None, ifadjustheight: Optional[bool] = True, height: Optional[float] = 1, key_words: re.Pattern[str] = None, ALIGMENT: Optional[str] = 'CENTER') -> str:
"""复制源文件中的文字与表格(先文字后表格格式)到目标文档
Args:
target_filename: 目标文档路径
source_doc: 源文档路径
rows: 表格行数
cols: 表格列数
table_num: 表格序号
data: 表格数据二维列表每个单元格为字符串
ifadjustheight: bool为真则表格行高自动调整
key_words: list, 关键字
"""
target_filename = ensure_docx_extension(target_filename)
source_filename = ensure_docx_extension(source_filename)
source_doc = Document(source_filename)
target_doc = Document(target_filename)
try:
# Copy all paragraphs
for paragraph in source_doc.paragraphs:
# Create a new paragraph with the same text and style
new_paragraph = target_doc.add_paragraph(paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = paragraph.alignment
# 复制段落分页属性
new_paragraph.paragraph_format.page_break_before = paragraph.paragraph_format.page_break_before
# Try to match the style if possible
try:
if paragraph.style and paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[paragraph.style.name]
except:
pass
# Copy run formatting
for i, run in enumerate(paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
# 复制分页符处理w:br标签
for element in run._element:
if element.tag.endswith('br'):
br_type = element.get(qn('type'), '')
if br_type == 'page':
new_br = OxmlElement('w:br')
new_br.set(qn('type'), 'page')
new_run._element.append(new_br)
except Exception as e:
print(f"添加表格前文章失败:{str(e)}")
try:# Copy all tables
from core.tables import copy_table
copy_table(source_doc.tables[0], target_doc, ifadjustheight, height)
except Exception as e:
print(f"添加表格失败:{str(e)}")
print(f"{target_doc}写入表格{source_doc.tables[0]}成功")
target_doc = set_document_para(target_doc)
target_doc.save(target_filename)
target_doc = Document(target_filename)
try:
target_doc = write_table(target_filename, rows, cols, table_num, data, ifadjustheight, height, key_words, ALIGMENT)
except Exception as e:
print(f"{target_filename}写入{data}失败:{str(e)}")
target_doc.save(target_filename)
return target_doc,f"{target_filename}添加表格{source_doc}成功"
async def add_table_and_replace(target_filename: str, source_filename: str, ifadjustheight: Optional[bool] = True, list_to_replace: dict = {}, height: Optional[float] = 1):
"""复制源文件中的文字与表格(先文字后表格格式)到目标文档
Args:
target_filename: 目标文档路径
source_doc: 源文档路径
ifadjustheight: bool为真则表格行高自动调整
list_to_replace: dict, 待替换内容和替换内容
"""
target_filename = ensure_docx_extension(target_filename)
source_filename = ensure_docx_extension(source_filename)
source_doc = Document(source_filename)
target_doc = Document(target_filename)
try:
# Copy all paragraphs
for paragraph in source_doc.paragraphs:
# Create a new paragraph with the same text and style
new_paragraph = target_doc.add_paragraph(paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = paragraph.alignment
# 复制段落分页属性
new_paragraph.paragraph_format.page_break_before = paragraph.paragraph_format.page_break_before
# Try to match the style if possible
try:
if paragraph.style and paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[paragraph.style.name]
except:
pass
# Copy run formatting
for i, run in enumerate(paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
# 复制分页符处理w:br标签
for element in run._element:
if element.tag.endswith('br'):
br_type = element.get(qn('type'), '')
if br_type == 'page':
new_br = OxmlElement('w:br')
new_br.set(qn('type'), 'page')
new_run._element.append(new_br)
except Exception as e:
print(f"添加表格前文章失败:{str(e)}")
try:# Copy all tables
from core.tables import copy_table
copy_table(source_doc.tables[0], target_doc, ifadjustheight, height)
target_doc.save(target_filename)
except Exception as e:
print(f"添加表格失败:{str(e)}")
for find_text, replace_text in list_to_replace.items():
print(await search_and_replace(target_filename, find_text, replace_text))
async def merge_documents(target_filename: str, source_filenames: List[str], add_page_breaks: bool = True) -> str:
"""合并文档(文本) 表格会添加到最后
Args:
target_filename: 合并后文档路径
source_filenames: 源文档路径列表
add_page_breaks: bool为真则每个源文档中间加入分页符
"""
from core.tables import copy_table
target_filename = ensure_docx_extension(target_filename)
# Check if target file is writeable
is_writeable, error_message = check_file_writeable(target_filename)
if not is_writeable:
return f"Cannot create target document: {error_message}"
# Validate all source documents exist
missing_files = []
for filename in source_filenames:
doc_filename = ensure_docx_extension(filename)
if not os.path.exists(doc_filename):
missing_files.append(doc_filename)
if missing_files:
return f"Cannot merge documents. The following source files do not exist: {', '.join(missing_files)}"
try:
# Create a new document for the merged result
target_doc = Document()
# Process each source document
for i, filename in enumerate(source_filenames):
doc_filename = ensure_docx_extension(filename)
source_doc = Document(doc_filename)
# Add page break between documents (except before the first one)
if add_page_breaks and i > 0:
target_doc.add_page_break()
# Copy all paragraphs
for paragraph in source_doc.paragraphs:
# Create a new paragraph with the same text and style
new_paragraph = target_doc.add_paragraph(paragraph.text)
new_paragraph.style = target_doc.styles['Normal'] # Default style
#获取合并等样式2025427
new_paragraph.alignment = paragraph.alignment
# Try to match the style if possible
try:
if paragraph.style and paragraph.style.name in target_doc.styles:
new_paragraph.style = target_doc.styles[paragraph.style.name]
except:
pass
# Copy run formatting
for i, run in enumerate(paragraph.runs):
if i < len(new_paragraph.runs):
new_run = new_paragraph.runs[i]
# Copy basic formatting
new_run.bold = run.bold
new_run.italic = run.italic
new_run.underline = run.underline
#添加同时合并字体2025427
new_run.font.name = run.font.name
rPr = new_run.element.get_or_add_rPr()
rFonts = rPr.get_or_add_rFonts()
# 检查 run.font.name 是否为 None
if run.font.name is None:
# 设置默认的中文字体名称
run.font.name = '宋体(中文正文)' # 或者使用其他你喜欢的中文字体
rFonts.set(qn('w:eastAsia'), run.font.name)
new_run.font.color.rgb = run.font.color.rgb
# Font size if specified
if run.font.size:
new_run.font.size = run.font.size
# Copy all tables
for table in source_doc.tables:
copy_table(table, target_doc)
# Save the merged document
target_doc.save(target_filename)
return f"Successfully merged {len(source_filenames)} documents into {target_filename}"
except Exception as e:
return f"Failed to merge documents: {str(e)}"
async def right_align_last_three_para(target_filename: str) -> str:
"""右对齐最后三个段落
Args:
target_filename: 目标文档路径
"""
target_filename = ensure_docx_extension(target_filename)
# Check if target file is writeable
is_writeable, error_message = check_file_writeable(target_filename)
if not is_writeable:
return f"Cannot right align paragraphs: {error_message}"
try:
# Open the target document
target_doc = Document(target_filename)
# Get the last three paragraphs
paragraphs = target_doc.paragraphs[-3:]
# Set the alignment of each paragraph to right
from docx.enum.text import WD_ALIGN_PARAGRAPH
for paragraph in paragraphs:
paragraph.alignment = WD_ALIGN_PARAGRAPH.RIGHT
# Save the modified document
target_doc.save(target_filename)
return f"Successfully right aligned the last three paragraphs in {target_filename}"
except Exception as e:
return f"Failed to right align paragraphs: {str(e)}"
async def process_images_table(data_dict, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words = None):
"""添加对应表格且填写图片名与插入图片
Args:
data_dict (dict): dict内容图片:图片路径
output_dir (str): 输出路径
start_i (int): 总表格数量
JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径
Returns:
int: 最后使用的表格序号
"""
items = list(data_dict.items())
picture_num = len(items)
line_index = 0
picture_index = 0
i = start_i
for content_row in range(((picture_num + 2) // 3) * 2):
if content_row % 2 == 1:
# 文字行(从 items 取图片名)
JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列
for k in range(1): # 只有1行
for l in range(3):
if line_index >= picture_num:
break
JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名
print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}')
line_index += 1
print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}")
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words
)
i += 1
else:
# 图片行(从 items 取图片路径)
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False
)
for k in range(3):
if picture_index < picture_num:
pic_path = items[picture_index][1] # 图片路径
print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}")
print(await add_picture_to_table(output_doc, output_dir, 0, k, pic_path, i, 1.8898))
picture_index += 1
i += 1
print(message)
return i # 返回最后使用的表格序号

182
tools/esay_docx_func.py Normal file
View File

@ -0,0 +1,182 @@
from content_tools import add_picture_to_table, search_and_replace
from get_pictures import resize_and_reduce_quality
from document_tools import add_table_to_document
def fill_tables(Y_table_list, row, col, Y_Table_num, Y):
"""根据前端返回json块填写表格list并实时跟进已填写表格数量
目前只支持固定的缺陷图的填写
Args:
Y_table_list (list): 前端返回的json块
row (int): 表格行数
col (int): 表格列数
Y_Table_num: json块中有几个表格
Xu_Hao: 是第几个json块
Y: 其他参数
Return:
Y1_TABLES: 三维表和对应元素
table_index_to_images: 字典表索引到图片路径列表的映射
Xu_Hao到达第几个表了
"""
table_index_to_images = {}
Y_TABLES = [[["" for _ in range(row)] for _ in range(col)] for _ in range(Y_Table_num)]
# 处理前端返回数据
for l, table_dict in enumerate(Y_table_list):
if table_dict:
Y_TABLES[l][1][0] = Y
Y_TABLES[l][1][1] = table_dict["QueXianLeiXing"]
Y_TABLES[l][1][2] = table_dict["QueXianWeiZhi"]
Y_TABLES[l][1][3] = table_dict["QueXianChiCun"]
Y_TABLES[l][3][0] = table_dict["WeiZongDengJi"]
Y_TABLES[l][3][1] = table_dict["visibility"]
Y_TABLES[l][3][2] = table_dict["urgency"]
Y_TABLES[l][3][3] = table_dict["repair_suggestion"]
# 获取图片路径
image_path = table_dict['Tupian_Dir']
if image_path:
# 确保路径是字符串形式
if isinstance(image_path, list):
table_index_to_images[l] = image_path.copy()
else:
table_index_to_images[l] = [str(image_path)]
return Y_TABLES, table_index_to_images
async def process_images_table(data_dict, output_dir, start_i, JIANCHA_NEIRONG_PICTURES_TABLE, key_words = None):
"""添加对应表格且填写图片名与插入图片
Args:
data_dict (dict): dict内容图片:图片路径
output_dir (str): 输出路径
start_i (int): 总表格数量
JIANCHA_NEIRONG_PICTURES_TABLE (str): 二维表模板路径
Returns:
int: 最后使用的表格序号
"""
items = list(data_dict.items())
picture_num = len(items)
line_index = 0
picture_index = 0
i = start_i
for content_row in range(((picture_num + 2) // 3) * 2):
if content_row % 2 == 1:
# 文字行(从 items 取图片名)
JIANCHA_NEIRONG_TEXT = [["" for _ in range(3)] for _ in range(1)] # 1行3列
for k in range(1): # 只有1行
for l in range(3):
if line_index >= picture_num:
break
JIANCHA_NEIRONG_TEXT[k][l] = items[line_index][0] # 图片名
print(f'当前为文字表格,在({k},{l})位置插入文字: {items[line_index][0]}')
line_index += 1
print(f"当前待插入表格: {JIANCHA_NEIRONG_TEXT}")
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, JIANCHA_NEIRONG_TEXT, False, None, key_words
)
i += 1
else:
# 图片行(从 items 取图片路径)
print(f"当前表格序号为 {i}")
output_doc, message = await add_table_to_document(
output_dir, JIANCHA_NEIRONG_PICTURES_TABLE, 1, 3, i, None, False
)
for k in range(3):
if picture_index < picture_num:
pic_path = items[picture_index][1] # 图片路径
print(f"当前为图片表格,在(0,{k})位置插入图片: {pic_path}")
print(await add_picture_to_table(output_doc, output_dir, 0, k, pic_path, i, 1.8898))
picture_index += 1
i += 1
print(message)
return i # 返回最后使用的表格序号
async def add_dynamic_table(output_doc, output_dir, table_num, TABLES, JIANCHA_XIANGQING_DIR, PICTURES, row, col, i, FLAG, xuhao):
"""创建动态表
Args:
output_doc (Document): 文档对象
output_dir (str): 输出目录
table_num (int): 表格序号
TABLES (list): 表格数据
JIANCHA_XIANGQING_DIR (str): 检查详情表目录
PICTURES (dict): 图片数据字典键为表索引值为图片路径列表
row (int): 行数
col (int): 列数
i (int): 表格序号
FLAG: 其他标志
Returns:
tuple: (i, table_num) 更新后的表格序号和表格数量
"""
for table_idx, Table in enumerate(TABLES):
print(Table)
output_doc, message = await add_table_to_document(output_dir, JIANCHA_XIANGQING_DIR, row, col, i, Table, FLAG)
print(message)
# 获取当前表格对应的图片
current_table_pictures = PICTURES.get(table_idx, [])
print(f"开始处理图片列表: {current_table_pictures}")
for picturedir in current_table_pictures:
try:
print(f"添加 {picturedir} {type(picturedir)}到表格{table_idx}")
resize_and_reduce_quality(picturedir, picturedir)
await add_picture_to_table(output_doc, output_dir, 4, 0, picturedir, i, 4.7232)
except Exception as e:
print(f"添加图片失败:{e}")
print(await search_and_replace(output_dir, 'tupian_xuhao', f'{xuhao}'))
table_num += 1
i += 1
xuhao += 1
return i, table_num, xuhao
def get_year_month(date):
"""根据格式化date字符串获取年月 'date': '二〇二一年十二月十日 9:00'
Args: date (str): 日期字符串
Returns: 年月字符串 '二〇二一年十二月'
"""
unit_map = {'1' : '', '2' : '', '3' : '', '4' : '', '5' : '', '6' : '', '7' : '', '8' : '', '9' : '', '0' : ''}
unit_map_month = {1 : '', 2 : '', 3 : '', 4 : '', 5 : '', 6 : '', 7 : '', 8 : '', 9 : '', 10 : '', 11 : '十一', 12 : '十二'}
year = date.split('')[0]
month = date.split('')[1].split('')[0]
year = ''.join([unit_map[i] for i in year])
month = unit_map_month[int(month)]
return f"{year}{month}"
def merge_info(frontend_info, default_info):
"""
合并前端传入的 info 和默认 info
规则如果前端传入的值为空None 或空字符串则使用默认值
Args:
frontend_info: 前端传入的字典
default_info: 默认的完整字典
Returns:
合并后的完整字典
"""
if not isinstance(frontend_info, dict) or frontend_info is None:
return default_info.copy()
merged_info = {}
for key, default_value in default_info.items():
# 获取前端传入的值
frontend_value = frontend_info.get(key)
# 判断前端值是否为空None 或空字符串)
if frontend_value is None or frontend_value == "":
merged_info[key] = default_value
else:
merged_info[key] = frontend_value
return merged_info

234
tools/get_pictures.py Normal file
View File

@ -0,0 +1,234 @@
import os
import math
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
def resize_and_reduce_quality(image_path, output_path, target_width = None):
try:
# 检查图片文件大小
if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB
print("图片文件大小小于10MB不进行调整")
return image_path
# 打开图片
with Image.open(image_path) as img:
# 计算新的高度以保持宽高比
if target_width is None:
target_width = img.width
aspect_ratio = img.height / img.width
new_height = int(target_width * aspect_ratio)
# 调整图片大小
img_resized = img.resize((target_width, new_height), Image.LANCZOS)
# 降低图片质量
quality = 70 # 质量从1最差到95最好可以根据需要调整
img_resized.save(output_path, quality=quality)
return output_path
except Exception as e:
return f"调整图片大小和质量时出现问题: {str(e)}"
def get_picture_nums(source_path: str) -> int:
picture_count = 0
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
picture_count += 1
return picture_count
def collect_defect_data(
Y: str,
picture_dir: str,
search_file_list: list = [],
) -> tuple[int, dict]:
"""
收集指定年份的缺陷图片数据并根据布尔值决定是否扫描特定类型的缺陷图
Args:
Y: 叶片号 "Y1""Y2""Y3"
picture_dir: 图片根目录
search_file_list (list, optional): 要搜索的文件列表.规定为3个元素
Returns:
(缺陷图片总数, 缺陷图片文件名字典)
"""
total_num = 0
result_dict = {}
try:
for defect_type in search_file_list:
dir_path = os.path.join(picture_dir, Y, defect_type)
num, img_dict = get_picture_nums_and_image_with_name(dir_path)
total_num += num
result_dict.update(img_dict)
except Exception as e:
print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}")
return total_num, result_dict
def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]:
"""
获取指定目录下图片的数量并返回每个图片的路径和名称字典
Args:
source_path (str): 要搜索的目录路径
Returns:
tuple: 包含两个元素的元组
picture_count (int): 图片数量
image_with_name (dict): 图片路径和名称的字典格式为 {图片名称: 图片完整路径}
"""
picture_count = 0
image_with_name = {}
name_list = []
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
picture_count += 1
image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file)
return picture_count, image_with_name
def find_image(directory, image_name):
"""
在指定目录中查找指定名称的图片文件
参数:
directory (str): 要搜索的目录路径
image_name (str): 要查找的图片文件名(可带扩展名或不带)
返回:
str: 找到的图片完整路径如果未找到则返回None
"""
# 支持的图片扩展名列表
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
# 遍历目录中的所有文件
for root, dirs, files in os.walk(directory):
for file in files:
# 获取文件名和扩展名
filename, ext = os.path.splitext(file)
# 检查是否匹配图片名称(带或不带扩展名)
if (file.lower() == image_name.lower() or
filename.lower() == image_name.lower() and ext.lower() in image_extensions):
return os.path.join(root, file)
return None
async def make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str:
"""获取目录下所有图片,将所有图片合并制作略缩图并保存
Args:
source_path: 源目录
output_path: 输出目录
size: 合并后的略缩图总大小 (宽度, 高度)
"""
print("略缩图处理中")
try:
if not os.path.exists(output_path):
print(f"无输出目录,创建中,输出目录为:{output_path}")
os.makedirs(output_path)
except Exception as e:
print(f"输出目录有问题:{e}")
return ""
#如果存在merged_thumbnail.jpg文件则直接返回该文件路径
if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')):
print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片")
"""
此处可预留接口询问用户是否重新生成一份略缩图
"""
return os.path.join(output_path,'merged_thumbnail.jpg')
print("目录中无略缩图,合并略缩图中")
# 获取源目录下所有的图片文件
try:
image_files = []
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')):
image_files.append(os.path.join(root, file))
except Exception as e:
print(f"递归获取图片失败,原因:{e}")
if not image_files:
print("源目录中没有找到图片文件")
return ""
# 计算每个缩略图的大小
num_images = len(image_files)
target_width, target_height = size
# 计算最佳的缩略图排列方式
# 先尝试计算每行可以放多少个缩略图
aspect_ratio = target_width / target_height
cols = math.ceil(math.sqrt(num_images * aspect_ratio))
rows = math.ceil(num_images / cols)
# 计算单个缩略图的大小
thumb_width = target_width // cols
thumb_height = target_height // rows
# 创建线程池处理图片
with ThreadPoolExecutor() as executor:
thumbnails = list(executor.map(
lambda file: create_thumbnail(file, (thumb_width, thumb_height)),
image_files
))
# 过滤掉 None 值
thumbnails = [thumb for thumb in thumbnails if thumb is not None]
if not thumbnails:
print("没有成功创建任何略缩图")
return ""
# 计算实际需要的行数和列数
actual_cols = min(len(thumbnails), cols)
actual_rows = math.ceil(len(thumbnails) / actual_cols)
# 创建合并后的图像
merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height))
# 粘贴缩略图
for index, thumb in enumerate(thumbnails):
row = index // actual_cols
col = index % actual_cols
merged_image.paste(thumb, (col * thumb_width, row * thumb_height))
# 如果最终尺寸不完全匹配,调整大小
if merged_image.size != size:
merged_image = merged_image.resize(size, Image.LANCZOS)
# 保存合并后的略缩图
merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg')
merged_image.save(merged_thumbnail_path)
print(f"合并后的略缩图已保存到:{merged_thumbnail_path}")
return merged_thumbnail_path
def create_thumbnail(file_path: str, size: tuple) -> Image:
"""创建单个图片的略缩图
Args:
file_path: 图片文件路径
size: 缩略图大小
"""
try:
with Image.open(file_path) as img:
# 保持原始宽高比
img.thumbnail(size, Image.LANCZOS)
# 创建新图像确保尺寸一致
new_img = Image.new('RGB', size)
new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2))
return new_img
except Exception as e:
print(f"图片处理有问题:{e}")
return None

8
utils/__init__.py Normal file
View File

@ -0,0 +1,8 @@
"""
Utility functions for the Word Document Server.
This package contains utility modules for file operations and document handling.
"""
from utils.file_utils import check_file_writeable, create_document_copy, ensure_docx_extension
from utils.document_utils import get_document_properties, extract_document_text, get_document_structure, find_paragraph_by_text, find_and_replace_text

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

167
utils/document_utils.py Normal file
View File

@ -0,0 +1,167 @@
"""
Document utility functions for Word Document Server.
"""
import json
from typing import Dict, List, Any
from docx import Document
def get_document_properties(doc_path: str) -> Dict[str, Any]:
"""Get properties of a Word document."""
import os
if not os.path.exists(doc_path):
return {"error": f"Document {doc_path} does not exist"}
try:
doc = Document(doc_path)
core_props = doc.core_properties
return {
"title": core_props.title or "",
"author": core_props.author or "",
"subject": core_props.subject or "",
"keywords": core_props.keywords or "",
"created": str(core_props.created) if core_props.created else "",
"modified": str(core_props.modified) if core_props.modified else "",
"last_modified_by": core_props.last_modified_by or "",
"revision": core_props.revision or 0,
"page_count": len(doc.sections),
"word_count": sum(len(paragraph.text.split()) for paragraph in doc.paragraphs),
"paragraph_count": len(doc.paragraphs),
"table_count": len(doc.tables)
}
except Exception as e:
return {"error": f"Failed to get document properties: {str(e)}"}
def extract_document_text(doc_path: str) -> str:
"""Extract all text from a Word document."""
import os
if not os.path.exists(doc_path):
return f"Document {doc_path} does not exist"
try:
doc = Document(doc_path)
text = []
for paragraph in doc.paragraphs:
text.append(paragraph.text)
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
text.append(paragraph.text)
return "\n".join(text)
except Exception as e:
return f"Failed to extract text: {str(e)}"
def get_document_structure(doc_path: str) -> Dict[str, Any]:
"""Get the structure of a Word document."""
import os
if not os.path.exists(doc_path):
return {"error": f"Document {doc_path} does not exist"}
try:
doc = Document(doc_path)
structure = {
"paragraphs": [],
"tables": []
}
# Get paragraphs
for i, para in enumerate(doc.paragraphs):
structure["paragraphs"].append({
"index": i,
"text": para.text[:100] + ("..." if len(para.text) > 100 else ""),
"style": para.style.name if para.style else "Normal"
})
# Get tables
for i, table in enumerate(doc.tables):
table_data = {
"index": i,
"rows": len(table.rows),
"columns": len(table.columns),
"preview": []
}
# Get sample of table data
max_rows = min(3, len(table.rows))
for row_idx in range(max_rows):
row_data = []
max_cols = min(3, len(table.columns))
for col_idx in range(max_cols):
try:
cell_text = table.cell(row_idx, col_idx).text
row_data.append(cell_text[:20] + ("..." if len(cell_text) > 20 else ""))
except IndexError:
row_data.append("N/A")
table_data["preview"].append(row_data)
structure["tables"].append(table_data)
return structure
except Exception as e:
return {"error": f"Failed to get document structure: {str(e)}"}
def find_paragraph_by_text(doc, text, partial_match=False):
"""
Find paragraphs containing specific text.
Args:
doc: Document object
text: Text to search for
partial_match: If True, matches paragraphs containing the text; if False, matches exact text
Returns:
List of paragraph indices that match the criteria
"""
matching_paragraphs = []
for i, para in enumerate(doc.paragraphs):
if partial_match and text in para.text:
matching_paragraphs.append(i)
elif not partial_match and para.text == text:
matching_paragraphs.append(i)
return matching_paragraphs
def find_and_replace_text(doc, old_text, new_text):
"""
Find and replace text throughout the document.
Args:
doc: Document object
old_text: Text to find
new_text: Text to replace with
Returns:
Number of replacements made
"""
count = 0
# Search in paragraphs
for para in doc.paragraphs:
if old_text in para.text:
for run in para.runs:
if old_text in run.text:
run.text = run.text.replace(old_text, new_text)
count += 1
# Search in tables
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for para in cell.paragraphs:
if old_text in para.text:
for run in para.runs:
if old_text in run.text:
run.text = run.text.replace(old_text, new_text)
count += 1
return count

View File

@ -0,0 +1,165 @@
"""
Extended document utilities for Word Document Server.
"""
from typing import Dict, List, Any, Tuple
from docx import Document
def get_paragraph_text(doc_path: str, paragraph_index: int) -> Dict[str, Any]:
"""
Get text from a specific paragraph in a Word document.
Args:
doc_path: Path to the Word document
paragraph_index: Index of the paragraph to extract (0-based)
Returns:
Dictionary with paragraph text and metadata
"""
import os
if not os.path.exists(doc_path):
return {"error": f"Document {doc_path} does not exist"}
try:
doc = Document(doc_path)
# Check if paragraph index is valid
if paragraph_index < 0 or paragraph_index >= len(doc.paragraphs):
return {"error": f"Invalid paragraph index: {paragraph_index}. Document has {len(doc.paragraphs)} paragraphs."}
paragraph = doc.paragraphs[paragraph_index]
return {
"index": paragraph_index,
"text": paragraph.text,
"style": paragraph.style.name if paragraph.style else "Normal",
"is_heading": paragraph.style.name.startswith("Heading") if paragraph.style else False
}
except Exception as e:
return {"error": f"Failed to get paragraph text: {str(e)}"}
def find_text(doc_path: str, text_to_find: str, match_case: bool = True, whole_word: bool = False) -> Dict[str, Any]:
"""
Find all occurrences of specific text in a Word document.
Args:
doc_path: Path to the Word document
text_to_find: Text to search for
match_case: Whether to perform case-sensitive search
whole_word: Whether to match whole words only
Returns:
Dictionary with search results
"""
import os
if not os.path.exists(doc_path):
return {"error": f"Document {doc_path} does not exist"}
if not text_to_find:
return {"error": "Search text cannot be empty"}
try:
doc = Document(doc_path)
results = {
"query": text_to_find,
"match_case": match_case,
"whole_word": whole_word,
"occurrences": [],
"total_count": 0
}
# Search in paragraphs
for i, para in enumerate(doc.paragraphs):
# Prepare text for comparison
para_text = para.text
search_text = text_to_find
if not match_case:
para_text = para_text.lower()
search_text = search_text.lower()
# Find all occurrences (simple implementation)
start_pos = 0
while True:
if whole_word:
# For whole word search, we need to check word boundaries
words = para_text.split()
found = False
for word_idx, word in enumerate(words):
if (word == search_text or
(not match_case and word.lower() == search_text.lower())):
results["occurrences"].append({
"paragraph_index": i,
"position": word_idx,
"context": para.text[:100] + ("..." if len(para.text) > 100 else "")
})
results["total_count"] += 1
found = True
# Break after checking all words
break
else:
# For substring search
pos = para_text.find(search_text, start_pos)
if pos == -1:
break
results["occurrences"].append({
"paragraph_index": i,
"position": pos,
"context": para.text[:100] + ("..." if len(para.text) > 100 else "")
})
results["total_count"] += 1
start_pos = pos + len(search_text)
# Search in tables
for table_idx, table in enumerate(doc.tables):
for row_idx, row in enumerate(table.rows):
for col_idx, cell in enumerate(row.cells):
for para_idx, para in enumerate(cell.paragraphs):
# Prepare text for comparison
para_text = para.text
search_text = text_to_find
if not match_case:
para_text = para_text.lower()
search_text = search_text.lower()
# Find all occurrences (simple implementation)
start_pos = 0
while True:
if whole_word:
# For whole word search, check word boundaries
words = para_text.split()
found = False
for word_idx, word in enumerate(words):
if (word == search_text or
(not match_case and word.lower() == search_text.lower())):
results["occurrences"].append({
"location": f"Table {table_idx}, Row {row_idx}, Column {col_idx}",
"position": word_idx,
"context": para.text[:100] + ("..." if len(para.text) > 100 else "")
})
results["total_count"] += 1
found = True
# Break after checking all words
break
else:
# For substring search
pos = para_text.find(search_text, start_pos)
if pos == -1:
break
results["occurrences"].append({
"location": f"Table {table_idx}, Row {row_idx}, Column {col_idx}",
"position": pos,
"context": para.text[:100] + ("..." if len(para.text) > 100 else "")
})
results["total_count"] += 1
start_pos = pos + len(search_text)
return results
except Exception as e:
return {"error": f"Failed to search for text: {str(e)}"}

85
utils/file_utils.py Normal file
View File

@ -0,0 +1,85 @@
"""
File utility functions for Word Document Server.
"""
import os
from typing import Tuple, Optional
import shutil
def check_file_writeable(filepath: str) -> Tuple[bool, str]:
"""
Check if a file can be written to.
Args:
filepath: Path to the file
Returns:
Tuple of (is_writeable, error_message)
"""
# If file doesn't exist, check if directory is writeable
if not os.path.exists(filepath):
directory = os.path.dirname(filepath)
# If no directory is specified (empty string), use current directory
if directory == '':
directory = '.'
if not os.path.exists(directory):
return False, f"Directory {directory} does not exist"
if not os.access(directory, os.W_OK):
return False, f"Directory {directory} is not writeable"
return True, ""
# If file exists, check if it's writeable
if not os.access(filepath, os.W_OK):
return False, f"File {filepath} is not writeable (permission denied)"
# Try to open the file for writing to see if it's locked
try:
with open(filepath, 'a'):
pass
return True, ""
except IOError as e:
return False, f"File {filepath} is not writeable: {str(e)}"
except Exception as e:
return False, f"Unknown error checking file permissions: {str(e)}"
def create_document_copy(source_path: str, dest_path: Optional[str] = None) -> Tuple[bool, str, Optional[str]]:
"""
Create a copy of a document.
Args:
source_path: Path to the source document
dest_path: Optional path for the new document. If not provided, will use source_path + '_copy.docx'
Returns:
Tuple of (success, message, new_filepath)
"""
if not os.path.exists(source_path):
return False, f"Source document {source_path} does not exist", None
if not dest_path:
# Generate a new filename if not provided
base, ext = os.path.splitext(source_path)
dest_path = f"{base}_copy{ext}"
try:
# Simple file copy
shutil.copy2(source_path, dest_path)
return True, f"Document copied to {dest_path}", dest_path
except Exception as e:
return False, f"Failed to copy document: {str(e)}", None
def ensure_docx_extension(filename: str) -> str:
"""
Ensure filename has .docx extension.
Args:
filename: The filename to check
Returns:
Filename with .docx extension
"""
if not filename.endswith('.docx'):
return filename + '.docx'
return filename