feat(server): 实现文件分块上传功能

- 新增分块上传相关路由和处理逻辑
- 实现文件分块接收、合并和上传
- 添加 Redis 作为上传状态存储
- 重构部分现有服务以支持分块上传
This commit is contained in:
zstar 2025-05-29 23:45:39 +08:00
parent 276e036e12
commit aae4ca821c
10 changed files with 254 additions and 268 deletions

View File

@ -1,55 +0,0 @@
import os
import mysql.connector
from dotenv import load_dotenv
# 加载环境变量
load_dotenv("../../docker/.env")
# 数据库连接配置
DB_CONFIG = {
"host": "localhost",
"port": int(os.getenv("MYSQL_PORT", "5455")),
"user": "root",
"password": os.getenv("MYSQL_PASSWORD", "infini_rag_flow"),
"database": "rag_flow"
}
def get_db_connection():
"""创建数据库连接"""
return mysql.connector.connect(**DB_CONFIG)
def get_all_tables():
"""获取数据库中所有表的名称"""
try:
# 连接数据库
conn = get_db_connection()
cursor = conn.cursor()
# 查询所有表名
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print(f"数据库 {DB_CONFIG['database']} 中的表:")
if tables:
for i, table in enumerate(tables, 1):
print(f"{i}. {table[0]}")
else:
print("数据库中没有表")
# 检查是否存在特定表
important_tables = ['document', 'file', 'file2document']
print("\n检查重要表是否存在:")
for table in important_tables:
cursor.execute(f"SHOW TABLES LIKE '{table}'")
exists = cursor.fetchone() is not None
status = "✓ 存在" if exists else "✗ 不存在"
print(f"{table}: {status}")
cursor.close()
conn.close()
except mysql.connector.Error as e:
print(f"数据库连接或查询出错: {e}")
if __name__ == "__main__":
get_all_tables()

View File

