From 59d5ca5c95c966f9bfab1fd7f678908fbdae6cb9 Mon Sep 17 00:00:00 2001 From: zstar <65890619+zstar1003@users.noreply.github.com> Date: Tue, 10 Jun 2025 12:29:26 +0800 Subject: [PATCH] =?UTF-8?q?refactor(management):=20=E5=90=8E=E5=8F=B0?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E6=97=B6=EF=BC=8C=E6=B7=BB=E5=8A=A0=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在管理端应用中添加日志记录功能,用于记录解析过程中的信息和错误 - 优化代码格式和结构,提高可读性和可维护性 - 在.docker-compose.yml中添加日志目录挂载 - 清理无用的环境变量加载代码 --- .gitignore | 1 + api/db/services/database.py | 8 -- docker/docker-compose.yml | 3 + management/server/app.py | 75 +++++++------ .../knowledgebases/document_parser.py | 105 +++++++++--------- .../services/knowledgebases/rag_tokenizer.py | 5 +- .../server/services/knowledgebases/service.py | 11 +- 7 files changed, 110 insertions(+), 98 deletions(-) diff --git a/.gitignore b/.gitignore index 97c992a..24be45a 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,4 @@ management/web/types/auto web/node_modules/.cache/logger/umi.log management/models--slanet_plus node_modules/.cache/logger/umi.log +*.log diff --git a/api/db/services/database.py b/api/db/services/database.py index ddfcf0c..50cd87d 100644 --- a/api/db/services/database.py +++ b/api/db/services/database.py @@ -1,15 +1,7 @@ import os -from pathlib import Path -from dotenv import load_dotenv from minio import Minio -from api.root_path import get_root_folder - -# 加载环境变量 -env_path = Path(get_root_folder()) / "docker" / ".env" -load_dotenv(env_path) - # 检测是否在Docker容器中运行 def is_running_in_docker(): diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 11dd245..3ed5847 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -19,6 +19,8 @@ services: - ./nginx/nginx.conf:/etc/nginx/nginx.conf - ../api/db/services/database.py:/ragflow/api/db/services/database.py - 
../api/db/services/dialog_service.py:/ragflow/api/db/services/dialog_service.py + - ../api/ragflow_server.py:/ragflow/api/ragflow_server.py + - ../api/root_path.py:/ragflow/api/root_path.py env_file: .env environment: - TZ=${TIMEZONE} @@ -57,6 +59,7 @@ services: ports: - "5000:5000" volumes: + - ./ragflow-plus-logs:/app/logs - ./magic-pdf.json:/root/magic-pdf.json depends_on: mysql: diff --git a/management/server/app.py b/management/server/app.py index d221d7d..3525c18 100644 --- a/management/server/app.py +++ b/management/server/app.py @@ -1,70 +1,79 @@ -import jwt +import logging import os +from datetime import datetime, timedelta + +import jwt +from dotenv import load_dotenv from flask import Flask, request from flask_cors import CORS -from datetime import datetime, timedelta from routes import register_routes -from dotenv import load_dotenv # 加载环境变量 -load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'docker', '.env')) +load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "docker", ".env")) app = Flask(__name__) # 启用CORS,允许前端访问 -CORS(app, resources={ - r"/api/*": { - "origins": "*", - "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], - "allow_headers": ["Content-Type", "Authorization"] - } -}) +CORS(app, resources={r"/api/*": {"origins": "*", "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], "allow_headers": ["Content-Type", "Authorization"]}}) # 注册所有路由 register_routes(app) # 从环境变量获取配置 -ADMIN_USERNAME = os.getenv('MANAGEMENT_ADMIN_USERNAME', 'admin') -ADMIN_PASSWORD = os.getenv('MANAGEMENT_ADMIN_PASSWORD', '12345678') -JWT_SECRET = os.getenv('MANAGEMENT_JWT_SECRET', 'your-secret-key') +ADMIN_USERNAME = os.getenv("MANAGEMENT_ADMIN_USERNAME", "admin") +ADMIN_PASSWORD = os.getenv("MANAGEMENT_ADMIN_PASSWORD", "12345678") +JWT_SECRET = os.getenv("MANAGEMENT_JWT_SECRET", "your-secret-key") + + +# 设置日志目录和文件名 +log_dir = "logs" +os.makedirs(log_dir, exist_ok=True) +log_file = 
os.path.join(log_dir, "parser.log") + +# 配置 logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", + handlers=[ + logging.FileHandler(log_file, encoding="utf-8"), + logging.StreamHandler(), # 同时也输出到控制台 + ], +) + # 生成token def generate_token(username): # 设置令牌过期时间(例如1小时后过期) expire_time = datetime.utcnow() + timedelta(hours=1) - + # 生成令牌 - token = jwt.encode({ - 'username': username, - 'exp': expire_time - }, JWT_SECRET, algorithm='HS256') - + token = jwt.encode({"username": username, "exp": expire_time}, JWT_SECRET, algorithm="HS256") + return token + # 登录路由保留在主文件中 -@app.route('/api/v1/auth/login', methods=['POST']) +@app.route("/api/v1/auth/login", methods=["POST"]) def login(): data = request.get_json() - username = data.get('username') - password = data.get('password') - + username = data.get("username") + password = data.get("password") + # 创建用户名和密码的映射 - valid_users = { - ADMIN_USERNAME: ADMIN_PASSWORD - } - + valid_users = {ADMIN_USERNAME: ADMIN_PASSWORD} + # 验证用户名是否存在 if not username or username not in valid_users: return {"code": 1, "message": "用户名不存在"}, 400 - + # 验证密码是否正确 if not password or password != valid_users[username]: return {"code": 1, "message": "密码错误"}, 400 - + # 生成token token = generate_token(username) - + return {"code": 0, "data": {"token": token}, "message": "登录成功"} -if __name__ == '__main__': - app.run(host='0.0.0.0', port=5000) \ No newline at end of file +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5000) diff --git a/management/server/services/knowledgebases/document_parser.py b/management/server/services/knowledgebases/document_parser.py index eb0dac5..b9c0031 100644 --- a/management/server/services/knowledgebases/document_parser.py +++ b/management/server/services/knowledgebases/document_parser.py @@ -1,23 +1,28 @@ -import os -import tempfile -import shutil import json +import logging +import os +import re +import shutil +import tempfile import time import traceback 
-import re -import requests -from io import BytesIO from datetime import datetime -from database import MINIO_CONFIG, get_minio_client, get_es_client, get_db_connection -from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader -from magic_pdf.data.dataset import PymuDocDataset -from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze -from magic_pdf.config.enums import SupportedPdfParseMethod -from magic_pdf.data.read_api import read_local_office, read_local_images -from utils import generate_uuid +from io import BytesIO from urllib.parse import urlparse -from .rag_tokenizer import RagTokenizer + +import requests +from database import MINIO_CONFIG, get_db_connection, get_es_client, get_minio_client +from magic_pdf.config.enums import SupportedPdfParseMethod +from magic_pdf.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter +from magic_pdf.data.dataset import PymuDocDataset +from magic_pdf.data.read_api import read_local_images, read_local_office +from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze +from utils import generate_uuid + from .excel_parser import parse_excel +from .rag_tokenizer import RagTokenizer + +logger = logging.getLogger(__name__) tknzr = RagTokenizer() @@ -64,7 +69,7 @@ def _update_document_progress(doc_id, progress=None, message=None, status=None, cursor.execute(query, params) conn.commit() except Exception as e: - print(f"[Parser-ERROR] 更新文档 {doc_id} 进度失败: {e}") + logger.error(f"[Parser-ERROR] 更新文档 {doc_id} 进度失败: {e}") finally: if cursor: cursor.close() @@ -89,7 +94,7 @@ def _update_kb_chunk_count(kb_id, count_delta): cursor.execute(kb_update, (count_delta, current_date, kb_id)) conn.commit() except Exception as e: - print(f"[Parser-ERROR] 更新知识库 {kb_id} 块数量失败: {e}") + logger.error(f"[Parser-ERROR] 更新知识库 {kb_id} 块数量失败: {e}") finally: if cursor: cursor.close() @@ -146,10 +151,10 @@ def _create_task_record(doc_id, chunk_ids_list): task_insert = f"INSERT INTO task 
({fields_sql}) VALUES ({placeholders})" cursor.execute(task_insert, common_values) conn.commit() - print(f"[Parser-INFO] Task记录创建成功,Task ID: {task_id}") + logger.info(f"[Parser-INFO] Task记录创建成功,Task ID: {task_id}") except Exception as e: - print(f"[Parser-ERROR] 创建Task记录失败: {e}") + logger.error(f"[Parser-ERROR] 创建Task记录失败: {e}") finally: if cursor: cursor.close() @@ -173,7 +178,7 @@ def get_bbox_from_block(block): if isinstance(bbox, list) and len(bbox) == 4 and all(isinstance(n, (int, float)) for n in bbox): return bbox else: - print(f"[Parser-WARNING] 块的 bbox 格式无效: {bbox},将使用默认值。") + logger.warning(f"[Parser-WARNING] 块的 bbox 格式无效: {bbox},将使用默认值。") # 如果 block 不是字典或没有 bbox 键,或 bbox 格式无效,返回默认值 return [0, 0, 0, 0] @@ -212,7 +217,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): # 如果 API 基础地址为空字符串,设置为硅基流动的 API 地址 if embedding_api_base == "": embedding_api_base = "https://api.siliconflow.cn/v1/embeddings" - print(f"[Parser-INFO] API 基础地址为空,已设置为硅基流动的 API 地址: {embedding_api_base}") + logger.info(f"[Parser-INFO] API 基础地址为空,已设置为硅基流动的 API 地址: {embedding_api_base}") embedding_api_key = embedding_config.get("api_key") if embedding_config else None # 可能为 None 或空字符串 @@ -238,7 +243,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): else: embedding_url = normalized_base_url + "/v1/embeddings" - print(f"[Parser-INFO] 使用 Embedding 配置: URL='{embedding_url}', Model='{embedding_model_name}', Key={embedding_api_key}") + logger.info(f"[Parser-INFO] 使用 Embedding 配置: URL='{embedding_url}', Model='{embedding_model_name}', Key={embedding_api_key}") try: kb_id = doc_info["kb_id"] @@ -252,7 +257,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): # 进度更新回调 (直接调用内部更新函数) def update_progress(prog=None, msg=None): _update_document_progress(doc_id, progress=prog, message=msg) - print(f"[Parser-PROGRESS] Doc: {doc_id}, Progress: {prog}, Message: {msg}") + logger.info(f"[Parser-PROGRESS] Doc: {doc_id}, Progress: 
{prog}, Message: {msg}") # 1. 从 MinIO 获取文件内容 minio_client = get_minio_client() @@ -310,7 +315,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): with open(temp_file_path, "wb") as f: f.write(file_content) - print(f"[Parser-INFO] 临时文件路径: {temp_file_path}") + logger.info(f"[Parser-INFO] 临时文件路径: {temp_file_path}") # 使用MinerU处理 ds = read_local_office(temp_file_path)[0] infer_result = ds.apply(doc_analyze, ocr=True) @@ -338,7 +343,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): with open(temp_file_path, "wb") as f: f.write(file_content) - print(f"[Parser-INFO] 临时文件路径: {temp_file_path}") + logger.info(f"[Parser-INFO] 临时文件路径: {temp_file_path}") update_progress(0.8, "提取内容") # 处理内容列表 @@ -353,7 +358,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): with open(temp_file_path, "wb") as f: f.write(file_content) - print(f"[Parser-INFO] 临时文件路径: {temp_file_path}") + logger.info(f"[Parser-INFO] 临时文件路径: {temp_file_path}") # 使用MinerU处理 ds = read_local_images(temp_file_path)[0] infer_result = ds.apply(doc_analyze, ocr=True) @@ -390,7 +395,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): middle_data = middle_json_content # 直接赋值 else: middle_data = None - print(f"[Parser-WARNING] middle_json_content 不是预期的字典格式,实际类型: {type(middle_json_content)}。") + logger.warning(f"[Parser-WARNING] middle_json_content 不是预期的字典格式,实际类型: {type(middle_json_content)}。") # 提取信息 for page_idx, page_data in enumerate(middle_data.get("pdf_info", [])): for block in page_data.get("preproc_blocks", []): @@ -399,15 +404,15 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): if block_bbox != [0, 0, 0, 0]: block_info_list.append({"page_idx": page_idx, "bbox": block_bbox}) else: - print("[Parser-WARNING] 块的 bbox 格式无效,跳过。") + logger.warning("[Parser-WARNING] 块的 bbox 格式无效,跳过。") - print(f"[Parser-INFO] 从 middle_data 提取了 {len(block_info_list)} 个块的信息。") + logger.info(f"[Parser-INFO] 
从 middle_data 提取了 {len(block_info_list)} 个块的信息。") except json.JSONDecodeError: - print("[Parser-ERROR] 解析 middle_json_content 失败。") + logger.error("[Parser-ERROR] 解析 middle_json_content 失败。") raise Exception("[Parser-ERROR] 解析 middle_json_content 失败。") except Exception as e: - print(f"[Parser-ERROR] 处理 middle_json_content 时出错: {e}") + logger.error(f"[Parser-ERROR] 处理 middle_json_content 时出错: {e}") raise Exception(f"[Parser-ERROR] 处理 middle_json_content 时出错: {e}") # 3. 处理解析结果 (上传到MinIO, 存储到ES) @@ -417,7 +422,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): output_bucket = kb_id if not minio_client.bucket_exists(output_bucket): minio_client.make_bucket(output_bucket) - print(f"[Parser-INFO] 创建MinIO桶: {output_bucket}") + logger.info(f"[Parser-INFO] 创建MinIO桶: {output_bucket}") index_name = f"ragflow_{tenant_id}" if not es_client.indices.exists(index=index_name): @@ -431,7 +436,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): }, }, ) - print(f"[Parser-INFO] 创建Elasticsearch索引: {index_name}") + logger.info(f"[Parser-INFO] 创建Elasticsearch索引: {index_name}") chunk_count = 0 chunk_ids_list = [] @@ -447,13 +452,13 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): bbox = block_info.get("bbox", [0, 0, 0, 0]) # 验证 bbox 是否有效,如果无效则重置为默认值 (可选,取决于是否需要严格验证) if not (isinstance(bbox, list) and len(bbox) == 4 and all(isinstance(n, (int, float)) for n in bbox)): - print(f"[Parser-WARNING] Chunk {chunk_idx} 对应的 bbox 格式无效: {bbox},将使用默认值。") + logger.warning(f"[Parser-WARNING] Chunk {chunk_idx} 对应的 bbox 格式无效: {bbox},将使用默认值。") bbox = [0, 0, 0, 0] else: # 如果 block_info_list 的长度小于 content_list,打印警告 # 仅在第一次索引越界时打印一次警告,避免刷屏 if chunk_idx == len(block_info_list): - print(f"[Parser-WARNING] block_info_list 的长度 ({len(block_info_list)}) 小于 content_list 的长度 ({len(content_list)})。后续块将使用默认 page_idx 和 bbox。") + logger.warning(f"[Parser-WARNING] block_info_list 的长度 ({len(block_info_list)}) 小于 content_list 的长度 
({len(content_list)})。后续块将使用默认 page_idx 和 bbox。") if chunk_data["type"] == "text" or chunk_data["type"] == "table" or chunk_data["type"] == "equation": if chunk_data["type"] == "text": @@ -531,16 +536,16 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): q_1024_vec = embedding_data.get("embedding") else: q_1024_vec = embedding_data["data"][0]["embedding"] - print(f"[Parser-INFO] 获取embedding成功,长度: {len(q_1024_vec)}") + # logger.info(f"[Parser-INFO] 获取embedding成功,长度: {len(q_1024_vec)}") # 检查向量维度是否为1024 if len(q_1024_vec) != 1024: error_msg = f"[Parser-ERROR] Embedding向量维度不是1024,实际维度: {len(q_1024_vec)}, 建议使用bge-m3模型" - print(error_msg) + logger.error(error_msg) update_progress(-5, error_msg) raise ValueError(error_msg) except Exception as e: - print(f"[Parser-ERROR] 获取embedding失败: {e}") + logger.error(f"[Parser-ERROR] 获取embedding失败: {e}") raise Exception(f"[Parser-ERROR] 获取embedding失败: {e}") chunk_id = generate_uuid() @@ -587,8 +592,8 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): chunk_ids_list.append(chunk_id) except Exception as e: - print(f"[Parser-ERROR] 处理文本块 {chunk_idx} (page: {page_idx}, bbox: {bbox}) 失败: {e}") - traceback.print_exc() # 打印更详细的错误 + logger.error(f"[Parser-ERROR] 处理文本块 {chunk_idx} (page: {page_idx}, bbox: {bbox}) 失败: {e}") + traceback.print_exc() # 打印更详细的错误 raise Exception(f"[Parser-ERROR] 处理文本块 {chunk_idx} (page: {page_idx}, bbox: {bbox}) 失败: {e}") elif chunk_data["type"] == "image": @@ -598,7 +603,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): img_path_abs = os.path.join(temp_image_dir, os.path.basename(img_path_relative)) if not os.path.exists(img_path_abs): - print(f"[Parser-WARNING] 图片文件不存在: {img_path_abs}") + logger.warning(f"[Parser-WARNING] 图片文件不存在: {img_path_abs}") continue img_id = generate_uuid() @@ -616,7 +621,7 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): policy = {"Version": "2012-10-17", "Statement": 
[{"Effect": "Allow", "Principal": {"AWS": "*"}, "Action": ["s3:GetObject"], "Resource": [f"arn:aws:s3:::{kb_id}/images/*"]}]} minio_client.set_bucket_policy(kb_id, json.dumps(policy)) - print(f"成功上传图片: {img_key}") + logger.info(f"成功上传图片: {img_key}") minio_endpoint = MINIO_CONFIG["endpoint"] use_ssl = MINIO_CONFIG.get("secure", False) protocol = "https" if use_ssl else "http" @@ -629,14 +634,14 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): } image_info_list.append(image_info) - print(f"图片访问链接: {img_url}") + logger.info(f"图片访问链接: {img_url}") except Exception as e: - print(f"[Parser-ERROR] 上传图片 {img_path_abs} 失败: {e}") + logger.error(f"[Parser-ERROR] 上传图片 {img_path_abs} 失败: {e}") raise Exception(f"[Parser-ERROR] 上传图片 {img_path_abs} 失败: {e}") # 打印匹配总结信息 - print(f"[Parser-INFO] 共处理 {chunk_count} 个文本块。") + logger.info(f"[Parser-INFO] 共处理 {chunk_count} 个文本块。") # 4. 更新文本块的图像信息 if image_info_list and chunk_ids_list: @@ -667,10 +672,10 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): direct_update = {"doc": {"img_id": relative_path}} es_client.update(index=index_name, id=chunk_id, body=direct_update, refresh=True) index_name = f"ragflow_{tenant_id}" - print(f"[Parser-INFO] 更新文本块 {chunk_id} 的图片关联: {relative_path}") + logger.info(f"[Parser-INFO] 更新文本块 {chunk_id} 的图片关联: {relative_path}") except Exception as e: - print(f"[Parser-ERROR] 更新文本块图片关联失败: {e}") + logger.error(f"[Parser-ERROR] 更新文本块图片关联失败: {e}") raise Exception(f"[Parser-ERROR] 更新文本块图片关联失败: {e}") finally: if cursor: @@ -685,16 +690,16 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): _create_task_record(doc_id, chunk_ids_list) # 创建task记录 update_progress(1.0, "解析完成") - print(f"[Parser-INFO] 解析完成,文档ID: {doc_id}, 耗时: {process_duration:.2f}s, 块数: {chunk_count}") + logger.info(f"[Parser-INFO] 解析完成,文档ID: {doc_id}, 耗时: {process_duration:.2f}s, 块数: {chunk_count}") return {"success": True, "chunk_count": chunk_count} except Exception as e: 
process_duration = time.time() - start_time # error_message = f"解析失败: {str(e)}" - print(f"[Parser-ERROR] 文档 {doc_id} 解析失败: {e}") + logger.error(f"[Parser-ERROR] 文档 {doc_id} 解析失败: {e}") error_message = f"解析失败: {e}" - traceback.print_exc() # 打印详细错误堆栈 + traceback.print_exc() # 打印详细错误堆栈 # 更新文档状态为失败 _update_document_progress(doc_id, status="1", run="0", message=error_message, process_duration=process_duration) # status=1表示完成,run=0表示失败 return {"success": False, "error": error_message} @@ -707,4 +712,4 @@ def perform_parse(doc_id, doc_info, file_info, embedding_config, kb_info): if temp_image_dir and os.path.exists(temp_image_dir): shutil.rmtree(temp_image_dir, ignore_errors=True) except Exception as clean_e: - print(f"[Parser-WARNING] 清理临时文件失败: {clean_e}") + logger.warning(f"[Parser-WARNING] 清理临时文件失败: {clean_e}") diff --git a/management/server/services/knowledgebases/rag_tokenizer.py b/management/server/services/knowledgebases/rag_tokenizer.py index 7d253f7..89b5f74 100644 --- a/management/server/services/knowledgebases/rag_tokenizer.py +++ b/management/server/services/knowledgebases/rag_tokenizer.py @@ -1,10 +1,11 @@ -import logging import copy -import datrie +import logging import math import os import re import string + +import datrie from hanziconv import HanziConv from nltk import word_tokenize from nltk.stem import PorterStemmer, WordNetLemmatizer diff --git a/management/server/services/knowledgebases/service.py b/management/server/services/knowledgebases/service.py index b303b4b..6cf67e2 100644 --- a/management/server/services/knowledgebases/service.py +++ b/management/server/services/knowledgebases/service.py @@ -1,15 +1,16 @@ -import mysql.connector import json import threading -import requests -import traceback import time +import traceback from datetime import datetime -from utils import generate_uuid + +import mysql.connector +import requests from database import DB_CONFIG +from utils import generate_uuid # 解析相关模块 -from .document_parser import 
perform_parse, _update_document_progress +from .document_parser import _update_document_progress, perform_parse # 用于存储进行中的顺序批量任务状态 # 结构: { kb_id: {"status": "running/completed/failed", "total": N, "current": M, "message": "...", "start_time": timestamp} }