本文引用于 Datawhale 中 “JoyRL Book” https://datawhalechina.github.io/joyrl-book/#/,具体代码详情请查看“JoyRL Book”

1. Q-learning 算法

在这里插入图片描述

图 1.1 Q-learning 算法伪代码
  • 对于实战来说最重要的一点就是写好伪代码。如果说理论部分是数学语言,实战部分就是编程语言,而伪代码则是从数学语言到编程语言之间的一个过渡,这也是笔者为什么在讲解每个算法的时候尽可能贴出伪代码的原因。在每个算法实战的内容中,笔者基本会按照定义算法,定义训练,定义环境,设置参数以及开始训练等步骤为读者们展开。由于这次我们是第一次讲到实战,所以会先讲一下定义训练,因为其中涉及到一个所有强化学习通用的训练模式。

1.1 定义训练

  • 回顾一下伪代码的第二行到最后一行,我们会发现一个强化学习训练的通用模式,首先我们会迭代很多个(M)回合,在每回合中,首先重置环境回到初始化的状态,智能体根据状态选择动作,然后环境反馈中下一个状态和对应的奖励,同时智能体会更新策略,直到回合结束。这其实就是马尔可夫决策过程中智能体与环境互动的过程,写成一段通用的代码如下:
for i_ep in range(train_eps): # 遍历每个回合
    # 重置环境,获取初始状态
    state = env.reset()  # 重置环境,即开始新的回合
    while True: # 对于比较复杂的游戏可以设置每回合最大的步长,例如while ep_step<100,即最大步长为100。
        # 智能体根据策略采样动作
        action = agent.sample_action(state)  # 根据算法采样一个动作
        # 与环境进行一次交互,得到下一个状态和奖励
        next_state, reward, terminated, _ = env.step(action)  # 智能体将样本记录到经验池中
        agent.memory.push(state, action, reward, next_state, terminated) 
        # 智能体更新策略
        agent.update(state, action, reward, next_state, terminated)  
        # 更新状态
        state = next_state  
        # 如果终止则本回合结束
        if terminated:
            break
第一步:定义训练

1.2 定义算法

  • 强化学习中有几个要素,智能体、环境、经验池(经回放),在实践中也需要逐一定义这些要素。我们一般首先定义智能体,或者说算法,在 Python 中一般定义为类即可。再考虑一下智能体在强化学习中主要负责哪些工作。
1.2.1 采样动作
  • 首先在训练中我需要采样动作与环境交互,于是我们可以定义一个类方法,命名为 sample_action ,如下:
class Agent:
    def __init__():
        pass
    def sample_action(self, state):
        ''' 采样动作,训练时用
        '''
        self.sample_count += 1
        # epsilon是会递减的,这里选择指数递减
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * math.exp(- self.sample_count / self.epsilon_decay) 
        # e-greedy 策略
        if np.random.uniform(0, 1) > self.epsilon:
            action = np.argmax(self.Q_table[str(state)]) # 选择Q(s,a)最大对应的动作
        else:
            action = np.random.choice(self.n_actions) # 随机选择动作
        return action     
第二步:采样动作
  • 在这里我们用了 ε \varepsilon ε - greedy 策略,其中 ε \varepsilon ε 会随着采样的步数指数衰减,感兴趣的读者也可以直接设置固定的 ε \varepsilon ε = 1 试试。 在 Q-learning 算法中还有一个重要的元素,即 Q 表,Q 表的作用是输入状态和动作,输出一个即可,这样一来我们可以用一个二维的数组来表示,比如 Q_table[0][1] = 0.1可以表示 Q ( s 0 , a 1 ) = 0.1 Q(s_0,a_1)=0.1 Q(s0,a1)=0.1 (注意 Python 中下标是从 0 开始)。而在我们的示例代码中用了一个默认字典来表示,如下:
self.Q_table  = defaultdict(lambda: np.zeros(n_actions))
1.2.2 预测动作
  • 此外对于每个智能体在训练中和在测试中采取动作的方式一般是不一样的,因为在训练中需要增加额外的探索策略,而在测试中只需要输出 Q 值对应最大的动作即可,如下:
