"""
Content tools for Word Document Server.

These tools add various types of content to Word documents,
including headings, paragraphs, tables, images, and page breaks.
"""
import os
from io import BytesIO
from itertools import islice
from typing import List, Optional, Dict, Any

import requests
from PIL import Image
from docx import Document
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.shared import qn
from docx.shared import Inches, Pt

from utils.file_utils import check_file_writeable, ensure_docx_extension
from utils.document_utils import find_and_replace_text
from core.styles import ensure_heading_style, ensure_table_style


# Fixed look applied to every cell written by split_table_by_row_content.
_CELL_LATIN_FONT = "Times New Roman"
_CELL_EAST_ASIAN_FONT = '仿宋'
_CELL_FONT_SIZE_PT = 10.5

# Pillow modes that the JPEG writer accepts directly.
_JPEG_MODES = frozenset({'1', 'L', 'RGB', 'CMYK', 'YCbCr'})
# Modes kept as PNG: they carry alpha or a palette, which JPEG cannot store.
_PNG_MODES = frozenset({'RGBA', 'LA', 'P'})


def _format_table_cell(cell) -> None:
    """Center a cell's first paragraph and apply the fixed table font."""
    if cell.paragraphs:
        paragraph = cell.paragraphs[0]
        run = paragraph.runs[0] if paragraph.runs else paragraph.add_run()
        run.font.name = _CELL_LATIN_FONT
        run.font.size = Pt(_CELL_FONT_SIZE_PT)
        # The East Asian font is a separate attribute that font.name does not set.
        r_fonts = run._element.get_or_add_rPr().get_or_add_rFonts()
        r_fonts.set(qn('w:eastAsia'), _CELL_EAST_ASIAN_FONT)
        paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
    cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER


def _copy_cell_width(source_cell, target_cell) -> None:
    """Copy the cell width when the source defines one (it may be None)."""
    if source_cell.width is not None:
        target_cell.width = source_cell.width


def _split_cell_lines(text: str, parts: int) -> List[str]:
    """Split ``text`` on newlines into exactly ``parts`` (>= 1) chunks.

    Too few lines: the last line is repeated. Too many lines: the surplus
    lines are kept together in the final chunk so that no content is lost.
    """
    lines = text.split('\n')
    if len(lines) > parts:
        return lines[:parts - 1] + ['\n'.join(lines[parts - 1:])]
    return lines + [lines[-1]] * (parts - len(lines))


def _picture_size_kwargs(width: Optional[float], height: Optional[float]) -> Dict[str, Any]:
    """Build the width/height keyword arguments for ``run.add_picture``.

    Only dimensions that were actually supplied (truthy) are converted to
    inches; a missing dimension is left out so that it is not passed to
    ``Inches`` as None.
    """
    size_kwargs: Dict[str, Any] = {}
    if width:
        size_kwargs['width'] = Inches(width)
    if height:
        size_kwargs['height'] = Inches(height)
    return size_kwargs


def _reencode_image(raw: BytesIO) -> BytesIO:
    """Re-encode an image as PNG (alpha/palette modes) or JPEG (everything else).

    Returns a new stream positioned at offset 0.
    """
    img = Image.open(raw)
    encoded = BytesIO()
    if img.mode == 'PA':
        img = img.convert('RGBA')
    if img.mode in _PNG_MODES:
        img.save(encoded, format='PNG')
    else:
        if img.mode not in _JPEG_MODES:
            # The JPEG writer rejects other modes, so normalize them first.
            img = img.convert('RGB')
        img.save(encoded, format='JPEG', quality=85)
    encoded.seek(0)  # add_picture reads from the current position
    return encoded