@ -1,5 +1,6 @@
import mysql.connector import mysql.connector
import os import os
import redis
from minio import Minio from minio import Minio
from dotenv import load_dotenv from dotenv import load_dotenv
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
@ -28,6 +29,8 @@ if is_running_in_docker():
MINIO_PORT = 9000 MINIO_PORT = 9000
ES_HOST = "es01" ES_HOST = "es01"
ES_PORT = 9200 ES_PORT = 9200
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
else: else:
MYSQL_HOST = "localhost" MYSQL_HOST = "localhost"
MYSQL_PORT = int(os.getenv("MYSQL_PORT", "5455")) MYSQL_PORT = int(os.getenv("MYSQL_PORT", "5455"))
@ -35,6 +38,9 @@ else:
MINIO_PORT = int(os.getenv("MINIO_PORT", "9000")) MINIO_PORT = int(os.getenv("MINIO_PORT", "9000"))
ES_HOST = "localhost" ES_HOST = "localhost"
ES_PORT = int(os.getenv("ES_PORT", "9200")) ES_PORT = int(os.getenv("ES_PORT", "9200"))
REDIS_HOST = "localhost"
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# 数据库连接配置 # 数据库连接配置
DB_CONFIG = { DB_CONFIG = {
@ -61,6 +67,14 @@ ES_CONFIG = {
"use_ssl": os.getenv("ES_USE_SSL", "false").lower() == "true", "use_ssl": os.getenv("ES_USE_SSL", "false").lower() == "true",
} }
# Redis连接配置
REDIS_CONFIG = {
"host": REDIS_HOST,
"port": REDIS_PORT,
"password": os.getenv("REDIS_PASSWORD", "infini_rag_flow"),
"decode_responses": False,
}
def get_db_connection(): def get_db_connection():
"""创建MySQL数据库连接""" """创建MySQL数据库连接"""
@ -88,11 +102,11 @@ def get_es_client():
# 构建连接参数 # 构建连接参数
es_params = {"hosts": [ES_CONFIG["host"]]} es_params = {"hosts": [ES_CONFIG["host"]]}
# 如果提供了用户名和密码,添加认证信息 # 添加认证信息
if ES_CONFIG["user"] and ES_CONFIG["password"]: if ES_CONFIG["user"] and ES_CONFIG["password"]:
es_params["basic_auth"] = (ES_CONFIG["user"], ES_CONFIG["password"]) es_params["basic_auth"] = (ES_CONFIG["user"], ES_CONFIG["password"])
# 如果需要SSL添加SSL配置 # 添加SSL配置
if ES_CONFIG["use_ssl"]: if ES_CONFIG["use_ssl"]:
es_params["use_ssl"] = True es_params["use_ssl"] = True
es_params["verify_certs"] = False # 在开发环境中可以设置为False生产环境应该设置为True es_params["verify_certs"] = False # 在开发环境中可以设置为False生产环境应该设置为True
@ -104,6 +118,19 @@ def get_es_client():
raise e raise e
def get_redis_connection():
"""创建Redis连接"""
try:
# 使用配置创建Redis连接
r = redis.Redis(**REDIS_CONFIG)
# 测试连接
r.ping()
return r
except Exception as e:
print(f"Redis连接失败: {str(e)}")
raise e
def test_connections(): def test_connections():
"""测试数据库和MinIO连接""" """测试数据库和MinIO连接"""
try: try:

View File

@ -1,161 +0,0 @@
import os
import sys
import argparse
from minio import Minio
from dotenv import load_dotenv
# 加载环境变量
load_dotenv("../../docker/.env")
def is_running_in_docker():
# 检查是否存在/.dockerenv文件
docker_env = os.path.exists('/.dockerenv')
# 或者检查cgroup中是否包含docker字符串
try:
with open('/proc/self/cgroup', 'r') as f:
return docker_env or 'docker' in f.read()
except:
return docker_env
MINIO_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost'
# MinIO连接配置
MINIO_CONFIG = {
"endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}",
"access_key": os.getenv("MINIO_USER", "rag_flow"),
"secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"),
"secure": False
}
def get_minio_client():
"""创建MinIO客户端"""
return Minio(
endpoint=MINIO_CONFIG["endpoint"],
access_key=MINIO_CONFIG["access_key"],
secret_key=MINIO_CONFIG["secret_key"],
secure=MINIO_CONFIG["secure"]
)
def get_image_url(kb_id, image_key):
"""获取图片的公共访问URL
Args:
kb_id: 知识库ID
image_key: 图片在MinIO中的键
Returns:
图片的公共访问URL
"""
try:
minio_client = get_minio_client()
# 检查桶和对象是否存在
if not minio_client.bucket_exists(kb_id):
print(f"[ERROR] 存储桶不存在: {kb_id}")
return None
try:
minio_client.stat_object(kb_id, image_key)
except Exception as e:
print(f"[ERROR] 对象不存在: {kb_id}/{image_key}, 错误: {str(e)}")
return None
# 获取MinIO服务器配置
minio_endpoint = MINIO_CONFIG["endpoint"]
use_ssl = MINIO_CONFIG["secure"]
# 构建URL
protocol = "https" if use_ssl else "http"
url = f"{protocol}://{minio_endpoint}/{kb_id}/{image_key}"
return url
except Exception as e:
print(f"[ERROR] 获取图片URL失败: {str(e)}")
return None
def list_bucket_images(kb_id, prefix="images/"):
"""列出知识库中的所有图片
Args:
kb_id: 知识库ID
prefix: 图片前缀默认为"images/"
Returns:
图片列表
"""
try:
minio_client = get_minio_client()
# 检查桶是否存在
if not minio_client.bucket_exists(kb_id):
print(f"[ERROR] 存储桶不存在: {kb_id}")
return []
# 列出桶中的所有对象
objects = minio_client.list_objects(kb_id, prefix=prefix, recursive=True)
image_list = []
for obj in objects:
image_list.append(obj.object_name)
return image_list
except Exception as e:
print(f"[ERROR] 列出图片失败: {str(e)}")
return []
def main():
parser = argparse.ArgumentParser(description="查询MinIO中图片的外链")
parser.add_argument("--kb_id", help="知识库ID", required=False)
parser.add_argument("--image_key", help="图片在MinIO中的键", required=False)
parser.add_argument("--list", action="store_true", help="列出知识库中的所有图片")
parser.add_argument("--list_buckets", action="store_true", help="列出所有存储桶")
args = parser.parse_args()
# 列出所有存储桶
if args.list_buckets:
try:
minio_client = get_minio_client()
buckets = minio_client.list_buckets()
print("可用的存储桶列表:")
for bucket in buckets:
print(f" - {bucket.name}")
return
except Exception as e:
print(f"[ERROR] 列出存储桶失败: {str(e)}")
return
# 检查必要参数
if not args.kb_id:
print("[ERROR] 请提供知识库ID (--kb_id)")
parser.print_help()
return
# 列出知识库中的所有图片
if args.list:
images = list_bucket_images(args.kb_id)
if images:
print(f"知识库 {args.kb_id} 中的图片列表:")
for i, image in enumerate(images, 1):
url = get_image_url(args.kb_id, image)
print(f" {i}. {image}")
print(f" URL: {url}")
else:
print(f"知识库 {args.kb_id} 中没有图片")
return
# 获取单个图片的URL
if not args.image_key:
print("[ERROR] 请提供图片键 (--image_key) 或使用 --list 列出所有图片")
parser.print_help()
return
url = get_image_url(args.kb_id, args.image_key)
if url:
print(f"图片 {args.kb_id}/{args.image_key} 的URL:")
print(url)
else:
print(f"无法获取图片 {args.kb_id}/{args.image_key} 的URL")
if __name__ == "__main__":
main()

