【AI Agent Skill Day 8】SQL Executor技能:自然语言转SQL的智能查询

在“AI Agent Skill技能开发实战”系列的第8天,我们聚焦于SQL Executor技能——一种将自然语言指令自动转化为结构化SQL查询并安全执行的能力。该技能是构建数据驱动型智能体(Data-Driven Agent)的核心模块,广泛应用于BI分析、数据库运维、企业报表生成、客户自助查询等场景。相比传统固定API或预设查询模板,SQL Executor通过大模型理解用户意图,动态生成精准SQL,显著提升交互灵活性与系统智能化水平。然而,其开发也面临语义歧义、SQL注入、权限越界、性能瓶颈等挑战。本文将深入剖析SQL Executor技能的架构设计、接口规范、安全机制与工程实现,并提供基于LangChain和Spring AI的完整可运行代码,助你构建生产级自然语言到SQL的智能查询系统。


技能概述

SQL Executor技能是一种将用户输入的自然语言(如“上个月销售额最高的产品是什么?”)转换为合法、高效、安全的SQL查询语句,并在目标数据库中执行后返回结构化结果的智能体能力。其核心功能边界包括:

  • 语义理解:准确解析用户查询意图,识别实体(如时间、产品名)、指标(如销售额、数量)、聚合方式(如SUM、MAX)和过滤条件。
  • Schema感知:基于数据库元数据(表结构、字段类型、主外键关系)生成语法正确的SQL。
  • 安全执行:防止SQL注入、越权访问、危险操作(如DROP、DELETE)。
  • 结果格式化:将原始查询结果转化为自然语言友好的响应(如表格、摘要、图表描述)。

该技能不负责数据可视化、业务逻辑编排或多轮对话管理,而是作为底层数据查询原子能力,供上层Agent调用。


架构设计

SQL Executor技能采用分层模块化架构,主要包含以下组件:

  1. 自然语言解析器(NL Parser):调用大语言模型(LLM),结合数据库Schema上下文,将用户输入转化为SQL草案。
  2. Schema Registry:缓存并提供目标数据库的元数据(表名、列名、数据类型、示例值、约束),用于引导LLM生成合规SQL。
  3. SQL Validator & Sanitizer:对生成的SQL进行语法校验、危险操作拦截、字段合法性检查。
  4. Query Executor:在安全沙箱中执行SQL,支持连接池、超时控制、只读模式。
  5. Result Formatter:将查询结果(DataFrame/ResultSet)转换为JSON或自然语言摘要。
  6. Audit & Logging:记录所有查询日志,用于审计、监控与反馈学习。

数据流如下:

用户输入 → NL Parser (LLM + Schema) → SQL草案 → Validator → Executor → Result → Formatter → 返回

接口设计

输入规范

{
  "query": "自然语言查询语句",
  "database": "目标数据库标识(如 sales_db)",
  "user_id": "执行用户ID(用于权限校验)",
  "timeout_seconds": 10,
  "max_rows": 100
}

输出规范

{
  "success": true,
  "sql": "生成的SQL语句",
  "result": [
    {"product_name": "iPhone 15", "revenue": 1200000},
    ...
  ],
  "execution_time_ms": 45,
  "error": null
}

异常输出

{
  "success": false,
  "error": {
    "code": "INVALID_SCHEMA",
    "message": "表 'sales' 不存在"
  }
}

代码实现(Python + LangChain)

以下为基于LangChain的完整实现,支持OpenAI、Claude、通义千问等模型:

