玄同 765

大语言模型 (LLM) 开发工程师 | 中国传媒大学 · 数字媒体技术(智能交互与游戏设计)

CSDN · 个人主页 | GitHub · Follow


关于作者

  • 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
  • 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案

「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!



一、数据库的种类全景图

NewSQL

TiDB

CockroachDB

Spanner

时序数据库

InfluxDB

TimescaleDB

Prometheus

向量数据库

Chroma

Milvus

Pinecone

Weaviate

Qdrant

NoSQL数据库

图数据库

Neo4j

Neptune

列族数据库

Cassandra

ClickHouse

文档数据库

MongoDB

Elasticsearch

键值存储

Redis

DynamoDB

关系型数据库

MySQL

PostgreSQL

Oracle

SQL Server

SQLite

1.1 关系型数据库(RDBMS)

特性 说明
核心特点 表格结构、SQL 查询、ACID 事务、数据完整性
代表数据库 MySQL、PostgreSQL、Oracle、SQL Server、SQLite
适用场景 结构化数据、复杂事务、报表分析
LLM 应用 用户信息、对话历史、应用配置
MySQL vs PostgreSQL 对比
特性 MySQL PostgreSQL
读取性能 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐
复杂查询 ⭐⭐⭐ ⭐⭐⭐⭐⭐
JSON 支持 基础 完善的 JSONB
全文检索 基础 强大
扩展性 主从复制 分区、并行查询
地理数据 需扩展 内置 PostGIS
适用场景 Web 应用、电商 复杂分析、GIS

1.2 非关系型数据库(NoSQL)

1.2.1 键值数据库
维度 内容
核心特点 键值对存储、O(1) 查询、高并发
代表数据库 Redis、Memcached、DynamoDB、RocksDB
适用场景 缓存、会话、实时计数器
LLM 应用 Token 缓存、会话状态、限流控制
1.2.2 文档数据库
维度 内容
核心特点 JSON/BSON 格式、灵活 Schema、水平扩展
代表数据库 MongoDB、CouchDB、DocumentDB、Elasticsearch
适用场景 内容管理、日志存储、搜索引擎
LLM 应用 训练数据、生成内容、对话记录
1.2.3 列族数据库
维度 内容
核心特点 列式存储、高吞吐、分布式
代表数据库 Cassandra、HBase、ClickHouse、Redshift
适用场景 大数据分析、日志、时序数据
LLM 应用 训练日志、监控数据、事件流
1.2.4 图数据库
维度 内容
核心特点 节点-边模型、关系遍历、图算法
代表数据库 Neo4j、Neptune、JanusGraph、TigerGraph
适用场景 社交网络、知识图谱、推荐系统
LLM 应用 知识图谱、实体关系、关联问答

1.3 向量数据库对比

数据库 开源 部署 特点 适用规模 推荐指数
Chroma 本地/云 轻量、易用、Python 原生 中小规模 ⭐⭐⭐⭐⭐
Milvus 分布式 高性能、云原生 大规模 ⭐⭐⭐⭐⭐
Pinecone 全托管 零运维、即开即用 各种规模 ⭐⭐⭐⭐
Weaviate 本地/云 混合搜索、模块化 中大规模 ⭐⭐⭐⭐
Qdrant 本地/云 Rust 编写、高性能 中大规模 ⭐⭐⭐⭐
Faiss 本地 Meta 开源、纯检索 研究/实验 ⭐⭐⭐

1.4 时序数据库

维度 内容
核心特点 时间索引、高写入、降采样聚合
代表数据库 InfluxDB、TimescaleDB、Prometheus、TDengine
适用场景 监控指标、IoT、金融行情
LLM 应用 推理延迟、Token 速率、API 调用统计

1.5 NewSQL 数据库

维度 内容
核心特点 ACID + 水平扩展、分布式事务
代表数据库 TiDB、CockroachDB、Spanner、YugabyteDB
适用场景 大规模分布式系统、多地域部署
LLM 应用 全球用户系统、分布式对话存储

二、LLM 开发中的数据库应用

2.1 开发阶段数据流

监控阶段

性能指标

时序数据库
InfluxDB