class Agent:
    def __init__():
        pass
    def predict_action(self,state):
        ''' 预测或选择动作,测试时用
        '''
        action = np.argmax(self.Q_table[str(state)])
        return action
第三步:预测动作
1.2.3 更新方法
  • 所有强化学习算法的采样动作和预测动作方式几乎是比较固定的,对于每个智能体来说最核心的还是更新网络的方式,在 Q-learning 算法中的更新方式较为简单,而且不需要经验回放(具体会在 DQN 算法中展开),如下:
def update(self, state, action, reward, next_state, terminated):
    Q_predict = self.Q_table[str(state)][action] 
    if terminated: # 终止状态
        Q_target = reward  
    else:
        Q_target = reward + self.gamma * np.max(self.Q_table[str(next_state)]) 
    self.Q_table[str(state)][action] += self.lr * (Q_target - Q_predict)
第四步:更新方法

其中 self.lr 就是更新公式中的 α \alpha α(学习率),到这里我们就定义好了智能体。

2. DQN 算法

  • DQN 算法的训练过程分为交互采样和模型更新两个步骤,其中交互采样的目的就是与环境交互并产生样本,模型更新则是利用得到的样本来更新相关的网络参数,更新方式涉及每个强化学习算法的核心。

 算法伪代码

图 2.1 DQN 算法

  • Q-learning 算法不同的是,这里由于用的是神经网络,因此会多一个计算损失函数并进行反向传播的步骤,即梯度下降。在 DQN 算法中,我们需要定义当前网络、目标网络和经验回放等元素,这些都可以看做算法的一个模块,因此接下来我们分别用一个 Python 类来定义。

2.1 定义模型

  • 首先是定义模型,就是定义两个神经网路,即当前网络和目标网络,由于这两个网络结构相同,这里我们只用一个 Python 类来定义,如代码清单所示。定义一个全连接网络
class MLP(nn.Module): # 所有网络必须继承 nn.Module 类,这是 PyTorch 的特性
    def __init__(self, input_dim,output_dim,hidden_dim=128):
        super(MLP, self).__init__() 
        # 定义网络的层,这里都是线性层
        self.fc1 = nn.Linear(input_dim, hidden_dim) # 输入层
        self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层
        self.fc3 = nn.Linear(hidden_dim, output_dim) # 输出层
        
    def forward(self, x):
        # 各层对应的激活函数
        x = F.relu(self.fc1(x)) 
        x = F.relu(self.fc2(x))
        return self.fc3(x) # 输出层不需要激活函数
        
图 2.2 定义一个全连接网络
  • 这里我们定义了一个三层的全连接网络,输入维度就是状态数,输出维度就是动作数,中间的隐藏层采用最常用的 ReLU 激活函数。这里我们用 PyTorchModule 类来定义网络,这是 PyTorch 的特性,所有网络都必须继承这个类。在 PyTorch 中,我们只需要定义网络的前向传播,即 forward 函数,反向传播的过程 PyTorch 会自动完成,这也是 PyTorch 的特性。

2.2 经验回放

  • 经验回放的功能主要实现缓存样本和取出样本等两个功能。
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity # 经验回放的容量
        self.buffer = [] # 用列表存放样本
        self.position = 0 # 样本下标,便于覆盖旧样本
    
    def push(self, state, action, reward, next_state, done):
        ''' 缓存样本
        '''
        if len(self.buffer) < self.capacity: # 如果样本数小于容量
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity 
    
    def sample(self, batch_size):
        ''' 取出样本,即采样
        '''
        batch = random.sample(self.buffer, batch_size) # 随机采出小批量转移
        state, action, reward, next_state, done =  zip(*batch) # 解压成状态,动作等
        return state, action, reward, next_state, done
    
    def __len__(self):
        ''' 返回当前样本数
        '''
        return len(self.buffer)
        
图 2.3 定义经验回放