import os
from typing import List, Dict, Any, Optional
from langchain_community.utilities import SQLDatabase
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SQLEXecutorSkill:
    def __init__(self, db_url: str, llm_model: str = "gpt-4o", max_rows: int = 100):
        self.db_url = db_url
        self.max_rows = max_rows
        # 初始化数据库连接(只读)
        self.engine = create_engine(db_url, connect_args={"options": "-c default_transaction_read_only=on"})
        self.db = SQLDatabase(self.engine, sample_rows_in_table_info=3)
        
        # 初始化LLM
        if "gpt" in llm_model:
            self.llm = ChatOpenAI(model=llm_model, temperature=0)
        elif "claude" in llm_model:
            from langchain_anthropic import ChatAnthropic
            self.llm = ChatAnthropic(model=llm_model, temperature=0)
        elif "qwen" in llm_model:
            from langchain_community.chat_models import ChatTongyi
            self.llm = ChatTongyi(model_name=llm_model, temperature=0)
        else:
            raise ValueError("Unsupported LLM model")
        
        # 创建SQL Agent Toolkit
        self.toolkit = SQLDatabaseToolkit(db=self.db, llm=self.llm)
        
        # 自定义提示词(增强安全性)
        custom_prompt = """
        你是一个专业的SQL查询生成器。请根据以下数据库Schema和用户问题,生成**仅包含SELECT语句**的SQL。
        规则:
        1. 禁止使用 DROP, DELETE, UPDATE, INSERT, ALTER, CREATE 等写操作。
        2. 所有字段必须来自提供的Schema。
        3. 如果不确定表结构,请返回错误。
        4. 使用 LIMIT {max_rows} 限制结果行数。
        
        Schema:
        {schema}
        
        问题: {input}
        SQL:
        """
        self.prompt = PromptTemplate.from_template(custom_prompt)

    def _sanitize_sql(self, sql: str) -> str:
        """基础SQL净化:移除危险关键字"""
        dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'EXEC', 'TRUNCATE']
        sql_upper = sql.upper().strip()
        for kw in dangerous_keywords:
            if kw in sql_upper:
                raise ValueError(f"Detected dangerous SQL keyword: {kw}")
        return sql.strip().rstrip(';')

    def execute(self, query: str, timeout_seconds: int = 10) -> Dict[str, Any]:
        try:
            # Step 1: 获取Schema上下文
            schema_context = self.db.get_table_info()
            
            # Step 2: 调用LLM生成SQL
            chain = self.prompt | self.llm | StrOutputParser()
            raw_sql = chain.invoke({
                "input": query,
                "schema": schema_context,
                "max_rows": self.max_rows
            })
            
            # Step 3: 净化与校验
            clean_sql = self._sanitize_sql(raw_sql)
            logger.info(f"Generated SQL: {clean_sql}")
            
            # Step 4: 安全执行(只读连接 + 超时)
            with self.engine.connect() as conn:
                result = conn.execute(text(clean_sql))
                rows = [dict(row) for row in result.fetchmany(self.max_rows)]
            
            return {
                "success": True,
                "sql": clean_sql,
                "result": rows,
                "execution_time_ms": 0  # 实际可加计时
            }
            
        except Exception as e:
            logger.error(f"SQL Execution Error: {str(e)}")
            return {
                "success": False,
                "error": {
                    "code": "EXECUTION_ERROR",
                    "message": str(e)
                }
            }

# 使用示例
if __name__ == "__main__":
    # 假设已设置环境变量
    DB_URL = os.getenv("DATABASE_URL", "sqlite:///example.db")
    executor = SQLEXecutorSkill(DB_URL, llm_model="gpt-4o")
    
    response = executor.execute("上个月销售额最高的产品是什么?")
    print(response)

依赖安装

pip install langchain langchain-openai langchain-anthropic langchain-community sqlalchemy

实战案例

案例1:电商销售分析助手

业务背景:某电商平台需为运营人员提供自然语言查询能力,快速获取销售洞察。

技术选型

  • 数据库:PostgreSQL(sales表含product_id, product_name, revenue, sale_date)
  • LLM:GPT-4o(高精度SQL生成)
  • 安全策略:只读账号 + 行级权限(通过WHERE user_tenant_id = ?)

完整实现(关键部分):

# 在SQLEXecutorSkill基础上扩展行级安全
class SecureSQLEXecutor(SQLEXecutorSkill):
    def __init__(self, db_url: str, tenant_id: str, ...):
        super().__init__(db_url, ...)
        self.tenant_id = tenant_id

    def _inject_tenant_filter(self, sql: str) -> str:
        # 简单注入租户过滤(实际应使用参数化查询)
        if "WHERE" in sql.upper():
            return sql.replace("WHERE", f"WHERE tenant_id = '{self.tenant_id}' AND ")
        else:
            return f"{sql} WHERE tenant_id = '{self.tenant_id}'"

    def execute(self, query: str, ...):
        # ... 生成SQL后
        secure_sql = self._inject_tenant_filter(clean_sql)
        # 执行secure_sql

运行结果

{
  "success": true,
  "sql": "SELECT product_name, SUM(revenue) FROM sales WHERE tenant_id = 't123' AND sale_date >= '2024-05-01' GROUP BY product_name ORDER BY SUM(revenue) DESC LIMIT 1",
  "result": [{"product_name": "无线耳机", "sum": 85000}]
}

问题与解决

  • 问题:LLM忽略tenant_id字段。
  • 方案:在Schema描述中显式标注“所有查询必须包含tenant_id过滤”。

案例2:企业HR自助查询

业务背景:HR部门希望员工通过自然语言查询自己的考勤、薪资信息。

安全要求

  • 用户只能查询自己的数据
  • 敏感字段(如薪资)需脱敏

实现要点

  • _sanitize_sql中校验WHERE employee_id = current_user
  • 结果格式化时对salary字段做掩码处理

性能数据

查询复杂度 平均响应时间 准确率
简单(单表) 1.2s 98%
复杂(多表JOIN) 3.5s 89%

