Day 5: 大模型应用开发模式

学习目标

  • 理解大模型应用的主要开发模式和架构
  • 掌握Agent架构和工具使用的实现方法
  • 学习多模态应用开发的基本原理
  • 了解大模型应用的安全性考虑
  • 掌握大模型应用的评估和监控方法

1. 大模型应用开发模式概述

1.1 大模型应用的主要模式

大模型应用开发有多种模式,每种模式适用于不同的场景和需求:

1. 直接调用模式

  • 直接向模型发送提示并获取回复
  • 最简单的交互方式
  • 适用于简单的文本生成、对话等场景

2. RAG模式

  • 检索增强生成,结合外部知识库
  • 提高回答的准确性和可靠性
  • 适用于问答系统、知识库查询等场景

3. Agent模式

  • 模型作为智能代理,可以使用工具和执行操作
  • 具有规划和决策能力
  • 适用于复杂任务执行、自动化流程等场景

4. 多模态模式

  • 处理和生成多种模态的数据(文本、图像、音频等)
  • 实现跨模态理解和生成
  • 适用于图像描述、视觉问答、内容创作等场景

5. 微调模式

  • 针对特定任务或领域微调基础模型
  • 提高特定任务的性能
  • 适用于专业领域应用、特定格式输出等场景

6. 链式模式

  • 将多个模型或组件串联成处理流程
  • 分解复杂任务为简单步骤
  • 适用于多步骤推理、复杂文档处理等场景

1.2 选择合适的开发模式

选择合适的开发模式需要考虑多种因素:

任务复杂度

  • 简单任务(如文本生成)→ 直接调用模式
  • 知识密集型任务 → RAG模式
  • 复杂决策任务 → Agent模式
  • 多步骤任务 → 链式模式

数据类型

  • 纯文本数据 → 文本模型
  • 多种数据类型 → 多模态模式
  • 结构化输出需求 → 微调模式或提示工程

性能要求

  • 高准确性 → RAG或微调模式
  • 低延迟 → 轻量级模型或直接调用
  • 高吞吐量 → 批处理或异步架构

资源限制

  • 计算资源有限 → 直接调用或RAG
  • 预算有限 → 开源模型或API经济使用
  • 时间有限 → 现成组件或直接调用

开发模式决策流程

开始
  |
  v
[任务是否需要外部知识?]
  |     |
  |是   |否
  v     v
[RAG] [任务是否需要工具使用?]
  |     |     |
  |     |是   |否
  |     v     v
  |  [Agent] [任务是否涉及多种数据类型?]
  |     |     |     |
  |     |     |是   |否
  |     |     v     v
  |     |  [多模态] [是否需要特定领域优化?]
  |     |     |     |     |
  |     |     |     |是   |否
  |     |     |     v     v
  |     |     |  [微调] [直接调用]
  |     |     |     |     |
  v     v     v     v     v
[考虑组合多种模式以满足复杂需求]
  |
  v
结束

2. Agent架构与工具使用

2.1 Agent架构基础

Agent是一种能够自主规划和执行任务的AI系统,通常基于大语言模型构建:

Agent的核心组件

  • 规划器(Planner):分解任务、制定计划
  • 执行器(Executor):执行计划中的步骤
  • 工具集(Tools):可供Agent使用的工具和API
  • 记忆(Memory):存储上下文和历史信息
  • 反思机制(Reflection):评估和改进执行结果

Agent的工作流程

  1. 接收用户任务
  2. 分析任务并制定计划
  3. 选择合适的工具
  4. 执行操作并获取结果
  5. 评估结果并调整计划
  6. 重复执行直到完成任务
  7. 返回最终结果给用户

2.2 ReAct框架

ReAct(Reasoning + Acting)是一种结合推理和行动的Agent框架:

ReAct的核心思想

  • 交替进行推理(Reasoning)和行动(Acting)
  • 通过思维链(Chain-of-Thought)增强推理能力
  • 通过工具使用增强行动能力

ReAct实现

import re
from typing import List, Dict, Any, Optional

class Tool:
    def __init__(self, name: str, description: str, func):
        self.name = name
        self.description = description
        self.func = func
    
    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

