1. 项目概述

1.1 项目背景

学校数据中台积累了大量结构化数据(学生信息、教务管理、科研项目、财务数据、人事信息等),共计 47 张业务表、330 个字段、18 个业务指标。传统的数据查询需要编写 SQL,对非技术人员存在较高门槛。

智能问数 系统旨在让用户通过自然语言提问(如"统计各学院的学生人数"),系统自动理解意图、生成 SQL、执行查询并返回结构化结果,实现 NL2SQL(Natural Language to SQL) 的完整闭环。

1.2 核心能力

能力 说明
自然语言理解 从用户问题中提取关键词、识别查询意图
多路召回 字段(向量)+ 指标(向量)+ 字段取值(全文)三路并行召回
LLM 精确过滤 利用大模型对召回结果进行语义裁剪,保留最相关的表和字段
SQL 生成与校验 LLM 生成 SQL → 数据库 EXPLAIN 验证 → 错误时 LLM 自动纠正
实时流式响应 基于 SSE(Server-Sent Events)将各节点执行进度实时推送到前端

1.3 整体架构图

┌─────────────────────────────────────────────────────────────────────────┐
│                           Frontend Layer                                │
│  ┌──────────────┐    ┌──────────────┐                                   │
│  │   PC (3000)  │    │   H5 (3001)  │   Vue 3 + Vite                   │
│  └──────┬───────┘    └──────┬───────┘                                   │
│         └───────────┬───────┘                                           │
│                     │ POST /api/query (SSE)                             │
├─────────────────────┼───────────────────────────────────────────────────┤
│                     ▼                                                   │
│              FastAPI (18000)                                             │
│         ┌───────────────────────┐                                       │
│         │   API / Router Layer  │                                       │
│         │  ┌─────────────────┐  │                                       │
│         │  │  QueryService   │  │                                       │
│         │  └────────┬────────┘  │                                       │
│         └───────────┼───────────┘                                       │
│                     ▼                                                   │
│           LangGraph Agent Pipeline                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │ extract    recall_column ─┐                                      │   │
│  │ keywords → recall_value  ─┼→ merge → filter_table  ─┐           │   │
│  │            recall_metric ─┘         filter_metric ─┼→ add_extra │   │
│  │                                                      → generate │   │
│  │            → validate ──┬─→ execute (success)                    │   │
│  │                         └─→ correct → execute (retry)            │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                     │          │          │           │                  │
│                     ▼          ▼          ▼           ▼                  │
│              ┌──────────┐ ┌────────┐ ┌────────┐ ┌─────────┐            │
│              │   LLM    │ │ Qdrant │ │  ES    │ │  MySQL  │            │
│              │(qwen-max)│ │(向量DB)│ │(全文)  │ │(Meta+DW)│            │
│              └──────────┘ └────────┘ └────────┘ └─────────┘            │
│                     ▲                                                   │
│              ┌──────────┐                                               │
│              │Embedding │                                               │
│              │  (TEI)   │                                               │
│              └──────────┘                                               │
└─────────────────────────────────────────────────────────────────────────┘

2. 技术栈总览

层次 技术 版本/说明
后端框架 FastAPI + Uvicorn Python 异步 Web 框架,端口 18000
Agent 编排 LangGraph 基于状态图的 Agent 工作流编排引擎
LLM 调用 LangChain + langchain-openai 统一的 LLM 调用层
大语言模型 阿里云通义千问 qwen-max 通过 DashScope OpenAI 兼容接口调用
文本向量化 BAAI/bge-large-zh-v1.5 通过 TEI(Text Embeddings Inference)本地部署,1024 维
向量数据库 Qdrant 字段信息和指标信息的向量索引与语义检索
全文搜索引擎 Elasticsearch 字段取值的全文索引与中文分词检索
关系型数据库 MySQL(阿里云 RDS) Meta 库(元数据)+ DW 库(数据仓库)
ORM SQLAlchemy 2.0 (async) + asyncmy 异步数据库访问
配置管理 OmegaConf 结构化 YAML 配置与 dataclass 映射
日志 Loguru 结构化日志,支持 request_id 链路追踪
前端 Vue 3 + Vite PC 端 + H5 移动端

3. 外部依赖与中间件详解

3.1 大语言模型 —— 阿里云通义千问(qwen-max)

