Report_Generate_Server/tools/content_tools.py

694 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Content tools for Word Document Server.
These tools add various types of content to Word documents,
including headings, paragraphs, tables, images, and page breaks.
"""
import os
from typing import List, Optional, Dict, Any
from docx import Document
from docx.shared import Inches, Pt
from docx.oxml.shared import qn
from utils.file_utils import check_file_writeable, ensure_docx_extension
from utils.document_utils import find_and_replace_text
from core.styles import ensure_heading_style, ensure_table_style
def _format_split_cell(cell, width=None):
    """Apply the fixed cell formatting used by split_table_by_row_content.

    Sets the first run to Times New Roman / 仿宋 (East Asian) at 10.5 pt,
    centers the first paragraph horizontally and the cell vertically, and
    copies ``width`` onto the cell when one is given.

    Args:
        cell: python-docx table cell whose text has already been assigned.
        width: Explicit width to copy onto the cell, or None to leave the
            cell's width untouched.
    """
    from docx.enum.table import WD_ALIGN_VERTICAL
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.oxml.shared import qn
    from docx.shared import Pt

    paragraph = cell.paragraphs[0]
    if paragraph.runs:
        run = paragraph.runs[0]
        run.font.name = "Times New Roman"
        run.font.size = Pt(10.5)
        # The East Asian font lives in a separate rFonts attribute.
        run._element.get_or_add_rPr().get_or_add_rFonts().set(qn('w:eastAsia'), '仿宋')
    # Paragraph alignment takes the paragraph enum, not the table enum.
    paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
    cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
    # A source cell without an explicit width reports None; assigning None
    # raises inside python-docx, so only copy real widths.
    if width is not None:
        cell.width = width


def split_table_by_row_content(
    doc_path: str,
    output_path: str,
    table_num: int = 0
) -> str:
    """Split every data row of one table into several single-line rows.

    The number of parts is the number of newline-separated lines in the cell
    at the second row, first column. Each data cell is split on newlines into
    that many parts; a cell with fewer lines repeats its last line, and lines
    beyond the part count are dropped. The header row (first row) is copied
    unchanged. The rebuilt table takes the place of the original table in the
    document, which is then saved to ``output_path``.

    Args:
        doc_path: Path of the input Word document.
        output_path: Path the processed document is saved to.
        table_num: Index of the table to process (0-based).

    Returns:
        A status message. Any exception is caught and reported in the
        returned string rather than raised.
    """
    try:
        from docx import Document

        doc = Document(doc_path)

        if len(doc.tables) <= table_num:
            return f"文档中不存在第{table_num+1}个表格"

        table = doc.tables[table_num]
        row_count = len(table.rows)
        col_count = len(table.columns)

        # With fewer than two rows there is no reference cell to count lines in.
        if row_count < 2:
            doc.save(output_path)
            return "表格行数少于2行无法按照要求分行"

        # str.split always yields at least one item; max() is a safety net.
        split_count = max(1, len(table.cell(1, 0).text.split('\n')))
        print(f'原表格行数:{row_count},第二行第一列内容行数:{split_count},需要分割为:{split_count}部分')

        # New row count = header row + (data rows x parts per row).
        new_table = doc.add_table(rows=1 + (row_count - 1) * split_count, cols=col_count)
        new_table.style = table.style
        new_table.autofit = True

        # 1. Header row is copied as-is.
        for col_idx in range(col_count):
            orig_cell = table.cell(0, col_idx)
            new_cell = new_table.cell(0, col_idx)
            new_cell.text = orig_cell.text
            _format_split_cell(new_cell, orig_cell.width)

        # 2. Each original data row expands into ``split_count`` new rows.
        for orig_row_idx in range(1, row_count):
            first_new_row = 1 + (orig_row_idx - 1) * split_count
            for col_idx in range(col_count):
                orig_cell = table.cell(orig_row_idx, col_idx)
                cell_lines = orig_cell.text.split('\n')
                missing = split_count - len(cell_lines)
                if missing > 0:
                    # Pad short cells by repeating their last line.
                    cell_lines += [cell_lines[-1]] * missing
                for part_idx in range(split_count):
                    new_cell = new_table.cell(first_new_row + part_idx, col_idx)
                    new_cell.text = cell_lines[part_idx]
                    _format_split_cell(new_cell, orig_cell.width)

        # doc.add_table appends at the end of the body; move the new table in
        # front of the original so it really replaces it in place, then drop
        # the original.
        table._element.addprevious(new_table._element)
        table._element.getparent().remove(table._element)

        doc.save(output_path)
        return f"{table_num+1}个表格已成功分行处理"
    except Exception as e:
        return f"处理表格时出错: {str(e)}"
def add_heading(filename: str, text: str, level: int = 1) -> str:
    """Append a heading to a Word document.

    The heading is added with the built-in heading style for ``level``. If
    that fails, a bold paragraph with direct font formatting is added
    instead. The document is saved exactly once, after either path.

    Args:
        filename: Path of the target document (passed through
            ensure_docx_extension first).
        text: Heading text.
        level: Heading level, 1 (highest) to 9; values convertible with
            int() are accepted.

    Returns:
        A status message describing what was added, or why nothing was.
    """
    filename = ensure_docx_extension(filename)

    # Callers may pass the level as a string; normalise it first.
    try:
        level = int(level)
    except (ValueError, TypeError):
        return "Invalid parameter: level must be an integer between 1 and 9"

    if level < 1 or level > 9:
        return f"Invalid heading level: {level}. Level must be between 1 and 9."

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."

    try:
        doc = Document(filename)
        ensure_heading_style(doc)

        # Only the style-based insertion is guarded here. Saving happens
        # afterwards so that a save failure cannot trigger the fallback and
        # add a second heading paragraph.
        try:
            doc.add_heading(text, level=level)
        except Exception:
            print("style-based approach fails, use direct formatting")
            paragraph = doc.add_paragraph()
            run = paragraph.add_run(text)
            run.bold = True
            rPr = run.element.get_or_add_rPr()
            rFonts = rPr.get_or_add_rFonts()
            from docx.oxml.shared import qn
            rFonts.set(qn('w:eastAsia'), '宋体(中文正文)')
            # Sizes kept as in the original: level 2 is 14 pt, every other
            # level (including level 1) is 12 pt.
            run.font.size = Pt(14) if level == 2 else Pt(12)
            message = f"Heading '{text}' added to {filename} with direct formatting (style not available)"
        else:
            message = f"Heading '{text}' (level {level}) added to {filename}"

        doc.save(filename)
        return message
    except Exception as e:
        return f"Failed to add heading: {str(e)}"
async def add_paragraph(filename: str, text: str, style: Optional[str] = None) -> str:
    """Append a single paragraph to a Word document.

    Args:
        filename: Path of the target document (passed through
            ensure_docx_extension first).
        text: Paragraph content.
        style: Optional paragraph style name. When the style lookup raises
            KeyError, the paragraph falls back to the 'Normal' style and the
            returned message says so.

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first or creating a new document."

    try:
        document = Document(filename)
        new_paragraph = document.add_paragraph(text)

        style_missing = False
        if style:
            try:
                new_paragraph.style = style
            except KeyError:
                # Unknown style name: keep the paragraph, use the default style.
                new_paragraph.style = document.styles['Normal']
                style_missing = True

        document.save(filename)
        if style_missing:
            return f"Style '{style}' not found, paragraph added with default style to {filename}"
        return f"Paragraph added to {filename}"
    except Exception as e:
        return f"Failed to add paragraph: {str(e)}"
