使用bedrock查询rds2
Note right of Lambda Function: 函数内部完成所有工作:<br>1. 理解自然语言<br>2. 生成SQL<br>3. 安全校验<br>4. 查询RDS。Bedrock Agent ->> Lambda Function: 直接调用Action Group<br>传递完整用户问题文本。User ->> Bedrock Agent: 提出自然语言问题<br>“销售部工资最
示例流程:
sequenceDiagram
participant User
participant Bedrock Agent
participant Lambda Function
participant RDS
User ->> Bedrock Agent: 提出自然语言问题<br>“销售部工资最高的人是谁?”
Bedrock Agent ->> Lambda Function: 直接调用Action Group<br>传递完整用户问题文本
Note right of Lambda Function: 函数内部完成所有工作:<br>1. 理解自然语言<br>2. 生成SQL<br>3. 安全校验<br>4. 查询RDS
Lambda Function ->> RDS: 执行生成的SQL查询
RDS -->> Lambda Function: 返回查询结果
Lambda Function -->> Bedrock Agent: 返回最终回答文本
Bedrock Agent -->> User: 展示最终回答
示例代码
import json
import boto3
import pymysql
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 初始化客户端
bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')
def lambda_handler(event, context):
logger.info("Received event: " + json.dumps(event))
# 从event中提取用户的问题文本
# Bedrock代理调用时,整个用户问题会在inputText中
input_text = event.get('inputText', '')
if not input_text:
return {
'messageVersion': '1.0',
'response': {
'actionGroup': event['actionGroup'],
'apiPath': event['apiPath'],
'httpMethod': event['httpMethod'],
'httpStatusCode': 400,
'response': 'Missing input text'
}
}
try:
# 1. 生成SQL(在Lambda内部完成)
prompt = f"""Human: 你是一名SQL专家。请根据以下数据库表结构,将用户的自然语言问题转换为一条安全、准确的MySQL SELECT查询语句。
数据库表结构:
- employees (id, name, department, salary, hire_date)
用户问题: {input_text}
要求:
1. 只输出SQL语句,不要输出任何其他解释或文本。
2. 必须使用合法的MySQL语法。
3. 只能查询(SELECT)数据。
Assistant: 以下是生成的SQL查询语句:\n"""
response = bedrock_runtime.invoke_model(
modelId='anthropic.claude-3-haiku-20240307-v1:0',
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}]
})
)
response_body = json.loads(response['body'].read().decode('utf-8'))
generated_sql = response_body['content'][0]['text'].strip()
# 清理SQL输出
if '```sql' in generated_sql:
generated_sql = generated_sql.split('```sql')[1].split('```')[0].strip()
elif '```' in generated_sql:
generated_sql = generated_sql.split('```')[1].split('```')[0].strip()
logger.info(f"Generated SQL: {generated_sql}")
# 2. 安全校验
if not generated_sql.upper().startswith('SELECT'):
raise Exception("Generated SQL is not a SELECT statement.")
# 3. 执行SQL查询
connection = pymysql.connect(
host='your-rds-endpoint.rds.amazonaws.com',
user='your_username',
password='your_password', # 请使用Secrets Manager
database='your_database',
connect_timeout=5
)
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(generated_sql)
results = cursor.fetchall()
connection.close()
# 4. 将结果格式化为自然语言回复
result_text = f"根据您的查询\"{input_text}\",找到以下结果:\n\n"
for row in results:
result_text += f"{row}\n"
# 返回给Bedrock代理的标准格式
return {
'messageVersion': '1.0',
'response': {
'actionGroup': event['actionGroup'],
'apiPath': event['apiPath'],
'httpMethod': event['httpMethod'],
'httpStatusCode': 200,
'response': result_text
}
}
except Exception as e:
logger.error(f"Error: {e}")
return {
'messageVersion': '1.0',
'response': {
'actionGroup': event['actionGroup'],
'apiPath': event['apiPath'],
'httpMethod': event['httpMethod'],
'httpStatusCode': 500,
'response': f'抱歉,处理您的请求时出现错误: {str(e)}'
}
}
更多推荐
所有评论(0)