A Complete Guide to Building an Optimized Search System Based on DeepSeek

1. System Overview and Architecture Design

1.1 System Goals

Build an intelligent search system on top of the DeepSeek large language model with:

  • High-accuracy semantic search
  • Verifiable factual sources
  • Strong response performance
  • A good user experience

1.2 Technical Architecture

The architecture diagram (not reproduced here) consists of the following components: user interface, API gateway, query processing module, query type classification (routing simple vs. complex queries), traditional retrieval engine, vector retrieval engine, result fusion module, LLM generation module, result post-processing, and a cache module.
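
The query type classification step appears in the diagram but is never implemented in the guide. Below is a minimal heuristic sketch of such a router; the length threshold and keyword list are illustrative assumptions, not part of the original design:

# query_router.py -- hypothetical heuristic router (thresholds are assumptions)
def classify_query(query: str) -> str:
    """Return 'simple' for short keyword-style queries, 'complex' for everything else."""
    question_markers = ("如何", "为什么", "什么是", "how", "why", "what")
    # Heuristic: short queries without question words are treated as keyword lookups
    if len(query) <= 8 and not any(marker in query.lower() for marker in question_markers):
        return "simple"
    return "complex"

if __name__ == "__main__":
    print(classify_query("机器学习"))              # simple
    print(classify_query("如何优化深度学习模型?"))  # complex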

2. Environment Setup and Dependency Installation

2.1 Hardware Requirements

  • Minimum: 8-core CPU, 16 GB RAM, 100 GB storage
  • Recommended: 16-core CPU, 32 GB RAM, optional GPU, 500 GB SSD
  • Production: 32-core CPU, 64 GB+ RAM, NVIDIA A10/A100, 1 TB+ SSD

2.2 Software Environment

# Create the Python environment
conda create -n deepseek-search python=3.9
conda activate deepseek-search

# Install core dependencies
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
pip install transformers>=4.30.0
pip install langchain>=0.0.346
pip install chromadb>=0.4.15
pip install sentence-transformers>=2.2.2
pip install faiss-cpu>=1.7.4  # or faiss-gpu
pip install elasticsearch>=8.9.0
pip install fastapi>=0.104.0
pip install uvicorn>=0.24.0
pip install redis>=4.5.0
pip install requests>=2.31.0
pip install pydantic>=2.5.0
pip install pydantic-settings>=2.0.0  # BaseSettings lives here in Pydantic v2 (used by app/core/config.py)
pip install scikit-learn>=1.3.0       # used by the evaluation utilities
pip install beautifulsoup4>=4.12.0    # web content extraction
pip install python-multipart>=0.0.6   # file upload support

2.3 Directory Structure

deepseek_search_system/
├── app/                          # Main application directory
│   ├── __init__.py
│   ├── main.py                   # FastAPI application entry point
│   ├── api/                      # API routes
│   │   ├── __init__.py
│   │   ├── endpoints.py          # API endpoints
│   │   └── models.py             # Data models
│   ├── core/                     # Core logic
│   │   ├── __init__.py
│   │   ├── config.py             # Configuration management
│   │   ├── security.py           # Security / authentication
│   │   └── exceptions.py         # Exception handling
│   ├── services/                 # Business services
│   │   ├── __init__.py
│   │   ├── search_service.py     # Search service
│   │   ├── embedding_service.py  # Embedding service
│   │   ├── llm_service.py        # LLM service
│   │   └── cache_service.py      # Cache service
│   ├── models/                   # Data models
│   │   ├── __init__.py
│   │   ├── document.py           # Document model
│   │   └── search.py             # Search model
│   └── utils/                    # Utility functions
│       ├── __init__.py
│       ├── logger.py             # Logging configuration
│       ├── file_processor.py     # File processing
│       └── evaluation.py         # Evaluation utilities
├── data/                         # Data directory
│   ├── raw/                      # Raw data
│   ├── processed/                # Processed data
│   └── vector_db/                # Vector database
├── tests/                        # Tests
├── docker/                       # Docker configuration
├── requirements.txt
├── docker-compose.yml
└── README.md

3. Data Preparation and Processing

3.1 Data Source Configuration

Create the configuration file app/core/config.py:

from pydantic_settings import BaseSettings
from typing import List, Optional
import os

class Settings(BaseSettings):
    # DeepSeek API configuration (referenced by app/main.py)
    DEEPSEEK_API_KEY: str = ""
    DEEPSEEK_BASE_URL: str = "https://api.deepseek.com"

    # Data source configuration
    DATA_SOURCES: List[str] = [
        "local:///data/raw/documents",
        "web://example.com/knowledge-base",
        "database://user:pass@localhost/db"
    ]
    
    # Supported file types
    SUPPORTED_EXTENSIONS: List[str] = [".txt", ".pdf", ".docx", ".pptx", ".html", ".md"]
    
    # Vector database configuration
    VECTOR_DB_PATH: str = "./data/vector_db"
    VECTOR_DIMENSION: int = 384  # sentence-transformers embedding dimension
    
    # Retrieval configuration
    MAX_RETRIEVAL_DOCS: int = 5
    SIMILARITY_THRESHOLD: float = 0.7

    # Elasticsearch configuration
    ES_HOST: str = "http://localhost:9200"
    ES_INDEX: str = "documents"
    
    class Config:
        env_file = ".env"

settings = Settings()
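
The DATA_SOURCES entries use URI-style scheme prefixes (local://, web://, database://), but the guide never consumes this list directly. A minimal sketch of dispatching sources by scheme; only the local case is handled, and the helper name is an assumption:

from urllib.parse import urlparse
from app.core.config import settings

def iter_local_source_paths():
    """Yield filesystem paths for every local:// entry in DATA_SOURCES."""
    for source in settings.DATA_SOURCES:
        parsed = urlparse(source)
        if parsed.scheme == "local":
            # local:///data/raw/documents -> "/data/raw/documents"
            yield parsed.path
        else:
            # web:// and database:// ingestion are out of scope for this guide
            print(f"Skipping unsupported source scheme: {parsed.scheme}")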

3.2 Document Processing Pipeline

Create the document processor app/utils/file_processor.py:

import os
import logging
from typing import List, Dict, Any
from langchain.document_loaders import (
    TextLoader, PyPDFLoader, Docx2txtLoader, 
    UnstructuredHTMLLoader, UnstructuredPowerPointLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter

logger = logging.getLogger(__name__)

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", ";", ",", " "]
        )
        
        # Map file extensions to loaders
        self.loader_mapping = {
            '.txt': TextLoader,
            '.md': TextLoader,  # Markdown is listed in SUPPORTED_EXTENSIONS; load it as plain text
            '.pdf': PyPDFLoader,
            '.docx': Docx2txtLoader,
            '.pptx': UnstructuredPowerPointLoader,
            '.html': UnstructuredHTMLLoader,
        }
    
    def load_documents(self, file_path: str) -> List[Dict[str, Any]]:
        """加载单个文档"""
        file_ext = os.path.splitext(file_path)[1].lower()
        
        if file_ext not in self.loader_mapping:
            logger.warning(f"不支持的文件类型: {file_ext}")
            return []
        
        try:
            loader = self.loader_mapping[file_ext](file_path)
            documents = loader.load()
            
            # Attach source metadata
            for doc in documents:
                doc.metadata.update({
                    'source': file_path,
                    'file_type': file_ext,
                    'file_size': os.path.getsize(file_path)
                })
            
            return documents
        except Exception as e:
            logger.error(f"加载文档失败 {file_path}: {str(e)}")
            return []
    
    def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """处理整个目录的文档"""
        all_documents = []
        
        for root, _, files in os.walk(directory_path):
            for file in files:
                file_ext = os.path.splitext(file)[1].lower()
                if file_ext in self.loader_mapping:
                    file_path = os.path.join(root, file)
                    documents = self.load_documents(file_path)
                    all_documents.extend(documents)
        
        logger.info(f"共加载 {len(all_documents)} 个文档片段")
        return all_documents
    
    def chunk_documents(self, documents: List[Dict]) -> List[Dict]:
        """对文档进行分块处理"""
        chunked_docs = []
        
        for doc in documents:
            chunks = self.text_splitter.split_documents([doc])
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    'chunk_id': i,
                    'total_chunks': len(chunks)
                })
                chunked_docs.append(chunk)
        
        logger.info(f"分块后共 {len(chunked_docs)} 个文档片段")
        return chunked_docs

3.3 Vector Database Initialization

Create the embedding service app/services/embedding_service.py:

import logging
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings

logger = logging.getLogger(__name__)

class EmbeddingService:
    def __init__(self, model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
        self.model = SentenceTransformer(model_name)
        self.vector_dim = self.model.get_sentence_embedding_dimension()
        
        # Initialize the ChromaDB persistent client
        self.client = chromadb.PersistentClient(
            path="./data/vector_db",
            settings=Settings(anonymized_telemetry=False)
        )
        
        # Get or create the collection (cosine space so that 1 - distance below is a valid similarity)
        self.collection = self.client.get_or_create_collection(
            name="document_embeddings",
            metadata={"hnsw:space": "cosine", "description": "文档向量存储"}
        )
    
    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """生成文本向量"""
        embeddings = self.model.encode(texts, show_progress_bar=True)
        return embeddings.tolist()
    
    def add_documents(self, documents: List[Dict[str, Any]]):
        """添加文档到向量数据库"""
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        ids = [f"{doc.metadata['source']}_{doc.metadata['chunk_id']}" for doc in documents]
        
        # Generate embeddings
        embeddings = self.generate_embeddings(texts)
        
        # Add to the collection
        self.collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids
        )
        
        logger.info(f"成功添加 {len(documents)} 个文档到向量数据库")
    
    def similarity_search(self, query: str, n_results: int = 5) -> List[Dict[str, Any]]:
        """相似度搜索"""
        # 生成查询向量
        query_embedding = self.generate_embeddings([query])[0]
        
        # Run the query
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=["documents", "metadatas", "distances"]
        )
        
        # Format results
        formatted_results = []
        for i in range(len(results['documents'][0])):
            formatted_results.append({
                'content': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'similarity': 1 - results['distances'][0][i]  # convert cosine distance to similarity
            })
        
        return formatted_results
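
A quick query example against the collection; it assumes documents have already been added (for example by the ingestion script sketched in section 8.1):

from app.services.embedding_service import EmbeddingService

embedding_service = EmbeddingService()
hits = embedding_service.similarity_search("如何优化深度学习模型?", n_results=3)
for hit in hits:
    print(round(hit["similarity"], 3), hit["metadata"].get("source"))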

4. Search Service Implementation

4.1 Hybrid Retrieval Service

Create the search service app/services/search_service.py:

import logging
from typing import List, Dict, Any, Optional
from app.core.config import settings
from app.services.embedding_service import EmbeddingService
from elasticsearch import Elasticsearch

logger = logging.getLogger(__name__)

class SearchService:
    def __init__(self):
        self.embedding_service = EmbeddingService()
        self.es_client = Elasticsearch([settings.ES_HOST])  # honors ES_HOST from .env / docker-compose
        
    def hybrid_search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """混合检索:结合向量检索和传统检索"""
        try:
            # 1. 向量检索
            vector_results = self.embedding_service.similarity_search(query, n_results=top_k)
            
            # 2. 传统检索 (Elasticsearch)
            traditional_results = self.traditional_search(query, size=top_k)
            
            # 3. 结果融合与去重
            fused_results = self.fuse_results(vector_results, traditional_results, top_k)
            
            return fused_results
            
        except Exception as e:
            logger.error(f"搜索失败: {str(e)}")
            return []
    
    def traditional_search(self, query: str, size: int = 5) -> List[Dict[str, Any]]:
        """传统关键词检索"""
        try:
            response = self.es_client.search(
                index="documents",
                body={
                    "query": {
                        "multi_match": {
                            "query": query,
                            "fields": ["title^2", "content", "keywords"]
                        }
                    },
                    "size": size
                }
            )
            
            results = []
            for hit in response['hits']['hits']:
                results.append({
                    'content': hit['_source'].get('content', ''),
                    'metadata': {
                        'source': hit['_source'].get('source', ''),
                        'score': hit['_score'],
                        'type': 'traditional'
                    },
                    'similarity': hit['_score'] / 10  # crude normalization (ES scores are unbounded)
                })
            
            return results
            
        except Exception as e:
            logger.warning(f"传统检索失败: {str(e)}")
            return []
    
    def fuse_results(self, vector_results: List, traditional_results: List, top_k: int) -> List[Dict]:
        """结果融合算法"""
        all_results = {}
        
        # 添加向量检索结果
        for result in vector_results:
            content_key = result['content'][:100]  # 使用内容前100字符作为去重键
            if content_key not in all_results:
                all_results[content_key] = {
                    **result,
                    'combined_score': result['similarity'] * 0.7  # 向量检索权重
                }
        
        # 添加传统检索结果
        for result in traditional_results:
            content_key = result['content'][:100]
            if content_key in all_results:
                # 如果已存在,提升分数
                all_results[content_key]['combined_score'] += result['similarity'] * 0.3
            else:
                all_results[content_key] = {
                    **result,
                    'combined_score': result['similarity'] * 0.3  # 传统检索权重
                }
        
        # 按综合分数排序
        sorted_results = sorted(
            all_results.values(), 
            key=lambda x: x['combined_score'], 
            reverse=True
        )
        
        return sorted_results[:top_k]
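
traditional_search above assumes an Elasticsearch index named documents with title, content, keywords and source fields, but the guide never shows how that index is populated. A minimal ingestion sketch; the mapping mirrors the fields the query expects, and reusing the source path as the title is an assumption:

from elasticsearch import Elasticsearch, helpers
from app.core.config import settings

def index_chunks(chunks) -> None:
    """Bulk-index chunked documents into the index queried by traditional_search."""
    es = Elasticsearch([settings.ES_HOST])
    if not es.indices.exists(index=settings.ES_INDEX):
        es.indices.create(
            index=settings.ES_INDEX,
            mappings={"properties": {
                "title": {"type": "text"},
                "content": {"type": "text"},
                "keywords": {"type": "text"},
                "source": {"type": "keyword"},
            }},
        )
    actions = (
        {
            "_index": settings.ES_INDEX,
            "_source": {
                "title": chunk.metadata.get("source", ""),   # assumption: reuse the source path as the title
                "content": chunk.page_content,
                "keywords": "",
                "source": chunk.metadata.get("source", ""),
            },
        }
        for chunk in chunks
    )
    helpers.bulk(es, actions)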

4.2 LLM Integration Service

Create the LLM service app/services/llm_service.py:

import logging
import json
from typing import List, Dict, Any
import requests

logger = logging.getLogger(__name__)

class DeepSeekService:
    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def generate_answer(self, query: str, context_docs: List[Dict], **kwargs) -> Dict[str, Any]:
        """基于检索结果生成答案"""
        
        # 构建Prompt
        prompt = self._build_prompt(query, context_docs)
        
        # 调用DeepSeek API
        payload = {
            "model": "deepseek-chat",
            "messages": [
                {
                    "role": "system",
                    "content": "你是一个专业的AI助手,基于提供的文档内容准确回答问题。如果文档中没有相关信息,请明确说明。"
                },
                {
                    "role": "user", 
                    "content": prompt
                }
            ],
            "temperature": 0.1,  # 低随机性以保证准确性
            "max_tokens": 2000,
            **kwargs
        }
        
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=payload,
                timeout=30
            )
            response.raise_for_status()
            
            result = response.json()
            answer = result['choices'][0]['message']['content']
            
            return {
                "answer": answer,
                "usage": result.get('usage', {}),
                "context_docs": context_docs
            }
            
        except Exception as e:
            logger.error(f"DeepSeek API调用失败: {str(e)}")
            return {
                "answer": "抱歉,暂时无法生成答案,请稍后重试。",
                "error": str(e),
                "context_docs": context_docs
            }
    
    def _build_prompt(self, query: str, context_docs: List[Dict]) -> str:
        """构建Prompt模板"""
        
        context_str = ""
        for i, doc in enumerate(context_docs, 1):
            source = doc['metadata'].get('source', '未知来源')
            similarity = doc.get('similarity', 0)
            context_str += f"【文档{i}】来源: {source} (相关度: {similarity:.2f})\n"
            context_str += f"内容: {doc['content']}\n\n"
        
        prompt = f"""请基于以下文档内容回答问题。请遵循以下要求:

1. 仅使用提供的文档内容回答问题
2. 如果文档中没有相关信息,请明确说明"根据现有资料无法回答该问题"
3. 回答要简洁明了,重点突出
4. 在回答末尾注明参考的文档来源

问题:{query}

相关文档:
{context_str}

请基于以上文档回答:"""
        
        return prompt
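
A quick end-to-end usage example wiring retrieval and generation together; the API key is read from the settings defined in section 3.1:

from app.core.config import settings
from app.services.search_service import SearchService
from app.services.llm_service import DeepSeekService

search_service = SearchService()
llm_service = DeepSeekService(api_key=settings.DEEPSEEK_API_KEY)

docs = search_service.hybrid_search("如何优化深度学习模型?", top_k=3)
result = llm_service.generate_answer("如何优化深度学习模型?", docs)
print(result["answer"])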

5. Building the API Service

5.1 FastAPI Application

Create the main application app/main.py:

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
import logging

from app.core.config import settings
from app.services.search_service import SearchService
from app.services.llm_service import DeepSeekService

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create the FastAPI application
app = FastAPI(
    title="DeepSeek智能搜索系统",
    description="基于DeepSeek大模型的优化搜索系统",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency injection
def get_search_service():
    return SearchService()

def get_llm_service():
    return DeepSeekService(api_key=settings.DEEPSEEK_API_KEY)

# Request/response models
class SearchRequest(BaseModel):
    query: str
    top_k: Optional[int] = 5
    use_llm: Optional[bool] = True

class SearchResponse(BaseModel):
    answer: str
    sources: List[dict]
    processing_time: float
    usage: Optional[dict] = None

class HealthResponse(BaseModel):
    status: str
    version: str
    components: dict

# API routes
@app.get("/")
async def root():
    return {"message": "DeepSeek智能搜索系统 API"}

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """健康检查端点"""
    return {
        "status": "healthy",
        "version": "1.0.0",
        "components": {
            "api": "ok",
            "vector_db": "ok",
            "llm_service": "ok"
        }
    }

@app.post("/search", response_model=SearchResponse)
async def search(
    request: SearchRequest,
    search_service: SearchService = Depends(get_search_service),
    llm_service: DeepSeekService = Depends(get_llm_service)
):
    """搜索端点"""
    import time
    start_time = time.time()
    
    try:
        # Run retrieval
        retrieved_docs = search_service.hybrid_search(
            query=request.query, 
            top_k=request.top_k
        )
        
        if not retrieved_docs:
            return SearchResponse(
                answer="未找到相关文档,请尝试其他关键词。",
                sources=[],
                processing_time=time.time() - start_time
            )
        
        # Generate an answer with the LLM (if enabled)
        if request.use_llm and retrieved_docs:
            llm_result = llm_service.generate_answer(request.query, retrieved_docs)
            answer = llm_result["answer"]
            usage = llm_result.get("usage")
        else:
            # Return raw retrieval results
            answer = "以下是与您查询相关的内容:\n\n" + "\n\n".join([
                f"{i+1}. {doc['content'][:200]}..." for i, doc in enumerate(retrieved_docs)
            ])
            usage = None
        
        # Collect source information
        sources = []
        for doc in retrieved_docs:
            sources.append({
                "content_preview": doc['content'][:100] + "...",
                "source": doc['metadata'].get('source', '未知'),
                "similarity": round(doc.get('similarity', 0), 3)
            })
        
        return SearchResponse(
            answer=answer,
            sources=sources,
            processing_time=round(time.time() - start_time, 2),
            usage=usage
        )
        
    except Exception as e:
        logger.error(f"搜索处理失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"搜索处理失败: {str(e)}")

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )
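
The project layout lists app/services/cache_service.py and docker-compose starts a Redis container, but the guide does not show the cache implementation. A minimal sketch of a Redis-backed result cache; the key scheme, TTL, and default redis_url are assumptions:

# app/services/cache_service.py -- minimal sketch (key format and TTL are assumptions)
import hashlib
import json
from typing import Any, Optional

import redis

class CacheService:
    def __init__(self, redis_url: str = "redis://localhost:6379/0", ttl_seconds: int = 3600):
        self.client = redis.Redis.from_url(redis_url, decode_responses=True)
        self.ttl_seconds = ttl_seconds

    def _key(self, query: str, top_k: int) -> str:
        # Hash the query so arbitrary text maps to a fixed-length key
        digest = hashlib.sha256(f"{query}:{top_k}".encode("utf-8")).hexdigest()
        return f"search:{digest}"

    def get(self, query: str, top_k: int) -> Optional[Any]:
        cached = self.client.get(self._key(query, top_k))
        return json.loads(cached) if cached else None

    def set(self, query: str, top_k: int, result: Any) -> None:
        self.client.set(
            self._key(query, top_k),
            json.dumps(result, ensure_ascii=False),
            ex=self.ttl_seconds
        )

The search endpoint above could check this cache before calling hybrid_search and store the response afterwards.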

5.2 Configuration File

Create the environment file .env:

# DeepSeek API configuration
DEEPSEEK_API_KEY=your_deepseek_api_key_here
DEEPSEEK_BASE_URL=https://api.deepseek.com

# Vector database configuration
VECTOR_DB_PATH=./data/vector_db
VECTOR_DIMENSION=384

# Retrieval configuration
MAX_RETRIEVAL_DOCS=5
SIMILARITY_THRESHOLD=0.7

# Elasticsearch configuration
ES_HOST=http://localhost:9200
ES_INDEX=documents

# Application configuration
LOG_LEVEL=INFO
API_HOST=0.0.0.0
API_PORT=8000

6. System Deployment

6.1 Docker Configuration

Create the Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY app/ ./app/
COPY data/ ./data/

# Create required directories
RUN mkdir -p ./data/vector_db ./data/raw ./data/processed

# Expose the port
EXPOSE 8000

# Start command
CMD ["python", "-m", "app.main"]

Create docker-compose.yml:

version: '3.8'

services:
  deepseek-search:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
      - ES_HOST=http://elasticsearch:9200
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    depends_on:
      - elasticsearch
      - redis
    restart: unless-stopped

  elasticsearch:
    image: elasticsearch:8.9.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  es_data:
  redis_data:

6.2 Deployment Script

Create the deployment script deploy.sh:

#!/bin/bash

# Set variables
APP_NAME="deepseek-search"
DOCKER_REGISTRY="your-registry.com"
VERSION="1.0.0"

echo "开始部署 DeepSeek 搜索系统..."

# 1. Build the Docker image
echo "构建Docker镜像..."
docker build -t $DOCKER_REGISTRY/$APP_NAME:$VERSION .

# 2. Push the image (if using an image registry)
# docker push $DOCKER_REGISTRY/$APP_NAME:$VERSION

# 3. Start the services
echo "启动服务..."
docker-compose down
docker-compose up -d

# 4. Wait for the services to come up
echo "等待服务启动..."
sleep 30

# 5. Health check
echo "执行健康检查..."
curl -f http://localhost:8000/health || exit 1

echo "部署完成!"
echo "API地址: http://localhost:8000"
echo "文档地址: http://localhost:8000/docs"

7. System Testing and Evaluation

7.1 Test Script

Create the test script tests/test_system.py:

import requests
import json
import time

BASE_URL = "http://localhost:8000"

def test_health():
    """测试健康检查"""
    response = requests.get(f"{BASE_URL}/health")
    assert response.status_code == 200
    data = response.json()
    assert data['status'] == 'healthy'
    print("✓ 健康检查通过")

def test_search():
    """测试搜索功能"""
    test_queries = [
        "什么是机器学习?",
        "如何优化深度学习模型?",
        "人工智能的发展历史"
    ]
    
    for query in test_queries:
        payload = {
            "query": query,
            "top_k": 3,
            "use_llm": True
        }
        
        start_time = time.time()
        response = requests.post(f"{BASE_URL}/search", json=payload)
        end_time = time.time()
        
        assert response.status_code == 200
        data = response.json()
        
        print(f"✓ 查询 '{query}' 测试通过")
        print(f"  响应时间: {data['processing_time']}s")
        print(f"  实际耗时: {end_time - start_time:.2f}s")
        print(f"  返回答案长度: {len(data['answer'])}")
        print(f"  来源数量: {len(data['sources'])}")
        print()

def test_performance():
    """性能测试"""
    query = "测试查询"
    
    times = []
    for i in range(10):
        start_time = time.time()
        response = requests.post(f"{BASE_URL}/search", json={
            "query": query,
            "top_k": 5,
            "use_llm": True
        })
        end_time = time.time()
        
        if response.status_code == 200:
            times.append(end_time - start_time)
    
    avg_time = sum(times) / len(times)
    print(f"平均响应时间: {avg_time:.2f}s")
    print(f"最大响应时间: {max(times):.2f}s")
    print(f"最小响应时间: {min(times):.2f}s")
    
    # Performance requirement: P95 < 3 s
    p95 = sorted(times)[int(len(times) * 0.95)]
    assert p95 < 3, f"P95响应时间 {p95:.2f}s 超过3秒"

if __name__ == "__main__":
    test_health()
    test_search()
    test_performance()
    print("所有测试通过!")

7.2 Evaluation Metrics

Create the evaluation utility app/utils/evaluation.py:

import json
from typing import List, Dict
from sklearn.metrics import precision_recall_fscore_support

class SearchEvaluator:
    def __init__(self):
        self.metrics = {}
    
    def evaluate_retrieval(self, queries: List[str], ground_truth: Dict):
        """评估检索效果"""
        precisions = []
        recalls = []
        f1_scores = []
        
        for query in queries:
            # Plug real retrieval results and labeled relevance data in here
            # (simplified example)
            retrieved_docs = []  # documents actually retrieved for this query
            relevant_docs = ground_truth.get(query, [])
            
            if not relevant_docs:
                continue
                
            # Compute retrieval metrics
            true_positives = len(set(retrieved_docs) & set(relevant_docs))
            precision = true_positives / len(retrieved_docs) if retrieved_docs else 0
            recall = true_positives / len(relevant_docs) if relevant_docs else 0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
            
            precisions.append(precision)
            recalls.append(recall)
            f1_scores.append(f1)
        
        if not precisions:
            return {'precision': 0.0, 'recall': 0.0, 'f1_score': 0.0}

        return {
            'precision': sum(precisions) / len(precisions),
            'recall': sum(recalls) / len(recalls),
            'f1_score': sum(f1_scores) / len(f1_scores)
        }
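
A usage sketch for the evaluator; the ground-truth labels below are hypothetical, and the metrics stay at zero until real retrieval results are wired into evaluate_retrieval:

from app.utils.evaluation import SearchEvaluator

# Hypothetical labeled data: query -> list of relevant source paths
ground_truth = {
    "什么是机器学习?": ["./data/raw/ml_intro.md"],
    "如何优化深度学习模型?": ["./data/raw/dl_tuning.pdf"],
}

evaluator = SearchEvaluator()
metrics = evaluator.evaluate_retrieval(list(ground_truth.keys()), ground_truth)
print(metrics)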

8. Usage Guide

8.1 Quick Start

# 1. Clone the project
git clone <repository-url>
cd deepseek-search-system

# 2. Configure environment variables
cp .env.example .env
# Edit the .env file and set your DeepSeek API key

# 3. Start the services
docker-compose up -d

# 4. Initialize the data (if needed)
python scripts/init_data.py

# 5. Test the service
python tests/test_system.py
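
scripts/init_data.py is referenced above but not included in the guide. A minimal sketch of what it could look like, chaining the document processor (section 3.2) and the embedding service (section 3.3); the script itself is an assumption:

# scripts/init_data.py -- hypothetical ingestion script (sketch only)
from app.utils.file_processor import DocumentProcessor
from app.services.embedding_service import EmbeddingService

def main() -> None:
    processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)
    raw_docs = processor.process_directory("./data/raw")
    chunks = processor.chunk_documents(raw_docs)

    embedding_service = EmbeddingService()
    embedding_service.add_documents(chunks)
    print(f"Indexed {len(chunks)} chunks into the vector database")

if __name__ == "__main__":
    main()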

8.2 API Usage Example

import requests
import json

# Search request
url = "http://localhost:8000/search"
payload = {
    "query": "如何优化神经网络训练速度?",
    "top_k": 5,
    "use_llm": True
}

response = requests.post(url, json=payload)
result = response.json()

print("答案:", result["answer"])
print("处理时间:", result["processing_time"])
print("来源:")
for source in result["sources"]:
    print(f"- {source['source']} (相关度: {source['similarity']})")

8.3 Monitoring and Maintenance

# Check service status
docker-compose ps

# View logs
docker-compose logs -f deepseek-search

# Back up the vector data
tar -czf backup_$(date +%Y%m%d).tar.gz data/vector_db/

# Health / performance check
curl http://localhost:8000/health

This guide covers the full workflow, from environment setup through deployment and operations. Following it, you can build a fully functional DeepSeek-based optimized search system. Each step comes with concrete code and configuration examples that can be used as-is or adapted to your needs.