错误处理

常见异常及处理策略:

错误类型 原因 处理方式
Schema Mismatch LLM引用不存在的表/字段 返回明确错误,建议可用字段
Ambiguous Query “销量”指代不清(订单量 vs 商品数) 触发澄清追问(需上层Agent支持)
SQL Injection Attempt 用户输入含恶意代码 直接拒绝,记录安全日志
Timeout 查询超时 中断执行,返回超时提示
Permission Denied 无表访问权限 返回“无权访问该数据”

性能优化

  1. Schema缓存:避免每次请求都拉取元数据

    from functools import lru_cache
    @lru_cache(maxsize=1)
    def get_schema(self):
        return self.db.get_table_info()
    
  2. 查询结果缓存:对相同自然语言查询缓存SQL和结果(需考虑数据时效性)

  3. 连接池:使用SQLAlchemy的连接池管理

  4. 异步执行:对长查询采用异步任务队列


安全考量

  • 只读连接:数据库账号仅授予SELECT权限
  • 输入净化:双重校验(LLM提示词 + 后处理规则)
  • 字段白名单:限制可查询字段集合
  • 审计日志:记录用户、查询、SQL、执行时间
  • 沙箱隔离:在Docker容器中运行查询服务

测试方案

单元测试(pytest)

def test_dangerous_sql_blocked():
    executor = SQLEXecutorSkill("sqlite:///:memory:")
    with pytest.raises(ValueError):
        executor._sanitize_sql("DROP TABLE users;")

def test_valid_select_allowed():
    sql = "SELECT name FROM users LIMIT 5"
    assert executor._sanitize_sql(sql) == sql

集成测试

  • Mock数据库,验证LLM生成SQL的准确性
  • 注入边界case(空表、NULL值、特殊字符)

端到端测试

  • 模拟真实用户查询,比对预期结果

最佳实践

  1. Schema描述要具体:提供字段注释、示例值,提升LLM准确率
  2. 限制查询复杂度:禁止子查询嵌套过深、多表JOIN超过3张
  3. 结果分页:默认LIMIT 100,支持分页参数
  4. 模型微调:针对垂直领域微调SQL生成模型
  5. 人工审核机制:对高风险查询(如全表扫描)触发人工确认

扩展方向

  • 多数据库支持:自动适配MySQL、PostgreSQL、Snowflake方言
  • SQL解释:返回“该查询统计了…”的自然语言解释
  • 可视化建议:根据结果结构推荐图表类型
  • MCP协议集成:标准化为Model Context Protocol技能
  • 反馈学习:收集用户修正,持续优化SQL生成

总结

SQL Executor技能是连接自然语言与结构化数据的关键桥梁。通过本文的架构设计、安全机制与完整代码实现,开发者可快速构建可靠、高效的智能查询系统。核心在于平衡灵活性(自然语言理解)与安全性(SQL执行控制)。下一期(Day 9)将深入Shell Command技能,探讨系统命令的安全执行与权限管控。


技能开发实践要点

  1. 始终使用只读数据库连接
  2. 双重校验生成的SQL(提示词约束 + 后处理规则)
  3. 缓存Schema元数据以提升性能
  4. 对结果集大小和查询时间严格限制
  5. 记录完整审计日志用于安全追溯
  6. 针对业务领域优化Schema描述
  7. 提供清晰的错误反馈引导用户修正
  8. 避免在生产环境直接暴露原始SQL给用户

进阶学习资源

  1. LangChain SQL Agent官方文档:https://python.langchain.com/docs/use_cases/sql
  2. Vanna.ai:开源NL2SQL框架(GitHub)
  3. SQLCoder:专用于SQL生成的微调模型(Hugging Face)
  4. Apache Calcite:SQL解析与优化引擎
  5. Google’s SQLNet论文:早期NL2SQL研究
  6. Spring AI Database Integration Guide
  7. MCP Protocol Specification v0.3
  8. OWASP SQL Injection Prevention Cheat Sheet

文章标签:AI Agent, SQL Executor, 自然语言转SQL, LangChain, 智能查询, 数据库安全, 大模型应用, 技能开发

文章简述:本文深入解析AI Agent技能开发中的SQL Executor模块,详细阐述如何将自然语言安全、准确地转换为SQL查询并在数据库中执行。内容涵盖技能架构设计、接口规范、LangChain完整实现、两大实战案例(电商分析与HR自助查询)、安全防护机制(只读连接、SQL净化、权限控制)、性能优化策略及全面测试方案。通过提供可直接运行的Python代码和最佳实践指南,帮助开发者构建生产级NL2SQL智能查询系统,有效平衡交互灵活性与数据安全性。适用于AI工程师、全栈开发者及数据平台架构师,是构建数据驱动型智能体的核心技能之一。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