# Report_Generate_Server/tools/content_tools.py
# (repository viewer metadata: 638 lines, 24 KiB, Python)

"""
Content tools for Word Document Server.
These tools add various types of content to Word documents,
including headings, paragraphs, tables, images, and page breaks.
"""
import os
from typing import List, Optional, Dict, Any
from docx import Document
from docx.shared import Inches, Pt
from docx.oxml.shared import qn
from utils.file_utils import check_file_writeable, ensure_docx_extension
from utils.document_utils import find_and_replace_text
from core.styles import ensure_heading_style, ensure_table_style
def _split_cell_lines(text, parts):
    """Split *text* on newlines into exactly *parts* pieces.

    Short cells are padded by repeating their last line. Long cells keep
    their surplus lines folded into the final piece (trailing blank lines
    trimmed) so that no cell content is silently dropped.
    """
    lines = text.split('\n')
    if len(lines) < parts:
        lines += [lines[-1]] * (parts - len(lines))
    elif len(lines) > parts:
        overflow = '\n'.join(lines[parts - 1:]).rstrip('\n')
        lines = lines[:parts - 1] + [overflow]
    return lines


def _format_split_cell(cell, width):
    """Apply the fixed font/alignment used for every cell of the split table."""
    from docx.enum.table import WD_ALIGN_VERTICAL
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.oxml.shared import qn
    from docx.shared import Pt

    if cell.paragraphs:
        paragraph = cell.paragraphs[0]
        if paragraph.runs:
            run = paragraph.runs[0]
            run.font.name = "Times New Roman"
            run.font.size = Pt(10.5)
            # The East Asian font has to be set on the rFonts element directly.
            run._element.get_or_add_rPr().get_or_add_rFonts().set(qn('w:eastAsia'), '仿宋')
        # Paragraph alignment takes the paragraph enum, not the table one.
        paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
    cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
    # Cells without an explicit width report None, which cannot be assigned.
    if width is not None:
        cell.width = width


def split_table_by_row_content(
    doc_path: str,
    output_path: str,
    table_num: int = 0
) -> str:
    """Split each data row of a table into several rows, one per text line.

    The number of lines in the first cell of the second row decides how many
    rows every data row is expanded into. Each cell's text is split on
    newlines into that many parts: cells with fewer lines repeat their last
    line, cells with more lines keep the surplus in the final part. The
    header row (row 0) is copied unchanged. The rebuilt table takes the place
    of the original one in the document.

    Args:
        doc_path: Path of the Word document to read.
        output_path: Path the resulting document is saved to.
        table_num: Zero-based index of the table to process.

    Returns:
        A status message. Any exception is caught and reported in the
        returned message rather than raised.
    """
    try:
        from docx import Document

        doc = Document(doc_path)
        if len(doc.tables) <= table_num:
            return f"文档中不存在第{table_num+1}个表格"
        table = doc.tables[table_num]
        row_count = len(table.rows)
        col_count = len(table.columns)
        # A table without a second row has nothing to derive the split from;
        # the document is still written to output_path unchanged.
        if row_count < 2:
            doc.save(output_path)
            return "表格行数少于2行无法按照要求分行"

        split_count = max(1, len(table.cell(1, 0).text.split('\n')))
        print(f'原表格行数:{row_count},第二行第一列内容行数:{split_count},需要分割为:{split_count}部分')

        # One header row plus split_count rows for every original data row.
        new_table = doc.add_table(rows=1 + (row_count - 1) * split_count, cols=col_count)
        new_table.style = table.style
        new_table.autofit = True

        # Header row: copied as-is.
        for col_idx in range(col_count):
            orig_cell = table.cell(0, col_idx)
            new_cell = new_table.cell(0, col_idx)
            new_cell.text = orig_cell.text
            _format_split_cell(new_cell, orig_cell.width)

        # Data rows: each original row expands into split_count new rows.
        for orig_row_idx in range(1, row_count):
            first_new_row = 1 + (orig_row_idx - 1) * split_count
            for col_idx in range(col_count):
                orig_cell = table.cell(orig_row_idx, col_idx)
                parts = _split_cell_lines(orig_cell.text, split_count)
                for part_idx, part_text in enumerate(parts):
                    new_cell = new_table.cell(first_new_row + part_idx, col_idx)
                    new_cell.text = part_text
                    _format_split_cell(new_cell, orig_cell.width)

        # add_table() appends at the end of the document; move the new table
        # to where the original sits before removing the original.
        table._element.addprevious(new_table._element)
        table._element.getparent().remove(table._element)
        doc.save(output_path)
        return f"{table_num+1}个表格已成功分行处理"
    except Exception as e:
        return f"处理表格时出错: {str(e)}"