async def add_table(filename: str, rows: int, cols: int, data: Optional[List[List[str]]] = None) -> str:
    """Append a table to a Word document.

    Args:
        filename: Path of the target document (passed through
            ensure_docx_extension first).
        rows: Number of table rows.
        cols: Number of table columns.
        data: Optional 2-D list of cell values. Values are converted with
            str(); rows or columns beyond the table size are ignored.

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first or creating a new document."

    try:
        document = Document(filename)
        new_table = document.add_table(rows=rows, cols=cols)

        try:
            new_table.style = 'Table Grid'
        except KeyError:
            # The style is not defined in this document; keep the default look.
            pass

        if data:
            # Pairing with range() stops at the table bounds, so oversized
            # input is silently truncated.
            for row_idx, row_values in zip(range(rows), data):
                for col_idx, value in zip(range(cols), row_values):
                    new_table.cell(row_idx, col_idx).text = str(value)

        document.save(filename)
        return f"Table ({rows}x{cols}) added to {filename}"
    except Exception as e:
        return f"Failed to add table: {str(e)}"
async def add_picture_to_table(
    target_doc: Document,
    target_filename: str,
    row: int,
    col: int,
    image_path: str,
    table_num: int = -1,
    width: Optional[float] = None
) -> str:
    """Insert a picture into one table cell and save the document.

    The image is loaded from a local path or an http(s) URL, re-encoded as
    PNG (images with alpha or a palette) or JPEG (everything else), placed in
    a new centered paragraph inside the cell, and the document is written to
    a temporary file that replaces ``target_filename`` only after it can be
    re-opened successfully.

    Args:
        target_doc: Already opened python-docx document to modify.
        target_filename: Path the modified document is saved to.
        row: Row index of the target cell.
        col: Column index of the target cell.
        image_path: Local file path or http(s) URL of the image.
        table_num: Index into ``target_doc.tables``; -1 selects the last table.
        width: Optional picture width in inches; falsy means native size.

    Returns:
        A status message; exceptions are reported in the message, not raised.

    Note:
        The URL download uses the blocking ``requests`` client even though
        this is a coroutine.
    """
    from PIL import Image
    from io import BytesIO
    import requests

    is_url = image_path.startswith(("http://", "https://"))
    image_bytes = None
    try:
        # 1. Fetch the raw image bytes.
        if is_url:
            response = requests.get(image_path, timeout=30)
            response.raise_for_status()
            image_bytes = BytesIO(response.content)
        else:
            if not os.path.exists(image_path):
                return f"Image not found: {image_path}"
            with open(image_path, 'rb') as f:
                image_bytes = BytesIO(f.read())

        # 2. Re-encode into a format Word handles. JPEG cannot store alpha or
        #    palette data, so those modes go through RGBA/PNG; any other mode
        #    JPEG cannot write is converted to RGB first.
        img = Image.open(image_bytes)
        if img.mode in ('LA', 'PA', 'P'):
            img = img.convert('RGBA')
        elif img.mode not in ('RGBA', '1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'):
            img = img.convert('RGB')
        final_bytes = BytesIO()
        if img.mode == 'RGBA':
            img.save(final_bytes, format='PNG')
        else:
            img.save(final_bytes, format='JPEG', quality=85)
        final_bytes.seek(0)  # rewind so the picture is read from the start

        # 3. Put the picture into the cell.
        table = target_doc.tables[table_num]
        cell = table.cell(row, col)
        for paragraph in cell.paragraphs:
            paragraph.clear()
        paragraph = cell.add_paragraph()
        run = paragraph.add_run()

        picture_kwargs = {'width': Inches(width)} if width else {}
        try:
            run.add_picture(final_bytes, **picture_kwargs)
        except Exception:
            # Retry once from the start of the buffer.
            final_bytes.seek(0)
            run.add_picture(final_bytes, **picture_kwargs)

        # 4. Center the picture and save.
        from docx.enum.table import WD_ALIGN_VERTICAL
        from docx.enum.text import WD_ALIGN_PARAGRAPH
        paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER

        # Build the temp name from the extension only, so it always differs
        # from the target (a plain str.replace could return the target path
        # itself, which the cleanup below would then delete).
        base, ext = os.path.splitext(target_filename)
        temp_filename = f"{base}_temp{ext}"
        target_doc.save(temp_filename)

        # Only replace the real file once the temp file re-opens cleanly.
        try:
            Document(temp_filename)
            os.replace(temp_filename, target_filename)
        except Exception:
            if os.path.exists(temp_filename):
                os.remove(temp_filename)
            return "Failed to generate valid document"
        return "Picture added successfully"
    except Exception as e:
        return f"Error: {str(e)}"
import requests
from io import BytesIO
from PIL import Image
from docx.enum.text import WD_ALIGN_PARAGRAPH
def _picture_size_kwargs(width: Optional[float], height: Optional[float]) -> dict:
    """Build add_picture() size arguments from whichever dimensions were given."""
    size_kwargs = {}
    if width:
        size_kwargs['width'] = Inches(width)
    if height:
        size_kwargs['height'] = Inches(height)
    return size_kwargs


def add_picture(filename: str, image_path: str, width: Optional[float] = None, height: Optional[float] = None, is_center: Optional[bool] = False) -> str:
    """Append a picture (local file or http(s) URL) to a Word document.

    Args:
        filename: Path of the target document (passed through
            ensure_docx_extension first).
        image_path: Local image path or http(s) URL.
        width: Optional picture width in inches.
        height: Optional picture height in inches. Width and height are
            independent: only the dimensions actually supplied are passed on.
        is_center: When true, the paragraph holding the picture is centered.

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    abs_filename = os.path.abspath(filename)
    is_url = image_path.startswith(("http://", "https://"))

    try:
        doc = Document(abs_filename)
        para = doc.add_paragraph()
        run = para.add_run()

        def insert_and_save(image_source):
            # Shared tail of both branches: insert, optionally center, save.
            run.add_picture(image_source, **_picture_size_kwargs(width, height))
            if is_center:
                para.alignment = WD_ALIGN_PARAGRAPH.CENTER
            doc.save(abs_filename)

        if is_url:
            try:
                response = requests.get(image_path, timeout=10)
                response.raise_for_status()
                image_bytes = BytesIO(response.content)
                # verify() consumes the stream, so rewind before inserting.
                Image.open(image_bytes).verify()
                image_bytes.seek(0)
                insert_and_save(image_bytes)
                return f"Picture from URL {image_path} added to {filename}"
            except Exception as url_error:
                return f"Failed to download/add URL image: {str(url_error)}"

        abs_image_path = os.path.abspath(image_path)
        if not os.path.exists(abs_image_path):
            return f"Image file not found: {abs_image_path}"

        try:
            image_size = os.path.getsize(abs_image_path) / 1024
            if image_size <= 0:
                return f"Image file is empty: {abs_image_path}"
        except Exception as size_error:
            return f"Error checking image file: {str(size_error)}"

        try:
            insert_and_save(abs_image_path)
            return f"Picture {image_path} added to {filename}"
        except Exception as inner_error:
            return f"Failed to add picture: {str(inner_error)}"
    except Exception as outer_error:
        return f"Document processing error: {str(outer_error)}"
