主题039:人工智能在反应工程中的应用
1. 人工智能在反应工程中的概述
人工智能(AI)正在深刻改变化学反应工程的研究与实践方式。从分子设计到过程优化,从故障诊断到自主控制,AI技术为反应工程带来了前所未有的机遇。
1.1 AI技术的核心价值
| 应用领域 | 传统方法挑战 | AI解决方案 | 预期收益 |
|---|---|---|---|
| 催化剂设计 | 试错法耗时 | 生成式AI预测 | 开发周期缩短50-70% |
| 反应优化 | 多参数耦合复杂 | 强化学习 | 产率提升10-20% |
| 过程控制 | 非线性难处理 | 神经网络控制 | 能耗降低5-15% |
| 故障诊断 | 专家经验依赖 | 深度学习分类 | 故障识别率>95% |
| 安全预警 | 滞后响应 | 时序预测模型 | 预警提前量>30分钟 |
1.2 AI技术分类
机器学习(ML):
- 监督学习:回归、分类
- 无监督学习:聚类、降维
- 强化学习:序列决策
深度学习(DL):
- 神经网络(DNN)
- 卷积神经网络(CNN)
- 循环神经网络(RNN/LSTM)
- 图神经网络(GNN)
- Transformer架构
生成式AI:
- 生成对抗网络(GAN)
- 变分自编码器(VAE)
- 扩散模型
- 大语言模型(LLM)
2. 智能优化
2.1 基于机器学习的反应优化
问题描述:
寻找最优操作条件以最大化目标函数:
$$\max_{\mathbf{x}} \quad f(\mathbf{x})$$
其中 $\mathbf{x} = [T, P, \text{催化剂负载}, \text{停留时间}, \dots]$。
贝叶斯优化:
结合高斯过程代理模型和采集函数:
- 高斯过程先验:
$$f(\mathbf{x}) \sim \mathcal{GP}\big(m(\mathbf{x}), k(\mathbf{x}, \mathbf{x}')\big)$$
- 采集函数(Expected Improvement):
$$EI(\mathbf{x}) = \mathbb{E}\big[\max(f(\mathbf{x}) - f^+, 0)\big]$$
优势:
- 样本效率高(适合昂贵实验)
- 处理噪声观测
- 平衡探索与利用
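作为示意,下面用 NumPy/SciPy 实现高斯后验下 EI 采集函数的闭式解(其中的均值、标准差与当前最优值均为演示用的假设取值):

```python
import numpy as np
from scipy.stats import norm

def expected_improvement(mu, sigma, y_best, xi=0.01):
    """高斯后验下 EI 的闭式解:EI(x) = E[max(f(x) - f+, 0)]"""
    imp = mu - y_best - xi
    z = imp / sigma
    return imp * norm.cdf(z) + sigma * norm.pdf(z)

# 两个候选点:预测均值相同,但代理模型的不确定性不同
ei_low_var = expected_improvement(mu=0.50, sigma=0.01, y_best=0.55)
ei_high_var = expected_improvement(mu=0.50, sigma=0.20, y_best=0.55)
```

可见在均值低于当前最优时,高不确定性的候选点仍能获得较高的 EI,这正是"平衡探索与利用"的来源。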
2.2 强化学习在过程控制中的应用
马尔可夫决策过程(MDP):
$$\mathcal{M} = (\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma)$$
状态空间(S):
- 温度、压力、浓度
- 流量、液位
- 历史趋势
动作空间(A):
- 加热/冷却功率
- 进料流量
- 搅拌速度
奖励函数(R):
$$R = w_1 \cdot \text{产率} + w_2 \cdot \text{选择性} - w_3 \cdot \text{能耗} - w_4 \cdot \text{偏差惩罚}$$
算法选择:
| 算法 | 适用场景 | 特点 |
|---|---|---|
| DQN | 离散动作 | 经验回放,稳定 |
| PPO | 连续动作 | 策略梯度,易调参 |
| SAC | 连续动作 | 最大熵,探索性好 |
| TD3 | 连续动作 | 双Q网络,减少过估计 |
2.3 神经网络代理模型
用途: 替代昂贵的CFD或详细动力学模拟
架构:
全连接网络(DNN):
$$\mathbf{y} = f_{NN}(\mathbf{x}; \boldsymbol{\theta}) = W_L\, \sigma(\cdots \sigma(W_1 \mathbf{x} + b_1) \cdots) + b_L$$
物理信息神经网络(PINN):
在损失函数中加入物理约束:
$$\mathcal{L} = \mathcal{L}_{data} + \lambda \mathcal{L}_{physics}$$
其中:
$$\mathcal{L}_{physics} = \left\| \frac{\partial c}{\partial t} + \nabla \cdot (\mathbf{u} c) - \nabla \cdot (D \nabla c) - r \right\|^2$$
训练策略:
- 数据增强
- 迁移学习
- 集成学习
3. 故障预测与诊断
3.1 异常检测
统计方法:
- 主成分分析(PCA)
- 独立成分分析(ICA)
- 偏最小二乘(PLS)
机器学习方法:
- 孤立森林(Isolation Forest)
- 单类SVM
- 自编码器(Autoencoder)
深度学习方法:
变分自编码器(VAE):
$$\mathcal{L} = \mathbb{E}_{q_\phi(z|x)}[\log p_\theta(x|z)] - KL\big(q_\phi(z|x) \,\|\, p(z)\big)$$
重构误差用于异常评分。
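上式中的 KL 项在对角高斯假设下有闭式解。下面是一个最小的 NumPy 示意(仅演示 KL 项的计算,其中的 $\mu$ 与 $\log\sigma^2$ 为假设的编码器输出):

```python
import numpy as np

def vae_kl(mu, log_var):
    """KL(N(mu, sigma^2) || N(0, I)) 的闭式解(对角高斯)"""
    return 0.5 * np.sum(np.exp(log_var) + mu**2 - 1.0 - log_var, axis=-1)

kl_at_prior = vae_kl(np.zeros(4), np.zeros(4))  # q 与先验重合,KL = 0
kl_shifted = vae_kl(np.ones(4), np.zeros(4))    # 均值偏离先验,KL > 0
```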
3.2 故障分类
多分类问题:
$$\hat{y} = \arg\max_i P(y = i \,|\, \mathbf{x})$$
常用模型:
- 随机森林
- 梯度提升树(XGBoost, LightGBM)
- 深度神经网络
- 图神经网络(处理拓扑结构)
特征工程:
- 时域特征:均值、方差、峰度
- 频域特征:FFT、小波变换
- 时频特征:STFT、小波包
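作为示意,下面用 NumPy/SciPy 从一段传感器信号中提取几个典型的时域与频域特征(信号与采样率均为假设值):

```python
import numpy as np
from scipy.stats import kurtosis

def extract_features(x, fs):
    """提取时域特征(均值、方差、峰度)与频域主频(FFT)"""
    spectrum = np.abs(np.fft.rfft(x - x.mean()))
    freqs = np.fft.rfftfreq(len(x), d=1.0 / fs)
    return {
        "mean": x.mean(),
        "var": x.var(),
        "kurtosis": kurtosis(x),
        "f_dom": freqs[np.argmax(spectrum)],  # 幅值最大处的主频
    }

t = np.arange(0, 10, 0.01)            # 100 Hz 采样,共 10 s
signal = np.sin(2 * np.pi * 5.0 * t)  # 假设的 5 Hz 周期性振荡信号
feats = extract_features(signal, fs=100.0)
```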
3.3 剩余使用寿命预测
问题定义:
预测设备从当前状态到失效的时间:
$$RUL(t) = \inf\{\tau : h(t+\tau) \leq h_{threshold}\}$$
方法:
直接预测:
$$RUL = f_{NN}(\mathbf{x}_{t-L:t})$$
基于退化模型:
$$D(t) = f_{LSTM}(\mathbf{x}_{1:t})$$
然后外推到阈值。
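"外推到阈值"的思路可以用一个线性退化的最小示例说明(健康指标轨迹与失效阈值均为假设值;实际中退化指标 D(t) 由 LSTM 等模型给出):

```python
import numpy as np

def estimate_rul(t, health, h_threshold):
    """拟合线性退化趋势,并外推到失效阈值得到 RUL"""
    slope, intercept = np.polyfit(t, health, 1)
    if slope >= 0:
        return np.inf  # 未检测到退化趋势
    t_fail = (h_threshold - intercept) / slope
    return max(t_fail - t[-1], 0.0)

t = np.arange(0, 50, 1.0)       # 已运行 50 个时间单位
health = 1.0 - 0.01 * t         # 假设的线性退化轨迹
rul = estimate_rul(t, health, h_threshold=0.2)
```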
4. 自主控制系统
4.1 模型预测控制(MPC)与AI结合
传统MPC:
$$\min_{\mathbf{u}} \sum_{k=0}^{N-1} (\mathbf{y}_k - \mathbf{y}_{ref})^T Q\, (\mathbf{y}_k - \mathbf{y}_{ref}) + \Delta \mathbf{u}_k^T R\, \Delta \mathbf{u}_k$$
AI增强MPC:
- 神经网络替代机理模型
- 强化学习优化代价函数权重
- 在线自适应更新
4.2 端到端学习控制
直接从传感器到执行器:
$$\mathbf{u}_t = \pi_\theta(\mathbf{s}_t)$$
模仿学习:
从专家演示中学习:
$$\min_\theta \sum_i \|\pi_\theta(\mathbf{s}_i) - \mathbf{a}_i^{expert}\|^2$$
优势:
- 无需显式建模
- 可学习复杂策略
- 适应非线性
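模仿学习最简单的形式是行为克隆(behavior cloning)。下面用最小二乘拟合一个线性策略作为示意(专家增益矩阵 K_expert 与演示数据均为构造的假设):

```python
import numpy as np

rng = np.random.default_rng(0)
K_expert = np.array([[0.5, -0.2]])    # 假设的专家线性策略 u = K s
states = rng.normal(size=(200, 2))    # 专家演示中的状态
actions = states @ K_expert.T         # 对应的专家动作(无噪声)

# 行为克隆:min_theta Σ ||pi_theta(s_i) - a_i^expert||^2,线性策略下有闭式解
K_fit, *_ = np.linalg.lstsq(states, actions, rcond=None)
```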
4.3 安全约束强化学习
约束MDP:
$$\max_\pi \mathbb{E}\Big[\sum_t \gamma^t R_t\Big] \quad \text{s.t.} \quad \mathbb{E}\Big[\sum_t C_t\Big] \leq C_{max}$$
方法:
- 拉格朗日松弛
- 屏障函数
- 投影梯度
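以拉格朗日松弛为例:把约束乘以乘子并入目标,对原始变量做梯度上升、对乘子做对偶上升。下面是一个一维玩具问题的示意(目标函数与约束均为构造的例子,非具体过程模型):

```python
# 玩具问题:max R(a) = -(a - 2)^2,约束 C(a) = a <= 1
a, lam = 0.0, 0.0
lr_a, lr_lam = 0.05, 0.1
for _ in range(2000):
    # 对拉格朗日函数 L = R(a) - lam * (C(a) - 1) 做原始变量的梯度上升
    grad_a = -2 * (a - 2) - lam
    a += lr_a * grad_a
    # 对乘子做对偶上升,并投影到非负
    lam = max(0.0, lam + lr_lam * (a - 1.0))
# 收敛到约束边界 a ≈ 1,乘子 lam ≈ 2
```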
5. 分子与催化剂设计
5.1 生成式分子设计
变分自编码器(VAE):
编码器:$q_\phi(z|x)$
解码器:$p_\theta(x|z)$
在潜在空间优化:
$$z^* = \arg\max_z f_{property}(z)$$
然后解码为分子。
强化学习生成:
将分子生成视为序列决策:
$$a_t \in \{\text{添加原子}, \text{添加键}, \text{终止}\}$$
奖励函数:
$$R = \text{预测活性} - \lambda \cdot \text{合成难度}$$
5.2 催化剂活性预测
描述符:
- 几何描述符:配位数、键长
- 电子描述符:d带中心、电荷转移
- 环境描述符:温度、压力、pH
图神经网络(GNN):
$$h_i^{(l+1)} = \sigma\left(W^{(l)} \cdot \text{AGGREGATE}\big(\{h_j^{(l)} : j \in \mathcal{N}(i)\}\big)\right)$$
晶体图卷积网络(CGCNN):
用于晶体催化剂预测。
5.3 高通量筛选
主动学习策略:
- 用少量数据训练初始模型
- 模型预测所有候选
- 选择不确定性高的候选进行DFT计算
- 更新模型,重复
期望改进采样:
$$\mathbf{x}_{next} = \arg\max_{\mathbf{x}} EI(\mathbf{x})$$
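主动学习中"选择不确定性高的候选"这一步,常用集成模型的预测方差来度量。下面用一个构造的玩具集成做示意(候选池与各"模型"的偏差项均为假设,仅用于产生随区域变化的模型分歧):

```python
import numpy as np

rng = np.random.default_rng(1)
X_pool = rng.uniform(-3, 3, size=(100, 1))   # 假设的候选条件池

def ensemble_predict(X, n_models=5):
    """玩具集成:各"模型"带有不同偏差项,分歧(方差)随 |X| 增大"""
    preds = [np.sin(X) + 0.05 * k * X**2 for k in range(n_models)]
    return np.mean(preds, axis=0), np.var(preds, axis=0)

mu, var = ensemble_predict(X_pool)
idx_next = int(np.argmax(var))               # 查询不确定性最高的候选
x_next = X_pool[idx_next]
```

实际流程中,被选中的候选将送去 DFT 计算或实验,其结果再用于更新模型并重复上述循环。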
6. 过程数据分析
6.1 软测量(Soft Sensor)
问题: 关键变量难以在线测量(如产物分布)
解决方案:
$$\hat{y} = f_{ML}(\mathbf{x}_{easy})$$
方法:
- 偏最小二乘(PLS)
- 支持向量回归(SVR)
- 神经网络
- 高斯过程
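以最简单的线性软测量为例(易测变量与质量变量之间的线性关系为演示用的假设),用最小二乘拟合 $\hat{y} = f_{ML}(\mathbf{x}_{easy})$:

```python
import numpy as np

rng = np.random.default_rng(2)
X_easy = rng.normal(size=(300, 3))    # 易测变量:温度、压力、流量(已标准化)
w_true = np.array([2.0, -1.0, 0.5])   # 假设的真实关系
y_hard = X_easy @ w_true + 0.01 * rng.normal(size=300)  # 难测的质量变量(带噪声)

# 软测量模型:最小二乘拟合 y_hat = f_ML(x_easy)
w_hat, *_ = np.linalg.lstsq(X_easy, y_hard, rcond=None)
y_pred = X_easy @ w_hat
```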
6.2 过程监控
多变量统计过程控制(MSPC):
Hotelling's $T^2$ 统计量:
$$T^2 = (\mathbf{x} - \boldsymbol{\mu})^T \mathbf{S}^{-1} (\mathbf{x} - \boldsymbol{\mu})$$
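$T^2$ 统计量可以用 NumPy 直接计算,如下示意(训练数据为假设的在控工况样本):

```python
import numpy as np

rng = np.random.default_rng(3)
X_incontrol = rng.normal(size=(500, 3))   # 在控(正常)工况数据
mu = X_incontrol.mean(axis=0)
S_inv = np.linalg.inv(np.cov(X_incontrol, rowvar=False))

def hotelling_t2(x):
    """单个样本的 Hotelling T^2 统计量"""
    d = x - mu
    return float(d @ S_inv @ d)

t2_center = hotelling_t2(mu)                              # 中心点处 T^2 = 0
t2_fault = hotelling_t2(mu + np.array([5.0, 0.0, 0.0]))   # 偏离工况,T^2 显著增大
```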
深度学习监控:
- LSTM自编码器
- 注意力机制
- 时序卷积网络
6.3 因果推断
识别因果关系:
$$P(Y \,|\, do(X)) \neq P(Y \,|\, X)$$
方法:
- 结构因果模型
- 因果发现算法(PC, GES)
- 反事实推理
7. 案例研究
7.1 智能反应优化
背景: 多相催化反应,参数空间高维
方法: 贝叶斯优化 + 实验自动化
结果:
- 实验次数减少60%
- 找到全局最优
- 发现意外最优区域
7.2 基于深度学习的故障诊断
背景: 连续搅拌反应器,多种故障模式
数据: 10万条历史记录,20个传感器
模型: 1D-CNN + LSTM
结果:
- 准确率:96.5%
- 召回率:94.2%
- 响应时间:<1秒
7.3 强化学习控制聚合反应
背景: 放热聚合反应,温度控制困难
算法: SAC(Soft Actor-Critic)
训练: 在数字孪生中预训练,现场微调
结果:
- 温度波动减少40%
- 产物质量一致性提高
- 能耗降低8%
8. Python实现
8.1 神经网络代理模型
import torch
import torch.nn as nn

class ReactorSurrogateModel(nn.Module):
    """反应器神经网络代理模型"""

    def __init__(self, input_dim, output_dim, hidden_dims=[128, 256, 128]):
        super().__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(0.1)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

class PhysicsInformedNN(nn.Module):
    """物理信息神经网络(PINN)"""

    def __init__(self, layers):
        super().__init__()
        self.network = self.build_network(layers)

    def build_network(self, layers):
        """构建网络"""
        modules = []
        for i in range(len(layers) - 1):
            modules.append(nn.Linear(layers[i], layers[i+1]))
            if i < len(layers) - 2:
                modules.append(nn.Tanh())
        return nn.Sequential(*modules)

    def forward(self, t, x):
        """前向传播"""
        inputs = torch.cat([t, x], dim=1)
        return self.network(inputs)

    def physics_loss(self, t, x):
        """物理约束损失(对流-扩散-反应方程)"""
        c_pred = self.forward(t, x)
        # 自动微分计算各阶导数
        c_t = torch.autograd.grad(c_pred, t, grad_outputs=torch.ones_like(c_pred),
                                  create_graph=True)[0]
        c_x = torch.autograd.grad(c_pred, x, grad_outputs=torch.ones_like(c_pred),
                                  create_graph=True)[0]
        c_xx = torch.autograd.grad(c_x, x, grad_outputs=torch.ones_like(c_x),
                                   create_graph=True)[0]
        # 残差:dc/dt + u*dc/dx - D*d2c/dx2 - r = 0
        u, D, k = 1.0, 0.1, 1.0
        r = k * c_pred  # 一级反应
        residual = c_t + u * c_x - D * c_xx - r
        return torch.mean(residual**2)
8.2 贝叶斯优化
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize
class BayesianOptimizer:
"""
贝叶斯优化器
"""
def __init__(self, bounds, kernel='rbf'):
"""
Parameters:
-----------
bounds : list of tuples
参数边界 [(x1_min, x1_max), (x2_min, x2_max), ...]
kernel : str
核函数类型
"""
self.bounds = bounds
self.n_params = len(bounds)
self.X = []
self.y = []
def rbf_kernel(self, x1, x2, length_scale=1.0, sigma_f=1.0):
"""RBF核函数"""
sqdist = np.sum(x1**2, 1).reshape(-1, 1) + \
np.sum(x2**2, 1) - 2 * np.dot(x1, x2.T)
return sigma_f**2 * np.exp(-0.5 / length_scale**2 * sqdist)
def fit_gp(self):
"""拟合高斯过程"""
X = np.array(self.X)
y = np.array(self.y).reshape(-1, 1)
self.K = self.rbf_kernel(X, X) + 1e-8 * np.eye(len(X))
self.K_inv = np.linalg.inv(self.K)
self.y_mean = np.mean(y)
self.y_std = np.std(y) + 1e-8
self.y_norm = (y - self.y_mean) / self.y_std
def predict(self, X_new):
"""预测均值和方差"""
X = np.array(self.X)
X_new = np.atleast_2d(X_new)
k_s = self.rbf_kernel(X, X_new)
k_ss = self.rbf_kernel(X_new, X_new) + 1e-8 * np.eye(len(X_new))
mu = k_s.T.dot(self.K_inv).dot(self.y_norm)
mu = mu * self.y_std + self.y_mean
var = np.diag(k_ss - k_s.T.dot(self.K_inv).dot(k_s))
var = np.maximum(var, 0)  # 数值误差可能产生微小负方差,截断为零
return mu.flatten(), var
def expected_improvement(self, X, xi=0.01):
"""期望改进采集函数"""
mu, var = self.predict(X)
sigma = np.sqrt(var + 1e-9)
y_best = np.max(self.y)
with np.errstate(divide='warn'):
imp = mu - y_best - xi
Z = imp / sigma
ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
ei[sigma == 0.0] = 0.0
return ei
def suggest_next_point(self):
"""建议下一个采样点"""
self.fit_gp()
# 随机采样候选点
n_candidates = 1000
candidates = np.random.rand(n_candidates, self.n_params)
for i, (low, high) in enumerate(self.bounds):
candidates[:, i] = candidates[:, i] * (high - low) + low
# 计算EI
ei_values = self.expected_improvement(candidates)
# 返回EI最大的点
return candidates[np.argmax(ei_values)]
def optimize(self, objective_func, n_iterations=50):
"""
运行优化
Parameters:
-----------
objective_func : callable
目标函数
n_iterations : int
迭代次数
"""
# 初始随机采样
n_initial = 5
for _ in range(n_initial):
x = np.array([np.random.uniform(low, high)
for low, high in self.bounds])
y = objective_func(x)
self.X.append(x)
self.y.append(y)
# 贝叶斯优化迭代
for i in range(n_iterations - n_initial):
x_next = self.suggest_next_point()
y_next = objective_func(x_next)
self.X.append(x_next)
self.y.append(y_next)
print(f"Iteration {i+1}: x={x_next}, y={y_next:.4f}")
# 返回最优解
best_idx = np.argmax(self.y)
return self.X[best_idx], self.y[best_idx]
8.3 强化学习控制器
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class Actor(nn.Module):
"""策略网络"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super().__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim),
nn.Tanh() # 输出范围[-1, 1]
)
def forward(self, state):
return self.network(state)
class Critic(nn.Module):
"""价值网络"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super().__init__()
self.network = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, state, action):
x = torch.cat([state, action], dim=1)
return self.network(x)
class DDPGAgent:
"""
DDPG(深度确定性策略梯度)智能体
"""
def __init__(self, state_dim, action_dim, action_bounds):
self.state_dim = state_dim
self.action_dim = action_dim
self.action_bounds = action_bounds
# Actor网络
self.actor = Actor(state_dim, action_dim)
self.actor_target = Actor(state_dim, action_dim)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
# Critic网络
self.critic = Critic(state_dim, action_dim)
self.critic_target = Critic(state_dim, action_dim)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
# 软更新目标网络
self.tau = 0.005
self.update_target_networks()
# 经验回放缓冲区
self.replay_buffer = deque(maxlen=100000)
self.batch_size = 64
self.gamma = 0.99
def update_target_networks(self):
"""软更新目标网络"""
for param, target_param in zip(self.actor.parameters(),
self.actor_target.parameters()):
target_param.data.copy_(self.tau * param.data +
(1 - self.tau) * target_param.data)
for param, target_param in zip(self.critic.parameters(),
self.critic_target.parameters()):
target_param.data.copy_(self.tau * param.data +
(1 - self.tau) * target_param.data)
def select_action(self, state, noise=0.1):
"""选择动作"""
state = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state).detach().numpy()[0]
# 添加探索噪声
action += np.random.normal(0, noise, size=action.shape)
action = np.clip(action, -1, 1)
# 映射到实际动作范围
action_scaled = []
for i, (low, high) in enumerate(self.action_bounds):
action_scaled.append((action[i] + 1) / 2 * (high - low) + low)
return np.array(action_scaled)
def train(self):
"""训练网络"""
if len(self.replay_buffer) < self.batch_size:
return
# 采样批次
batch = random.sample(self.replay_buffer, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor(states)
actions = torch.FloatTensor(actions)
rewards = torch.FloatTensor(rewards).unsqueeze(1)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones).unsqueeze(1)
# 更新Critic
with torch.no_grad():
next_actions = self.actor_target(next_states)
target_q = self.critic_target(next_states, next_actions)
target_q = rewards + (1 - dones) * self.gamma * target_q
current_q = self.critic(states, actions)
critic_loss = nn.MSELoss()(current_q, target_q)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# 更新Actor
predicted_actions = self.actor(states)
actor_loss = -self.critic(states, predicted_actions).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# 软更新目标网络
self.update_target_networks()
8.4 异常检测
import numpy as np
import torch
import torch.nn as nn

class Autoencoder(nn.Module):
    """自编码器用于异常检测"""

    def __init__(self, input_dim, encoding_dim=8):
        super().__init__()
        # 编码器
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, encoding_dim)
        )
        # 解码器
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, input_dim)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

    def reconstruction_error(self, x):
        """计算逐样本重构误差"""
        reconstructed = self.forward(x)
        return torch.mean((x - reconstructed)**2, dim=1)

class AnomalyDetector:
    """基于自编码器的异常检测器"""

    def __init__(self, input_dim, encoding_dim=8, threshold_percentile=95):
        self.model = Autoencoder(input_dim, encoding_dim)
        self.threshold_percentile = threshold_percentile
        self.threshold = None
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)

    def train(self, X_normal, epochs=100):
        """使用正常工况数据训练,并据此设定异常阈值"""
        X = torch.FloatTensor(X_normal)
        for epoch in range(epochs):
            self.optimizer.zero_grad()
            reconstructed = self.model(X)
            loss = nn.MSELoss()(reconstructed, X)
            loss.backward()
            self.optimizer.step()
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.6f}")
        # 以正常数据重构误差的分位数作为阈值
        with torch.no_grad():
            errors = self.model.reconstruction_error(X).numpy()
        self.threshold = np.percentile(errors, self.threshold_percentile)
        print(f"Threshold set to: {self.threshold:.6f}")

    def detect(self, X):
        """检测异常,返回异常标记与异常分数"""
        X = torch.FloatTensor(X)
        with torch.no_grad():
            scores = self.model.reconstruction_error(X).numpy()
        is_anomaly = scores > self.threshold
        return is_anomaly, scores
9. 实施挑战与最佳实践
9.1 数据挑战
| 挑战 | 影响 | 解决方案 |
|---|---|---|
| 数据稀缺 | 模型欠拟合 | 迁移学习、数据增强、主动学习 |
| 数据质量差 | 噪声预测 | 数据清洗、异常值处理、鲁棒模型 |
| 标签不平衡 | 分类偏差 | 重采样、代价敏感学习、集成方法 |
| 概念漂移 | 性能下降 | 在线学习、模型更新机制 |
9.2 模型可解释性
重要性:
- 监管要求
- 操作员信任
- 故障分析
方法:
- SHAP值
- LIME
- 注意力可视化
- 概念激活向量(CAV)
9.3 部署考虑
边缘部署:
- 模型压缩(剪枝、量化)
- 轻量化架构
- 推理优化
安全与可靠性:
- 输入验证
- 模型监控
- 回退机制
- 人机协作
10. 未来展望
10.1 技术趋势
- 基础模型:化学领域大模型
- 神经符号AI:结合知识与数据
- 量子机器学习:量子化学计算
- 联邦学习:跨企业协作
10.2 应用拓展
- 自主实验室:AI+机器人
- 闭环发现:设计-合成-测试-分析自动化
- 数字孪生增强:AI驱动的数字孪生
10.3 挑战与机遇
挑战:
- 数据共享与隐私
- 标准化与互操作
- 人才培养
机遇:
- 加速创新
- 降本增效
- 可持续发展
11. 总结
人工智能为化学反应工程带来了革命性的变革:
- 智能优化:贝叶斯优化、强化学习实现高效过程优化
- 预测维护:深度学习实现故障早期预警
- 自主控制:AI控制器实现复杂过程自动化
- 分子设计:生成式AI加速催化剂开发
- 数据分析:软测量、过程监控提升操作水平
关键成功因素
- 高质量数据基础设施
- 领域知识与AI技术结合
- 渐进式实施策略
- 持续迭代优化
展望
AI将成为反应工程的标准工具,推动行业向智能化、自主化方向发展,实现更安全、更高效、更可持续的化学制造。
"""
主题039:人工智能在反应工程中的应用 - 仿真代码
包含:神经网络代理模型、贝叶斯优化、强化学习控制、异常检测
"""
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from scipy.stats import norm
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
print("="*60)
print("主题039:人工智能在反应工程中的应用仿真")
print("="*60)
# ============================================
# 1. 神经网络代理模型(简化版,使用NumPy实现)
# ============================================
print("\n" + "="*60)
print("1. 神经网络代理模型")
print("="*60)
class SimpleNeuralNetwork:
"""简化版神经网络代理模型"""
def __init__(self, input_size, hidden_size, output_size):
# 初始化权重
self.W1 = np.random.randn(input_size, hidden_size) * 0.1
self.b1 = np.zeros(hidden_size)
self.W2 = np.random.randn(hidden_size, output_size) * 0.1
self.b2 = np.zeros(output_size)
self.loss_history = []
def relu(self, x):
return np.maximum(0, x)
def forward(self, X):
"""前向传播"""
self.z1 = np.dot(X, self.W1) + self.b1
self.a1 = self.relu(self.z1)
self.z2 = np.dot(self.a1, self.W2) + self.b2
return self.z2
def backward(self, X, y, learning_rate=0.01):
"""反向传播"""
m = X.shape[0]
# 输出层梯度
dz2 = self.z2 - y
dW2 = np.dot(self.a1.T, dz2) / m
db2 = np.sum(dz2, axis=0) / m
# 隐藏层梯度
da1 = np.dot(dz2, self.W2.T)
dz1 = da1 * (self.z1 > 0).astype(float)
dW1 = np.dot(X.T, dz1) / m
db1 = np.sum(dz1, axis=0) / m
# 更新权重
self.W2 -= learning_rate * dW2
self.b2 -= learning_rate * db2
self.W1 -= learning_rate * dW1
self.b1 -= learning_rate * db1
def train(self, X, y, epochs=1000, learning_rate=0.01):
"""训练网络"""
for epoch in range(epochs):
output = self.forward(X)
loss = np.mean((output - y)**2)
self.loss_history.append(loss)
self.backward(X, y, learning_rate)
if (epoch + 1) % 200 == 0:
print(f" Epoch {epoch+1}/{epochs}, Loss: {loss:.6f}")
def predict(self, X):
"""预测"""
return self.forward(X).flatten()
# 生成训练数据:CSTR反应器模型
def cstr_model(y, t, T, Q, V, k0, Ea, dH, rho, Cp, UA, Tc):
"""CSTR模型"""
cA, T_react = y
R = 8.314
# 反应速率
k = k0 * np.exp(-Ea / (R * T_react))
r = k * cA
# 物料平衡
dcA_dt = Q/V * (1000 - cA) - r
# 能量平衡
dT_dt = Q/V * (T - T_react) + (-dH)/(rho*Cp) * r - UA/(V*rho*Cp) * (T_react - Tc)
return [dcA_dt, dT_dt]
# 生成数据集
print("\n生成训练数据...")
np.random.seed(42)
n_samples = 500
# 输入:温度、流量、冷却温度
X_train = np.column_stack([
np.random.uniform(300, 400, n_samples), # 进料温度
np.random.uniform(0.005, 0.02, n_samples), # 流量
np.random.uniform(280, 320, n_samples) # 冷却温度
])
# 输出:稳态浓度和温度
y_train = []
for i in range(n_samples):
T, Q, Tc = X_train[i]
y0 = [500, T]
sol = odeint(cstr_model, y0, [0, 1000],
args=(T, Q, 1.0, 1e10, 8000, -50000, 1000, 4180, 5000, Tc))
y_train.append(sol[-1])
y_train = np.array(y_train)
# 训练神经网络代理模型
print("\n训练神经网络代理模型...")
nn_model = SimpleNeuralNetwork(input_size=3, hidden_size=20, output_size=2)
nn_model.train(X_train, y_train, epochs=1000, learning_rate=0.01)
# 测试代理模型
print("\n测试代理模型...")
X_test = np.array([[350, 0.01, 300], [380, 0.015, 290]])
nn_pred = nn_model.predict(X_test)
print("\n代理模型预测 vs 实际模拟:")
for i, x in enumerate(X_test):
T, Q, Tc = x
y0 = [500, T]
sol = odeint(cstr_model, y0, [0, 1000],
args=(T, Q, 1.0, 1e10, 8000, -50000, 1000, 4180, 5000, Tc))
actual = sol[-1]
print(f" 输入: T={T}K, Q={Q:.4f}m³/s, Tc={Tc}K")
print(f" 预测: cA={nn_pred[i*2]:.2f}, T={nn_pred[i*2+1]:.2f}")
print(f" 实际: cA={actual[0]:.2f}, T={actual[1]:.2f}")
# 绘制训练损失
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].semilogy(nn_model.loss_history)
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss (log scale)')
axes[0].set_title('Neural Network Training Loss')
axes[0].grid(True)
# 绘制预测vs实际
y_pred_all = nn_model.predict(X_train)
axes[1].scatter(y_train[:, 0], y_pred_all[::2], alpha=0.5, label='Concentration')
axes[1].plot([0, 1000], [0, 1000], 'r--', label='Perfect prediction')
axes[1].set_xlabel('Actual')
axes[1].set_ylabel('Predicted')
axes[1].set_title('Surrogate Model: Predicted vs Actual')
axes[1].legend()
axes[1].grid(True)
plt.tight_layout()
plt.savefig('neural_network_surrogate.png', dpi=150, bbox_inches='tight')
print("\n结果已保存到 neural_network_surrogate.png")
# ============================================
# 2. 贝叶斯优化
# ============================================
print("\n" + "="*60)
print("2. 贝叶斯优化")
print("="*60)
class BayesianOptimizer:
"""贝叶斯优化器"""
def __init__(self, bounds):
self.bounds = bounds
self.n_params = len(bounds)
self.X = []
self.y = []
def rbf_kernel(self, x1, x2, length_scale=1.0, sigma_f=1.0):
"""RBF核函数"""
if len(x1.shape) == 1:
x1 = x1.reshape(1, -1)
if len(x2.shape) == 1:
x2 = x2.reshape(1, -1)
sqdist = np.sum(x1**2, 1).reshape(-1, 1) + \
np.sum(x2**2, 1) - 2 * np.dot(x1, x2.T)
return sigma_f**2 * np.exp(-0.5 / length_scale**2 * sqdist)
def fit_gp(self):
"""拟合高斯过程"""
X = np.array(self.X)
y = np.array(self.y).reshape(-1, 1)
self.K = self.rbf_kernel(X, X) + 1e-5 * np.eye(len(X))
try:
self.K_inv = np.linalg.inv(self.K)
except:
self.K_inv = np.linalg.pinv(self.K)
self.y_mean = np.mean(y)
self.y_std = np.std(y) + 1e-8
self.y_norm = (y - self.y_mean) / self.y_std
def predict(self, X_new):
"""预测均值和方差"""
X = np.array(self.X)
X_new = np.atleast_2d(X_new)
k_s = self.rbf_kernel(X, X_new)
k_ss = self.rbf_kernel(X_new, X_new) + 1e-5 * np.eye(len(X_new))
mu = k_s.T.dot(self.K_inv).dot(self.y_norm)
mu = mu * self.y_std + self.y_mean
var = np.diag(k_ss - k_s.T.dot(self.K_inv).dot(k_s))
var = np.maximum(var, 0)
return mu.flatten(), var
def expected_improvement(self, X, xi=0.01):
"""期望改进采集函数"""
mu, var = self.predict(X)
sigma = np.sqrt(var + 1e-9)
y_best = np.max(self.y)
with np.errstate(divide='ignore'):
imp = mu - y_best - xi
Z = imp / sigma
Z = np.where(sigma == 0, 0, Z)
ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
ei[sigma == 0.0] = 0.0
return ei
def suggest_next_point(self):
"""建议下一个采样点"""
self.fit_gp()
# 随机采样候选点
n_candidates = 500
candidates = np.random.rand(n_candidates, self.n_params)
for i, (low, high) in enumerate(self.bounds):
candidates[:, i] = candidates[:, i] * (high - low) + low
# 计算EI
ei_values = self.expected_improvement(candidates)
return candidates[np.argmax(ei_values)]
def optimize(self, objective_func, n_iterations=30):
"""运行优化"""
print(f"\n开始贝叶斯优化 ({n_iterations} iterations)...")
# 初始随机采样
n_initial = 5
for i in range(n_initial):
x = np.array([np.random.uniform(low, high)
for low, high in self.bounds])
y = objective_func(x)
self.X.append(x)
self.y.append(y)
print(f" Initial {i+1}: x={x}, y={y:.4f}")
# 贝叶斯优化迭代
for i in range(n_iterations - n_initial):
x_next = self.suggest_next_point()
y_next = objective_func(x_next)
self.X.append(x_next)
self.y.append(y_next)
if (i + 1) % 5 == 0:
print(f" Iteration {i+1}: x={x_next}, y={y_next:.4f}")
# 返回最优解
best_idx = np.argmax(self.y)
return self.X[best_idx], self.y[best_idx]
# 定义目标函数:最大化产率
def objective_yield(params):
"""目标函数:最大化产率"""
T, Q, Tc = params
# 使用代理模型快速评估
x = np.array([[T, Q, Tc]])
pred = nn_model.predict(x)
cA, T_react = pred[0], pred[1]
# 产率 = 转化率 * 选择性(简化模型)
conversion = (1000 - cA) / 1000
# 选择性随温度变化(存在最优温度)
selectivity = np.exp(-((T_react - 370)**2) / (2 * 30**2))
# 产率
yield_val = conversion * selectivity
return yield_val
# 运行贝叶斯优化
bounds = [(300, 400), (0.005, 0.02), (280, 320)]
bo = BayesianOptimizer(bounds)
best_params, best_yield = bo.optimize(objective_yield, n_iterations=30)
print(f"\n优化结果:")
print(f" 最优参数: T={best_params[0]:.2f}K, Q={best_params[1]:.5f}m³/s, Tc={best_params[2]:.2f}K")
print(f" 最优产率: {best_yield:.4f}")
# 绘制优化过程
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].plot(bo.y, 'b-o', markersize=4)
axes[0].axhline(y=best_yield, color='r', linestyle='--', label=f'Best: {best_yield:.4f}')
axes[0].set_xlabel('Iteration')
axes[0].set_ylabel('Objective Value (Yield)')
axes[0].set_title('Bayesian Optimization Progress')
axes[0].legend()
axes[0].grid(True)
# 绘制累积最优
best_so_far = np.maximum.accumulate(bo.y)
axes[1].plot(best_so_far, 'g-s', markersize=4)
axes[1].set_xlabel('Iteration')
axes[1].set_ylabel('Best Value So Far')
axes[1].set_title('Cumulative Best Value')
axes[1].grid(True)
plt.tight_layout()
plt.savefig('bayesian_optimization.png', dpi=150, bbox_inches='tight')
print("\n结果已保存到 bayesian_optimization.png")
# ============================================
# 3. 强化学习控制器(简化版Q-learning)
# ============================================
print("\n" + "="*60)
print("3. 强化学习控制器")
print("="*60)
class ReactorEnv:
"""反应器环境"""
def __init__(self):
self.T_target = 370 # 目标温度
self.cA_target = 200 # 目标浓度
self.T = 350
self.cA = 500
self.time = 0
self.dt = 1.0
def reset(self):
self.T = 350 + np.random.randn() * 10
self.cA = 500 + np.random.randn() * 50
self.time = 0
return self._get_state()
def _get_state(self):
"""获取状态(离散化)"""
T_bin = int(np.clip((self.T - 300) / 10, 0, 9))
cA_bin = int(np.clip((self.cA - 0) / 100, 0, 9))
return T_bin * 10 + cA_bin
def step(self, action):
"""执行动作"""
# 动作:0=减少加热, 1=保持, 2=增加加热
power_change = (action - 1) * 50 # -50, 0, +50 kW
# 简化动力学
dT = (power_change / 1000) * self.dt - 0.1 * (self.T - 300) * self.dt
self.T += dT
# 浓度变化(简化)
k = 0.1 * np.exp(0.05 * (self.T - 350))
self.cA -= k * self.cA * self.dt
self.cA = max(0, self.cA)
self.time += self.dt
# 计算奖励
T_error = abs(self.T - self.T_target)
cA_error = abs(self.cA - self.cA_target)
reward = -0.1 * T_error - 0.01 * cA_error - 0.1 * abs(power_change)
# 终止条件
done = self.time > 100 or T_error > 100
return self._get_state(), reward, done
class QLearningAgent:
"""Q-learning智能体"""
def __init__(self, n_states=100, n_actions=3, learning_rate=0.1, gamma=0.95):
self.q_table = np.zeros((n_states, n_actions))
self.lr = learning_rate
self.gamma = gamma
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
def select_action(self, state):
"""选择动作(ε-贪婪)"""
if np.random.rand() < self.epsilon:
return np.random.randint(3)
else:
return np.argmax(self.q_table[state])
def update(self, state, action, reward, next_state):
"""更新Q表"""
best_next = np.max(self.q_table[next_state])
self.q_table[state, action] += self.lr * (reward + self.gamma * best_next - self.q_table[state, action])
def decay_epsilon(self):
"""衰减探索率"""
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# 训练强化学习控制器
print("\n训练Q-learning控制器...")
env = ReactorEnv()
agent = QLearningAgent()
n_episodes = 500
rewards_history = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
action = agent.select_action(state)
next_state, reward, done = env.step(action)
agent.update(state, action, reward, next_state)
state = next_state
total_reward += reward
agent.decay_epsilon()
rewards_history.append(total_reward)
if (episode + 1) % 100 == 0:
print(f" Episode {episode+1}: Total Reward = {total_reward:.2f}, Epsilon = {agent.epsilon:.3f}")
print("\n训练完成!")
# 测试训练好的控制器
print("\n测试训练好的控制器...")
state = env.reset()
T_history = [env.T]
cA_history = [env.cA]
action_history = []
for _ in range(100):
action = np.argmax(agent.q_table[state]) # 贪婪策略
state, _, done = env.step(action)
T_history.append(env.T)
cA_history.append(env.cA)
action_history.append(action)
if done:
break
print(f" 最终温度: {T_history[-1]:.2f}K (目标: {env.T_target}K)")
print(f" 最终浓度: {cA_history[-1]:.2f} (目标: {env.cA_target})")
# 绘制强化学习结果
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 奖励历史
axes[0, 0].plot(rewards_history, alpha=0.3)
window = 20
if len(rewards_history) >= window:
smoothed = np.convolve(rewards_history, np.ones(window)/window, mode='valid')
axes[0, 0].plot(range(window-1, len(rewards_history)), smoothed, 'r-', linewidth=2, label='Smoothed')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Total Reward')
axes[0, 0].set_title('RL Training Progress')
axes[0, 0].legend()
axes[0, 0].grid(True)
# 温度控制
axes[0, 1].plot(T_history, 'b-', label='Temperature')
axes[0, 1].axhline(y=env.T_target, color='r', linestyle='--', label='Target')
axes[0, 1].set_xlabel('Time Step')
axes[0, 1].set_ylabel('Temperature (K)')
axes[0, 1].set_title('Temperature Control')
axes[0, 1].legend()
axes[0, 1].grid(True)
# 浓度控制
axes[1, 0].plot(cA_history, 'g-', label='Concentration')
axes[1, 0].axhline(y=env.cA_target, color='r', linestyle='--', label='Target')
axes[1, 0].set_xlabel('Time Step')
axes[1, 0].set_ylabel('Concentration')
axes[1, 0].set_title('Concentration Control')
axes[1, 0].legend()
axes[1, 0].grid(True)
# Q表可视化
im = axes[1, 1].imshow(agent.q_table, aspect='auto', cmap='RdYlGn')
axes[1, 1].set_xlabel('Action')
axes[1, 1].set_ylabel('State')
axes[1, 1].set_title('Q-Table Visualization')
axes[1, 1].set_xticks([0, 1, 2])
axes[1, 1].set_xticklabels(['Cool', 'Hold', 'Heat'])
plt.colorbar(im, ax=axes[1, 1])
plt.tight_layout()
plt.savefig('reinforcement_learning_control.png', dpi=150, bbox_inches='tight')
print("\n结果已保存到 reinforcement_learning_control.png")
# ============================================
# 4. 异常检测
# ============================================
print("\n" + "="*60)
print("4. 异常检测")
print("="*60)
class AnomalyDetector:
"""基于统计的异常检测器"""
def __init__(self, n_features=3, threshold_factor=3.0):
self.mean = None
self.std = None
self.threshold_factor = threshold_factor
self.threshold = None
def fit(self, X_normal):
"""使用正常数据拟合"""
self.mean = np.mean(X_normal, axis=0)
self.std = np.std(X_normal, axis=0) + 1e-8
# 计算马氏距离阈值
distances = self._mahalanobis_distance(X_normal)
self.threshold = np.percentile(distances, 95) * self.threshold_factor
print(f" 异常检测阈值: {self.threshold:.2f}")
def _mahalanobis_distance(self, X):
"""计算标准化欧氏距离"""
normalized = (X - self.mean) / self.std
return np.sqrt(np.sum(normalized**2, axis=1))
def detect(self, X):
"""检测异常"""
distances = self._mahalanobis_distance(X)
is_anomaly = distances > self.threshold
return is_anomaly, distances
# 生成正常和异常数据
print("\n生成过程数据...")
np.random.seed(123)
# 正常数据
n_normal = 800
X_normal = np.column_stack([
np.random.normal(370, 5, n_normal), # 温度
np.random.normal(200, 20, n_normal), # 浓度
np.random.normal(0.01, 0.002, n_normal) # 流量
])
# 异常数据(故障注入)
n_anomaly = 200
X_anomaly = np.column_stack([
np.random.normal(390, 10, n_anomaly), # 温度异常升高
np.random.normal(300, 30, n_anomaly), # 浓度异常
np.random.normal(0.005, 0.001, n_anomaly) # 流量异常
])
# 合并数据
X_all = np.vstack([X_normal, X_anomaly])
y_true = np.array([0]*n_normal + [1]*n_anomaly) # 0=正常, 1=异常
# 训练检测器
detector = AnomalyDetector()
detector.fit(X_normal)
# 检测
is_anomaly, scores = detector.detect(X_all)
# 评估性能
true_positives = np.sum((is_anomaly == True) & (y_true == 1))
false_positives = np.sum((is_anomaly == True) & (y_true == 0))
true_negatives = np.sum((is_anomaly == False) & (y_true == 0))
false_negatives = np.sum((is_anomaly == False) & (y_true == 1))
accuracy = (true_positives + true_negatives) / len(y_true)
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
print(f"\n异常检测性能:")
print(f" 准确率: {accuracy:.3f}")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1分数: {f1:.3f}")
# 绘制异常检测结果
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 异常分数
axes[0, 0].plot(scores, 'b-', alpha=0.7, label='Anomaly Score')
axes[0, 0].axhline(y=detector.threshold, color='r', linestyle='--', label=f'Threshold')
axes[0, 0].fill_between(range(len(scores)), 0, detector.threshold, alpha=0.2, color='green', label='Normal Zone')
axes[0, 0].set_xlabel('Sample Index')
axes[0, 0].set_ylabel('Anomaly Score')
axes[0, 0].set_title('Anomaly Detection Scores')
axes[0, 0].legend()
axes[0, 0].grid(True)
# 散点图:温度 vs 浓度
colors = ['green' if not a else 'red' for a in is_anomaly]
axes[0, 1].scatter(X_all[:, 0], X_all[:, 1], c=colors, alpha=0.5, s=20)
axes[0, 1].set_xlabel('Temperature (K)')
axes[0, 1].set_ylabel('Concentration')
axes[0, 1].set_title('Anomaly Detection: Temperature vs Concentration')
axes[0, 1].grid(True)
# ROC曲线(简化版)
thresholds = np.linspace(0, np.max(scores), 100)
tpr_list = [] # True Positive Rate
fpr_list = [] # False Positive Rate
for thresh in thresholds:
pred = scores > thresh
tp = np.sum((pred == True) & (y_true == 1))
fp = np.sum((pred == True) & (y_true == 0))
tn = np.sum((pred == False) & (y_true == 0))
fn = np.sum((pred == False) & (y_true == 1))
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
tpr_list.append(tpr)
fpr_list.append(fpr)
axes[1, 0].plot(fpr_list, tpr_list, 'b-', linewidth=2)
axes[1, 0].plot([0, 1], [0, 1], 'k--', alpha=0.5)
axes[1, 0].set_xlabel('False Positive Rate')
axes[1, 0].set_ylabel('True Positive Rate')
axes[1, 0].set_title('ROC Curve')
axes[1, 0].grid(True)
# 混淆矩阵
confusion = np.array([[true_negatives, false_positives],
[false_negatives, true_positives]])
im = axes[1, 1].imshow(confusion, cmap='Blues')
axes[1, 1].set_xticks([0, 1])
axes[1, 1].set_yticks([0, 1])
axes[1, 1].set_xticklabels(['Normal', 'Anomaly'])
axes[1, 1].set_yticklabels(['Normal', 'Anomaly'])
axes[1, 1].set_xlabel('Predicted')
axes[1, 1].set_ylabel('Actual')
axes[1, 1].set_title('Confusion Matrix')
# 添加数值标签
for i in range(2):
for j in range(2):
text = axes[1, 1].text(j, i, confusion[i, j],
ha="center", va="center", color="black", fontsize=12)
plt.colorbar(im, ax=axes[1, 1])
plt.tight_layout()
plt.savefig('anomaly_detection.png', dpi=150, bbox_inches='tight')
print("\n结果已保存到 anomaly_detection.png")
# ============================================
# 5. 综合对比图
# ============================================
print("\n" + "="*60)
print("5. 生成综合对比图")
print("="*60)
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# 1. 代理模型准确性
y_pred_all = nn_model.predict(X_train)
axes[0, 0].scatter(y_train[:, 0], y_pred_all[::2], alpha=0.5, c='blue', s=20)
axes[0, 0].plot([0, 1000], [0, 1000], 'r--', linewidth=2)
axes[0, 0].set_xlabel('Actual Concentration')
axes[0, 0].set_ylabel('Predicted Concentration')
axes[0, 0].set_title('Surrogate Model Accuracy')
axes[0, 0].grid(True)
# 2. 贝叶斯优化收敛
axes[0, 1].plot(bo.y, 'b-o', markersize=3, alpha=0.7)
axes[0, 1].axhline(y=best_yield, color='r', linestyle='--', linewidth=2)
axes[0, 1].set_xlabel('Iteration')
axes[0, 1].set_ylabel('Yield')
axes[0, 1].set_title('Bayesian Optimization Convergence')
axes[0, 1].grid(True)
# 3. RL控制效果
axes[0, 2].plot(T_history, 'b-', label='Temperature', linewidth=2)
axes[0, 2].axhline(y=env.T_target, color='r', linestyle='--', label='Target', linewidth=2)
axes[0, 2].set_xlabel('Time Step')
axes[0, 2].set_ylabel('Temperature (K)')
axes[0, 2].set_title('RL Temperature Control')
axes[0, 2].legend()
axes[0, 2].grid(True)
# 4. 异常检测分布
axes[1, 0].hist(scores[y_true==0], bins=30, alpha=0.7, label='Normal', color='green')
axes[1, 0].hist(scores[y_true==1], bins=30, alpha=0.7, label='Anomaly', color='red')
axes[1, 0].axvline(x=detector.threshold, color='black', linestyle='--', linewidth=2, label='Threshold')
axes[1, 0].set_xlabel('Anomaly Score')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].set_title('Anomaly Score Distribution')
axes[1, 0].legend()
axes[1, 0].grid(True)
# 5. 训练损失
axes[1, 1].semilogy(nn_model.loss_history, 'b-', linewidth=1)
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('Loss (log scale)')
axes[1, 1].set_title('Neural Network Training')
axes[1, 1].grid(True)
# 6. RL奖励曲线
window = 20
if len(rewards_history) >= window:
smoothed = np.convolve(rewards_history, np.ones(window)/window, mode='valid')
axes[1, 2].plot(range(window-1, len(rewards_history)), smoothed, 'g-', linewidth=2)
axes[1, 2].set_xlabel('Episode')
axes[1, 2].set_ylabel('Average Reward')
axes[1, 2].set_title('RL Training Progress')
axes[1, 2].grid(True)
plt.suptitle('AI in Reaction Engineering - Comprehensive Results', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.savefig('ai_reaction_engineering_comprehensive.png', dpi=150, bbox_inches='tight')
print("\n综合结果已保存到 ai_reaction_engineering_comprehensive.png")
print("\n" + "="*60)
print("所有仿真完成!")
print("="*60)
print("\n生成的图表:")
print(" 1. neural_network_surrogate.png - 神经网络代理模型")
print(" 2. bayesian_optimization.png - 贝叶斯优化")
print(" 3. reinforcement_learning_control.png - 强化学习控制")
print(" 4. anomaly_detection.png - 异常检测")
print(" 5. ai_reaction_engineering_comprehensive.png - 综合结果")