作用:系统的"大脑",承担所有需要语义理解和推理的任务。

调用方式:通过阿里云 DashScope 提供的 OpenAI 兼容接口,使用 LangChain 的 init_chat_model 统一初始化。

# app/agent/llm.py
llm = init_chat_model(
    model=app_config.llm.model_name,    # qwen-max
    model_provider="openai",
    api_key=app_config.llm.api_key,
    base_url=app_config.llm.base_url,   # https://dashscope.aliyuncs.com/compatible-mode/v1
    temperature=0                        # 确保输出确定性
)

在 Agent 中的使用场景(共 6 次 LLM 调用):

节点 调用目的 输出格式
extract_keywords 从自然语言中提取关键词 JSON 数组
recall_column 扩展字段召回关键词 JSON 数组
recall_metric 扩展指标召回关键词 JSON 数组
recall_value 扩展字段取值召回关键词 JSON 数组
filter_table 从候选表和字段中裁剪出必需的 JSON 对象
filter_metric 从候选指标中筛选出必需的 JSON 数组
generate_sql 根据上下文生成最终 SQL SQL 文本
correct_sql 当 SQL 验证失败时纠正 SQL 文本

配置位置conf/app_config.yamlllm

llm:
  model_name: qwen-max
  api_key: sk-xxxxx
  base_url: https://dashscope.aliyuncs.com/compatible-mode/v1

3.2 文本向量化服务 —— TEI + bge-large-zh-v1.5

作用:将文本转换为 1024 维向量,用于语义检索。向量化后的字段信息和指标信息存入 Qdrant,查询时将用户关键词向量化后进行相似度搜索。

部署方式:本地部署 HuggingFace TEI(Text Embeddings Inference)服务,加载 BAAI/bge-large-zh-v1.5 中文向量模型。

调用方式:通过 LangChain 的 HuggingFaceEndpointEmbeddings 客户端。

# app/clients/embedding_client_manager.py
class EmbeddingClientManager:
    def init(self):
        self.client = HuggingFaceEndpointEmbeddings(
            model=f"http://{config.host}:{config.port}"  # http://localhost:8888
        )

使用场景

  • 知识库构建时:将字段名称、描述、别名分别向量化后写入 Qdrant;将指标名称、描述、别名向量化后写入 Qdrant
  • 查询时:将用户关键词向量化后在 Qdrant 中检索相似的字段/指标

配置位置conf/app_config.yamlembedding

embedding:
  host: localhost
  port: 8888
  model: BAAI/bge-large-zh-v1.5

3.3 向量数据库 —— Qdrant

作用:存储字段信息和指标信息的向量索引,支持基于语义相似度的快速检索(余弦距离)。

部署位置:远程服务器(8.136.50.72:6333),通过 API Key 认证。

客户端qdrant_client.AsyncQdrantClient(异步客户端)。

# app/clients/qdrant_client_manager.py
class QdrantClientManager:
    def init(self):
        self.client = AsyncQdrantClient(
            host=config.host,     # 8.136.50.72
            port=config.port,     # 6333
            api_key=config.api_key
        )

包含两个 Collection

Collection 存储内容 向量来源 Payload 结构
test-smart-data-query-column 字段信息 字段名/描述/别名的向量 ColumnInfo(id, name, type, role, examples, description, alias, table_id)
test-smart-data-query-metric 指标信息 指标名/描述/别名的向量 MetricInfo(id, name, description, relevant_columns, alias)

Repository 层

  • ColumnQdrantRepository —— 字段向量的写入与检索
  • MetricQdrantRepository —— 指标向量的写入与检索

检索参数

await client.query_points(
    collection_name=collection_name,
    query=embedding,           # 查询向量
    score_threshold=0.6,       # 最低相似度阈值
    limit=5                    # 每次检索最多返回 5 条
)

配置位置conf/app_config.yamlqdrant

qdrant:
  host: 8.136.50.72
  port: 6333
  embedding_size: 1024
  api_key: xxxxx
  column_collection: test-smart-data-query-column
  metric_collection: test-smart-data-query-metric

3.4 全文搜索引擎 —— Elasticsearch

作用:存储字段取值的全文索引,支持基于中文分词的模糊匹配检索。当用户提问中包含具体的实体名称(如"中医学院""针灸推拿学院"等),通过 ES 全文检索可以匹配到对应字段及其取值。

