你有没有遇到过这样的困境:用户问你一个问题,你得先判断是该去文档库里翻翻,还是该查查数据库?更头疼的是,如果判断错了,给出的答案要么不准确,要么干脆答非所问。

今天我想和你聊聊一个实战项目——RAG-SQL Router。这不是什么高深莫测的理论,而是一个能真正解决问题的智能系统。更重要的是,我会把完整的实现思路和代码都分享给你,让你也能动手搭建一个。

一、为什么需要这样一个系统?

先说说实际场景。假设你在做一个企业内部的智能问答助手:

场景1:非结构化数据查询

  • “公司最新的休假政策是什么?”
  • “产品功能文档里关于API鉴权的部分怎么说的?”

这类问题的答案藏在PDF、Word文档、Wiki页面里——典型的非结构化数据,最适合用RAG(检索增强生成)来处理。

场景2:结构化数据查询

  • “去年第四季度销售额最高的三个地区是哪些?”
  • “目前有多少活跃用户?”

这些问题需要的是精确的数字,答案在数据库里——需要Text-to-SQL来解决。

问题来了:当用户随便问一个问题,系统怎么知道该用哪种方式回答?

这就是RAG-SQL Router要解决的核心问题:让AI自己判断该走哪条路。

二、系统架构:Agent如何做决策?

整个系统的核心是一个路由Agent,它的工作流程是这样的:

智能体根据用户问题分析意图后,自主选择文档检索或数据库查询工具来获取信息并返回结果。

关键组件解析

  1. Router Agent(路由智能体)

这是整个系统的大脑。它基于LlamaIndex的Workflow框架构建,能够:

  • 理解用户问题的语义
  • 判断问题类型(文档检索还是数据查询)
  • 选择合适的工具
  • 甚至可以同时调用多个工具
  1. RAG工具(向量检索引擎)

负责处理非结构化数据:

  • 使用LlamaCloud作为向量数据库
  • 支持PDF、Word等文档格式
  • 通过语义相似度检索相关内容
  1. SQL工具(自然语言转SQL引擎)

负责处理结构化数据:

  • 将自然语言转换为SQL查询
  • 执行数据库查询
  • 返回结构化结果
  1. Cleanlab Codex(质量保障层)

这是个亮点!很多人做Agent,但没人关心输出是否靠谱。Cleanlab Codex提供:

  • 自动检测不准确或无用的回答
  • 为每个回答提供可信度评分
  • 实时验证查询和响应
  • 允许专家直接改进回答,无需改代码

三、动手实现:完整代码解析

让我带你一步步搭建这个系统。我会用最实际的代码,而不是空谈理论。

第一步:环境准备

# 创建项目目录
mkdir rag-sql-router
cd rag-sql-router
# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Windows用: venv\Scripts\activate
# 安装依赖
pip install llama-index llama-index-llms-openai \
llama-index-embeddings-openai \
llama-index-indices-managed-llama-cloud \
sqlalchemy streamlit nest-asyncio

第二步:配置API密钥

创建 .env 文件:

OPENAI_API_KEY=your_openai_key
LLAMA_CLOUD_API_KEY=your_llamacloud_key
LLAMA_CLOUD_ORG_ID=your_org_id
LLAMA_CLOUD_PROJECT_NAME=your_project
LLAMA_CLOUD_INDEX_NAME=your_index

第三步:搭建SQL查询引擎

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, insert
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.core import SQLDatabase
# 创建示例数据库
engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
# 定义城市统计表
city_stats = Table(
'city_stats', metadata,
Column('city', String, primary_key=True),
Column('population', Integer),
Column('country', String),
)
metadata.create_all(engine)
# 插入示例数据
rows = [
{"city": "Toronto", "population": 2930000, "country": "Canada"},
{"city": "Tokyo", "population": 13960000, "country": "Japan"},
{"city": "Berlin", "population": 3645000, "country": "Germany"},
]
with engine.connect() as conn:
for row in rows:
conn.execute(insert(city_stats).values(**row))
conn.commit()
# 创建SQL数据库对象
sql_database = SQLDatabase(engine, include_tables=["city_stats"])
# 创建自然语言SQL查询引擎
sql_query_engine = NLSQLTableQueryEngine(
sql_database=sql_database,
tables=["city_stats"],
)

这段代码做了什么?

  1. 创建了一个内存SQLite数据库(适合演示,生产环境换成MySQL/PostgreSQL)
  2. 定义了城市统计表,包含城市名、人口、国家字段
  3. 插入了一些示例数据
  4. NLSQLTableQueryEngine包装数据库,它能把"人口最多的城市"这样的问题转成SQL

第四步:搭建RAG检索引擎