def split_table_by_row_content(
    doc_path: str,
    output_path: str,
    table_num: int = 0
) -> str:
    """Split every data row of a table into several rows, line by line.

    The number of parts is the number of newline-separated lines in the cell
    at the second row, first column. Each cell of every data row is split
    into that many parts; a cell with fewer lines repeats its last line, and
    a cell with more lines keeps the surplus lines in its last part. The
    first row is copied unchanged as the header. The rebuilt table takes the
    place of the original one and the document is saved to ``output_path``.

    Args:
        doc_path: Path of the input Word document.
        output_path: Path the resulting document is saved to.
        table_num: Index of the table to process (0-based).

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    try:
        doc = Document(doc_path)

        tables = doc.tables
        if not -len(tables) <= table_num < len(tables):
            return f"文档中不存在第{table_num+1}个表格"

        table = tables[table_num]
        row_count = len(table.rows)
        col_count = len(table.columns)

        # Without a second row there is nothing to derive the split count from.
        if row_count < 2:
            doc.save(output_path)
            return "表格行数少于2行,无法按照要求分行"

        # str.split always yields at least one element, so split_count >= 1.
        split_count = len(table.cell(1, 0).text.split('\n'))
        print(f'原表格行数:{row_count},第二行第一列内容行数:{split_count},需要分割为:{split_count}部分')

        # Rows = header (1) + data rows x number of parts.
        new_table = doc.add_table(rows=1 + (row_count - 1) * split_count, cols=col_count)
        new_table.style = table.style
        new_table.autofit = True

        # 1. Header row is copied as-is.
        for col_idx in range(col_count):
            orig_cell = table.cell(0, col_idx)
            new_cell = new_table.cell(0, col_idx)
            new_cell.text = orig_cell.text
            _format_table_cell(new_cell)
            _copy_cell_width(orig_cell, new_cell)

        # 2. Each data row expands into split_count consecutive rows.
        for orig_row_idx in range(1, row_count):
            first_new_row = 1 + (orig_row_idx - 1) * split_count
            for col_idx in range(col_count):
                orig_cell = table.cell(orig_row_idx, col_idx)
                parts = _split_cell_lines(orig_cell.text, split_count)
                for part_idx, line_text in enumerate(parts):
                    new_cell = new_table.cell(first_new_row + part_idx, col_idx)
                    new_cell.text = line_text
                    _format_table_cell(new_cell)
                    _copy_cell_width(orig_cell, new_cell)

        # add_table appended the new table at the end of the body; move it to
        # where the original sits before dropping the original.
        table._element.addprevious(new_table._element)
        table._element.getparent().remove(table._element)

        doc.save(output_path)
        return f"第{table_num+1}个表格已成功分行处理"

    except Exception as e:
        return f"处理表格时出错: {str(e)}"


def add_heading(filename: str, text: str, level: int = 1) -> str:
    """Append a heading to a document.

    Args:
        filename: Path of the target document.
        text: Heading text.
        level: Heading level, 1 (highest) to 9.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    # The level may arrive as a string; normalize it before validating.
    try:
        level = int(level)
    except (ValueError, TypeError):
        return "Invalid parameter: level must be an integer between 1 and 9"

    if level < 1 or level > 9:
        return f"Invalid heading level: {level}. Level must be between 1 and 9."

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."

    try:
        doc = Document(filename)
        ensure_heading_style(doc)

        paragraphs_before = len(doc.paragraphs)
        try:
            # Only the styled insertion is guarded: a failing save must not
            # trigger the direct-formatting fallback.
            doc.add_heading(text, level=level)
        except Exception:
            print("style-based approach fails, use direct formatting")
            # A failed add_heading can leave the paragraph it already
            # appended behind; drop it so the text is not duplicated.
            for stray in doc.paragraphs[paragraphs_before:]:
                stray._p.getparent().remove(stray._p)

            paragraph = doc.add_paragraph()
            run = paragraph.add_run(text)
            run.bold = True
            r_fonts = run.element.get_or_add_rPr().get_or_add_rFonts()
            r_fonts.set(qn('w:eastAsia'), '宋体(中文正文)')
            # Font size by level: level 2 -> 14pt, every other level -> 12pt.
            run.font.size = Pt(14) if level == 2 else Pt(12)
            doc.save(filename)
            return f"Heading '{text}' added to {filename} with direct formatting (style not available)"

        doc.save(filename)
        return f"Heading '{text}' (level {level}) added to {filename}"
    except Exception as e:
        return f"Failed to add heading: {str(e)}"


async def add_paragraph(filename: str, text: str, style: Optional[str] = None) -> str:
    """Append a paragraph to a document.

    Args:
        filename: Path of the target document.
        text: Paragraph text.
        style: Optional paragraph style name. When the style is not defined
            in the document, 'Normal' is used and the message says so.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."

    try:
        doc = Document(filename)
        paragraph = doc.add_paragraph(text)

        if style:
            try:
                paragraph.style = style
            except KeyError:
                # Unknown style: fall back to 'Normal' and report it.
                paragraph.style = doc.styles['Normal']
                doc.save(filename)
                return f"Style '{style}' not found, paragraph added with default style to {filename}"

        doc.save(filename)
        return f"Paragraph added to {filename}"
    except Exception as e:
        return f"Failed to add paragraph: {str(e)}"


async def add_table(filename: str, rows: int, cols: int, data: Optional[List[List[str]]] = None) -> str:
    """Append a table to a document.

    Args:
        filename: Path of the target document.
        rows: Number of table rows.
        cols: Number of table columns.
        data: Optional 2D list of cell contents. Rows beyond ``rows`` and
            values beyond ``cols`` are ignored; every value is written
            through ``str()``.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first or creating a new document."

    try:
        doc = Document(filename)
        table = doc.add_table(rows=rows, cols=cols)

        try:
            table.style = 'Table Grid'
        except KeyError:
            # Best effort: the document has no 'Table Grid' style, so the
            # table keeps the default look (no borders are added here).
            pass

        if data:
            for row_idx, row_data in enumerate(islice(data, rows)):
                for col_idx, cell_text in enumerate(islice(row_data, cols)):
                    table.cell(row_idx, col_idx).text = str(cell_text)

        doc.save(filename)
        return f"Table ({rows}x{cols}) added to {filename}"
    except Exception as e:
        return f"Failed to add table: {str(e)}"