async def add_heading(filename: str, text: str, level: int = 1) -> str:
    """Append a heading to a Word document.

    The heading is added with the built-in heading style for *level*. If that
    fails, a bold 'Normal' paragraph with an explicit font size is added
    instead.

    Args:
        filename: Path of the target document ('.docx' is ensured).
        text: Heading text.
        level: Heading level, 1 (highest) to 9. Values convertible with
            ``int()`` are accepted.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    try:
        level = int(level)
    except (ValueError, TypeError):
        return "Invalid parameter: level must be an integer between 1 and 9"
    if not 1 <= level <= 9:
        return f"Invalid heading level: {level}. Level must be between 1 and 9."
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"
    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."
    try:
        doc = Document(filename)
        ensure_heading_style(doc)
        paragraphs_before = len(doc.paragraphs)
        # Only the styled insertion is guarded here; a failing save must not
        # trigger the formatting fallback (it would fail to save again).
        try:
            doc.add_heading(text, level=level)
        except Exception:
            styled = False
        else:
            styled = True
        if styled:
            doc.save(filename)
            return f"Heading '{text}' (level {level}) added to {filename}"

        # NOTE(review): python-docx appears to append the paragraph before it
        # applies the style, so a failed add_heading can leave an unstyled
        # copy behind. Drop anything appended since the snapshot so the
        # heading text is not duplicated.
        for stray in doc.paragraphs[paragraphs_before:]:
            stray._element.getparent().remove(stray._element)

        # Direct-formatting fallback. The run is created explicitly so that
        # empty text does not leave the paragraph without a run.
        paragraph = doc.add_paragraph()
        paragraph.style = doc.styles['Normal']
        run = paragraph.add_run(text)
        run.bold = True
        rFonts = run.element.get_or_add_rPr().get_or_add_rFonts()
        rFonts.set(qn('w:eastAsia'), '宋体(中文正文)')
        # NOTE(review): level 1 is smaller than level 2 here (12pt vs 14pt);
        # kept as found - confirm whether this is intentional.
        if level == 2:
            run.font.size = Pt(14)
        else:
            run.font.size = Pt(12)
        doc.save(filename)
        return f"Heading '{text}' added to {filename} with direct formatting (style not available)"
    except Exception as e:
        return f"Failed to add heading: {str(e)}"
async def add_paragraph(filename: str, text: str, style: Optional[str] = None) -> str:
    """Append one paragraph to a Word document.

    Args:
        filename: Path of the target document ('.docx' is ensured).
        text: Paragraph text.
        style: Optional paragraph style name. When the style is not found
            the paragraph falls back to 'Normal' and the message says so.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first or creating a new document."

    try:
        document = Document(filename)
        new_paragraph = document.add_paragraph(text)
        outcome = f"Paragraph added to {filename}"
        if style:
            try:
                new_paragraph.style = style
            except KeyError:
                # Unknown style name: keep the paragraph, use 'Normal'.
                new_paragraph.style = document.styles['Normal']
                outcome = f"Style '{style}' not found, paragraph added with default style to {filename}"
        document.save(filename)
        return outcome
    except Exception as e:
        return f"Failed to add paragraph: {str(e)}"
