feat(parsing): add Word and PPT support to document parsing (#32)

Add support for parsing Word and PPT files; this requires LibreOffice to be installed. The document-parsing logic is also extracted from `KnowledgebaseService` into a standalone `document_parser.py` module to improve maintainability and reuse, and the file-upload and temporary-file handling has been tightened to ensure resources are released correctly.
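
Since the Office-document path converts files through LibreOffice, a startup check that fails fast when the converter is missing can save a confusing parse error later. A minimal sketch (assuming the usual `soffice` binary name; this helper is hypothetical and not part of the commit):

```python
import shutil

def ensure_libreoffice_available() -> None:
    """Raise early if the LibreOffice CLI cannot be found on PATH."""
    if shutil.which("soffice") is None:
        raise RuntimeError(
            "LibreOffice (soffice) not found on PATH; it is required "
            "to parse doc/docx/ppt/pptx files."
        )
```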
zstar 2025-04-17 16:31:20 +08:00, committed by GitHub
parent a7a689d0a9
commit 6057163f28
5 changed files with 508 additions and 565 deletions

View File

@@ -1,6 +1,7 @@
 import os
 import mysql.connector
 import re
+import tempfile
 from io import BytesIO
 from minio import Minio
 from dotenv import load_dotenv
@@ -15,7 +16,8 @@ from database import DB_CONFIG, MINIO_CONFIG
 # Load environment variables
 load_dotenv("../../docker/.env")
 
-UPLOAD_FOLDER = '/data/uploads'
+temp_dir = tempfile.gettempdir()
+UPLOAD_FOLDER = os.path.join(temp_dir, "uploads")
 ALLOWED_EXTENSIONS = {'pdf', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx', 'jpg', 'jpeg', 'png', 'txt', 'md'}
 
 def allowed_file(filename):

View File

@@ -0,0 +1,400 @@
import os
import tempfile
import shutil
import json
import mysql.connector
import time
import traceback
from io import BytesIO
from datetime import datetime
from elasticsearch import Elasticsearch
from database import MINIO_CONFIG, ES_CONFIG, DB_CONFIG, get_minio_client, get_es_client
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.read_api import read_local_office
from utils import generate_uuid


# Custom tokenizer and text-processing helpers that replace functionality from rag.nlp
def tokenize_text(text):
    """Tokenize text, standing in for rag_tokenizer"""
    # Simple implementation; production use may need a more sophisticated tokenizer
    return text.split()
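
# Caveat (illustrative, hypothetical values): whitespace splitting produces no
# useful tokens for text written without spaces, e.g.
#   tokenize_text("hello world")  -> ["hello", "world"]
#   tokenize_text("你好世界")      -> ["你好世界"]  (the whole string is one "token")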


def merge_chunks(sections, chunk_token_num=128, delimiter="\n。;!?"):
    """Merge text sections into chunks, standing in for naive_merge"""
    if not sections:
        return []
    chunks = [""]
    token_counts = [0]
    for section in sections:
        # Sections may be plain strings or (text, position) tuples
        text = section[0] if isinstance(section, tuple) else section
        position = section[1] if isinstance(section, tuple) and len(section) > 1 else ""
        # Rough token-count estimate
        token_count = len(text.split())
        # If the current chunk already exceeds the limit, start a new chunk
        if token_counts[-1] > chunk_token_num:
            chunks.append(text)
            token_counts.append(token_count)
        else:
            # Otherwise append to the current chunk
            chunks[-1] += text
            token_counts[-1] += token_count
    return chunks
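
# Illustrative example (hypothetical inputs) of the merging behaviour: with
# chunk_token_num=3, sections accumulate into the current chunk until its token
# count exceeds the limit, after which a new chunk is started. Note that the
# delimiter parameter and per-section position are currently unused here, and
# sections are concatenated without a separator:
#   merge_chunks(["a b", "c d", "e f", "g h"], chunk_token_num=3)
#   -> ["a bc d", "e fg h"]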


def _get_db_connection():
    """Create a database connection"""
    return mysql.connector.connect(**DB_CONFIG)


def _update_document_progress(doc_id, progress=None, message=None, status=None, run=None, chunk_count=None, process_duration=None):
    """Update a document's parsing progress and status in the database"""
    conn = None
    cursor = None
    try:
        conn = _get_db_connection()
        cursor = conn.cursor()
        updates = []
        params = []
        if progress is not None:
            updates.append("progress = %s")
            params.append(float(progress))
        if message is not None:
            updates.append("progress_msg = %s")
            params.append(message)
        if status is not None:
            updates.append("status = %s")
            params.append(status)
        if run is not None:
            updates.append("run = %s")
            params.append(run)
        if chunk_count is not None:
            updates.append("chunk_num = %s")
            params.append(chunk_count)
        if process_duration is not None:
            updates.append("process_duation = %s")
            params.append(process_duration)
        if not updates:
            return
        query = f"UPDATE document SET {', '.join(updates)} WHERE id = %s"
        params.append(doc_id)
        cursor.execute(query, params)
        conn.commit()
    except Exception as e:
        print(f"[Parser-ERROR] 更新文档 {doc_id} 进度失败: {e}")
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


def _update_kb_chunk_count(kb_id, count_delta):
    """Update the knowledge base's chunk count"""
    conn = None
    cursor = None
    try:
        conn = _get_db_connection()
        cursor = conn.cursor()
        current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        kb_update = """
            UPDATE knowledgebase
            SET chunk_num = chunk_num + %s,
                update_date = %s
            WHERE id = %s
        """
        cursor.execute(kb_update, (count_delta, current_date, kb_id))
        conn.commit()
    except Exception as e:
        print(f"[Parser-ERROR] 更新知识库 {kb_id} 块数量失败: {e}")
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


def _create_task_record(doc_id, chunk_ids_list):
    """Create a task record"""
    conn = None
    cursor = None
    try:
        conn = _get_db_connection()
        cursor = conn.cursor()
        task_id = generate_uuid()
        current_datetime = datetime.now()
        current_timestamp = int(current_datetime.timestamp() * 1000)
        current_time_str = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
        current_date_only = current_datetime.strftime("%Y-%m-%d")
        digest = f"{doc_id}_{0}_{1}"  # assumes from_page=0, to_page=1
        chunk_ids_str = ' '.join(chunk_ids_list)
        task_insert = """
            INSERT INTO task (
                id, create_time, create_date, update_time, update_date,
                doc_id, from_page, to_page, begin_at, process_duation,
                progress, progress_msg, retry_count, digest, chunk_ids, task_type
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        task_params = [
            task_id, current_timestamp, current_date_only, current_timestamp, current_date_only,
            doc_id, 0, 1, None, 0.0,  # begin_at, process_duration
            1.0, "MinerU解析完成", 1, digest, chunk_ids_str, ""  # progress, msg, retry, digest, chunks, type
        ]
        cursor.execute(task_insert, task_params)
        conn.commit()
        print(f"[Parser-INFO] Task记录创建成功Task ID: {task_id}")
    except Exception as e:
        print(f"[Parser-ERROR] 创建Task记录失败: {e}")
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


def perform_parse(doc_id, doc_info, file_info):
    """
    Core document-parsing logic.

    Args:
        doc_id (str): Document ID.
        doc_info (dict): Document fields (name, location, type, kb_id, parser_config, created_by).
        file_info (dict): File fields (parent_id, used as the bucket name).

    Returns:
        dict: Parse result (success, chunk_count).
    """
    temp_pdf_path = None
    temp_file_path = None
    temp_image_dir = None
    start_time = time.time()
    try:
        kb_id = doc_info['kb_id']
        file_location = doc_info['location']
        # Keep the original file extension from the stored location
        _, file_extension = os.path.splitext(file_location)
        file_type = doc_info['type'].lower()
        parser_config = json.loads(doc_info['parser_config']) if isinstance(doc_info['parser_config'], str) else doc_info['parser_config']
        bucket_name = file_info['parent_id']  # files are stored in a bucket named after parent_id
        tenant_id = doc_info['created_by']  # the document creator acts as the tenant_id

        # Progress callback (directly invokes the internal update helper)
        def update_progress(prog=None, msg=None):
            _update_document_progress(doc_id, progress=prog, message=msg)
            print(f"[Parser-PROGRESS] Doc: {doc_id}, Progress: {prog}, Message: {msg}")

        # 1. Fetch the file content from MinIO
        minio_client = get_minio_client()
        if not minio_client.bucket_exists(bucket_name):
            raise Exception(f"存储桶不存在: {bucket_name}")
        update_progress(0.1, f"正在从存储中获取文件: {file_location}")
        response = minio_client.get_object(bucket_name, file_location)
        file_content = response.read()
        response.close()
        update_progress(0.2, "文件获取成功,准备解析")

        # 2. Pick a parser based on the file type
        content_list = []
        if file_type.endswith('pdf'):
            update_progress(0.3, "使用MinerU解析器")
            # Write the PDF content to a temporary file
            temp_dir = tempfile.gettempdir()
            temp_pdf_path = os.path.join(temp_dir, f"{doc_id}.pdf")
            with open(temp_pdf_path, 'wb') as f:
                f.write(file_content)

            # Process with Magic PDF
            reader = FileBasedDataReader("")
            pdf_bytes = reader.read(temp_pdf_path)
            ds = PymuDocDataset(pdf_bytes)
            update_progress(0.3, "分析PDF类型")
            is_ocr = ds.classify() == SupportedPdfParseMethod.OCR
            mode_msg = "OCR模式" if is_ocr else "文本模式"
            update_progress(0.4, f"使用{mode_msg}处理PDF")
            infer_result = ds.apply(doc_analyze, ocr=is_ocr)

            # Set up the temporary output directory for extracted images
            temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
            os.makedirs(temp_image_dir, exist_ok=True)
            image_writer = FileBasedDataWriter(temp_image_dir)
            update_progress(0.6, f"处理{mode_msg}结果")
            pipe_result = infer_result.pipe_ocr_mode(image_writer) if is_ocr else infer_result.pipe_txt_mode(image_writer)
            update_progress(0.8, "提取内容")
            content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
        elif file_type.endswith('word') or file_type.endswith('ppt'):
            update_progress(0.3, "使用MinerU解析器")
            # Write the file content to a temporary file, keeping its extension
            temp_dir = tempfile.gettempdir()
            temp_file_path = os.path.join(temp_dir, f"{doc_id}{file_extension}")
            with open(temp_file_path, 'wb') as f:
                f.write(file_content)
            print(f"[Parser-INFO] 临时文件路径: {temp_file_path}")

            # Process with MinerU (read_local_office converts Office files via LibreOffice)
            ds = read_local_office(temp_file_path)[0]
            infer_result = ds.apply(doc_analyze, ocr=True)

            # Set up the temporary output directory for extracted images
            temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
            os.makedirs(temp_image_dir, exist_ok=True)
            image_writer = FileBasedDataWriter(temp_image_dir)
            update_progress(0.6, "处理文件结果")
            pipe_result = infer_result.pipe_txt_mode(image_writer)
            update_progress(0.8, "提取内容")
            content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
        else:
            update_progress(0.3, f"暂不支持的文件类型: {file_type}")
            raise NotImplementedError(f"文件类型 '{file_type}' 的解析器尚未实现")

        # 3. Process the parse results (upload to MinIO, index in Elasticsearch)
        update_progress(0.95, "保存解析结果")
        es_client = get_es_client()
        # Note: the output bucket is the knowledge base ID (kb_id), not the file's parent_id
        output_bucket = kb_id
        if not minio_client.bucket_exists(output_bucket):
            minio_client.make_bucket(output_bucket)
            print(f"[Parser-INFO] 创建MinIO桶: {output_bucket}")

        index_name = f"ragflow_{tenant_id}"
        if not es_client.indices.exists(index=index_name):
            # Create the index
            es_client.indices.create(
                index=index_name,
                body={
                    "settings": {"number_of_replicas": 0},  # 0 replicas on a single node
                    "mappings": {"properties": {"doc_id": {"type": "keyword"}, "kb_id": {"type": "keyword"}, "content_with_weight": {"type": "text"}}}  # simplified fields
                }
            )
            print(f"[Parser-INFO] 创建Elasticsearch索引: {index_name}")

        chunk_count = 0
        chunk_ids_list = []
        for chunk_idx, chunk_data in enumerate(content_list):
            if chunk_data["type"] == "text":
                content = chunk_data["text"]
                if not content or not content.strip():
                    continue
                chunk_id = generate_uuid()
                try:
                    # Upload the text chunk to MinIO (bucket = kb_id)
                    minio_client.put_object(
                        bucket_name=output_bucket,
                        object_name=chunk_id,
                        data=BytesIO(content.encode('utf-8')),
                        length=len(content.encode('utf-8'))  # use the byte length
                    )

                    # Prepare the ES document
                    content_tokens = tokenize_text(content)  # tokenize
                    current_time_es = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    current_timestamp_es = datetime.now().timestamp()
                    es_doc = {
                        "doc_id": doc_id,
                        "kb_id": kb_id,
                        "docnm_kwd": doc_info['name'],
                        "title_tks": doc_info['name'],
                        "title_sm_tks": doc_info['name'],
                        "content_with_weight": content,
                        "content_ltks": content_tokens,
                        "content_sm_ltks": content_tokens,
                        "page_num_int": [1],  # simplified
                        "position_int": [[1, 0, 0, 0, 0]],  # simplified
                        "top_int": [1],  # simplified
                        "create_time": current_time_es,
                        "create_timestamp_flt": current_timestamp_es,
                        "img_id": "",
                        "q_1024_vec": []  # vector field left empty
                    }

                    # Store in Elasticsearch
                    es_client.index(index=index_name, document=es_doc)  # use the document parameter
                    chunk_count += 1
                    chunk_ids_list.append(chunk_id)
                    print(f"成功处理文本块 {chunk_count}/{len(content_list)}")
                except Exception as e:
                    print(f"[Parser-ERROR] 处理文本块 {chunk_idx} 失败: {e}")
elif chunk_data["type"] == "image":
img_path_relative = chunk_data.get('img_path')
if not img_path_relative or not temp_image_dir:
continue
img_path_abs = os.path.join(temp_image_dir, os.path.basename(img_path_relative))
if not os.path.exists(img_path_abs):
print(f"[Parser-WARNING] 图片文件不存在: {img_path_abs}")
continue
img_id = generate_uuid()
img_ext = os.path.splitext(img_path_abs)[1]
img_key = f"images/{img_id}{img_ext}" # MinIO中的对象名
content_type = f"image/{img_ext[1:].lower()}"
if content_type == "image/jpg": content_type = "image/jpeg"
# try:
# # 上传图片到MinIO (桶为kb_id)
# minio_client.fput_object(
# bucket_name=output_bucket,
# object_name=img_key,
# file_path=img_path_abs,
# content_type=content_type
# )
# print(f"成功上传图片: {img_key}")
# # 注意设置公共访问权限可能需要额外配置MinIO服务器和存储桶策略
# except Exception as e:
# print(f"[Parser-ERROR] 上传图片 {img_path_abs} 失败: {e}")

        # 4. Update the final status
        process_duration = time.time() - start_time
        _update_document_progress(doc_id, progress=1.0, message="解析完成", status='1', run='3', chunk_count=chunk_count, process_duration=process_duration)
        _update_kb_chunk_count(kb_id, chunk_count)  # update the knowledge base's total chunk count
        _create_task_record(doc_id, chunk_ids_list)  # create the task record

        update_progress(1.0, "解析完成")
        print(f"[Parser-INFO] 解析完成文档ID: {doc_id}, 耗时: {process_duration:.2f}s, 块数: {chunk_count}")
        return {"success": True, "chunk_count": chunk_count}

    except Exception as e:
        process_duration = time.time() - start_time
        # error_message = f"解析失败: {str(e)}"
        print(f"[Parser-ERROR] 文档 {doc_id} 解析失败: {e}")
        error_message = "解析失败: 无法执行文件转换。请确保已正确安装LibreOffice并将其添加到系统环境变量PATH中。"
        traceback.print_exc()  # print the full stack trace
        # Mark the document as failed
        _update_document_progress(doc_id, status='1', run='0', message=error_message, process_duration=process_duration)  # status=1 means finished, run=0 means failed
        # Do not re-raise; let the caller know the task has ended (unsuccessfully)
        return {"success": False, "error": error_message}
    finally:
        # Clean up temporary files
        try:
            if temp_pdf_path and os.path.exists(temp_pdf_path):
                os.remove(temp_pdf_path)
            if temp_file_path and os.path.exists(temp_file_path):
                os.remove(temp_file_path)
            if temp_image_dir and os.path.exists(temp_image_dir):
                shutil.rmtree(temp_image_dir, ignore_errors=True)
        except Exception as clean_e:
            print(f"[Parser-WARNING] 清理临时文件失败: {clean_e}")

View File

@@ -1,83 +1,19 @@
 import mysql.connector
 import json
-from flask import current_app
+import threading
 from datetime import datetime
 from utils import generate_uuid
-from database import DB_CONFIG, get_minio_client, get_es_client
-import io
-import os
-import json
-import threading
-import time
-import tempfile
-import shutil
-from elasticsearch import Elasticsearch
-from io import BytesIO
+from database import DB_CONFIG
 
 # Parsing-related modules
-from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
-from magic_pdf.data.dataset import PymuDocDataset
-from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
-from magic_pdf.config.enums import SupportedPdfParseMethod
+from .document_parser import perform_parse, _update_document_progress
 
-# Custom tokenizer and text-processing helpers that replace functionality from rag.nlp
-def tokenize_text(text):
-    """Tokenize text, standing in for rag_tokenizer"""
-    # Simple implementation; production use may need a more sophisticated tokenizer
-    return text.split()
-
-def merge_chunks(sections, chunk_token_num=128, delimiter="\n。;!?"):
-    """Merge text sections into chunks, standing in for naive_merge"""
-    if not sections:
-        return []
-
-    chunks = [""]
-    token_counts = [0]
-
-    for section in sections:
-        # Sections may be plain strings or (text, position) tuples
-        text = section[0] if isinstance(section, tuple) else section
-        position = section[1] if isinstance(section, tuple) and len(section) > 1 else ""
-        # Rough token-count estimate
-        token_count = len(text.split())
-
-        # If the current chunk already exceeds the limit, start a new chunk
-        if token_counts[-1] > chunk_token_num:
-            chunks.append(text)
-            token_counts.append(token_count)
-        else:
-            # Otherwise append to the current chunk
-            chunks[-1] += text
-            token_counts[-1] += token_count
-
-    return chunks
-
-def process_document_chunks(chunks, document_info):
-    """Process document chunks, standing in for tokenize_chunks"""
-    results = []
-    for chunk in chunks:
-        if not chunk.strip():
-            continue
-        # Build the chunk object
-        chunk_data = document_info.copy()
-        chunk_data["content"] = chunk
-        chunk_data["tokens"] = tokenize_text(chunk)
-        results.append(chunk_data)
-    return results
 
 class KnowledgebaseService:
     @classmethod
     def _get_db_connection(cls):
-        """Get database connection"""
+        """Create a database connection"""
         return mysql.connector.connect(**DB_CONFIG)
 
     @classmethod
     def get_knowledgebase_list(cls, page=1, size=10, name=''):
         """Get the knowledge base list"""
@@ -304,7 +240,7 @@
 
             return cls.get_knowledgebase_detail(kb_id)
         except Exception as e:
-            current_app.logger.error(f"创建知识库失败: {str(e)}")
+            print(f"创建知识库失败: {str(e)}")
             raise Exception(f"创建知识库失败: {str(e)}")
 
     @classmethod
@@ -391,7 +327,7 @@
 
             return True
         except Exception as e:
-            current_app.logger.error(f"删除知识库失败: {str(e)}")
+            print(f"删除知识库失败: {str(e)}")
             raise Exception(f"删除知识库失败: {str(e)}")
 
     @classmethod
@@ -422,7 +358,7 @@
 
             return len(kb_ids)
         except Exception as e:
-            current_app.logger.error(f"批量删除知识库失败: {str(e)}")
+            print(f"批量删除知识库失败: {str(e)}")
             raise Exception(f"批量删除知识库失败: {str(e)}")
 
     @classmethod
@@ -483,15 +419,14 @@
             cursor.close()
             conn.close()
 
-            print(results)
             return {
                 'list': results,
                 'total': total
             }
         except Exception as e:
-            current_app.logger.error(f"获取知识库文档列表失败: {str(e)}")
+            print(f"获取知识库文档列表失败: {str(e)}")
             raise Exception(f"获取知识库文档列表失败: {str(e)}")
 
     @classmethod
@@ -727,528 +662,135 @@
             raise Exception(f"删除文档失败: {str(e)}")
 
     @classmethod
-    def parse_document(cls, doc_id, callback=None):
-        """Parse a document and report progress"""
+    def parse_document(cls, doc_id):
+        """Parse a document (synchronous version that delegates to the background parsing logic)"""
         conn = None
         cursor = None
         try:
-            # Fetch the document info
+            # 1. Fetch the document and file info
             conn = cls._get_db_connection()
             cursor = conn.cursor(dictionary=True)
 
             # Query the document record
-            query = """
-                SELECT d.id, d.name, d.location, d.type, d.kb_id, d.parser_id, d.parser_config
+            doc_query = """
+                SELECT d.id, d.name, d.location, d.type, d.kb_id, d.parser_id, d.parser_config, d.created_by
                 FROM document d
                 WHERE d.id = %s
             """
-            cursor.execute(query, (doc_id,))
-            doc = cursor.fetchone()
-
-            if not doc:
-                raise Exception("文档不存在")
-
-            # Mark the document as processing
-            update_query = """
-                UPDATE document
-                SET status = '2', run = '1', progress = 0.0, progress_msg = '开始解析'
-                WHERE id = %s
-            """
-            cursor.execute(update_query, (doc_id,))
-            conn.commit()
-
-            # Get the file ID and bucket ID
+            cursor.execute(doc_query, (doc_id,))
+            doc_info = cursor.fetchone()
+
+            if not doc_info:
+                raise Exception("文档不存在")
+
+            # Fetch the associated file record (mainly parent_id, used as the bucket name)
             f2d_query = "SELECT file_id FROM file2document WHERE document_id = %s"
             cursor.execute(f2d_query, (doc_id,))
             f2d_result = cursor.fetchone()
 
             if not f2d_result:
                 raise Exception("无法找到文件到文档的映射关系")
 
             file_id = f2d_result['file_id']
             file_query = "SELECT parent_id FROM file WHERE id = %s"
             cursor.execute(file_query, (file_id,))
-            file_result = cursor.fetchone()
-
-            if not file_result:
+            file_info = cursor.fetchone()
+
+            if not file_info:
                 raise Exception("无法找到文件记录")
 
-            bucket_name = file_result['parent_id']
-
-            # Create the MinIO client
-            minio_client = get_minio_client()
-
-            # Check that the bucket exists
-            if not minio_client.bucket_exists(bucket_name):
-                raise Exception(f"存储桶不存在: {bucket_name}")
-
-            # Progress-update helper
-            def update_progress(prog=None, msg=None):
-                if prog is not None:
-                    progress_query = "UPDATE document SET progress = %s WHERE id = %s"
-                    cursor.execute(progress_query, (float(prog), doc_id))
-                    conn.commit()
-                if msg is not None:
-                    msg_query = "UPDATE document SET progress_msg = %s WHERE id = %s"
-                    cursor.execute(msg_query, (msg, doc_id))
-                    conn.commit()
-                if callback:
-                    callback(prog, msg, doc_id)
-
-            # Fetch the file content from MinIO
-            file_location = doc['location']
-            try:
-                update_progress(0.1, f"正在从存储中获取文件: {file_location}")
-                response = minio_client.get_object(bucket_name, file_location)
-                file_content = response.read()
-                response.close()
-                update_progress(0.2, "文件获取成功,准备解析")
-            except Exception as e:
-                raise Exception(f"无法从存储中获取文件: {file_location}, 错误: {str(e)}")
-
-            # Parser configuration
-            parser_config = json.loads(doc['parser_config']) if isinstance(doc['parser_config'], str) else doc['parser_config']
-
-            # Choose a parser based on the file type
-            file_type = doc['type'].lower()
-            chunks = []
-            update_progress(0.2, "正在识别文档类型")
-
-            # Parse with magic_pdf
-            if file_type.endswith('pdf'):
-                update_progress(0.3, "使用Magic PDF解析器")
-                # Write the PDF content to a temporary file (path: C:\Users\username\AppData\Local\Temp)
-                temp_dir = tempfile.gettempdir()
-                temp_pdf_path = os.path.join(temp_dir, f"{doc_id}.pdf")
-                with open(temp_pdf_path, 'wb') as f:
-                    f.write(file_content)
-
-                try:
-                    # Process the PDF the same way as the standalone script
-                    def magic_callback(prog, msg):
-                        # Map progress into the 20%-90% range
-                        actual_prog = 0.2 + prog * 0.7
-                        update_progress(actual_prog, msg)
-
-                    # Initialize the data reader
-                    reader = FileBasedDataReader("")
-                    pdf_bytes = reader.read(temp_pdf_path)
-
-                    # Create the PDF dataset instance
-                    ds = PymuDocDataset(pdf_bytes)
-
-                    # Choose the processing method based on the PDF type
-                    update_progress(0.3, "分析PDF类型")
-                    if ds.classify() == SupportedPdfParseMethod.OCR:
-                        update_progress(0.4, "使用OCR模式处理PDF")
-                        infer_result = ds.apply(doc_analyze, ocr=True)
-
-                        # Set up the temporary output directory
-                        temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
-                        os.makedirs(temp_image_dir, exist_ok=True)
-                        image_writer = FileBasedDataWriter(temp_image_dir)
-
-                        update_progress(0.6, "处理OCR结果")
-                        pipe_result = infer_result.pipe_ocr_mode(image_writer)
-                    else:
-                        update_progress(0.4, "使用文本模式处理PDF")
-                        infer_result = ds.apply(doc_analyze, ocr=False)
-
-                        # Set up the temporary output directory
-                        temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
-                        os.makedirs(temp_image_dir, exist_ok=True)
-                        image_writer = FileBasedDataWriter(temp_image_dir)
-
-                        update_progress(0.6, "处理文本结果")
-                        pipe_result = infer_result.pipe_txt_mode(image_writer)
-
-                    # Get the content list
-                    update_progress(0.8, "提取内容")
-                    content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
-                    print(f"开始保存解析结果到MinIO文档ID: {doc_id}")
-
-                    # Process the content list
-                    update_progress(0.95, "保存解析结果")
-
-                    # Get or create the MinIO bucket
-                    kb_id = doc['kb_id']
-                    minio_client = get_minio_client()
-                    if not minio_client.bucket_exists(kb_id):
-                        minio_client.make_bucket(kb_id)
-                        print(f"创建MinIO桶: {kb_id}")
-
-                    # Use content_list instead of the chunks variable
-                    print(f"解析得到内容块数量: {len(content_list)}")
-
-                    # Process the content list and create document chunks
-                    document_info = {
-                        "doc_id": doc_id,
-                        "doc_name": doc['name'],
-                        "kb_id": kb_id
-                    }
-
-                    # TODO: chunk pre-processing
-                    # Merge content chunks
-                    # chunk_token_num = parser_config.get("chunk_token_num", 512)
-                    # delimiter = parser_config.get("delimiter", "\n!?;。;!?")
-                    # merged_chunks = merge_chunks(content_list, chunk_token_num, delimiter)
-                    # Process the document chunks
-                    # processed_chunks = process_document_chunks(merged_chunks, document_info)
-                    # Use the raw content list directly, without merging or processing
-                    # processed_chunks = []
-                    print(f"[DEBUG] 开始处理内容列表,共 {len(content_list)} 个原始内容块")
-                    # for i, content in enumerate(content_list):
-                    #     if not content.strip():
-                    #         continue
-                    #     chunk_data = document_info.copy()
-                    #     chunk_data["content"] = content
-                    #     chunk_data["tokens"] = tokenize_text(content)
-                    #     processed_chunks.append(chunk_data)
-
-                    print(f"[DEBUG] 开始上传到MinIO目标桶: {kb_id}")
-
-                    # Get the Elasticsearch client
-                    es_client = get_es_client()
-
-                    # Get the tenant_id (the document creator's ID)
-                    tenant_query = """
-                        SELECT created_by FROM document WHERE id = %s
-                    """
-                    cursor.execute(tenant_query, (doc_id,))
-                    tenant_result = cursor.fetchone()
-                    tenant_id = tenant_result['created_by']
-                    print(f"[DEBUG] 文档 {doc_id} 的tenant_id: {tenant_id}")
-
-                    # Make sure the index exists
-                    index_name = f"ragflow_{tenant_id}"
-                    if not es_client.indices.exists(index=index_name):
-                        # Create the index with 0 replicas
-                        es_client.indices.create(
-                            index=index_name,
-                            body={
-                                "settings": {
-                                    "number_of_shards": 2,
-                                    "number_of_replicas": 0  # 0 replicas for a single-node environment
-                                },
-                                "mappings": {
-                                    "properties": {
-                                        "doc_id": {"type": "keyword"},
-                                        "kb_id": {"type": "keyword"},
-                                        "docnm_kwd": {"type": "keyword"},
-                                        "title_tks": {"type": "keyword"},
-                                        "title_sm_tks": {"type": "keyword"},
-                                        "content_with_weight": {"type": "text"},
-                                        "content_ltks": {"type": "keyword"},
-                                        "content_sm_ltks": {"type": "keyword"},
-                                        "page_num_int": {"type": "integer"},
-                                        "position_int": {"type": "integer"},
-                                        "top_int": {"type": "integer"},
-                                        "create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
-                                        "create_timestamp_flt": {"type": "float"},
-                                        "img_id": {"type": "keyword"},
-                                        "q_1024_vec": {"type": "keyword"}
-                                    }
-                                }
-                            }
-                        )
-
-                    # Process the content chunks and upload them to MinIO
-                    chunk_count = 0
-                    chunk_ids_list = []
-                    for chunk_idx, chunk_data in enumerate(content_list):
-                        if chunk_data["type"] == "text":
-                            content = chunk_data["text"]
-                            if not content.strip():
-                                print(f"[DEBUG] 跳过空文本块 {chunk_idx}")
-                                continue
-
-                            # Generate a unique ID
-                            chunk_id = generate_uuid()
-
-                            try:
-                                # 1. Upload to MinIO
-                                minio_client.put_object(
-                                    bucket_name=kb_id,
-                                    object_name=chunk_id,
-                                    data=BytesIO(content.encode('utf-8')),
-                                    length=len(content)
-                                )
-                                # Tokenize
-                                content_tokens = tokenize_text(content)
-                                # Current time
-                                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                                current_timestamp = datetime.now().timestamp()
-                                # Build the Elasticsearch document
-                                es_doc = {
-                                    "doc_id": doc_id,
-                                    "kb_id": kb_id,
-                                    "docnm_kwd": doc['name'],
-                                    "title_tks": doc['name'],  # simplified: use the document name as the title
-                                    "title_sm_tks": doc['name'],  # simplified
-                                    "content_with_weight": content,
-                                    "content_ltks": content_tokens,
-                                    "content_sm_ltks": content_tokens,  # simplified
-                                    "page_num_int": [1],  # default page number
-                                    "position_int": [[1, 0, 0, 0, 0]],  # default position
-                                    "top_int": [1],  # default top position
-                                    "create_time": current_time,
-                                    "create_timestamp_flt": current_timestamp,
-                                    "img_id": "",  # empty when there is no image
-                                    "q_1024_vec": []
-                                }
-                                # 2. Store in Elasticsearch
-                                es_client.index(
-                                    index=index_name,
-                                    # id=es_chunk_id,
-                                    body=es_doc
-                                )
-                                chunk_count += 1
-                                chunk_ids_list.append(chunk_id)
-                                print(f"成功上传文本块 {chunk_count}/{len(content_list)}")
-                            except Exception as e:
-                                print(f"上传文本块失败: {str(e)}")
-                                continue
-                        elif chunk_data["type"] == "image":
-                            print(f"[INFO] 处理图像块: {chunk_data['img_path']}")
-                            try:
-                                # Get the image path
-                                img_path = chunk_data['img_path']
-                                # If it is a relative path, prepend the temporary directory
-                                if not os.path.isabs(img_path):
-                                    # Use the temporary image directory as the base path
-                                    img_path = os.path.join(temp_image_dir, os.path.basename(img_path))
-                                    print(f"[INFO] 转换为绝对路径: {img_path}")
-                                if os.path.exists(img_path):
-                                    # Generate the image ID and storage key
-                                    img_id = generate_uuid()
-                                    img_key = f"images/{img_id}{os.path.splitext(img_path)[1]}"
-                                    # Read the image content
-                                    with open(img_path, 'rb') as img_file:
-                                        img_data = img_file.read()
-                                    # Set the image Content-Type
-                                    content_type = f"image/{os.path.splitext(img_path)[1][1:].lower()}"
-                                    if content_type == "image/jpg":
-                                        content_type = "image/jpeg"
-                                    # Upload the image to MinIO
-                                    minio_client.put_object(
-                                        bucket_name=kb_id,
-                                        object_name=img_key,
-                                        data=BytesIO(img_data),
-                                        length=len(img_data),
-                                        content_type=content_type
-                                    )
-                                    # Grant the image public read access
-                                    policy = {
-                                        "Version": "2012-10-17",
-                                        "Statement": [
-                                            {
-                                                "Effect": "Allow",
-                                                "Principal": {"AWS": "*"},
-                                                "Action": ["s3:GetObject"],
-                                                "Resource": [f"arn:aws:s3:::{kb_id}/{img_key}"]
-                                            }
-                                        ]
-                                    }
-                                    minio_client.set_bucket_policy(kb_id, json.dumps(policy))
-                                    print(f"[SUCCESS] 成功上传图片: {img_key}")
-                                else:
-                                    print(f"[WARNING] 图片文件不存在: {img_path}")
-                            except Exception as e:
-                                print(f"[ERROR] 上传图片失败: {str(e)}")
-                                continue
-
-                    # Update the document status and chunk count
-                    final_update = """
-                        UPDATE document
-                        SET status = '1', run = '3', progress = 1.0,
-                            progress_msg = '解析完成', chunk_num = %s,
-                            process_duation = %s
-                        WHERE id = %s
-                    """
-                    cursor.execute(final_update, (chunk_count, 0.0, doc_id))
-                    conn.commit()
-                    print(f"[INFO] document表更新完成文档ID: {doc_id}")
-
-                    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    # Update the knowledge base chunk count
-                    kb_update = """
-                        UPDATE knowledgebase
-                        SET chunk_num = chunk_num + %s,
-                            update_date = %s
-                        WHERE id = %s
-                    """
-                    cursor.execute(kb_update, (chunk_count, current_date, kb_id))
-                    conn.commit()
-                    print(f"[INFO] knowledgebase表更新完成文档ID: {doc_id}")
-
-                    # Create the task record
-                    task_id = generate_uuid()
-                    # Current time
-                    current_datetime = datetime.now()
-                    current_timestamp = int(current_datetime.timestamp() * 1000)  # millisecond timestamp
-                    current_time = current_datetime.strftime("%Y-%m-%d %H:%M:%S")  # formatted datetime
-                    current_date_only = current_datetime.strftime("%Y-%m-%d")  # date only
-                    digest = f"{doc_id}_{0}_{1}"
-                    # Join the chunk_ids list into a single string
-                    chunk_ids_str = ' '.join(chunk_ids_list)
-
-                    task_insert = """
-                        INSERT INTO task (
-                            id, create_time, create_date, update_time, update_date,
-                            doc_id, from_page, to_page, begin_at, process_duation,
-                            progress, progress_msg, retry_count, digest, chunk_ids, task_type
-                        ) VALUES (
-                            %s, %s, %s, %s, %s,
-                            %s, %s, %s, %s, %s,
-                            %s, %s, %s, %s, %s, %s
-                        )
-                    """
-                    task_params = [
-                        task_id, current_timestamp, current_date_only, current_timestamp, current_date_only,
-                        doc_id, 0, 1, None, 0.0,
-                        1.0, "MinerU解析完成", 1, digest, chunk_ids_str, ""
-                    ]
-                    cursor.execute(task_insert, task_params)
-                    conn.commit()
-
-                    update_progress(1.0, "解析完成")
-                    print(f"[INFO] 解析完成文档ID: {doc_id}")
-
-                    cursor.close()
-                    conn.close()
-
-                    # Clean up temporary files
-                    try:
-                        os.remove(temp_pdf_path)
-                        shutil.rmtree(temp_image_dir, ignore_errors=True)
-                    except:
-                        pass
-
-                    return {
-                        "success": True,
-                        "chunk_count": chunk_count
-                    }
-                except Exception as e:
-                    print(f"出现异常: {str(e)}")
+            cursor.close()
+            conn.close()
+            conn = None  # make sure the connection is released
+
+            # 2. Mark the document as processing (using the parser module's helper)
+            _update_document_progress(doc_id, status='2', run='1', progress=0.0, message='开始解析')
+
+            # 3. Invoke the background parsing function
+            parse_result = perform_parse(doc_id, doc_info, file_info)
+
+            # 4. Return the parse result
+            return parse_result
 
         except Exception as e:
-            print(f"文档解析失败: {str(e)}")
-            # Mark the document as failed
-            try:
-                error_update = """
-                    UPDATE document
-                    SET status = '1', run = '0', progress_msg = %s
-                    WHERE id = %s
-                """
-                cursor.execute(error_update, (f"解析失败: {str(e)}", doc_id))
-                conn.commit()
-                cursor.close()
-                conn.close()
-            except:
-                pass
-            raise Exception(f"文档解析失败: {str(e)}")
+            print(f"文档解析启动或执行过程中出错 (Doc ID: {doc_id}): {str(e)}")
+            # Make sure the document is marked as failed on any error
+            try:
+                _update_document_progress(doc_id, status='1', run='0', message=f"解析失败: {str(e)}")
+            except Exception as update_err:
+                print(f"更新文档失败状态时出错 (Doc ID: {doc_id}): {str(update_err)}")
+            # Either raise to the caller or return an error payload
+            # raise Exception(f"文档解析失败: {str(e)}")
+            return {"success": False, "error": f"文档解析失败: {str(e)}"}
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
 
     @classmethod
     def async_parse_document(cls, doc_id):
         """Parse a document asynchronously"""
         try:
-            # Return an immediate response indicating that the task has started
+            # Run the synchronous parse_document method in a background thread
             thread = threading.Thread(target=cls.parse_document, args=(doc_id,))
-            thread.daemon = True
+            thread.daemon = True  # daemon thread, so it exits together with the main process
            thread.start()
 
+            # Return immediately to indicate that the task has been submitted
             return {
-                "task_id": doc_id,
+                "task_id": doc_id,  # use doc_id as the task identifier
                 "status": "processing",
-                "message": "文档解析已开始"
+                "message": "文档解析任务已提交到后台处理"
             }
         except Exception as e:
-            current_app.logger.error(f"启动解析任务失败: {str(e)}")
-            raise Exception(f"启动解析任务失败: {str(e)}")
+            print(f"启动异步解析任务失败 (Doc ID: {doc_id}): {str(e)}")
+            # Optionally mark the document as failed here as well
+            try:
+                _update_document_progress(doc_id, status='1', run='0', message=f"启动解析失败: {str(e)}")
+            except Exception as update_err:
+                print(f"更新文档启动失败状态时出错 (Doc ID: {doc_id}): {str(update_err)}")
+            raise Exception(f"启动异步解析任务失败: {str(e)}")
 
     @classmethod
     def get_document_parse_progress(cls, doc_id):
-        """Get the document parse progress - with a caching mechanism"""
-
-        # Plain database query
-        conn = cls._get_db_connection()
-        cursor = conn.cursor(dictionary=True)
-
-        query = """
-            SELECT progress, progress_msg, status, run
-            FROM document
-            WHERE id = %s
-        """
-        cursor.execute(query, (doc_id,))
-        result = cursor.fetchone()
-
-        cursor.close()
-        conn.close()
-
-        if not result:
-            return {"error": "文档不存在"}
-
-        return {
-            "progress": float(result["progress"]),
-            "message": result["progress_msg"],
-            "status": result["status"],
-            "running": result["run"] == "1"
-        }
-        """Get the document parse progress
-
-        Args:
-            doc_id: document ID
-
-        Returns:
-            Parse progress information
-        """
-        conn = cls._get_db_connection()
-        cursor = conn.cursor(dictionary=True)
-
-        query = """
-            SELECT progress, progress_msg, status, run
-            FROM document
-            WHERE id = %s
-        """
-        cursor.execute(query, (doc_id,))
-        result = cursor.fetchone()
-
-        cursor.close()
-        conn.close()
-
-        if not result:
-            return {"error": "文档不存在"}
-
-        return {
-            "progress": float(result["progress"]),
-            "message": result["progress_msg"],
-            "status": result["status"],
-            "running": result["run"] == "1"
-        }
+        """Get the document parse progress"""
+        conn = None
+        cursor = None
+        try:
+            conn = cls._get_db_connection()
+            cursor = conn.cursor(dictionary=True)
+
+            query = """
+                SELECT progress, progress_msg, status, run
+                FROM document
+                WHERE id = %s
+            """
+            cursor.execute(query, (doc_id,))
+            result = cursor.fetchone()
+
+            if not result:
+                return {"error": "文档不存在"}
+
+            # Make sure progress is a float
+            progress_value = 0.0
+            if result.get("progress") is not None:
+                try:
+                    progress_value = float(result["progress"])
+                except (ValueError, TypeError):
+                    progress_value = 0.0  # or log the error
+
+            return {
+                "progress": progress_value,
+                "message": result.get("progress_msg", ""),
+                "status": result.get("status", "0"),
+                "running": result.get("run", "0"),
+            }
+
+        except Exception as e:
+            print(f"获取文档进度失败 (Doc ID: {doc_id}): {str(e)}")
+            return {"error": f"获取进度失败: {str(e)}"}
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()

View File

@ -1,7 +1,7 @@
# 开发环境的环境变量(命名必须以 VITE_ 开头) # 开发环境的环境变量(命名必须以 VITE_ 开头)
## 后端接口地址(如果解决跨域问题采用反向代理就只需写相对路径) ## 后端接口地址(如果解决跨域问题采用反向代理就只需写相对路径)
VITE_BASE_URL = http://localhost:5000 VITE_BASE_URL = ""
## 开发环境域名和静态资源公共路径(一般 / 或 ./ 都可以) ## 开发环境域名和静态资源公共路径(一般 / 或 ./ 都可以)
VITE_PUBLIC_PATH = / VITE_PUBLIC_PATH = /

View File

@@ -1,3 +1,4 @@
+<!-- eslint-disable vue/custom-event-name-casing -->
 <script>
 import { getDocumentParseProgress } from "@@/apis/kbs/document"
 
@@ -118,11 +119,10 @@ export default {
       }
 
      //
-      if (data.status === "1" && !data.running) {
+      if (data.running === "3") {
        this.isCompleted = true
        this.progressStatus = "success"
        this.stopPolling()
-        // eslint-disable-next-line vue/custom-event-name-casing
        this.$emit("parse-complete")
      }
 
@@ -131,7 +131,6 @@ export default {
        this.hasError = true
        this.progressStatus = "exception"
        this.stopPolling()
-        // eslint-disable-next-line vue/custom-event-name-casing
        this.$emit("parse-failed", data.message || "解析失败")
      }
    }
} }