def add_picture_to_table(
    target_doc: Document,
    target_filename: str,
    row: int,
    col: int,
    image_path: str,
    table_num: int = -1,
    width: Optional[float] = None,
    height: Optional[float] = None
) -> str:
    """Place a picture in a table cell of an already opened document and save it.

    The image is loaded from a local path or an http(s) URL and re-encoded
    as PNG or JPEG. The existing paragraphs of the cell are cleared, and the
    picture goes into a new centered paragraph. The document is first saved
    to a temporary sibling file, re-opened as a sanity check, and only then
    moved over ``target_filename``.

    Args:
        target_doc: The opened python-docx document to modify.
        target_filename: Path the modified document is saved to.
        row: Row index of the target cell.
        col: Column index of the target cell.
        image_path: Local file path or http(s) URL of the image.
        table_num: Index of the table in ``target_doc.tables`` (default -1,
            the last table).
        width: Optional picture width in inches.
        height: Optional picture height in inches.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    is_url = image_path.startswith(("http://", "https://"))

    try:
        # 1. Fetch the raw image bytes.
        if is_url:
            response = requests.get(image_path, timeout=30)
            response.raise_for_status()
            image_bytes = BytesIO(response.content)
        else:
            if not os.path.exists(image_path):
                return f"Image not found: {image_path}"
            with open(image_path, 'rb') as f:
                image_bytes = BytesIO(f.read())

        # 2. Re-encode as PNG or JPEG.
        final_bytes = _reencode_image(image_bytes)

        # 3. Put the picture into the cell.
        table = target_doc.tables[table_num]
        cell = table.cell(row, col)

        for existing in cell.paragraphs:
            existing.clear()

        paragraph = cell.add_paragraph()
        run = paragraph.add_run()
        run.add_picture(final_bytes, **_picture_size_kwargs(width, height))

        # 4. Center and save.
        paragraph.paragraph_format.alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER

        # splitext guarantees the temp path differs from the target, so the
        # cleanup below can never delete the real document.
        root, ext = os.path.splitext(target_filename)
        temp_filename = f"{root}_temp{ext}"
        target_doc.save(temp_filename)

        try:
            Document(temp_filename)  # re-open to check the file is readable
            os.replace(temp_filename, target_filename)
        except Exception:
            if os.path.exists(temp_filename):
                os.remove(temp_filename)
            return "Failed to generate valid document"

        return "Picture added successfully"

    except Exception as e:
        return f"Error: {str(e)}"


def add_picture(filename: str, image_path: str, width: Optional[float] = None, height: Optional[float] = None, is_center: Optional[bool] = False) -> str:
    """Append a picture to a document (local path or URL).

    Args:
        filename: Path of the target document.
        image_path: Local file path or http(s) URL of the image.
        width: Optional picture width in inches.
        height: Optional picture height in inches.
        is_center: When true, center the paragraph holding the picture.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."

    abs_filename = os.path.abspath(filename)
    is_url = image_path.startswith(("http://", "https://"))

    try:
        size_kwargs = _picture_size_kwargs(width, height)
        doc = Document(abs_filename)
        para = doc.add_paragraph()
        run = para.add_run()

        if is_url:
            try:
                response = requests.get(image_path, timeout=10)
                response.raise_for_status()
                image_bytes = BytesIO(response.content)

                # verify() consumes the stream, so rewind before reuse.
                Image.open(image_bytes).verify()
                image_bytes.seek(0)

                run.add_picture(image_bytes, **size_kwargs)
                if is_center:
                    para.alignment = WD_ALIGN_PARAGRAPH.CENTER

                doc.save(abs_filename)
                return f"Picture from URL {image_path} added to {filename}"
            except Exception as url_error:
                return f"Failed to download/add URL image: {str(url_error)}"

        abs_image_path = os.path.abspath(image_path)
        if not os.path.exists(abs_image_path):
            return f"Image file not found: {abs_image_path}"

        try:
            if os.path.getsize(abs_image_path) <= 0:
                return f"Image file is empty: {abs_image_path}"
        except Exception as size_error:
            return f"Error checking image file: {str(size_error)}"

        try:
            run.add_picture(abs_image_path, **size_kwargs)
            if is_center:
                para.alignment = WD_ALIGN_PARAGRAPH.CENTER

            doc.save(abs_filename)
            return f"Picture {image_path} added to {filename}"
        except Exception as inner_error:
            return f"Failed to add picture: {str(inner_error)}"

    except Exception as outer_error:
        return f"Document processing error: {str(outer_error)}"


