"""PDF parsing helper: magic_pdf extraction with a PyPDF2 fallback."""
import os
import tempfile
from io import BytesIO

# These names are not used by the code below. They are imported under a guard
# so that a missing magic_pdf installation cannot fail the import of this
# module itself -- that would make the PyPDF2 fallback inside
# process_pdf_with_magic unreachable.
try:
    from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader  # noqa: F401
    from magic_pdf.data.dataset import PymuDocDataset  # noqa: F401
    from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze  # noqa: F401
    from magic_pdf.config.enums import SupportedPdfParseMethod  # noqa: F401
except ImportError:
    pass


def _notify(callback, progress, message):
    """Forward a progress update to *callback* when one was supplied."""
    if callback:
        callback(progress, message)


def _extract_with_pypdf2(file_content, callback=None):
    """Extract plain text page by page with PyPDF2 (fallback path)."""
    import PyPDF2

    _notify(callback, 0.3, "使用PyPDF2提取文本")

    pdf_reader = PyPDF2.PdfReader(BytesIO(file_content))
    total_pages = len(pdf_reader.pages)
    content_list = []

    for i, page in enumerate(pdf_reader.pages):
        # Report every fifth page, mapped onto the 0.3-0.9 progress range.
        if i % 5 == 0:
            progress = 0.3 + (i / total_pages) * 0.6
            _notify(callback, progress, f"正在处理第 {i+1}/{total_pages} 页")

        text = page.extract_text()
        if text:
            content_list.append({
                "type": "text",
                "page": i + 1,
                "text": text
            })

    _notify(callback, 0.9, "文本提取完成")
    return content_list


def process_pdf_with_magic(file_content, callback=None):
    """Parse a PDF and return its content as a list of dicts.

    The magic_pdf processor/extractor classes are tried first. If importing
    them (or anything they import) raises ImportError, text is extracted with
    PyPDF2 instead. This function never raises for parsing problems: every
    failure is reported through *callback* and encoded in the returned list.

    Args:
        file_content: Raw bytes of the PDF file.
        callback: Optional callable ``callback(progress, message)`` used to
            report progress; ``progress`` is a float between 0 and 1.

    Returns:
        A list of dicts. Text entries have the keys ``type`` ("text"),
        ``page`` and ``text``; image entries have ``type`` ("image"),
        ``page``, ``image_path`` and ``caption``. On failure the list holds a
        single text entry describing the problem.
    """
    temp_pdf_path = None
    try:
        # NOTE(review): these modules are imported lazily on purpose; confirm
        # that the pinned magic-pdf release actually ships them.
        from magic_pdf.processor import PDFProcessor
        from magic_pdf.extractor import TextExtractor, ImageExtractor

        _notify(callback, 0.1, "初始化Magic PDF解析器")

        # A unique temp file per call: a fixed "<cwd>/temp/temp.pdf" would be
        # overwritten by concurrent parses.
        fd, temp_pdf_path = tempfile.mkstemp(suffix=".pdf")
        with os.fdopen(fd, "wb") as f:
            f.write(file_content)

        _notify(callback, 0.2, "开始解析PDF")
        processor = PDFProcessor(temp_pdf_path)

        _notify(callback, 0.3, "提取文本内容")
        text_content = TextExtractor(processor).extract()

        _notify(callback, 0.5, "提取图片内容")
        images = ImageExtractor(processor).extract()

        _notify(callback, 0.7, "组织解析结果")

        # Text entries first (one per page), then image entries.
        content_list = [
            {"type": "text", "page": page_num + 1, "text": page_text}
            for page_num, page_text in enumerate(text_content)
        ]
        content_list.extend(
            {
                "type": "image",
                "page": img.get("page", i + 1),
                "image_path": img.get("path", ""),
                "caption": img.get("caption", "")
            }
            for i, img in enumerate(images)
        )

        _notify(callback, 1.0, "PDF解析完成")
        return content_list

    except ImportError:
        # magic_pdf is not usable: fall back to plain text extraction.
        _notify(callback, 0.2, "Magic PDF未安装,使用备用方法")
        try:
            return _extract_with_pypdf2(file_content, callback)
        except Exception as e:
            _notify(callback, 0.5, f"PDF解析失败: {str(e)}")
            # Last resort: a single placeholder entry.
            return [{
                "type": "text",
                "page": 1,
                "text": "无法解析PDF文件内容"
            }]

    except Exception as e:
        _notify(callback, 0.5, f"PDF解析失败: {str(e)}")
        # Report the failure as a single text entry (not an empty list).
        return [{
            "type": "text",
            "page": 1,
            "text": f"解析失败: {str(e)}"
        }]

    finally:
        # Remove the temp file on every path, including failures.
        if temp_pdf_path is not None:
            try:
                os.remove(temp_pdf_path)
            except OSError:
                pass
