主题039:人工智能在反应工程中的应用

1. 人工智能在反应工程中的概述

人工智能(AI)正在深刻改变化学反应工程的研究与实践方式。从分子设计到过程优化,从故障诊断到自主控制,AI技术为反应工程带来了前所未有的机遇。

1.1 AI技术的核心价值

| 应用领域 | 传统方法挑战 | AI解决方案 | 预期收益 |
| --- | --- | --- | --- |
| 催化剂设计 | 试错法耗时 | 生成式AI预测 | 开发周期缩短50-70% |
| 反应优化 | 多参数耦合复杂 | 强化学习 | 产率提升10-20% |
| 过程控制 | 非线性难处理 | 神经网络控制 | 能耗降低5-15% |
| 故障诊断 | 依赖专家经验 | 深度学习分类 | 故障识别率>95% |
| 安全预警 | 响应滞后 | 时序预测模型 | 预警提前量>30分钟 |

1.2 AI技术分类

机器学习(ML):

  • 监督学习:回归、分类
  • 无监督学习:聚类、降维
  • 强化学习:序列决策

深度学习(DL):

  • 神经网络(DNN)
  • 卷积神经网络(CNN)
  • 循环神经网络(RNN/LSTM)
  • 图神经网络(GNN)
  • Transformer架构

生成式AI:

  • 生成对抗网络(GAN)
  • 变分自编码器(VAE)
  • 扩散模型
  • 大语言模型(LLM)

2. 智能优化

2.1 基于机器学习的反应优化

问题描述:

寻找最优操作条件以最大化目标函数:

$$\max_{\mathbf{x}} \; f(\mathbf{x})$$

其中 $\mathbf{x} = [T, P, \text{催化剂负载}, \text{停留时间}, \ldots]$

贝叶斯优化:

结合高斯过程代理模型和采集函数:

  1. 高斯过程先验:

$$f(\mathbf{x}) \sim \mathcal{GP}(m(\mathbf{x}), k(\mathbf{x}, \mathbf{x}'))$$

  2. 采集函数(Expected Improvement):

$$EI(\mathbf{x}) = \mathbb{E}[\max(f(\mathbf{x}) - f^+, 0)]$$

其中 $f^+$ 为当前最优观测值。

优势:

  • 样本效率高(适合昂贵实验)
  • 处理噪声观测
  • 平衡探索与利用

2.2 强化学习在过程控制中的应用

马尔可夫决策过程(MDP):

$$\mathcal{M} = (\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma)$$

状态空间(S):

  • 温度、压力、浓度
  • 流量、液位
  • 历史趋势

动作空间(A):

  • 加热/冷却功率
  • 进料流量
  • 搅拌速度

奖励函数(R):

$$R = w_1 \cdot \text{产率} + w_2 \cdot \text{选择性} - w_3 \cdot \text{能耗} - w_4 \cdot \text{偏差惩罚}$$

算法选择:

| 算法 | 适用场景 | 特点 |
| --- | --- | --- |
| DQN | 离散动作 | 经验回放,稳定 |
| PPO | 连续动作 | 策略梯度,易调参 |
| SAC | 连续动作 | 最大熵,探索性好 |
| TD3 | 连续动作 | 双Q网络,减少过估计 |

2.3 神经网络代理模型

用途: 替代昂贵的CFD或详细动力学模拟

架构:

全连接网络(DNN):

$$\mathbf{y} = f_{NN}(\mathbf{x}; \boldsymbol{\theta}) = W_L\,\sigma(\cdots\sigma(W_1 \mathbf{x} + b_1)\cdots) + b_L$$

物理信息神经网络(PINN):

在损失函数中加入物理约束:

$$\mathcal{L} = \mathcal{L}_{data} + \lambda \mathcal{L}_{physics}$$

其中:

$$\mathcal{L}_{physics} = \left\|\frac{\partial c}{\partial t} + \nabla \cdot (\mathbf{u}c) - \nabla \cdot (D\nabla c) - r\right\|^2$$

训练策略:

  • 数据增强
  • 迁移学习
  • 集成学习

3. 故障预测与诊断

3.1 异常检测

统计方法:

  • 主成分分析(PCA)
  • 独立成分分析(ICA)
  • 偏最小二乘(PLS)
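作为示例,下面用NumPy给出基于PCA重构误差(SPE/Q统计量)的异常打分最小实现;正常工况数据与注入的传感器偏移故障均为假设的演示数据,不是生产实现:

```python
import numpy as np

def pca_spe_scores(X_train, X_test, n_components=2):
    """用PCA重构误差(SPE/Q统计量)给样本打异常分数的最小实现。"""
    mu = X_train.mean(axis=0)
    # SVD得到主成分载荷矩阵
    _, _, Vt = np.linalg.svd(X_train - mu, full_matrices=False)
    P = Vt[:n_components].T                 # 载荷矩阵 (d, k)
    Xc = X_test - mu
    X_hat = Xc @ P @ P.T                    # 投影到主成分子空间后重构
    return np.sum((Xc - X_hat) ** 2, axis=1)

rng = np.random.default_rng(0)
X_normal = rng.normal(size=(200, 4))        # 模拟正常工况数据
X_fault = X_normal[:5] + np.array([0.0, 0.0, 0.0, 8.0])  # 注入传感器偏移故障
s_normal = pca_spe_scores(X_normal, X_normal)
s_fault = pca_spe_scores(X_normal, X_fault)
print(s_fault.mean() > s_normal.mean())     # 故障样本的重构误差明显更大
```

实际应用中阈值通常按正常数据重构误差的高分位数设定(与下文8.4节自编码器的做法一致)。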

机器学习方法:

  • 孤立森林(Isolation Forest)
  • 单类SVM
  • 自编码器(Autoencoder)

深度学习方法:

变分自编码器(VAE):

$$\mathcal{L} = \mathbb{E}_{q_\phi(z|x)}[\log p_\theta(x|z)] - KL(q_\phi(z|x) \,\|\, p(z))$$

重构误差用于异常评分。

3.2 故障分类

多分类问题:

$$\hat{y} = \arg\max_i P(y=i \mid \mathbf{x})$$

常用模型:

  • 随机森林
  • 梯度提升树(XGBoost, LightGBM)
  • 深度神经网络
  • 图神经网络(处理拓扑结构)

特征工程:

  • 时域特征:均值、方差、峰度
  • 频域特征:FFT、小波变换
  • 时频特征:STFT、小波包
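上述特征工程可以用如下最小示例说明;这里只选取均值、标准差、峰度与FFT主频几个代表性特征,5 Hz正弦测试信号为演示假设:

```python
import numpy as np

def extract_features(signal, fs=100.0):
    """从一段传感器时序中提取简单的时域与频域特征(示例性选取)。"""
    m, v = np.mean(signal), np.var(signal)
    feats = {
        "mean": m,
        "std": np.sqrt(v),
        # 峰度(Fisher定义,正态信号约为0)
        "kurtosis": np.mean((signal - m) ** 4) / (v ** 2 + 1e-12) - 3,
    }
    # 频域特征:去均值后FFT幅值谱的主频
    spectrum = np.abs(np.fft.rfft(signal - m))
    freqs = np.fft.rfftfreq(len(signal), d=1.0 / fs)
    feats["dominant_freq"] = freqs[np.argmax(spectrum)]
    return feats

t = np.arange(0, 2, 0.01)               # 采样率 100 Hz,时长 2 s
x = np.sin(2 * np.pi * 5 * t)           # 5 Hz 正弦测试信号
f = extract_features(x, fs=100.0)
print(f["dominant_freq"])               # 约 5 Hz
```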

3.3 剩余使用寿命预测

问题定义:

预测设备从当前状态到失效的时间:

$$RUL(t) = \inf\{\tau : h(t+\tau) \leq h_{threshold}\}$$

方法:

直接预测:

$$RUL = f_{NN}(\mathbf{x}_{t-L:t})$$

基于退化模型:

$$D(t) = f_{LSTM}(\mathbf{x}_{1:t})$$

然后外推到阈值。


4. 自主控制系统

4.1 模型预测控制(MPC)与AI结合

传统MPC:

$$\min_{\mathbf{u}} \sum_{k=0}^{N-1} (\mathbf{y}_k - \mathbf{y}_{ref})^T Q (\mathbf{y}_k - \mathbf{y}_{ref}) + \Delta \mathbf{u}_k^T R \,\Delta \mathbf{u}_k$$

AI增强MPC:

  • 神经网络替代机理模型
  • 强化学习优化代价函数权重
  • 在线自适应更新
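用代理模型做MPC滚动优化的流程可以示意如下。这里用一个假设的一阶线性模型 T[k+1] = a·T[k] + b·u[k] 代替训练好的神经网络代理,参数与约束均为演示值:

```python
import numpy as np
from scipy.optimize import minimize

# 假设的代理模型参数;实际应用中一步预测应替换为 f_NN(T, u)
a, b = 0.9, 0.5

def mpc_step(T0, T_ref, N=10, Q=1.0, R=0.1):
    """求解有限时域MPC,返回最优控制序列的第一个动作(滚动时域)。"""
    def cost(u_seq):
        T, J = T0, 0.0
        for u in u_seq:
            T = a * T + b * u                       # 代理模型一步预测
            J += Q * (T - T_ref) ** 2 + R * u ** 2  # 跟踪误差 + 控制代价
        return J
    res = minimize(cost, np.zeros(N), method="L-BFGS-B",
                   bounds=[(-5.0, 5.0)] * N)
    return float(res.x[0])  # 只执行第一个动作,下个周期重新求解

u0 = mpc_step(T0=0.0, T_ref=1.0)
print(u0 > 0)  # 设定值高于当前值,首个动作应为正向
```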

4.2 端到端学习控制

直接从传感器到执行器:

$$\mathbf{u}_t = \pi_\theta(\mathbf{s}_t)$$

模仿学习:

从专家演示中学习:

$$\min_\theta \sum_{i} \|\pi_\theta(\mathbf{s}_i) - \mathbf{a}_i^{expert}\|^2$$

优势:

  • 无需显式建模
  • 可学习复杂策略
  • 适应非线性
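模仿学习中最简单的形式是行为克隆,可以示意如下:这里假设专家策略是线性的(参数 W_expert 为演示假设),于是上式的最小二乘问题有闭式解:

```python
import numpy as np

# 行为克隆最小示例:拟合线性策略 pi(s) = W s
rng = np.random.default_rng(1)
W_expert = np.array([[2.0, -1.0]])          # 假设的专家策略参数
states = rng.normal(size=(100, 2))          # 专家演示中的状态
actions = states @ W_expert.T               # 对应的专家动作(无噪声)

# min_W sum ||W s_i - a_i||^2 的最小二乘解
W_hat, *_ = np.linalg.lstsq(states, actions, rcond=None)
print(np.allclose(W_hat.T, W_expert))       # 无噪声演示数据可精确恢复
```

实际中演示数据有噪声且策略非线性,通常用神经网络 $\pi_\theta$ 与梯度下降代替闭式解。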

4.3 安全约束强化学习

约束MDP:

$$\max_\pi \mathbb{E}\left[\sum_t \gamma^t R_t\right] \quad \text{s.t.} \quad \mathbb{E}\left[\sum_t C_t\right] \leq C_{max}$$

方法:

  • 拉格朗日松弛
  • 屏障函数
  • 投影梯度
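其中拉格朗日松弛对应的乘子更新(对偶上升)可以示意如下;步长与约束代价序列均为演示假设值:

```python
def lagrangian_update(lam, cost_estimate, c_max, lr=0.1):
    """约束MDP的拉格朗日乘子更新:违反约束时增大lam,满足约束时回落。"""
    return max(0.0, lam + lr * (cost_estimate - c_max))

lam = 0.0
for cost in [1.5, 1.2, 0.8]:   # 假设估计的累计约束代价逐步下降
    lam = lagrangian_update(lam, cost, c_max=1.0)
print(round(lam, 3))  # 违反约束时乘子升高,满足后回落
```

策略随后在奖励 $R_t - \lambda C_t$ 上优化,乘子与策略交替更新。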

5. 分子与催化剂设计

5.1 生成式分子设计

变分自编码器(VAE):

编码器:$q_\phi(z|x)$
解码器:$p_\theta(x|z)$

在潜在空间优化:

$$z^* = \arg\max_z f_{property}(z)$$

然后解码为分子。

强化学习生成:

将分子生成视为序列决策:

$$a_t \in \{\text{添加原子}, \text{添加键}, \text{终止}\}$$

奖励函数:

$$R = \text{预测活性} - \lambda \cdot \text{合成难度}$$

5.2 催化剂活性预测

描述符:

  • 几何描述符:配位数、键长
  • 电子描述符:d带中心、电荷转移
  • 环境描述符:温度、压力、pH

图神经网络(GNN):

$$h_i^{(l+1)} = \sigma\left(W^{(l)} \cdot \text{AGGREGATE}\left(\{h_j^{(l)} : j \in \mathcal{N}(i)\}\right)\right)$$

晶体图卷积网络(CGCNN):

用于晶体催化剂预测。
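上式的消息传递可以用NumPy写成一个最小的均值聚合层;三原子图、one-hot特征与权重矩阵均为演示用假设值:

```python
import numpy as np

def gnn_layer(H, A, W):
    """一次均值聚合消息传递:h_i <- ReLU(W · mean_{j∈N(i)} h_j)。
    H: (n, d) 节点特征,A: (n, n) 含自环的邻接矩阵,W: (d, d') 权重。"""
    deg = A.sum(axis=1, keepdims=True)
    H_agg = (A @ H) / deg               # 邻居特征均值聚合
    return np.maximum(0, H_agg @ W)     # 线性变换 + ReLU

# 三原子链式分子图(如H-O-H):0-1、1-2相连,并加自环
A = np.array([[1, 1, 0],
              [1, 1, 1],
              [0, 1, 1]], dtype=float)
H = np.eye(3)                           # one-hot初始节点特征
W = np.ones((3, 2)) * 0.5               # 演示用权重
H1 = gnn_layer(H, A, W)
print(H1.shape)  # (3, 2):每个节点得到新的2维表示
```

分子或晶体性质预测时,再对所有节点表示做池化(如求和)得到图级向量。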

5.3 高通量筛选

主动学习策略:

  1. 用少量数据训练初始模型
  2. 模型预测所有候选
  3. 选择不确定性高的候选进行DFT计算
  4. 更新模型,重复

期望改进采样:

$$\mathbf{x}_{next} = \arg\max_{\mathbf{x}} EI(\mathbf{x})$$
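主动学习循环可以示意如下。为保持自包含,这里用解析函数代替昂贵的DFT计算,并用"到最近已标注点的距离"作为不确定性的简化代理(GP后验方差有类似的远离数据即增大的性质);候选集与初始点数均为演示假设:

```python
import numpy as np

rng = np.random.default_rng(0)
f_expensive = lambda x: np.sin(3 * x)          # 代替DFT/实验的目标函数
X_pool = np.linspace(0.0, 2.0, 50)             # 候选集
labeled = list(rng.choice(50, 3, replace=False))  # 初始少量标注点

for step in range(5):
    # 不确定性代理:候选点离已标注点越远越不确定(已标注点距离为0,不会重选)
    dists = np.min(np.abs(X_pool[:, None] - X_pool[labeled][None, :]), axis=1)
    next_idx = int(np.argmax(dists))           # 选不确定性最高的候选
    y_next = f_expensive(X_pool[next_idx])     # "昂贵"评估
    labeled.append(next_idx)                   # 加入训练集,进入下一轮

print(len(labeled))  # 3 + 5 = 8,且无重复采样
```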


6. 过程数据分析

6.1 软测量(Soft Sensor)

问题: 关键变量难以在线测量(如产物分布)

解决方案:

$$\hat{y} = f_{ML}(\mathbf{x}_{easy})$$

方法:

  • 偏最小二乘(PLS)
  • 支持向量回归(SVR)
  • 神经网络
  • 高斯过程
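软测量的基本思路可以用一个最小线性回归示例说明;易测变量取温度与流量,真实关系与噪声均为演示假设,实际中可替换为PLS、SVR或神经网络:

```python
import numpy as np

rng = np.random.default_rng(2)
# 易测变量:进料温度T与流量Q;难测变量y由假设的线性关系加噪声生成
X_easy = rng.uniform([340, 0.005], [400, 0.02], size=(200, 2))
y_hard = 2.0 * X_easy[:, 0] - 5000.0 * X_easy[:, 1] + rng.normal(0, 0.5, 200)

# 带截距项的最小二乘拟合,作为最简单的软测量模型
A = np.column_stack([X_easy, np.ones(len(X_easy))])
coef, *_ = np.linalg.lstsq(A, y_hard, rcond=None)

x_new = np.array([370.0, 0.01, 1.0])       # 新工况(T=370K, Q=0.01)
y_pred = float(coef @ x_new)
print(abs(y_pred - 690.0) < 5.0)           # 接近假设关系下的真值690
```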

6.2 过程监控

多变量统计过程控制(MSPC):

Hotelling $T^2$ 统计量:

$$T^2 = (\mathbf{x} - \boldsymbol{\mu})^T \mathbf{S}^{-1} (\mathbf{x} - \boldsymbol{\mu})$$
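该统计量可以直接按训练数据的均值与协方差计算;下面示例中的三维正态工况数据为演示假设:

```python
import numpy as np

def hotelling_t2(X_train, x_new):
    """用训练数据的均值与协方差计算新样本的Hotelling T^2统计量。"""
    mu = X_train.mean(axis=0)
    S = np.cov(X_train, rowvar=False)      # 样本协方差矩阵
    diff = x_new - mu
    return float(diff @ np.linalg.inv(S) @ diff)

rng = np.random.default_rng(3)
X = rng.normal(size=(500, 3))                       # 模拟正常工况
t2_normal = hotelling_t2(X, np.zeros(3))            # 接近均值:T^2小
t2_fault = hotelling_t2(X, np.array([5.0, 0, 0]))   # 显著偏离:T^2大
print(t2_fault > t2_normal)
```

实际监控中控制限由F分布的分位数确定,超限即报警。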

深度学习监控:

  • LSTM自编码器
  • 注意力机制
  • 时序卷积网络

6.3 因果推断

识别因果关系:

$$P(Y \mid do(X)) \neq P(Y \mid X)$$

方法:

  • 结构因果模型
  • 因果发现算法(PC, GES)
  • 反事实推理

7. 案例研究

7.1 智能反应优化

背景: 多相催化反应,参数空间高维

方法: 贝叶斯优化 + 实验自动化

结果:

  • 实验次数减少60%
  • 找到全局最优
  • 发现意外最优区域

7.2 基于深度学习的故障诊断

背景: 连续搅拌反应器,多种故障模式

数据: 10万条历史记录,20个传感器

模型: 1D-CNN + LSTM

结果:

  • 准确率:96.5%
  • 召回率:94.2%
  • 响应时间:<1秒

7.3 强化学习控制聚合反应

背景: 放热聚合反应,温度控制困难

算法: SAC(Soft Actor-Critic)

训练: 在数字孪生中预训练,现场微调

结果:

  • 温度波动减少40%
  • 产物质量一致性提高
  • 能耗降低8%

8. Python实现

8.1 神经网络代理模型

import torch
import torch.nn as nn

class ReactorSurrogateModel(nn.Module):
    """
    反应器神经网络代理模型
    """
    
    def __init__(self, input_dim, output_dim, hidden_dims=[128, 256, 128]):
        super().__init__()
        
        layers = []
        prev_dim = input_dim
        
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(0.1)
            ])
            prev_dim = hidden_dim
        
        layers.append(nn.Linear(prev_dim, output_dim))
        
        self.network = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.network(x)


class PhysicsInformedNN(nn.Module):
    """
    物理信息神经网络(PINN)
    """
    
    def __init__(self, layers):
        super().__init__()
        self.network = self.build_network(layers)
    
    def build_network(self, layers):
        """构建网络"""
        modules = []
        for i in range(len(layers) - 1):
            modules.append(nn.Linear(layers[i], layers[i+1]))
            if i < len(layers) - 2:
                modules.append(nn.Tanh())
        return nn.Sequential(*modules)
    
    def forward(self, t, x):
        """前向传播"""
        inputs = torch.cat([t, x], dim=1)
        return self.network(inputs)
    
    def physics_loss(self, t, x, c):
        """
        物理约束损失(对流-扩散-反应方程)
        """
        c_pred = self.forward(t, x)
        
        # 计算梯度
        c_t = torch.autograd.grad(c_pred, t, grad_outputs=torch.ones_like(c_pred),
                                   create_graph=True)[0]
        c_x = torch.autograd.grad(c_pred, x, grad_outputs=torch.ones_like(c_pred),
                                   create_graph=True)[0]
        c_xx = torch.autograd.grad(c_x, x, grad_outputs=torch.ones_like(c_x),
                                    create_graph=True)[0]
        
        # 残差:dc/dt + u*dc/dx - D*d2c/dx2 - r = 0
        u, D, k = 1.0, 0.1, 1.0
        r = k * c_pred  # 一级反应
        
        residual = c_t + u * c_x - D * c_xx - r
        
        return torch.mean(residual**2)

8.2 贝叶斯优化

import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize

class BayesianOptimizer:
    """
    贝叶斯优化器
    """
    
    def __init__(self, bounds, kernel='rbf'):
        """
        Parameters:
        -----------
        bounds : list of tuples
            参数边界 [(x1_min, x1_max), (x2_min, x2_max), ...]
        kernel : str
            核函数类型
        """
        self.bounds = bounds
        self.n_params = len(bounds)
        self.X = []
        self.y = []
        
    def rbf_kernel(self, x1, x2, length_scale=1.0, sigma_f=1.0):
        """RBF核函数"""
        sqdist = np.sum(x1**2, 1).reshape(-1, 1) + \
                 np.sum(x2**2, 1) - 2 * np.dot(x1, x2.T)
        return sigma_f**2 * np.exp(-0.5 / length_scale**2 * sqdist)
    
    def fit_gp(self):
        """拟合高斯过程"""
        X = np.array(self.X)
        y = np.array(self.y).reshape(-1, 1)
        
        self.K = self.rbf_kernel(X, X) + 1e-8 * np.eye(len(X))
        self.K_inv = np.linalg.inv(self.K)
        self.y_mean = np.mean(y)
        self.y_std = np.std(y) + 1e-8
        self.y_norm = (y - self.y_mean) / self.y_std
        
    def predict(self, X_new):
        """预测均值和方差"""
        X = np.array(self.X)
        X_new = np.atleast_2d(X_new)
        
        k_s = self.rbf_kernel(X, X_new)
        k_ss = self.rbf_kernel(X_new, X_new) + 1e-8 * np.eye(len(X_new))
        
        mu = k_s.T.dot(self.K_inv).dot(self.y_norm)
        mu = mu * self.y_std + self.y_mean
        
        var = np.diag(k_ss - k_s.T.dot(self.K_inv).dot(k_s))
        var = np.maximum(var, 0)  # 数值误差可能产生微小负方差,截断为0
        
        return mu.flatten(), var
    
    def expected_improvement(self, X, xi=0.01):
        """期望改进采集函数"""
        mu, var = self.predict(X)
        sigma = np.sqrt(var + 1e-9)
        
        y_best = np.max(self.y)
        
        with np.errstate(divide='ignore', invalid='ignore'):
            imp = mu - y_best - xi
            Z = imp / sigma
            ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
            ei[sigma == 0.0] = 0.0
        
        return ei
    
    def suggest_next_point(self):
        """建议下一个采样点"""
        self.fit_gp()
        
        # 随机采样候选点
        n_candidates = 1000
        candidates = np.random.rand(n_candidates, self.n_params)
        for i, (low, high) in enumerate(self.bounds):
            candidates[:, i] = candidates[:, i] * (high - low) + low
        
        # 计算EI
        ei_values = self.expected_improvement(candidates)
        
        # 返回EI最大的点
        return candidates[np.argmax(ei_values)]
    
    def optimize(self, objective_func, n_iterations=50):
        """
        运行优化
        
        Parameters:
        -----------
        objective_func : callable
            目标函数
        n_iterations : int
            迭代次数
        """
        # 初始随机采样
        n_initial = 5
        for _ in range(n_initial):
            x = np.array([np.random.uniform(low, high) 
                         for low, high in self.bounds])
            y = objective_func(x)
            self.X.append(x)
            self.y.append(y)
        
        # 贝叶斯优化迭代
        for i in range(n_iterations - n_initial):
            x_next = self.suggest_next_point()
            y_next = objective_func(x_next)
            self.X.append(x_next)
            self.y.append(y_next)
            
            print(f"Iteration {i+1}: x={x_next}, y={y_next:.4f}")
        
        # 返回最优解
        best_idx = np.argmax(self.y)
        return self.X[best_idx], self.y[best_idx]

8.3 强化学习控制器

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class Actor(nn.Module):
    """策略网络"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh()  # 输出范围[-1, 1]
        )
    
    def forward(self, state):
        return self.network(state)


class Critic(nn.Module):
    """价值网络"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        return self.network(x)


class DDPGAgent:
    """
    DDPG(深度确定性策略梯度)智能体
    """
    
    def __init__(self, state_dim, action_dim, action_bounds):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.action_bounds = action_bounds
        
        # Actor网络
        self.actor = Actor(state_dim, action_dim)
        self.actor_target = Actor(state_dim, action_dim)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
        
        # Critic网络
        self.critic = Critic(state_dim, action_dim)
        self.critic_target = Critic(state_dim, action_dim)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        
        # 目标网络先硬拷贝主网络参数,训练中再软更新
        self.tau = 0.005
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        
        # 经验回放缓冲区
        self.replay_buffer = deque(maxlen=100000)
        self.batch_size = 64
        self.gamma = 0.99
        
    def update_target_networks(self):
        """软更新目标网络"""
        for param, target_param in zip(self.actor.parameters(), 
                                       self.actor_target.parameters()):
            target_param.data.copy_(self.tau * param.data + 
                                   (1 - self.tau) * target_param.data)
        
        for param, target_param in zip(self.critic.parameters(),
                                       self.critic_target.parameters()):
            target_param.data.copy_(self.tau * param.data +
                                   (1 - self.tau) * target_param.data)
    
    def select_action(self, state, noise=0.1):
        """选择动作"""
        state = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state).detach().numpy()[0]
        
        # 添加探索噪声
        action += np.random.normal(0, noise, size=action.shape)
        action = np.clip(action, -1, 1)
        
        # 映射到实际动作范围
        action_scaled = []
        for i, (low, high) in enumerate(self.action_bounds):
            action_scaled.append((action[i] + 1) / 2 * (high - low) + low)
        
        return np.array(action_scaled)
    
    def train(self):
        """训练网络"""
        if len(self.replay_buffer) < self.batch_size:
            return
        
        # 采样批次
        batch = random.sample(self.replay_buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        
        states = torch.FloatTensor(np.array(states))
        actions = torch.FloatTensor(np.array(actions))
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones).unsqueeze(1)
        
        # 更新Critic
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_q = self.critic_target(next_states, next_actions)
            target_q = rewards + (1 - dones) * self.gamma * target_q
        
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q)
        
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        # 更新Actor
        predicted_actions = self.actor(states)
        actor_loss = -self.critic(states, predicted_actions).mean()
        
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        # 软更新目标网络
        self.update_target_networks()

8.4 异常检测

import numpy as np
import torch
import torch.nn as nn

class Autoencoder(nn.Module):
    """
    自编码器用于异常检测
    """
    
    def __init__(self, input_dim, encoding_dim=8):
        super().__init__()
        
        # 编码器
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, encoding_dim)
        )
        
        # 解码器
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, input_dim)
        )
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
    
    def reconstruction_error(self, x):
        """计算重构误差"""
        reconstructed = self.forward(x)
        return torch.mean((x - reconstructed)**2, dim=1)


class AnomalyDetector:
    """
    基于自编码器的异常检测器
    """
    
    def __init__(self, input_dim, encoding_dim=8, threshold_percentile=95):
        self.model = Autoencoder(input_dim, encoding_dim)
        self.threshold_percentile = threshold_percentile
        self.threshold = None
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
    
    def train(self, X_normal, epochs=100):
        """
        使用正常数据训练
        
        Parameters:
        -----------
        X_normal : array
            正常工况数据
        epochs : int
            训练轮数
        """
        X = torch.FloatTensor(X_normal)
        
        for epoch in range(epochs):
            self.optimizer.zero_grad()
            
            reconstructed = self.model(X)
            loss = nn.MSELoss()(reconstructed, X)
            
            loss.backward()
            self.optimizer.step()
            
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.6f}")
        
        # 设置阈值
        with torch.no_grad():
            errors = self.model.reconstruction_error(X).numpy()
            self.threshold = np.percentile(errors, self.threshold_percentile)
        
        print(f"Threshold set to: {self.threshold:.6f}")
    
    def detect(self, X):
        """
        检测异常
        
        Returns:
        --------
        is_anomaly : array
            异常标记
        scores : array
            异常分数
        """
        X = torch.FloatTensor(X)
        
        with torch.no_grad():
            scores = self.model.reconstruction_error(X).numpy()
        
        is_anomaly = scores > self.threshold
        
        return is_anomaly, scores

9. 实施挑战与最佳实践

9.1 数据挑战

| 挑战 | 影响 | 解决方案 |
| --- | --- | --- |
| 数据稀缺 | 模型欠拟合 | 迁移学习、数据增强、主动学习 |
| 数据质量差 | 预测含噪 | 数据清洗、异常值处理、鲁棒模型 |
| 标签不平衡 | 分类偏差 | 重采样、代价敏感学习、集成方法 |
| 概念漂移 | 性能下降 | 在线学习、模型更新机制 |

9.2 模型可解释性

重要性:

  • 监管要求
  • 操作员信任
  • 故障分析

方法:

  • SHAP值
  • LIME
  • 注意力可视化
  • 概念激活向量(CAV)

9.3 部署考虑

边缘部署:

  • 模型压缩(剪枝、量化)
  • 轻量化架构
  • 推理优化

安全与可靠性:

  • 输入验证
  • 模型监控
  • 回退机制
  • 人机协作

10. 未来展望

10.1 技术趋势

  • 基础模型:化学领域大模型
  • 神经符号AI:结合知识与数据
  • 量子机器学习:量子化学计算
  • 联邦学习:跨企业协作

10.2 应用拓展

  • 自主实验室:AI+机器人
  • 闭环发现:设计-合成-测试-分析自动化
  • 数字孪生增强:AI驱动的数字孪生

10.3 挑战与机遇

挑战:

  • 数据共享与隐私
  • 标准化与互操作
  • 人才培养

机遇:

  • 加速创新
  • 降本增效
  • 可持续发展

11. 总结

人工智能为化学反应工程带来了革命性的变革:

  1. 智能优化:贝叶斯优化、强化学习实现高效过程优化
  2. 预测维护:深度学习实现故障早期预警
  3. 自主控制:AI控制器实现复杂过程自动化
  4. 分子设计:生成式AI加速催化剂开发
  5. 数据分析:软测量、过程监控提升操作水平

关键成功因素

  • 高质量数据基础设施
  • 领域知识与AI技术结合
  • 渐进式实施策略
  • 持续迭代优化

展望

AI将成为反应工程的标准工具,推动行业向智能化、自主化方向发展,实现更安全、更高效、更可持续的化学制造。

"""
主题039:人工智能在反应工程中的应用 - 仿真代码
包含:神经网络代理模型、贝叶斯优化、强化学习控制、异常检测
"""

import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from scipy.stats import norm
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print("="*60)
print("主题039:人工智能在反应工程中的应用仿真")
print("="*60)

# ============================================
# 1. 神经网络代理模型(简化版,使用NumPy实现)
# ============================================
print("\n" + "="*60)
print("1. 神经网络代理模型")
print("="*60)

class SimpleNeuralNetwork:
    """简化版神经网络代理模型"""
    
    def __init__(self, input_size, hidden_size, output_size):
        # 初始化权重
        self.W1 = np.random.randn(input_size, hidden_size) * 0.1
        self.b1 = np.zeros(hidden_size)
        self.W2 = np.random.randn(hidden_size, output_size) * 0.1
        self.b2 = np.zeros(output_size)
        self.loss_history = []
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def forward(self, X):
        """前向传播"""
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self.relu(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        return self.z2
    
    def backward(self, X, y, learning_rate=0.01):
        """反向传播"""
        m = X.shape[0]
        
        # 输出层梯度
        dz2 = self.z2 - y
        dW2 = np.dot(self.a1.T, dz2) / m
        db2 = np.sum(dz2, axis=0) / m
        
        # 隐藏层梯度
        da1 = np.dot(dz2, self.W2.T)
        dz1 = da1 * (self.z1 > 0).astype(float)
        dW1 = np.dot(X.T, dz1) / m
        db1 = np.sum(dz1, axis=0) / m
        
        # 更新权重
        self.W2 -= learning_rate * dW2
        self.b2 -= learning_rate * db2
        self.W1 -= learning_rate * dW1
        self.b1 -= learning_rate * db1
    
    def train(self, X, y, epochs=1000, learning_rate=0.01):
        """训练网络"""
        for epoch in range(epochs):
            output = self.forward(X)
            loss = np.mean((output - y)**2)
            self.loss_history.append(loss)
            self.backward(X, y, learning_rate)
            
            if (epoch + 1) % 200 == 0:
                print(f"  Epoch {epoch+1}/{epochs}, Loss: {loss:.6f}")
    
    def predict(self, X):
        """预测"""
        return self.forward(X).flatten()

# 生成训练数据:CSTR反应器模型
def cstr_model(y, t, T, Q, V, k0, Ea, dH, rho, Cp, UA, Tc):
    """CSTR模型"""
    cA, T_react = y
    R = 8.314
    
    # 反应速率
    k = k0 * np.exp(-Ea / (R * T_react))
    r = k * cA
    
    # 物料平衡(进料浓度 cA_in = 1000 mol/m³)
    dcA_dt = Q/V * (1000 - cA) - r
    
    # 能量平衡
    dT_dt = Q/V * (T - T_react) + (-dH)/(rho*Cp) * r - UA/(V*rho*Cp) * (T_react - Tc)
    
    return [dcA_dt, dT_dt]

# 生成数据集
print("\n生成训练数据...")
np.random.seed(42)
n_samples = 500

# 输入:温度、流量、冷却温度
X_train = np.column_stack([
    np.random.uniform(300, 400, n_samples),  # 进料温度
    np.random.uniform(0.005, 0.02, n_samples),  # 流量
    np.random.uniform(280, 320, n_samples)   # 冷却温度
])

# 输出:稳态浓度和温度
y_train = []
for i in range(n_samples):
    T, Q, Tc = X_train[i]
    y0 = [500, T]
    sol = odeint(cstr_model, y0, [0, 1000], 
                 args=(T, Q, 1.0, 1e10, 8000, -50000, 1000, 4180, 5000, Tc))
    y_train.append(sol[-1])

y_train = np.array(y_train)

# 训练神经网络代理模型
print("\n训练神经网络代理模型...")
nn_model = SimpleNeuralNetwork(input_size=3, hidden_size=20, output_size=2)
nn_model.train(X_train, y_train, epochs=1000, learning_rate=0.01)

# 测试代理模型
print("\n测试代理模型...")
X_test = np.array([[350, 0.01, 300], [380, 0.015, 290]])
nn_pred = nn_model.predict(X_test)

print("\n代理模型预测 vs 实际模拟:")
for i, x in enumerate(X_test):
    T, Q, Tc = x
    y0 = [500, T]
    sol = odeint(cstr_model, y0, [0, 1000], 
                 args=(T, Q, 1.0, 1e10, 8000, -50000, 1000, 4180, 5000, Tc))
    actual = sol[-1]
    print(f"  输入: T={T}K, Q={Q:.4f}m³/s, Tc={Tc}K")
    print(f"    预测: cA={nn_pred[i*2]:.2f}, T={nn_pred[i*2+1]:.2f}")
    print(f"    实际: cA={actual[0]:.2f}, T={actual[1]:.2f}")

# 绘制训练损失
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].semilogy(nn_model.loss_history)
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss (log scale)')
axes[0].set_title('Neural Network Training Loss')
axes[0].grid(True)

# 绘制预测vs实际
y_pred_all = nn_model.predict(X_train)
axes[1].scatter(y_train[:, 0], y_pred_all[::2], alpha=0.5, label='Concentration')
axes[1].plot([0, 1000], [0, 1000], 'r--', label='Perfect prediction')
axes[1].set_xlabel('Actual')
axes[1].set_ylabel('Predicted')
axes[1].set_title('Surrogate Model: Predicted vs Actual')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.savefig('neural_network_surrogate.png', dpi=150, bbox_inches='tight')
print("\n结果已保存到 neural_network_surrogate.png")

# ============================================
# 2. 贝叶斯优化
# ============================================
print("\n" + "="*60)
print("2. 贝叶斯优化")
print("="*60)

class BayesianOptimizer:
    """贝叶斯优化器"""
    
    def __init__(self, bounds):
        self.bounds = bounds
        self.n_params = len(bounds)
        self.X = []
        self.y = []
    
    def rbf_kernel(self, x1, x2, length_scale=1.0, sigma_f=1.0):
        """RBF核函数"""
        if len(x1.shape) == 1:
            x1 = x1.reshape(1, -1)
        if len(x2.shape) == 1:
            x2 = x2.reshape(1, -1)
        
        sqdist = np.sum(x1**2, 1).reshape(-1, 1) + \
                 np.sum(x2**2, 1) - 2 * np.dot(x1, x2.T)
        return sigma_f**2 * np.exp(-0.5 / length_scale**2 * sqdist)
    
    def fit_gp(self):
        """拟合高斯过程"""
        X = np.array(self.X)
        y = np.array(self.y).reshape(-1, 1)
        
        self.K = self.rbf_kernel(X, X) + 1e-5 * np.eye(len(X))
        try:
            self.K_inv = np.linalg.inv(self.K)
        except np.linalg.LinAlgError:
            self.K_inv = np.linalg.pinv(self.K)
        
        self.y_mean = np.mean(y)
        self.y_std = np.std(y) + 1e-8
        self.y_norm = (y - self.y_mean) / self.y_std
    
    def predict(self, X_new):
        """预测均值和方差"""
        X = np.array(self.X)
        X_new = np.atleast_2d(X_new)
        
        k_s = self.rbf_kernel(X, X_new)
        k_ss = self.rbf_kernel(X_new, X_new) + 1e-5 * np.eye(len(X_new))
        
        mu = k_s.T.dot(self.K_inv).dot(self.y_norm)
        mu = mu * self.y_std + self.y_mean
        
        var = np.diag(k_ss - k_s.T.dot(self.K_inv).dot(k_s))
        var = np.maximum(var, 0)
        
        return mu.flatten(), var
    
    def expected_improvement(self, X, xi=0.01):
        """期望改进采集函数"""
        mu, var = self.predict(X)
        sigma = np.sqrt(var + 1e-9)
        
        y_best = np.max(self.y)
        
        with np.errstate(divide='ignore'):
            imp = mu - y_best - xi
            Z = imp / sigma
            Z = np.where(sigma == 0, 0, Z)
            ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
            ei[sigma == 0.0] = 0.0
        
        return ei
    
    def suggest_next_point(self):
        """建议下一个采样点"""
        self.fit_gp()
        
        # 随机采样候选点
        n_candidates = 500
        candidates = np.random.rand(n_candidates, self.n_params)
        for i, (low, high) in enumerate(self.bounds):
            candidates[:, i] = candidates[:, i] * (high - low) + low
        
        # 计算EI
        ei_values = self.expected_improvement(candidates)
        
        return candidates[np.argmax(ei_values)]
    
    def optimize(self, objective_func, n_iterations=30):
        """运行优化"""
        print(f"\n开始贝叶斯优化 ({n_iterations} iterations)...")
        
        # 初始随机采样
        n_initial = 5
        for i in range(n_initial):
            x = np.array([np.random.uniform(low, high) 
                         for low, high in self.bounds])
            y = objective_func(x)
            self.X.append(x)
            self.y.append(y)
            print(f"  Initial {i+1}: x={x}, y={y:.4f}")
        
        # 贝叶斯优化迭代
        for i in range(n_iterations - n_initial):
            x_next = self.suggest_next_point()
            y_next = objective_func(x_next)
            self.X.append(x_next)
            self.y.append(y_next)
            
            if (i + 1) % 5 == 0:
                print(f"  Iteration {i+1}: x={x_next}, y={y_next:.4f}")
        
        # 返回最优解
        best_idx = np.argmax(self.y)
        return self.X[best_idx], self.y[best_idx]

# 定义目标函数:最大化产率
def objective_yield(params):
    """目标函数:最大化产率"""
    T, Q, Tc = params
    
    # 使用代理模型快速评估
    x = np.array([[T, Q, Tc]])
    pred = nn_model.predict(x)
    cA, T_react = pred[0], pred[1]
    
    # 产率 = 转化率 * 选择性(简化模型)
    conversion = (1000 - cA) / 1000
    
    # 选择性随温度变化(存在最优温度)
    selectivity = np.exp(-((T_react - 370)**2) / (2 * 30**2))
    
    # 产率
    yield_val = conversion * selectivity
    
    return yield_val

# 运行贝叶斯优化
bounds = [(300, 400), (0.005, 0.02), (280, 320)]
bo = BayesianOptimizer(bounds)
best_params, best_yield = bo.optimize(objective_yield, n_iterations=30)

print(f"\n优化结果:")
print(f"  最优参数: T={best_params[0]:.2f}K, Q={best_params[1]:.5f}m³/s, Tc={best_params[2]:.2f}K")
print(f"  最优产率: {best_yield:.4f}")

# 绘制优化过程
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].plot(bo.y, 'b-o', markersize=4)
axes[0].axhline(y=best_yield, color='r', linestyle='--', label=f'Best: {best_yield:.4f}')
axes[0].set_xlabel('Iteration')
axes[0].set_ylabel('Objective Value (Yield)')
axes[0].set_title('Bayesian Optimization Progress')
axes[0].legend()
axes[0].grid(True)

# 绘制累积最优
best_so_far = np.maximum.accumulate(bo.y)
axes[1].plot(best_so_far, 'g-s', markersize=4)
axes[1].set_xlabel('Iteration')
axes[1].set_ylabel('Best Value So Far')
axes[1].set_title('Cumulative Best Value')
axes[1].grid(True)

plt.tight_layout()
plt.savefig('bayesian_optimization.png', dpi=150, bbox_inches='tight')
print("\n结果已保存到 bayesian_optimization.png")

# ============================================
# 3. 强化学习控制器(简化版Q-learning)
# ============================================
print("\n" + "="*60)
print("3. 强化学习控制器")
print("="*60)

class ReactorEnv:
    """反应器环境"""
    
    def __init__(self):
        self.T_target = 370  # 目标温度
        self.cA_target = 200  # 目标浓度
        self.T = 350
        self.cA = 500
        self.time = 0
        self.dt = 1.0
        
    def reset(self):
        self.T = 350 + np.random.randn() * 10
        self.cA = 500 + np.random.randn() * 50
        self.time = 0
        return self._get_state()
    
    def _get_state(self):
        """获取状态(离散化)"""
        T_bin = int(np.clip((self.T - 300) / 10, 0, 9))
        cA_bin = int(np.clip((self.cA - 0) / 100, 0, 9))
        return T_bin * 10 + cA_bin
    
    def step(self, action):
        """执行动作"""
        # 动作:0=减少加热, 1=保持, 2=增加加热
        power_change = (action - 1) * 50  # -50, 0, +50 kW
        
        # 简化动力学
        dT = (power_change / 1000) * self.dt - 0.1 * (self.T - 300) * self.dt
        self.T += dT
        
        # 浓度变化(简化)
        k = 0.1 * np.exp(0.05 * (self.T - 350))
        self.cA -= k * self.cA * self.dt
        self.cA = max(0, self.cA)
        
        self.time += self.dt
        
        # 计算奖励
        T_error = abs(self.T - self.T_target)
        cA_error = abs(self.cA - self.cA_target)
        reward = -0.1 * T_error - 0.01 * cA_error - 0.1 * abs(power_change)
        
        # 终止条件
        done = self.time > 100 or T_error > 100
        
        return self._get_state(), reward, done

class QLearningAgent:
    """Q-learning智能体"""
    
    def __init__(self, n_states=100, n_actions=3, learning_rate=0.1, gamma=0.95):
        self.q_table = np.zeros((n_states, n_actions))
        self.lr = learning_rate
        self.gamma = gamma
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
    
    def select_action(self, state):
        """选择动作(ε-贪婪)"""
        if np.random.rand() < self.epsilon:
            return np.random.randint(3)
        else:
            return np.argmax(self.q_table[state])
    
    def update(self, state, action, reward, next_state):
        """更新Q表"""
        best_next = np.max(self.q_table[next_state])
        self.q_table[state, action] += self.lr * (reward + self.gamma * best_next - self.q_table[state, action])
    
    def decay_epsilon(self):
        """衰减探索率"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

# 训练强化学习控制器
print("\n训练Q-learning控制器...")
env = ReactorEnv()
agent = QLearningAgent()

n_episodes = 500
rewards_history = []

for episode in range(n_episodes):
    state = env.reset()
    total_reward = 0
    done = False
    
    while not done:
        action = agent.select_action(state)
        next_state, reward, done = env.step(action)
        agent.update(state, action, reward, next_state)
        state = next_state
        total_reward += reward
    
    agent.decay_epsilon()
    rewards_history.append(total_reward)
    
    if (episode + 1) % 100 == 0:
        print(f"  Episode {episode+1}: Total Reward = {total_reward:.2f}, Epsilon = {agent.epsilon:.3f}")

print("\nTraining complete!")
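With `epsilon_decay = 0.995` and `decay_epsilon` called once per episode as in the loop above, after 500 episodes the exploration rate ends near 0.995^500 ≈ 0.082, still above `epsilon_min = 0.01`, so the agent keeps a small amount of exploration throughout training. A quick self-contained check of that schedule:

```python
# Exploration rate after 500 multiplicative decay steps (floored at 0.01)
eps, eps_min, decay = 1.0, 0.01, 0.995
for _ in range(500):
    eps = max(eps_min, eps * decay)
print(round(eps, 3))  # ~0.082
```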

# Test the trained controller
print("\nTesting the trained controller...")
state = env.reset()
T_history = [env.T]
cA_history = [env.cA]
action_history = []

for _ in range(100):
    action = np.argmax(agent.q_table[state])  # greedy policy
    state, _, done = env.step(action)
    T_history.append(env.T)
    cA_history.append(env.cA)
    action_history.append(action)
    if done:
        break

print(f"  Final temperature: {T_history[-1]:.2f} K (target: {env.T_target} K)")
print(f"  Final concentration: {cA_history[-1]:.2f} (target: {env.cA_target})")

# Plot reinforcement learning results
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Reward history
axes[0, 0].plot(rewards_history, alpha=0.3)
window = 20
if len(rewards_history) >= window:
    smoothed = np.convolve(rewards_history, np.ones(window)/window, mode='valid')
    axes[0, 0].plot(range(window-1, len(rewards_history)), smoothed, 'r-', linewidth=2, label='Smoothed')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Total Reward')
axes[0, 0].set_title('RL Training Progress')
axes[0, 0].legend()
axes[0, 0].grid(True)

# Temperature control
axes[0, 1].plot(T_history, 'b-', label='Temperature')
axes[0, 1].axhline(y=env.T_target, color='r', linestyle='--', label='Target')
axes[0, 1].set_xlabel('Time Step')
axes[0, 1].set_ylabel('Temperature (K)')
axes[0, 1].set_title('Temperature Control')
axes[0, 1].legend()
axes[0, 1].grid(True)

# Concentration control
axes[1, 0].plot(cA_history, 'g-', label='Concentration')
axes[1, 0].axhline(y=env.cA_target, color='r', linestyle='--', label='Target')
axes[1, 0].set_xlabel('Time Step')
axes[1, 0].set_ylabel('Concentration')
axes[1, 0].set_title('Concentration Control')
axes[1, 0].legend()
axes[1, 0].grid(True)

# Q-table visualization
im = axes[1, 1].imshow(agent.q_table, aspect='auto', cmap='RdYlGn')
axes[1, 1].set_xlabel('Action')
axes[1, 1].set_ylabel('State')
axes[1, 1].set_title('Q-Table Visualization')
axes[1, 1].set_xticks([0, 1, 2])
axes[1, 1].set_xticklabels(['Cool', 'Hold', 'Heat'])
plt.colorbar(im, ax=axes[1, 1])

plt.tight_layout()
plt.savefig('reinforcement_learning_control.png', dpi=150, bbox_inches='tight')
print("\nResults saved to reinforcement_learning_control.png")

# ============================================
# 4. Anomaly Detection
# ============================================
print("\n" + "="*60)
print("4. Anomaly Detection")
print("="*60)

class AnomalyDetector:
    """Statistical anomaly detector based on standardized Euclidean distance
    (equivalent to a Mahalanobis distance with diagonal covariance)."""
    
    def __init__(self, threshold_factor=3.0):
        self.mean = None
        self.std = None
        self.threshold_factor = threshold_factor
        self.threshold = None
    
    def fit(self, X_normal):
        """Fit on normal (fault-free) data only."""
        self.mean = np.mean(X_normal, axis=0)
        self.std = np.std(X_normal, axis=0) + 1e-8
        
        # Threshold: scaled 95th percentile of the normal-data distances
        distances = self._distance(X_normal)
        self.threshold = np.percentile(distances, 95) * self.threshold_factor
        print(f"  Anomaly detection threshold: {self.threshold:.2f}")
    
    def _distance(self, X):
        """Standardized Euclidean distance to the normal-data mean."""
        normalized = (X - self.mean) / self.std
        return np.sqrt(np.sum(normalized**2, axis=1))
    
    def detect(self, X):
        """Flag samples whose distance exceeds the threshold."""
        distances = self._distance(X)
        is_anomaly = distances > self.threshold
        return is_anomaly, distances
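The score used here is a standardized Euclidean distance, i.e. a Mahalanobis distance with diagonal covariance. When process variables are correlated, a full-covariance Mahalanobis distance is the natural refinement. The following self-contained numpy sketch (synthetic data, not the script's) shows how a point aligned with the correlation structure scores lower than an equally distant off-axis point:

```python
import numpy as np

rng = np.random.default_rng(0)
# Correlated 2-D "normal operation" data (synthetic, for illustration)
X = rng.multivariate_normal([370.0, 200.0], [[25.0, 40.0], [40.0, 400.0]], size=500)

mu = X.mean(axis=0)
cov_inv = np.linalg.inv(np.cov(X, rowvar=False))

def mahalanobis(x):
    d = x - mu
    return float(np.sqrt(d @ cov_inv @ d))

# A deviation along the positive correlation axis scores lower than an
# off-axis deviation of the same Euclidean length.
print(mahalanobis(mu + np.array([5.0, 20.0])),
      mahalanobis(mu + np.array([5.0, -20.0])))
```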

# Generate normal and anomalous data
print("\nGenerating process data...")
np.random.seed(123)

# Normal data
n_normal = 800
X_normal = np.column_stack([
    np.random.normal(370, 5, n_normal),      # temperature
    np.random.normal(200, 20, n_normal),     # concentration
    np.random.normal(0.01, 0.002, n_normal)  # flow rate
])

# Anomalous data (fault injection)
n_anomaly = 200
X_anomaly = np.column_stack([
    np.random.normal(390, 10, n_anomaly),      # abnormally high temperature
    np.random.normal(300, 30, n_anomaly),      # concentration fault
    np.random.normal(0.005, 0.001, n_anomaly)  # flow fault
])

# Combine the data
X_all = np.vstack([X_normal, X_anomaly])
y_true = np.array([0]*n_normal + [1]*n_anomaly)  # 0 = normal, 1 = anomaly

# Fit the detector on normal data only
detector = AnomalyDetector()
detector.fit(X_normal)

# Detect anomalies over the full data set
is_anomaly, scores = detector.detect(X_all)

# Evaluate performance
true_positives = np.sum((is_anomaly == True) & (y_true == 1))
false_positives = np.sum((is_anomaly == True) & (y_true == 0))
true_negatives = np.sum((is_anomaly == False) & (y_true == 0))
false_negatives = np.sum((is_anomaly == False) & (y_true == 1))

accuracy = (true_positives + true_negatives) / len(y_true)
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

print("\nAnomaly detection performance:")
print(f"  Accuracy:  {accuracy:.3f}")
print(f"  Precision: {precision:.3f}")
print(f"  Recall:    {recall:.3f}")
print(f"  F1 score:  {f1:.3f}")

# Plot anomaly detection results
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Anomaly scores
axes[0, 0].plot(scores, 'b-', alpha=0.7, label='Anomaly Score')
axes[0, 0].axhline(y=detector.threshold, color='r', linestyle='--', label='Threshold')
axes[0, 0].fill_between(range(len(scores)), 0, detector.threshold, alpha=0.2, color='green', label='Normal Zone')
axes[0, 0].set_xlabel('Sample Index')
axes[0, 0].set_ylabel('Anomaly Score')
axes[0, 0].set_title('Anomaly Detection Scores')
axes[0, 0].legend()
axes[0, 0].grid(True)

# Scatter plot: temperature vs concentration
colors = ['green' if not a else 'red' for a in is_anomaly]
axes[0, 1].scatter(X_all[:, 0], X_all[:, 1], c=colors, alpha=0.5, s=20)
axes[0, 1].set_xlabel('Temperature (K)')
axes[0, 1].set_ylabel('Concentration')
axes[0, 1].set_title('Anomaly Detection: Temperature vs Concentration')
axes[0, 1].grid(True)

# ROC curve (computed manually)
thresholds = np.linspace(0, np.max(scores), 100)
tpr_list = []  # True Positive Rate
fpr_list = []  # False Positive Rate

for thresh in thresholds:
    pred = scores > thresh
    tp = np.sum((pred == True) & (y_true == 1))
    fp = np.sum((pred == True) & (y_true == 0))
    tn = np.sum((pred == False) & (y_true == 0))
    fn = np.sum((pred == False) & (y_true == 1))
    
    tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
    
    tpr_list.append(tpr)
    fpr_list.append(fpr)

axes[1, 0].plot(fpr_list, tpr_list, 'b-', linewidth=2)
axes[1, 0].plot([0, 1], [0, 1], 'k--', alpha=0.5)
axes[1, 0].set_xlabel('False Positive Rate')
axes[1, 0].set_ylabel('True Positive Rate')
axes[1, 0].set_title('ROC Curve')
axes[1, 0].grid(True)
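The ROC sweep above can be summarized by the area under the curve (AUC), integrated with the trapezoidal rule. A self-contained sketch on a toy score vector (the variable names here are illustrative, not the script's):

```python
import numpy as np

# Toy scores: anomalies (label 1) score higher than normals (label 0)
toy_scores = np.array([0.1, 0.2, 0.3, 0.8, 0.9])
toy_labels = np.array([0,   0,   1,   1,   1])

# Sweep thresholds from high to low so FPR increases monotonically,
# then pin the (0, 0) and (1, 1) corners of the ROC curve
cut = np.sort(np.unique(toy_scores))[::-1]
tpr = np.array([0.0] + [np.mean(toy_scores[toy_labels == 1] > t) for t in cut] + [1.0])
fpr = np.array([0.0] + [np.mean(toy_scores[toy_labels == 0] > t) for t in cut] + [1.0])

# Trapezoidal rule; these scores separate the classes perfectly, so AUC = 1.0
auc = float(np.sum(np.diff(fpr) * (tpr[1:] + tpr[:-1]) / 2.0))
print(auc)  # 1.0
```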

# Confusion matrix
confusion = np.array([[true_negatives, false_positives],
                      [false_negatives, true_positives]])
im = axes[1, 1].imshow(confusion, cmap='Blues')
axes[1, 1].set_xticks([0, 1])
axes[1, 1].set_yticks([0, 1])
axes[1, 1].set_xticklabels(['Normal', 'Anomaly'])
axes[1, 1].set_yticklabels(['Normal', 'Anomaly'])
axes[1, 1].set_xlabel('Predicted')
axes[1, 1].set_ylabel('Actual')
axes[1, 1].set_title('Confusion Matrix')

# Add count labels to each cell
for i in range(2):
    for j in range(2):
        axes[1, 1].text(j, i, confusion[i, j],
                        ha="center", va="center", color="black", fontsize=12)

plt.colorbar(im, ax=axes[1, 1])
plt.tight_layout()
plt.savefig('anomaly_detection.png', dpi=150, bbox_inches='tight')
print("\nResults saved to anomaly_detection.png")

# ============================================
# 5. Comprehensive Comparison Figure
# ============================================
print("\n" + "="*60)
print("5. Generating comprehensive comparison figure")
print("="*60)

fig, axes = plt.subplots(2, 3, figsize=(16, 10))

# 1. Surrogate model accuracy
y_pred_all = nn_model.predict(X_train)
axes[0, 0].scatter(y_train[:, 0], y_pred_all[:, 0], alpha=0.5, c='blue', s=20)
axes[0, 0].plot([0, 1000], [0, 1000], 'r--', linewidth=2)
axes[0, 0].set_xlabel('Actual Concentration')
axes[0, 0].set_ylabel('Predicted Concentration')
axes[0, 0].set_title('Surrogate Model Accuracy')
axes[0, 0].grid(True)

# 2. Bayesian optimization convergence
axes[0, 1].plot(bo.y, 'b-o', markersize=3, alpha=0.7)
axes[0, 1].axhline(y=best_yield, color='r', linestyle='--', linewidth=2)
axes[0, 1].set_xlabel('Iteration')
axes[0, 1].set_ylabel('Yield')
axes[0, 1].set_title('Bayesian Optimization Convergence')
axes[0, 1].grid(True)

# 3. RL control performance
axes[0, 2].plot(T_history, 'b-', label='Temperature', linewidth=2)
axes[0, 2].axhline(y=env.T_target, color='r', linestyle='--', label='Target', linewidth=2)
axes[0, 2].set_xlabel('Time Step')
axes[0, 2].set_ylabel('Temperature (K)')
axes[0, 2].set_title('RL Temperature Control')
axes[0, 2].legend()
axes[0, 2].grid(True)

# 4. Anomaly score distribution
axes[1, 0].hist(scores[y_true==0], bins=30, alpha=0.7, label='Normal', color='green')
axes[1, 0].hist(scores[y_true==1], bins=30, alpha=0.7, label='Anomaly', color='red')
axes[1, 0].axvline(x=detector.threshold, color='black', linestyle='--', linewidth=2, label='Threshold')
axes[1, 0].set_xlabel('Anomaly Score')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].set_title('Anomaly Score Distribution')
axes[1, 0].legend()
axes[1, 0].grid(True)

# 5. Training loss
axes[1, 1].semilogy(nn_model.loss_history, 'b-', linewidth=1)
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('Loss (log scale)')
axes[1, 1].set_title('Neural Network Training')
axes[1, 1].grid(True)

# 6. RL reward curve (smoothed)
window = 20
if len(rewards_history) >= window:
    smoothed = np.convolve(rewards_history, np.ones(window)/window, mode='valid')
    axes[1, 2].plot(range(window-1, len(rewards_history)), smoothed, 'g-', linewidth=2)
axes[1, 2].set_xlabel('Episode')
axes[1, 2].set_ylabel('Average Reward')
axes[1, 2].set_title('RL Training Progress')
axes[1, 2].grid(True)

plt.suptitle('AI in Reaction Engineering - Comprehensive Results', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.savefig('ai_reaction_engineering_comprehensive.png', dpi=150, bbox_inches='tight')
print("\nComprehensive results saved to ai_reaction_engineering_comprehensive.png")

print("\n" + "="*60)
print("All simulations complete!")
print("="*60)
print("\nGenerated figures:")
print("  1. neural_network_surrogate.png - neural network surrogate model")
print("  2. bayesian_optimization.png - Bayesian optimization")
print("  3. reinforcement_learning_control.png - reinforcement learning control")
print("  4. anomaly_detection.png - anomaly detection")
print("  5. ai_reaction_engineering_comprehensive.png - comprehensive results")
