feat:Elasticsearch添加/图片解析保存 (#27)

* feat(elasticsearch): 添加Elasticsearch集成以支持知识库文档索引

在`database.py`中添加Elasticsearch客户端连接配置和初始化逻辑,并在`knowledgebases/service.py`中实现文档内容块的上传和索引功能。通过Elasticsearch,文档内容将被索引并支持快速搜索,提升知识库的检索效率。

* feat(知识库): 添加图片处理功能并优化资源清理

在知识库服务中添加对图片块的处理功能,支持上传图片到MinIO并设置公共访问权限。同时,在知识库页面组件中添加资源清理逻辑,确保在组件卸载或停用时释放资源。
This commit is contained in:
zstar 2025-04-15 00:35:33 +08:00 committed by GitHub
parent dbc75f7ed8
commit 99cc31dc67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 401 additions and 40 deletions

View File

@ -4,6 +4,7 @@ from utils import generate_uuid, encrypt_password
from datetime import datetime
from minio import Minio
from dotenv import load_dotenv
from elasticsearch import Elasticsearch
# 加载环境变量
load_dotenv("../../docker/.env")
@ -22,6 +23,7 @@ def is_running_in_docker():
# 根据运行环境选择合适的主机地址
DB_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost'
MINIO_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost'
ES_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost' # 添加 ES 主机地址
# 数据库连接配置
DB_CONFIG = {
@ -40,6 +42,14 @@ MINIO_CONFIG = {
"secure": False
}
# Elasticsearch连接配置
ES_CONFIG = {
"host": f"http://{ES_HOST}:{os.getenv('ES_PORT', '9200')}",
"user": os.getenv("ELASTIC_USER", "elastic"),
"password": os.getenv("ELASTIC_PASSWORD", "infini_rag_flow"),
"use_ssl": os.getenv("ES_USE_SSL", "false").lower() == "true"
}
def get_db_connection():
"""创建MySQL数据库连接"""
try:
@ -63,6 +73,29 @@ def get_minio_client():
print(f"MinIO连接失败: {str(e)}")
raise e
def get_es_client():
"""创建Elasticsearch客户端连接"""
try:
# 构建连接参数
es_params = {
"hosts": [ES_CONFIG["host"]]
}
# 如果提供了用户名和密码,添加认证信息
if ES_CONFIG["user"] and ES_CONFIG["password"]:
es_params["basic_auth"] = (ES_CONFIG["user"], ES_CONFIG["password"])
# 如果需要SSL添加SSL配置
if ES_CONFIG["use_ssl"]:
es_params["use_ssl"] = True
es_params["verify_certs"] = False # 在开发环境中可以设置为False生产环境应该设置为True
es_client = Elasticsearch(**es_params)
return es_client
except Exception as e:
print(f"Elasticsearch连接失败: {str(e)}")
raise e
def test_connections():
"""测试数据库和MinIO连接"""
try:
@ -80,6 +113,14 @@ def test_connections():
buckets = minio_client.list_buckets()
print(f"MinIO连接测试成功共有 {len(buckets)} 个存储桶")
# 测试Elasticsearch连接
try:
es_client = get_es_client()
es_info = es_client.info()
print(f"Elasticsearch连接测试成功版本: {es_info.get('version', {}).get('number', '未知')}")
except Exception as e:
print(f"Elasticsearch连接测试失败: {str(e)}")
return True
except Exception as e:
print(f"连接测试失败: {str(e)}")

View File

@ -0,0 +1,161 @@
import os
import sys
import argparse
from minio import Minio
from dotenv import load_dotenv
# 加载环境变量
load_dotenv("../../docker/.env")
def is_running_in_docker():
# 检查是否存在/.dockerenv文件
docker_env = os.path.exists('/.dockerenv')
# 或者检查cgroup中是否包含docker字符串
try:
with open('/proc/self/cgroup', 'r') as f:
return docker_env or 'docker' in f.read()
except:
return docker_env
MINIO_HOST = 'host.docker.internal' if is_running_in_docker() else 'localhost'
# MinIO连接配置
MINIO_CONFIG = {
"endpoint": f"{MINIO_HOST}:{os.getenv('MINIO_PORT', '9000')}",
"access_key": os.getenv("MINIO_USER", "rag_flow"),
"secret_key": os.getenv("MINIO_PASSWORD", "infini_rag_flow"),
"secure": False
}
def get_minio_client():
"""创建MinIO客户端"""
return Minio(
endpoint=MINIO_CONFIG["endpoint"],
access_key=MINIO_CONFIG["access_key"],
secret_key=MINIO_CONFIG["secret_key"],
secure=MINIO_CONFIG["secure"]
)
def get_image_url(kb_id, image_key):
"""获取图片的公共访问URL
Args:
kb_id: 知识库ID
image_key: 图片在MinIO中的键
Returns:
图片的公共访问URL
"""
try:
minio_client = get_minio_client()
# 检查桶和对象是否存在
if not minio_client.bucket_exists(kb_id):
print(f"[ERROR] 存储桶不存在: {kb_id}")
return None
try:
minio_client.stat_object(kb_id, image_key)
except Exception as e:
print(f"[ERROR] 对象不存在: {kb_id}/{image_key}, 错误: {str(e)}")
return None
# 获取MinIO服务器配置
minio_endpoint = MINIO_CONFIG["endpoint"]
use_ssl = MINIO_CONFIG["secure"]
# 构建URL
protocol = "https" if use_ssl else "http"
url = f"{protocol}://{minio_endpoint}/{kb_id}/{image_key}"
return url
except Exception as e:
print(f"[ERROR] 获取图片URL失败: {str(e)}")
return None
def list_bucket_images(kb_id, prefix="images/"):
"""列出知识库中的所有图片
Args:
kb_id: 知识库ID
prefix: 图片前缀默认为"images/"
Returns:
图片列表
"""
try:
minio_client = get_minio_client()
# 检查桶是否存在
if not minio_client.bucket_exists(kb_id):
print(f"[ERROR] 存储桶不存在: {kb_id}")
return []
# 列出桶中的所有对象
objects = minio_client.list_objects(kb_id, prefix=prefix, recursive=True)
image_list = []
for obj in objects:
image_list.append(obj.object_name)
return image_list
except Exception as e:
print(f"[ERROR] 列出图片失败: {str(e)}")
return []
def main():
parser = argparse.ArgumentParser(description="查询MinIO中图片的外链")
parser.add_argument("--kb_id", help="知识库ID", required=False)
parser.add_argument("--image_key", help="图片在MinIO中的键", required=False)
parser.add_argument("--list", action="store_true", help="列出知识库中的所有图片")
parser.add_argument("--list_buckets", action="store_true", help="列出所有存储桶")
args = parser.parse_args()
# 列出所有存储桶
if args.list_buckets:
try:
minio_client = get_minio_client()
buckets = minio_client.list_buckets()
print("可用的存储桶列表:")
for bucket in buckets:
print(f" - {bucket.name}")
return
except Exception as e:
print(f"[ERROR] 列出存储桶失败: {str(e)}")
return
# 检查必要参数
if not args.kb_id:
print("[ERROR] 请提供知识库ID (--kb_id)")
parser.print_help()
return
# 列出知识库中的所有图片
if args.list:
images = list_bucket_images(args.kb_id)
if images:
print(f"知识库 {args.kb_id} 中的图片列表:")
for i, image in enumerate(images, 1):
url = get_image_url(args.kb_id, image)
print(f" {i}. {image}")
print(f" URL: {url}")
else:
print(f"知识库 {args.kb_id} 中没有图片")
return
# 获取单个图片的URL
if not args.image_key:
print("[ERROR] 请提供图片键 (--image_key) 或使用 --list 列出所有图片")
parser.print_help()
return
url = get_image_url(args.kb_id, args.image_key)
if url:
print(f"图片 {args.kb_id}/{args.image_key} 的URL:")
print(url)
else:
print(f"无法获取图片 {args.kb_id}/{args.image_key} 的URL")
if __name__ == "__main__":
main()

View File

@ -7,4 +7,5 @@ Werkzeug==3.1.3
PyJWT==2.10.1
dotenv==0.9.9
magic-pdf[full]==1.3.0
transformers==4.49.0
transformers==4.49.0
elasticsearch==8.12.0

View File

@ -3,7 +3,7 @@ import json
from flask import current_app
from datetime import datetime
from utils import generate_uuid
from database import DB_CONFIG, get_minio_client
from database import DB_CONFIG, get_minio_client, get_es_client
import io
import os
import json
@ -11,8 +11,8 @@ import threading
import time
import tempfile
import shutil
from elasticsearch import Elasticsearch
from io import BytesIO
# 解析相关模块
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
@ -916,7 +916,51 @@ class KnowledgebaseService:
print(f"[DEBUG] 开始上传到MinIO目标桶: {kb_id}")
# 获取Elasticsearch客户端
es_client = get_es_client()
# 获取tenant_id (文档创建者id)
tenant_query = """
SELECT created_by FROM document WHERE id = %s
"""
cursor.execute(tenant_query, (doc_id,))
tenant_result = cursor.fetchone()
tenant_id = tenant_result['created_by']
print(f"[DEBUG] 文档 {doc_id} 的tenant_id: {tenant_id}")
# 确保索引存在
index_name = f"ragflow_{tenant_id}"
if not es_client.indices.exists(index=index_name):
# 创建索引设置为0个副本
es_client.indices.create(
index=index_name,
body={
"settings": {
"number_of_shards": 2,
"number_of_replicas": 0 # 单节点环境设置为0个副本
},
"mappings": {
"properties": {
"doc_id": {"type": "keyword"},
"kb_id": {"type": "keyword"},
"docnm_kwd": {"type": "keyword"},
"title_tks": {"type": "keyword"},
"title_sm_tks": {"type": "keyword"},
"content_with_weight": {"type": "text"},
"content_ltks": {"type": "keyword"},
"content_sm_ltks": {"type": "keyword"},
"page_num_int": {"type": "integer"},
"position_int": {"type": "integer"},
"top_int": {"type": "integer"},
"create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"create_timestamp_flt": {"type": "float"},
"img_id": {"type": "keyword"},
"q_1024_vec": {"type": "keyword"}
}
}
}
)
# 处理内容块并上传到MinIO
chunk_count = 0
chunk_ids_list = []
@ -926,16 +970,52 @@ class KnowledgebaseService:
if not content.strip():
print(f"[DEBUG] 跳过空文本块 {chunk_idx}")
continue
# 生成唯一ID
chunk_id = generate_uuid()
try:
# 1. 上传到MinIO
minio_client.put_object(
bucket_name=kb_id,
object_name=chunk_id,
data=BytesIO(content.encode('utf-8')),
length=len(content)
)
# 分词处理
content_tokens = tokenize_text(content)
# 获取当前时间
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_timestamp = datetime.now().timestamp()
# 创建Elasticsearch文档
es_doc = {
"doc_id": doc_id,
"kb_id": kb_id,
"docnm_kwd": doc['name'],
"title_tks": doc['name'], # 简化处理,使用文档名作为标题
"title_sm_tks": doc['name'], # 简化处理
"content_with_weight": content,
"content_ltks": content_tokens,
"content_sm_ltks": content_tokens, # 简化处理
"page_num_int": [1], # 默认页码
"position_int": [[1, 0, 0, 0, 0]], # 默认位置
"top_int": [1], # 默认顶部位置
"create_time": current_time,
"create_timestamp_flt": current_timestamp,
"img_id": "", # 如果没有图片,置空
"q_1024_vec": []
}
# 2. 存储到Elasticsearch
es_client.index(
index=index_name,
# id=es_chunk_id,
body=es_doc
)
chunk_count += 1
chunk_ids_list.append(chunk_id)
print(f"成功上传文本块 {chunk_count}/{len(content_list)}")
@ -944,7 +1024,59 @@ class KnowledgebaseService:
continue
elif chunk_data["type"] == "image":
print(f"[INFO] 跳过图像块处理: {chunk_data['img_path']}")
print(f"[INFO] 处理图像块: {chunk_data['img_path']}")
try:
# 获取图片路径
img_path = chunk_data['img_path']
# 检查是否为相对路径,如果是则添加临时目录前缀
if not os.path.isabs(img_path):
# 使用临时图片目录作为基础路径
img_path = os.path.join(temp_image_dir, os.path.basename(img_path))
print(f"[INFO] 转换为绝对路径: {img_path}")
if os.path.exists(img_path):
# 生成图片ID和存储路径
img_id = generate_uuid()
img_key = f"images/{img_id}{os.path.splitext(img_path)[1]}"
# 读取图片内容
with open(img_path, 'rb') as img_file:
img_data = img_file.read()
# 设置图片的Content-Type
content_type = f"image/{os.path.splitext(img_path)[1][1:].lower()}"
if content_type == "image/jpg":
content_type = "image/jpeg"
# 上传图片到MinIO
minio_client.put_object(
bucket_name=kb_id,
object_name=img_key,
data=BytesIO(img_data),
length=len(img_data),
content_type=content_type
)
# 设置图片的公共访问权限
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["s3:GetObject"],
"Resource": [f"arn:aws:s3:::{kb_id}/{img_key}"]
}
]
}
minio_client.set_bucket_policy(kb_id, json.dumps(policy))
print(f"[SUCCESS] 成功上传图片: {img_key}")
else:
print(f"[WARNING] 图片文件不存在: {img_path}")
except Exception as e:
print(f"[ERROR] 上传图片失败: {str(e)}")
continue
# 更新文档状态和块数量