部署位置:阿里云 Elasticsearch 服务(公网访问),通过 basic_auth 认证。

客户端elasticsearch.AsyncElasticsearch

# app/clients/es_client_manager.py
class ESClientManager:
    def init(self):
        self.client = AsyncElasticsearch(
            hosts=[f"http://{config.host}:{config.port}"],
            basic_auth=(config.user, config.password)
        )

索引设计

索引名 字段 类型 分析器
test-smart-data-query id keyword
value text ik_max_word(中文分词)
column_id keyword

ik_max_word 分析器会将中文文本进行最大粒度分词,例如"针灸推拿学院"会被分为"针灸"“推拿”“学院”“针灸推拿”"针灸推拿学院"等 token,从而提高召回率。

数据来源meta_config.yaml 中标记 sync: true 的字段,从数据仓库中查询实际取值后写入 ES。共有 93 个字段被标记为需要同步。

检索方式

await client.search(
    index=index_name,
    query={"match": {"value": keyword}},
    min_score=0.6,     # 最低分数阈值
    size=5             # 每次最多返回 5 条
)

配置位置conf/app_config.yamles

es:
  host: es-cn-xxxxx.public.elasticsearch.aliyuncs.com
  port: 9200
  index_name: test-smart-data-query
  user: elastic
  password: xxxxx

3.5 关系型数据库 —— MySQL(阿里云 RDS)

作用:项目使用两个 MySQL 数据库实例(同一 RDS 实例的不同 database):

数据库 名称 作用
Meta 库 smart-data-query-meta 存储元数据知识库(表信息、字段信息、指标信息、字段-指标关联关系)
DW 库 edu_core 学校数据仓库,包含 47 张业务数据表,是最终 SQL 的执行目标

客户端:SQLAlchemy 2.0 异步引擎 + asyncmy 驱动。

# app/clients/mysql_client_manager.py
class MySQLClientManager:
    def init(self):
        self.engine = create_async_engine(
            f"mysql+asyncmy://{config.user}:{config.password}@{config.host}:{config.port}/{config.database}"
        )
        self.session_factory = async_sessionmaker(self.engine)

Meta 库表结构(4 张表):

table_info         字段表信息(id, name, role, description)
column_info        字段信息(id, name, type, role, examples, description, alias, table_id)
metric_info        指标信息(id, name, description, relevant_columns, alias)
column_metric      字段-指标关联表(column_id, metric_id)

DW 库使用场景

操作 节点/脚本 说明
SHOW COLUMNS FROM table 知识库构建 获取字段类型
SELECT DISTINCT column FROM table 知识库构建 获取字段样本值
SELECT VERSION() add_extra_context 获取数据库版本信息
EXPLAIN sql validate_sql 验证生成的 SQL 语法
执行最终 SQL execute_sql 执行查询并返回结果

Repository 层

  • MetaMySQLRepository —— 元数据的 CRUD(表信息、字段信息、指标信息、关联关系)
  • DWMySQLRepository —— 数据仓库操作(字段类型查询、取值查询、SQL 验证与执行)

配置位置conf/app_config.yamldb_meta / db_dw

db_meta:
  host: rm-xxxxx.mysql.rds.aliyuncs.com
  port: 3306
  user: test1
  password: xxxxx
  database: smart-data-query-meta

db_dw:
  host: rm-xxxxx.mysql.rds.aliyuncs.com
  port: 3306
  user: test1
  password: xxxxx
  database: edu_core

3.6 LangGraph —— Agent 工作流编排引擎

作用:LangGraph 是 LangChain 生态中的有状态图执行引擎,本项目用它来编排 NL2SQL 的完整 Agent 工作流。

核心概念

概念 在本项目中的实现
State(状态) DataAgentState —— 一个 TypedDict,各节点通过读写 State 传递数据
Context(上下文) DataAgentContext —— 注入外部依赖(数据库连接、向量客户端等)
Node(节点) 12 个异步函数,每个处理一个子任务
Edge(边) 定义节点间的执行顺序和并行关系
Conditional Edge validate_sql 后的条件分支(成功→执行,失败→纠正)
stream_writer 各节点通过它向前端推送 SSE 进度消息

State 定义app/agent/state.py):