2.3 定义智能体

  • 智能体即策略的载体,因此有的时候也会称为策略。智能体的主要功能就是根据当前状态输出动作和更新策略,分别跟伪代码中的交互采样和模型更新过程相对应。我们会把所有的模块比如网络模型等都封装到智能体中,这样更符合伪代码的逻辑。
  • 如伪代码清单所示,两个网络就是前面所定义的全连接网络,输入为状态维度,输出则是动作维度。这里我们还定义了一个优化器,用来更新网络参数。在 DQN 算法中采样动作和预测动作跟 Q-learning 是一样的,其中采样动作使用的是 ε \varepsilon ε - greedy 策略,便于在训练过程中探索,而测试只需要检验模型的性能,因此不需要探索,只需要单纯的进行 argmax 预测即可,即选择最大值对应的动作。
class Agent:
    def __init__(self):
        # 定义当前网络
        self.policy_net = MLP(state_dim,action_dim).to(device) 
        # 定义目标网络
        self.target_net = MLP(state_dim,action_dim).to(device)
        # 将当前网络参数复制到目标网络中
        self.target_net.load_state_dict(self.policy_net.state_dict())
        # 定义优化器
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate) 
        # 经验回放
        self.memory = ReplayBuffer(buffer_size)
        self.sample_count = 0  # 记录采样步数
    def sample_action(self,state):
        ''' 采样动作,主要用于训练
        '''
        self.sample_count += 1
        # epsilon 随着采样步数衰减
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * math.exp(-1. * self.sample_count / self.epsilon_decay) 
        if random.random() > self.epsilon:
            with torch.no_grad(): # 不使用梯度
                state = torch.tensor(np.array(state), device=self.device, dtype=torch.float32).unsqueeze(dim=0)
                q_values = self.policy_net(state)
                action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value
        else:
            action = random.randrange(self.action_dim)
    def predict_action(self,state):
        ''' 预测动作,主要用于测试
        '''
        with torch.no_grad():
            state = torch.tensor(np.array(state), device=self.device, dtype=torch.float32).unsqueeze(dim=0)
            q_values = self.policy_net(state)
            action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value
        return action
    def update(self):
        pass
        
图 2.4 定义智能体

2.4 DQN 算法更新

  • DQN 算法更新本质上跟 Q-learning 区别不大,但由于读者可能第一次接触深度学习的实现方式,这里单独拎出来分析 DQN 算法的更新方式,
def update(self, share_agent=None):
    # 当经验回放中样本数小于更新的批大小时,不更新算法
    if len(self.memory) < self.batch_size: # when transitions in memory donot meet a batch, not update
        return
    # 从经验回放中采样
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
        self.batch_size)
    # 转换成张量(便于GPU计算)
    state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float) 
    action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1) 
    reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float).unsqueeze(1) 
    next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float) 
    done_batch = torch.tensor(np.float32(done_batch), device=self.device).unsqueeze(1) 
    # 计算 Q 的实际值
    q_value_batch = self.policy_net(state_batch).gather(dim=1, index=action_batch) # shape(batchsize,1),requires_grad=True
    # 计算 Q 的估计值,即 r+\gamma Q_max
    next_max_q_value_batch = self.target_net(next_state_batch).max(1)[0].detach().unsqueeze(1) 
    expected_q_value_batch = reward_batch + self.gamma * next_max_q_value_batch* (1-done_batch)
    # 计算损失
    loss = nn.MSELoss()(q_value_batch, expected_q_value_batch)  
    # 梯度清零,避免在下一次反向传播时重复累加梯度而出现错误。
    self.optimizer.zero_grad()  
    # 反向传播
    loss.backward()
    # clip避免梯度爆炸
    for param in self.policy_net.parameters():  
        param.grad.data.clamp_(-1, 1)
    # 更新优化器
    self.optimizer.step() 
    # 每C(target_update)步更新目标网络
    if self.sample_count % self.target_update == 0: 
        self.target_net.load_state_dict(self.policy_net.state_dict())   
        
图 2.5 DQN 算法更新
  • 首先由于我们是小批量随机梯度下降,所以当经验回放不满足批大小时选择不更新,这实际上是工程性问题。然后在更新时我们取出样本,并转换成 Torch 的张量,便于我们用 GPU 计算。接着计算 Q 值的估计值和实际值,并得到损失函数。在得到损失函数并更新参数时,我们在代码上会有一个固定的写法,即梯度清零,反向传播和更新优化器的过程,跟在深度学习中的写法是一样的,最后我们需要定期更新一下目标网络,即每隔 C 步复制参数到目标网络。