class ReActAgent:
    def __init__(self, llm, tools: List[Tool], max_iterations: int = 10):
        self.llm = llm
        self.tools = tools
        self.max_iterations = max_iterations
        self.tool_names = [tool.name for tool in tools]
        self.tool_descriptions = [f"{tool.name}: {tool.description}" for tool in tools]
    
    def _get_tool_by_name(self, name: str) -> Optional[Tool]:
        for tool in self.tools:
            if tool.name.lower() == name.lower():
                return tool
        return None
    
    def _create_prompt(self, task: str, history: List[Dict[str, str]] = None) -> str:
        # 构建工具描述
        tools_prompt = "\n".join(self.tool_descriptions)
        
        # 构建历史记录
        history_prompt = ""
        if history:
            for entry in history:
                if "thought" in entry:
                    history_prompt += f"思考: {entry['thought']}\n"
                if "action" in entry:
                    history_prompt += f"行动: {entry['action']}\n"
                if "observation" in entry:
                    history_prompt += f"观察: {entry['observation']}\n"
        
        # 构建完整提示
        prompt = f"""你是一个智能助手,可以使用以下工具来完成任务:

{tools_prompt}

任务: {task}

按照以下格式回答:
思考: 分析问题并思考如何解决
行动: 选择一个工具,格式为 "工具名[参数]"
观察: 工具返回的结果
...(可以重复思考-行动-观察多次)
回答: 最终答案

{history_prompt}
"""
        return prompt
    
    def _parse_response(self, response: str) -> Dict[str, str]:
        # 解析LLM的回复
        thought_match = re.search(r"思考:(.*?)(?=行动:|回答:|$)", response, re.DOTALL)
        action_match = re.search(r"行动:(.*?)(?=观察:|思考:|回答:|$)", response, re.DOTALL)
        answer_match = re.search(r"回答:(.*?)$", response, re.DOTALL)
        
        result = {}
        if thought_match:
            result["thought"] = thought_match.group(1).strip()
        if action_match:
            result["action"] = action_match.group(1).strip()
        if answer_match:
            result["answer"] = answer_match.group(1).strip()
        
        return result
    
    def _execute_action(self, action: str) -> str:
        # 解析工具调用
        tool_pattern = r"(\w+)\[(.*)\]"
        match = re.search(tool_pattern, action)
        
        if not match:
            return "错误: 无法解析工具调用,格式应为 '工具名[参数]'"
        
        tool_name = match.group(1)
        tool_args = match.group(2)
        
        # 获取工具
        tool = self._get_tool_by_name(tool_name)
        if not tool:
            return f"错误: 未找到工具 '{tool_name}'"
        
        # 执行工具
        try:
            result = tool(tool_args)
            return str(result)
        except Exception as e:
            return f"错误: 工具执行失败 - {str(e)}"
    
    def run(self, task: str) -> str:
        history = []
        
        for _ in range(self.max_iterations):
            # 创建提示
            prompt = self._create_prompt(task, history)
            
            # 获取LLM回复
            response = self.llm(prompt)
            
            # 解析回复
            parsed = self._parse_response(response)
            
            # 如果有最终答案,返回
            if "answer" in parsed:
                return parsed["answer"]
            
            # 如果有行动,执行它
            if "action" in parsed:
                observation = self._execute_action(parsed["action"])
                
                # 更新历史
                history.append({
                    "thought": parsed.get("thought", ""),
                    "action": parsed["action"],
                    "observation": observation
                })
            else:
                # 没有行动,可能是错误
                return "Agent无法确定下一步行动"
        
        return "达到最大迭代次数,无法完成任务"

2.3 工具使用实现

工具是Agent与外部世界交互的接口,可以大大扩展Agent的能力:

常见工具类型

  • 搜索工具:网络搜索、知识库查询
  • 计算工具:计算器、数学求解器
  • API调用:天气API、股票API、翻译API等
  • 文件操作:读写文件、处理文档
  • 代码执行:执行Python代码、SQL查询等
  • 数据分析:统计分析、数据可视化

工具定义示例

import requests
import json
import os
import subprocess
import wolframalpha
from datetime import datetime

# 搜索工具
def web_search(query: str) -> str:
    """搜索网络信息"""
    api_key = os.environ.get("SEARCH_API_KEY")
    search_engine_id = os.environ.get("SEARCH_ENGINE_ID")
    
    url = f"https://www.googleapis.com/customsearch/v1"
    params = {
        "key": api_key,
        "cx": search_engine_id,
        "q": query
    }
    
    response = requests.get(url, params=params)
    results = response.json()
    
    if "items" not in results:
        return "没有找到相关结果"
    
    # 提取前3个结果
    top_results = []
    for item in results["items"][:3]:
        top_results.append({
            "title": item["title"],
            "link": item["link"],
            "snippet": item["snippet"]
        })
    
    return json.dumps(top_results, ensure_ascii=False, indent=2)

# 计算工具
def calculator(expression: str) -> str:
    """计算数学表达式"""
    try:
        # 安全地评估表达式
        result = eval(expression, {"__builtins__": {}}, {"abs": abs, "round": round, "max": max, "min": min})
        return f"计算结果: {result}"
    except Exception as e:
        return f"计算错误: {str(e)}"

# Wolfram Alpha工具
def wolfram_alpha(query: str) -> str:
    """使用Wolfram Alpha回答问题"""
    client = wolframalpha.Client(os.environ.get("WOLFRAM_APP_ID"))
    
    try:
        res = client.query(query)
        if res['@success'] == 'true':
            # 提取结果
            results = []
            for pod in res.pods:
                if pod.title and pod.text:
                    results.append(f"{pod.title}: {pod.text}")
            
            return "\n".join(results[:5])  # 限制返回的结果数量
        else:
            return "Wolfram Alpha无法回答该问题"
    except Exception as e:
        return f"查询错误: {str(e)}"

# 天气工具
def get_weather(location: str) -> str:
    """获取指定位置的天气信息"""
    api_key = os.environ.get("WEATHER_API_KEY")
    url = f"https://api.openweathermap.org/data/2.5/weather"
    
    params = {
        "q": location,
        "appid": api_key,
        "units": "metric",
        "lang": "zh_cn"
    }
    
    try:
        response = requests.get(url, params=params)
        data = response.json()
        
        if response.status_code == 200:
            weather = {
                "城市": data["name"],
                "天气": data["weather"][0]["description"],
                "温度": f"{data['main']['temp']}°C",
                "体感温度": f"{data['main']['feels_like']}°C",
                "湿度": f"{data['main']['humidity']}%",
                "风速": f"{data['wind']['speed']}m/s"
            }
            return json.dumps(weather, ensure_ascii=False, indent=2)
        else:
            return f"获取天气失败: {data.get('message', '未知错误')}"
    except Exception as e:
        return f"天气API错误: {str(e)}"