class DataAgentState(TypedDict):
    query: str                              # 用户原始问题
    keywords: Annotated[list, operator.add]  # 提取的关键词(可追加)
    retrieved_columns: list[ColumnInfo]      # 召回的字段列表
    retrieved_values: list[ValueInfo]        # 召回的字段取值
    retrieved_metrics: list[MetricInfo]      # 召回的指标列表
    table_infos: list[TableInfoState]        # 合并后的表结构信息
    metric_infos: list[MetricInfoState]      # 合并后的指标信息
    date_info: DateInfoState                 # 当前日期上下文
    db_info: dict                            # 数据库版本信息
    sql: str                                 # 生成的 SQL
    error: Optional[str]                     # SQL 验证错误信息

Context 定义app/agent/context.py):

class DataAgentContext(TypedDict):
    embedding_client: HuggingFaceEndpointEmbeddings
    column_qdrant_repository: ColumnQdrantRepository
    value_es_repository: ValueESRepository
    metric_qdrant_repository: MetricQdrantRepository
    meta_mysql_repository: MetaMySQLRepository
    dw_mysql_repository: DWMySQLRepository

3.7 LangChain —— LLM 调用与链式编排

作用:提供统一的 LLM 调用接口和 Prompt → LLM → OutputParser 的链式编排。

调用模式

# 典型的 LangChain 链
prompt = PromptTemplate(template=load_prompt("xxx"), input_variables=[...])
output_parser = JsonOutputParser()  # 或 StrOutputParser
chain = prompt | llm | output_parser
result = await chain.ainvoke({...})

组件说明

组件 说明
PromptTemplate 加载 .prompt 文件,填充变量后构造提示词
JsonOutputParser 将 LLM 输出解析为 JSON(用于关键词提取、过滤等)
StrOutputParser 将 LLM 输出保留为字符串(用于 SQL 生成)
init_chat_model 统一的模型初始化函数,支持 openai/anthropic 等多种 provider

3.8 OmegaConf —— 结构化配置管理

作用:将 YAML 配置文件与 Python dataclass 绑定,提供类型安全的配置访问。

工作原理

# app/conf/app_config.py
config_file = Path(__file__).parents[2] / 'conf' / 'app_config.yaml'
context = OmegaConf.load(config_file)            # 加载 YAML
schema = OmegaConf.structured(AppConfig)          # 创建 schema
app_config: AppConfig = OmegaConf.to_object(      # 合并为带类型的对象
    OmegaConf.merge(schema, context)
)

配置文件

文件 作用
conf/app_config.yaml 应用运行配置(数据库、中间件连接信息、日志配置)
conf/meta_config.yaml 元数据配置(47 张表、330 个字段、18 个指标的定义)

3.9 Loguru —— 结构化日志

作用:提供结构化日志输出,支持 request_id 链路追踪,方便在并发请求中追踪单个查询的完整执行链路。

日志格式

2026-04-09 10:30:15.123 | INFO     | request_id - abc123 | module:function:42 - 消息内容

特色机制:通过 contextvars.ContextVar 实现 request_id 注入——FastAPI 中间件在每个请求开始时生成 UUID,所有后续日志自动携带该 ID。

# app/core/context.py
request_id_ctx_var = ContextVar("request_id", default="1")

# main.py 中间件
@app.middleware("http")
async def add_process_time_header(request, call_next):
    request_id_ctx_var.set(uuid.uuid4())
    return await call_next(request)

4. 元数据知识库构建流程

在 Agent 查询之前,需要先运行 build_meta_knowledge.py 脚本构建元数据知识库。该过程将 meta_config.yaml 中定义的表/字段/指标信息同步到三个存储中间件。

4.1 构建流程图