async def add_page_break(filename: str) -> str:
    """Append a page break to the end of a Word document.

    Args:
        filename: Path of the target document (passed through
            ensure_docx_extension first).

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        document.add_page_break()
        document.save(filename)
    except Exception as e:
        return f"Failed to add page break: {str(e)}"
    return f"Page break added to {filename}."
async def add_table_of_contents(filename: str, title: str = "Table of Contents", max_level: int = 3) -> str:
    """Prepend a plain-text table of contents built from heading styles.

    Paragraphs whose style name is 'Heading N' with N <= ``max_level`` become
    TOC entries. The file is then rewritten as a NEW document containing the
    optional title, the entries, a page break, every original paragraph (text
    and style name only) and finally every original table (cell text only).
    Run-level formatting, images and the original position of tables are not
    carried over, and the entries are static text without page numbers.

    Args:
        filename: Path of the Word document (passed through
            ensure_docx_extension first).
        title: Optional TOC title; an empty value omits it.
        max_level: Deepest heading level to include; clamped to 1-9.

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."

    try:
        # Accept numeric strings like add_heading does, then clamp to 1-9.
        max_level = max(1, min(int(max_level), 9))

        doc = Document(filename)

        # Collect (level, text) for every heading paragraph within range.
        headings = []
        for paragraph in doc.paragraphs:
            if paragraph.style and paragraph.style.name.startswith('Heading '):
                try:
                    level = int(paragraph.style.name.split(' ')[1])
                except (ValueError, IndexError):
                    # Style name is not of the form 'Heading <number>'.
                    continue
                if level <= max_level:
                    headings.append((level, paragraph.text))

        if not headings:
            return f"No headings found in document {filename}. Table of contents not created."

        toc_doc = Document()

        if title:
            toc_doc.add_heading(title, level=1)

        for level, text in headings:
            # One indent unit per nesting level below the top level.
            indent = ' ' * (level - 1)
            toc_doc.add_paragraph(f"{indent}{text}")

        toc_doc.add_page_break()

        # Re-create the original paragraphs after the TOC.
        for paragraph in doc.paragraphs:
            p = toc_doc.add_paragraph(paragraph.text)
            try:
                if paragraph.style:
                    p.style = paragraph.style.name
            except Exception:
                # Best effort: the style may not exist in the new document.
                # Catch Exception rather than everything so task cancellation
                # and interpreter exit still propagate.
                pass

        # Re-create the original tables (appended after all paragraphs).
        for table in doc.tables:
            new_table = toc_doc.add_table(rows=len(table.rows), cols=len(table.columns))
            for i, row in enumerate(table.rows):
                for j, cell in enumerate(row.cells):
                    # cell.text joins all paragraphs of the cell, so cells
                    # with several paragraphs keep their full content.
                    new_table.cell(i, j).text = cell.text

        toc_doc.save(filename)
        return f"Table of contents with {len(headings)} entries added to {filename}"
    except Exception as e:
        return f"Failed to add table of contents: {str(e)}"