async def add_table(filename: str, rows: int, cols: int, data: Optional[List[List[str]]] = None) -> str:
    """Append a table to a Word document.

    Args:
        filename: Path of the target document ('.docx' is ensured).
        rows: Number of rows (at least 1; values convertible with ``int()``
            are accepted).
        cols: Number of columns (at least 1; same conversion as *rows*).
        data: Optional 2-D list of cell contents. Rows/columns beyond the
            table size are ignored; missing entries leave cells empty and
            ``None`` entries are written as empty text.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    # Tool callers may pass numbers as strings; normalise up front the same
    # way add_heading treats its level argument.
    try:
        rows = int(rows)
        cols = int(cols)
    except (ValueError, TypeError):
        return "Invalid parameter: rows and cols must be integers"
    if rows < 1 or cols < 1:
        return f"Invalid table size: {rows}x{cols}. rows and cols must be at least 1."
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"
    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."
    try:
        doc = Document(filename)
        table = doc.add_table(rows=rows, cols=cols)
        try:
            table.style = 'Table Grid'
        except KeyError:
            # Best effort: without a 'Table Grid' style the table keeps the
            # document default (no borders are added here).
            pass
        if data:
            # zip() against range() clips data to the table dimensions.
            for row_idx, row_data in zip(range(rows), data):
                for col_idx, cell_value in zip(range(cols), row_data):
                    table.cell(row_idx, col_idx).text = "" if cell_value is None else str(cell_value)
        doc.save(filename)
        return f"Table ({rows}x{cols}) added to {filename}"
    except Exception as e:
        return f"Failed to add table: {str(e)}"
# Images larger than this (in KB) are re-encoded before insertion.
_MAX_PICTURE_SIZE_KB = 9126


def _compress_image_to_limit(image_path, output_path, limit_kb):
    """Write a smaller copy of *image_path* to *output_path*; return its size in KB.

    The first attempt matches the previous behaviour (quality 85, optimized).
    Further attempts lower the quality and then the pixel dimensions, so the
    loop makes progress even for formats that ignore ``quality``. The number
    of attempts is bounded; the last result is kept even if still too large.
    """
    from PIL import Image

    quality = 85
    scale = 1.0
    size_kb = float('inf')
    with Image.open(image_path) as img:
        width_px, height_px = img.size
        for _ in range(12):
            if scale < 1.0:
                new_size = (max(1, int(width_px * scale)), max(1, int(height_px * scale)))
                candidate = img.resize(new_size)
            else:
                candidate = img
            candidate.save(output_path, optimize=True, quality=quality)
            size_kb = os.path.getsize(output_path) / 1024
            if size_kb <= limit_kb:
                break
            if quality > 40:
                quality -= 15
            else:
                scale *= 0.75
    return size_kb


def _insert_picture(run, image_path, width):
    """Add the picture to *run*, scaled to *width* inches when given."""
    if width:
        run.add_picture(image_path, width=Inches(width))
    else:
        run.add_picture(image_path)


def _insert_picture_via_png(run, image_path, width):
    """Convert the image to a temporary PNG and add that to *run*.

    The PNG is written to a fresh temporary file (never next to the source,
    so an existing '.png' cannot be overwritten or deleted) and is always
    removed afterwards.
    """
    import tempfile
    from PIL import Image

    fd, temp_image_path = tempfile.mkstemp(suffix='.png')
    os.close(fd)
    try:
        with Image.open(image_path) as img:
            img.save(temp_image_path, 'PNG')
        _insert_picture(run, temp_image_path, width)
    finally:
        if os.path.exists(temp_image_path):
            os.remove(temp_image_path)


async def add_picture_to_table(target_doc: Document, target_filename: str, row: int, col: int, image_path: str, table_num: int = -1, width: Optional[float] = None) -> str:
    """Insert a picture into a table cell and save the document.

    Args:
        target_doc: The opened document to modify.
        target_filename: Path the document is saved to afterwards.
        row: Row index of the target cell.
        col: Column index of the target cell.
        image_path: Path of the image file.
        table_num: Index of the table; -1 (default) selects the last table.
        width: Picture width in inches; None keeps the image's own size.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    if not os.path.exists(image_path):
        return f"Image file not found: {image_path}"
    try:
        image_size = os.path.getsize(image_path) / 1024  # Size in KB
        if image_size <= 0:
            return f"Image file appears to be empty: {image_path} (0 KB)"
        if image_size > _MAX_PICTURE_SIZE_KB:
            # Compressed copies go to a sibling folder; the source is untouched.
            output_dir = os.path.join(os.path.dirname(image_path), "压缩图片")
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, os.path.basename(image_path))
            print(f"压缩图片:{image_path} ({image_size:.2f} KB) -> {output_path} (9126 KB)")
            _compress_image_to_limit(image_path, output_path, _MAX_PICTURE_SIZE_KB)
            image_path = output_path
    except Exception as size_error:
        return f"Error checking image file: {str(size_error)}"
    try:
        from docx.enum.table import WD_ALIGN_VERTICAL
        from docx.enum.text import WD_ALIGN_PARAGRAPH

        table = target_doc.tables[table_num]
        cell = table.cell(row, col)
        # A single-character cell is cleared before inserting.
        # NOTE(review): presumably a placeholder character - confirm.
        if len(cell.text) == 1:
            cell.text = ""
        paragraph = cell.paragraphs[-1]
        run = paragraph.add_run()
        try:
            _insert_picture(run, image_path, width)
        except Exception:
            # Retry once with a PNG conversion of the image.
            try:
                print(f"正常添加失败,尝试转换图片后添加:{image_path}")
                _insert_picture_via_png(run, image_path, width)
            except Exception as e:
                return f"调用add_picture函数出现问题: {str(e)}"
        # Centre the first paragraph (as before) and the paragraph that
        # actually holds the picture; these are the same object for
        # single-paragraph cells.
        for target_paragraph in (cell.paragraphs[0], paragraph):
            target_paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
        target_doc.save(target_filename)
        return f"Picture {image_path} added to table {table_num} cell ({row},{col})"
    except Exception as e:
        return f"Failed to add picture to table: {str(e)}"
