企业级大模型创建与部署全流程(数据集—>创建模型—>训练模型—>评估模型—>部署模型)
前向传播是数据流动的过程,全连接层和激活函数是构建模型的基础,输出层和任务头决定任务类型,学习率和损失函数驱动训练,反向传播和优化器调整权重,而过拟合是需要避免的陷阱。②、模型架构(任务头+反向传播+):在预训练模型的基础上,添加任务特定的输出层。①、使用测试集评估模型性能。大模型预训练后,针对不同的下游任务添加的“专用输出层”(后面接不同的任务头来做具体任务,预训练模型通常不包括任务特定的输出层
一、数据准备
①、数据收集:收集公司内部客服对话记录、产品手册、常见问题解答(FAQ)等
//智能客服系统场景
{"id": 1, "question": "如何重置密码?", "answer": "您可以在登录页面点击忘记密码,然后按照提示操作。", "category": "账户管理"}
{"id": 2, "question": "我的订单还没有发货?", "answer": "请提供订单号,我为您查询。", "category": "订单查询"}
//金融风控系统
transaction_id,user_id,amount,location,time,is_fraud
1001,12345,1500.00,"New York","2023-01-01 10:00:00",0
1002,12346,5000.00,"Tokyo","2023-01-01 11:00:00",1
//医疗诊断辅助系统
{
"patient_id": "P001",
"symptoms": ["发热", "咳嗽", "乏力"],
"examination_results": {"temperature": 38.5, "blood_pressure": "120/80"},
"diagnosis": "流感",
"treatment_plan": "休息,多喝水,服用抗病毒药物"
}
//法律文档分析
{
"clause": "本合同自双方签字盖章之日起生效,有效期三年。",
"explanation": "该条款规定了合同的生效时间和有效期。",
"risk_level": "低"
}
//多语言翻译系统
This is a product description. 这是产品描述。
The device has a battery life of 24 hours. 该设备电池续航为24小时。
②、数据清洗:去除敏感信息、无关信息、进行脱敏处理
③、数据标注:监督学习,需要将问题与对应的答案进行配对;也可以使用无监督、半监督方法
④、数据格式:训练集、验证集、测试集,问题和答案对应,有时需要构造负样本(错误答案)用于训练
⑤、数据预处理:根据所选模型的输入进行预处理,如分词、截断、添加特殊标记
import pandas as pd
import re
from transformers import AutoTokenizer
class DataPreprocessor:
def __init__(self,config):
self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
self.max_length = config.max_length
def clean_text(self,text):
"""文本清洗"""
#移除特殊字符
text = re.sub(r'[^\w\s.,?!]', '', text)
#标准化空白字符
text = re.sub(r'\s+', ' ', text)
return text.strip()
def tokenize_dataset(self,dataset):
"""分词处理"""
return self.tokenizer(
dataset['text'],
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
def create_training_splits(self,data,ratio=(0.7,0.15,0.15)):
"""划分数据集"""
train_size = int(len(data) * ratios[0])
val_size = int(len(data) * ratios[1])
train_data = data[:train_size]
val_data = data[train_size:train_size+val_size]
test_data = data[train_size+val_size]
return train_data,val_data,test_data
二、创建模型
①、选择需训练模型:比如问答任务选择BERT、RoBERTa等;需要生成答案选择GPT、T5等
业务—选择—>预训练模型
- 任务类型
文本分类、情感分析、命名实体识别等理解任务:BERT、RoBERTa、DistilBERT等
文本生成、对话系统、创意协作等生成任务:GPT-2、GPT-3、BART、T5
序列到序列任务(翻译、摘要):T5、BART、mT5(多语言T5)等 - 语言:单语言任务选择对应语言的模型,多语言任务选择多语言模型
- 领域:如果有特定领域(如医疗、法律),优先选择领域内预训练的模型
- 资源限制:如果计算资源有限,选择轻量级模型如DistilBERT、TinyBERT等
- 性能要求:如果追求最高性能,可以选择大型模型如RoBERTa-large、GPT-3等,但需权衡计算成本
- BERT(Bidirectional Encoder Representations from Transformers):用于理解上下文,适合NLU任务,如分类、命名实体识别等
- RoBERTa(Robustly Optimized BERT Pretraining Approach):对BERT的改进,移除了下一句预测任务,使用更大的批次和更多的数据
- DistilBERT:轻量级BERT,模型更小,速度更快,适合资源有限的环境
- GPT(Generative Pre-trained Transformer):自回归模型,适合生成任务,如文本生成、对话生成等
- BART(Bidirectional and Auto-Regressive Transformers):结合了BERT和GPT的特点,适用于生成和重构任务
- T5(Text-to-Text Transfer Transformer):将所有的NLP任务都转换为文本到文本的格式,统一了处理方式
- ELECTRA(Efficiently Learning an Encoder that Classifies Token Replacements Accurately):使用替换令牌检测任务,效率更高
- 多语言模型:如mBERT(多语言BERT)、XLM-RoBERTa,适用于多语言任务
- 领域特定预训练模型:如BioBERT(生物医学领域)、SciBERT(科学文献领域)、ClinicalBERT(临床领域)等
| 领域 | 模型名称 | 特点 | 适用任务 |
|---|---|---|---|
| 通用 | BERT | 双向编码,理解能力强 | 分类、NER、问答 |
| 通用 | RoBERTa | 优化版BERT,移除NSP任务 | 所有NLU任务 |
| 生成 | GPT系列 | 自回归生成,创意能力强 | 文本生成、对话 |
| 多任务 | T5 | 同一文本到文本框架 | 翻译、摘要、分类 |
| 多语言 | XLM-R | 100中语言支持 | 跨语言任务 |
| 生物医学 | BioBERT | 在PubMed上训练 | 医学NER、QA |
| 科学 | SciBERT | 科学文献训练 | 科学文本理解 |
| 法律 | LegalBERT | 法律文本训练 | 合同分析 |
| 金融 | FinBERT | 金融新闻训练 | 情感分析、预测 |
| 代码 | CodeBERT | 代码数据训练 | 代码生成、理解 |
class ModelFactory:
@staticmethod
def create_model(model_type,config):
"""根据需求创建模型"""
models = {
'classification':{
'bert': 'bert-base-uncased',
'roberta': 'roberta-base',
'distilbert': 'distilbert-base-uncased'
},
'generation':{
'gpt2': 'gpt2-medium',
't5': 't5-base',
'bart': 'facebook/bart-base'
},
'multimodal':{
'clip': 'openai/clip-vit-base-patch32'
}
}
# 考虑因素:计算资源、延迟要求、准确率需求
if config.gpu_memory < 8 # GPU内存小于8G
return models[model_type]['distilbert']
elif config.latency_requirement < 100:# 延迟要求<100ms
return models[model_type]['distilbert']
else:
return models[model_type]['bert']
②、模型架构(任务头+反向传播+):在预训练模型的基础上,添加任务特定的输出层。对于分类任务,可以添加一个全连接层;对于生成任务,则使用预训练模型的生成头
前向传播:用户评论“物流太慢了!”→ RoBERTa编码→全连接层→Softmax→输出[0.1, 0.2, 0.7](负面概率70%)
输入 → 前向传播(全连接层→激活函数→...)→ 输出层 → 任务头 → 预测结果
↑ ↓
反向传播 ← 损失函数(对比真实标签)
↑ ↓
优化器(学习率控制步长)← 梯度(来自反向传播)
全连接层(Fully Connected Layer):深度学习中,每个节点都与上一层的所有节点连接的层。
在NLP任务中,通常在预训练模型的输出之上添加一个或多个全连接层,将模型输出的高维向量映射到任务所需的维度
激活函数(Activation Function):常见的有ReLU、Sigmoid、Tanh。业务场景中,选激活函数影响模型性能,比如Sigmoid在二分类输出层常用,但可能有梯度消失,所以深层网络用ReLU
过拟合(Overfitting):模型在训练集表现好,测试集差,因为记住了噪声。nn.Dropout(p=0.5),或者L2正则化(weight_decay)
import torch
import torch.nn.as nn
class FullyConnectedLayerExplained:
"""全连接层详解"""
def __init__(self):
#最简单的全连接层示例(输入768维,输出256维)
self.fc_example = nn.Linear(in_features=768,out_features=256)
#多层全连接网络示例
self.mlp = nn.Sequential(
nn.Linear(768,512),# 第一层:768 -> 512
nn.ReLU(),# 激活函数,给神经网络引入非线性,解决梯度消失
nn.Dropout(0.1), # 防止过拟合,解决方式:正则化、Dropout、早停
nn.Linear(512,256),# 第二层:512 -> 256
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(256,10) # 输出层(模型最后一层):256 -> 10 (假设10分类)
)
任务头:
大模型预训练后,针对不同的下游任务添加的“专用输出层”(后面接不同的任务头来做具体任务,预训练模型通常不包括任务特定的输出层)
BERT预训练模型在预训练时使用MLM(掩码语言模型)和NSP(下一句预测)头,但在下游任务中我们需要根据任务替换这些头。常见的头有:
- 分类头:用于文本分类,通常是1层全连接+Softmax(如情感分析),将[CLS]标记的输出向量映射到类别数量的维度,然后接softmax。BERT后面加一个线性层作为分类头。代码里可能是model.classifier = nn.Linear(hidden_size, num_classes)
- 序列标注头:CRF层或全连接层(如NER)。用于命名实体识别等任务,通常是一个全连接层,将每个token的输出向量映射到标签数量维度
- 生成头:Transformer解码器(如文本生成),用于生成任务,如GPT-2、BART、T5等,通常是一个线性层,将隐藏状态映射到词汇表大小的维度,然后通过softmax生成下一个token的概率
- 问答头:用于问答任务,通常包括两个线性层,分别预测答案的可是和结束位置
# 同一预训练模型通过更换任务头,既可以做“用户意图分类”(3类),也可以做“商品属性抽取”(NER任务头)
from transformers import BertForSequenceClassification
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=3)# 3分类任务头
class PretrainedHeadsExplained:
"""预训练模型头部结构详解"""
def explain_pretrained_heads(self):
"""解释不同预训练模型的头部结构"""
print("预训练模型生成头(Pretrained Heads)详解:")
print("\n1. BERT的预训练头:")
print(" a) MLM头(Masked Language Modeling):")
print(" - 目的:预测被掩码的token")
print(" - 结构:Linear(768, 30522) + Softmax")
print(" - 输出:词汇表概率分布")
print("\n b) NSP头(Next Sentence Prediction):")
print(" - 目的:判断两个句子是否连续")
print(" - 结构:Linear(768, 2) + Softmax")
print(" - 输出:[是连续,不连续]")
print("\n2. GPT的预训练头:")
print(" - 目的:预测下一个token")
print(" - 结构:Linear(768, 50257) + Softmax")
print(" - 特点:自回归生成,每次生成一个token")
print("\n3. T5的预训练头:")
print(" - 目的:文本到文本转换")
print(" - 结构:编码器-解码器 + LM头")
print(" - 输出:任意文本序列")
print("\n4. 为什么需要自定义头?")
print(" - 预训练头针对预训练任务设计")
print(" - 下游任务通常与预训练任务不同")
print(" - 需要适配任务特定的输出格式")
根据业务添加输出层的步骤:
- 确定任务类型(分类、生成、序列标注)和输出形式(类别数、序列长度)
- 分局任务类型选择合适的头结构(如分类任务使用分类头)
- 在预训练模型的基础上构建模型,将预训练模型的输出作为自定义头的输入
- 训练时,可以先冻结预训练模型,只训练头部份,然后在微调整个模型,或者直接一起训练
# 生成头与分类头的区别
def compare_heads():
"""比较不同任务的头结构"""
print("任务特定头结构对比:")
print("\n1. 分类头(Classification Head):")
print(" 输入: [batch_size, hidden_size]")
print(" 结构: Linear(hidden_size, num_classes)")
print(" 输出: [batch_size, num_classes]")
print(" 示例: 情感分析(正面/负面/中性)")
print("\n2. 序列标注头(Sequence Labeling Head):")
print(" 输入: [batch_size, seq_len, hidden_size]")
print(" 结构: Linear(hidden_size, num_tags) 每个token独立")
print(" 输出: [batch_size, seq_len, num_tags]")
print(" 示例: 命名实体识别(PER, ORG, LOC, O)")
print("\n3. 生成头(Generation Head):")
print(" 输入: [batch_size, seq_len, hidden_size]")
print(" 结构: Linear(hidden_size, vocab_size)")
print(" 输出: [batch_size, seq_len, vocab_size]")
print(" 示例: 机器翻译、文本摘要")
print("\n4. 问答头(Question Answering Head):")
print(" 输入: [batch_size, seq_len, hidden_size]")
print(" 结构: 两个Linear层(分别预测开始和结束位置)")
print(" 输出: [batch_size, seq_len] × 2")
print(" 示例: SQuAD问答任务")
# 自定义模型架构
import torch.nn as nn
from transformers import AutoModel,AutoConfig
class CustomEnterpriseModel(nn.Module):
def __init__(self,config):
super().__init__()
#加载预训练模型
self.base_model = AutoModel.from_pretrained(config.base_model)
#添加企业特定层
self.domain_adapter = nn.Linear(768,512)
self.task_head = nn.MuduleDict({
'classification'nn.Linear(512,config.num_classes):,
'regression':nn.Linear(512,1),
'generation':nn.Linear(512,config.vocab_size)
})
#企业知识注入层
self.knowledge_fusion = KnowledgeFusionLayer(
external_knowledge = config.knowledge_base
)
def forward(self,input_ids,attention_mask,task_type='classification'):
#基础模型特征提取
outputs = self.base_model(input_ids,attention_mask=attention_mask)
sequence_output = outputs.last_hidden_state
#领域适配
adapted_features = self.domain_adapter(dequence_output[:,0,:])
#知识融合
enhanced_features = self.knowledge_fusion(adapted_features)
#任务特定输出
return self.task_head[task_type](enhanced_features)
#为不同任务添加输出层
from transformers import AutoModel,AutoConfig
import torch.nn as nn
class TaskSpecificModelFactory:
"""任务特定模型工厂"""
@staticmethod
def create_model_for_task(model_name:str,task_type:str,**kwargs):
"""
根据任务创建定制化模型
参数:
model_name: 预训练模型名称
task_type: 任务类型
**kwargs: 任务特定参数
"""
#加载预训练模型配置
config = AutoConfig.from_pretrained(model_name)
if task_type == "text_classification":
return TaskSpecificModelFactory._create_classification_model(
model_name,config,**kwargs
)
elif task_type == "named_entity_recognition":
return TaskSpecificModelFactory._create_ner_model(
model_name,config,**kwargs
)
elif task_type == "question_answering":
return TaskSpecificModelFactory._create_qa_model(
model_name,config,**kwargs
)
elif task_type == "text_generation":
return TaskSpecificModelFactory._create_generation_model(
model_name,config,**kwargs
)
else:
raise ValueError(f"不主持的任务类型:{task_type}")
@staticmethod
def _create_classification_model(model_name, config, num_labels=2, dropout_prob=0.1):
"""创建分类模型"""
class ClassificationModel(nn.Module):
super().__init__()
#加载预训练模型(不包括头部)
self.backbone = AutoModel.from_pretrained(model_name, add_pooling_layer=False)
#获取隐藏层大小
hidden_size = self.backbone.config.hidden_size
#添加分类头
self.dropout = nn.Dropput(dropout_prob)
self.classifier = nn.Linear(hidden_size,num_labels)
#初始化分类头权重
self._init_weights(self.classifier)
def _init_weights(self,module):
"""初始化权重"""
if isinstance(module,nn.Linear):
module.weight.data.normal_(mean=0.0,std=0.02)
if module.bias is not None:
module.bias.data.zero_()
def forward(self,input_ids,attention_mask=None,token_type_ids=None):
#通过预训练模型获取特征
outputs = self.backbone(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids
)
# 取[CLS]标记的隐藏状态(对于分类任务)
# sequence_output形状: [batch_size, seq_len, hidden_size]
sequence_output = outputs.last_hidden_state
# 取第一个token([CLS])的表示
cls_output = sequence_output[:, 0, :]
# 应用dropout
cls_output = self.dropout(cls_output)
# 分类头
logits = self.classifier(cls_output)
return logits
return cl
③、环境配置:安装必要的深度学习框架(如PyTorch、TensorFlow)和模型库(如Hugging Face Transformers)
三、训练模型(微调)
①、设置训练参数:包括学习率(Learning Rate, LR)、批次大小、训练轮数(epochs)等
学习率:优化器更新权重时的步长(每次调整的幅度)
太大,可能跳过最优解(震荡不收敛)太小,训练缓慢,容易陷入局部最优
预训练大模型微调时,学习率通常较小(如1e-5),避免破坏预训练知识。
从头训练小模型时,学习率可能设为1e-3
optimizer = torch.optim.Adam(model.parameters(),lr=1e-4) 学习率0.0001
②、设置优化器和损失函数(Loss Function):例如,使用Adam优化器和交叉熵损失函数等
损失函数:衡量模型预测值与真实值的差距(“误差大小”),指导优化方向
- 分类:交叉熵损失(CrossEntropyLoss,等价于LogSoftmax+NLLLoss)
- 回归:均方误差(MSELoss)、平均绝对误差(MAELoss)
- 序列任务:CTC Loss(语音识别)、交叉熵(机器翻译)
#优化策略
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup
class OptimizedTraining:
def __init__(self,model,config):
self.model = model
self.config = config
def setup_optimization(self):
"""配置优化器和学习率调度"""
#参数分组,不同层使用不同学习率
no_decay = ['bias','LayerNorm.weight']
optimizer_grouped_parameters = [
{
'params': [p for n, p in self.model.named_parameters()
if not any(nd in n for nd in no_decay)],
'weight_decay': self.config.weight_decay,
'lr': self.config.learning_rate
},
{
'params': [p for n, p in self.model.named_parameters()
if any(nd in n for nd in no_decay)],
'weight_decay': 0.0,
'lr': self.config.learning_rate * 0.1
}
]
optimizer = AdamW(
optimizer_grouped_parameters,
lr=self.config.learning_rate,
betas=(0.9,0.999)
)
#学习率调度
scheduler = get_linear_schedule_with_warmup(
optimizer,
num_warmup_steps=self.config.warmup_steps,
num_training_steps=self.config.total_steps
)
return optimizer,scheduler
def gradient_checkpointing(self):
"""梯度检查点节省内存"""
if hasattr(self.model,'gradient_checkpointing_enable'):
self.moel.gradient_checkpointing_enable()
③、训练循环:在每个epoch中,将训练数据输入模型、计算损失、反向传播更新参数。同时,使用验证集监控模型性能,防止过拟合(Overfitting)
过拟合:模型过度学习训练数据的噪声(死记硬背),导致在测试集上表现差(泛化能力差)
解决方法:
- 数据增强(Data Augmentation):对文本同义词替换、图像旋转
- 正则化(Regularization):L2正则(权重衰减)、Dropout(随机失活神经元) self.dropout = nn.Dropout(0.5) # 训练时50%神经元失活
- 早停(Early Stopping):验证集指标不再提升时停止训练
- 简化模型:减少层数/神经元数(降低复杂度)
#分布式训练配置
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader,DistributedSampler
class DistributedTrainer:
def __init__(self,config):
self.config = config
self.setup_distributed()
def setup_distributed(self):
"""设置分布式训练环境"""
torch.distributed.init_process_group(backend='nccl')
self.local_rank = torch.distributed.get_rank()
self.world_size = torch.distributed.get_world_size()
def train(self,model,dataset,optimizer):
"""分布式训练循环"""
#数据并行采样
sampler = DistributedSampler(
dataset,
num_replicas = self.world_size,
rank = self.local_rank
)
dataloader = DataLoader(
dataset,
batch_size = self.config.batch_size,
sampler = sampler,
num_workers=4
)
#模型并行
model = DDP(model.to(self.local_rank),device_ids=[self.local_rank])
#混合精度训练
scaler = torch.cuda.amp.GradScaler()
for epoch in range(self.config.epochs):
sampler.set_epoch(epoch)
for batch in dataloader:
with torch.cuda.amp.autocast():
loss = model(**batch).loss
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()
import wandb
from torch.utils.tensorboard import SummaryWriter
class TrainingMonitor:
def __init__(self,config):
self.config = config
self.setup_logging()
def setup_logging(self):
"""设置训练监控"""
#WandB集成
wandb.init(
project=config.project_name,
config=config.__dict__
)
#TensorBoard
self.writer = SummaryWriter(log_dir=config.log_dir)
def log_metrics(self,metrics,step):
"""记录训练指标"""
#WandB记录
wandb.log(metircs,step=step)
#TensorBorad记录
for key,value in metrics.items():
self.writer.add_scalar(key,value,step)
#企业监控系统集成
self._send_to_enterprise_monitoring(metrics)
④、模型保存:保存训练好的模型权重
四、评估模型
①、使用测试集评估模型性能。评估指标根据任务而定,例如对于分类任务使用准确率、精确率、召回率、F1分数;对于生成任务可以使用BLEU、ROUGE等
②、进行端到端的测试:模拟真实场景,检查模型输出是否符合预期
# 综合评估框架
from sklearn.metrics import precision_recall_fscore_support,accuracy_score
import numpy as np
class ModelEvaluator:
def __init__(self,model,tokenizer):
self.model = model
self.tokenizer = tokenizer
def evaluate(self,test_dataset):
"""综合评估"""
results = {}
# 基础性能指标
results.update(self._compute_basic_metrics(test_dataset))
# 业务特定指标
results.update(self._compute_business_metrics(test_dataset))
# 偏差检测
results.update(self._detect_bias(test_dataset))
# 鲁棒性测试
results.update(self._robustness_test(test_dataset))
return results
def _compute_basic_metrics(self,dataset):
"""计算基础指标"""
predictions,labels = self._get_predictions(dataset)
metrics = {
'accuracy': accuracy_score(labels, predictions),
'precision': precision_recall_fscore_support(
labels, predictions, average='weighted')[0],
'recall': precision_recall_fscore_support(
labels, predictions, average='weighted')[1],
'f1': precision_recall_fscore_support(
labels, predictions, average='weighted')[2]
}
return metrics
def _detect_bias(self, dataset):
"""检测模型偏差"""
bias_metrics = {}
#按性别、年龄等敏感属性分组评估
for attribute in ['gender','age_group','region']
subgroups = dataset.groupby(attribute)
for name,group in subgroups:
subgroup_metrics = self._compute_basic_metrics(group)
bias_metrics[f'{attribute}_{name}_f1'] = subgroup_metrics['f1']
return {'bias_analysis',bias_metrics}
class ABTesting:
def __init__(self,model_a,model_b):
self.model_a = model_a
self.model_b = model_b
def run_experiment(self,traffic_split=0.5,duration_days=7):
"""运行A/B测试"""
results = {
'model_a':{'conversions':0,'total':0},
'model_b':{'conversions':0,'total':0}
}
#模拟流量分配
for request in self.simulate_traffic(duration_days):
if np.random() > traffic_split:
model = self.model_a
key = 'model_a'
else:
model = self.model_b
key = 'model_b'
prediction = model.predict(request)
results[key]['total'] += 1
if self.is_success(prediction,request):
results[key]['conversions'] += 1
#统计显著性检验
significance = self._calculate_singificance(results)
return {
'results': results,
'significance': significance,
'recommendation': self._make_recommendation(results, significance)
}
前向传播是数据流动的过程,全连接层和激活函数是构建模型的基础,输出层和任务头决定任务类型,学习率和损失函数驱动训练,反向传播和优化器调整权重,而过拟合是需要避免的陷阱。
分类任务:情感分析或垃圾邮件检测,特点时输出离散
生成任务
回归任务
五、部署模型
①、模型导出:
②、选择部署平台:
③、构建API服务
④、考虑性能
# 生产环境部署:deployment.py
from fastapi import FastAPI, HTTPException
import torch
import onnxruntime
from prometheus_client import Counter, Histogram
class ModelDeployment:
def __init__(self, config):
self.app = FastAPI()
self.config = config
self.setup_monitoring()
self.load_model()
self.setup_routes()
def load_model(self):
"""加载优化后的模型"""
if self.config.quantization == 'int8':
# 量化模型加载
self.model = self._load_quantized_model()
elif self.config.optimization == 'onnx':
# ONNX运行时
self.session = onnxruntime.InferenceSession(
"model.onnx",
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
else:
# 标准PyTorch模型
self.model = torch.jit.load("model.pt")
def setup_monitoring(self):
"""设置监控指标"""
self.request_counter = Counter(
'model_requests_total',
'Total number of requests'
)
self.latency_histogram = Histogram(
'model_latency_seconds',
'Request latency in seconds'
)
self.error_counter = Counter(
'model_errors_total',
'Total number of errors'
)
def setup_routes(self):
"""设置API路由"""
@self.app.post("/predict")
async def predict(request: dict):
with self.latency_histogram.time():
try:
self.request_counter.inc()
# 输入验证
validated_input = self.validate_input(request)
# 推理
prediction = self.inference(validated_input)
# 后处理
result = self.post_process(prediction)
return result
except Exception as e:
self.error_counter.inc()
raise HTTPException(status_code=500, detail=str(e))
# 容器化部署:Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04
# 安装依赖
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# 复制应用代码
COPY requirements.txt .
COPY . .
# 安装Python包
RUN pip3 install --no-cache-dir -r requirements.txt
# 下载模型文件
RUN python3 download_model.py
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python3 health_check.py
# 启动服务
EXPOSE 8000
CMD ["uvicorn", "deployment:app", "--host", "0.0.0.0", "--port", "8000"]
# Kubernetes部署配置kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-service
spec:
replicas: 3
selector:
matchLabels:
app: llm-service
template:
metadata:
labels:
app: llm-service
spec:
containers:
- name: llm-container
image: your-registry/llm-service:latest
resources:
limits:
nvidia.com/gpu: 1
memory: "8Gi"
cpu: "4"
requests:
memory: "4Gi"
cpu: "2"
env:
- name: MODEL_PATH
value: "/app/models"
- name: MAX_BATCH_SIZE
value: "16"
ports:
- containerPort: 8000
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: llm-service
spec:
selector:
app: llm-service
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
六、使用模型
①、客户端通过HTTP请求调用API,传入问题,获取答案
#客户端SDK client_sdk.py
import request
import json
from typing import Dict,Any
import backoff
class LLMClient:
def __init__(self,base_url:str,api_key:str):
self.base_url = base_url
self.headers = {
"Authorization":f"Bearer {api_key}",
"Content_type":"application/json"
}
@backoff.on_exception(
backoff.expo,
requests.exceptions.RequestException,
max_tries=3
)
def predict(self,text:str,**kwargs)->Dict[str,Any]:
"""调用模型API"""
payload = {
"text":text,
**kwargs
}
response = requests.post(
f"{self.base_url}/predict",
headers=self.headers,
json=payload,
timeout=30
)
response.raise_for_status()
return response.json()
def batch_predict(self,texts:list,batch_size:int=10):
"""批量预测"""
results = []
for i in range(0,len(texts),batch_size):
batch = texts[i:i_batch_size]
batch_result = self.predict(batch)
results.extend(batch_result)
return results
②、集成到现有系统:例如将模型集成到公司网站、聊天机器人或移动应用中
# enterprise_integration.py
class CRMIntegration:
def __init__(self,llm_client):
self.llm_client = llm_client
def analyze_customer_sentiment(self,customer_id):
"""分析客户情绪"""
conversations = self.get_customer_conversations(customer_id)
#使用LLM分析情绪
sentiment_results = []
for conv in conversations:
analysis = self.llm_client.predict(
text=conv('content'),
task="sentiment_analysis"
)
sentiment_results.append(analysis)
#生成客户洞察报告
insight_report = self.generate_insight_report(sentiment_results)
#更新CRM系统
self.update_crm_profile(customer_id,insight_report)
return insight_report
def automate_response(self,inquiry):
"""自动生成回复"""
#检索相关知识库
relevant_info = self.retrieve_knowledge_base(inquiry)
#生成个性化回复
response = self.llm_client.predict(
text_inquiry,
context=relevant_info,
task="response_generation"
)
#安全过滤
filtered_response = self.safety_filter(response)
return filtered_response
# 实时推理服务realtime_inference.py
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
class RealtimeInterenceService:
def __init__(self,model,max_workers=4):
self.model = model
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.session = None
async def process_stream(self,websocket):
"""处理WebSocket流"""
async for message in websocket:
#异步处理推理请求
result = await asyncio.get_event_loop().run_in_executor(
self.executor,
self._inference,
message
)
await websocket.send_json(result)
def _inference(self,data):
"""执行模型推理"""
with torch.no_grad():
inputs = self.preprocess(data)
outputs = self.model(**inputs)
return self.postprocess(outputs)
③、与Java集成
- 模型部署为HTTP服务(使用Python的FaseAPI框架)
- Java系统通过HTTP客户端调用模型服务
步骤一:使用FastAPI部署模型
假设有一个训练好的模型,并保存为model.pkl(或使用其他格式,如PyTorch的.pt或TensorFlow的SavedModel)
这里以PyTorch模型为例。
步骤二:创建FastAPI应用,包含安装了fastapi、uvicorn、torch等库
步骤三:在Java中调用API
更多推荐


所有评论(0)