diff --git a/management/server/check_tables.py b/management/server/check_tables.py deleted file mode 100644 index a56ea61..0000000 --- a/management/server/check_tables.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -import mysql.connector -from dotenv import load_dotenv - -# 加载环境变量 -load_dotenv("../../docker/.env") - -# 数据库连接配置 -DB_CONFIG = { - "host": "localhost", - "port": int(os.getenv("MYSQL_PORT", "5455")), - "user": "root", - "password": os.getenv("MYSQL_PASSWORD", "infini_rag_flow"), - "database": "rag_flow" -} - -def get_db_connection(): - """创建数据库连接""" - return mysql.connector.connect(**DB_CONFIG) - -def get_all_tables(): - """获取数据库中所有表的名称""" - try: - # 连接数据库 - conn = get_db_connection() - cursor = conn.cursor() - - # 查询所有表名 - cursor.execute("SHOW TABLES") - tables = cursor.fetchall() - - print(f"数据库 {DB_CONFIG['database']} 中的表:") - if tables: - for i, table in enumerate(tables, 1): - print(f"{i}. {table[0]}") - else: - print("数据库中没有表") - - # 检查是否存在特定表 - important_tables = ['document', 'file', 'file2document'] - print("\n检查重要表是否存在:") - for table in important_tables: - cursor.execute(f"SHOW TABLES LIKE '{table}'") - exists = cursor.fetchone() is not None - status = "✓ 存在" if exists else "✗ 不存在" - print(f"{table}: {status}") - - cursor.close() - conn.close() - - except mysql.connector.Error as e: - print(f"数据库连接或查询出错: {e}") - -if __name__ == "__main__": - get_all_tables() \ No newline at end of file diff --git a/management/server/database.py b/management/server/database.py index ea20581..58d4c0f 100644 --- a/management/server/database.py +++ b/management/server/database.py @@ -1,5 +1,6 @@ import mysql.connector import os +import redis from minio import Minio from dotenv import load_dotenv from elasticsearch import Elasticsearch @@ -28,6 +29,8 @@ if is_running_in_docker(): MINIO_PORT = 9000 ES_HOST = "es01" ES_PORT = 9200 + REDIS_HOST = os.getenv("REDIS_HOST", "redis") + REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) else: MYSQL_HOST = "localhost" MYSQL_PORT = int(os.getenv("MYSQL_PORT", "5455")) @@ -35,6 +38,9 @@ else: MINIO_PORT = int(os.getenv("MINIO_PORT", "9000")) ES_HOST = "localhost" ES_PORT = int(os.getenv("ES_PORT", "9200")) + REDIS_HOST = "localhost" + REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) + # 数据库连接配置 DB_CONFIG = { @@ -61,6 +67,14 @@ ES_CONFIG = { "use_ssl": os.getenv("ES_USE_SSL", "false").lower() == "true", } +# Redis连接配置 +REDIS_CONFIG = { + "host": REDIS_HOST, + "port": REDIS_PORT, + "password": os.getenv("REDIS_PASSWORD", "infini_rag_flow"), + "decode_responses": False, +} + def get_db_connection(): """创建MySQL数据库连接""" @@ -88,11 +102,11 @@ def get_es_client(): # 构建连接参数 es_params = {"hosts": [ES_CONFIG["host"]]} - # 如果提供了用户名和密码,添加认证信息 + # 添加认证信息 if ES_CONFIG["user"] and ES_CONFIG["password"]: es_params["basic_auth"] = (ES_CONFIG["user"], ES_CONFIG["password"]) - # 如果需要SSL,添加SSL配置 + # 添加SSL配置 if ES_CONFIG["use_ssl"]: es_params["use_ssl"] = True es_params["verify_certs"] = False # 在开发环境中可以设置为False,生产环境应该设置为True @@ -104,6 +118,19 @@ def get_es_client(): raise e +def get_redis_connection(): + """创建Redis连接""" + try: + # 使用配置创建Redis连接 + r = redis.Redis(**REDIS_CONFIG) + # 测试连接 + r.ping() + return r + except Exception as e: + print(f"Redis连接失败: {str(e)}") + raise e + + def test_connections(): """测试数据库和MinIO连接""" try: diff --git a/management/server/get_minio_image_url.py b/management/server/get_minio_image_url.py deleted file mode 100644 index cb33317..0000000 --- a/management/server/get_minio_image_url.py +++ /dev/null @@ -1,161 +0,0 @@ -import os -import sys 
-import argparse -from minio import Minio -from dotenv import load_dotenv - -# 加载环境变量 -load_dotenv("../../docker/.env") - -def is_running_in_docker(): - # 检查是否存在/.dockerenv文件 - docker_env = os.path.exists('/.dockerenv') - # 或者检查cgroup中是否包含docker字符串 - try: - with open('/proc/self/cgroup', 'r') as f: - return docker_env or 'docker' in f.read() - except: - return docker_env - -MINIO_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost' - -# MinIO连接配置 -MINIO_CONFIG = { - "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}", - "access_key": os.getenv("MINIO_USER", "rag_flow"), - "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"), - "secure": False -} - -def get_minio_client(): - """创建MinIO客户端""" - return Minio( - endpoint=MINIO_CONFIG["endpoint"], - access_key=MINIO_CONFIG["access_key"], - secret_key=MINIO_CONFIG["secret_key"], - secure=MINIO_CONFIG["secure"] - ) - -def get_image_url(kb_id, image_key): - """获取图片的公共访问URL - - Args: - kb_id: 知识库ID - image_key: 图片在MinIO中的键 - - Returns: - 图片的公共访问URL - """ - try: - minio_client = get_minio_client() - - # 检查桶和对象是否存在 - if not minio_client.bucket_exists(kb_id): - print(f"[ERROR] 存储桶不存在: {kb_id}") - return None - - try: - minio_client.stat_object(kb_id, image_key) - except Exception as e: - print(f"[ERROR] 对象不存在: {kb_id}/{image_key}, 错误: {str(e)}") - return None - - # 获取MinIO服务器配置 - minio_endpoint = MINIO_CONFIG["endpoint"] - use_ssl = MINIO_CONFIG["secure"] - - # 构建URL - protocol = "https" if use_ssl else "http" - url = f"{protocol}://{minio_endpoint}/{kb_id}/{image_key}" - - return url - except Exception as e: - print(f"[ERROR] 获取图片URL失败: {str(e)}") - return None - -def list_bucket_images(kb_id, prefix="images/"): - """列出知识库中的所有图片 - - Args: - kb_id: 知识库ID - prefix: 图片前缀,默认为"images/" - - Returns: - 图片列表 - """ - try: - minio_client = get_minio_client() - - # 检查桶是否存在 - if not minio_client.bucket_exists(kb_id): - print(f"[ERROR] 存储桶不存在: {kb_id}") - return [] - - # 列出桶中的所有对象 - objects = minio_client.list_objects(kb_id, prefix=prefix, recursive=True) - image_list = [] - - for obj in objects: - image_list.append(obj.object_name) - - return image_list - except Exception as e: - print(f"[ERROR] 列出图片失败: {str(e)}") - return [] - -def main(): - parser = argparse.ArgumentParser(description="查询MinIO中图片的外链") - parser.add_argument("--kb_id", help="知识库ID", required=False) - parser.add_argument("--image_key", help="图片在MinIO中的键", required=False) - parser.add_argument("--list", action="store_true", help="列出知识库中的所有图片") - parser.add_argument("--list_buckets", action="store_true", help="列出所有存储桶") - - args = parser.parse_args() - - # 列出所有存储桶 - if args.list_buckets: - try: - minio_client = get_minio_client() - buckets = minio_client.list_buckets() - print("可用的存储桶列表:") - for bucket in buckets: - print(f" - {bucket.name}") - return - except Exception as e: - print(f"[ERROR] 列出存储桶失败: {str(e)}") - return - - # 检查必要参数 - if not args.kb_id: - print("[ERROR] 请提供知识库ID (--kb_id)") - parser.print_help() - return - - # 列出知识库中的所有图片 - if args.list: - images = list_bucket_images(args.kb_id) - if images: - print(f"知识库 {args.kb_id} 中的图片列表:") - for i, image in enumerate(images, 1): - url = get_image_url(args.kb_id, image) - print(f" {i}. 
{image}") - print(f" URL: {url}") - else: - print(f"知识库 {args.kb_id} 中没有图片") - return - - # 获取单个图片的URL - if not args.image_key: - print("[ERROR] 请提供图片键 (--image_key) 或使用 --list 列出所有图片") - parser.print_help() - return - - url = get_image_url(args.kb_id, args.image_key) - if url: - print(f"图片 {args.kb_id}/{args.image_key} 的URL:") - print(url) - else: - print(f"无法获取图片 {args.kb_id}/{args.image_key} 的URL") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/management/server/requirements.txt b/management/server/requirements.txt index aa48b9e..2b434c9 100644 --- a/management/server/requirements.txt +++ b/management/server/requirements.txt @@ -23,4 +23,5 @@ shapely==2.1.0 pyclipper==1.3.0.post6 omegaconf==2.3.0 rapid-table==1.0.3 -openai==1.70.0 \ No newline at end of file +openai==1.70.0 +redis==6.2.0 \ No newline at end of file diff --git a/management/server/routes/files/routes.py b/management/server/routes/files/routes.py index 333f62e..8b63d99 100644 --- a/management/server/routes/files/routes.py +++ b/management/server/routes/files/routes.py @@ -3,7 +3,7 @@ from io import BytesIO from .. import files_bp -from services.files.service import get_files_list, get_file_info, download_file_from_minio, delete_file, batch_delete_files, upload_files_to_server +from services.files.service import get_files_list, get_file_info, download_file_from_minio, delete_file, batch_delete_files, handle_chunk_upload, merge_chunks, upload_files_to_server from services.files.utils import FileType UPLOAD_FOLDER = "/data/uploads" @@ -121,3 +121,58 @@ def batch_delete_files_route(): except Exception as e: return jsonify({"code": 500, "message": f"批量删除文件失败: {str(e)}"}), 500 + + +@files_bp.route("/upload/chunk", methods=["POST"]) +def upload_chunk(): + """ + 处理文件分块上传 + """ + if "chunk" not in request.files: + return jsonify({"code": 400, "message": "未选择文件分块", "data": None}), 400 + + chunk = request.files["chunk"] + chunk_index = request.form.get("chunkIndex") + total_chunks = request.form.get("totalChunks") + upload_id = request.form.get("uploadId") + file_name = request.form.get("fileName") + parent_id = request.form.get("parent_id") + + if not all([chunk_index, total_chunks, upload_id, file_name]): + return jsonify({"code": 400, "message": "缺少必要参数", "data": None}), 400 + + result = handle_chunk_upload(chunk, chunk_index, total_chunks, upload_id, file_name, parent_id) + + # 检查结果中是否有错误信息 + if result.get("code", 0) != 0: + # 如果有错误,返回相应的HTTP状态码 + return jsonify(result), result.get("code", 500) + + return jsonify(result) + + +@files_bp.route("/upload/merge", methods=["POST"]) +def merge_upload(): + """ + 合并已上传的文件分块 + """ + data = request.json + if not data: + return jsonify({"code": 400, "message": "请求数据为空", "data": None}), 400 + + upload_id = data.get("uploadId") + file_name = data.get("fileName") + total_chunks = data.get("totalChunks") + parent_id = data.get("parentId") + + if not all([upload_id, file_name, total_chunks]): + return jsonify({"code": 400, "message": "缺少必要参数", "data": None}), 400 + + result = merge_chunks(upload_id, file_name, total_chunks, parent_id) + + # 检查结果中是否有错误信息 + if result.get("code", 0) != 0: + # 如果有错误,返回相应的HTTP状态码 + return jsonify(result), result.get("code", 500) + + return jsonify(result) diff --git a/management/server/clean_all_data.py b/management/server/scripts/clean_all_data.py similarity index 85% rename from management/server/clean_all_data.py rename to management/server/scripts/clean_all_data.py index a493c40..f6aeff2 100644 --- 
a/management/server/clean_all_data.py +++ b/management/server/scripts/clean_all_data.py @@ -7,79 +7,71 @@ from minio import Minio load_dotenv("../../docker/.env") # 数据库连接配置 -DB_CONFIG = { - "host": "localhost", - "port": int(os.getenv("MYSQL_PORT", "5455")), - "user": "root", - "password": os.getenv("MYSQL_PASSWORD", "infini_rag_flow"), - "database": "rag_flow" -} +DB_CONFIG = {"host": "localhost", "port": int(os.getenv("MYSQL_PORT", "5455")), "user": "root", "password": os.getenv("MYSQL_PASSWORD", "infini_rag_flow"), "database": "rag_flow"} # MinIO连接配置 MINIO_CONFIG = { "endpoint": "localhost:" + os.getenv("MINIO_PORT", "9000"), "access_key": os.getenv("MINIO_USER", "rag_flow"), "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"), - "secure": False + "secure": False, } + def get_minio_client(): """创建MinIO客户端""" - return Minio( - endpoint=MINIO_CONFIG["endpoint"], - access_key=MINIO_CONFIG["access_key"], - secret_key=MINIO_CONFIG["secret_key"], - secure=MINIO_CONFIG["secure"] - ) + return Minio(endpoint=MINIO_CONFIG["endpoint"], access_key=MINIO_CONFIG["access_key"], secret_key=MINIO_CONFIG["secret_key"], secure=MINIO_CONFIG["secure"]) + def clear_database_tables(): """清空数据库表""" try: conn = mysql.connector.connect(**DB_CONFIG) cursor = conn.cursor() - + # 禁用外键检查 cursor.execute("SET FOREIGN_KEY_CHECKS = 0") - + # 清空表数据 tables = ["document", "file2document", "file"] for table in tables: cursor.execute(f"TRUNCATE TABLE {table}") print(f"已清空表: {table}") - + # 启用外键检查 cursor.execute("SET FOREIGN_KEY_CHECKS = 1") - + conn.commit() cursor.close() conn.close() - + print("数据库表已全部清空") - + except Exception as e: print(f"清空数据库表失败: {str(e)}") raise + def clear_minio_buckets(): """清空并删除MinIO所有存储桶""" try: minio_client = get_minio_client() buckets = minio_client.list_buckets() - + if not buckets: print("MinIO中没有存储桶需要清理") return - + print(f"开始清理 {len(buckets)} 个MinIO存储桶...") - + for bucket in buckets: bucket_name = bucket.name - + # 跳过系统保留的存储桶 - if bucket_name.startswith('.'): + if bucket_name.startswith("."): print(f"跳过系统存储桶: {bucket_name}") continue - + try: # 递归删除存储桶中的所有对象(包括版本控制对象) objects = minio_client.list_objects(bucket_name, recursive=True) @@ -90,7 +82,7 @@ def clear_minio_buckets(): except Exception as e: print(f"删除对象 {obj.object_name} 失败: {str(e)}") continue - + # 确保所有对象已删除 while True: remaining_objects = list(minio_client.list_objects(bucket_name)) @@ -98,7 +90,7 @@ def clear_minio_buckets(): break for obj in remaining_objects: minio_client.remove_object(bucket_name, obj.object_name) - + # 实际删除存储桶 try: minio_client.remove_bucket(bucket_name) @@ -115,21 +107,23 @@ def clear_minio_buckets(): print(f"已强制删除存储桶: {bucket_name}") except Exception as e: print(f"强制删除存储桶 {bucket_name} 仍然失败: {str(e)}") - + except Exception as e: print(f"处理存储桶 {bucket_name} 时发生错误: {str(e)}") - + print("MinIO存储桶清理完成") - + except Exception as e: print(f"清理MinIO存储桶失败: {str(e)}") raise - + + def confirm_action(): """确认操作""" print("警告: 此操作将永久删除所有数据!") confirmation = input("确认要清空所有数据吗? 
(输入'y'确认): ") - return confirmation.lower() == 'y' + return confirmation.lower() == "y" + if __name__ == "__main__": if confirm_action(): @@ -138,4 +132,4 @@ if __name__ == "__main__": clear_minio_buckets() print("数据清理完成") else: - print("操作已取消") \ No newline at end of file + print("操作已取消") diff --git a/management/server/cleanup_minio_buckets.py b/management/server/scripts/cleanup_minio_buckets.py similarity index 100% rename from management/server/cleanup_minio_buckets.py rename to management/server/scripts/cleanup_minio_buckets.py diff --git a/management/server/minio_test.py b/management/server/scripts/minio_test.py similarity index 100% rename from management/server/minio_test.py rename to management/server/scripts/minio_test.py diff --git a/management/server/siliconflow_emb_test.py b/management/server/scripts/siliconflow_emb_test.py similarity index 100% rename from management/server/siliconflow_emb_test.py rename to management/server/scripts/siliconflow_emb_test.py diff --git a/management/server/services/files/service.py b/management/server/services/files/service.py index ccd43a2..a2fc86b 100644 --- a/management/server/services/files/service.py +++ b/management/server/services/files/service.py @@ -1,16 +1,21 @@ import os -import mysql.connector +import shutil import re import tempfile -from minio import Minio from dotenv import load_dotenv from datetime import datetime +from pathlib import Path +from database import get_db_connection, get_minio_client, get_redis_connection from .utils import FileType, FileSource, get_uuid -from database import DB_CONFIG, MINIO_CONFIG + # 加载环境变量 load_dotenv("../../docker/.env") +# redis配置参数 +UPLOAD_TEMP_DIR = os.getenv("UPLOAD_TEMP_DIR", tempfile.gettempdir()) +CHUNK_EXPIRY_SECONDS = 3600 * 24 # 分块24小时过期 + temp_dir = tempfile.gettempdir() UPLOAD_FOLDER = os.path.join(temp_dir, "uploads") ALLOWED_EXTENSIONS = {"pdf", "doc", "docx", "ppt", "pptx", "xls", "xlsx", "jpg", "jpeg", "png", "bmp", "txt", "md", "html"} @@ -43,16 +48,6 @@ def filename_type(filename): return FileType.OTHER.value -def get_minio_client(): - """创建MinIO客户端""" - return Minio(endpoint=MINIO_CONFIG["endpoint"], access_key=MINIO_CONFIG["access_key"], secret_key=MINIO_CONFIG["secret_key"], secure=MINIO_CONFIG["secure"]) - - -def get_db_connection(): - """创建数据库连接""" - return mysql.connector.connect(**DB_CONFIG) - - def get_files_list(current_page, page_size, name_filter="", sort_by="create_time", sort_order="desc"): """ 获取文件列表 @@ -247,7 +242,7 @@ def delete_file(file_id): document_mappings = cursor.fetchall() - # 创建MinIO客户端(在事务外创建) + # 创建MinIO客户端 minio_client = get_minio_client() # 开始事务 @@ -608,3 +603,133 @@ def upload_files_to_server(files, parent_id=None, user_id=None): raise RuntimeError({"name": filename, "error": "不支持的文件类型", "status": "failed"}) return {"code": 0, "data": results, "message": f"成功上传 {len([r for r in results if r['status'] == 'success'])}/{len(files)} 个文件"} + + +def handle_chunk_upload(chunk_file, chunk_index, total_chunks, upload_id, file_name, parent_id=None): + """ + 处理分块上传 + + Args: + chunk_file: 上传的文件分块 + chunk_index: 分块索引 + total_chunks: 总分块数 + upload_id: 上传ID + file_name: 文件名 + parent_id: 父目录ID + + Returns: + dict: 上传结果 + """ + try: + # 创建临时目录存储分块 + upload_dir = Path(UPLOAD_TEMP_DIR) / "chunks" / upload_id + upload_dir.mkdir(parents=True, exist_ok=True) + + # 保存分块 + chunk_path = upload_dir / f"{chunk_index}.chunk" + chunk_file.save(str(chunk_path)) + + # 使用Redis记录上传状态 + r = get_redis_connection() + + # 记录文件信息 + if int(chunk_index) == 0: + 
r.hmset(f"upload:{upload_id}:info", {"file_name": file_name, "total_chunks": total_chunks, "parent_id": parent_id or "", "status": "uploading"}) + r.expire(f"upload:{upload_id}:info", CHUNK_EXPIRY_SECONDS) + + # 记录分块状态 + r.setbit(f"upload:{upload_id}:chunks", int(chunk_index), 1) + r.expire(f"upload:{upload_id}:chunks", CHUNK_EXPIRY_SECONDS) + + # 检查是否所有分块都已上传 + is_complete = True + for i in range(int(total_chunks)): + if not r.getbit(f"upload:{upload_id}:chunks", i): + is_complete = False + break + + return {"code": 0, "data": {"upload_id": upload_id, "chunk_index": chunk_index, "is_complete": is_complete}, "message": "分块上传成功"} + except Exception as e: + print(f"分块上传失败: {str(e)}") + return {"code": 500, "message": f"分块上传失败: {str(e)}"} + + +def merge_chunks(upload_id, file_name, total_chunks, parent_id=None): + """ + 合并文件分块 + + Args: + upload_id: 上传ID + file_name: 文件名 + total_chunks: 总分块数 + parent_id: 父目录ID + + Returns: + dict: 合并结果 + """ + try: + r = get_redis_connection() + + # 检查上传状态 + if not r.exists(f"upload:{upload_id}:info"): + return {"code": 404, "message": "上传任务不存在或已过期"} + + # 检查所有分块是否都已上传 + for i in range(int(total_chunks)): + if not r.getbit(f"upload:{upload_id}:chunks", i): + return {"code": 400, "message": f"分块 {i} 未上传,无法合并"} + + # 获取上传信息 + upload_info = r.hgetall(f"upload:{upload_id}:info") + if not upload_info: + return {"code": 404, "message": "上传信息不存在"} + + # 将字节字符串转换为普通字符串 + upload_info = {k.decode("utf-8"): v.decode("utf-8") for k, v in upload_info.items()} + + # 使用存储的信息,如果参数中没有提供 + file_name = file_name or upload_info.get("file_name") + + # 创建临时文件用于合并 + upload_dir = Path(UPLOAD_TEMP_DIR) / "chunks" / upload_id + merged_path = Path(UPLOAD_TEMP_DIR) / f"merged_{upload_id}_{file_name}" + + # 合并文件 + with open(merged_path, "wb") as merged_file: + for i in range(int(total_chunks)): + chunk_path = upload_dir / f"{i}.chunk" + with open(chunk_path, "rb") as chunk_file: + merged_file.write(chunk_file.read()) + + # 使用上传函数处理合并后的文件 + with open(merged_path, "rb") as file_obj: + # 创建FileStorage对象 + class MockFileStorage: + def __init__(self, file_obj, filename): + self.file = file_obj + self.filename = filename + + def save(self, dst): + with open(dst, "wb") as f: + f.write(self.file.read()) + self.file.seek(0) # 重置文件指针 + + mock_file = MockFileStorage(file_obj, file_name) + result = upload_files_to_server([mock_file]) + + # 更新状态为已完成 + r.hset(f"upload:{upload_id}:info", "status", "completed") + + # 清理临时文件 + try: + if os.path.exists(merged_path): + os.remove(merged_path) + if upload_dir.exists(): + shutil.rmtree(upload_dir) + except Exception as e: + print(f"清理临时文件失败: {str(e)}") + + return result + except Exception as e: + print(f"合并分块失败: {str(e)}") + return {"code": 500, "message": f"合并分块失败: {str(e)}"}