meta_config.yaml
       │
       ▼
 ┌─────────────────┐
 │ 1. 清空旧数据     │  DELETE FROM column_metric/column_info/metric_info/table_info
 └────────┬────────┘
          ▼
 ┌─────────────────┐     ┌──────────────┐
 │ 2. 保存表/字段    │────→│  MySQL Meta  │  table_info + column_info
 │    到 Meta 库     │     └──────────────┘
 └────────┬────────┘
          ▼
 ┌─────────────────┐     ┌──────────────┐
 │ 3. 字段信息       │────→│   Qdrant     │  column collection
 │    建立向量索引   │     │  (向量检索)   │
 └────────┬────────┘     └──────────────┘
          ▼
 ┌─────────────────┐     ┌──────────────┐
 │ 4. 字段取值       │────→│Elasticsearch │  value index
 │    建立全文索引   │     │  (全文检索)   │
 └────────┬────────┘     └──────────────┘
          ▼
 ┌─────────────────┐     ┌──────────────┐
 │ 5. 保存指标信息   │────→│  MySQL Meta  │  metric_info + column_metric
 │    到 Meta 库     │     └──────────────┘
 └────────┬────────┘
          ▼
 ┌─────────────────┐     ┌──────────────┐
 │ 6. 指标信息       │────→│   Qdrant     │  metric collection
 │    建立向量索引   │     │  (向量检索)   │
 └────────┬────────┘     └──────────────┘
          ▼
       构建完成

4.2 各步骤详解

步骤 1 — 清空旧数据

按外键依赖顺序执行 DELETE:column_metriccolumn_infometric_infotable_info,确保可重复执行。

步骤 2 — 保存表和字段到 Meta 库

遍历 meta_config.yaml 中所有表定义,为每个字段从数据仓库获取真实的列类型(SHOW COLUMNS)和样本值(SELECT DISTINCT ... LIMIT 10),构造 TableInfoColumnInfo 实体后批量写入 Meta 库。

步骤 3 — 字段信息建立向量索引

对每个字段,将其名称、描述、所有别名分别向量化(调用 TEI 服务),每个向量作为一个 Point 写入 Qdrant 的 column collection。Payload 中携带完整的 ColumnInfo。这意味着一个字段会产生 2 + N 个向量点(名称 + 描述 + N 个别名)。

步骤 4 — 字段取值建立全文索引

遍历所有标记 sync: true 的字段,从数据仓库查询其全部取值(最多 100000 条),构造 ValueInfo 后批量写入 ES 索引。ES 使用 ik_max_word 中文分析器进行分词索引。

步骤 5 — 保存指标信息到 Meta 库

将 18 个业务指标定义写入 metric_info 表,同时将指标与字段的关联关系写入 column_metric 表。

步骤 6 — 指标信息建立向量索引

与字段向量化类似,将每个指标的名称、描述、所有别名分别向量化后写入 Qdrant 的 metric collection。


5. Agent 节点流转详解

5.1 完整流转图

                        START
                          │
                          ▼
                 ┌─────────────────┐
                 │ extract_keywords │  LLM 提取关键词
                 └────────┬────────┘
                          │
              ┌───────────┼───────────┐
              ▼           ▼           ▼           ← 三路并行
      ┌──────────┐ ┌──────────┐ ┌──────────┐
      │recall_col│ │recall_val│ │recall_met│
      │  (Qdrant)│ │   (ES)   │ │  (Qdrant)│
      └────┬─────┘ └────┬─────┘ └────┬─────┘
           └─────────────┼─────────────┘
                         ▼
              ┌─────────────────────┐
              │ merge_retrieved_info │  合并+补全+分组
              └──────────┬──────────┘
                         │
                ┌────────┴────────┐
                ▼                 ▼               ← 两路并行
        ┌──────────────┐  ┌──────────────┐
        │ filter_table  │  │ filter_metric│
        │   (LLM)       │  │   (LLM)      │
        └───────┬──────┘  └───────┬──────┘
                └────────┬────────┘
                         ▼
              ┌─────────────────────┐
              │  add_extra_context   │  添加日期+DB信息
              └──────────┬──────────┘
                         ▼
              ┌─────────────────────┐
              │    generate_sql      │  LLM 生成 SQL
              └──────────┬──────────┘
                         ▼
              ┌─────────────────────┐
              │    validate_sql      │  EXPLAIN 验证
              └──────────┬──────────┘
                         │
                  ┌──────┴──────┐
                  │  error?     │
              ┌───┘             └───┐
              ▼ (无错误)            ▼ (有错误)
      ┌──────────────┐    ┌──────────────┐
      │  execute_sql  │    │  correct_sql  │  LLM 纠正 SQL
      └──────────────┘    └──────┬───────┘
              ▲                   │
              └───────────────────┘
                         │
                        END

5.2 各节点详解