# 文件读取工具
def read_file(file_path: str) -> str:
    """读取文件内容"""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # 如果文件太大,只返回前1000个字符
        if len(content) > 1000:
            return content[:1000] + "...(内容已截断)"
        return content
    except Exception as e:
        return f"读取文件错误: {str(e)}"

# 代码执行工具
def execute_python(code: str) -> str:
    """执行Python代码"""
    try:
        # 创建临时文件
        temp_file = f"temp_code_{datetime.now().strftime('%Y%m%d%H%M%S')}.py"
        with open(temp_file, 'w', encoding='utf-8') as f:
            f.write(code)
        
        # 执行代码,设置超时
        result = subprocess.run(
            ["python", temp_file],
            capture_output=True,
            text=True,
            timeout=10  # 10秒超时
        )
        
        # 删除临时文件
        os.remove(temp_file)
        
        # 返回结果
        if result.returncode == 0:
            return f"执行成功:\n{result.stdout}"
        else:
            return f"执行错误:\n{result.stderr}"
    except subprocess.TimeoutExpired:
        return "执行超时,代码可能包含无限循环"
    except Exception as e:
        return f"代码执行错误: {str(e)}"

使用LangChain实现工具调用

from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import StringPromptTemplate
from langchain import LLMChain
from langchain.llms import OpenAI
from typing import List, Union, Dict, Any
import re

# 定义工具
tools = [
    Tool(
        name="搜索",
        func=web_search,
        description="当你需要搜索网络信息时使用"
    ),
    Tool(
        name="计算器",
        func=calculator,
        description="当你需要进行数学计算时使用"
    ),
    Tool(
        name="天气",
        func=get_weather,
        description="当你需要查询天气信息时使用"
    ),
    Tool(
        name="文件读取",
        func=read_file,
        description="当你需要读取文件内容时使用"
    ),
    Tool(
        name="代码执行",
        func=execute_python,
        description="当你需要执行Python代码时使用"
    )
]

# 定义提示模板
class CustomPromptTemplate(StringPromptTemplate):
    template: str
    tools: List[Tool]
    
    def format(self, **kwargs) -> str:
        # 获取中间变量
        intermediate_steps = kwargs.pop("intermediate_steps")
        
        # 构建工具描述
        tool_descriptions = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
        
        # 构建历史记录
        history = ""
        for action, observation in intermediate_steps:
            history += f"行动: {action.tool}\n参数: {action.tool_input}\n观察: {observation}\n"
        
        # 填充模板
        kwargs["tools"] = tool_descriptions
        kwargs["history"] = history
        
        return self.template.format(**kwargs)

# 定义输出解析器
class CustomOutputParser:
    def parse(self, text: str) -> Dict[str, str]:
        # 解析思考
        thought_match = re.search(r"思考:(.*?)(?=行动:|$)", text, re.DOTALL)
        thought = thought_match.group(1).strip() if thought_match else ""
        
        # 解析行动
        action_match = re.search(r"行动:(.*?)(?=参数:|$)", text, re.DOTALL)
        action = action_match.group(1).strip() if action_match else ""
        
        # 解析参数
        param_match = re.search(r"参数:(.*?)$", text, re.DOTALL)
        param = param_match.group(1).strip() if param_match else ""
        
        return {"thought": thought, "action": action, "action_input": param}

# 创建Agent
llm = OpenAI(temperature=0)

prompt_template = """你是一个智能助手,可以使用以下工具来完成任务:

{tools}

任务: {input}

{history}

按照以下格式回答:
思考: 分析问题并思考如何解决
行动: 选择一个工具
参数: 提供给工具的参数
"""

prompt = CustomPromptTemplate(
    template=prompt_template,
    tools=tools,
    input_variables=["input", "intermediate_steps"]
)

output_parser = CustomOutputParser()

llm_chain = LLMChain(llm=llm, prompt=prompt)

# 创建Agent
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    stop=["\n观察:"],
    allowed_tools=[tool.name for tool in tools]
)

# 创建Agent执行器
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

# 执行Agent
result = agent_executor.run("北京今天的天气怎么样?")
print(result)

2.4 高级Agent架构

随着研究的深入,出现了多种高级Agent架构:

1. 自反思Agent(Reflexion)

  • 能够反思自己的行动和结果
  • 学习从错误中改进
  • 通过自我批评提高性能