View File

@ -23,4 +23,5 @@ shapely==2.1.0
pyclipper==1.3.0.post6 pyclipper==1.3.0.post6
omegaconf==2.3.0 omegaconf==2.3.0
rapid-table==1.0.3 rapid-table==1.0.3
openai==1.70.0 openai==1.70.0
redis==6.2.0

View File

@ -3,7 +3,7 @@ from io import BytesIO
from .. import files_bp from .. import files_bp
from services.files.service import get_files_list, get_file_info, download_file_from_minio, delete_file, batch_delete_files, upload_files_to_server from services.files.service import get_files_list, get_file_info, download_file_from_minio, delete_file, batch_delete_files, handle_chunk_upload, merge_chunks, upload_files_to_server
from services.files.utils import FileType from services.files.utils import FileType
UPLOAD_FOLDER = "/data/uploads" UPLOAD_FOLDER = "/data/uploads"
@ -121,3 +121,58 @@ def batch_delete_files_route():
except Exception as e: except Exception as e:
return jsonify({"code": 500, "message": f"批量删除文件失败: {str(e)}"}), 500 return jsonify({"code": 500, "message": f"批量删除文件失败: {str(e)}"}), 500
@files_bp.route("/upload/chunk", methods=["POST"])
def upload_chunk():
"""
处理文件分块上传
"""
if "chunk" not in request.files:
return jsonify({"code": 400, "message": "未选择文件分块", "data": None}), 400
chunk = request.files["chunk"]
chunk_index = request.form.get("chunkIndex")
total_chunks = request.form.get("totalChunks")
upload_id = request.form.get("uploadId")
file_name = request.form.get("fileName")
parent_id = request.form.get("parent_id")
if not all([chunk_index, total_chunks, upload_id, file_name]):
return jsonify({"code": 400, "message": "缺少必要参数", "data": None}), 400
result = handle_chunk_upload(chunk, chunk_index, total_chunks, upload_id, file_name, parent_id)
# 检查结果中是否有错误信息
if result.get("code", 0) != 0:
# 如果有错误返回相应的HTTP状态码
return jsonify(result), result.get("code", 500)
return jsonify(result)
@files_bp.route("/upload/merge", methods=["POST"])
def merge_upload():
"""
合并已上传的文件分块
"""
data = request.json
if not data:
return jsonify({"code": 400, "message": "请求数据为空", "data": None}), 400
upload_id = data.get("uploadId")
file_name = data.get("fileName")
total_chunks = data.get("totalChunks")
parent_id = data.get("parentId")
if not all([upload_id, file_name, total_chunks]):
return jsonify({"code": 400, "message": "缺少必要参数", "data": None}), 400
result = merge_chunks(upload_id, file_name, total_chunks, parent_id)
# 检查结果中是否有错误信息
if result.get("code", 0) != 0:
# 如果有错误返回相应的HTTP状态码
return jsonify(result), result.get("code", 500)
return jsonify(result)

View File