View File

@ -148,38 +148,32 @@ export default {
</script>
<template>
<el-dialog
title="文档解析进度"
v-model:visible="dialogVisible"
:close-on-click-modal="false"
:close-on-press-escape="false"
:show-close="isCompleted || hasError"
width="500px"
@close="handleClose"
>
<div class="progress-container">
<el-progress
:percentage="progressPercentage"
:status="progressStatus"
/>
<div class="progress-message">
{{ progressMessage }}
</div>
<div class="document-parse-progress-wrapper">
<el-dialog v-model="dialogVisible" title="文档解析进度" width="500px">
<div class="progress-container">
<el-progress
:percentage="progressPercentage"
:status="progressStatus"
/>
<div class="progress-message">
{{ progressMessage }}
</div>
<div class="progress-logs">
<div v-for="(log, index) in logs" :key="index" class="log-item">
<span class="log-time">{{ log.time }}</span>
<span class="log-message">{{ log.message }}</span>
<div class="progress-logs">
<div v-for="(log, index) in logs" :key="index" class="log-item">
<span class="log-time">{{ log.time }}</span>
<span class="log-message">{{ log.message }}</span>
</div>
</div>
</div>
</div>
<template #footer>
<span class="dialog-footer">
<el-button @click="handleClose" :disabled="!isCompleted && !hasError">取消</el-button>
<el-button type="primary" @click="handleClose" v-if="isCompleted || hasError">确定</el-button>
</span>
</template>
</el-dialog>
<template #footer>
<span class="dialog-footer">
<el-button @click="handleClose" :disabled="!isCompleted && !hasError">取消</el-button>
<el-button type="primary" @click="handleClose" v-if="isCompleted || hasError">确定</el-button>
</span>
</template>
</el-dialog>
</div>
</template>
<style scoped>

View File

@ -1,16 +1,13 @@
<script lang="ts" setup>
import type { FormInstance } from "element-plus"
import { log } from "node:console"
import DocumentParseProgress from "@/layouts/components/DocumentParseProgress/index.vue"
import {
deleteDocumentApi,
getDocumentListApi,
getFileListApi,
runDocumentParseApi,
uploadDocumentApi
runDocumentParseApi
} from "@@/apis/kbs/document"
import {
addDocumentToKnowledgeBaseApi,
batchDeleteKnowledgeBaseApi,
createKnowledgeBaseApi,
deleteKnowledgeBaseApi,
@ -20,7 +17,7 @@ import { usePagination } from "@@/composables/usePagination"
import { CaretRight, Delete, Plus, Refresh, Search, View } from "@element-plus/icons-vue"
import axios from "axios"
import { ElMessage, ElMessageBox } from "element-plus"
import { onActivated, onMounted, reactive, ref, watch } from "vue"
import { onActivated, onBeforeUnmount, onMounted, reactive, ref, watch } from "vue"
import "element-plus/dist/index.css"
import "element-plus/theme-chalk/el-message-box.css"
import "element-plus/theme-chalk/el-message.css"
@ -37,6 +34,35 @@ const uploadLoading = ref(false)
const showParseProgress = ref(false)
const currentDocId = ref("")
//
function cleanupResources() {
//
if (multipleSelection.value) {
multipleSelection.value = []
}
loading.value = false
documentLoading.value = false
fileLoading.value = false
uploadLoading.value = false
//
viewDialogVisible.value = false
createDialogVisible.value = false
addDocumentDialogVisible.value = false
showParseProgress.value = false
}
//
onDeactivated(() => {
cleanupResources()
})
//
onBeforeUnmount(() => {
cleanupResources()
})
//
interface KnowledgeBaseData {
id: string
@ -906,6 +932,11 @@ onActivated(() => {
</template>
<style lang="scss" scoped>
.app-container {
display: flex;
flex-direction: column;
min-height: 94%;
}
.el-alert {
margin-bottom: 20px;
}
@ -968,6 +999,7 @@ onActivated(() => {
.pagination-container {
margin-top: 20px;
margin-bottom: 20px;
display: flex;
justify-content: flex-end;
}