算法工程师(提供模型训练后的文件.pkl, .h5, .onnx 文件或 TensorFlow SavedModel)和大模型工程师工作(加载模型、启动模型)协作
算法工程师和大模型工程师
应用场景:构建一个“企业级智能客服问答系统”
目标:基于一个通用的开源大模型(如 Llama 3),针对某公司的特定产品文档和知识,微调出一个能准确回答用户问题的客服助手,并将其部署为可应对高并发访问的在线API服务。
- 交付物交接:算法工程师将最终微调好的模型文件(./llama3-finetuned目录)和对应的tokenizer交付给开发工程师。
- 性能协商:开发工程师反馈模型在目标硬件上的推理速度(如 100tokens/秒),算法工程师可能需要权衡模型大小和性能(例如选择8B还是70B的模型,或者进行更激进的量化)。
- 问题排查:线上服务若出现回答质量下降,开发工程师协助算法工程师排查是否是量化或部署环节引入的问题(如INT8量化导致的精度损失);算法工程师也可能需要根据线上收集的bad cases进行新一轮的数据清洗和模型微调(持续迭代)。
- 协同优化:开发工程师发现90%的请求长度<512,但模型配置支持2048。他将这个信息反馈给算法工程师。算法工程师可以重新调整训练时的max_seq_length,从而显著提升训练和推理效率。
一、算法工程师进行模型的准备和微调
①、确定需求
与业务部门沟通,确定问答范围和预期效果,与业务部门沟通,确定问答范围和预期效果。
算法工程师定义数据格式,通常使用指令微调的格式。
清洗数据:去除无关字符、敏感信息、格式化文本。建指令微调数据集(JSONL格式),将知识转化为Q-A对。
数据集示例:
{"instruction": "我们的旗舰产品「CloudMax」的基本套餐每月多少钱?", "input": "", "output": "CloudMax的基础版套餐每月费用为99元人民币,包含100GB的云存储和10TB的月度数据流量。"}
{"instruction": "如何重置我的账户密码?", "input": "", "output": "您可以按照以下步骤重置密码:1. 访问登录页面。 2. 点击「忘记密码」。 3. 输入您的注册邮箱。 4. 检查您的邮箱并点击重置链接。 5. 设置您的新密码。"}
数据清洗:
其中raw_data(列表形式常见)来源:
# 第一种:从文件读取
import json
# 从JSON文件读取
with open('qa_data.json', 'r', encoding='utf-8') as f:
raw_data = json.load(f)
# 或从CSV文件读取
import pandas as pd
raw_data = pd.read_csv('qa_data.csv').to_dict('records')
#---------------------------------------------------------------------------
# 第二种:从数据库查询
import sqlite3
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute("SELECT question,answer FROM faq_table")
raw_data = [{"question": row[0], "answer": row[1]} for row in cursor.fetchall()]
import json
import re
def clean_text(text):
# \s:正则表达式中的特殊字符,匹配任何空白字符
# +:量词,表示"一个或多个"前面的元素(即一个或多个空白字符)
# r'\s+':匹配一个或多个连续的空白字符 ' ':替换为单个空格
#将文本中所有的空白字符序列替换为单个空格
text = re.sub(r'\s+', ' ', text)
# [^...]:字符类中的^表示"否定"或"除了",匹配不在方括号内的任何字符 ,。!?:;()《》:保留这些中文标点符号
# \w:匹配任何单词字符(字母、数字、下划线)
# \s:匹配任何空白字符(保留空白)
# \u4e00-\u9fff:Unicode范围,匹配所有中文字符
# 删除文本中所有不属于指定字符集的字符
text = re.sub(r'[^\w\s\u4e00-\u9fff,。!?:;()《》]', '', text)
return text.strip()
# 该数据来源于文件读取、数据库查询等
raw_data = [
{
"question": " CloudMax的【基础套餐】每月多少钱? ",
"answer": "基础版套餐每月费用为99元人民币!包含100GB存储。"
},
{
"question": "如何重置密码???",
"answer": "步骤:1.访问登录页。2.点击「忘记密码」。3.输入邮箱。"
}
]
processed_data = []
for item in raw_data:
processed_data.append({
"instruction": clean_text(item["question"]),
"input": "",
"output": clean_text(item["answer"])
})
# 保存处理后的数据
with open('dataset.jsonl', 'w', encoding='utf-8') as f:
for item in processed_data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
print("数据预处理完成!")
输出结果(dataset.jsonl文件内容):
{"instruction": "CloudMax的基础套餐每月多少钱", "input": "", "output": "基础版套餐每月费用为99元人民币包含100GB存储"}
{"instruction": "如何重置密码", "input": "", "output": "步骤1访问登录页2点击忘记密码3输入邮箱"}
②、选择基座模型,使用Hugging Face TRL、transformers等库进行有监督微调
from transformers import AutoModelForCausalLM,AutoTokenizer,TrainingArguments
from trl import SFTTrainer
from datasets import load_dataset
# AutoModelForCausalLM自动模型类,用于加载因果语言模型(Causal LM)基于上文预测下一个词的模型(如GPT系列、LLaMA等)
# device_map="auto"自动设备映射,用于模型并行和内存优化 没有该设置的话 模型会默认加载到第一个GPU(cuda:0)或CPU上 如果模型太大,会导致内存不足(OOM)错误。 自动检测硬件:检查可用的GPU数量、内存大小 智能分片:将大模型的不同层分配到不同的设备上 内存优化:尽可能利用所有可用的GPU内存
model_name = "meta-llama/Meta-Llama-3-8B-Instruct"
model = AutoModelForCausalLM.from_pretrained(model_name,device_map="auto")#从预训练模型加载权重和配置
# model = AutoModelForCausalLM.from_pretrained(model_name, device_map="cuda:0") 全部加载到GPU 0
# model = AutoModelForCausalLM.from_pretrained(model_name, device_map="cpu") 全部加载到CPU(用于推理,不训练)
tokenizer = AutoTokenizer.from_pretrained(model_name) #从预训练模型加载对应的tokenizer tokenizer负责:文本 → tokens(编码) 和 tokens → 文本(解码)
tokenizer.pad_token = tokenizer.eos_token #eos_token(如 <|endoftext|>):表示序列结束 pad_token:用于将不同长度的序列填充到相同长度(批处理时需要)
# 加载如上清洗后的json格式数据 data_files='dataset.jsonl':数据文件路径 split='train':加载训练集部分
dataset = load_dataset('json', data_files='dataset.jsonl', split='train')
# 将每个样本转换为模型训练所需的文本格式 让模型学会识别指令和回应的模式
def format_instruction(sample):
return f"### Instruction:\n{sample['instruction']}\n\n### Response:\n{sample['output']}"
# 训练参数配置
training_args = TrainingArguments(
out_dir = "./llama3-finetuned", # 模型保存路径
per_device_train_batch_size=4, # 每个GPU的批次大小 1-8,内存优化策略:每个GPU处理4个样本
gradient_accumulation_steps=4, # 梯度累积步数 2-8 累积4步梯度
learning_rate=2e-5, # 学习率 (SFT通常较小)1e-5 到 5e-5
num_train_epochs=3, # 训练轮数
logging_dir='./logs', # 日志目录
save_strategy="epoch", # 保存策略:每轮结束时保存
fp16=True # 使用半精度浮点数训练 使用16位浮点数,减少显存使用
)
# 自动应用格式化函数到每个样本 处理文本截断和填充 只计算response部分的损失(忽略instruction部分的损失)
trainer = SFTTrainser(
model=model, # 要微调的模型
args=training_args, # 训练参数
train_dataset=dataset, # 训练数据集
max_seq_length=1024, # 最大序列长度
tokenizer=tokenizer, # 分词器
formatting_func=format_instruction, # 数据格式化函数
)
trainer.train()
trainer.save_model()
产出:保存在目录./llama3-finetuned下的微调后模型
import joblib # 用于保存和加载模型
# 保存模型
joblib.dump(model, "model/iris_classifier.pkl")
print("Model saved to model/iris_classifier.pkl")
format_instruction函数将每个样本转化为模型训练所需要的文本格式,让模型学会识别指令和回应的模式
### Instruction:
如何重置密码?
### Response:
您可以访问登录页面点击忘记密码链接来重置密码。
大模型训练需要较大的有效批次大小(如16、32),但GPU内存有限,无法一次性加载大批次,解决方案:使用小批次但累积梯度,模拟大批次效果。有效批次大小 = 4 × 4 = 16
per_device_train_batch_size=4, # 每个GPU处理4个样本
gradient_accumulation_steps=4, # 累积4步梯度
③、评估与验证
- 在预留的测试集上评估模型性能(准确性、相关性)
- 进行人工评估,确保回答质量符合业务要求
- 将最终满意的模型文件(包含pytorch_model.bin和配置文件)交付给开发工程师
training_args = TrainingArguments(
# ... 其他参数 ...
evaluation_strategy="epoch", # 每轮评估
load_best_model_at_end=True, # 训练结束时加载最佳模型
metric_for_best_model="eval_loss", # 根据评估损失选择最佳模型
)
二、开发工程师进行模型部署和服务化
- 目标:将llama3-finetuned模型转换为一个低延迟、高吞吐、高可用的gRPC/HTTP API服务。
- 模型部署:将训练好的模型(如 .pkl, .h5, .onnx 文件或 TensorFlow SavedModel)放入一个可以处理预测请求的环境中。
- 服务化:将部署的模型包装成一个可以通过网络(通常是 HTTP/REST API)被其他应用程序(如 Web 前端、移动App、其他微服务)调用的服务
方式 | 描述 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
Web 框架(如 Flask/FastAPI) | 使用轻量级 Web 框架创建 REST API | 简单灵活,开发速度快,易于调试 | 缺乏生产级功能(如负载均衡、监控),需自行处理扩展性 | 原型开发、小规模项目、内部工具 |
专用服务框架(如 TensorFlow Serving, TorchServe) | 为特定框架(TensorFlow/PyTorch)打造的高性能服务系统 | 高性能,支持模型版本管理、自动批处理等生产级特性 | 与特定 ML 框架绑定,配置稍复杂 | 中大型生产环境,使用 TF/PyTorch 的项目 |
云平台托管服务(如 AWS SageMaker, GCP Vertex AI, Azure ML) | 全托管的机器学习平台,提供一键部署和监控 | 无需管理基础设施,自动扩缩容,集成监控和日志 | 成本较高,可能存在平台锁定(Vendor Lock-in) | 希望快速上线且不想管理基础设施的团队 |
容器化与编排(Docker + Kubernetes) | 将模型服务打包成 Docker 镜像,用 K8s 进行编排管理 | 环境一致,易于扩展和部署,云原生标准方案 | 技术栈复杂,学习和维护成本高 | 技术栈复杂,学习和维护成本高 |
model-deployment-demo/
│
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用核心代码 定义输入数据格式class PredictionInput(BaseModel): 定义输出数据格式class PredictionOutput(BaseModel):
# load_model() 启动时加载模型
# 调用预测逻辑 prediction, confidence = predict(input_data)
│ ├── model.py # 模型加载和预测逻辑 model_path = os.path.join(os.path.dirname(__file__), '..', 'model', 'iris_classifier.pkl')
# _model = joblib.load(model_path)
# probabilities = _model.predict_proba(input_data)[0] 获取预测概率
# predicted_class = _model.predict(input_data)[0] 获取最可能的类别
# confidence = probabilities[predicted_class] 获取该类的置信度(概率)
│ └── requirements.txt # Python 依赖
│
├── model/ # 存放训练好的模型 joblib.dump(model, "model/iris_classifier.pkl")
│ └── iris_classifier.pkl #
│ #
├── Dockerfile
└── train_model.py # 用于训练和保存模型的脚本
①、模型转换和优化
- 将PyTorch模型转换为更适合高性能推理的格式,如ONNX或TensorRT。
- 进行量化(如FP16,INT8)以减小模型体积和加速推理。
- 代码实现(使用NVIDIA的TensorRT工具链):
# 这是一个简化的概念性命令,实际过程更复杂
trtexec --onnx=llama3-finetuned.onnx --saveEngine=llama3.plan --fp16
②、选择推理服务器与API框架
- 选择专业的推理服务器,如NVIDIA Triton Inference Server,它支持动态批处理、模型并发、监控等企业级功能。
- 使用FastAPI编写一个轻量级的代理服务,处理业务逻辑、身份验证和流量分发。
③、编写Triton模型配置:
开发工程师需要为Triton编写模型配置文件config.pbtxt,这是核心步骤。
name: "llama3_finetuned"
platform: "tensorrt_plan"
max_batch_size: 8 # 启用动态批处理,最大批次为8
input [
{
name: "input_ids"
data_type: TYPE_INT32
dims: [ -1 ] # 可变长度序列
}
]
output [
{
name: "logits"
data_type: TYPE_FP16
dims: [ -1, 32000 ] # 词汇表大小
}
]
④、构架API服务
编写FastAPI应用,它接收用户请求,与Triton服务器通信,并对生成结果进行后处理。
from fastapi import FastAPI, HTTPException
import tritonclient.http as httpclient
import numpy as np
app = FastAPI(title="Enterprise Chatbot API")
triton_client = httpclient.InferenceServerClient(url="localhost:8000")
@app.post("/v1/chat")
async def chat_endpoint(user_input: str):
try:
# 1. 预处理:将用户输入Token化
tokenizer = ... # 加载相同的tokenizer
inputs = tokenizer(user_input, return_tensors="pt").input_ids.numpy()
# 2. 设置Triton输入
triton_input = httpclient.InferInput(
"input_ids", inputs.shape, "INT32"
)
triton_input.set_data_from_numpy(inputs)
# 3. 调用Triton进行推理
response = triton_client.infer(
model_name="llama3_finetuned",
inputs=[triton_input]
)
# 4. 后处理:从输出logits中生成文本
logits = response.as_numpy("logits")
generated_tokens = np.argmax(logits, axis=-1)
generated_text = tokenizer.decode(generated_tokens[0])
return {"response": generated_text}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
测试API的三种方法:
- 使用curl
curl -X POST "http://localhost:8000/predict" \
-H "Content-Type: application/json" \
-d '{"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}'
- 使用FastAPI 自动生成的交互式文档:
在浏览器中打开 http://localhost:8000/docs,你会看到 Swagger UI,可以直接在页面上尝试 /predict 端点
- 使用 Python requests:
import requests
import json
url = "http://localhost:8000/predict"
data = {
"sepal_length": 5.1,
"sepal_width": 3.5,
"petal_length": 1.4,
"petal_width": 0.2
}
response = requests.post(url, json=data)
print(response.json())
⑤、容器化与编排:
编写Dockerfile,将Triton Server、模型文件、FastAPI应用分别容器化。
使用Docker Compose或Kubernetes编排所有服务。
FROM python:3.10-slim #使用官方 Python 运行时作为父镜像
WORKDIR /app # 设置工作目录
COPY requirements.txt . # 将依赖文件复制到容器中
RUN pip install -r requirements.txt # 安装依赖
COPY . . # 将应用程序代码和模型文件复制到容器中
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]
⑥、部署、监控与维护
在Kubernetes集群上部署整个应用栈。
配置监控(Prometheus/Grafana)和日志(ELK)系统,监控GPU利用率、请求延迟、错误率等关键指标。
设置弹性伸缩(HPA),根据CPU/GPU负载自动扩缩容Pod实例。
更多推荐
所有评论(0)