RAGflow/management/server/services/knowledgebases/utils.py

158 lines
5.1 KiB
Python
Raw Normal View History

from datetime import datetime
from database import get_db_connection
from utils import generate_uuid
from . import logger
def _update_document_progress(doc_id, progress=None, message=None, status=None, run=None, chunk_count=None, process_duration=None):
"""更新数据库中文档的进度和状态"""
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
updates = []
params = []
if progress is not None:
updates.append("progress = %s")
params.append(float(progress))
if message is not None:
updates.append("progress_msg = %s")
params.append(message)
if status is not None:
updates.append("status = %s")
params.append(status)
if run is not None:
updates.append("run = %s")
params.append(run)
if chunk_count is not None:
updates.append("chunk_num = %s")
params.append(chunk_count)
if process_duration is not None:
updates.append("process_duation = %s")
params.append(process_duration)
if not updates:
return
query = f"UPDATE document SET {', '.join(updates)} WHERE id = %s"
params.append(doc_id)
cursor.execute(query, params)
conn.commit()
except Exception as e:
logger.error(f"[Parser-ERROR] 更新文档 {doc_id} 进度失败: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def _update_kb_chunk_count(kb_id, count_delta):
"""更新知识库的块数量"""
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
kb_update = """
UPDATE knowledgebase
SET chunk_num = chunk_num + %s,
update_date = %s
WHERE id = %s
"""
cursor.execute(kb_update, (count_delta, current_date, kb_id))
conn.commit()
except Exception as e:
logger.error(f"[Parser-ERROR] 更新知识库 {kb_id} 块数量失败: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def _create_task_record(doc_id, chunk_ids_list):
"""创建task记录兼容无 priority 字段的新表结构"""
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
# 检查 task 表中是否有 priority 字段
cursor.execute("SHOW COLUMNS FROM task LIKE 'priority'")
has_priority = cursor.fetchone() is not None
task_id = generate_uuid()
current_datetime = datetime.now()
current_timestamp = int(current_datetime.timestamp() * 1000)
current_date_only = current_datetime.strftime("%Y-%m-%d")
digest = f"{doc_id}_{0}_{1}" # 假设 from_page=0, to_page=1
chunk_ids_str = " ".join(chunk_ids_list)
common_fields = [
"id",
"create_time",
"create_date",
"update_time",
"update_date",
"doc_id",
"from_page",
"to_page",
"begin_at",
"process_duation",
"progress",
"progress_msg",
"retry_count",
"digest",
"chunk_ids",
"task_type",
]
common_values = [task_id, current_timestamp, current_date_only, current_timestamp, current_date_only, doc_id, 0, 1, None, 0.0, 1.0, "MinerU解析完成", 1, digest, chunk_ids_str, ""]
if has_priority:
common_fields.append("priority")
common_values.append(0)
fields_sql = ", ".join(common_fields)
placeholders = ", ".join(["%s"] * len(common_values))
task_insert = f"INSERT INTO task ({fields_sql}) VALUES ({placeholders})"
cursor.execute(task_insert, common_values)
conn.commit()
logger.info(f"[Parser-INFO] Task记录创建成功Task ID: {task_id}")
except Exception as e:
logger.error(f"[Parser-ERROR] 创建Task记录失败: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
def get_bbox_from_block(block):
"""
preproc_blocks 中的一个块提取最外层的 bbox 信息
Args:
block (dict): 代表一个块的字典期望包含 'bbox'
Returns:
list: 包含4个数字的 bbox 列表如果找不到或格式无效则返回 [0, 0, 0, 0]
"""
if isinstance(block, dict) and "bbox" in block:
bbox = block.get("bbox")
# 验证 bbox 是否为包含4个数字的有效列表
if isinstance(bbox, list) and len(bbox) == 4 and all(isinstance(n, (int, float)) for n in bbox):
return bbox
else:
logger.warning(f"[Parser-WARNING] 块的 bbox 格式无效: {bbox},将使用默认值。") # noqa: F821
# 如果 block 不是字典或没有 bbox 键,或 bbox 格式无效,返回默认值
return [0, 0, 0, 0]