async def delete_paragraph(filename: str, paragraph_index: int) -> str:
    """Delete one paragraph from a Word document by its index.

    Args:
        filename: Path of the Word document (passed through
            ensure_docx_extension first).
        paragraph_index: 0-based position in ``doc.paragraphs``.

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        total = len(document.paragraphs)

        if not 0 <= paragraph_index < total:
            return f"Invalid paragraph index. Document has {total} paragraphs (0-{total-1})."

        # python-docx has no public delete call, so detach the paragraph's
        # XML element from its parent directly.
        element = document.paragraphs[paragraph_index]._p
        element.getparent().remove(element)

        document.save(filename)
        return f"Paragraph at index {paragraph_index} deleted successfully."
    except Exception as e:
        return f"Failed to delete paragraph: {str(e)}"
def search_and_replace(filename: str, find_text: str, replace_text: str) -> str:
    """Replace every occurrence of ``find_text`` with ``replace_text``.

    The replacement itself is delegated to find_and_replace_text; the document
    is saved only when that helper reports at least one replacement.

    Args:
        filename: Path of the Word document (passed through
            ensure_docx_extension first).
        find_text: Text to search for.
        replace_text: Text to substitute.

    Returns:
        A status message; exceptions are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        replaced = find_and_replace_text(document, find_text, replace_text)

        if not (replaced > 0):
            return f"No occurrences of '{find_text}' found."

        document.save(filename)
        return f"Replaced {replaced} occurrence(s) of '{find_text}' with '{replace_text}'."
    except Exception as e:
        return f"Failed to search and replace: {str(e)}"
