diff --git a/management/server/services/knowledgebases/document_parser.py b/management/server/services/knowledgebases/document_parser.py index 799cc30..3a1e4dc 100644 --- a/management/server/services/knowledgebases/document_parser.py +++ b/management/server/services/knowledgebases/document_parser.py @@ -161,6 +161,18 @@ def _create_task_record(doc_id, chunk_ids_list): conn.close() +def get_text_from_block(block): + """从 preproc_blocks 中的一个块提取所有文本内容 (简化版)""" + block_text = "" + if "lines" in block: + for line in block.get("lines", []): + if "spans" in line: + for span in line.get("spans", []): + content = span.get("content") + if isinstance(content, str): + block_text += content + return ' '.join(block_text.split()) + def perform_parse(doc_id, doc_info, file_info): """ 执行文档解析的核心逻辑 @@ -176,6 +188,7 @@ def perform_parse(doc_id, doc_info, file_info): temp_pdf_path = None temp_image_dir = None start_time = time.time() + middle_json_content = None # 初始化 middle_json_content try: kb_id = doc_info['kb_id'] @@ -214,7 +227,7 @@ def perform_parse(doc_id, doc_info, file_info): with open(temp_pdf_path, 'wb') as f: f.write(file_content) - # 使用Magic PDF处理 + # 使用MinerU处理 reader = FileBasedDataReader("") pdf_bytes = reader.read(temp_pdf_path) ds = PymuDocDataset(pdf_bytes) @@ -236,6 +249,9 @@ def perform_parse(doc_id, doc_info, file_info): update_progress(0.8, "提取内容") content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir)) + # 获取内容列表(JSON格式) + middle_content = pipe_result.get_middle_json() + middle_json_content = json.loads(middle_content) elif file_type.endswith('word') or file_type.endswith('ppt'): update_progress(0.3, "使用MinerU解析器") @@ -260,11 +276,45 @@ def perform_parse(doc_id, doc_info, file_info): update_progress(0.8, "提取内容") content_list = pipe_result.get_content_list(os.path.basename(temp_image_dir)) - + # 获取内容列表(JSON格式) + middle_content = pipe_result.get_middle_json() + middle_json_content = json.loads(middle_content) else: update_progress(0.3, f"暂不支持的文件类型: {file_type}") raise NotImplementedError(f"文件类型 '{file_type}' 的解析器尚未实现") + # 解析 middle_json_content 并提取块信息 + block_info_list = [] + if middle_json_content: + try: + if isinstance(middle_json_content, dict): + middle_data = middle_json_content # 直接赋值 + else: + middle_data = None + print(f"[Parser-WARNING] middle_json_content 不是预期的字典格式,实际类型: {type(middle_json_content)}。") + # 提取信息 + for page_idx, page_data in enumerate(middle_data.get("pdf_info", [])): + for block in page_data.get("preproc_blocks", []): + block_text = get_text_from_block(block) + # 仅提取包含文本且有 bbox 的块 + if block_text and "bbox" in block: + bbox = block.get("bbox") + # 确保 bbox 是包含4个数字的列表 + if isinstance(bbox, list) and len(bbox) == 4 and all(isinstance(n, (int, float)) for n in bbox): + block_info_list.append({ + "page_idx": page_idx, + "bbox": bbox + }) + else: + print(f"[Parser-WARNING] 块的 bbox 格式无效: {bbox},跳过。") + + print(f"[Parser-INFO] 从 middle_data 提取了 {len(block_info_list)} 个块的信息。") + + except json.JSONDecodeError: + print("[Parser-ERROR] 解析 middle_json_content 失败。") + except Exception as e: + print(f"[Parser-ERROR] 处理 middle_json_content 时出错: {e}") + # 3. 处理解析结果 (上传到MinIO, 存储到ES) update_progress(0.95, "保存解析结果") es_client = get_es_client() @@ -288,27 +338,51 @@ def perform_parse(doc_id, doc_info, file_info): chunk_count = 0 chunk_ids_list = [] + middle_block_idx = 0 # 用于按顺序匹配 block_info_list + processed_text_chunks = 0 # 记录处理的文本块数量 + for chunk_idx, chunk_data in enumerate(content_list): if chunk_data["type"] == "text": + processed_text_chunks += 1 content = chunk_data["text"] if not content or not content.strip(): continue chunk_id = generate_uuid() + page_idx = 0 # 默认页面索引 + bbox = [0, 0, 0, 0] # 默认 bbox + + # -- 尝试匹配并获取 page_idx 和 bbox -- + if middle_block_idx < len(block_info_list): + block_info = block_info_list[middle_block_idx] + page_idx = block_info.get("page_idx", 0) + bbox = block_info.get("bbox", [0, 0, 0, 0]) + middle_block_idx += 1 # 移动到下一个块 + else: + # 如果 block_info_list 耗尽,打印警告 + if processed_text_chunks == len(block_info_list) + 1: # 只在第一次耗尽时警告一次 + print(f"[Parser-WARNING] middle_data 提供的块信息少于 content_list 中的文本块数量。后续文本块将使用默认 page/bbox。") + # -- 匹配结束 -- + try: - # 上传文本块到MinIO (桶为kb_id) + # 上传文本块到 MinIO minio_client.put_object( bucket_name=output_bucket, object_name=chunk_id, data=BytesIO(content.encode('utf-8')), length=len(content.encode('utf-8')) # 使用字节长度 ) - + # 准备ES文档 content_tokens = tokenize_text(content) # 分词 current_time_es = datetime.now().strftime("%Y-%m-%d %H:%M:%S") current_timestamp_es = datetime.now().timestamp() + # 转换坐标格式 + x1, y1, x2, y2 = bbox + bbox_reordered = [x1, x2, y1, y2] + + es_doc = { "doc_id": doc_id, "kb_id": kb_id, @@ -318,8 +392,8 @@ def perform_parse(doc_id, doc_info, file_info): "content_with_weight": content, "content_ltks": content_tokens, "content_sm_ltks": content_tokens, - "page_num_int": [1], # 简化处理 - "position_int": [[1, 0, 0, 0, 0]], # 简化处理 + "page_num_int": [page_idx + 1], + "position_int": [[page_idx + 1] + bbox_reordered], # 格式: [[page, x1, y1, x2, y2]] "top_int": [1], # 简化处理 "create_time": current_time_es, "create_timestamp_flt": current_timestamp_es, @@ -332,10 +406,11 @@ def perform_parse(doc_id, doc_info, file_info): chunk_count += 1 chunk_ids_list.append(chunk_id) - print(f"成功处理文本块 {chunk_count}/{len(content_list)}") + # print(f"成功处理文本块 {chunk_count}/{len(content_list)}") # 可以取消注释用于详细调试 except Exception as e: - print(f"[Parser-ERROR] 处理文本块 {chunk_idx} 失败: {e}") + print(f"[Parser-ERROR] 处理文本块 {chunk_idx} (page: {page_idx}, bbox: {bbox}) 失败: {e}") + traceback.print_exc() # 打印更详细的错误 elif chunk_data["type"] == "image": img_path_relative = chunk_data.get('img_path') @@ -367,6 +442,12 @@ def perform_parse(doc_id, doc_info, file_info): # except Exception as e: # print(f"[Parser-ERROR] 上传图片 {img_path_abs} 失败: {e}") + # 打印匹配总结信息 + print(f"[Parser-INFO] 共处理 {processed_text_chunks} 个文本块。") + if middle_block_idx < len(block_info_list): + print(f"[Parser-WARNING] middle_data 中还有 {len(block_info_list) - middle_block_idx} 个提取的块信息未被使用。") + + # 4. 更新最终状态 process_duration = time.time() - start_time _update_document_progress(doc_id, progress=1.0, message="解析完成", status='1', run='3', chunk_count=chunk_count, process_duration=process_duration) diff --git a/management/server/services/knowledgebases/service.py b/management/server/services/knowledgebases/service.py index a40713e..ae2be4b 100644 --- a/management/server/services/knowledgebases/service.py +++ b/management/server/services/knowledgebases/service.py @@ -600,7 +600,6 @@ class KnowledgebaseService: conn.commit() # 先提交更新操作 except Exception as e: print(f"[WARNING] 更新知识库文档数量失败,但文档已添加: {str(e)}") - # 这里不抛出异常,因为文档已经添加成功 cursor.close() conn.close() @@ -663,7 +662,7 @@ class KnowledgebaseService: @classmethod def parse_document(cls, doc_id): - """解析文档(同步版本,调用后台解析逻辑)""" + """解析文档(调用解析逻辑)""" conn = None cursor = None try: @@ -717,7 +716,6 @@ class KnowledgebaseService: _update_document_progress(doc_id, status='1', run='0', message=f"解析失败: {str(e)}") except Exception as update_err: print(f"更新文档失败状态时出错 (Doc ID: {doc_id}): {str(update_err)}") - # 向上层抛出异常或返回错误信息 # raise Exception(f"文档解析失败: {str(e)}") return {"success": False, "error": f"文档解析失败: {str(e)}"}