def _cors_preflight_response(allowed_method):
    """Build the empty success response used to answer a CORS preflight."""
    response = success_response({})
    response.headers.add('Access-Control-Allow-Methods', allowed_method)
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
    return response


# The URL rule must declare the <doc_id> variable; without it the view is
# called with no arguments and fails before running.
@knowledgebase_bp.route('/documents/<doc_id>/parse', methods=['POST'])
def parse_document(doc_id):
    """Start parsing a document in the background.

    Args:
        doc_id: ID of the document to parse (taken from the URL).

    Returns:
        A success response wrapping the dict returned by
        KnowledgebaseService.async_parse_document, or an error response with
        code 500 if starting the task raised.
    """
    # NOTE(review): 'OPTIONS' is not listed in ``methods``, so with Flask's
    # automatic OPTIONS handling this branch is presumably never reached --
    # confirm against the app's CORS setup before removing it.
    if request.method == 'OPTIONS':
        return _cors_preflight_response('POST')

    try:
        result = KnowledgebaseService.async_parse_document(doc_id)
        return success_response(data=result)
    except Exception as e:
        return error_response(str(e), code=500)


@knowledgebase_bp.route('/documents/<doc_id>/parse/progress', methods=['GET'])
def get_parse_progress(doc_id):
    """Return the current parsing progress of a document.

    Args:
        doc_id: ID of the document to query (taken from the URL).

    Returns:
        A success response wrapping the progress dict; an error response with
        code 404 when the service reports an ``error`` key; an error response
        with code 202 when the lookup itself raised.
    """
    # NOTE(review): see parse_document -- presumably unreachable.
    if request.method == 'OPTIONS':
        return _cors_preflight_response('GET')

    try:
        result = KnowledgebaseService.get_document_parse_progress(doc_id)
        if isinstance(result, dict) and 'error' in result:
            return error_response(result['error'], code=404)
        return success_response(data=result)
    except Exception as e:
        # Any lookup failure is logged and reported to the client as
        # "still in progress, retry later".
        current_app.logger.error(f"获取解析进度失败: {str(e)}")
        return error_response("解析进行中,请稍后重试", code=202)


# ---------------------------------------------------------------------------
# Next hunk of the patch: management/server/services/knowledgebases/service.py
# (module-level imports; the patch context places them after ``import json``).
# ---------------------------------------------------------------------------
from flask import current_app
from datetime import datetime
from utils import generate_uuid
from database import DB_CONFIG, get_minio_client
import io
import os
import json
import threading
import time
import tempfile
import shutil
from io import BytesIO

# Parsing-related modules
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
# Custom tokenizer and text-processing helpers that stand in for rag.nlp.
def tokenize_text(text):
    """Split *text* on whitespace (stand-in for rag_tokenizer).

    A deliberately simple implementation; real use may need proper word
    segmentation.
    """
    return text.split()


def merge_chunks(sections, chunk_token_num=128, delimiter="\n。;!?"):
    """Greedily concatenate sections into chunks (stand-in for naive_merge).

    Args:
        sections: Iterable of strings, or tuples whose first item is the text.
        chunk_token_num: Once the current chunk's token count exceeds this
            value, the next section starts a new chunk.
        delimiter: Accepted for interface compatibility; currently unused.

    Returns:
        A list of chunk strings; ``[]`` when *sections* is empty. Sections are
        joined without a separator, and a chunk is only closed after it has
        already exceeded the limit.
    """
    if not sections:
        return []

    chunks = [""]
    token_counts = [0]

    for section in sections:
        text = section[0] if isinstance(section, tuple) else section
        # Rough token estimate: whitespace-separated words.
        token_count = len(text.split())

        if token_counts[-1] > chunk_token_num:
            # Current chunk is over the limit: start a new one.
            chunks.append(text)
            token_counts.append(token_count)
        else:
            chunks[-1] += text
            token_counts[-1] += token_count

    return chunks