业务日志

日志系统
ELK

推理阶段

未命中

用户查询

缓存
Redis

向量检索
Chroma

LLM生成

关系数据库
PostgreSQL

训练阶段

原始数据

文档数据库
MongoDB

特征工程

列族数据库
Cassandra

模型训练

2.2 训练阶段数据库设计

训练数据管理表结构
-- 数据集元数据表
CREATE TABLE datasets (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    data_type VARCHAR(50) CHECK (data_type IN ('text', 'image', 'audio', 'video', 'multimodal')),
    format VARCHAR(50), -- jsonl, parquet, csv
    sample_count BIGINT DEFAULT 0,
    size_bytes BIGINT,
    storage_path TEXT NOT NULL,
    version VARCHAR(50),
    tags TEXT[], -- PostgreSQL 数组类型
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 训练任务表
CREATE TABLE training_tasks (
    id SERIAL PRIMARY KEY,
    dataset_id INTEGER REFERENCES datasets(id),
    model_name VARCHAR(255) NOT NULL,
    model_version VARCHAR(50),
    status VARCHAR(50) DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')),
    config JSONB, -- 训练配置
    metrics JSONB, -- 训练指标
    loss_history JSONB, -- 损失曲线
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    gpu_hours DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建索引
CREATE INDEX idx_datasets_type ON datasets(data_type);
CREATE INDEX idx_datasets_tags ON datasets USING GIN(tags);
CREATE INDEX idx_tasks_status ON training_tasks(status);
CREATE INDEX idx_tasks_dataset ON training_tasks(dataset_id, created_at DESC);
可运行的 Python 数据管理脚本
#!/usr/bin/env python3
"""
训练数据管理示例
依赖: pip install psycopg2-binary python-dotenv
"""

import os
import json
from datetime import datetime
from typing import List, Dict, Optional
from contextlib import contextmanager

try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
except ImportError:
    print("请先安装依赖: pip install psycopg2-binary")
    raise

# 数据库配置
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "port": os.getenv("DB_PORT", "5432"),
    "database": os.getenv("DB_NAME", "llm_training"),
    "user": os.getenv("DB_USER", "postgres"),
    "password": os.getenv("DB_PASSWORD", "password")
}


class TrainingDataManager:
    """训练数据管理器"""
  
    def __init__(self):
        self.conn = None
  
    def connect(self):
        """建立数据库连接"""
        self.conn = psycopg2.connect(**DB_CONFIG)
        return self
  
    def close(self):
        """关闭连接"""
        if self.conn:
            self.conn.close()
  
    def __enter__(self):
        return self.connect()
  
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
  
    def create_dataset(
        self,
        name: str,
        data_type: str,
        format: str,
        sample_count: int,
        size_bytes: int,
        storage_path: str,
        version: str = "1.0",
        tags: List[str] = None
    ) -> int:
        """创建数据集记录"""
        with self.conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO datasets (name, data_type, format, sample_count, size_bytes, storage_path, version, tags)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
                """,
                (name, data_type, format, sample_count, size_bytes, storage_path, version, tags or [])
            )
            dataset_id = cur.fetchone()[0]
            self.conn.commit()
            return dataset_id
  
    def create_training_task(
        self,
        dataset_id: int,
        model_name: str,
        config: Dict
    ) -> int:
        """创建训练任务"""
        with self.conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO training_tasks (dataset_id, model_name, config, status)
                VALUES (%s, %s, %s, 'pending')
                RETURNING id
                """,
                (dataset_id, model_name, json.dumps(config))
            )
            task_id = cur.fetchone()[0]
            self.conn.commit()
            return task_id
  
    def update_task_status(
        self,
        task_id: int,
        status: str,
        metrics: Optional[Dict] = None
    ):
        """更新任务状态"""
        with self.conn.cursor() as cur:
            if status == 'running' and metrics is None:
                cur.execute(
                    "UPDATE training_tasks SET status = %s, start_time = NOW() WHERE id = %s",
                    (status, task_id)
                )
            elif status in ['completed', 'failed']:
                cur.execute(
                    """
                    UPDATE training_tasks 
                    SET status = %s, end_time = NOW(), metrics = %s 
                    WHERE id = %s
                    """,
                    (status, json.dumps(metrics) if metrics else None, task_id)
                )
            else:
                cur.execute(
                    "UPDATE training_tasks SET status = %s WHERE id = %s",
                    (status, task_id)
                )
            self.conn.commit()
  
    def get_dataset_stats(self) -> List[Dict]:
        """获取数据集统计"""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT 
                    data_type,
                    COUNT(*) as dataset_count,
                    SUM(sample_count) as total_samples,
                    SUM(size_bytes) / 1024 / 1024 / 1024 as total_size_gb
                FROM datasets
                GROUP BY data_type
                ORDER BY total_samples DESC
            """)
            return cur.fetchall()


# 使用示例
if __name__ == "__main__":
    with TrainingDataManager() as manager:
        # 创建数据集
        dataset_id = manager.create_dataset(
            name="中文问答数据集_v2",
            data_type="text",
            format="jsonl",
            sample_count=100000,
            size_bytes=2147483648,  # 2GB
            storage_path="s3://datasets/chinese-qa/v2",
            version="2.0",
            tags=["中文", "问答", "监督微调"]
        )
        print(f"创建数据集 ID: {dataset_id}")
      
        # 创建训练任务
        task_id = manager.create_training_task(
            dataset_id=dataset_id,
            model_name="Qwen2-7B-Instruct",
            config={
                "learning_rate": 2e-5,
                "batch_size": 32,
                "epochs": 3,
                "max_length": 2048
            }
        )
        print(f"创建训练任务 ID: {task_id}")
      
        # 模拟训练过程
        manager.update_task_status(task_id, 'running')
        print("任务开始运行...")
      
        # 模拟训练完成
        manager.update_task_status(task_id, 'completed', {
            "final_loss": 0.0234,
            "accuracy": 0.9567,
            "eval_loss": 0.0256,
            "training_time_hours": 12.5
        })
        print("任务完成!")
      
        # 查看统计
        stats = manager.get_dataset_stats()
        print("\n数据集统计:")
        for stat in stats:
            print(f"  {stat['data_type']}: {stat['dataset_count']} 个数据集, "
                  f"{stat['total_samples']} 样本, {stat['total_size_gb']:.2f} GB")

2.3 RAG 系统完整实现

PostgreSQL LLM服务 重排序服务 向量数据库 Redis缓存 API Gateway 用户 PostgreSQL LLM服务 重排序服务 向量数据库 Redis缓存 API Gateway 用户 alt [缓存命中] [缓存未命中] 发送查询 检查缓存 返回缓存结果 返回答案 向量相似度搜索 返回Top-K文档 精排序 返回重排序结果 构建Prompt并生成 返回答案 写入缓存 记录对话历史 返回答案
可运行的 RAG 系统代码
#!/usr/bin/env python3
"""
RAG 系统完整实现
依赖: pip install chromadb redis psycopg2-binary sentence-transformers numpy
"""

import os
import json
import hashlib
import numpy as np
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime

# 检查依赖
try:
    import chromadb
    import redis
    import psycopg2
    from sentence_transformers import SentenceTransformer
except ImportError as e:
    print(f"缺少依赖: {e}")
    print("请运行: pip install chromadb redis psycopg2-binary sentence-transformers numpy")
    raise


@dataclass
class Document:
    """文档数据类"""
    id: str
    content: str
    metadata: Dict
    embedding: Optional[List[float]] = None


@dataclass
class SearchResult:
    """搜索结果数据类"""
    document: Document
    score: float
    rank: int


class RAGDatabaseManager:
    """RAG 数据库管理器"""
  
    def __init__(
        self,
        embedding_model: str = "BAAI/bge-large-zh-v1.5",
        chroma_path: str = "./chroma_db",
        redis_host: str = "localhost",
        redis_port: int = 6379,
        pg_config: Optional[Dict] = None
    ):
        # 初始化嵌入模型
        print(f"加载嵌入模型: {embedding_model}")
        self.encoder = SentenceTransformer(embedding_model)
      
        # 初始化向量数据库
        self.chroma_client = chromadb.PersistentClient(path=chroma_path)
        self.collection = self.chroma_client.get_or_create_collection(
            name="documents",
            metadata={"hnsw:space": "cosine"}
        )
      
        # 初始化缓存
        self.cache = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=0,
            decode_responses=True
        )
      
        # 初始化关系数据库
        self.pg_config = pg_config or {
            "host": "localhost",
            "port": "5432",
            "database": "rag_db",
            "user": "postgres",
            "password": "password"
        }
        self._init_postgres()
  
    def _init_postgres(self):
        """初始化 PostgreSQL 表结构"""
        conn = psycopg2.connect(**self.pg_config)
        try:
            with conn.cursor() as cur:
                # 文档元数据表
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS documents (
                        id VARCHAR(255) PRIMARY KEY,
                        title VARCHAR(500),
                        source VARCHAR(255),
                        content_length INTEGER,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        metadata JSONB
                    )
                """)
              
                # 对话历史表
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS conversations (
                        id SERIAL PRIMARY KEY,
                        session_id VARCHAR(255),
                        query TEXT,
                        answer TEXT,
                        retrieved_docs JSONB,
                        latency_ms INTEGER,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
              
                # 创建索引
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_conversations_session 
                    ON conversations(session_id, created_at DESC)
                """)
              
                conn.commit()
        finally:
            conn.close()
  
    def embed_text(self, text: str) -> List[float]:
        """文本向量化"""
        embedding = self.encoder.encode(text, normalize_embeddings=True)
        return embedding.tolist()
  
    def add_document(self, doc: Document) -> bool:
        """添加文档到系统"""
        try:
            # 生成嵌入向量
            if doc.embedding is None:
                doc.embedding = self.embed_text(doc.content)
          
            # 存储到向量数据库
            self.collection.add(
                ids=[doc.id],
                embeddings=[doc.embedding],
                documents=[doc.content],
                metadatas=[doc.metadata]
            )
          
            # 存储元数据到 PostgreSQL
            conn = psycopg2.connect(**self.pg_config)
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO documents (id, title, source, content_length, metadata)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (id) DO UPDATE SET
                            title = EXCLUDED.title,
                            source = EXCLUDED.source,
                            content_length = EXCLUDED.content_length,
                            metadata = EXCLUDED.metadata
                        """,
                        (
                            doc.id,
                            doc.metadata.get('title', ''),
                            doc.metadata.get('source', ''),
                            len(doc.content),
                            json.dumps(doc.metadata)
                        )
                    )
                    conn.commit()
            finally:
                conn.close()
          
            return True
        except Exception as e:
            print(f"添加文档失败: {e}")
            return False
  
    def search(
        self,
        query: str,
        top_k: int = 5,
        use_cache: bool = True,
        cache_ttl: int = 3600
    ) -> Tuple[List[SearchResult], float]:
        """
        检索相关文档
        返回: (搜索结果列表, 查询嵌入耗时)
        """
        import time
        start_time = time.time()
      
        # 生成查询向量
        query_embedding = self.embed_text(query)
        embed_time = time.time() - start_time
      
        # 检查缓存
        if use_cache:
            cache_key = f"rag:search:{hashlib.md5(query.encode()).hexdigest()}"
            cached = self.cache.get(cache_key)
            if cached:
                print("缓存命中!")
                results = json.loads(cached)
                return [
                    SearchResult(
                        document=Document(
                            id=r['id'],
                            content=r['content'],
                            metadata=r['metadata']
                        ),
                        score=r['score'],
                        rank=i+1
                    )
                    for i, r in enumerate(results)
                ], embed_time
      
        # 向量检索
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=["documents", "metadatas", "distances"]
        )
      
        # 构建结果
        search_results = []
        for i in range(len(results['ids'][0])):
            doc_id = results['ids'][0][i]
            content = results['documents'][0][i]
            metadata = results['metadatas'][0][i]
            distance = results['distances'][0][i]
          
            # 余弦相似度 = 1 - 余弦距离
            score = 1 - distance
          
            search_results.append(SearchResult(
                document=Document(
                    id=doc_id,
                    content=content,
                    metadata=metadata
                ),
                score=score,
                rank=i+1
            ))
      
        # 写入缓存
        if use_cache:
            cache_data = [
                {
                    'id': r.document.id,
                    'content': r.document.content,
                    'metadata': r.document.metadata,
                    'score': r.score
                }
                for r in search_results
            ]
            self.cache.setex(cache_key, cache_ttl, json.dumps(cache_data))
      
        return search_results, embed_time
  
    def generate_answer(
        self,
        query: str,
        contexts: List[SearchResult],
        model: str = "gpt-3.5-turbo"
    ) -> Dict:
        """
        生成回答(模拟 LLM 调用)
        实际使用时替换为真实的 LLM API 调用
        """
        # 构建上下文
        context_text = "\n\n".join([
            f"[文档 {i+1}] {ctx.document.content[:500]}..."
            for i, ctx in enumerate(contexts[:3])
        ])
      
        # 模拟 Prompt
        prompt = f"""基于以下参考文档回答问题:

{context_text}

问题:{query}

请根据参考文档提供准确、简洁的回答。"""
      
        # 模拟 LLM 响应(实际应调用 OpenAI/Claude/本地模型等)
        answer = f"根据检索到的 {len(contexts)} 个相关文档,[这里是 LLM 生成的回答...]"
      
        return {
            "answer": answer,
            "prompt": prompt,
            "contexts_used": len(contexts),
            "model": model
        }
  
    def query(
        self,
        query: str,
        session_id: Optional[str] = None,
        top_k: int = 5
    ) -> Dict:
        """
        完整的 RAG 查询流程
        """
        import time
        start_time = time.time()
      
        # 1. 检索相关文档
        search_results, embed_time = self.search(query, top_k=top_k)
      
        # 2. 生成回答
        generation_result = self.generate_answer(query, search_results)
      
        total_time = (time.time() - start_time) * 1000  # 转换为毫秒
      
        # 3. 记录到数据库
        conn = psycopg2.connect(**self.pg_config)
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO conversations (session_id, query, answer, retrieved_docs, latency_ms)
                    VALUES (%s, %s, %s, %s, %s)
                    """,
                    (
                        session_id or "anonymous",
                        query,
                        generation_result['answer'],
                        json.dumps([
                            {
                                'doc_id': r.document.id,
                                'score': r.score,
                                'title': r.document.metadata.get('title', '')
                            }
                            for r in search_results
                        ]),
                        int(total_time)
                    )
                )
                conn.commit()
        finally:
            conn.close()
      
        return {
            "query": query,
            "answer": generation_result['answer'],
            "sources": [
                {
                    "id": r.document.id,
                    "title": r.document.metadata.get('title', ''),
                    "score": round(r.score, 4),
                    "rank": r.rank
                }
                for r in search_results
            ],
            "metrics": {
                "embedding_time_ms": round(embed_time * 1000, 2),
                "total_time_ms": round(total_time, 2),
                "documents_retrieved": len(search_results)
            }
        }
  
    def get_stats(self) -> Dict:
        """获取系统统计信息"""
        # 向量数据库统计
        vector_count = self.collection.count()
      
        # PostgreSQL 统计
        conn = psycopg2.connect(**self.pg_config)
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM documents")
                doc_count = cur.fetchone()[0]
              
                cur.execute("SELECT COUNT(*) FROM conversations")
                query_count = cur.fetchone()[0]
              
                cur.execute("SELECT AVG(latency_ms) FROM conversations")
                avg_latency = cur.fetchone()[0] or 0
        finally:
            conn.close()
      
        # Redis 统计
        cache_info = self.cache.info()
      
        return {
            "vector_documents": vector_count,
            "metadata_records": doc_count,
            "total_queries": query_count,
            "avg_latency_ms": round(avg_latency, 2),
            "cache_hit_rate": cache_info.get('keyspace_hits', 0) / (
                cache_info.get('keyspace_hits', 0) + cache_info.get('keyspace_misses', 1)
            ) * 100
        }


# 使用示例
if __name__ == "__main__":
    # 初始化 RAG 系统
    rag = RAGDatabaseManager(
        embedding_model="BAAI/bge-small-zh-v1.5",  # 使用轻量级模型便于测试
        chroma_path="./demo_chroma_db"
    )
  
    print("=" * 50)
    print("RAG 系统演示")
    print("=" * 50)
  
    # 添加示例文档
    sample_docs = [
        Document(
            id="doc_001",
            content="""Chroma 是一个开源的向量数据库,专为 AI 应用设计。
            它提供了简单易用的 API,支持嵌入存储、检索和过滤。
            Chroma 可以运行在本地,也可以部署到云端。""",
            metadata={"title": "Chroma 简介", "source": "官方文档", "category": "向量数据库"}
        ),
        Document(
            id="doc_002",
            content="""PostgreSQL 是一个强大的开源关系型数据库系统。
            它拥有超过 30 年的活跃开发历史,以其可靠性、功能健壮性和性能著称。
            PostgreSQL 支持 JSON、全文检索、地理信息等多种高级特性。""",
            metadata={"title": "PostgreSQL 概述", "source": "技术博客", "category": "关系型数据库"}
        ),
        Document(
            id="doc_003",
            content="""Redis 是一个开源的内存数据结构存储系统,可用作数据库、缓存和消息代理。
            它支持字符串、哈希、列表、集合等多种数据结构,
            并提供发布订阅、Lua 脚本、事务等功能。""",
            metadata={"title": "Redis 介绍", "source": "官方文档", "category": "键值存储"}
        ),
        Document(
            id="doc_004",
            content="""RAG(Retrieval-Augmented Generation)是一种结合检索和生成的 AI 技术。
            它首先从知识库中检索相关文档,然后将这些文档作为上下文提供给大语言模型,
            从而生成更准确、更可靠的回答。""",
            metadata={"title": "RAG 技术详解", "source": "研究论文", "category": "AI 技术"}
        ),
    ]
  
    print("\n1. 添加文档到系统...")
    for doc in sample_docs:
        success = rag.add_document(doc)
        print(f"   {'✓' if success else '✗'} {doc.metadata['title']}")
  
    # 执行查询
    print("\n2. 执行 RAG 查询...")
    test_queries = [
        "什么是向量数据库?",
        "Redis 有什么功能?",
        "RAG 技术是如何工作的?"
    ]
  
    for query in test_queries:
        print(f"\n   查询: {query}")
        result = rag.query(query, session_id="demo_session")
        print(f"   回答: {result['answer'][:100]}...")
        print(f"   参考文档: {', '.join([s['title'] for s in result['sources']])}")
        print(f"   耗时: {result['metrics']['total_time_ms']}ms")
  
    # 查看统计
    print("\n3. 系统统计...")
    stats = rag.get_stats()
    for key, value in stats.items():
        print(f"   {key}: {value}")
  
    print("\n" + "=" * 50)
    print("演示完成!")
    print("=" * 50)

2.4 对话历史管理

-- 完整的数据库 Schema
-- 用户表
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE,
    password_hash VARCHAR(255),
    preferences JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_active TIMESTAMP
);

-- 会话表
CREATE TABLE sessions (
    id VARCHAR(255) PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    title VARCHAR(255),
    model_name VARCHAR(100) DEFAULT 'gpt-3.5-turbo',
    system_prompt TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 消息表
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    session_id VARCHAR(255) REFERENCES sessions(id) ON DELETE CASCADE,
    role VARCHAR(20) CHECK (role IN ('user', 'assistant', 'system')),
    content TEXT NOT NULL,
    tokens_input INTEGER,
    tokens_output INTEGER,
    latency_ms INTEGER,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 索引优化
CREATE INDEX idx_sessions_user ON sessions(user_id, updated_at DESC);
CREATE INDEX idx_messages_session ON messages(session_id, created_at);
CREATE INDEX idx_messages_role ON messages(session_id, role, created_at);

三、数据库选型决策指南

3.1 选型决策树

小规模

大规模

海量

中等

开始选型

数据是否
结构化?

是否需要
复杂事务?

是否是
向量数据?

PostgreSQL
MySQL

读多
写少?

MongoDB
Elasticsearch

Redis
DynamoDB

数据规模?

是否有
复杂关联?

Chroma

Milvus
Pinecone

Neo4j
Neptune

是否时序
数据?

InfluxDB
TimescaleDB

数据量?

Cassandra
ClickHouse

MongoDB

3.2 场景选型速查表

应用场景 核心需求 推荐组合 预估成本
RAG 问答系统 语义检索 + 元数据管理 Chroma + PostgreSQL + Redis
知识图谱问答 关系查询 + 语义检索 Neo4j + Weaviate
大模型训练平台 海量数据 + 高吞吐 Cassandra + MongoDB + PostgreSQL
聊天机器人 会话管理 + 实时响应 PostgreSQL + Redis + Chroma
多模态 AI 应用 多媒体存储 + 向量检索 MinIO + Milvus + MongoDB
实时推荐系统 低延迟 + 个性化 Redis + Faiss + Neo4j
AI Agent 平台 状态管理 + 工具调用 PostgreSQL + Redis + 图数据库

3.3 性能对比矩阵

数据库 QPS 延迟 扩展性 一致性 学习曲线
PostgreSQL ⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐
MongoDB ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐
Redis ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐
Chroma ⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐ ⭐⭐
Milvus ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐
Neo4j ⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐
InfluxDB ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐

四、最佳实践清单

4.1 部署建议

阶段 推荐方案 理由
原型开发 SQLite + Chroma + 本地 Redis 零配置、快速迭代
开发测试 Docker Compose 全栈 环境一致、易于复现
生产环境 托管云服务 + 容器编排 高可用、自动运维
大规模部署 Kubernetes + 分布式数据库 弹性扩展、故障自愈

4.2 性能优化检查表

  • 索引优化: 为高频查询字段创建索引
  • 连接池: 使用连接池避免频繁创建连接
  • 缓存策略: 热点数据缓存,设置合理 TTL
  • 读写分离: 读多写少场景配置主从复制
  • 数据分片: 大规模数据水平分片
  • 批量操作: 使用批量插入替代单条插入
  • 异步处理: 非关键操作异步执行
  • 监控告警: 设置性能基线和告警阈值

4.3 安全清单

  • 访问控制: 最小权限原则
  • 数据加密: 传输加密 + 存储加密
  • 定期备份: 自动化备份策略
  • 审计日志: 记录关键操作
  • 敏感数据: 脱敏处理
  • 漏洞扫描: 定期安全检查

五、总结

5.1 数据库选型总览

类型 核心优势 典型场景 首选推荐
关系型 ACID、SQL、成熟稳定 事务处理、报表 PostgreSQL
键值型 极速、简单、高并发 缓存、会话、队列 Redis
文档型 灵活 Schema、易扩展 内容、日志、配置 MongoDB
图数据库 关系遍历、图算法 知识图谱、推荐 Neo4j
向量数据库 语义检索、相似度 RAG、搜索、推荐 Chroma/Milvus
时序数据库 时间序列优化 监控、IoT、指标 InfluxDB
NewSQL 分布式 ACID 全球分布式应用 TiDB

5.2 未来趋势

2024 向量数据库标准化 AI 原生数据库出现 2025 多模态数据库成熟 Serverless 成为主流 2026 边缘数据库普及 自治数据库兴起 2027+ 量子数据库探索 脑机接口数据存储 数据库技术演进趋势

5.3 核心原则

没有最好的数据库,只有最适合的数据库。

选型时应综合考虑:

  1. 数据特征: 结构化程度、访问模式、增长预期
  2. 性能需求: QPS、延迟、吞吐量
  3. 团队能力: 技术栈熟悉度、运维资源
  4. 成本预算: 硬件、云服务、人力
  5. 生态集成: 与现有系统的兼容性

参考资料

资源 链接
Chroma 文档 https://docs.trychroma.com/
Milvus 文档 https://milvus.io/docs
Neo4j 文档 https://neo4j.com/docs/
PostgreSQL 文档 https://www.postgresql.org/docs/
Redis 文档 https://redis.io/docs
向量数据库对比 https://www.pinecone.io/learn/vector-database/

本文最后更新:2026-02-04

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