节点 1:extract_keywords(提取关键词)
属性 说明
输入 state.query(用户原始问题)
输出 state.keywords(关键词列表)
依赖 LLM
作用 从自然语言问题中提取核心关键词,作为后续三路召回的检索种子

工作原理:将用户问题发送给 LLM,要求其提取关键词并以 JSON 数组返回。例如,“统计各学院的学生人数” → ["学院", "学生", "人数"]


节点 2/3/4:recall_column / recall_value / recall_metric(三路并行召回)

这三个节点并行执行(LangGraph 的 fan-out 机制),从不同存储中间件中召回候选信息。

recall_column(字段召回 — Qdrant 向量检索)
属性 说明
输入 state.query, state.keywords
输出 state.retrieved_columns
依赖 LLM, Embedding, Qdrant
作用 通过语义相似度从向量数据库中召回相关字段

工作流程

  1. 调用 LLM 扩展关键词(Prompt: extend_keywords_for_column_recall.prompt
  2. 将扩展后的关键词逐一向量化
  3. 在 Qdrant column collection 中进行向量检索(cosine similarity ≥ 0.6,top 5)
  4. 对检索结果按 column_id 去重,返回 ColumnInfo 列表
recall_value(字段取值召回 — ES 全文检索)
属性 说明
输入 state.query, state.keywords
输出 state.retrieved_values
依赖 LLM, Elasticsearch
作用 通过中文分词匹配从 ES 中召回相关字段取值

工作流程

  1. 调用 LLM 扩展关键词(Prompt: extend_keywords_for_value_recall.prompt),识别实体名称如"中医学院""休学"等
  2. 将扩展后的关键词逐一在 ES 中全文检索
  3. 对检索结果按 value_id 去重,返回 ValueInfo 列表
recall_metric(指标召回 — Qdrant 向量检索)
属性 说明
输入 state.query, state.keywords
输出 state.retrieved_metrics
依赖 LLM, Embedding, Qdrant
作用 通过语义相似度从向量数据库中召回相关业务指标

工作流程

  1. 调用 LLM 扩展关键词(Prompt: extend_keywords_for_metric_recall.prompt),生成指标概念短语如"平均绩点"“GPA均值”
  2. 将扩展后的关键词逐一向量化
  3. 在 Qdrant metric collection 中进行向量检索
  4. 对检索结果按 metric_id 去重,返回 MetricInfo 列表

节点 5:merge_retrieved_info(合并召回信息)
属性 说明
输入 state.retrieved_columns, state.retrieved_values, state.retrieved_metrics
输出 state.table_infos, state.metric_infos
依赖 MySQL Meta
作用 将三路召回结果合并为统一的表结构视图,补充缺失信息

工作流程

  1. 指标关联字段补全:遍历召回的指标,将其 relevant_columns 中引用的字段补充到字段列表
  2. 字段取值合并:将 ES 召回的字段取值合并到对应字段的 examples
  3. 按表分组:将所有字段按 table_id 分组,形成 表→字段列表 的映射
  4. 主外键补全:为每个涉及的表查询其主键和外键字段,确保后续 JOIN 可用
  5. 构造输出:将表信息和指标信息封装为 TableInfoStateMetricInfoState

这一步的关键价值在于:将分散在三个中间件中的召回结果,整合为一个完整且自洽的候选 Schema 视图。


节点 6/7:filter_table / filter_metric(LLM 精确过滤,两路并行)
filter_table(过滤表和字段)
属性 说明
输入 state.query, state.table_infos
输出 state.table_infos(裁剪后)
依赖 LLM
作用 从候选表和字段中裁剪出回答该问题所必需的最小集合

Prompt 核心规则filter_table_info.prompt):

  • 只能从候选中选择,禁止新增
  • 字段以"是否在本次查询中被实际使用"为唯一标准
  • 多表关联时必须包含 JOIN 所需的主外键
  • 查询结果必须包含人类可读的名称字段(规则 6):若按实体分组,必须包含名称字段而非仅返回 ID;若名称在维度表中,必须选中维度表

LLM 输出格式{"表名1": ["字段1", "字段2"], "表名2": [...]}

filter_metric(过滤指标)
属性 说明
输入 state.query, state.metric_infos
输出 state.metric_infos(裁剪后)
依赖 LLM
作用 从候选指标中筛选出回答该问题所必需的指标