def process_document_chunks(chunks, document_info):
    """Attach document metadata and tokens to each non-blank chunk.

    Args:
        chunks: Iterable of chunk strings.
        document_info: Dict copied (shallowly) into every result entry.

    Returns:
        A list of dicts, each a copy of *document_info* plus ``content`` and
        ``tokens``. Blank chunks are skipped.
    """
    results = []
    for chunk in chunks:
        if not chunk.strip():
            continue
        chunk_data = document_info.copy()
        chunk_data["content"] = chunk
        chunk_data["tokens"] = tokenize_text(chunk)
        results.append(chunk_data)
    return results


def _parse_pdf_to_content_list(pdf_bytes, doc_id, update_progress):
    """Run magic_pdf over *pdf_bytes* and return its content list."""
    # Imported lazily so the heavy parsing stack is only needed on this path.
    from magic_pdf.config.enums import SupportedPdfParseMethod
    from magic_pdf.data.data_reader_writer import FileBasedDataWriter
    from magic_pdf.data.dataset import PymuDocDataset
    from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze

    # Extracted images go to <system temp dir>/images_<doc_id>; the directory
    # is removed again on every exit path.
    temp_image_dir = os.path.join(tempfile.gettempdir(), f"images_{doc_id}")
    os.makedirs(temp_image_dir, exist_ok=True)
    try:
        # The bytes are handed to the dataset directly; writing them to a temp
        # file only to read them straight back is unnecessary.
        ds = PymuDocDataset(pdf_bytes)
        image_writer = FileBasedDataWriter(temp_image_dir)

        update_progress(0.3, "分析PDF类型")
        if ds.classify() == SupportedPdfParseMethod.OCR:
            update_progress(0.4, "使用OCR模式处理PDF")
            infer_result = ds.apply(doc_analyze, ocr=True)
            update_progress(0.6, "处理OCR结果")
            pipe_result = infer_result.pipe_ocr_mode(image_writer)
        else:
            update_progress(0.4, "使用文本模式处理PDF")
            infer_result = ds.apply(doc_analyze, ocr=False)
            update_progress(0.6, "处理文本结果")
            pipe_result = infer_result.pipe_txt_mode(image_writer)

        update_progress(0.8, "提取内容")
        return pipe_result.get_content_list(os.path.basename(temp_image_dir))
    finally:
        shutil.rmtree(temp_image_dir, ignore_errors=True)


def _upload_text_chunks(minio_client, bucket_name, content_list):
    """Upload every non-blank text entry to MinIO; return the object IDs."""
    chunk_ids = []
    for chunk_idx, chunk_data in enumerate(content_list):
        if chunk_data["type"] == "text":
            content = chunk_data["text"]
            if not content.strip():
                print(f"[DEBUG] 跳过空文本块 {chunk_idx}")
                continue

            chunk_id = generate_uuid()
            # ``length`` must be the number of BYTES sent; the character
            # count is smaller for any non-ASCII text.
            payload = content.encode('utf-8')
            try:
                minio_client.put_object(
                    bucket_name=bucket_name,
                    object_name=chunk_id,
                    data=BytesIO(payload),
                    length=len(payload)
                )
            except Exception as e:
                # Best effort: a failed chunk is reported and skipped.
                print(f"上传文本块失败: {str(e)}")
                continue

            chunk_ids.append(chunk_id)
            print(f"成功上传文本块 {len(chunk_ids)}/{len(content_list)}")

        elif chunk_data["type"] == "image":
            print(f"[INFO] 跳过图像块处理: {chunk_data.get('img_path')}")

    return chunk_ids