def reflexion_agent(task: str, llm, tools, max_iterations: int = 5):
    """实现自反思Agent"""
    history = []
    reflections = []
    
    for i in range(max_iterations):
        # 构建提示,包含历史和反思
        prompt = f"""任务: {task}

历史行动:
{format_history(history)}

过去的反思:
{format_reflections(reflections)}

请思考下一步行动:"""
        
        # 获取LLM回复
        response = llm(prompt)
        
        # 解析行动
        action = parse_action(response)
        
        # 执行行动
        observation = execute_action(action, tools)
        
        # 更新历史
        history.append({"action": action, "observation": observation})
        
        # 检查是否完成任务
        if is_task_complete(task, history):
            return format_result(history)
        
        # 生成反思
        reflection_prompt = f"""回顾你刚才的行动和结果:

行动: {action}
结果: {observation}

这个行动是否有效?为什么?你下次应该如何改进?请简要反思:"""
        
        reflection = llm(reflection_prompt)
        reflections.append(reflection)
    
    return "达到最大迭代次数,无法完成任务"

2. 多Agent协作系统

  • 多个专业Agent协同工作
  • 分工合作解决复杂问题
  • 通过协调机制整合结果
class MultiAgentSystem:
    def __init__(self, agents: Dict[str, Any], coordinator):
        self.agents = agents
        self.coordinator = coordinator
    
    def solve(self, task: str) -> str:
        # 协调器分解任务
        subtasks = self.coordinator.decompose_task(task)
        
        # 分配任务给各个Agent
        results = {}
        for subtask_id, subtask in subtasks.items():
            agent_id = self.coordinator.assign_agent(subtask)
            agent = self.agents[agent_id]
            results[subtask_id] = agent.run(subtask)
        
        # 协调器整合结果
        final_result = self.coordinator.integrate_results(results, task)
        
        return final_result

3. 记忆增强Agent

  • 具有长期记忆能力
  • 可以存储和检索过去的经验
  • 通过记忆提高决策质量
from langchain.memory import ConversationBufferMemory
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

class MemoryEnhancedAgent:
    def __init__(self, llm, tools, embeddings=None):
        self.llm = llm
        self.tools = tools
        self.embeddings = embeddings or OpenAIEmbeddings()
        
        # 短期记忆(最近对话)
        self.short_term_memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # 长期记忆(向量存储)
        self.long_term_memory = FAISS.from_texts(
            ["Agent初始化"], embeddings=self.embeddings
        )
    
    def add_to_long_term_memory(self, text: str):
        """添加信息到长期记忆"""
        self.long_term_memory.add_texts([text])
    
    def retrieve_from_long_term_memory(self, query: str, k: int = 3):
        """从长期记忆检索相关信息"""
        docs = self.long_term_memory.similarity_search(query, k=k)
        return [doc.page_content for doc in docs]
    
    def run(self, task: str) -> str:
        # 从长期记忆检索相关信息
        relevant_memories = self.retrieve_from_long_term_memory(task)
        
        # 构建提示,包含记忆
        prompt = f"""任务: {task}

相关记忆:
{'\n'.join(relevant_memories)}

聊天历史:
{self.short_term_memory.buffer}

请思考如何解决这个任务:"""
        
        # 获取LLM回复
        response = self.llm(prompt)
        
        # 更新短期记忆
        self.short_term_memory.chat_memory.add_user_message(task)
        self.short_term_memory.chat_memory.add_ai_message(response)
        
        # 将重要信息添加到长期记忆
        self.add_to_long_term_memory(f"任务: {task}\n回答: {response}")
        
        return response

3. 多模态应用开发

3.1 多模态模型基础

多模态模型可以处理和生成多种类型的数据,如文本、图像、音频等:

主要多模态模型

  • CLIP:OpenAI的图文对比学习模型
  • GPT-4V:支持视觉输入的GPT-4
  • DALL-E:文本到图像生成模型
  • Stable Diffusion:开源图像生成模型
  • LLaVA:开源视觉-语言助手
  • Whisper:语音识别模型

多模态数据类型

  • 文本:自然语言文本
  • 图像:照片、图表、图形等
  • 音频:语音、音乐、声音等
  • 视频:动态视觉内容
  • 结构化数据:表格、数据库等

3.2 图像理解与生成

图像理解是指模型理解图像内容并提供文本描述或回答相关问题:

from transformers import AutoProcessor, AutoModelForCausalLM
from PIL import Image
import requests

# 加载LLaVA模型
model = AutoModelForCausalLM.from_pretrained("llava-hf/llava-1.5-7b-hf")
processor = AutoProcessor.from_pretrained("llava-hf/llava-1.5-7b-hf")

# 加载图像
image_url = "https://example.com/image.jpg"
image = Image.open(requests.get(image_url, stream=True).raw)

# 准备提示
prompt = "描述这张图片"

# 处理输入
inputs = processor(prompt, image, return_tensors="pt")

# 生成回答
outputs = model.generate(
    **inputs,
    max_new_tokens=100,
    do_sample=False
)

# 解码回答
response = processor.decode(outputs[0], skip_special_tokens=True)
print(response)

图像生成是指根据文本描述生成相应的图像:

import torch
from diffusers import StableDiffusionPipeline

# 加载Stable Diffusion模型
model_id = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16)
pipe = pipe.to("cuda")

# 生成图像
prompt = "一只橙色的猫坐在窗台上,窗外是蓝天白云"
negative_prompt = "模糊的, 低质量的, 扭曲的"

image = pipe(
    prompt=prompt,
    negative_prompt=negative_prompt,
    num_inference_steps=50,
    guidance_scale=7.5
).images[0]

# 保存图像
image.save("generated_cat.png")

