refactor(knowledgebases): rework the Excel file parsing logic
parent dd2b661703
commit 45b7233432
@@ -2,15 +2,13 @@ import os
 import tempfile
 import shutil
 import json
-import mysql.connector
 import time
 import traceback
 import re
 import requests
-from bs4 import BeautifulSoup
 from io import BytesIO
 from datetime import datetime
-from database import MINIO_CONFIG, DB_CONFIG, get_minio_client, get_es_client
+from database import MINIO_CONFIG, get_minio_client, get_es_client, get_db_connection
 from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
 from magic_pdf.data.dataset import PymuDocDataset
 from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
@@ -18,7 +16,7 @@ from magic_pdf.config.enums import SupportedPdfParseMethod
 from magic_pdf.data.read_api import read_local_office, read_local_images
 from utils import generate_uuid
 from .rag_tokenizer import RagTokenizer
+from .excel_parser import parse_excel

 tknzr = RagTokenizer()

@@ -54,17 +52,12 @@ def merge_chunks(sections, chunk_token_num=128, delimiter="\n。;!?"):
     return chunks


-def _get_db_connection():
-    """Create a database connection."""
-    return mysql.connector.connect(**DB_CONFIG)
-
-
 def _update_document_progress(doc_id, progress=None, message=None, status=None, run=None, chunk_count=None, process_duration=None):
     """Update the document's progress and status in the database."""
     conn = None
     cursor = None
     try:
-        conn = _get_db_connection()
+        conn = get_db_connection()
         cursor = conn.cursor()
         updates = []
         params = []
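Every call site in this file (four hunks below) switches from the deleted module-local `_get_db_connection` to a `get_db_connection` imported from `database`. That module is outside this diff; judging from the removed helper, a minimal sketch of the shared function, assuming it wraps the same `mysql.connector` call (the DB_CONFIG values below are placeholders, not the real configuration):

import mysql.connector

# Placeholder values; the real DB_CONFIG lives in database.py, not in this diff
DB_CONFIG = {"host": "localhost", "user": "root", "password": "secret", "database": "rag_flow"}


def get_db_connection():
    """Create a MySQL connection from the shared configuration."""
    return mysql.connector.connect(**DB_CONFIG)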
@@ -109,7 +102,7 @@ def _update_kb_chunk_count(kb_id, count_delta):
     conn = None
     cursor = None
     try:
-        conn = _get_db_connection()
+        conn = get_db_connection()
         cursor = conn.cursor()
         current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         kb_update = """
@@ -134,7 +127,7 @@ def _create_task_record(doc_id, chunk_ids_list):
     conn = None
     cursor = None
     try:
-        conn = _get_db_connection()
+        conn = get_db_connection()
         cursor = conn.cursor()
         task_id = generate_uuid()
         current_datetime = datetime.now()
@@ -184,58 +177,6 @@ def get_bbox_from_block(block):
     return [0, 0, 0, 0]


-def process_table_content(content_list):
-    """
-    Process table content, storing each row as a separate item.
-
-    Args:
-        content_list: the original content list
-
-    Returns:
-        the processed content list
-    """
-    new_content_list = []
-
-    for item in content_list:
-        if "table_body" in item and item["table_body"]:
-            # Parse the HTML table with BeautifulSoup
-            soup = BeautifulSoup(item["table_body"], "html.parser")
-            table = soup.find("table")
-
-            if table:
-                rows = table.find_all("tr")
-                # Get the header (first row)
-                header_row = rows[0] if rows else None
-
-                # Process each row, starting from the second (skip the header)
-                for i, row in enumerate(rows):
-                    # Create a new content item
-                    new_item = item.copy()
-
-                    # Build a table containing only the current row
-                    new_table = soup.new_tag("table")
-
-                    # Prepend the header row, if present
-                    if header_row and i > 0:
-                        new_table.append(header_row)
-
-                    # Append the current row
-                    new_table.append(row)
-
-                    # Wrap it in a new HTML document
-                    new_html = f"<html><body>{str(new_table)}</body></html>"
-                    new_item["table_body"] = f"\n\n{new_html}\n\n"
-
-                    # Append to the new content list
-                    new_content_list.append(new_item)
-            else:
-                new_content_list.append(item)
-        else:
-            new_content_list.append(item)
-
-    return new_content_list
-
-
 def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info):
     """
     Core logic for parsing a document.
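For reference, the deleted `process_table_content` split each MinerU HTML table into one table per data row, repeating the header row so every emitted chunk stays self-describing; the new `parse_excel` (added below) now produces that shape directly from the spreadsheet. A standalone sketch of the row-splitting idea; unlike the removed code it copies tags, since BeautifulSoup's `append` moves a tag out of its previous tree, so appending the same `header_row` to each new table would silently strip it from the previous one:

import copy

from bs4 import BeautifulSoup


def split_table_rows(table_html):
    """Split an HTML table into per-row tables, each repeating the header row."""
    soup = BeautifulSoup(table_html, "html.parser")
    table = soup.find("table")
    if table is None:
        return [table_html]
    rows = table.find_all("tr")
    if len(rows) < 2:
        return [table_html]
    header, out = rows[0], []
    for row in rows[1:]:
        new_table = soup.new_tag("table")
        # copy.copy, because Tag.append would *move* the tag out of its old tree
        new_table.append(copy.copy(header))
        new_table.append(copy.copy(row))
        out.append(f"<html><body>{new_table}</body></html>")
    return out


print(split_table_rows("<table><tr><td>name</td></tr><tr><td>a</td></tr><tr><td>b</td></tr></table>"))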
@@ -343,7 +284,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info):
             update_progress(0.3, "Analyzing PDF type")
             is_ocr = ds.classify() == SupportedPdfParseMethod.OCR
             mode_msg = "OCR mode" if is_ocr else "text mode"
-            update_progress(0.4, f"Processing the PDF in {mode_msg}")
+            update_progress(0.4, f"Processing the PDF in {mode_msg}; see the logs for detailed progress")

             infer_result = ds.apply(doc_analyze, ocr=is_ocr)

@@ -387,6 +328,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info):
             # Get the content list (JSON format)
             middle_content = pipe_result.get_middle_json()
             middle_json_content = json.loads(middle_content)
+
         # Handle Excel files separately
         elif file_type.endswith("excel"):
             update_progress(0.3, "Using the MinerU parser")
@@ -397,22 +339,11 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info):
                 f.write(file_content)

             print(f"[Parser-INFO] Temp file path: {temp_file_path}")
-            # Process with MinerU
-            ds = read_local_office(temp_file_path)[0]
-            infer_result = ds.apply(doc_analyze, ocr=True)
-
-            # Set up a temporary output directory
-            temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
-            os.makedirs(temp_image_dir, exist_ok=True)
-            image_writer = FileBasedDataWriter(temp_image_dir)
-
-            update_progress(0.6, "Processing file results")
-            pipe_result = infer_result.pipe_txt_mode(image_writer)
-
             update_progress(0.8, "Extracting content")
-            origin_content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
             # Process the content list
-            content_list = process_table_content(origin_content_list)
+            content_list = parse_excel(temp_file_path)

         elif file_type.endswith("visual"):
             update_progress(0.3, "Using the MinerU parser")

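With this hunk the Excel branch no longer runs MinerU inference at all: the upload is written to a temporary file and the path goes straight to `parse_excel`. A minimal standalone re-creation of the new flow, assuming `file_content` and `doc_id` as in the surrounding function (names and temp-file layout here are illustrative, not the commit's exact code):

import os
import tempfile

from excel_parser import parse_excel  # the module this commit adds


def parse_excel_upload(file_content: bytes, doc_id: str):
    """Sketch of the new Excel path: temp file in, list of table blocks out."""
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_file_path = os.path.join(temp_dir, f"{doc_id}.xlsx")
        with open(temp_file_path, "wb") as f:
            f.write(file_content)
        # pandas reads straight from the path; no doc_analyze or image writer needed
        return parse_excel(temp_file_path)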
@@ -430,7 +361,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info):
             update_progress(0.3, "Analyzing PDF type")
             is_ocr = ds.classify() == SupportedPdfParseMethod.OCR
             mode_msg = "OCR mode" if is_ocr else "text mode"
-            update_progress(0.4, f"Processing the PDF in {mode_msg}")
+            update_progress(0.4, f"Processing the PDF in {mode_msg}; see the logs for detailed progress")

             infer_result = ds.apply(doc_analyze, ocr=is_ocr)

@@ -690,7 +621,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info):
     conn = None
     cursor = None
     try:
-        conn = _get_db_connection()
+        conn = get_db_connection()
         cursor = conn.cursor()

         # Find the nearest image for each text chunk
New file: excel_parser.py

@@ -0,0 +1,24 @@
+import pandas as pd
+
+
+def parse_excel(file_path):
+    # Read the Excel file
+    df = pd.read_excel(file_path)
+    # Get the header row
+    headers = df.columns.tolist()
+    blocks = []
+
+    for _, row in df.iterrows():
+        # Build an HTML table holding the header plus this single row
+        html_table = "<html><body><table><tr>{}</tr><tr>{}</tr></table></body></html>".format("".join(f"<td>{col}</td>" for col in headers), "".join(f"<td>{row[col]}</td>" for col in headers))
+        block = {"type": "table", "img_path": "", "table_caption": [], "table_footnote": [], "table_body": f"{html_table}", "page_idx": 0}
+
+        blocks.append(block)
+
+    return blocks
+
+
+if __name__ == "__main__":
+    file_path = "test_excel.xls"
+    parse_excel_result = parse_excel(file_path)
+    print(parse_excel_result)
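As committed, `parse_excel` reads only the first worksheet (`pd.read_excel`'s default), renders empty cells as the literal string "nan", and does not HTML-escape cell values; reading legacy .xls files also needs the xlrd package installed alongside pandas (openpyxl covers .xlsx). A hedged variant addressing those points while keeping the committed block schema (recording the sheet name in `table_caption` is this sketch's addition, not the commit's behavior):

from html import escape

import pandas as pd


def parse_excel_all_sheets(file_path):
    """Sketch: like parse_excel, but covers every sheet, blanks out NaN cells,
    and HTML-escapes values. The block schema matches the committed parser."""
    blocks = []
    # sheet_name=None makes pandas return {sheet_name: DataFrame} for all sheets
    for sheet_name, df in pd.read_excel(file_path, sheet_name=None).items():
        headers = df.columns.tolist()
        header_html = "".join(f"<td>{escape(str(col))}</td>" for col in headers)
        for _, row in df.iterrows():
            cells = "".join(
                "<td></td>" if pd.isna(row[col]) else f"<td>{escape(str(row[col]))}</td>"
                for col in headers
            )
            html_table = f"<html><body><table><tr>{header_html}</tr><tr>{cells}</tr></table></body></html>"
            blocks.append({
                "type": "table",
                "img_path": "",
                "table_caption": [sheet_name],  # sketch's choice; the commit leaves this []
                "table_footnote": [],
                "table_body": html_table,
                "page_idx": 0,
            })
    return blocks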