Prompt 核心规则filter_metric_info.prompt):

  • 以"是否在本次问题中被实际用于度量或统计"为唯一标准
  • 仅用于筛选/分组/限定范围的字段不视为指标
  • 不涉及任何度量或统计时返回空数组 []

节点 8:add_extra_context(添加额外上下文)
属性 说明
输入
输出 state.date_info, state.db_info
依赖 MySQL DW
作用 为 SQL 生成提供时间上下文和数据库环境信息

获取内容

字段 来源 示例值
date datetime.today() 2026-04-09
weekday strftime("%A") Wednesday
quarter 计算公式 Q2
version SELECT VERSION() 8.0.28
dialect SQLAlchemy engine mysql

这些信息帮助 LLM 在生成 SQL 时正确处理"去年""本学期"等时间表达。


节点 9:generate_sql(SQL 生成)
属性 说明
输入 state.query, state.table_infos, state.metric_infos, state.date_info, state.db_info
输出 state.sql
依赖 LLM
作用 根据完整上下文生成最终 SQL

Prompt 输入generate_sql.prompt):

  • 用户问题
  • 过滤后的表结构信息(YAML 格式)
  • 过滤后的指标信息(YAML 格式)
  • 日期上下文
  • 数据库环境信息

LLM 综合所有上下文信息,生成一条可直接执行的 SQL 语句。


节点 10:validate_sql(SQL 验证)
属性 说明
输入 state.sql
输出 state.error(None 表示通过,否则为错误信息)
依赖 MySQL DW
作用 通过 EXPLAIN 验证 SQL 语法正确性

工作原理:在数据仓库上执行 EXPLAIN {sql}。如果 SQL 语法有误(表不存在、字段拼写错误、语法错误等),EXPLAIN 会抛出异常,节点将错误信息写入 state.error


节点 11:correct_sql(SQL 纠正)— 条件触发
属性 说明
触发条件 state.error is not None(validate_sql 失败时)
输入 state.sql, state.error, 以及完整上下文
输出 state.sql(纠正后的 SQL)
依赖 LLM
作用 将失败的 SQL 和错误信息交给 LLM 修正

Prompt 输入correct_sql.prompt):包含原始 SQL、错误信息、用户问题、表/指标信息、日期/DB 信息,要求 LLM 修正后重新输出。

纠正后的 SQL 将直接进入 execute_sql(不再二次验证),这是一次重试机制


节点 12:execute_sql(执行 SQL)
属性 说明
输入 state.sql
输出 SSE 事件(结果数据流式推送到前端)
依赖 MySQL DW
作用 在数据仓库上执行最终 SQL,返回查询结果

工作原理

  1. 执行 SQL 语句
  2. 将结果序列化为字典列表(处理 Decimal→float、datetime→str 等类型转换)
  3. 通过 stream_writer 推送 {"type": "result", "data": [...]} 事件

5.3 条件分支逻辑

graph_builder.add_conditional_edges(
    "validate_sql",
    lambda state: "execute_sql" if state["error"] is None else "correct_sql",
    {"execute_sql": "execute_sql", "correct_sql": "correct_sql"}
)
  • 验证通过(error 为 None)→ 直接执行 SQL
  • 验证失败(error 非 None)→ 进入 correct_sql 节点由 LLM 纠正后再执行

6. API 层设计

6.1 应用生命周期(Lifespan)

# app/core/lifespan.py
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动:初始化所有客户端连接
    embedding_client_manager.init()    # TEI 向量服务
    qdrant_client_manager.init()       # Qdrant 连接
    es_client_manager.init()           # ES 连接
    meta_mysql_client_manager.init()   # Meta 库连接池
    dw_mysql_client_manager.init()     # DW 库连接池
    yield
    # 关闭:释放所有连接
    await qdrant_client_manager.close()
    await es_client_manager.close()
    await meta_mysql_client_manager.close()
    await dw_mysql_client_manager.close()

6.2 依赖注入链

FastAPI 的 Depends 机制实现了从连接池到 Service 的完整依赖注入链:

get_meta_session ─────────→ get_meta_mysql_repository ──┐
get_dw_session ───────────→ get_dw_mysql_repository ────┤
get_embedding_client ───────────────────────────────────┤
get_column_qdrant_repository ───────────────────────────┼→ get_query_service
get_value_es_repository ────────────────────────────────┤
get_metric_qdrant_repository ───────────────────────────┘