def add_jf_picture_table(
typical_picture_dict : dict,
defect_picture_dict : dict,
TYPICAL_MUBAN_DIR : str,
DEFECT_MUBAN_DIR : str,
output_dir : str,
):
"""添加金风版本的图片展示表格
逻辑:
典型图模板是三行五列的表格。一列对应一张典型图图片信息。
缺陷图模板是二行五列的表格。一列对应一张缺陷图图片信息。
1.每次循环添加一次典型图片。
第一行为dict的key
第二行为图片如有缺陷则不是图片url而是字符串 损伤n处详见下表。此时要有变量记录总损伤数。
第三行看情况,如果正常,则保持默认,如果有缺陷,则字段为: 损伤类型n处。
2.如果上一次循环没有缺陷图则回到1否则前往3。
3.如果上一次循环中,有缺陷图存在,则进入对应的缺陷图添加模式,使用缺陷图表格模板。
根据上一行每列的总缺陷数遍历缺陷字典。
当缺陷数大于5时也要调用模板进行下一行的添加。
缺陷图表格使用缺陷图模板。第一行为defect_picture_dict的key第二行为图片url第三行为损伤类型。
Args:
typical_picture_dict: #有两种情况
{
str(图片描述) : str(图片地址), #情况一
str(损伤有n处见下表) : str({缺陷类型}n处), #情况二
...
}
defect_picture_dict: #只有一种情况
{
str(图片描述) : str(图片地址),
}
TYPICAL_MUBAN_DIR: 典型图模板路径
DEFECT_MUBAN_DIR: 缺陷图模板路径
"""