RAGflow/management/server/services/knowledgebases/utils.py

158 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from database import get_db_connection
from utils import generate_uuid
from . import logger
def _update_document_progress(doc_id, progress=None, message=None, status=None, run=None, chunk_count=None, process_duration=None):
"""更新数据库中文档的进度和状态"""
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
updates = []
params = []
if progress is not None:
updates.append("progress = %s")
params.append(float(progress))
if message is not None:
updates.append("progress_msg = %s")
params.append(message)
if status is not None:
updates.append("status = %s")
params.append(status)
if run is not None:
updates.append("run = %s")
params.append(run)
if chunk_count is not None:
updates.append("chunk_num = %s")
params.append(chunk_count)
if process_duration is not None:
updates.append("process_duation = %s")
params.append(process_duration)
if not updates:
return
query = f"UPDATE document SET {', '.join(updates)} WHERE id = %s"
params.append(doc_id)
cursor.execute(query, params)
conn.commit()
except Exception as e:
logger.error(f"[Parser-ERROR] 更新文档 {doc_id} 进度失败: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def _update_kb_chunk_count(kb_id, count_delta):
"""更新知识库的块数量"""
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
kb_update = """
UPDATE knowledgebase
SET chunk_num = chunk_num + %s,
update_date = %s
WHERE id = %s
"""
cursor.execute(kb_update, (count_delta, current_date, kb_id))
conn.commit()
except Exception as e:
logger.error(f"[Parser-ERROR] 更新知识库 {kb_id} 块数量失败: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def _create_task_record(doc_id, chunk_ids_list):
"""创建task记录兼容无 priority 字段的新表结构"""
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
# 检查 task 表中是否有 priority 字段
cursor.execute("SHOW COLUMNS FROM task LIKE 'priority'")
has_priority = cursor.fetchone() is not None
task_id = generate_uuid()
current_datetime = datetime.now()
current_timestamp = int(current_datetime.timestamp() * 1000)
current_date_only = current_datetime.strftime("%Y-%m-%d")
digest = f"{doc_id}_{0}_{1}" # 假设 from_page=0, to_page=1
chunk_ids_str = " ".join(chunk_ids_list)
common_fields = [
"id",
"create_time",
"create_date",
"update_time",
"update_date",
"doc_id",
"from_page",
"to_page",
"begin_at",
"process_duation",
"progress",
"progress_msg",
"retry_count",
"digest",
"chunk_ids",
"task_type",
]
common_values = [task_id, current_timestamp, current_date_only, current_timestamp, current_date_only, doc_id, 0, 1, None, 0.0, 1.0, "MinerU解析完成", 1, digest, chunk_ids_str, ""]
if has_priority:
common_fields.append("priority")
common_values.append(0)
fields_sql = ", ".join(common_fields)
placeholders = ", ".join(["%s"] * len(common_values))
task_insert = f"INSERT INTO task ({fields_sql}) VALUES ({placeholders})"
cursor.execute(task_insert, common_values)
conn.commit()
logger.info(f"[Parser-INFO] Task记录创建成功Task ID: {task_id}")
except Exception as e:
logger.error(f"[Parser-ERROR] 创建Task记录失败: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def get_bbox_from_block(block):
"""
从 preproc_blocks 中的一个块提取最外层的 bbox 信息。
Args:
block (dict): 代表一个块的字典,期望包含 'bbox' 键。
Returns:
list: 包含4个数字的 bbox 列表,如果找不到或格式无效则返回 [0, 0, 0, 0]。
"""
if isinstance(block, dict) and "bbox" in block:
bbox = block.get("bbox")
# 验证 bbox 是否为包含4个数字的有效列表
if isinstance(bbox, list) and len(bbox) == 4 and all(isinstance(n, (int, float)) for n in bbox):
return bbox
else:
logger.warning(f"[Parser-WARNING] 块的 bbox 格式无效: {bbox},将使用默认值。") # noqa: F821
# 如果 block 不是字典或没有 bbox 键,或 bbox 格式无效,返回默认值
return [0, 0, 0, 0]