class KnowledgebaseService:
    # NOTE: only the methods added by this patch hunk are shown here. The
    # class's other members (for example ``_get_db_connection``, which the
    # methods below call, and ``delete_document``) lie outside the hunk.

    @classmethod
    def parse_document(cls, doc_id, callback=None):
        """Parse a document and record progress in the database.

        Downloads the file from MinIO, parses it with magic_pdf, uploads each
        text block to the knowledge base's bucket, then updates the
        ``document`` and ``knowledgebase`` rows and inserts a ``task`` row.

        Args:
            doc_id: ID of the document to parse.
            callback: Optional callable ``callback(progress, message, doc_id)``
                invoked after each progress update.

        Returns:
            ``{"success": True, "chunk_count": <uploaded text blocks>}``.

        Raises:
            Exception: On any failure (missing document or file, storage
                error, unsupported file type, parser error). The document row
                is marked as failed before the exception is raised.
        """
        conn = None
        cursor = None
        try:
            conn = cls._get_db_connection()
            cursor = conn.cursor(dictionary=True)

            # Load the document record.
            query = """
                SELECT d.id, d.name, d.location, d.type, d.kb_id, d.parser_id, d.parser_config
                FROM document d
                WHERE d.id = %s
            """
            cursor.execute(query, (doc_id,))
            doc = cursor.fetchone()
            if not doc:
                raise Exception("文档不存在")

            # Mark the document as being processed.
            update_query = """
                UPDATE document
                SET status = '2', run = '1', progress = 0.0, progress_msg = '开始解析'
                WHERE id = %s
            """
            cursor.execute(update_query, (doc_id,))
            conn.commit()

            # Resolve document -> file -> bucket (the file's parent_id).
            cursor.execute("SELECT file_id FROM file2document WHERE document_id = %s", (doc_id,))
            f2d_result = cursor.fetchone()
            if not f2d_result:
                raise Exception("无法找到文件到文档的映射关系")

            cursor.execute("SELECT parent_id FROM file WHERE id = %s", (f2d_result['file_id'],))
            file_result = cursor.fetchone()
            if not file_result:
                raise Exception("无法找到文件记录")
            bucket_name = file_result['parent_id']

            minio_client = get_minio_client()
            if not minio_client.bucket_exists(bucket_name):
                raise Exception(f"存储桶不存在: {bucket_name}")

            def update_progress(prog=None, msg=None):
                """Persist progress and/or message, then notify *callback*."""
                if prog is not None:
                    cursor.execute("UPDATE document SET progress = %s WHERE id = %s", (float(prog), doc_id))
                    conn.commit()
                if msg is not None:
                    cursor.execute("UPDATE document SET progress_msg = %s WHERE id = %s", (msg, doc_id))
                    conn.commit()
                if callback:
                    callback(prog, msg, doc_id)

            # Fetch the file content from MinIO.
            file_location = doc['location']
            try:
                update_progress(0.1, f"正在从存储中获取文件: {file_location}")
                response = minio_client.get_object(bucket_name, file_location)
                try:
                    file_content = response.read()
                finally:
                    # Always hand the HTTP connection back to the pool.
                    response.close()
                    response.release_conn()
                update_progress(0.2, "文件获取成功,准备解析")
            except Exception as e:
                raise Exception(f"无法从存储中获取文件: {file_location}, 错误: {str(e)}") from e

            update_progress(0.2, "正在识别文档类型")
            file_type = doc['type'].lower()
            if not file_type.endswith('pdf'):
                # Only PDFs have a parser. Fail explicitly so the document is
                # marked as failed instead of staying "processing" forever.
                raise Exception(f"暂不支持解析该文件类型: {file_type}")

            update_progress(0.3, "使用Magic PDF解析器")
            content_list = _parse_pdf_to_content_list(file_content, doc_id, update_progress)

            print(f"开始保存解析结果到MinIO,文档ID: {doc_id}")
            update_progress(0.95, "保存解析结果")

            # Chunks are stored in a bucket named after the knowledge base.
            kb_id = doc['kb_id']
            if not minio_client.bucket_exists(kb_id):
                minio_client.make_bucket(kb_id)
                print(f"创建MinIO桶: {kb_id}")

            print(f"解析得到内容块数量: {len(content_list)}")

            # TODO: pre-process blocks before upload (merge_chunks /
            # process_document_chunks driven by the document's parser_config,
            # e.g. chunk_token_num and delimiter). For now the parser's raw
            # content list is uploaded unchanged.
            print(f"[DEBUG] 开始处理内容列表,共 {len(content_list)} 个原始内容块")
            print(f"[DEBUG] 开始上传到MinIO,目标桶: {kb_id}")

            chunk_ids_list = _upload_text_chunks(minio_client, kb_id, content_list)
            chunk_count = len(chunk_ids_list)

            # Mark the document as finished and store its chunk count.
            # NOTE: the processing duration is not measured; 0.0 is stored.
            final_update = """
                UPDATE document
                SET status = '1', run = '3', progress = 1.0,
                    progress_msg = '解析完成', chunk_num = %s,
                    process_duation = %s
                WHERE id = %s
            """
            cursor.execute(final_update, (chunk_count, 0.0, doc_id))
            conn.commit()
            print(f"[INFO] document表更新完成,文档ID: {doc_id}")

            # Add the new chunks to the knowledge base's running total.
            current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            kb_update = """
                UPDATE knowledgebase
                SET chunk_num = chunk_num + %s,
                    update_date = %s
                WHERE id = %s
            """
            cursor.execute(kb_update, (chunk_count, current_date, kb_id))
            conn.commit()
            print(f"[INFO] knowledgebase表更新完成,文档ID: {doc_id}")

            # Record a task row for this parse.
            task_id = generate_uuid()
            current_datetime = datetime.now()
            current_timestamp = int(current_datetime.timestamp() * 1000)  # milliseconds
            current_date_only = current_datetime.strftime("%Y-%m-%d")
            digest = f"{doc_id}_0_1"

            # Chunk IDs are stored as one space-separated string.
            chunk_ids_str = ' '.join(chunk_ids_list)

            task_insert = """
                INSERT INTO task (
                    id, create_time, create_date, update_time, update_date,
                    doc_id, from_page, to_page, begin_at, process_duation,
                    progress, progress_msg, retry_count, digest, chunk_ids, task_type
                ) VALUES (
                    %s, %s, %s, %s, %s,
                    %s, %s, %s, %s, %s,
                    %s, %s, %s, %s, %s, %s
                )
            """
            task_params = [
                task_id, current_timestamp, current_date_only, current_timestamp, current_date_only,
                doc_id, 0, 1, None, 0.0,
                1.0, "MinerU解析完成", 1, digest, chunk_ids_str, ""
            ]
            cursor.execute(task_insert, task_params)
            conn.commit()

            update_progress(1.0, "解析完成")
            print(f"[INFO] 解析完成,文档ID: {doc_id}")

            return {
                "success": True,
                "chunk_count": chunk_count
            }

        except Exception as e:
            print(f"文档解析失败: {str(e)}")
            # Best effort: mark the document as failed. The connection may be
            # missing or already broken, so errors here are only reported.
            if conn is not None and cursor is not None:
                try:
                    conn.rollback()
                    error_update = """
                        UPDATE document
                        SET status = '1', run = '0', progress_msg = %s
                        WHERE id = %s
                    """
                    cursor.execute(error_update, (f"解析失败: {str(e)}", doc_id))
                    conn.commit()
                except Exception as update_error:
                    print(f"[ERROR] 更新文档失败状态时出错: {str(update_error)}")

            raise Exception(f"文档解析失败: {str(e)}") from e

        finally:
            # Release database resources on every path.
            for resource in (cursor, conn):
                if resource is not None:
                    try:
                        resource.close()
                    except Exception:
                        pass

    @classmethod
    def async_parse_document(cls, doc_id):
        """Start parse_document in a daemon thread and return immediately.

        Args:
            doc_id: ID of the document to parse.

        Returns:
            A dict with ``task_id`` (the document ID), ``status`` and
            ``message`` describing the started task.

        Raises:
            Exception: If the worker thread could not be started.
        """
        try:
            worker = threading.Thread(target=cls.parse_document, args=(doc_id,), daemon=True)
            worker.start()

            return {
                "task_id": doc_id,
                "status": "processing",
                "message": "文档解析已开始"
            }
        except Exception as e:
            current_app.logger.error(f"启动解析任务失败: {str(e)}")
            raise Exception(f"启动解析任务失败: {str(e)}") from e

    @classmethod
    def get_document_parse_progress(cls, doc_id):
        """Read a document's parsing progress from the database.

        Args:
            doc_id: ID of the document to query.

        Returns:
            ``{"error": ...}`` when the document does not exist; otherwise a
            dict with ``progress`` (float), ``message``, ``status`` and
            ``running`` (True while the ``run`` column equals "1").
        """
        query = """
            SELECT progress, progress_msg, status, run
            FROM document
            WHERE id = %s
        """
        conn = cls._get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, (doc_id,))
                result = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        if not result:
            return {"error": "文档不存在"}

        return {
            # A NULL progress column is reported as 0.0.
            "progress": float(result["progress"] or 0.0),
            "message": result["progress_msg"],
            "status": result["status"],
            "running": result["run"] == "1"
        }
request({ diff --git a/management/web/src/layouts/components/DocumentParseProgress/index.vue b/management/web/src/layouts/components/DocumentParseProgress/index.vue new file mode 100644 index 0000000..b26fd80 --- /dev/null +++ b/management/web/src/layouts/components/DocumentParseProgress/index.vue @@ -0,0 +1,218 @@ + + + + + diff --git a/management/web/src/pages/knowledgebase/index.vue b/management/web/src/pages/knowledgebase/index.vue index ccb2fb9..52ffc4f 100644 --- a/management/web/src/pages/knowledgebase/index.vue +++ b/management/web/src/pages/knowledgebase/index.vue @@ -1,6 +1,7 @@