6.3 API 接口

方法 路径 请求体 响应
POST /api/query {"query": "统计各学院的学生人数"} SSE 流(text/event-stream)

SSE 事件格式

# 进度事件(各节点执行状态)
data: {"type": "progress", "step": "提取关键词", "status": "running"}
data: {"type": "progress", "step": "提取关键词", "status": "success"}
data: {"type": "progress", "step": "召回字段", "status": "running"}
...

# 结果事件
data: {"type": "result", "data": [{"college_name": "中医学院", "student_count": 1200}, ...]}

# 错误事件
data: {"type": "error", "message": "执行SQL失败: ..."}

6.4 请求处理流程

HTTP POST /api/query
       │
       ▼
  QueryRouter 接收请求
       │
       ▼
  FastAPI Depends 注入 QueryService
  (创建 DB Session、Repository 实例)
       │
       ▼
  QueryService.query() 启动 LangGraph Agent
       │
       ▼
  graph.astream(stream_mode="custom") 流式执行
       │
       ▼
  各节点通过 stream_writer 推送进度/结果
       │
       ▼
  StreamingResponse 以 SSE 格式返回

7. 提示词工程

系统共使用 7 份提示词文件,位于 prompts/ 目录:

文件 使用节点 作用
extend_keywords_for_column_recall.prompt recall_column 扩展字段召回关键词(仅输出字段级语义)
extend_keywords_for_metric_recall.prompt recall_metric 扩展指标召回关键词(指标概念短语)
extend_keywords_for_value_recall.prompt recall_value 扩展字段取值关键词(实体名、枚举值等)
filter_table_info.prompt filter_table 从候选表中裁剪必需表和字段
filter_metric_info.prompt filter_metric 从候选指标中筛选必需指标
generate_sql.prompt generate_sql 根据完整上下文生成 SQL
correct_sql.prompt correct_sql 根据错误信息纠正 SQL

每份提示词都经过针对学校数据中台场景的定制优化,包含明确的角色定义、任务描述、规则约束、禁止事项和输出格式要求。


8. 数据流全链路示例

以用户问题 “统计各学院的学生人数” 为例,展示完整的数据流转过程:

步骤 1:extract_keywords

输入: "统计各学院的学生人数"
LLM 输出: ["学院", "学生", "人数", "统计"]

步骤 2:三路并行召回

recall_column(Qdrant 向量检索):

LLM 扩展关键词: ["学院名称", "学院", "学生学号", "学生人数"]
向量检索结果: base_college.college_name, base_college.college_id, 
             std_student.college_id, std_student.student_id, ...

recall_value(ES 全文检索):

LLM 扩展关键词: ["学院"]
ES 检索结果: "中医学院", "针灸推拿学院", "药学院", ...

recall_metric(Qdrant 向量检索):

LLM 扩展关键词: ["学生人数", "学生总数", "在校生人数"]
向量检索结果: TOTAL_STUDENTS

步骤 3:merge_retrieved_info

合并后的表结构:
- base_college: [college_id(PK), college_name]
- std_student: [student_id(PK), college_id(FK), name, ...]
指标: [TOTAL_STUDENTS]

步骤 4:filter_table + filter_metric(并行)

filter_table LLM 输出:

{
  "base_college": ["college_id", "college_name"],
  "std_student": ["student_id", "college_id"]
}

filter_metric LLM 输出:

["TOTAL_STUDENTS"]

步骤 5:add_extra_context

date_info: {date: "2026-04-09", weekday: "Wednesday", quarter: "Q2"}
db_info: {version: "8.0.28", dialect: "mysql"}

步骤 6:generate_sql

SELECT bc.college_name AS 学院名称, COUNT(ss.student_id) AS 学生人数
FROM std_student ss
JOIN base_college bc ON ss.college_id = bc.college_id
GROUP BY bc.college_id, bc.college_name
ORDER BY 学生人数 DESC

步骤 7:validate_sql

EXPLAIN SELECT bc.college_name ...  → 通过 ✓
error = None

步骤 8:execute_sql

[
  {"学院名称": "中医学院", "学生人数": 1200},
  {"学院名称": "针灸推拿学院", "学生人数": 980},
  {"学院名称": "药学院", "学生人数": 860},
  ...
]

前端实时展示为表格。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