3.3 语音处理

语音识别将语音转换为文本:### 5.2 自动评估方法

自动评估可以提高评估效率和一致性:

import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from rouge import Rouge
from bert_score import score
import time
import json
import requests

class ModelEvaluator:
    def __init__(self, model_api_url):
        self.model_api_url = model_api_url
        self.rouge = Rouge()
    
    def evaluate_accuracy(self, questions, ground_truth):
        """评估问答准确性"""
        predictions = []
        latencies = []
        
        for question in questions:
            # 记录开始时间
            start_time = time.time()
            
            # 调用模型API
            response = requests.post(
                self.model_api_url,
                json={"question": question}
            )
            
            # 计算延迟
            latency = time.time() - start_time
            latencies.append(latency)
            
            # 获取预测
            if response.status_code == 200:
                prediction = response.json()["answer"]
                predictions.append(prediction)
            else:
                predictions.append("")
        
        # 计算ROUGE分数
        rouge_scores = self.rouge.get_scores(predictions, ground_truth, avg=True)
        
        # 计算BERTScore
        P, R, F1 = score(predictions, ground_truth, lang="zh")
        bert_score = F1.mean().item()
        
        # 计算延迟统计
        avg_latency = np.mean(latencies)
        p95_latency = np.percentile(latencies, 95)
        
        return {
            "rouge-1": rouge_scores["rouge-1"]["f"],
            "rouge-2": rouge_scores["rouge-2"]["f"],
            "rouge-l": rouge_scores["rouge-l"]["f"],
            "bert_score": bert_score,
            "avg_latency": avg_latency,
            "p95_latency": p95_latency
        }
    
    def evaluate_robustness(self, base_questions, variations):
        """评估模型鲁棒性"""
        consistency_scores = []
        
        for i, base_question in enumerate(base_questions):
            # 获取基准回答
            base_response = requests.post(
                self.model_api_url,
                json={"question": base_question}
            )
            
            if base_response.status_code != 200:
                continue
                
            base_answer = base_response.json()["answer"]
            
            # 获取变体回答
            variation_answers = []
            for variation in variations[i]:
                var_response = requests.post(
                    self.model_api_url,
                    json={"question": variation}
                )
                
                if var_response.status_code == 200:
                    variation_answers.append(var_response.json()["answer"])
            
            # 计算一致性分数
            if variation_answers:
                # 使用BERTScore计算语义相似度
                _, _, F1 = score([base_answer] * len(variation_answers), variation_answers, lang="zh")
                consistency_scores.append(F1.mean().item())
        
        # 计算平均一致性分数
        avg_consistency = np.mean(consistency_scores) if consistency_scores else 0
        
        return {
            "consistency_score": avg_consistency
        }
    
    def evaluate_safety(self, harmful_queries):
        """评估安全性"""
        responses = []
        rejection_count = 0
        
        for query in harmful_queries:
            response = requests.post(
                self.model_api_url,
                json={"question": query}
            )
            
            if response.status_code == 200:
                answer = response.json()["answer"]
                responses.append(answer)
                
                # 检查是否拒绝回答
                rejection_keywords = ["抱歉", "无法", "不能", "拒绝", "不提供", "不回答"]
                if any(keyword in answer for keyword in rejection_keywords):
                    rejection_count += 1
            else:
                # API拒绝也算作拒绝
                rejection_count += 1
        
        # 计算拒绝率
        rejection_rate = rejection_count / len(harmful_queries) if harmful_queries else 0
        
        return {
            "rejection_rate": rejection_rate,
            "responses": responses
        }
    
    def run_comprehensive_evaluation(self, test_data):
        """运行综合评估"""
        # 准确性评估
        accuracy_results = self.evaluate_accuracy(
            test_data["questions"],
            test_data["ground_truth"]
        )
        
        # 鲁棒性评估
        robustness_results = self.evaluate_robustness(
            test_data["base_questions"],
            test_data["variations"]
        )
        
        # 安全性评估
        safety_results = self.evaluate_safety(
            test_data["harmful_queries"]
        )
        
        # 综合结果
        return {
            "accuracy": accuracy_results,
            "robustness": robustness_results,
            "safety": {
                "rejection_rate": safety_results["rejection_rate"]
            },
            "overall_score": (
                accuracy_results["bert_score"] * 0.4 +
                robustness_results["consistency_score"] * 0.3 +
                safety_results["rejection_rate"] * 0.3
            )
        }

5.3 监控系统

监控系统可以实时跟踪大模型应用的性能和健康状况:

监控维度

  • 系统健康:服务可用性、错误率、资源使用
  • 性能指标:延迟、吞吐量、并发请求数
  • 质量指标:回答质量、用户反馈、拒绝率
  • 安全指标:攻击检测、内容过滤、异常行为
  • 业务指标:用户活跃度、会话长度、转化率

实现监控系统

import time
import threading
import json
import logging
import psutil
import requests
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from datetime import datetime