3. Double DQN 算法

  • Double DQN 算法跟 DQN 算法的区别在于目标值的计算方式,如代码所示
# 计算当前网络的Q值,即Q(s_t+1|a)
next_q_value_batch = self.policy_net(next_state_batch)
# 计算目标网络的Q值,即Q'(s_t+1|a)
next_target_value_batch = self.target_net(next_state_batch)
# 计算 Q'(s_t+1|a=argmax Q(s_t+1|a))
next_target_q_value_batch = next_target_value_batch.gather(1, torch.max(next_q_value_batch, 1)[1].unsqueeze(1)) 
图 3.1 Double DQN 目标值的计算

4. Noisy DQN 算法

  • Noisy DQN 算法的核心思想是将 DQN 算法中的线性层替换成带有噪声的线性层,如代码
class NoisyLinear(nn.Module):
    '''在Noisy DQN中用NoisyLinear层替换普通的nn.Linear层
    '''
    def __init__(self, input_dim, output_dim, std_init=0.4):
        super(NoisyLinear, self).__init__()
        self.input_dim  = input_dim
        self.output_dim = output_dim
        self.std_init  = std_init
        self.weight_mu    = nn.Parameter(torch.empty(output_dim, input_dim))
        self.weight_sigma = nn.Parameter(torch.empty(output_dim, input_dim))
        # 将一个 tensor 注册成 buffer,使得这个 tensor 不被当做模型参数进行优化。
        self.register_buffer('weight_epsilon', torch.empty(output_dim, input_dim)) 
        
        self.bias_mu    = nn.Parameter(torch.empty(output_dim))
        self.bias_sigma = nn.Parameter(torch.empty(output_dim))
        self.register_buffer('bias_epsilon', torch.empty(output_dim))
        
        self.reset_parameters() # 初始化参数
        self.reset_noise()  # 重置噪声
    
    def forward(self, x):
        if self.training: 
            weight = self.weight_mu + self.weight_sigma * self.weight_epsilon
            bias   = self.bias_mu + self.bias_sigma * self.bias_epsilon
        else:
            weight = self.weight_mu
            bias   = self.bias_mu
        return F.linear(x, weight, bias)
    
    def reset_parameters(self):
        mu_range = 1 / self.input_dim ** 0.5
        self.weight_mu.data.uniform_(-mu_range, mu_range)
        self.weight_sigma.data.fill_(self.std_init / self.input_dim ** 0.5)
        self.bias_mu.data.uniform_(-mu_range, mu_range)
        self.bias_sigma.data.fill_(self.std_init / self.output_dim ** 0.5)
    
    def reset_noise(self):
        epsilon_in  = self._scale_noise(self.input_dim)
        epsilon_out = self._scale_noise(self.output_dim)
        self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))
        self.bias_epsilon.copy_(self._scale_noise(self.output_dim))
    
    def _scale_noise(self, size):
        x = torch.randn(size)
        x = x.sign().mul(x.abs().sqrt())
        return x
图 4.1 带有噪声的线性层网络
  • 根据写好的 NoisyLinear 层,我们可以在 DQN 算法中将普通的线性层替换为 NoisyQNetwork 层,如代码
class NoisyQNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(NoisyQNetwork, self).__init__()
        self.fc1 =  nn.Linear(state_dim, hidden_dim)
        self.noisy_fc2 = NoisyLinear(hidden_dim, hidden_dim)
        self.noisy_fc3 = NoisyLinear(hidden_dim, action_dim)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.noisy_fc2(x))
        x = self.noisy_fc3(x)
        return x

    def reset_noise(self):
        self.noisy_fc2.reset_noise()
        self.noisy_fc3.reset_noise()

图 4.2 带噪声层的全连接网络
  • 注意在训练过程中,我们需要在每次更新后重置噪声,这样有助于提高训练的稳定性,