from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
import os
# 连接到LlamaCloud索引
rag_index = LlamaCloudIndex(
name=os.getenv("LLAMA_CLOUD_INDEX_NAME"),
project_name=os.getenv("LLAMA_CLOUD_PROJECT_NAME"),
organization_id=os.getenv("LLAMA_CLOUD_ORG_ID"),
api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
)
# 创建查询引擎
rag_query_engine = rag_index.as_query_engine()

LlamaCloud是什么?

简单说,它是一个托管的向量数据库服务:

  • 你上传文档(PDF、DOCX等)
  • 它自动切分、向量化、建索引
  • 你只需要调用API查询

当然,你也可以用Qdrant、Pinecone、Weaviate等替代。

第五步:将查询引擎包装成工具

from llama_index.core.tools import QueryEngineTool
# SQL工具
sql_tool = QueryEngineTool.from_defaults(
query_engine=sql_query_engine,
name="sql_query_engine",
description=(
"用于查询城市统计数据,包括人口、国家等信息。"
"适合回答关于数字、排名、统计类的问题。"
),
)
# RAG工具
rag_tool = QueryEngineTool.from_defaults(
query_engine=rag_query_engine,
name="document_search_engine",
description=(
"用于搜索文档内容,适合回答关于政策、流程、"
"说明等需要从文档中查找信息的问题。"
),
)

为什么要包装成工具?

这里的description非常关键!Agent会读这些描述来判断该用哪个工具。所以描述要:

  • 清晰明确:说清楚工具能做什么
  • 区分明显:让Agent能轻松分辨使用场景
  • 举例说明:提示适用的问题类型

第六步:构建Router Workflow

这是整个系统最核心的部分:

from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.llms.openai import OpenAI
from typing import List, Any
# 定义事件类型
class PrepEvent(Event):
"""准备阶段完成事件"""
pass
class ToolCallEvent(Event):
"""工具调用事件"""
tool_calls: List[Any]
# 定义Router Workflow
class RouterWorkflow(Workflow):
def __init__(self, tools: List[QueryEngineTool], **kwargs):
super().__init__(**kwargs)
self.tools = {tool.metadata.name: tool for tool in tools}
self.llm = OpenAI(model="gpt-4")
@step
async def prepare_chat(self, ev: StartEvent) -> PrepEvent:
"""准备对话消息"""
# 获取用户查询
user_msg = ev.query
# 构建系统提示
system_prompt = self._build_system_prompt()
# 存储到上下文
self.query = user_msg
self.messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_msg}
]
return PrepEvent()
@step
async def handle_llm_call(self, ev: PrepEvent) -> ToolCallEvent | StopEvent:
"""调用LLM决策"""
# 准备工具定义给LLM
tools_def = [
{
"type": "function",
"function": {
"name": name,
"description": tool.metadata.description,
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "用户查询"
}
},
"required": ["query"]
}
}
}
for name, tool in self.tools.items()
]
# 调用LLM
response = await self.llm.achat(
messages=self.messages,
tools=tools_def
)
# 检查是否有工具调用
if response.message.tool_calls:
return ToolCallEvent(tool_calls=response.message.tool_calls)
else:
# 没有工具调用,直接返回答案
return StopEvent(result=response.message.content)
@step
async def handle_tool_calls(self, ev: ToolCallEvent) -> StopEvent:
"""执行工具调用"""
results = []
for tool_call in ev.tool_calls:
tool_name = tool_call.function.name
tool_args = eval(tool_call.function.arguments)
# 执行工具
tool = self.tools[tool_name]
result = await tool.aquery(tool_args["query"])
results.append({
"tool": tool_name,
"result": str(result)
})
# 组合结果
final_answer = self._combine_results(results)
return StopEvent(result=final_answer)
def _build_system_prompt(self) -> str:
"""构建系统提示"""
return """你是一个智能助手,能够访问以下工具:
1. SQL查询引擎:用于查询结构化数据
2. 文档搜索引擎:用于搜索文档内容
根据用户问题,选择合适的工具。如果需要,可以同时使用多个工具。
"""
def _combine_results(self, results: List[dict]) -> str:
"""组合多个工具的结果"""
if len(results) == 1:
return results[0]["result"]
combined = "根据查询结果:\n\n"
for i, res in enumerate(results, 1):
combined += f"{i}. 从{res['tool']}得到: {res['result']}\n"
return combined

这个Workflow是怎么工作的?

  1. prepare_chat: 接收用户问题,准备系统提示和对话消息
  2. handle_llm_call: 调用LLM,让它决定用哪个工具(或多个工具)
  3. handle_tool_calls: 实际执行工具调用,获取结果
  4. 如果有多个结果,组合起来返回

关键点在于:

  • Event驱动:每个step返回一个Event,触发下一个step
  • 异步执行:所有step都是async,支持并发
  • 灵活路由:LLM可以选择0个、1个或多个工具

