diff --git a/management/server/database.py b/management/server/database.py index 4e1f9f8..ef4062a 100644 --- a/management/server/database.py +++ b/management/server/database.py @@ -4,6 +4,7 @@ from utils import generate_uuid, encrypt_password from datetime import datetime from minio import Minio from dotenv import load_dotenv +from elasticsearch import Elasticsearch # 加载环境变量 load_dotenv("../../docker/.env") @@ -22,6 +23,7 @@ def is_running_in_docker(): # 根据运行环境选择合适的主机地址 DB_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost' MINIO_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost' +ES_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost' # 添加 ES 主机地址 # 数据库连接配置 DB_CONFIG = { @@ -40,6 +42,14 @@ MINIO_CONFIG = { "secure": False } +# Elasticsearch连接配置 +ES_CONFIG = { + "host": f"http://{ES_HOST}:{os.getenv('ES_PORT', '9200')}", + "user": os.getenv("ELASTIC_USER", "elastic"), + "password": os.getenv("ELASTIC_PASSWORD", "infini_rag_flow"), + "use_ssl": os.getenv("ES_USE_SSL", "false").lower() == "true" +} + def get_db_connection(): """创建MySQL数据库连接""" try: @@ -63,6 +73,29 @@ def get_minio_client(): print(f"MinIO连接失败: {str(e)}") raise e +def get_es_client(): + """创建Elasticsearch客户端连接""" + try: + # 构建连接参数 + es_params = { + "hosts": [ES_CONFIG["host"]] + } + + # 如果提供了用户名和密码,添加认证信息 + if ES_CONFIG["user"] and ES_CONFIG["password"]: + es_params["basic_auth"] = (ES_CONFIG["user"], ES_CONFIG["password"]) + + # 如果需要SSL,添加SSL配置 + if ES_CONFIG["use_ssl"]: + es_params["use_ssl"] = True + es_params["verify_certs"] = False # 在开发环境中可以设置为False,生产环境应该设置为True + + es_client = Elasticsearch(**es_params) + return es_client + except Exception as e: + print(f"Elasticsearch连接失败: {str(e)}") + raise e + def test_connections(): """测试数据库和MinIO连接""" try: @@ -80,6 +113,14 @@ def test_connections(): buckets = minio_client.list_buckets() print(f"MinIO连接测试成功,共有 {len(buckets)} 个存储桶") + # 测试Elasticsearch连接 + try: + es_client = get_es_client() + es_info = es_client.info() + print(f"Elasticsearch连接测试成功,版本: {es_info.get('version', {}).get('number', '未知')}") + except Exception as e: + print(f"Elasticsearch连接测试失败: {str(e)}") + return True except Exception as e: print(f"连接测试失败: {str(e)}") diff --git a/management/server/get_minio_image_url.py b/management/server/get_minio_image_url.py new file mode 100644 index 0000000..cb33317 --- /dev/null +++ b/management/server/get_minio_image_url.py @@ -0,0 +1,161 @@ +import os +import sys +import argparse +from minio import Minio +from dotenv import load_dotenv + +# 加载环境变量 +load_dotenv("../../docker/.env") + +def is_running_in_docker(): + # 检查是否存在/.dockerenv文件 + docker_env = os.path.exists('/.dockerenv') + # 或者检查cgroup中是否包含docker字符串 + try: + with open('/proc/self/cgroup', 'r') as f: + return docker_env or 'docker' in f.read() + except: + return docker_env + +MINIO_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost' + +# MinIO连接配置 +MINIO_CONFIG = { + "endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}", + "access_key": os.getenv("MINIO_USER", "rag_flow"), + "secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"), + "secure": False +} + +def get_minio_client(): + """创建MinIO客户端""" + return Minio( + endpoint=MINIO_CONFIG["endpoint"], + access_key=MINIO_CONFIG["access_key"], + secret_key=MINIO_CONFIG["secret_key"], + secure=MINIO_CONFIG["secure"] + ) + +def get_image_url(kb_id, image_key): + """获取图片的公共访问URL + + Args: + kb_id: 知识库ID + image_key: 图片在MinIO中的键 + + Returns: + 图片的公共访问URL + """ + try: + minio_client = get_minio_client() + + # 检查桶和对象是否存在 + if not minio_client.bucket_exists(kb_id): + print(f"[ERROR] 存储桶不存在: {kb_id}") + return None + + try: + minio_client.stat_object(kb_id, image_key) + except Exception as e: + print(f"[ERROR] 对象不存在: {kb_id}/{image_key}, 错误: {str(e)}") + return None + + # 获取MinIO服务器配置 + minio_endpoint = MINIO_CONFIG["endpoint"] + use_ssl = MINIO_CONFIG["secure"] + + # 构建URL + protocol = "https" if use_ssl else "http" + url = f"{protocol}://{minio_endpoint}/{kb_id}/{image_key}" + + return url + except Exception as e: + print(f"[ERROR] 获取图片URL失败: {str(e)}") + return None + +def list_bucket_images(kb_id, prefix="images/"): + """列出知识库中的所有图片 + + Args: + kb_id: 知识库ID + prefix: 图片前缀,默认为"images/" + + Returns: + 图片列表 + """ + try: + minio_client = get_minio_client() + + # 检查桶是否存在 + if not minio_client.bucket_exists(kb_id): + print(f"[ERROR] 存储桶不存在: {kb_id}") + return [] + + # 列出桶中的所有对象 + objects = minio_client.list_objects(kb_id, prefix=prefix, recursive=True) + image_list = [] + + for obj in objects: + image_list.append(obj.object_name) + + return image_list + except Exception as e: + print(f"[ERROR] 列出图片失败: {str(e)}") + return [] + +def main(): + parser = argparse.ArgumentParser(description="查询MinIO中图片的外链") + parser.add_argument("--kb_id", help="知识库ID", required=False) + parser.add_argument("--image_key", help="图片在MinIO中的键", required=False) + parser.add_argument("--list", action="store_true", help="列出知识库中的所有图片") + parser.add_argument("--list_buckets", action="store_true", help="列出所有存储桶") + + args = parser.parse_args() + + # 列出所有存储桶 + if args.list_buckets: + try: + minio_client = get_minio_client() + buckets = minio_client.list_buckets() + print("可用的存储桶列表:") + for bucket in buckets: + print(f" - {bucket.name}") + return + except Exception as e: + print(f"[ERROR] 列出存储桶失败: {str(e)}") + return + + # 检查必要参数 + if not args.kb_id: + print("[ERROR] 请提供知识库ID (--kb_id)") + parser.print_help() + return + + # 列出知识库中的所有图片 + if args.list: + images = list_bucket_images(args.kb_id) + if images: + print(f"知识库 {args.kb_id} 中的图片列表:") + for i, image in enumerate(images, 1): + url = get_image_url(args.kb_id, image) + print(f" {i}. {image}") + print(f" URL: {url}") + else: + print(f"知识库 {args.kb_id} 中没有图片") + return + + # 获取单个图片的URL + if not args.image_key: + print("[ERROR] 请提供图片键 (--image_key) 或使用 --list 列出所有图片") + parser.print_help() + return + + url = get_image_url(args.kb_id, args.image_key) + if url: + print(f"图片 {args.kb_id}/{args.image_key} 的URL:") + print(url) + else: + print(f"无法获取图片 {args.kb_id}/{args.image_key} 的URL") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/management/server/requirements.txt b/management/server/requirements.txt index 9dfc3f1..0fde549 100644 --- a/management/server/requirements.txt +++ b/management/server/requirements.txt @@ -7,4 +7,5 @@ Werkzeug==3.1.3 PyJWT==2.10.1 dotenv==0.9.9 magic-pdf[full]==1.3.0 -transformers==4.49.0 \ No newline at end of file +transformers==4.49.0 +elasticsearch==8.12.0 \ No newline at end of file diff --git a/management/server/services/knowledgebases/service.py b/management/server/services/knowledgebases/service.py index 3a98eda..6709acb 100644 --- a/management/server/services/knowledgebases/service.py +++ b/management/server/services/knowledgebases/service.py @@ -3,7 +3,7 @@ import json from flask import current_app from datetime import datetime from utils import generate_uuid -from database import DB_CONFIG, get_minio_client +from database import DB_CONFIG, get_minio_client, get_es_client import io import os import json @@ -11,8 +11,8 @@ import threading import time import tempfile import shutil +from elasticsearch import Elasticsearch from io import BytesIO - # 解析相关模块 from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader from magic_pdf.data.dataset import PymuDocDataset @@ -916,7 +916,51 @@ class KnowledgebaseService: print(f"[DEBUG] 开始上传到MinIO,目标桶: {kb_id}") - + # 获取Elasticsearch客户端 + es_client = get_es_client() + + # 获取tenant_id (文档创建者id) + tenant_query = """ + SELECT created_by FROM document WHERE id = %s + """ + cursor.execute(tenant_query, (doc_id,)) + tenant_result = cursor.fetchone() + tenant_id = tenant_result['created_by'] + print(f"[DEBUG] 文档 {doc_id} 的tenant_id: {tenant_id}") + + # 确保索引存在 + index_name = f"ragflow_{tenant_id}" + if not es_client.indices.exists(index=index_name): + # 创建索引,设置为0个副本 + es_client.indices.create( + index=index_name, + body={ + "settings": { + "number_of_shards": 2, + "number_of_replicas": 0 # 单节点环境设置为0个副本 + }, + "mappings": { + "properties": { + "doc_id": {"type": "keyword"}, + "kb_id": {"type": "keyword"}, + "docnm_kwd": {"type": "keyword"}, + "title_tks": {"type": "keyword"}, + "title_sm_tks": {"type": "keyword"}, + "content_with_weight": {"type": "text"}, + "content_ltks": {"type": "keyword"}, + "content_sm_ltks": {"type": "keyword"}, + "page_num_int": {"type": "integer"}, + "position_int": {"type": "integer"}, + "top_int": {"type": "integer"}, + "create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}, + "create_timestamp_flt": {"type": "float"}, + "img_id": {"type": "keyword"}, + "q_1024_vec": {"type": "keyword"} + } + } + } + ) + # 处理内容块并上传到MinIO chunk_count = 0 chunk_ids_list = [] @@ -926,16 +970,52 @@ class KnowledgebaseService: if not content.strip(): print(f"[DEBUG] 跳过空文本块 {chunk_idx}") continue - + + # 生成唯一ID chunk_id = generate_uuid() try: + # 1. 上传到MinIO minio_client.put_object( bucket_name=kb_id, object_name=chunk_id, data=BytesIO(content.encode('utf-8')), length=len(content) ) + + # 分词处理 + content_tokens = tokenize_text(content) + + # 获取当前时间 + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_timestamp = datetime.now().timestamp() + + # 创建Elasticsearch文档 + es_doc = { + "doc_id": doc_id, + "kb_id": kb_id, + "docnm_kwd": doc['name'], + "title_tks": doc['name'], # 简化处理,使用文档名作为标题 + "title_sm_tks": doc['name'], # 简化处理 + "content_with_weight": content, + "content_ltks": content_tokens, + "content_sm_ltks": content_tokens, # 简化处理 + "page_num_int": [1], # 默认页码 + "position_int": [[1, 0, 0, 0, 0]], # 默认位置 + "top_int": [1], # 默认顶部位置 + "create_time": current_time, + "create_timestamp_flt": current_timestamp, + "img_id": "", # 如果没有图片,置空 + "q_1024_vec": [] + } + + # 2. 存储到Elasticsearch + es_client.index( + index=index_name, + # id=es_chunk_id, + body=es_doc + ) + chunk_count += 1 chunk_ids_list.append(chunk_id) print(f"成功上传文本块 {chunk_count}/{len(content_list)}") @@ -944,7 +1024,59 @@ class KnowledgebaseService: continue elif chunk_data["type"] == "image": - print(f"[INFO] 跳过图像块处理: {chunk_data['img_path']}") + print(f"[INFO] 处理图像块: {chunk_data['img_path']}") + try: + # 获取图片路径 + img_path = chunk_data['img_path'] + + # 检查是否为相对路径,如果是则添加临时目录前缀 + if not os.path.isabs(img_path): + # 使用临时图片目录作为基础路径 + img_path = os.path.join(temp_image_dir, os.path.basename(img_path)) + print(f"[INFO] 转换为绝对路径: {img_path}") + + if os.path.exists(img_path): + # 生成图片ID和存储路径 + img_id = generate_uuid() + img_key = f"images/{img_id}{os.path.splitext(img_path)[1]}" + + # 读取图片内容 + with open(img_path, 'rb') as img_file: + img_data = img_file.read() + + # 设置图片的Content-Type + content_type = f"image/{os.path.splitext(img_path)[1][1:].lower()}" + if content_type == "image/jpg": + content_type = "image/jpeg" + + # 上传图片到MinIO + minio_client.put_object( + bucket_name=kb_id, + object_name=img_key, + data=BytesIO(img_data), + length=len(img_data), + content_type=content_type + ) + + # 设置图片的公共访问权限 + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["s3:GetObject"], + "Resource": [f"arn:aws:s3:::{kb_id}/{img_key}"] + } + ] + } + minio_client.set_bucket_policy(kb_id, json.dumps(policy)) + + print(f"[SUCCESS] 成功上传图片: {img_key}") + else: + print(f"[WARNING] 图片文件不存在: {img_path}") + except Exception as e: + print(f"[ERROR] 上传图片失败: {str(e)}") continue # 更新文档状态和块数量 diff --git a/management/web/src/layouts/components/DocumentParseProgress/index.vue b/management/web/src/layouts/components/DocumentParseProgress/index.vue index b26fd80..aa03908 100644 --- a/management/web/src/layouts/components/DocumentParseProgress/index.vue +++ b/management/web/src/layouts/components/DocumentParseProgress/index.vue @@ -148,38 +148,32 @@ export default {