@ -7,79 +7,71 @@ from minio import Minio
load_dotenv("../../docker/.env") load_dotenv("../../docker/.env")
# 数据库连接配置 # 数据库连接配置
DB_CONFIG = { DB_CONFIG = {"host": "localhost", "port": int(os.getenv("MYSQL_PORT", "5455")), "user": "root", "password": os.getenv("MYSQL_PASSWORD", "infini_rag_flow"), "database": "rag_flow"}
"host": "localhost",
"port": int(os.getenv("MYSQL_PORT", "5455")),
"user": "root",
"password": os.getenv("MYSQL_PASSWORD", "infini_rag_flow"),
"database": "rag_flow"
}
# MinIO连接配置 # MinIO连接配置
MINIO_CONFIG = { MINIO_CONFIG = {
"endpoint": "localhost:" + os.getenv("MINIO_PORT", "9000"), "endpoint": "localhost:" + os.getenv("MINIO_PORT", "9000"),
"access_key": os.getenv("MINIO_USER", "rag_flow"), "access_key": os.getenv("MINIO_USER", "rag_flow"),
"secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"), "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"),
"secure": False "secure": False,
} }
def get_minio_client(): def get_minio_client():
"""创建MinIO客户端""" """创建MinIO客户端"""
return Minio( return Minio(endpoint=MINIO_CONFIG["endpoint"], access_key=MINIO_CONFIG["access_key"], secret_key=MINIO_CONFIG["secret_key"], secure=MINIO_CONFIG["secure"])
endpoint=MINIO_CONFIG["endpoint"],
access_key=MINIO_CONFIG["access_key"],
secret_key=MINIO_CONFIG["secret_key"],
secure=MINIO_CONFIG["secure"]
)
def clear_database_tables(): def clear_database_tables():
"""清空数据库表""" """清空数据库表"""
try: try:
conn = mysql.connector.connect(**DB_CONFIG) conn = mysql.connector.connect(**DB_CONFIG)
cursor = conn.cursor() cursor = conn.cursor()
# 禁用外键检查 # 禁用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 0") cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
# 清空表数据 # 清空表数据
tables = ["document", "file2document", "file"] tables = ["document", "file2document", "file"]
for table in tables: for table in tables:
cursor.execute(f"TRUNCATE TABLE {table}") cursor.execute(f"TRUNCATE TABLE {table}")
print(f"已清空表: {table}") print(f"已清空表: {table}")
# 启用外键检查 # 启用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 1") cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
conn.commit() conn.commit()
cursor.close() cursor.close()
conn.close() conn.close()
print("数据库表已全部清空") print("数据库表已全部清空")
except Exception as e: except Exception as e:
print(f"清空数据库表失败: {str(e)}") print(f"清空数据库表失败: {str(e)}")
raise raise
def clear_minio_buckets(): def clear_minio_buckets():
"""清空并删除MinIO所有存储桶""" """清空并删除MinIO所有存储桶"""
try: try:
minio_client = get_minio_client() minio_client = get_minio_client()
buckets = minio_client.list_buckets() buckets = minio_client.list_buckets()
if not buckets: if not buckets:
print("MinIO中没有存储桶需要清理") print("MinIO中没有存储桶需要清理")
return return
print(f"开始清理 {len(buckets)} 个MinIO存储桶...") print(f"开始清理 {len(buckets)} 个MinIO存储桶...")
for bucket in buckets: for bucket in buckets:
bucket_name = bucket.name bucket_name = bucket.name
# 跳过系统保留的存储桶 # 跳过系统保留的存储桶
if bucket_name.startswith('.'): if bucket_name.startswith("."):
print(f"跳过系统存储桶: {bucket_name}") print(f"跳过系统存储桶: {bucket_name}")
continue continue
try: try:
# 递归删除存储桶中的所有对象(包括版本控制对象) # 递归删除存储桶中的所有对象(包括版本控制对象)
objects = minio_client.list_objects(bucket_name, recursive=True) objects = minio_client.list_objects(bucket_name, recursive=True)
@ -90,7 +82,7 @@ def clear_minio_buckets():
except Exception as e: except Exception as e:
print(f"删除对象 {obj.object_name} 失败: {str(e)}") print(f"删除对象 {obj.object_name} 失败: {str(e)}")
continue continue
# 确保所有对象已删除 # 确保所有对象已删除
while True: while True:
remaining_objects = list(minio_client.list_objects(bucket_name)) remaining_objects = list(minio_client.list_objects(bucket_name))
@ -98,7 +90,7 @@ def clear_minio_buckets():
break break
for obj in remaining_objects: for obj in remaining_objects:
minio_client.remove_object(bucket_name, obj.object_name) minio_client.remove_object(bucket_name, obj.object_name)
# 实际删除存储桶 # 实际删除存储桶
try: try:
minio_client.remove_bucket(bucket_name) minio_client.remove_bucket(bucket_name)
@ -115,21 +107,23 @@ def clear_minio_buckets():
print(f"已强制删除存储桶: {bucket_name}") print(f"已强制删除存储桶: {bucket_name}")
except Exception as e: except Exception as e:
print(f"强制删除存储桶 {bucket_name} 仍然失败: {str(e)}") print(f"强制删除存储桶 {bucket_name} 仍然失败: {str(e)}")
except Exception as e: except Exception as e:
print(f"处理存储桶 {bucket_name} 时发生错误: {str(e)}") print(f"处理存储桶 {bucket_name} 时发生错误: {str(e)}")
print("MinIO存储桶清理完成") print("MinIO存储桶清理完成")
except Exception as e: except Exception as e:
print(f"清理MinIO存储桶失败: {str(e)}") print(f"清理MinIO存储桶失败: {str(e)}")
raise raise
def confirm_action(): def confirm_action():
"""确认操作""" """确认操作"""
print("警告: 此操作将永久删除所有数据!") print("警告: 此操作将永久删除所有数据!")
confirmation = input("确认要清空所有数据吗? (输入'y'确认): ") confirmation = input("确认要清空所有数据吗? (输入'y'确认): ")
return confirmation.lower() == 'y' return confirmation.lower() == "y"
if __name__ == "__main__": if __name__ == "__main__":
if confirm_action(): if confirm_action():
@ -138,4 +132,4 @@ if __name__ == "__main__":
clear_minio_buckets() clear_minio_buckets()
print("数据清理完成") print("数据清理完成")
else: else:
print("操作已取消") print("操作已取消")