5. A2C 算法

  • 通常来讲,Critic 的输入是状态,输出则是一个维度的价值,而 Actor 输入的也会状态,但输出的是概率分布,因此我们可以定义两个网络,如代码清单所示。

5.1 实现 Actor 和 Critic

class Critic(nn.Module):
    def __init__(self,state_dim):
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        value = self.fc3(x)
        return value

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, action_dim)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        logits_p = F.softmax(self.fc3(x), dim=1)
        return logits_p
        
图 5.1 实现 Actor 和 Critic
  • 这里由于是离散的动作空间,根据在策略梯度章节中设计的策略函数,我们使用了 softmax 函数来输出概率分布。另外,实践上来看,由于 ActorCritic 的输入是一样的,因此我们可以将两个网络合并成一个网络,以便于加速训练。

5.2 实现合并的 Actor 和 Critic

class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim):
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.action_layer = nn.Linear(256, action_dim)
        self.value_layer = nn.Linear(256, 1)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        logits_p = F.softmax(self.action_layer(x), dim=1)
        value = self.value_layer(x)
        return logits_p, value

图 5.2 实现合并的 Actor 和 Critic 的算法
  • 注意当我们使用分开的网络时,我们需要在训练时分别更新两个网络的参数,即需要两个优化,而使用合并的网络时则只需要更新一个网络的参数即可。

5.3 动作采样

  • DQN 算法不同等确定性策略不同,A2C 的动作输出不再是 Q 值最大对应的动作,而是从概率分布中采样动作,这意味着即使是很小的概率,也有可能被采样到,这样就能保证探索性,如下代码所示
from torch.distributions import Categorical
class Agent:
    def __init__(self):
        self.model = ActorCritic(state_dim, action_dim)
    def sample_action(self,state):
        '''动作采样函数
        '''
        state = torch.tensor(state, device=self.device, dtype=torch.float32)
        logits_p, value = self.model(state)
        dist = Categorical(logits_p) 
        action = dist.sample() 
        return action

图 5.3 采样动作

注意这里直接利用了 PyTorch 中的 Categorical 分布函数,这样就能直接从概率分布中采样动作了。

5.4 策略更新

  • 我们首先需要计算出优势函数,一般先计算出回报,然后减去网络输出的值即可,如代码
class Agent:
    def _compute_returns(self, rewards, dones):
        returns = []
        discounted_sum = 0
        for reward, done in zip(reversed(rewards), reversed(dones)):
            if done:
                discounted_sum = 0
            discounted_sum = reward + (self.gamma * discounted_sum)
            returns.insert(0, discounted_sum)
        # 归一化
        returns = torch.tensor(returns, device=self.device, dtype=torch.float32).unsqueeze(dim=1)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5) # 1e-5 to avoid division by zero
        return returns
    def compute_advantage(self):
        '''计算优势函数
        '''
        logits_p, states, rewards, dones = self.memory.sample()
        returns = self._compute_returns(rewards, dones)
        states = torch.tensor(states, device=self.device, dtype=torch.float32)
        logits_p, values = self.model(states)
        advantages = returns - values
        return advantages

图 5.4 计算优势函数
  • 这里我们使用了一个技巧,即将回报归一化,这样可以让优势函数的值域在 [-1,1] 之间,这样可以让优势函数更稳定,从而减少方差。

5.5 计算损失函数

计算优势之后就可以分别计算 Actor 和 Critic 的损失函数了,如代码清单所示。

class Agent:
    def compute_loss(self):
        '''计算损失函数
        '''
        logits_p, states, rewards, dones = self.memory.sample()
        returns = self._compute_returns(rewards, dones)
        states = torch.tensor(states, device=self.device, dtype=torch.float32)
        logits_p, values = self.model(states)
        advantages = returns - values
        dist = Categorical(logits_p)
        log_probs = dist.log_prob(actions)
        # 注意这里策略损失反向传播时不需要优化优势函数,因此需要将其 detach 掉
        actor_loss = -(log_probs * advantages.detach()).mean() 
        critic_loss = advantages.pow(2).mean()
        return actor_loss, critic_loss