async def add_picture(filename: str, image_path: str, width: Optional[float] = None) -> str:
    """Append a picture to the end of a Word document.

    Args:
        filename: Path of the target document ('.docx' is ensured).
        image_path: Path of the image file.
        width: Picture width in inches; when falsy the image's own size is
            used.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    # Absolute paths make the diagnostics below unambiguous.
    doc_path = os.path.abspath(filename)
    picture_path = os.path.abspath(image_path)
    if not os.path.exists(picture_path):
        return f"Image file not found: {picture_path}"

    try:
        size_kb = os.path.getsize(picture_path) / 1024
    except Exception as size_error:
        return f"Error checking image file: {str(size_error)}"
    if size_kb <= 0:
        return f"Image file appears to be empty: {picture_path} (0 KB)"

    writable, reason = check_file_writeable(doc_path)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first or creating a new document."

    # Opening the document and inserting the picture report different errors.
    try:
        document = Document(doc_path)
    except Exception as outer_error:
        kind = type(outer_error).__name__
        detail = str(outer_error)
        return f"Document processing error: {kind} - {detail or 'No error details available'}"

    diagnostic = f"Attempting to add image ({picture_path}, {size_kb:.2f} KB) to document ({doc_path})"
    try:
        picture_kwargs = {"width": Inches(width)} if width else {}
        document.add_picture(picture_path, **picture_kwargs)
        document.save(doc_path)
    except Exception as inner_error:
        kind = type(inner_error).__name__
        detail = str(inner_error)
        return f"Failed to add picture: {kind} - {detail or 'No error details available'}\nDiagnostic info: {diagnostic}"
    return f"Picture {image_path} added to {filename}"
async def add_page_break(filename: str) -> str:
    """Append a page break to a Word document.

    Args:
        filename: Path of the target document ('.docx' is ensured).

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        document.add_page_break()
        document.save(filename)
    except Exception as e:
        return f"Failed to add page break: {str(e)}"
    return f"Page break added to {filename}."
