JinFeng_Report_Generate/tools/total_copy_docx.py

465 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import zipfile
import shutil
import logging
from tempfile import mkdtemp
from lxml import etree
from copy import deepcopy
# Logging setup: mirror every message to a log file and to the console.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[
        logging.FileHandler('docx_merge_advanced.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class AdvancedDocxMerger:
    """Merge two .docx files by unpacking both OPC packages, combining
    their XML parts, and re-zipping the result.

    The first document acts as the base; body content, styles, numbering,
    footnotes/endnotes, relationships and settings from the second
    document are folded into it.

    NOTE(review): relationship IDs referenced from doc2's body (images,
    hyperlinks, headers) are NOT remapped when an ID collides with doc1's;
    colliding relationships are skipped and such references may resolve to
    the wrong target -- confirm against real inputs.
    """

    # WordprocessingML main namespace; attributes such as w:id, w:numId
    # and w:val are namespace-qualified in OOXML.
    W_NS = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'

    def __init__(self, doc1_path, doc2_path, output_path):
        """Create working directories and parsing state.

        Args:
            doc1_path: base document; its content comes first in the output.
            doc2_path: document whose content is merged/appended.
            output_path: path of the merged .docx to create.
        """
        self.doc1_path = doc1_path
        self.doc2_path = doc2_path
        self.output_path = output_path
        self.temp_dir = mkdtemp(prefix='docx_merge_')
        self.doc1_dir = os.path.join(self.temp_dir, "doc1")
        self.doc2_dir = os.path.join(self.temp_dir, "doc2")
        self.merged_dir = os.path.join(self.temp_dir, "merged")
        # Namespace prefixes used in xpath queries.
        self.ns = {
            'w': self.W_NS,
            'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships',
            'wp': 'http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing'
        }
        # Strip ignorable whitespace so appended elements serialize cleanly.
        self.parser = etree.XMLParser(remove_blank_text=True)
        logger.info(f"初始化合并器,临时目录: {self.temp_dir}")

    def _qn(self, local):
        """Return the Clark-notation qualified name for a w:-namespaced name."""
        return '{%s}%s' % (self.W_NS, local)

    def _extract_docx(self):
        """Unzip both source documents into the temp directory.

        Returns:
            True on success, False on any extraction error (logged).
        """
        logger.info("开始解压文档...")
        try:
            os.makedirs(self.doc1_dir, exist_ok=True)
            os.makedirs(self.doc2_dir, exist_ok=True)
            with zipfile.ZipFile(self.doc1_path, 'r') as zip_ref:
                zip_ref.extractall(self.doc1_dir)
            logger.info(f"解压 {self.doc1_path} 完成")
            with zipfile.ZipFile(self.doc2_path, 'r') as zip_ref:
                zip_ref.extractall(self.doc2_dir)
            logger.info(f"解压 {self.doc2_path} 完成")
            return True
        except Exception as e:
            logger.error(f"解压失败: {str(e)}")
            return False

    def _prepare_merged_dir(self):
        """Seed the merge directory with a full copy of doc1 and record
        every file path present, so `_deep_merge_docx` can tell new files
        from ones that must be merged.
        """
        logger.info("准备合并目录...")
        try:
            # Doc1 is the base: copy it wholesale.
            shutil.copytree(self.doc1_dir, self.merged_dir)
            # Remember what already exists (paths normalized to '/').
            self.existing_files = set()
            for root, _, files in os.walk(self.merged_dir):
                for file in files:
                    rel_path = os.path.relpath(os.path.join(root, file), self.merged_dir)
                    self.existing_files.add(rel_path.replace("\\", "/"))
            logger.info(f"初始合并目录准备完成,已有 {len(self.existing_files)} 个文件")
            return True
        except Exception as e:
            logger.error(f"准备合并目录失败: {str(e)}")
            return False

    def _is_xml_file(self, filename):
        """Return True for files that must be merged as XML (.xml / .rels)."""
        return filename.endswith('.xml') or filename.endswith('.rels')

    def _merge_styles(self, root1, root2):
        """Append doc2 styles whose w:styleId is not already in doc1."""
        try:
            style_id_attr = self._qn('styleId')
            existing_style_ids = {elem.get(style_id_attr)
                                  for elem in root1 if elem.get(style_id_attr)}
            added = 0
            for style in root2:
                style_id = style.get(style_id_attr)
                if style_id and style_id not in existing_style_ids:
                    root1.append(deepcopy(style))
                    existing_style_ids.add(style_id)
                    added += 1
            logger.debug(f"添加了 {added} 个新样式")
            return True
        except Exception as e:
            logger.error(f"合并样式失败: {str(e)}")
            return False

    def _merge_numbering(self, root1, root2):
        """Append doc2 <w:num> definitions whose id is not in doc1.

        BUG FIX: w:numId is a namespace-qualified attribute; the previous
        plain .get('numId') never matched, so dedup by id was broken.
        """
        try:
            num_id_attr = self._qn('numId')
            existing_num_ids = {num.get(num_id_attr)
                                for num in root1.xpath('//w:num', namespaces=self.ns)}
            added = 0
            for num in root2.xpath('//w:num', namespaces=self.ns):
                num_id = num.get(num_id_attr)
                if num_id and num_id not in existing_num_ids:
                    root1.append(deepcopy(num))
                    existing_num_ids.add(num_id)
                    added += 1
            logger.debug(f"添加了 {added} 个新编号")
            return True
        except Exception as e:
            logger.error(f"合并编号失败: {str(e)}")
            return False

    def _merge_notes(self, root1, root2, note_type="footnote"):
        """Append doc2 footnotes/endnotes to doc1, renumbering past doc1's
        highest id.

        BUG FIXES: the original `max(genexpr if cond else 0)` raised
        TypeError when doc1 had no notes (max() over an int), and it read
        the un-namespaced 'id' attribute, which never exists in OOXML.
        Separator/continuation notes are now skipped: doc1 already has
        them, and duplicating them corrupts the part.

        NOTE(review): w:footnoteReference ids inside doc2's body are not
        renumbered to match -- confirm whether references survive.
        """
        try:
            id_attr = self._qn('id')
            type_attr = self._qn('type')
            notes1 = root1.xpath(f'//w:{note_type}', namespaces=self.ns)
            max_id = max((int(note.get(id_attr, 0)) for note in notes1), default=0)
            added = 0
            for note in root2.xpath(f'//w:{note_type}', namespaces=self.ns):
                if note.get(type_attr) in ('separator', 'continuationSeparator', 'continuationNotice'):
                    continue
                max_id += 1
                new_note = deepcopy(note)
                new_note.set(id_attr, str(max_id))
                root1.append(new_note)
                added += 1
            logger.debug(f"添加了 {added} 个新{note_type}")
            return True
        except Exception as e:
            logger.error(f"合并{note_type}失败: {str(e)}")
            return False

    def _merge_header_footer(self, root1, root2):
        """Deliberate no-op: doc1's headers/footers win.

        Appending doc2's content here produced duplicated header/footer
        text, so the merge keeps the base document's parts untouched.
        """
        try:
            return True
        except Exception as e:
            logger.error(f"合并页眉页脚失败: {str(e)}")
            return False

    def _merge_settings(self, root1, root2):
        """Merge settings.xml, combining both documents' configuration.

        BUG FIX: strategy keys previously carried a 'w:' prefix that the
        local-name extraction below never produces, so NO strategy ever
        matched; value reads also used the un-namespaced 'val' attribute.
        Keys are now plain local names and all attribute access is
        namespace-qualified.
        """
        try:
            # Merge strategies, keyed by *local* element name:
            #   max   - keep the larger numeric w:val
            #   or    - enabled if either document enables it
            #   and   - enabled only if both documents enable it
            #   merge - element-wise union (w:compat)
            #   doc1  - keep the base document's value
            merge_strategies = {
                'defaultTabStop': 'max',
                'autoHyphenation': 'or',
                'consecutiveHyphenLimit': 'max',
                'compat': 'merge',
                'useFELayout': 'or',
                'trackRevisions': 'or',
                'doNotTrackMoves': 'and',
                'zoom': 'doc1',
                'mirrorMargins': 'or',
            }
            val_attr = self._qn('val')
            changes = []
            for setting in root2:
                # Local name without the namespace URI.
                tag = setting.tag.split('}')[1] if '}' in setting.tag else setting.tag
                if tag not in merge_strategies:
                    continue
                strategy = merge_strategies[tag]
                existing = root1.xpath(f'//w:{tag}', namespaces=self.ns)
                if not existing:
                    # Doc1 lacks the setting entirely: adopt doc2's.
                    root1.append(deepcopy(setting))
                    changes.append(f"添加 {tag} = {setting.get(val_attr, '')}")
                else:
                    existing_setting = existing[0]
                    if strategy == 'max':
                        val1 = float(existing_setting.get(val_attr) or 0)
                        val2 = float(setting.get(val_attr) or 0)
                        if val2 > val1:
                            existing_setting.set(val_attr, str(val2))
                            changes.append(f"更新 {tag} 为较大值: {val1}{val2}")
                    elif strategy == 'or':
                        # NOTE(review): an on/off element with no w:val also
                        # means "enabled" in OOXML; only val='1' is honored
                        # here -- confirm against real documents.
                        if setting.get(val_attr) == '1' and existing_setting.get(val_attr) != '1':
                            existing_setting.set(val_attr, '1')
                            changes.append(f"启用 {tag}")
                    elif strategy == 'and':
                        if setting.get(val_attr) != '1' and existing_setting.get(val_attr) == '1':
                            existing_setting.set(val_attr, '0')
                            changes.append(f"禁用 {tag} (因doc2禁用)")
                    elif strategy == 'merge' and tag == 'compat':
                        # Union of compatibility flags.
                        for child in setting:
                            child_name = child.tag.split('}')[1]
                            if not root1.xpath(f'//w:compat/w:{child_name}', namespaces=self.ns):
                                existing_setting.append(deepcopy(child))
                                changes.append(f"添加兼容性设置 {child_name}")
            # Special case: adopt doc2's document protection if doc1 has none.
            doc_protection1 = root1.xpath('//w:documentProtection', namespaces=self.ns)
            doc_protection2 = root2.xpath('//w:documentProtection', namespaces=self.ns)
            if doc_protection2 and not doc_protection1:
                root1.append(deepcopy(doc_protection2[0]))
                changes.append("添加文档保护设置")
            # Special case: spelling/grammar proofing state.
            proof_state1 = root1.xpath('//w:proofState', namespaces=self.ns)
            proof_state2 = root2.xpath('//w:proofState', namespaces=self.ns)
            if proof_state2:
                if proof_state1:
                    for attr in ['spelling', 'grammar']:
                        q_attr = self._qn(attr)
                        if proof_state2[0].get(q_attr) == 'clean' and proof_state1[0].get(q_attr) != 'clean':
                            proof_state1[0].set(q_attr, 'clean')
                            changes.append(f"更新 {attr} 检查状态为 clean")
                else:
                    root1.append(deepcopy(proof_state2[0]))
                    changes.append("添加拼写检查设置")
            if changes:
                logger.info(f"合并文档设置,进行了 {len(changes)} 处更改:\n - " + "\n - ".join(changes))
            else:
                logger.info("文档设置无需更改,保留第一个文档的设置")
            return True
        except Exception as e:
            logger.error(f"合并文档设置失败: {str(e)}")
            return False

    def _merge_relationships(self, root1, root2):
        """Merge a .rels part: append doc2 relationships whose Id is free.

        BUG FIX: the original renamed every added relationship to a fresh
        rId without remapping references inside doc2's parts, orphaning
        them.  The original Id is now preserved when it does not collide.
        NOTE(review): colliding Ids are still skipped, so a doc2 reference
        may resolve to doc1's target -- confirm acceptable for the inputs.
        (Relationship attributes are un-namespaced in OPC .rels files.)
        """
        try:
            existing_ids = {rel.get('Id') for rel in root1}
            added = 0
            for rel in root2:
                rel_id = rel.get('Id')
                if rel_id not in existing_ids:
                    root1.append(deepcopy(rel))
                    existing_ids.add(rel_id)
                    added += 1
            logger.debug(f"添加了 {added} 个新关系")
            return True
        except Exception as e:
            logger.error(f"合并关系失败: {str(e)}")
            return False

    def _merge_content_types(self, root1, root2):
        """Merge [Content_Types].xml without duplicating Default/Override
        entries -- duplicate keys make the OPC package invalid."""
        try:
            seen = {(child.tag, child.get('Extension'), child.get('PartName'))
                    for child in root1}
            added = 0
            for child in root2:
                key = (child.tag, child.get('Extension'), child.get('PartName'))
                if key not in seen:
                    root1.append(deepcopy(child))
                    seen.add(key)
                    added += 1
            logger.debug(f"添加了 {added} 个内容类型声明")
            return True
        except Exception as e:
            logger.error(f"合并内容类型失败: {str(e)}")
            return False

    def _merge_xml_files(self, file1, file2, output_file):
        """Merge file2's XML into file1 and write the result to output_file.

        BUG FIX: settings.xml and .rels merges previously returned before
        the tree was written, silently discarding the merged result.  All
        part types now go through a single dispatch followed by one write.
        """
        try:
            if not os.path.exists(file2):
                # Nothing on the doc2 side: keep the existing part as-is.
                return True
            tree1 = etree.parse(file1, self.parser)
            tree2 = etree.parse(file2, self.parser)
            root1 = tree1.getroot()
            root2 = tree2.getroot()
            # Dispatch on the part being merged.
            if 'settings.xml' in output_file:
                ok = self._merge_settings(root1, root2)
            elif '_rels' in output_file and output_file.endswith('.rels'):
                ok = self._merge_relationships(root1, root2)
            elif '[Content_Types].xml' in output_file:
                ok = self._merge_content_types(root1, root2)
            elif 'document.xml' in output_file:
                self._merge_document_content(root1, root2)
                ok = True
            elif 'styles.xml' in output_file:
                ok = self._merge_styles(root1, root2)
            elif 'footnotes.xml' in output_file:
                ok = self._merge_notes(root1, root2, "footnote")
            elif 'endnotes.xml' in output_file:
                ok = self._merge_notes(root1, root2, "endnote")
            elif 'numbering.xml' in output_file:
                ok = self._merge_numbering(root1, root2)
            elif 'header' in output_file or 'footer' in output_file:
                ok = self._merge_header_footer(root1, root2)
            else:
                # Default: append all of doc2's children.
                for child in root2:
                    root1.append(deepcopy(child))
                ok = True
            tree1.write(output_file, encoding='UTF-8', xml_declaration=True)
            logger.debug(f"成功合并XML文件: {output_file}")
            return ok
        except Exception as e:
            logger.error(f"合并XML文件失败 {output_file}: {str(e)}")
            return False

    def _merge_document_content(self, root1, root2):
        """Append doc2's body to doc1's, inserting a section break first so
        doc1's final section formatting is preserved.

        Raises:
            Exception: re-raised after logging so the caller sees failure.
        """
        try:
            body1 = root1.xpath("//w:body", namespaces=self.ns)
            body2 = root2.xpath("//w:body", namespaces=self.ns)
            if not body1 or not body2:
                logger.warning("文档缺少body元素")
                return
            body1 = body1[0]
            body2 = body2[0]
            # Wrap doc1's last sectPr in a paragraph so the following
            # content starts a new section with doc1's page setup intact.
            sect_prs = body1.xpath(".//w:sectPr", namespaces=self.ns)
            if sect_prs:
                sect_pr = sect_prs[-1]
                new_p = etree.Element(self._qn("p"))
                new_p.append(deepcopy(sect_pr))
                body1.append(new_p)
            # Append all of doc2's block-level content (including its
            # trailing sectPr, which becomes the document's final section).
            for elem in body2:
                body1.append(deepcopy(elem))
        except Exception as e:
            logger.error(f"合并文档内容失败: {str(e)}")
            raise

    def _deep_merge_docx(self):
        """Walk doc2's extracted tree: copy files doc1 lacks, merge XML
        parts both documents have, keep doc1's copy of binary parts."""
        logger.info("开始深度合并文档...")
        for root, _, files in os.walk(self.doc2_dir):
            for file in files:
                src_file = os.path.join(root, file)
                rel_path = os.path.relpath(src_file, self.doc2_dir)
                dest_file = os.path.join(self.merged_dir, rel_path)
                # Normalize separators for comparison with existing_files.
                norm_rel_path = rel_path.replace("\\", "/")
                if norm_rel_path not in self.existing_files:
                    # New part: copy verbatim.
                    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
                    shutil.copy2(src_file, dest_file)
                    logger.debug(f"复制新文件: {norm_rel_path}")
                else:
                    if self._is_xml_file(file):
                        # Existing XML part: merge in place.
                        existing_file = os.path.join(self.merged_dir, rel_path)
                        if os.path.exists(existing_file):
                            if not self._merge_xml_files(existing_file, src_file, dest_file):
                                logger.warning(f"合并失败,保留原文件: {norm_rel_path}")
                    else:
                        # Existing binary part (media etc.): doc1's wins.
                        logger.debug(f"文件已存在,跳过: {norm_rel_path}")
        logger.info("深度合并完成")
        return True

    def _repack_docx(self):
        """Zip the merged directory back into a .docx at output_path."""
        logger.info("开始重新打包文档...")
        try:
            with zipfile.ZipFile(self.output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, _, files in os.walk(self.merged_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        arcname = os.path.relpath(file_path, self.merged_dir)
                        zipf.write(file_path, arcname)
            logger.info(f"成功创建合并文档: {self.output_path}")
            return True
        except Exception as e:
            logger.error(f"重新打包失败: {str(e)}")
            return False

    def _cleanup(self):
        """Remove the temporary working directory (best effort)."""
        try:
            shutil.rmtree(self.temp_dir)
            logger.info("已清理临时文件")
        except Exception as e:
            logger.warning(f"清理临时文件失败: {str(e)}")

    def merge(self):
        """Run the full pipeline: extract -> prepare -> deep merge -> repack.

        Returns:
            True on success, False if any stage failed.

        BUG FIX: temporary files are now cleaned up even when a stage
        fails (previously the temp directory leaked on failure).
        """
        logger.info(f"开始合并文档: {self.doc1_path} + {self.doc2_path} -> {self.output_path}")
        try:
            if not self._extract_docx():
                return False
            if not self._prepare_merged_dir():
                return False
            if not self._deep_merge_docx():
                return False
            if not self._repack_docx():
                return False
        finally:
            self._cleanup()
        logger.info("文档合并成功完成!")
        return True
if __name__ == "__main__":
    # Example usage: merge the cover document with the defect report.
    merger = AdvancedDocxMerger(
        doc1_path="jingfeng_fengmian1.docx",
        doc2_path="quexian.docx",
        output_path="merged_document.docx",
    )
    succeeded = merger.merge()
    if succeeded:
        print("合并成功!")
    else:
        print("合并过程中出现错误,请查看日志文件。")