第七步:创建Streamlit界面

import streamlit as st
import asyncio
# 页面配置
st.set_page_config(page_title="RAG-SQL Router", page_icon="🤖")
st.title(" 智能路由问答系统")
st.markdown("**问我任何问题,我会自动选择最佳方式回答!**")
# 初始化工具和workflow
if "workflow" not in st.session_state:
# 创建工具
tools = [sql_tool, rag_tool]
# 初始化workflow
st.session_state.workflow = RouterWorkflow(
tools=tools,
timeout=60.0
)
# 聊天历史
if "messages" not in st.session_state:
st.session_state.messages = []
# 显示历史消息
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# 用户输入
if prompt := st.chat_input("在这里输入你的问题..."):
# 显示用户消息
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# 调用workflow获取答案
with st.chat_message("assistant"):
with st.spinner("思考中..."):
# 运行workflow
result = asyncio.run(
st.session_state.workflow.run(query=prompt)
)
# 显示答案
st.markdown(result)
# 保存助手回答
st.session_state.messages.append({"role": "assistant", "content": result})
# 侧边栏:显示可用工具
with st.sidebar:
st.subheader("🔧 可用工具")
st.write("1. SQL查询引擎")
st.caption("查询城市统计数据")
st.write("2. 文档搜索引擎")
st.caption("搜索文档内容")
if st.button("清空对话"):
st.session_state.messages = []
st.rerun()

第八步:运行系统

streamlit run app.py

打开浏览器,访问 http://localhost:8501,试试这些问题:

SQL类问题:

  • “哪个城市人口最多?”
  • “有多少个欧洲城市?”

RAG类问题:

  • “文档里提到的政策要点是什么?”
  • “关于API使用的说明在哪里?”

混合问题:

  • “东京的人口是多少?同时告诉我文档里关于东京的介绍。”

四、加入Cleanlab Codex:让输出更可靠

前面的系统已经能工作了,但有个问题:怎么知道AI的回答靠不靠谱?

这就是Cleanlab Codex的价值。它能:

  1. 自动检测问题回答
  • 答非所问
  • 信息不完整
  • 事实错误
  1. 提供可信度评分
  • 每个回答都有0-1的分数
  • 分数低于阈值触发告警
  1. 支持专家反馈
  • SME(领域专家)可以直接标注
  • 系统自动学习改进

集成代码示例:

from cleanlab_codex import CleanlabCodex
# 初始化Codex
codex = CleanlabCodex(api_key=os.getenv("CLEANLAB_API_KEY"))
# 在workflow中使用
@step
async def validate_response(self, ev: StopEvent) -> StopEvent:
"""验证响应质量"""
# 获取原始回答
response = ev.result
# 验证质量
validation = codex.validate(
query=self.query,
response=response,
context=self.retrieved_context
)
# 添加可信度评分
confidence = validation.trustworthiness_score
if confidence < 0.7:
response += f"\n\n 可信度: {confidence:.2%} (建议人工确认)"
else:
response += f"\n\n 可信度: {confidence:.2%}"
return StopEvent(result=response)

五、实际应用场景

这套系统不是玩具,我们来看看能解决什么实际问题:

场景1:企业知识库

问题: 公司有大量文档(员工手册、产品文档、流程规范)和业务数据(销售、用户、财务)。员工经常不知道去哪找信息。

解决方案:

  • RAG工具索引所有文档
  • SQL工具连接业务数据库
  • 员工直接问问题,系统自动路由

效果:

  • 减少80%的重复咨询
  • 提升员工自助查询效率
  • 数据和文档统一入口

场景2:客户服务系统

问题: 客服需要回答产品使用问题(文档)和订单状态(数据库)。

解决方案:

  • RAG工具索引产品文档、FAQ
  • SQL工具连接订单系统
  • 客服输入问题,系统提供参考答案

效果:

  • 新客服上手快
  • 答案标准化
  • 响应时间缩短50%

场景3:数据分析助手

问题: 业务人员不会写SQL,但需要经常查数据。

解决方案:

  • SQL工具连接数据仓库
  • RAG工具提供分析方法论文档
  • 自然语言查询,自动生成SQL

效果:

  • 降低对数据团队的依赖
  • 提升数据驱动决策效率
  • 减少重复分析工作

六、关键经验和坑

搭建这个系统时,我踩过不少坑,分享几个关键经验:

1. 工具描述要精准

坑: 最开始我的SQL工具描述是"用于查询数据"——太模糊了!结果Agent经常选错。

解决: 改成"用于查询城市统计数据,包括人口、国家等信息。适合回答关于数字、排名、统计类的问题。"——具体、清晰、有例子。

2. 处理好并发调用