async def add_table_of_contents(filename: str, title: str = "Table of Contents", max_level: int = 3) -> str:
    """Insert a plain-text table of contents at the start of a Word document.

    Paragraphs whose style is named 'Heading N' (N <= *max_level*) are listed
    in order, each indented by one space per level below 1. The list is
    followed by a page break and is inserted in front of the existing
    content; the rest of the document (formatting, images, tables and their
    positions) is left untouched.

    Args:
        filename: Path of the Word document ('.docx' is ensured).
        title: Optional title placed above the entries; styled 'Heading 1'
            when that style exists, otherwise bold.
        max_level: Deepest heading level to include; clamped to 1-9.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"
    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."
    try:
        max_level = max(1, min(int(max_level), 9))
        doc = Document(filename)

        # Collect (level, text) for every heading paragraph, in document order.
        headings = []
        for paragraph in doc.paragraphs:
            style_name = paragraph.style.name if paragraph.style is not None else None
            if not style_name or not style_name.startswith('Heading '):
                continue
            try:
                level = int(style_name.split(' ')[1])
            except (ValueError, IndexError):
                # Style name without a numeric level: not a TOC candidate.
                continue
            if level <= max_level:
                headings.append((level, paragraph.text))
        if not headings:
            return f"No headings found in document {filename}. Table of contents not created."

        # Build the TOC paragraphs (python-docx appends them at the end) ...
        toc_paragraphs = []
        if title:
            title_paragraph = doc.add_paragraph(title)
            try:
                title_paragraph.style = 'Heading 1'
            except KeyError:
                # No 'Heading 1' style in this document: fall back to bold.
                for run in title_paragraph.runs:
                    run.bold = True
            toc_paragraphs.append(title_paragraph)
        for level, heading_text in headings:
            indent = ' ' * (level - 1)
            toc_paragraphs.append(doc.add_paragraph(f"{indent}{heading_text}"))
        toc_paragraphs.append(doc.add_page_break())

        # ... then move them, in order, to the front of the document body.
        body = doc.element.body
        for offset, toc_paragraph in enumerate(toc_paragraphs):
            body.insert(offset, toc_paragraph._p)

        doc.save(filename)
        return f"Table of contents with {len(headings)} entries added to {filename}"
    except Exception as e:
        return f"Failed to add table of contents: {str(e)}"
async def delete_paragraph(filename: str, paragraph_index: int) -> str:
    """Delete one paragraph from a Word document by its index.

    Args:
        filename: Path of the Word document ('.docx' is ensured).
        paragraph_index: Zero-based position in ``doc.paragraphs``.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        total = len(document.paragraphs)
        if not 0 <= paragraph_index < total:
            return f"Invalid paragraph index. Document has {total} paragraphs (0-{total-1})."
        # The paragraph's XML element is detached from its parent, which
        # removes the paragraph from the document.
        element = document.paragraphs[paragraph_index]._p
        element.getparent().remove(element)
        document.save(filename)
    except Exception as e:
        return f"Failed to delete paragraph: {str(e)}"
    return f"Paragraph at index {paragraph_index} deleted successfully."
async def search_and_replace(filename: str, find_text: str, replace_text: str) -> str:
    """Replace every occurrence of *find_text* with *replace_text*.

    The document is only saved when at least one replacement was made.

    Args:
        filename: Path of the Word document ('.docx' is ensured).
        find_text: Text to search for.
        replace_text: Replacement text.

    Returns:
        A status message; failures are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)
    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    writable, reason = check_file_writeable(filename)
    if not writable:
        return f"Cannot modify document: {reason}. Consider creating a copy first."

    try:
        document = Document(filename)
        replaced = find_and_replace_text(document, find_text, replace_text)
        if not replaced > 0:
            return f"No occurrences of '{find_text}' found."
        document.save(filename)
        return f"Replaced {replaced} occurrence(s) of '{find_text}' with '{replace_text}'."
    except Exception as e:
        return f"Failed to search and replace: {str(e)}"