From 6057163f2803be9b7eea722cb37ecffa29e6ded5 Mon Sep 17 00:00:00 2001
From: zstar <65890619+zstar1003@users.noreply.github.com>
Date: Thu, 17 Apr 2025 16:31:20 +0800
Subject: [PATCH] feat(parsing): add Word and PPT support to document parsing
 (#32)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add support for Word and PPT files; LibreOffice must be installed for the
conversion step. The document-parsing logic is also extracted from
`KnowledgebaseService` into a standalone `document_parser.py` module to
improve maintainability and reusability, and the file-upload and temp-file
handling logic is tightened so that resources are released correctly.
---
 management/server/services/files/service.py   |   4 +-
 .../knowledgebases/document_parser.py         | 400 +++++++++++
 .../server/services/knowledgebases/service.py | 662 +++---------------
 management/web/.env.development               |   2 +-
 .../DocumentParseProgress/index.vue           |   5 +-
 5 files changed, 508 insertions(+), 565 deletions(-)
 create mode 100644 management/server/services/knowledgebases/document_parser.py

diff --git a/management/server/services/files/service.py b/management/server/services/files/service.py
index c5200d8..7aeccf9 100644
--- a/management/server/services/files/service.py
+++ b/management/server/services/files/service.py
@@ -1,6 +1,7 @@
 import os
 import mysql.connector
 import re
+import tempfile
 from io import BytesIO
 from minio import Minio
 from dotenv import load_dotenv
@@ -15,7 +16,8 @@ from database import DB_CONFIG, MINIO_CONFIG
 # Load environment variables
 load_dotenv("../../docker/.env")

-UPLOAD_FOLDER = '/data/uploads'
+temp_dir = tempfile.gettempdir()
+UPLOAD_FOLDER = os.path.join(temp_dir, "uploads")
 ALLOWED_EXTENSIONS = {'pdf', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx', 'jpg', 'jpeg', 'png', 'txt', 'md'}

 def allowed_file(filename):
diff --git a/management/server/services/knowledgebases/document_parser.py b/management/server/services/knowledgebases/document_parser.py
new file mode 100644
index 0000000..799cc30
--- /dev/null
+++ b/management/server/services/knowledgebases/document_parser.py
@@ -0,0 +1,400 @@
+import os
+import tempfile
+import shutil
+import json
+import mysql.connector
+import time
+import traceback
+from io import BytesIO
+from datetime import datetime
+from elasticsearch import Elasticsearch
+from database import MINIO_CONFIG, ES_CONFIG, DB_CONFIG, get_minio_client, get_es_client
+from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
+from magic_pdf.data.dataset import PymuDocDataset
+from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
+from magic_pdf.config.enums import SupportedPdfParseMethod
+from magic_pdf.data.read_api import read_local_office
+from utils import generate_uuid
+
+# Custom tokenizer and text-processing helpers that replace the rag.nlp functionality
+def tokenize_text(text):
+    """Tokenize text; replaces the rag_tokenizer functionality"""
+    # Simple implementation; real applications may need more sophisticated tokenization
+    return text.split()
+
+def merge_chunks(sections, chunk_token_num=128, delimiter="\n。;!?"):
+    """Merge text chunks; replaces the naive_merge functionality"""
+    if not sections:
+        return []
+
+    chunks = [""]
+    token_counts = [0]
+
+    for section in sections:
+        # Compute the token count of the current section
+        text = section[0] if isinstance(section, tuple) else section
+        position = section[1] if isinstance(section, tuple) and len(section) > 1 else ""
+
+        # Rough token-count estimate
+        token_count = len(text.split())
+
+        # If the current chunk already exceeds the limit, start a new chunk
+        if token_counts[-1] > chunk_token_num:
+            chunks.append(text)
+            token_counts.append(token_count)
+        else:
+            # Otherwise append to the current chunk
+            chunks[-1] += text
+            token_counts[-1] += token_count
+
+    return chunks
+
+def _get_db_connection():
+    """Create a database connection"""
+    return mysql.connector.connect(**DB_CONFIG)
+
+def _update_document_progress(doc_id, progress=None, message=None, status=None, run=None, chunk_count=None, process_duration=None):
+    """Update a document's progress and status in the database"""
+    conn = None
+    cursor = None
+    try:
+        conn = _get_db_connection()
+        cursor = conn.cursor()
+        updates = []
+        params = []
+
+        if progress is not None:
+            updates.append("progress = %s")
+            params.append(float(progress))
+        if message is not None:
+            updates.append("progress_msg = %s")
+            params.append(message)
+        if status is not None:
+            updates.append("status = %s")
+            params.append(status)
+        if run is not None:
+            updates.append("run = %s")
+            params.append(run)
+        if chunk_count is not None:
+            updates.append("chunk_num = %s")
+            params.append(chunk_count)
+        if process_duration is not None:
+            updates.append("process_duation = %s")
+            params.append(process_duration)
+
+        if not updates:
+            return
+
+        query = f"UPDATE document SET {', '.join(updates)} WHERE id = %s"
+        params.append(doc_id)
+        cursor.execute(query, params)
+        conn.commit()
+    except Exception as e:
+        print(f"[Parser-ERROR] Failed to update progress for document {doc_id}: {e}")
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def _update_kb_chunk_count(kb_id, count_delta):
+    """Update the knowledge base's chunk count"""
+    conn = None
+    cursor = None
+    try:
+        conn = _get_db_connection()
+        cursor = conn.cursor()
+        current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        kb_update = """
+            UPDATE knowledgebase
+            SET chunk_num = chunk_num + %s,
+                update_date = %s
+            WHERE id = %s
+        """
+        cursor.execute(kb_update, (count_delta, current_date, kb_id))
+        conn.commit()
+    except Exception as e:
+        print(f"[Parser-ERROR] Failed to update chunk count for knowledge base {kb_id}: {e}")
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def _create_task_record(doc_id, chunk_ids_list):
+    """Create a task record"""
+    conn = None
+    cursor = None
+    try:
+        conn = _get_db_connection()
+        cursor = conn.cursor()
+        task_id = generate_uuid()
+        current_datetime = datetime.now()
+        current_timestamp = int(current_datetime.timestamp() * 1000)
+        current_time_str = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
+        current_date_only = current_datetime.strftime("%Y-%m-%d")
+        digest = f"{doc_id}_{0}_{1}"  # assumes from_page=0, to_page=1
+        chunk_ids_str = ' '.join(chunk_ids_list)
+
+        task_insert = """
+            INSERT INTO task (
+                id, create_time, create_date, update_time, update_date,
+                doc_id, from_page, to_page, begin_at, process_duation,
+                progress, progress_msg, retry_count, digest, chunk_ids, task_type
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+        """
+        task_params = [
+            task_id, current_timestamp, current_date_only, current_timestamp, current_date_only,
+            doc_id, 0, 1, None, 0.0,  # begin_at, process_duration
+            1.0, "MinerU parsing complete", 1, digest, chunk_ids_str, ""  # progress, msg, retry, digest, chunks, type
+        ]
+        cursor.execute(task_insert, task_params)
+        conn.commit()
+        print(f"[Parser-INFO] Task record created, Task ID: {task_id}")
+    except Exception as e:
+        print(f"[Parser-ERROR] Failed to create task record: {e}")
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+
+def perform_parse(doc_id, doc_info, file_info):
+    """
+    Core logic for parsing a document
+
+    Args:
+        doc_id (str): Document ID.
+        doc_info (dict): Dict with document info (name, location, type, kb_id, parser_config, created_by).
+        file_info (dict): Dict with file info (parent_id / bucket_name).
+
+    Returns:
+        dict: Dict with the parse result (success, chunk_count).
+    """
+    temp_pdf_path = None
+    temp_image_dir = None
+    start_time = time.time()
+
+    try:
+        kb_id = doc_info['kb_id']
+        file_location = doc_info['location']
+        # Extract the original extension from the file path
+        _, file_extension = os.path.splitext(file_location)
+        file_type = doc_info['type'].lower()
+        parser_config = json.loads(doc_info['parser_config']) if isinstance(doc_info['parser_config'], str) else doc_info['parser_config']
+        bucket_name = file_info['parent_id']  # the file's storage bucket is parent_id
+        tenant_id = doc_info['created_by']  # the document creator serves as tenant_id
+
+        # Progress-update callback (calls the internal update function directly)
+        def update_progress(prog=None, msg=None):
+            _update_document_progress(doc_id, progress=prog, message=msg)
+            print(f"[Parser-PROGRESS] Doc: {doc_id}, Progress: {prog}, Message: {msg}")
+
+        # 1. Fetch the file content from MinIO
+        minio_client = get_minio_client()
+        if not minio_client.bucket_exists(bucket_name):
+            raise Exception(f"Bucket does not exist: {bucket_name}")
+
+        update_progress(0.1, f"Fetching file from storage: {file_location}")
+        response = minio_client.get_object(bucket_name, file_location)
+        file_content = response.read()
+        response.close()
+        update_progress(0.2, "File fetched, ready to parse")
+
+        # 2. Choose a parser based on the file type
+        content_list = []
+        if file_type.endswith('pdf'):
+            update_progress(0.3, "Using the MinerU parser")
+
+            # Create a temp file to hold the PDF content
+            temp_dir = tempfile.gettempdir()
+            temp_pdf_path = os.path.join(temp_dir, f"{doc_id}.pdf")
+            with open(temp_pdf_path, 'wb') as f:
+                f.write(file_content)
+
+            # Process with Magic PDF
+            reader = FileBasedDataReader("")
+            pdf_bytes = reader.read(temp_pdf_path)
+            ds = PymuDocDataset(pdf_bytes)
+
+            update_progress(0.3, "Analyzing PDF type")
+            is_ocr = ds.classify() == SupportedPdfParseMethod.OCR
+            mode_msg = "OCR mode" if is_ocr else "text mode"
+            update_progress(0.4, f"Processing PDF in {mode_msg}")
+
+            infer_result = ds.apply(doc_analyze, ocr=is_ocr)
+
+            # Set up a temporary output directory
+            temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
+            os.makedirs(temp_image_dir, exist_ok=True)
+            image_writer = FileBasedDataWriter(temp_image_dir)
+
+            update_progress(0.6, f"Processing {mode_msg} results")
+            pipe_result = infer_result.pipe_ocr_mode(image_writer) if is_ocr else infer_result.pipe_txt_mode(image_writer)
+
+            update_progress(0.8, "Extracting content")
+            content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
+
+        elif file_type.endswith('word') or file_type.endswith('ppt'):
+            update_progress(0.3, "Using the MinerU parser")
+            # Create a temp file to hold the file content
+            temp_dir = tempfile.gettempdir()
+            temp_file_path = os.path.join(temp_dir, f"{doc_id}{file_extension}")
+            with open(temp_file_path, 'wb') as f:
+                f.write(file_content)
+
+            print(f"[Parser-INFO] Temp file path: {temp_file_path}")
+            # Process with MinerU (Office formats are converted via LibreOffice)
+            ds = read_local_office(temp_file_path)[0]
+            infer_result = ds.apply(doc_analyze, ocr=True)
+
+            # Set up a temporary output directory
+            temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
+            os.makedirs(temp_image_dir, exist_ok=True)
+            image_writer = FileBasedDataWriter(temp_image_dir)
+
+            update_progress(0.6, "Processing file results")
+            pipe_result = infer_result.pipe_txt_mode(image_writer)
+
+            update_progress(0.8, "Extracting content")
+            content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
+
+        else:
+            update_progress(0.3, f"File type not yet supported: {file_type}")
+            raise NotImplementedError(f"No parser implemented for file type '{file_type}'")
+
+        # 3. Process the parse results (upload to MinIO, index into ES)
+        update_progress(0.95, "Saving parse results")
+        es_client = get_es_client()
+        # Note: the MinIO bucket should be the knowledge base ID (kb_id), not the file's parent_id
+        output_bucket = kb_id
+        if not minio_client.bucket_exists(output_bucket):
+            minio_client.make_bucket(output_bucket)
+            print(f"[Parser-INFO] Created MinIO bucket: {output_bucket}")
+
+        index_name = f"ragflow_{tenant_id}"
+        if not es_client.indices.exists(index=index_name):
+            # Create the index
+            es_client.indices.create(
+                index=index_name,
+                body={
+                    "settings": {"number_of_replicas": 0},  # 0 replicas for a single node
+                    "mappings": { "properties": { "doc_id": {"type": "keyword"}, "kb_id": {"type": "keyword"}, "content_with_weight": {"type": "text"} } }  # simplified fields
+                }
+            )
+            print(f"[Parser-INFO] Created Elasticsearch index: {index_name}")
+
+        chunk_count = 0
+        chunk_ids_list = []
+        for chunk_idx, chunk_data in enumerate(content_list):
+            if chunk_data["type"] == "text":
+                content = chunk_data["text"]
+                if not content or not content.strip():
+                    continue
+
+                chunk_id = generate_uuid()
+                try:
+                    # Upload the text chunk to MinIO (bucket is kb_id)
+                    minio_client.put_object(
+                        bucket_name=output_bucket,
+                        object_name=chunk_id,
+                        data=BytesIO(content.encode('utf-8')),
+                        length=len(content.encode('utf-8'))  # use the byte length
+                    )
+
+                    # Prepare the ES document
+                    content_tokens = tokenize_text(content)  # tokenize
+                    current_time_es = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                    current_timestamp_es = datetime.now().timestamp()
+
+                    es_doc = {
+                        "doc_id": doc_id,
+                        "kb_id": kb_id,
+                        "docnm_kwd": doc_info['name'],
+                        "title_tks": doc_info['name'],
+                        "title_sm_tks": doc_info['name'],
+                        "content_with_weight": content,
+                        "content_ltks": content_tokens,
+                        "content_sm_ltks": content_tokens,
+                        "page_num_int": [1],  # simplified
+                        "position_int": [[1, 0, 0, 0, 0]],  # simplified
+                        "top_int": [1],  # simplified
+                        "create_time": current_time_es,
+                        "create_timestamp_flt": current_timestamp_es,
+                        "img_id": "",
+                        "q_1024_vec": []  # vector field left empty
+                    }
+
+                    # Index into Elasticsearch
+                    es_client.index(index=index_name, document=es_doc)  # use the document parameter
+
+                    chunk_count += 1
+                    chunk_ids_list.append(chunk_id)
+                    print(f"Processed text chunk {chunk_count}/{len(content_list)}")
+
+                except Exception as e:
+                    print(f"[Parser-ERROR] Failed to process text chunk {chunk_idx}: {e}")
+
+            elif chunk_data["type"] == "image":
+                img_path_relative = chunk_data.get('img_path')
+                if not img_path_relative or not temp_image_dir:
+                    continue
+
+                img_path_abs = os.path.join(temp_image_dir, os.path.basename(img_path_relative))
+                if not os.path.exists(img_path_abs):
+                    print(f"[Parser-WARNING] Image file does not exist: {img_path_abs}")
+                    continue
+
+                img_id = generate_uuid()
+                img_ext = os.path.splitext(img_path_abs)[1]
+                img_key = f"images/{img_id}{img_ext}"  # object name in MinIO
+                content_type = f"image/{img_ext[1:].lower()}"
+                if content_type == "image/jpg": content_type = "image/jpeg"
+
+                # try:
+                #     # Upload the image to MinIO (bucket is kb_id)
+                #     minio_client.fput_object(
+                #         bucket_name=output_bucket,
+                #         object_name=img_key,
+                #         file_path=img_path_abs,
+                #         content_type=content_type
+                #     )
+                #     print(f"Uploaded image: {img_key}")
+                #     # Note: public access may require extra MinIO server and bucket-policy configuration
+
+                # except Exception as e:
+                #     print(f"[Parser-ERROR] Failed to upload image {img_path_abs}: {e}")
+
+        # 4. Update the final status
+        process_duration = time.time() - start_time
+        _update_document_progress(doc_id, progress=1.0, message="Parsing complete", status='1', run='3', chunk_count=chunk_count, process_duration=process_duration)
+        _update_kb_chunk_count(kb_id, chunk_count)  # update the knowledge base's total chunk count
+        _create_task_record(doc_id, chunk_ids_list)  # create the task record
+
+        update_progress(1.0, "Parsing complete")
+        print(f"[Parser-INFO] Parsing complete, doc ID: {doc_id}, duration: {process_duration:.2f}s, chunks: {chunk_count}")
+
+        return {"success": True, "chunk_count": chunk_count}
+
+    except Exception as e:
+        process_duration = time.time() - start_time
+        # error_message = f"Parsing failed: {str(e)}"
+        print(f"[Parser-ERROR] Parsing failed for document {doc_id}: {e}")
+        error_message = "Parsing failed: file conversion could not be performed. Make sure LibreOffice is installed and added to the system PATH."
+        traceback.print_exc()  # print the full stack trace
+        # Mark the document as failed
+        _update_document_progress(doc_id, status='1', run='0', message=error_message, process_duration=process_duration)  # status=1 means finished, run=0 means failed
+        # Don't re-raise; let the caller know the task has ended (but failed)
+        return {"success": False, "error": error_message}
+
+    finally:
+        # Clean up temp files
+        try:
+            if temp_pdf_path and os.path.exists(temp_pdf_path):
+                os.remove(temp_pdf_path)
+            if temp_image_dir and os.path.exists(temp_image_dir):
+                shutil.rmtree(temp_image_dir, ignore_errors=True)
+        except Exception as clean_e:
+            print(f"[Parser-WARNING] Failed to clean up temp files: {clean_e}")
\ No newline at end of file
diff --git a/management/server/services/knowledgebases/service.py b/management/server/services/knowledgebases/service.py
index 6709acb..a40713e 100644
--- a/management/server/services/knowledgebases/service.py
+++ b/management/server/services/knowledgebases/service.py
@@ -1,83 +1,19 @@
 import mysql.connector
 import json
-from flask import current_app
+import threading
 from datetime import datetime
 from utils import generate_uuid
-from database import DB_CONFIG, get_minio_client, get_es_client
-import io
-import os
-import json
-import threading
-import time
-import tempfile
-import shutil
-from elasticsearch import Elasticsearch
-from io import BytesIO
+from database import DB_CONFIG

 # Parsing-related modules
-from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
-from magic_pdf.data.dataset import PymuDocDataset
-from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
-from magic_pdf.config.enums import SupportedPdfParseMethod
-
-# Custom tokenizer and text-processing helpers that replace the rag.nlp functionality
-def tokenize_text(text):
-    """Tokenize text; replaces the rag_tokenizer functionality"""
-    # Simple implementation; real applications may need more sophisticated tokenization
-    return text.split()
-
-def merge_chunks(sections, chunk_token_num=128, delimiter="\n。;!?"):
-    """Merge text chunks; replaces the naive_merge functionality"""
-    if not sections:
-        return []
-
-    chunks = [""]
-    token_counts = [0]
-
-    for section in sections:
-        # Compute the token count of the current section
-        text = section[0] if isinstance(section, tuple) else section
-        position = section[1] if isinstance(section, tuple) and len(section) > 1 else ""
-
-        # Rough token-count estimate
-        token_count = len(text.split())
-
-        # If the current chunk already exceeds the limit, start a new chunk
-        if token_counts[-1] > chunk_token_num:
-            chunks.append(text)
-            token_counts.append(token_count)
-        else:
-            # Otherwise append to the current chunk
-            chunks[-1] += text
-            token_counts[-1] += token_count
-
-    return chunks
-
-def process_document_chunks(chunks, document_info):
-    """Process document chunks; replaces the tokenize_chunks functionality"""
-    results = []
-
-    for chunk in chunks:
-        if not chunk.strip():
-            continue
-
-        # Build the document-chunk object
-        chunk_data = document_info.copy()
-        chunk_data["content"] = chunk
-        chunk_data["tokens"] = tokenize_text(chunk)
-
-        results.append(chunk_data)
-
-    return results
-
-
+from .document_parser import perform_parse, _update_document_progress

 class KnowledgebaseService:
-    
+
     @classmethod
     def _get_db_connection(cls):
-        """Get database connection"""
+        """Create a database connection"""
         return mysql.connector.connect(**DB_CONFIG)
-    
+
     @classmethod
     def get_knowledgebase_list(cls, page=1, size=10, name=''):
         """Get the knowledge base list"""
@@ -304,7 +240,7 @@ class KnowledgebaseService:

             return cls.get_knowledgebase_detail(kb_id)
         except Exception as e:
-            current_app.logger.error(f"Failed to create knowledge base: {str(e)}")
+            print(f"Failed to create knowledge base: {str(e)}")
             raise Exception(f"Failed to create knowledge base: {str(e)}")

@@ -391,7 +327,7 @@ class KnowledgebaseService:

             return True
         except Exception as e:
-            current_app.logger.error(f"Failed to delete knowledge base: {str(e)}")
+            print(f"Failed to delete knowledge base: {str(e)}")
             raise Exception(f"Failed to delete knowledge base: {str(e)}")

@@ -422,7 +358,7 @@ class KnowledgebaseService:

             return len(kb_ids)
         except Exception as e:
-            current_app.logger.error(f"Failed to batch-delete knowledge bases: {str(e)}")
+            print(f"Failed to batch-delete knowledge bases: {str(e)}")
             raise Exception(f"Failed to batch-delete knowledge bases: {str(e)}")

@@ -483,15 +419,14 @@ class KnowledgebaseService:

             cursor.close()
             conn.close()
-            
-            print(results)
+
             return {
                 'list': results,
                 'total': total
             }
         except Exception as e:
-            current_app.logger.error(f"Failed to get the knowledge base's document list: {str(e)}")
+            print(f"Failed to get the knowledge base's document list: {str(e)}")
             raise Exception(f"Failed to get the knowledge base's document list: {str(e)}")

@@ -727,528 +662,135 @@ class KnowledgebaseService:
             raise Exception(f"Failed to delete document: {str(e)}")

     @classmethod
-    def parse_document(cls, doc_id, callback=None):
-        """Parse a document with progress feedback"""
+    def parse_document(cls, doc_id):
+        """Parse a document (synchronous version; invokes the background parsing logic)"""
         conn = None
         cursor = None
         try:
-            # Fetch document info
+            # 1. Fetch document and file info
             conn = cls._get_db_connection()
             cursor = conn.cursor(dictionary=True)
-            
+
             # Query document info
-            query = """
-                SELECT d.id, d.name, d.location, d.type, d.kb_id, d.parser_id, d.parser_config
+            doc_query = """
+                SELECT d.id, d.name, d.location, d.type, d.kb_id, d.parser_id, d.parser_config, d.created_by
                 FROM document d
                 WHERE d.id = %s
             """
-            cursor.execute(query, (doc_id,))
-            doc = cursor.fetchone()
-            
-            if not doc:
-                raise Exception("Document does not exist")
-            
-            # Mark the document as processing
-            update_query = """
-                UPDATE document
-                SET status = '2', run = '1', progress = 0.0, progress_msg = 'Parsing started'
-                WHERE id = %s
-            """
-            cursor.execute(update_query, (doc_id,))
-            conn.commit()
+            cursor.execute(doc_query, (doc_id,))
+            doc_info = cursor.fetchone()

-            # Get the file ID and bucket ID
+            if not doc_info:
+                raise Exception("Document does not exist")
+
+            # Fetch the associated file info (mainly parent_id, used as bucket_name)
             f2d_query = "SELECT file_id FROM file2document WHERE document_id = %s"
             cursor.execute(f2d_query, (doc_id,))
             f2d_result = cursor.fetchone()
-            
             if not f2d_result:
                 raise Exception("No file-to-document mapping found")
-                
             file_id = f2d_result['file_id']
-            
+
             file_query = "SELECT parent_id FROM file WHERE id = %s"
             cursor.execute(file_query, (file_id,))
-            file_result = cursor.fetchone()
-            
-            if not file_result:
+            file_info = cursor.fetchone()
+            if not file_info:
                 raise Exception("File record not found")
-                
-            bucket_name = file_result['parent_id']
-            
-            # Create a MinIO client
-            minio_client = get_minio_client()
-            
-            # Check that the bucket exists
-            if not minio_client.bucket_exists(bucket_name):
-                raise Exception(f"Bucket does not exist: {bucket_name}")
-            
-            # Progress-update helper
-            def update_progress(prog=None, msg=None):
-                if prog is not None:
-                    progress_query = "UPDATE document SET progress = %s WHERE id = %s"
-                    cursor.execute(progress_query, (float(prog), doc_id))
-                    conn.commit()
-                
-                if msg is not None:
-                    msg_query = "UPDATE document SET progress_msg = %s WHERE id = %s"
-                    cursor.execute(msg_query, (msg, doc_id))
-                    conn.commit()
-                
-                if callback:
-                    callback(prog, msg, doc_id)
-            
-            # Fetch the file content from MinIO
-            file_location = doc['location']
-            try:
-                update_progress(0.1, f"Fetching file from storage: {file_location}")
-                response = minio_client.get_object(bucket_name, file_location)
-                file_content = response.read()
-                response.close()
-                update_progress(0.2, "File fetched, ready to parse")
-            except Exception as e:
-                raise Exception(f"Failed to fetch file from storage: {file_location}, error: {str(e)}")
-            
-            # Parser config
-            parser_config = json.loads(doc['parser_config']) if isinstance(doc['parser_config'], str) else doc['parser_config']
-            
-            # Choose a parser based on the file type
-            file_type = doc['type'].lower()
-            chunks = []
-            
-            update_progress(0.2, "Detecting document type")
-            
-            # Parse with magic_pdf
-            if file_type.endswith('pdf'):
-                update_progress(0.3, "Using the Magic PDF parser")
-                
-                # Create a temp file for the PDF content (path: C:\Users\username\AppData\Local\Temp)
-                temp_dir = tempfile.gettempdir()
-                temp_pdf_path = os.path.join(temp_dir, f"{doc_id}.pdf")
-                with open(temp_pdf_path, 'wb') as f:
-                    f.write(file_content)
-                
-                try:
-                    # Process the PDF using the method from the reference script
-                    def magic_callback(prog, msg):
-                        # Map progress into the 20%-90% range
-                        actual_prog = 0.2 + prog * 0.7
-                        update_progress(actual_prog, msg)
-                    
-                    # Initialize the data reader
-                    reader = FileBasedDataReader("")
-                    pdf_bytes = reader.read(temp_pdf_path)
-                    
-                    # Create the PDF dataset instance
-                    ds = PymuDocDataset(pdf_bytes)
-                    
-                    # Choose the processing method based on the PDF type
-                    update_progress(0.3, "Analyzing PDF type")
-                    if ds.classify() == SupportedPdfParseMethod.OCR:
-                        update_progress(0.4, "Processing PDF in OCR mode")
-                        infer_result = ds.apply(doc_analyze, ocr=True)
-                        
-                        # Set up a temporary output directory
-                        temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
-                        os.makedirs(temp_image_dir, exist_ok=True)
-                        image_writer = FileBasedDataWriter(temp_image_dir)
-                        
-                        update_progress(0.6, "Processing OCR results")
-                        pipe_result = infer_result.pipe_ocr_mode(image_writer)
-                    else:
-                        update_progress(0.4, "Processing PDF in text mode")
-                        infer_result = ds.apply(doc_analyze, ocr=False)
-                        
-                        # Set up a temporary output directory
-                        temp_image_dir = os.path.join(temp_dir, f"images_{doc_id}")
-                        os.makedirs(temp_image_dir, exist_ok=True)
-                        image_writer = FileBasedDataWriter(temp_image_dir)
-                        
-                        update_progress(0.6, "Processing text results")
-                        pipe_result = infer_result.pipe_txt_mode(image_writer)
-                    
-                    # Get the content list
-                    update_progress(0.8, "Extracting content")
-                    content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir))
-                    
-                    print(f"Saving parse results to MinIO, doc ID: {doc_id}")
-                    # Process the content list
-                    update_progress(0.95, "Saving parse results")
-                    
-                    # Get or create the MinIO bucket
-                    kb_id = doc['kb_id']
-                    minio_client = get_minio_client()
-                    if not minio_client.bucket_exists(kb_id):
-                        minio_client.make_bucket(kb_id)
-                        print(f"Created MinIO bucket: {kb_id}")
-                    # Use content_list rather than the chunks variable
-                    print(f"Number of parsed content chunks: {len(content_list)}")
-                    
-                    # Process the content list and create document chunks
-                    document_info = {
-                        "doc_id": doc_id,
-                        "doc_name": doc['name'],
-                        "kb_id": kb_id
-                    }
-                    
-                    # TODO: chunk pre-processing
-                    # Merge content chunks
-                    # chunk_token_num = parser_config.get("chunk_token_num", 512)
-                    # delimiter = parser_config.get("delimiter", "\n!?;。;!?")
-                    # merged_chunks = merge_chunks(content_list, chunk_token_num, delimiter)
-                    
-                    # Process document chunks
-                    # processed_chunks = process_document_chunks(merged_chunks, document_info)
-                    
-                    # Use the raw content list directly, without merging or processing
-                    # processed_chunks = []
-                    print(f"[DEBUG] Processing content list, {len(content_list)} raw content chunks in total")
-                    
-                    # for i, content in enumerate(content_list):
-                    #     if not content.strip():
-                    #         continue
-                        
-                    #     chunk_data = document_info.copy()
-                    #     chunk_data["content"] = content
-                    #     chunk_data["tokens"] = tokenize_text(content)
-                    #     processed_chunks.append(chunk_data)
+            cursor.close()
+            conn.close()
+            conn = None  # make sure the connection is closed

-                    print(f"[DEBUG] Uploading to MinIO, target bucket: {kb_id}")
-                    
-                    # Get the Elasticsearch client
-                    es_client = get_es_client()
-                    
-                    # Get the tenant_id (the document creator's ID)
-                    tenant_query = """
-                        SELECT created_by FROM document WHERE id = %s
-                    """
-                    cursor.execute(tenant_query, (doc_id,))
-                    tenant_result = cursor.fetchone()
-                    tenant_id = tenant_result['created_by']
-                    print(f"[DEBUG] tenant_id for document {doc_id}: {tenant_id}")
+            # 2. Mark the document as processing (using the parser module's function)
+            _update_document_progress(doc_id, status='2', run='1', progress=0.0, message='Parsing started')

-                    # Make sure the index exists
-                    index_name = f"ragflow_{tenant_id}"
-                    if not es_client.indices.exists(index=index_name):
-                        # Create the index with 0 replicas
-                        es_client.indices.create(
-                            index=index_name,
-                            body={
-                                "settings": {
-                                    "number_of_shards": 2,
-                                    "number_of_replicas": 0  # 0 replicas for a single-node environment
-                                },
-                                "mappings": {
-                                    "properties": {
-                                        "doc_id": {"type": "keyword"},
-                                        "kb_id": {"type": "keyword"},
-                                        "docnm_kwd": {"type": "keyword"},
-                                        "title_tks": {"type": "keyword"},
-                                        "title_sm_tks": {"type": "keyword"},
-                                        "content_with_weight": {"type": "text"},
-                                        "content_ltks": {"type": "keyword"},
-                                        "content_sm_ltks": {"type": "keyword"},
-                                        "page_num_int": {"type": "integer"},
-                                        "position_int": {"type": "integer"},
-                                        "top_int": {"type": "integer"},
-                                        "create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
-                                        "create_timestamp_flt": {"type": "float"},
-                                        "img_id": {"type": "keyword"},
-                                        "q_1024_vec": {"type": "keyword"}
-                                    }
-                                }
-                            }
-                        )

+            # 3. Invoke the background parsing function
+            parse_result = perform_parse(doc_id, doc_info, file_info)

-                    # Process content chunks and upload them to MinIO
-                    chunk_count = 0
-                    chunk_ids_list = []
-                    for chunk_idx, chunk_data in enumerate(content_list):
-                        if chunk_data["type"] == "text":
-                            content = chunk_data["text"]
-                            if not content.strip():
-                                print(f"[DEBUG] Skipping empty text chunk {chunk_idx}")
-                                continue
-                            
-                            # Generate a unique ID
-                            chunk_id = generate_uuid()
-                            
-                            try:
-                                # 1. Upload to MinIO
-                                minio_client.put_object(
-                                    bucket_name=kb_id,
-                                    object_name=chunk_id,
-                                    data=BytesIO(content.encode('utf-8')),
-                                    length=len(content)
-                                )
-                                
-                                # Tokenize
-                                content_tokens = tokenize_text(content)
-                                
-                                # Get the current time
-                                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                                current_timestamp = datetime.now().timestamp()
-                                
-                                # Build the Elasticsearch document
-                                es_doc = {
-                                    "doc_id": doc_id,
-                                    "kb_id": kb_id,
-                                    "docnm_kwd": doc['name'],
-                                    "title_tks": doc['name'],  # simplified: use the document name as the title
-                                    "title_sm_tks": doc['name'],  # simplified
-                                    "content_with_weight": content,
-                                    "content_ltks": content_tokens,
-                                    "content_sm_ltks": content_tokens,  # simplified
-                                    "page_num_int": [1],  # default page number
-                                    "position_int": [[1, 0, 0, 0, 0]],  # default position
-                                    "top_int": [1],  # default top position
-                                    "create_time": current_time,
-                                    "create_timestamp_flt": current_timestamp,
-                                    "img_id": "",  # empty when there is no image
-                                    "q_1024_vec": []
-                                }
-                                
-                                # 2. Index into Elasticsearch
-                                es_client.index(
-                                    index=index_name,
-                                    # id=es_chunk_id,
-                                    body=es_doc
-                                )
-                                
-                                chunk_count += 1
-                                chunk_ids_list.append(chunk_id)
-                                print(f"Uploaded text chunk {chunk_count}/{len(content_list)}")
-                            except Exception as e:
-                                print(f"Failed to upload text chunk: {str(e)}")
-                                continue
-                        
-                        elif chunk_data["type"] == "image":
-                            print(f"[INFO] Processing image chunk: {chunk_data['img_path']}")
-                            try:
-                                # Get the image path
-                                img_path = chunk_data['img_path']
+            # 4. Return the parse result
+            return parse_result

-                                # If the path is relative, prepend the temp directory
-                                if not os.path.isabs(img_path):
-                                    # Use the temporary image directory as the base path
-                                    img_path = os.path.join(temp_image_dir, os.path.basename(img_path))
-                                    print(f"[INFO] Converted to absolute path: {img_path}")
-                                
-                                if os.path.exists(img_path):
-                                    # Generate the image ID and storage key
-                                    img_id = generate_uuid()
-                                    img_key = f"images/{img_id}{os.path.splitext(img_path)[1]}"
-                                    
-                                    # Read the image content
-                                    with open(img_path, 'rb') as img_file:
-                                        img_data = img_file.read()
-                                    
-                                    # Set the image Content-Type
-                                    content_type = f"image/{os.path.splitext(img_path)[1][1:].lower()}"
-                                    if content_type == "image/jpg":
-                                        content_type = "image/jpeg"
-                                    
-                                    # Upload the image to MinIO
-                                    minio_client.put_object(
-                                        bucket_name=kb_id,
-                                        object_name=img_key,
-                                        data=BytesIO(img_data),
-                                        length=len(img_data),
-                                        content_type=content_type
-                                    )
-                                    
-                                    # Grant public read access to the image
-                                    policy = {
-                                        "Version": "2012-10-17",
-                                        "Statement": [
-                                            {
-                                                "Effect": "Allow",
-                                                "Principal": {"AWS": "*"},
-                                                "Action": ["s3:GetObject"],
-                                                "Resource": [f"arn:aws:s3:::{kb_id}/{img_key}"]
-                                            }
-                                        ]
-                                    }
-                                    minio_client.set_bucket_policy(kb_id, json.dumps(policy))
-                                    
-                                    print(f"[SUCCESS] Uploaded image: {img_key}")
-                                else:
-                                    print(f"[WARNING] Image file does not exist: {img_path}")
-                            except Exception as e:
-                                print(f"[ERROR] Failed to upload image: {str(e)}")
-                                continue
-                    
-                    # Update the document status and chunk count
-                    final_update = """
-                        UPDATE document
-                        SET status = '1', run = '3', progress = 1.0,
-                            progress_msg = 'Parsing complete', chunk_num = %s,
-                            process_duation = %s
-                        WHERE id = %s
-                    """
-                    cursor.execute(final_update, (chunk_count, 0.0, doc_id))
-                    conn.commit()
-                    print(f"[INFO] document table updated, doc ID: {doc_id}")
-                    
-                    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    # Update the knowledge base's chunk count
-                    kb_update = """
-                        UPDATE knowledgebase
-                        SET chunk_num = chunk_num + %s,
-                            update_date = %s
-                        WHERE id = %s
-                    """
-                    cursor.execute(kb_update, (chunk_count, current_date, kb_id))
-                    conn.commit()
-                    print(f"[INFO] knowledgebase table updated, doc ID: {doc_id}")
-                    
-                    # Create the task record
-                    task_id = generate_uuid()
-                    # Get the current time
-                    current_datetime = datetime.now()
-                    current_timestamp = int(current_datetime.timestamp() * 1000)  # millisecond timestamp
-                    current_time = current_datetime.strftime("%Y-%m-%d %H:%M:%S")  # formatted datetime
-                    current_date_only = current_datetime.strftime("%Y-%m-%d")  # date only
-                    digest = f"{doc_id}_{0}_{1}"
-                    
-                    # Join the chunk_ids list into a space-separated string
-                    chunk_ids_str = ' '.join(chunk_ids_list)
-                    
-                    task_insert = """
-                        INSERT INTO task (
-                            id, create_time, create_date, update_time, update_date,
-                            doc_id, from_page, to_page, begin_at, process_duation,
-                            progress, progress_msg, retry_count, digest, chunk_ids, task_type
-                        ) VALUES (
-                            %s, %s, %s, %s, %s,
-                            %s, %s, %s, %s, %s,
-                            %s, %s, %s, %s, %s, %s
-                        )
-                    """
-                    
-                    task_params = [
-                        task_id, current_timestamp, current_date_only, current_timestamp, current_date_only,
-                        doc_id, 0, 1, None, 0.0,
-                        1.0, "MinerU parsing complete", 1, digest, chunk_ids_str, ""
-                    ]
-                    
-                    cursor.execute(task_insert, task_params)
-                    conn.commit()
-                    
-                    update_progress(1.0, "Parsing complete")
-                    print(f"[INFO] Parsing complete, doc ID: {doc_id}")
-                    cursor.close()
-                    conn.close()
-                    
-                    # Clean up temp files
-                    try:
-                        os.remove(temp_pdf_path)
-                        shutil.rmtree(temp_image_dir, ignore_errors=True)
-                    except:
-                        pass
-                    
-                    return {
-                        "success": True,
-                        "chunk_count": chunk_count
-                    }
-                    
-                except Exception as e:
-                    print(f"An exception occurred: {str(e)}")
         except Exception as e:
-            print(f"Document parsing failed: {str(e)}")
-            # Mark the document as failed
+            print(f"Error while starting or running document parsing (Doc ID: {doc_id}): {str(e)}")
+            # Make sure the status is set to failed on exception
             try:
-                error_update = """
-                    UPDATE document
-                    SET status = '1', run = '0', progress_msg = %s
-                    WHERE id = %s
-                """
-                cursor.execute(error_update, (f"Parsing failed: {str(e)}", doc_id))
-                conn.commit()
+                _update_document_progress(doc_id, status='1', run='0', message=f"Parsing failed: {str(e)}")
+            except Exception as update_err:
+                print(f"Error while updating the document's failed status (Doc ID: {doc_id}): {str(update_err)}")
+            # Either propagate the exception or return error info
+            # raise Exception(f"Document parsing failed: {str(e)}")
+            return {"success": False, "error": f"Document parsing failed: {str(e)}"}
+
+        finally:
+            if cursor: cursor.close()
+            if conn: conn.close()

-            except:
-                pass
-            
-            raise Exception(f"Document parsing failed: {str(e)}")

     @classmethod
     def async_parse_document(cls, doc_id):
         """Parse a document asynchronously"""
         try:
-            # Return a response immediately to indicate the task has started
+            # Start a background thread that runs the synchronous parse_document method
             thread = threading.Thread(target=cls.parse_document, args=(doc_id,))
-            thread.daemon = True
+            thread.daemon = True  # daemon thread: exits when the main program exits
             thread.start()
-            
+
+            # Return immediately to indicate the task has been submitted
             return {
-                "task_id": doc_id,
+                "task_id": doc_id,  # use doc_id as the task identifier
                 "status": "processing",
-                "message": "Document parsing started"
+                "message": "Document parsing task submitted for background processing"
             }
         except Exception as e:
-            current_app.logger.error(f"Failed to start the parsing task: {str(e)}")
-            raise Exception(f"Failed to start the parsing task: {str(e)}")
+            print(f"Failed to start the async parsing task (Doc ID: {doc_id}): {str(e)}")
+            # Optionally try to mark the document as failed here
+            try:
+                _update_document_progress(doc_id, status='1', run='0', message=f"Failed to start parsing: {str(e)}")
+            except Exception as update_err:
+                print(f"Error while updating the document's start-failure status (Doc ID: {doc_id}): {str(update_err)}")
+            raise Exception(f"Failed to start the async parsing task: {str(e)}")

-    @classmethod    
+    @classmethod
     def get_document_parse_progress(cls, doc_id):
-        """Get document parse progress - with a caching mechanism"""
-        
-        # Regular database query
-        conn = cls._get_db_connection()
-        cursor = conn.cursor(dictionary=True)
-        
-        query = """
-            SELECT progress, progress_msg, status, run
-            FROM document
-            WHERE id = %s
-        """
-        cursor.execute(query, (doc_id,))
-        result = cursor.fetchone()
-        
-        cursor.close()
-        conn.close()
-        
-        if not result:
-            return {"error": "Document does not exist"}
-        
-        
-        return {
-            "progress": float(result["progress"]),
-            "message": result["progress_msg"],
-            "status": result["status"],
-            "running": result["run"] == "1"
-        }
-        """Get document parse progress
-        
-        Args:
-            doc_id: Document ID
-            
-        Returns:
-            Parse progress info
-        """
-        conn = cls._get_db_connection()
-        cursor = conn.cursor(dictionary=True)
-        
-        query = """
-            SELECT progress, progress_msg, status, run
-            FROM document
-            WHERE id = %s
-        """
-        cursor.execute(query, (doc_id,))
-        result = cursor.fetchone()
-        
-        cursor.close()
-        conn.close()
-        
-        if not result:
-            return {"error": "Document does not exist"}
-        
-        return {
-            "progress": float(result["progress"]),
-            "message": result["progress_msg"],
-            "status": result["status"],
-            "running": result["run"] == "1"
-        }
\ No newline at end of file
+        """Get document parse progress"""
+        conn = None
+        cursor = None
+        try:
+            conn = cls._get_db_connection()
+            cursor = conn.cursor(dictionary=True)
+
+            query = """
+                SELECT progress, progress_msg, status, run
+                FROM document
+                WHERE id = %s
+            """
+            cursor.execute(query, (doc_id,))
+            result = cursor.fetchone()
+
+            if not result:
+                return {"error": "Document does not exist"}
+
+            # Make sure progress is a float
+            progress_value = 0.0
+            if result.get("progress") is not None:
+                try:
+                    progress_value = float(result["progress"])
+                except (ValueError, TypeError):
+                    progress_value = 0.0  # or log the error
+
+            return {
+                "progress": progress_value,
+                "message": result.get("progress_msg", ""),
+                "status": result.get("status", "0"),
+                "running": result.get("run", "0"),
+            }
+
+        except Exception as e:
+            print(f"Failed to get document progress (Doc ID: {doc_id}): {str(e)}")
+            return {"error": f"Failed to get progress: {str(e)}"}
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
\ No newline at end of file
diff --git a/management/web/.env.development b/management/web/.env.development
index 8c5e12f..a276ede 100644
--- a/management/web/.env.development
+++ b/management/web/.env.development
@@ -1,7 +1,7 @@
 # Environment variables for the development environment (names must start with VITE_)

 ## Backend API address (if CORS is handled via a reverse proxy, a relative path is enough)
-VITE_BASE_URL = http://localhost:5000
+VITE_BASE_URL = ""

 ## Dev-environment domain and public path for static assets (usually / or ./ both work)
 VITE_PUBLIC_PATH = /

diff --git a/management/web/src/layouts/components/DocumentParseProgress/index.vue b/management/web/src/layouts/components/DocumentParseProgress/index.vue
index bc58956..ac8b11a 100644
--- a/management/web/src/layouts/components/DocumentParseProgress/index.vue
+++ b/management/web/src/layouts/components/DocumentParseProgress/index.vue
@@ -1,3 +1,4 @@
+
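
Reviewer note: MinerU's `read_local_office()` converts Word/PPT files by shelling out to LibreOffice, which is why the new error message tells users to install it and add it to PATH. A fail-fast preflight check could run before the Office branch in `perform_parse`; the sketch below is illustrative only — the `ensure_libreoffice_available` helper is hypothetical and assumes LibreOffice's CLI is exposed under its usual binary name, `soffice`:

    import shutil

    def ensure_libreoffice_available():
        # Hypothetical helper: verify the LibreOffice CLI ('soffice') is on PATH
        # before attempting Word/PPT conversion, so parsing fails fast with a
        # clear message instead of deep inside read_local_office().
        if shutil.which("soffice") is None:
            raise RuntimeError(
                "LibreOffice not found. Install it and add 'soffice' to the "
                "system PATH; Word/PPT parsing cannot run without it."
            )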
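For reference, a caller would now submit a parse job via `async_parse_document` and poll `get_document_parse_progress`. A minimal sketch, assuming the service is importable as shown (the import path, placeholder doc_id, and polling cadence are illustrative, not part of this patch):

    import time
    from services.knowledgebases.service import KnowledgebaseService  # assumed import path

    doc_id = "<existing-document-id>"  # placeholder

    task = KnowledgebaseService.async_parse_document(doc_id)  # returns immediately
    print(task["message"])

    for _ in range(300):  # poll for up to ~10 minutes
        p = KnowledgebaseService.get_document_parse_progress(doc_id)
        print(f"{p.get('progress', 0.0):.0%} {p.get('message', '')}")
        # In this schema, run == '3' marks success and run == '0' marks failure.
        if p.get("running") in ("0", "3") or p.get("progress", 0.0) >= 1.0:
            break
        time.sleep(2)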