图 5.4 计算损失函数

6. DDPG 算法

6.1 DDPG 伪代码

  • 如图所示,算法的训练方式其实更像 DQN 算法。注意在第 15 步中 DDPG 算法将当前网络参数复制到目标网络的方式是软更新,即每次一点点地将参数复制到目标网络中,与之对应的是 DQN 算法中的硬更新。软更新的好处是更加平滑缓慢,可以避免因权重更新过于迅速而导致的震荡,同时降低训练发散的风险。
    在这里插入图片描述
图 6.1 DDPG 算法伪代码

6.2 DDPG 定义模型

  • 如代码清单所示,DDPG 算法的模型结构跟 Actor - Critic 算法几乎是一样的,只是由于 DDPG 算法的 CriticQ 函数,因此也需要将动作作为输入。除了模型之外,目标网络和经验回放的定义方式跟 DQN 算法一样,这里不做展开。
import torch
import torch.nn as nn
import torch.nn.functional as F
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim = 256, init_w=3e-3):
        super(Actor, self).__init__()  
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, action_dim)
        
        self.linear3.weight.data.uniform_(-init_w, init_w)
        self.linear3.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = torch.tanh(self.linear3(x)) # 输入0到1之间的值
        return x
        
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256, init_w=3e-3):
        super(Critic, self).__init__()
        
        self.linear1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, 1)
        # 随机初始化为较小的值
        self.linear3.weight.data.uniform_(-init_w, init_w)
        self.linear3.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state, action):
        # 按维数1拼接
        x = torch.cat([state, action], 1)
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x

图 6.2 实现 DDPG 算法的 Actor 和 Critic
  • 由于 DDPG 算法输出的是确定性策略,因此不需要像其他策略梯度算法那样,通过借助高斯分布来采样动作的概率分布,直接输出 Actor 的值即可,如代码清单所示。
class Agent:
    def __init__(self):
        pass
    def sample_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        action = self.actor(state)
        return action.detach().cpu().numpy()[0, 0]

图 6.3 DDPG 算法的动作采样

3.4 策略更新

  • DDPG 算法的策略更新则更像 Actor - Critic 算法。
class Agent:
    def __init__(self):
        pass
    def update(self):
        # 从经验回放中中随机采样一个批量的样本
        state, action, reward, next_state, done = self.memory.sample(self.batch_size)
        actor_loss = self.critic(state, self.actor(state))
        actor_loss = - actor_loss.mean()

        next_action = self.target_actor(next_state)
        target_value = self.target_critic(next_state, next_action.detach())
        expected_value = reward + (1.0 - done) * self.gamma * target_value
        expected_value = torch.clamp(expected_value, -np.inf, np.inf)

        actual_value = self.critic(state, action)
        critic_loss = nn.MSELoss()(actual_value, expected_value.detach())
        
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # 各自目标网络的参数软更新
        for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
            target_param.data.copy_(
                target_param.data * (1.0 - self.tau) +
                param.data * self.tau
            )
        for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
            target_param.data.copy_(
                target_param.data * (1.0 - self.tau) +
                param.data * self.tau
            )

图 6.4 DDPG 算法的策略更新

7. PPO 算法

7.1 PPO 伪代码

off - policy 算法不同,PPO 算法每次会采样若干个时步的样本,然后利用这些样本更新策略,而不是存入经验回放中进行采样更新。
在这里插入图片描述

图 7.1 PPO 算法伪代码

7.2 PPO 算法更新

  • 无论是连续动作空间还是离散动作空间,PPO 算法的动作采样方式跟前面章节讲的 Actor - Critic 算法是一样的,在本次实战中就不做展开