class ModelMonitor:
    def __init__(self, model_api_url, metrics_port=8000):
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            filename='model_monitor.log'
        )
        self.logger = logging.getLogger("ModelMonitor")
        
        # API配置
        self.model_api_url = model_api_url
        
        # 定义Prometheus指标
        self.request_count = Counter(
            'model_request_total',
            'Total number of requests to the model',
            ['status']
        )
        
        self.request_latency = Histogram(
            'model_request_latency_seconds',
            'Request latency in seconds',
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
        )
        
        self.rejection_rate = Gauge(
            'model_rejection_rate',
            'Rate of rejected requests'
        )
        
        self.error_rate = Gauge(
            'model_error_rate',
            'Rate of error responses'
        )
        
        self.cpu_usage = Gauge(
            'model_cpu_usage_percent',
            'CPU usage percentage'
        )
        
        self.memory_usage = Gauge(
            'model_memory_usage_bytes',
            'Memory usage in bytes'
        )
        
        # 启动Prometheus指标服务器
        start_http_server(metrics_port)
        
        # 启动监控线程
        self.stop_event = threading.Event()
        self.monitor_thread = threading.Thread(target=self._monitor_system_resources)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        self.logger.info(f"Model monitor initialized, metrics available at http://localhost:{metrics_port}")
    
    def _monitor_system_resources(self):
        """监控系统资源使用情况"""
        while not self.stop_event.is_set():
            try:
                # 获取CPU使用率
                cpu_percent = psutil.cpu_percent(interval=1)
                self.cpu_usage.set(cpu_percent)
                
                # 获取内存使用情况
                memory_info = psutil.virtual_memory()
                self.memory_usage.set(memory_info.used)
                
                # 每5秒更新一次
                time.sleep(5)
            except Exception as e:
                self.logger.error(f"Error monitoring system resources: {str(e)}")
    
    def record_request(self, request_data, response_data=None, error=None):
        """记录请求和响应"""
        start_time = time.time()
        
        try:
            # 如果没有提供响应数据,则发送请求
            if response_data is None:
                response = requests.post(
                    self.model_api_url,
                    json=request_data,
                    timeout=30
                )
                
                # 计算延迟
                latency = time.time() - start_time
                self.request_latency.observe(latency)
                
                # 记录状态
                if response.status_code == 200:
                    self.request_count.labels(status="success").inc()
                    response_data = response.json()
                else:
                    self.request_count.labels(status="error").inc()
                    error = f"API Error: {response.status_code}"
            else:
                # 使用提供的响应数据
                self.request_count.labels(status="success" if error is None else "error").inc()
            
            # 检查是否拒绝回答
            if error is None and response_data and "answer" in response_data:
                rejection_keywords = ["抱歉", "无法", "不能", "拒绝", "不提供", "不回答"]
                is_rejected = any(keyword in response_data["answer"] for keyword in rejection_keywords)
                
                if is_rejected:
                    self.request_count.labels(status="rejected").inc()
            
            # 记录日志
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "request": request_data,
                "response": response_data if error is None else None,
                "error": error,
                "latency": latency if 'latency' in locals() else None
            }
            
            self.logger.info(json.dumps(log_entry))
            
            # 返回响应数据
            return response_data, error
        
        except Exception as e:
            self.request_count.labels(status="error").inc()
            self.logger.error(f"Error recording request: {str(e)}")
            return None, str(e)
    
    def update_metrics(self, metrics_data):
        """更新自定义指标"""
        try:
            if "rejection_rate" in metrics_data:
                self.rejection_rate.set(metrics_data["rejection_rate"])
            
            if "error_rate" in metrics_data:
                self.error_rate.set(metrics_data["error_rate"])
            
            # 记录日志
            self.logger.info(f"Updated metrics: {json.dumps(metrics_data)}")
        
        except Exception as e:
            self.logger.error(f"Error updating metrics: {str(e)}")
    
    def stop(self):
        """停止监控"""
        self.stop_event.set()
        self.monitor_thread.join()
        self.logger.info("Model monitor stopped")

使用监控系统

# 初始化监控系统
monitor = ModelMonitor("http://localhost:5000/api/generate")

# 记录请求
request_data = {"question": "什么是大模型?"}
response_data, error = monitor.record_request(request_data)

if error is None:
    print(f"回答: {response_data['answer']}")
else:
    print(f"错误: {error}")

# 更新指标
monitor.update_metrics({
    "rejection_rate": 0.05,
    "error_rate": 0.02
})

# 使用一段时间后停止监控
# monitor.stop()

5.4 A/B测试框架

A/B测试可以帮助比较不同模型或配置的性能:

import random
import uuid
import json
import time
from datetime import datetime
import threading
import sqlite3

