I. Deterministic Policy Gradient (DPG)

1. Overview

In policy gradient and Actor-Critic methods, the policy is usually stochastic:

\pi_\theta (a|s)

The policy outputs a probability distribution and the action is sampled from it, so with a discrete parameterization only a limited set of actions can be produced in a given state, which is awkward for continuous control.

In DPG, by contrast, the policy is deterministic:

\mu_\theta(s)

Here \mu is a mapping from s to a: given a state s, it outputs a single action value directly. Instead of a distribution, the policy produces a continuous action value.
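To make the contrast concrete, here is a minimal PyTorch sketch with toy dimensions (the network names and sizes are assumptions for illustration only):

import torch
import torch.nn as nn

state_dim, n_actions, action_dim = 4, 2, 1
s = torch.randn(1, state_dim)

# Stochastic policy pi_theta(a|s): outputs a distribution, then samples an action.
logits_net = nn.Linear(state_dim, n_actions)
dist = torch.distributions.Categorical(logits=logits_net(s))
a_stochastic = dist.sample()              # can differ between calls for the same state

# Deterministic policy mu_theta(s): maps the state directly to one continuous action.
mu_net = nn.Linear(state_dim, action_dim)
a_deterministic = torch.tanh(mu_net(s))   # same state -> same action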

2. Network Setup

Actor network (policy network)

  • Input: state s

  • Output: deterministic action a = \mu_\theta(s)

Critic network (action-value network)

  • Input: state s and action a

  • Output: action value Q(s,a)

The differences from A2C:

| Algorithm | Actor output | Critic output | Critic input |
| --- | --- | --- | --- |
| A2C | action distribution \pi_\theta(a\mid s) | state value V(s) | state s |
| DPG / DDPG | deterministic action a = \mu(s) | action value Q(s,a) | state + action (s,a) |

3. Update Rules

Critic update (action-value network / Q-network)

Similar to the DQN update, the critic is trained with a TD method:

TD target:

y=r+\gamma Q(s^{\prime},\mu(s^{\prime}|\theta^\mu)|\theta^Q)

Critic loss function:

L(\theta^Q)=\mathbb{E}_{(s,a,r,s^{\prime})}\left[\left(Q(s,a|\theta^Q)-y\right)^2\right]
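As a quick sanity check with hypothetical numbers (illustrative values, not from the text): with r = -1.2, \gamma = 0.99, and a target-network estimate Q(s',\mu(s')) = -10.0, the TD target is

y = -1.2 + 0.99 \times (-10.0) = -11.1

and the critic is regressed toward this fixed value via the squared error above.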

Actor update (deterministic policy network)

The actor's objective is to maximize the critic's Q value:

J(\theta^\mu)=\mathbb{E}_{s\sim\eta}\left[Q(s,\mu(s|\theta^\mu)|\theta^Q)\right]

The gradient takes the form:

\nabla_{\theta^\mu}J\approx\mathbb{E}_{s\sim\eta}\left[\nabla_aQ(s,a|\theta^Q)|_{a=\mu(s)}\cdot\nabla_{\theta^\mu}\mu(s|\theta^\mu)\right]

where \eta is the state distribution.
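In practice this gradient is not computed by hand: feeding \mu_\theta(s) into the critic and calling backward() lets autograd apply exactly this chain rule. A minimal sketch with toy dimensions (names and sizes here are assumptions for illustration; the full DDPG code below does the same thing inside update()):

import torch
import torch.nn as nn

state_dim, action_dim = 3, 1
actor = nn.Sequential(nn.Linear(state_dim, 32), nn.ReLU(), nn.Linear(32, action_dim), nn.Tanh())
critic = nn.Sequential(nn.Linear(state_dim + action_dim, 32), nn.ReLU(), nn.Linear(32, 1))
actor_opt = torch.optim.Adam(actor.parameters(), lr=1e-3)

states = torch.randn(64, state_dim)                 # batch of states s ~ eta
actions = actor(states)                             # a = mu_theta(s), kept differentiable
q = critic(torch.cat([states, actions], dim=1))     # Q(s, mu_theta(s))
actor_loss = -q.mean()                              # maximize E[Q] <=> minimize -E[Q]

actor_opt.zero_grad()
actor_loss.backward()    # autograd chains grad_a Q and grad_theta mu automatically
actor_opt.step()         # only the actor's parameters are updated by this optimizer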

4. Why DPG Is Off-Policy

For stochastic policy gradient methods (REINFORCE / A2C / PPO), the policy is random, so the expectation in the gradient depends on the action distribution:

\nabla_\theta J(\theta)=\mathbb{E}_{s\sim d^\pi,\,a\sim\pi_\theta}\left[\nabla_\theta\log\pi_\theta(a|s)\,Q^\pi(s,a)\right]

To make such a method off-policy, importance sampling has to be introduced to correct the distribution mismatch.

Because DPG uses a deterministic policy, its policy gradient is:

\nabla_{\theta^\mu}J\approx\mathbb{E}_{s\sim\eta}\left[\nabla_aQ(s,a|\theta^Q)|_{a=\mu(s)}\cdot\nabla_{\theta^\mu}\mu(s|\theta^\mu)\right]

Unlike the on-policy formula of A2C and similar methods, this expectation depends only on the state distribution \eta and is independent of the action distribution \pi_\theta, so transitions collected by any behavior policy can be reused without importance sampling.

5. Choosing the Behavior Policy

Because the DPG policy is deterministic, acting with the deterministic output alone would make the agent repeat the same action in the same state and never explore new states.

The usual remedy is to make the behavior policy exploratory so it can break out of local optima, for example by adding noise to \mu:

a=\mu_\theta(s)+\mathcal{N}(0,\sigma^2)

Compared with an ε-greedy strategy, this approach is better suited to continuous action spaces.
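A tiny NumPy sketch of this behavior policy (the action bound and noise scale are made-up values; choose_action in the DDPG code below follows the same pattern):

import numpy as np

max_action, sigma = 2.0, 0.1                                # assumed action bound and noise std
mu_s = np.array([1.7])                                      # hypothetical deterministic output mu_theta(s)
a = mu_s + np.random.normal(0.0, sigma, size=mu_s.shape)    # add Gaussian exploration noise
a = np.clip(a, -max_action, max_action)                     # keep the perturbed action in range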

II. DDPG (Deep Deterministic Policy Gradient)

1. Overview

In practice, DDPG (Deep Deterministic Policy Gradient) is the more common choice; it extends the theory of DPG to deep neural networks.

Its critic update is similar to DQN's, with one notable difference: DQN refreshes its target network with a hard update (directly copying the main network's parameters), whereas DDPG uses a soft update:

\theta^{\prime}\leftarrow\tau\theta+(1-\tau)\theta^{\prime}

The soft update is simply a weighted average (see the small numeric sketch after the list below).

where:

  • \theta: parameters of the main network

  • \theta': parameters of the target network

  • \tau \in (0,1): soft-update coefficient; \tau=1 reduces to a hard update.
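A minimal numeric sketch of the soft update (toy parameter values): with \tau = 0.005 the target network moves only 0.5% of the way toward the main network per call, which keeps the TD target slowly changing and stabilizes training.

import torch

tau = 0.005
theta = torch.tensor([1.0, 2.0])            # main-network parameters (toy values)
theta_target = torch.tensor([0.0, 0.0])     # target-network parameters (toy values)

# Weighted average: the target drifts slowly toward the main parameters.
theta_target = tau * theta + (1 - tau) * theta_target
print(theta_target)                         # tensor([0.0050, 0.0100])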

2. Code

The DDPG class

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )
        self.out = nn.Linear(128, action_dim)
        self.max_action = max_action

    def forward(self, state):
        a = self.out(self.fc(state))
        return torch.tanh(a) * self.max_action  

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(state_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        return self.fc(x)


class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)

    def __len__(self):
        return len(self.buffer)

class DDPG:
    def __init__(self, state_dim, action_dim, max_action, gamma=0.99, lr=1e-3, tau=0.005, buffer_size=100000):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.gamma = gamma
        self.tau = tau
        self.max_action = max_action

        # Actor & Critic networks
        self.actor = Actor(state_dim, action_dim, max_action).to(self.device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(self.device)
        self.actor_target.load_state_dict(self.actor.state_dict())

        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.critic_target = Critic(state_dim, action_dim).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

        # Replay Buffer
        self.replay_buffer = ReplayBuffer(capacity=buffer_size)

    # Select an action (+ exploration noise)
    def choose_action(self, state, noise_scale=0.1):
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        action = self.actor(state).cpu().data.numpy().flatten()
        action += noise_scale * np.random.randn(len(action))
        return np.clip(action, -self.max_action, self.max_action)

    def store_transition(self, state, action, reward, next_state, done):
        self.replay_buffer.push(state, action, reward, next_state, done)

    # Soft-update a target network
    def soft_update(self, net, net_target):
        for param, target_param in zip(net.parameters(), net_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    # Update Actor and Critic
    def update(self, batch_size=64):
        if len(self.replay_buffer) < batch_size:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.FloatTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # Critic update
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_Q = self.critic_target(next_states, next_actions)
            y = rewards + self.gamma * (1 - dones) * target_Q

        current_Q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_Q, y)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor update
        actor_loss = -self.critic(states, self.actor(states)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update the target networks
        self.soft_update(self.actor, self.actor_target)
        self.soft_update(self.critic, self.critic_target)

Testing on Gymnasium (Pendulum-v1)

import gymnasium as gym

env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = env.action_space.high[0]

agent = DDPG(state_dim, action_dim, max_action)
num_episodes = 1000
batch_size = 64

for episode in range(num_episodes):
    state = env.reset()[0]
    done = False
    total_reward = 0

    while not done:
        action = agent.choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        agent.store_transition(state, action, reward, next_state, done)
        agent.update(batch_size)

        state = next_state
        total_reward += reward

    if (episode + 1) % 100 == 0:
        print(f"Episode {episode+1}/{num_episodes}, Total Reward: {total_reward:.2f}")