View File

@ -1,16 +1,21 @@
import os import os
import mysql.connector import shutil
import re import re
import tempfile import tempfile
from minio import Minio
from dotenv import load_dotenv from dotenv import load_dotenv
from datetime import datetime from datetime import datetime
from pathlib import Path
from database import get_db_connection, get_minio_client, get_redis_connection
from .utils import FileType, FileSource, get_uuid from .utils import FileType, FileSource, get_uuid
from database import DB_CONFIG, MINIO_CONFIG
# 加载环境变量 # 加载环境变量
load_dotenv("../../docker/.env") load_dotenv("../../docker/.env")
# redis配置参数
UPLOAD_TEMP_DIR = os.getenv("UPLOAD_TEMP_DIR", tempfile.gettempdir())
CHUNK_EXPIRY_SECONDS = 3600 * 24 # 分块24小时过期
temp_dir = tempfile.gettempdir() temp_dir = tempfile.gettempdir()
UPLOAD_FOLDER = os.path.join(temp_dir, "uploads") UPLOAD_FOLDER = os.path.join(temp_dir, "uploads")
ALLOWED_EXTENSIONS = {"pdf", "doc", "docx", "ppt", "pptx", "xls", "xlsx", "jpg", "jpeg", "png", "bmp", "txt", "md", "html"} ALLOWED_EXTENSIONS = {"pdf", "doc", "docx", "ppt", "pptx", "xls", "xlsx", "jpg", "jpeg", "png", "bmp", "txt", "md", "html"}
@ -43,16 +48,6 @@ def filename_type(filename):
return FileType.OTHER.value return FileType.OTHER.value
def get_minio_client():
"""创建MinIO客户端"""
return Minio(endpoint=MINIO_CONFIG["endpoint"], access_key=MINIO_CONFIG["access_key"], secret_key=MINIO_CONFIG["secret_key"], secure=MINIO_CONFIG["secure"])
def get_db_connection():
"""创建数据库连接"""
return mysql.connector.connect(**DB_CONFIG)
def get_files_list(current_page, page_size, name_filter="", sort_by="create_time", sort_order="desc"): def get_files_list(current_page, page_size, name_filter="", sort_by="create_time", sort_order="desc"):
""" """
获取文件列表 获取文件列表
@ -247,7 +242,7 @@ def delete_file(file_id):
document_mappings = cursor.fetchall() document_mappings = cursor.fetchall()
# 创建MinIO客户端(在事务外创建) # 创建MinIO客户端
minio_client = get_minio_client() minio_client = get_minio_client()
# 开始事务 # 开始事务
@ -608,3 +603,133 @@ def upload_files_to_server(files, parent_id=None, user_id=None):
raise RuntimeError({"name": filename, "error": "不支持的文件类型", "status": "failed"}) raise RuntimeError({"name": filename, "error": "不支持的文件类型", "status": "failed"})
return {"code": 0, "data": results, "message": f"成功上传 {len([r for r in results if r['status'] == 'success'])}/{len(files)} 个文件"} return {"code": 0, "data": results, "message": f"成功上传 {len([r for r in results if r['status'] == 'success'])}/{len(files)} 个文件"}
def handle_chunk_upload(chunk_file, chunk_index, total_chunks, upload_id, file_name, parent_id=None):
"""
处理分块上传
Args:
chunk_file: 上传的文件分块
chunk_index: 分块索引
total_chunks: 总分块数
upload_id: 上传ID
file_name: 文件名
parent_id: 父目录ID
Returns:
dict: 上传结果
"""
try:
# 创建临时目录存储分块
upload_dir = Path(UPLOAD_TEMP_DIR) / "chunks" / upload_id
upload_dir.mkdir(parents=True, exist_ok=True)
# 保存分块
chunk_path = upload_dir / f"{chunk_index}.chunk"
chunk_file.save(str(chunk_path))
# 使用Redis记录上传状态
r = get_redis_connection()
# 记录文件信息
if int(chunk_index) == 0:
r.hmset(f"upload:{upload_id}:info", {"file_name": file_name, "total_chunks": total_chunks, "parent_id": parent_id or "", "status": "uploading"})
r.expire(f"upload:{upload_id}:info", CHUNK_EXPIRY_SECONDS)
# 记录分块状态
r.setbit(f"upload:{upload_id}:chunks", int(chunk_index), 1)
r.expire(f"upload:{upload_id}:chunks", CHUNK_EXPIRY_SECONDS)
# 检查是否所有分块都已上传
is_complete = True
for i in range(int(total_chunks)):
if not r.getbit(f"upload:{upload_id}:chunks", i):
is_complete = False
break
return {"code": 0, "data": {"upload_id": upload_id, "chunk_index": chunk_index, "is_complete": is_complete}, "message": "分块上传成功"}
except Exception as e:
print(f"分块上传失败: {str(e)}")
return {"code": 500, "message": f"分块上传失败: {str(e)}"}
def merge_chunks(upload_id, file_name, total_chunks, parent_id=None):
"""
合并文件分块
Args:
upload_id: 上传ID
file_name: 文件名
total_chunks: 总分块数
parent_id: 父目录ID
Returns:
dict: 合并结果
"""
try:
r = get_redis_connection()
# 检查上传状态
if not r.exists(f"upload:{upload_id}:info"):
return {"code": 404, "message": "上传任务不存在或已过期"}
# 检查所有分块是否都已上传
for i in range(int(total_chunks)):
if not r.getbit(f"upload:{upload_id}:chunks", i):
return {"code": 400, "message": f"分块 {i} 未上传,无法合并"}
# 获取上传信息
upload_info = r.hgetall(f"upload:{upload_id}:info")
if not upload_info:
return {"code": 404, "message": "上传信息不存在"}
# 将字节字符串转换为普通字符串
upload_info = {k.decode("utf-8"): v.decode("utf-8") for k, v in upload_info.items()}
# 使用存储的信息,如果参数中没有提供
file_name = file_name or upload_info.get("file_name")
# 创建临时文件用于合并
upload_dir = Path(UPLOAD_TEMP_DIR) / "chunks" / upload_id
merged_path = Path(UPLOAD_TEMP_DIR) / f"merged_{upload_id}_{file_name}"
# 合并文件
with open(merged_path, "wb") as merged_file:
for i in range(int(total_chunks)):
chunk_path = upload_dir / f"{i}.chunk"
with open(chunk_path, "rb") as chunk_file:
merged_file.write(chunk_file.read())
# 使用上传函数处理合并后的文件
with open(merged_path, "rb") as file_obj:
# 创建FileStorage对象
class MockFileStorage:
def __init__(self, file_obj, filename):
self.file = file_obj
self.filename = filename
def save(self, dst):
with open(dst, "wb") as f:
f.write(self.file.read())
self.file.seek(0) # 重置文件指针
mock_file = MockFileStorage(file_obj, file_name)
result = upload_files_to_server([mock_file])
# 更新状态为已完成
r.hset(f"upload:{upload_id}:info", "status", "completed")
# 清理临时文件
try:
if os.path.exists(merged_path):
os.remove(merged_path)
if upload_dir.exists():
shutil.rmtree(upload_dir)
except Exception as e:
print(f"清理临时文件失败: {str(e)}")
return result
except Exception as e:
print(f"合并分块失败: {str(e)}")
return {"code": 500, "message": f"合并分块失败: {str(e)}"}