def update(self):
    # 采样样本
    old_states, old_actions, old_log_probs, old_rewards, old_dones = self.memory.sample()
    # 转换成tensor
    old_states = torch.tensor(np.array(old_states), device=self.device, dtype=torch.float32)
    old_actions = torch.tensor(np.array(old_actions), device=self.device, dtype=torch.float32)
    old_log_probs = torch.tensor(old_log_probs, device=self.device, dtype=torch.float32)
    # 计算回报
    returns = []
    discounted_sum = 0
    for reward, done in zip(reversed(old_rewards), reversed(old_dones)):
        if done:
            discounted_sum = 0
        discounted_sum = reward + (self.gamma * discounted_sum)
        returns.insert(0, discounted_sum)
    # 归一化
    returns = torch.tensor(returns, device=self.device, dtype=torch.float32)
    returns = (returns - returns.mean()) / (returns.std() + 1e-5) # 1e-5 to avoid division by zero
    for _ in range(self.k_epochs): # 小批量随机下降
        #  计算优势
        values = self.critic(old_states) 
        advantage = returns - values.detach()
        probs = self.actor(old_states)
        dist = Categorical(probs)
        new_probs = dist.log_prob(old_actions)
        # 计算重要性权重
        ratio = torch.exp(new_probs - old_log_probs) #
        surr1 = ratio * advantage
        surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantage
        # 注意dist.entropy().mean()的目的是最大化策略熵
        actor_loss = -torch.min(surr1, surr2).mean() + self.entropy_coef * dist.entropy().mean()
        critic_loss = (returns - values).pow(2).mean()
        # 反向传播
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        self.actor_optimizer.step()
        self.critic_optimizer.step()

图 7.2 PPO 算法更新

8. SAC 算法(最大熵强化学习)

8.1 SAC 算法伪代码

在这里插入图片描述

图 8.1 SAC伪代码

8.2 定义模型

  • 首先我们定义 ActorCritic ,即值网络和策略网络,跟 A2C 算法其实是一样的,如代码清单所示。
class ValueNet(nn.Module):
    def __init__(self, state_dim, hidden_dim, init_w=3e-3):
        super(ValueNet, self).__init__()
        '''定义值网络
        '''
        self.linear1 = nn.Linear(state_dim, hidden_dim) # 输入层
        self.linear2 = nn.Linear(hidden_dim, hidden_dim) # 隐藏层
        self.linear3 = nn.Linear(hidden_dim, 1)

        self.linear3.weight.data.uniform_(-init_w, init_w) # 初始化权重
        self.linear3.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x
class PolicyNet(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim, init_w=3e-3, log_std_min=-20, log_std_max=2):
        super(PolicyNet, self).__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max
        
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        
        # 初始化权重
        self.mean_linear = nn.Linear(hidden_dim, action_dim)
        self.mean_linear.weight.data.uniform_(-init_w, init_w)
        self.mean_linear.bias.data.uniform_(-init_w, init_w)
        
        self.log_std_linear = nn.Linear(hidden_dim, action_dim)
        self.log_std_linear.weight.data.uniform_(-init_w, init_w)
        self.log_std_linear.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        
        mean    = self.mean_linear(x)
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
        
        return mean, log_std
    
    def evaluate(self, state, epsilon=1e-6):
        mean, log_std = self.forward(state)
        std = log_std.exp()
        # 计算动作
        normal = Normal(mean, std)
        z = normal.sample()
        action = torch.tanh(z)
        # 计算动作概率
        log_prob = normal.log_prob(z) - torch.log(1 - action.pow(2) + epsilon)
        log_prob = log_prob.sum(-1, keepdim=True)
        
        return action, log_prob, z, mean, log_std
        
    def get_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        mean, log_std = self.forward(state)
        std = log_std.exp()
        
        normal = Normal(mean, std)
        z      = normal.sample()
        action = torch.tanh(z)
        
        action  = action.detach().cpu().numpy()
        return action[0]

图 8.2 Actor 和 Critic 网络

8.2 Soft Q 网络

然后再额外定义一个 Soft Q 网络

class SoftQNet(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim, init_w=3e-3):
        super(SoftQNet, self).__init__()
        '''定义Q网络,state_dim, action_dim, hidden_dim, init_w分别为状态维度、动作维度隐藏层维度和初始化权重
        '''
        self.linear1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, 1)
        
        self.linear3.weight.data.uniform_(-init_w, init_w)
        self.linear3.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state, action):
        x = torch.cat([state, action], 1)
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x

图 8.3 Soft Q 网络
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