async def add_page_break(filename: str) -> str:
    """Append a page break to a document.

    Args:
        filename: Path of the target document.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."

    try:
        doc = Document(filename)
        doc.add_page_break()
        doc.save(filename)
        return f"Page break added to {filename}."
    except Exception as e:
        return f"Failed to add page break: {str(e)}"


async def add_table_of_contents(filename: str, title: str = "Table of Contents", max_level: int = 3) -> str:
    """Insert a plain-text table of contents at the start of a document.

    Paragraphs whose style is named 'Heading N' with N <= ``max_level`` are
    listed, one entry per paragraph, indented by level. The entries are
    static text (no page numbers or field codes) and are followed by a page
    break. The rest of the document is left untouched.

    Args:
        filename: Path of the Word document.
        title: Optional title placed above the entries (as a level-1 heading).
        max_level: Deepest heading level to include; clamped to 1-9.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."

    try:
        max_level = max(1, min(max_level, 9))

        doc = Document(filename)

        headings = []
        for paragraph in doc.paragraphs:
            style_name = paragraph.style.name if paragraph.style else None
            if not style_name or not style_name.startswith('Heading '):
                continue
            try:
                level = int(style_name.split(' ')[1])
            except (ValueError, IndexError):
                # Style name has no numeric level; not a TOC candidate.
                continue
            if level <= max_level:
                headings.append({'level': level, 'text': paragraph.text})

        if not headings:
            return f"No headings found in document {filename}. Table of contents not created."

        # First block-level element of the body; at least one heading
        # paragraph exists, so the body is not empty.
        anchor = doc.element.body[0]

        # Build the TOC with the regular API (which appends at the end) and
        # then move each paragraph in front of the anchor.
        toc_paragraphs = []
        if title:
            # NOTE(review): assumes ensure_heading_style makes 'Heading 1'
            # available, as add_heading relies on - confirm in core.styles.
            ensure_heading_style(doc)
            toc_paragraphs.append(doc.add_heading(title, level=1))

        for heading in headings:
            indent = ' ' * (heading['level'] - 1)
            toc_paragraphs.append(doc.add_paragraph(f"{indent}{heading['text']}"))

        toc_paragraphs.append(doc.add_page_break())

        # Successive inserts directly before the anchor keep the list order.
        for toc_paragraph in toc_paragraphs:
            anchor.addprevious(toc_paragraph._p)

        doc.save(filename)

        return f"Table of contents with {len(headings)} entries added to {filename}"
    except Exception as e:
        return f"Failed to add table of contents: {str(e)}"


async def delete_paragraph(filename: str, paragraph_index: int) -> str:
    """Delete one paragraph from a document by its index.

    Args:
        filename: Path to the Word document.
        paragraph_index: 0-based index into ``doc.paragraphs``.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."

    try:
        doc = Document(filename)

        if paragraph_index < 0 or paragraph_index >= len(doc.paragraphs):
            return f"Invalid paragraph index. Document has {len(doc.paragraphs)} paragraphs (0-{len(doc.paragraphs)-1})."

        # Remove the paragraph by detaching its XML element from the parent.
        p = doc.paragraphs[paragraph_index]._p
        p.getparent().remove(p)

        doc.save(filename)

        return f"Paragraph at index {paragraph_index} deleted successfully."
    except Exception as e:
        return f"Failed to delete paragraph: {str(e)}"


def search_and_replace(filename: str, find_text: str, replace_text: str) -> str:
    """Replace every occurrence of ``find_text`` with ``replace_text``.

    The document is only saved when at least one replacement was made.

    Args:
        filename: Path to the Word document.
        find_text: Text to search for.
        replace_text: Text to replace with.

    Returns:
        A status message; errors are reported in the message, not raised.
    """
    filename = ensure_docx_extension(filename)

    if not os.path.exists(filename):
        return f"Document {filename} does not exist"

    is_writeable, error_message = check_file_writeable(filename)
    if not is_writeable:
        return f"Cannot modify document: {error_message}. Consider creating a copy first."

    try:
        doc = Document(filename)

        count = find_and_replace_text(doc, find_text, replace_text)
        if count > 0:
            doc.save(filename)
            return f"Replaced {count} occurrence(s) of '{find_text}' with '{replace_text}'."
        return f"No occurrences of '{find_text}' found."
    except Exception as e:
        return f"Failed to search and replace: {str(e)}"