坑: 用户问"东京人口多少?文档里怎么说的?"——需要同时调用两个工具,但结果怎么组合?

解决:

  • Workflow支持并发step
  • _combine_results里做好结果聚合
  • 让LLM再做一次总结

3. 数据库连接要健壮

坑: SQLite内存数据库重启就没了,生产环境不能用。

解决:

# 生产环境用持久化数据库
engine = create_engine(
"postgresql://user:pass@host:5432/db",
pool_pre_ping=True,  # 检查连接有效性
pool_size=10,  # 连接池
)

4. Token消耗要控制

坑: 每次查询都把全部上下文传给LLM,token消耗巨大。

解决:

  • 只传必要的上下文
  • 使用更便宜的模型做路由(gpt-3.5-turbo)
  • 缓存常见问题的答案

5. 错误处理要完善

坑: SQL语法错误、网络超时、API限流都会导致系统崩溃。

解决:

try:
result = await tool.aquery(query)
except SQLAlchemyError as e:
result = f"数据库查询失败: {str(e)}"
except TimeoutError:
result = "查询超时,请稍后重试"
except Exception as e:
result = f"系统错误: {str(e)}"
# 记录日志
logger.error(f"Tool error: {e}", exc_info=True)

七、性能优化建议

如果你要把这套系统用到生产环境,这些优化必不可少:

1. 缓存机制

from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def cached_query(query_hash: str):
"""缓存查询结果"""
# 实际查询逻辑
pass
# 使用时
query_hash = hashlib.md5(query.encode()).hexdigest()
result = cached_query(query_hash)

2. 批量查询

对于相似查询,批量处理:

async def batch_query(queries: List[str]):
"""批量执行查询"""
tasks = [workflow.run(query=q) for q in queries]
return await asyncio.gather(*tasks)

3. 监控和日志

import logging
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('rag_sql_router.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 在关键位置记录
logger.info(f"Query received: {query}")
logger.info(f"Tool selected: {tool_name}")
logger.info(f"Response time: {elapsed:.2f}s")

4. 限流保护

from functools import wraps
import time
def rate_limit(max_calls: int, time_window: int):
"""限流装饰器"""
calls = []
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
now = time.time()
# 清理过期记录
calls[:] = [c for c in calls if c > now - time_window]
if len(calls) >= max_calls:
raise Exception("请求过于频繁,请稍后再试")
calls.append(now)
return await func(*args, **kwargs)
return wrapper
return decorator
@rate_limit(max_calls=10, time_window=60)
async def handle_query(query: str):
"""处理查询,每分钟最多10次"""
pass

八、总结

看到这里,你大概已经能感受到:RAG-SQL Router 这东西看起来像“多加了一个路由层”,但真正解决的是落地时最容易被忽视的一件事——把“该去哪儿找答案”这一步交给系统,而不是交给用户。

很多企业内部问答做不起来,并不是因为模型不够强,而是因为入口不够统一:同一句话问出来,有时候是制度条款,有时候是数据口径,有时候还夹杂着流程步骤。人能凭经验判断“先查文档还是先查库”,但系统如果没有这个判断能力,就会变成两种尴尬:

  • 只做RAG:回答得像“引用资料”,听起来挺像那么回事,但数字经不起核对;
  • 只做Text-to-SQL:能查到数,但对“为什么这么算”“规则写在哪”完全没有解释能力。

Router 的价值,就是让这两套能力不再互相拖后腿,而是互相补位:需要确定性的时候走SQL,需要背景解释的时候走RAG,复杂一点的问题就两条路一起跑,最后再把结果合到一张桌子上给你。

学AI大模型的正确顺序,千万不要搞错了

🤔2026年AI风口已来!各行各业的AI渗透肉眼可见,超多公司要么转型做AI相关产品,要么高薪挖AI技术人才,机遇直接摆在眼前!

有往AI方向发展,或者本身有后端编程基础的朋友,直接冲AI大模型应用开发转岗超合适!

就算暂时不打算转岗,了解大模型、RAG、Prompt、Agent这些热门概念,能上手做简单项目,也绝对是求职加分王🔋

在这里插入图片描述

📝给大家整理了超全最新的AI大模型应用开发学习清单和资料,手把手帮你快速入门!👇👇

学习路线:

✅大模型基础认知—大模型核心原理、发展历程、主流模型(GPT、文心一言等)特点解析
✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑
✅开发基础能力—Python进阶、API接口调用、大模型开发框架(LangChain等)实操
✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用
✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代
✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经

以上6大模块,看似清晰好上手,实则每个部分都有扎实的核心内容需要吃透!

我把大模型的学习全流程已经整理📚好了!抓住AI时代风口,轻松解锁职业新可能,希望大家都能把握机遇,实现薪资/职业跃迁~

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