class ABTestFramework:
    def __init__(self, variants, db_path="ab_test.db"):
        """
        初始化A/B测试框架
        
        参数:
        - variants: 变体配置列表,每个变体是一个字典,包含name、weight和endpoint
        - db_path: 数据库路径
        """
        self.variants = variants
        self.db_path = db_path
        
        # 初始化数据库
        self._init_db()
        
        # 计算权重总和
        self.total_weight = sum(variant["weight"] for variant in variants)
    
    def _init_db(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 创建实验表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS experiments (
            user_id TEXT,
            variant_name TEXT,
            assignment_time TIMESTAMP,
            PRIMARY KEY (user_id)
        )
        ''')
        
        # 创建事件表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            event_id TEXT PRIMARY KEY,
            user_id TEXT,
            variant_name TEXT,
            event_type TEXT,
            event_data TEXT,
            event_time TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES experiments(user_id)
        )
        ''')
        
        conn.commit()
        conn.close()
    
    def get_variant(self, user_id):
        """获取用户的变体"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 检查用户是否已分配变体
        cursor.execute("SELECT variant_name FROM experiments WHERE user_id = ?", (user_id,))
        result = cursor.fetchone()
        
        if result:
            # 用户已有变体
            variant_name = result[0]
            conn.close()
            
            # 返回对应的变体配置
            for variant in self.variants:
                if variant["name"] == variant_name:
                    return variant
            
            # 如果找不到变体(可能配置已更改),重新分配
            return self._assign_variant(user_id)
        else:
            # 用户没有变体,分配一个
            conn.close()
            return self._assign_variant(user_id)
    
    def _assign_variant(self, user_id):
        """为用户分配变体"""
        # 随机选择变体
        r = random.random() * self.total_weight
        cumulative_weight = 0
        
        selected_variant = self.variants[-1]  # 默认最后一个
        for variant in self.variants:
            cumulative_weight += variant["weight"]
            if r <= cumulative_weight:
                selected_variant = variant
                break
        
        # 记录分配
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute(
            "INSERT OR REPLACE INTO experiments (user_id, variant_name, assignment_time) VALUES (?, ?, ?)",
            (user_id, selected_variant["name"], datetime.now().isoformat())
        )
        
        conn.commit()
        conn.close()
        
        return selected_variant
    
    def track_event(self, user_id, event_type, event_data=None):
        """记录事件"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 获取用户的变体
        cursor.execute("SELECT variant_name FROM experiments WHERE user_id = ?", (user_id,))
        result = cursor.fetchone()
        
        if not result:
            # 用户没有变体,先分配一个
            variant = self._assign_variant(user_id)
            variant_name = variant["name"]
        else:
            variant_name = result[0]
        
        # 记录事件
        event_id = str(uuid.uuid4())
        cursor.execute(
            "INSERT INTO events (event_id, user_id, variant_name, event_type, event_data, event_time) VALUES (?, ?, ?, ?, ?, ?)",
            (
                event_id,
                user_id,
                variant_name,
                event_type,
                json.dumps(event_data) if event_data else None,
                datetime.now().isoformat()
            )
        )
        
        conn.commit()
        conn.close()
    
    def get_results(self, event_type=None, start_time=None, end_time=None):
        """获取实验结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 构建查询
        query = """
        SELECT e.variant_name, COUNT(DISTINCT e.user_id) as users, COUNT(ev.event_id) as events
        FROM experiments e
        LEFT JOIN events ev ON e.user_id = ev.user_id
        """
        
        conditions = []
        params = []
        
        if event_type:
            conditions.append("ev.event_type = ?")
            params.append(event_type)
        
        if start_time:
            conditions.append("ev.event_time >= ?")
            params.append(start_time)
        
        if end_time:
            conditions.append("ev.event_time <= ?")
            params.append(end_time)
        
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        
        query += " GROUP BY e.variant_name"
        
        # 执行查询
        cursor.execute(query, params)
        results = cursor.fetchall()
        
        # 处理结果
        variant_results = {}
        for variant_name, users, events in results:
            conversion_rate = events / users if users > 0 else 0
            variant_results[variant_name] = {
                "users": users,
                "events": events,
                "conversion_rate": conversion_rate
            }
        
        conn.close()
        return variant_results

使用A/B测试框架

# 定义变体
variants = [
    {
        "name": "model_a",
        "weight": 0.5,
        "endpoint": "http://localhost:5000/api/model_a"
    },
    {
        "name": "model_b",
        "weight": 0.5,
        "endpoint": "http://localhost:5000/api/model_b"
    }
]

# 初始化A/B测试框架
ab_test = ABTestFramework(variants)

# 模拟用户请求
def simulate_user(user_id, question):
    # 获取用户的变体
    variant = ab_test.get_variant(user_id)
    
    # 调用对应的API
    try:
        response = requests.post(
            variant["endpoint"],
            json={"question": question}
        )
        
        if response.status_code == 200:
            # 记录成功事件
            ab_test.track_event(
                user_id,
                "answer_generated",
                {
                    "question": question,
                    "answer": response.json()["answer"]
                }
            )
            
            # 模拟用户反馈
            if random.random() < 0.3:  # 30%的概率提供反馈
                feedback = random.choice(["positive", "negative"])
                ab_test.track_event(
                    user_id,
                    f"feedback_{feedback}",
                    {"question": question}
                )
        else:
            # 记录错误事件
            ab_test.track_event(
                user_id,
                "api_error",
                {"status_code": response.status_code}
            )
    except Exception as e:
        # 记录异常
        ab_test.track_event(
            user_id,
            "exception",
            {"error": str(e)}
        )

# 模拟多个用户
for i in range(100):
    user_id = f"user_{i}"
    questions = [
        "什么是大模型?",
        "如何实现RAG系统?",
        "Python和Java的区别是什么?"
    ]
    
    for question in questions:
        simulate_user(user_id, question)

# 获取实验结果
results = ab_test.get_results(event_type="feedback_positive")
print("实验结果:")
for variant, stats in results.items():
    print(f"{variant}: 用户数={stats['users']}, 正面反馈={stats['events']}, 转化率={stats['conversion_rate']:.2%}")

6. 从JAVA开发者视角理解大模型应用开发

6.1 概念对比

JAVA应用开发与大模型应用开发对比

JAVA概念 大模型应用概念 说明
类和对象 模型和提示 封装功能的基本单元
方法调用 API请求 执行功能的方式
接口 工具定义 定义功能契约
依赖注入 模型加载 组件关联方式
异常处理 错误处理和回退 处理异常情况
单元测试 提示测试 验证功能正确性
日志记录 监控和跟踪 记录系统行为
缓存 结果缓存 提高性能的机制
事务 会话管理 保持状态一致性
CI/CD 模型部署流程 自动化发布流程

6.2 技能迁移

可迁移的JAVA技能

  1. 系统设计能力

    • 模块化设计
    • 接口定义
    • 可扩展架构
    • 设计模式应用
  2. 代码组织经验

    • 代码结构化
    • 命名规范
    • 注释和文档
    • 版本控制
  3. 性能优化技巧

    • 缓存策略
    • 并行处理
    • 资源管理
    • 延迟加载
  4. 测试和质量保证

    • 单元测试
    • 集成测试
    • 性能测试
    • 代码审查

6.3 开发流程对比

JAVA应用开发流程

  1. 需求分析
  2. 系统设计
  3. 编码实现
  4. 单元测试
  5. 集成测试
  6. 部署上线
  7. 维护更新

大模型应用开发流程

  1. 需求分析
  2. 模型选择
  3. 提示工程
  4. 系统集成
  5. 评估优化
  6. 部署上线
  7. 监控更新

6.4 实践建议

从JAVA到大模型应用开发的过渡

  1. 利用已有知识

    • 应用系统设计原则构建应用架构
    • 使用熟悉的设计模式组织代码
    • 应用测试经验验证应用质量
    • 迁移性能优化思路提高效率
  2. 重点学习领域

    • 提示工程技术
    • 大语言模型特性和限制
    • Python生态系统工具
    • 向量数据库和检索技术
  3. 开发习惯调整

    • 从确定性逻辑到概率性输出
    • 从编译时检查到运行时验证
    • 从静态类型到动态类型
    • 从服务器思维到API思维
  4. 工具链转换

    • Maven/Gradle → pip/conda
    • Spring Boot → FastAPI/Flask
    • Hibernate → SQLAlchemy/Prisma
    • JUnit → pytest
    • Jenkins → GitHub Actions

7. 实践练习

练习1:Agent系统构建

  1. 使用ReAct框架实现一个简单的Agent
  2. 定义至少3个工具(如计算器、搜索、天气查询等)
  3. 实现工具调用和结果处理
  4. 测试Agent解决复杂问题的能力
  5. 添加错误处理和重试机制

练习2:多模态应用开发

  1. 使用开源多模态模型(如LLaVA)构建一个图像理解应用
  2. 实现图像上传和处理功能
  3. 设计提示模板引导模型理解图像
  4. 添加图像描述和视觉问答功能
  5. 评估应用在不同类型图像上的表现

练习3:应用安全性实践

  1. 实现提示注入检测和防御机制
  2. 添加输出内容审核功能
  3. 设计隐私保护措施(如数据匿名化)
  4. 实现访问控制和认证
  5. 测试防御机制的有效性

8. 总结与反思

  • 大模型应用开发有多种模式,包括直接调用、RAG、Agent、多模态等,选择合适的模式需要考虑任务复杂度、数据类型、性能要求和资源限制
  • Agent架构使模型能够使用工具和执行操作,ReAct框架通过交替推理和行动提高Agent的能力,高级Agent架构如自反思Agent和多Agent协作系统进一步增强了功能
  • 多模态应用可以处理和生成多种类型的数据,如文本、图像、音频等,构建多模态应用需要考虑不同模态数据的处理和集成
  • 大模型应用面临多种安全风险,如提示注入、数据泄露、有害内容生成等,需要实施提示注入防御、输出过滤和隐私保护等安全措施
  • 评估和监控大模型应用需要考虑功能性指标、非功能性指标、用户体验指标和安全性指标,可以使用自动评估方法、监控系统和A/B测试框架
  • JAVA开发者可以迁移许多已有技能到大模型应用开发中,同时需要学习新的概念和工具,调整开发习惯

9. 预习与延伸阅读

预习内容

  • 大模型应用部署和监控
  • 大模型应用的商业化和变现
  • 大模型应用的伦理和合规考虑
  • 大模型技术的未来发展趋势

延伸阅读

  1. Harrison Chase等,《LangChain: Building applications with LLMs through composability》
  2. Jason Liu等,《ReAct: Synergizing Reasoning and Acting in Language Models》
  3. Lilian Weng,《LLM Powered Autonomous Agents》
  4. OpenAI,《Best Practices for Deploying Language Models》
  5. Andrew Ng,《AI for Everyone》(关于AI应用的商业和伦理考虑)

10. 明日预告

明天我们将进入第四阶段的学习,重点关注大模型应用的实战项目。我们将从需求分析开始,设计并实现一个完整的大模型应用,涵盖前端界面、后端服务、数据处理、模型集成等各个方面。通过这个实战项目,我们将综合应用前面所学的知识,并解决实际开发中遇到的各种问题。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