A3C Principles

Asynchronous Advantage Actor-Critic

The data an agent learns from can only come from the agent's continual interaction with the environment. Compared with ordinary supervised learning, the amount of data is far too small.

If we have multiple agents interacting with the environment at the same time, every agent produces data, and all of that data can be pooled for the model to learn from.

[Figure: A3C architecture, with one global network and several parallel workers]

In fact, the global network and the workers have exactly the same actor-critic (AC) network structure.

The global network never interacts with the environment directly. The workers do interact with the environment, and they report what they have learned back to the global network.

In A3C, a worker not only interacts with the environment to produce data, it also extracts its own "takeaways" from that data. These so-called takeaways are simply the gradients it has computed.

It is worth emphasizing that what a worker reports to the global network is gradients, not the data it collected while exploring.

On this point it is easy to get confused with DPPO, which we will study later. Like A3C, DPPO is also a distributed architecture, but its workers do not learn on their own; they submit data and let the global network do the learning.

After a worker pushes its gradients to the global network, the global network sends the latest version of its parameters straight back to the worker.

The worker then keeps interacting with the environment using the newest parameters. After interacting, it pushes gradients again and pulls new parameters, and so the cycle continues.
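
To make this push/pull cycle concrete, here is a minimal sketch of one worker's loop. All of the names (worker_net, global_net, compute_gradients, and so on) are hypothetical placeholders for illustration, not the API of the listing later in this post.

```python
# Illustrative A3C worker loop (all object names are hypothetical).
def worker_loop(worker_net, global_net, env, n_steps=5):
    s = env.reset()
    while True:
        # 1. Interact with the environment for a few steps using the local copy.
        batch = []
        for _ in range(n_steps):
            a = worker_net.choose_action(s)
            s_, r, done = env.step(a)
            batch.append((s, a, r))
            s = env.reset() if done else s_

        # 2. Turn the experience into gradients locally (the "takeaways").
        grads = worker_net.compute_gradients(batch)

        # 3. Push the gradients, not the raw data, to the global network.
        global_net.apply_gradients(grads)

        # 4. Pull the newest global parameters and keep going.
        worker_net.set_weights(global_net.get_weights())
```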

The Pendulum-v1 Project

[Figure: the Pendulum environment, a rod pivoting around point A with an arrowed ring indicating the applied torque]

In the figure, the rod rotates around point A. Our task is for the agent to learn to apply a certain amount of "force" to the rod so that it stands upright.

The "force" is drawn as an arrowed ring: the bigger the ring, the larger the "force" being applied.

The reason "force" is in quotation marks is that this game does not fully follow real-world physics; it is only a game. As we said before, this "force" is not an on/off switch, it has a magnitude, so this is a continuous control problem.

We mainly care about the three elements: state, action, and reward.

The input state has three features: cos(theta), sin(theta), and the angular velocity theta_dot.

The action is a single value in the range [-2.0, 2.0].
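
As a quick sanity check (assuming a gym version that ships Pendulum-v1), you can inspect the spaces directly:

```python
import gym

env = gym.make('Pendulum-v1')
print(env.observation_space)  # Box of 3 features: cos(theta), sin(theta), theta_dot
print(env.action_space)       # Box(-2.0, 2.0, (1,), float32): one continuous torque value
```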

The reward is computed as: reward = -(theta^2 + 0.1 * theta_dot^2 + 0.001 * action^2)

From this we can tell that the minimum reward is about -16.273 and the maximum is 0. The maximum is reached when the rod is perfectly upright. Our task is to adjust the magnitude of the input "force" so that the rod stays upright.
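
The lower bound follows from plugging in the largest values the environment allows (theta = pi, theta_dot = 8, action = 2, assuming the documented Pendulum limits); a quick check:

```python
import math

theta, theta_dot, action = math.pi, 8.0, 2.0  # worst case allowed by the environment
min_reward = -(theta**2 + 0.1 * theta_dot**2 + 0.001 * action**2)
print(min_reward)  # about -16.2736, matching the bound quoted above
```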

We split the network into the following parts:

  1. Input and feature extraction: the state goes in and is processed by a hidden layer l1.
  2. Computing mu, the mean of the normal distribution, i.e. where the action is most likely to be. We use a tanh activation, whose output range is [-1.0, 1.0]; whatever values the l1 layer produces, the tanh activation maps them into [-1.0, 1.0]. But since actions in this environment range over [-2.0, 2.0], we add a lambda layer to rescale the output to [-2.0, 2.0] (see the sketch after this list).
  3. Computing sigma, the standard deviation of the distribution, with a softplus activation so that it stays positive.
  4. Finally, the model outputs mu and sigma together.
    [Figure: the network's two output heads, mu and sigma]
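
Below is a minimal sketch of the two actor heads described in points 2 and 3. It is written with tf.keras layers purely for readability; the full listing later builds the same heads with tf.layers.dense and rescales mu by multiplying it with the action bound instead of using a Lambda layer. The layer sizes here are illustrative, not taken from the original code.

```python
import tensorflow as tf

state_in = tf.keras.Input(shape=(3,))                         # cos(theta), sin(theta), theta_dot
l1 = tf.keras.layers.Dense(100, activation='relu')(state_in)  # shared hidden layer (size is illustrative)

mu = tf.keras.layers.Dense(1, activation='tanh')(l1)          # mean, squashed into [-1, 1]
mu = tf.keras.layers.Lambda(lambda x: x * 2.0)(mu)            # rescale to the action range [-2, 2]

sigma = tf.keras.layers.Dense(1, activation='softplus')(l1)   # standard deviation, always positive

actor = tf.keras.Model(inputs=state_in, outputs=[mu, sigma])
```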

How do we turn mu and sigma into the action we need?

We first reconstruct the normal distribution that mu and sigma describe, and then sample from that distribution.

  1. This distribution is in fact our policy pi.
  2. Under the policy pi, we randomly sample an action a.
  3. We need to clip a to the valid range, because passing an out-of-range value to the environment could cause problems. Here we use a clip function to keep a within [-2, 2]: anything above 2 is returned as 2, anything below -2 as -2 (see the sketch below).
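
A tiny NumPy illustration of steps 1 to 3 (the real code does the same thing inside the TensorFlow graph, in the choose_a scope of the listing below):

```python
import numpy as np

mu, sigma = 1.3, 0.4             # example outputs of the actor network
a = np.random.normal(mu, sigma)  # sample an action from the policy pi = N(mu, sigma)
a = np.clip(a, -2.0, 2.0)        # clip to the valid range: above 2 becomes 2, below -2 becomes -2
```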

A3C Code Implementation

"""
Asynchronous Advantage Actor Critic (A3C) + RNN with continuous action space, Reinforcement Learning.

The Pendulum example.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/

Using:
tensorflow 2.x (the graph is built through tf.compat.v1)
gym >= 0.26 (Pendulum-v1 with the new reset/step API)
"""
import multiprocessing
import threading
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

import numpy as np
import gym
import os
import shutil
import matplotlib.pyplot as plt

GAME = 'Pendulum-v1'
OUTPUT_GRAPH = True
LOG_DIR = './log'
N_WORKERS = multiprocessing.cpu_count()
MAX_EP_STEP = 200
MAX_GLOBAL_EP = 1500
GLOBAL_NET_SCOPE = 'Global_Net'
UPDATE_GLOBAL_ITER = 5
GAMMA = 0.9
ENTROPY_BETA = 0.01
LR_A = 0.0001    # learning rate for actor
LR_C = 0.001    # learning rate for critic
GLOBAL_RUNNING_R = []
GLOBAL_EP = 0

env = gym.make(GAME)  # a throwaway env, used only to read the observation/action space sizes below

N_S = env.observation_space.shape[0]
N_A = env.action_space.shape[0]
A_BOUND = [env.action_space.low, env.action_space.high]


class ACNet(object):
    def __init__(self, scope, globalAC=None):
        if scope == GLOBAL_NET_SCOPE:   # get global network
            with tf.variable_scope(scope):
                self.s = tf.placeholder(tf.float32, [None, N_S], 'S')
                self.a_params, self.c_params = self._build_net(scope)[-2:]
        else:   # local net, calculate losses
            with tf.variable_scope(scope):
                self.s = tf.placeholder(tf.float32, [None, N_S], 'S')
                self.a_his = tf.placeholder(tf.float32, [None, N_A], 'A')
                self.v_target = tf.placeholder(tf.float32, [None, 1], 'Vtarget')

                mu, sigma, self.v, self.a_params, self.c_params = self._build_net(scope)

                td = tf.subtract(self.v_target, self.v, name='TD_error')
                with tf.name_scope('c_loss'):
                    self.c_loss = tf.reduce_mean(tf.square(td))

                with tf.name_scope('wrap_a_out'):
                    mu, sigma = mu * A_BOUND[1], sigma + 1e-4  # scale the mean to the action bound; keep sigma strictly positive

                normal_dist = tf.distributions.Normal(mu, sigma)

                with tf.name_scope('a_loss'):
                    log_prob = normal_dist.log_prob(self.a_his)
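                    # weight the log-probability by the TD error (advantage); stop_gradient keeps the actor loss from updating the critic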
                    exp_v = log_prob * tf.stop_gradient(td)
                    entropy = normal_dist.entropy()  # encourage exploration
                    self.exp_v = ENTROPY_BETA * entropy + exp_v
                    self.a_loss = tf.reduce_mean(-self.exp_v)

                with tf.name_scope('choose_a'):  # use local params to choose action
                    self.A = tf.clip_by_value(tf.squeeze(normal_dist.sample(1), axis=[0, 1]), A_BOUND[0], A_BOUND[1])
                with tf.name_scope('local_grad'):
                    self.a_grads = tf.gradients(self.a_loss, self.a_params)
                    self.c_grads = tf.gradients(self.c_loss, self.c_params)

            with tf.name_scope('sync'):
                with tf.name_scope('pull'):
                    self.pull_a_params_op = [l_p.assign(g_p) for l_p, g_p in zip(self.a_params, globalAC.a_params)]
                    self.pull_c_params_op = [l_p.assign(g_p) for l_p, g_p in zip(self.c_params, globalAC.c_params)]
                with tf.name_scope('push'):
                    self.update_a_op = OPT_A.apply_gradients(zip(self.a_grads, globalAC.a_params))
                    self.update_c_op = OPT_C.apply_gradients(zip(self.c_grads, globalAC.c_params))

    def _build_net(self, scope):
        w_init = tf.random_normal_initializer(0., .1)
        with tf.variable_scope('critic'):   # only critic controls the rnn update
            cell_size = 64
            s = tf.expand_dims(self.s, axis=1,
                               name='timely_input')  # [time_step, feature] => [time_step, batch, feature]
            rnn_cell = tf.nn.rnn_cell.BasicRNNCell(cell_size)
            self.init_state = rnn_cell.zero_state(batch_size=1, dtype=tf.float32)
            outputs, self.final_state = tf.nn.dynamic_rnn(
                cell=rnn_cell, inputs=s, initial_state=self.init_state, time_major=True)
            cell_out = tf.reshape(outputs, [-1, cell_size], name='flatten_rnn_outputs')  # joined state representation
            l_c = tf.layers.dense(cell_out, 50, tf.nn.relu6, kernel_initializer=w_init, name='lc')
            v = tf.layers.dense(l_c, 1, kernel_initializer=w_init, name='v')  # state value

        with tf.variable_scope('actor'):  # state representation is based on critic
            l_a = tf.layers.dense(cell_out, 80, tf.nn.relu6, kernel_initializer=w_init, name='la')
            mu = tf.layers.dense(l_a, N_A, tf.nn.tanh, kernel_initializer=w_init, name='mu')
            sigma = tf.layers.dense(l_a, N_A, tf.nn.softplus, kernel_initializer=w_init, name='sigma')
        a_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope + '/actor')
        c_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope + '/critic')
        return mu, sigma, v, a_params, c_params

    def update_global(self, feed_dict):  # run by a local
        SESS.run([self.update_a_op, self.update_c_op], feed_dict)  # local grads applies to global net

    def pull_global(self):  # run by a local
        SESS.run([self.pull_a_params_op, self.pull_c_params_op])

    def choose_action(self, s, cell_state):  # run by a local
        s = s[np.newaxis, :]
        a, cell_state = SESS.run([self.A, self.final_state], {self.s: s, self.init_state: cell_state})
        return a, cell_state


class Worker(object):
    def __init__(self, name, globalAC):
        self.name = name

        if self.name == 'W_0':
            self.env = gym.make(GAME, render_mode='human')
        else:
            self.env = gym.make(GAME).unwrapped
        # self.env = gym.make(GAME).unwrapped

        self.AC = ACNet(name, globalAC)

    def work(self):
        global GLOBAL_RUNNING_R, GLOBAL_EP
        total_step = 1
        buffer_s, buffer_a, buffer_r = [], [], []
        while not COORD.should_stop() and GLOBAL_EP < MAX_GLOBAL_EP:
            s = self.env.reset()[0]  # new gym API: reset() returns (obs, info)
            ep_r = 0
            rnn_state = SESS.run(self.AC.init_state)    # zero rnn state at beginning
            keep_state = rnn_state.copy()       # keep rnn state for updating global net
            for ep_t in range(MAX_EP_STEP):

                a, rnn_state_ = self.AC.choose_action(s, rnn_state)  # get the action and next rnn state

                s_, r, terminated, truncated, info = self.env.step(a)  # new gym API returns 5 values

                done = ep_t == MAX_EP_STEP - 1  # treat only the fixed step limit as the end of the episode

                ep_r += 0 if r != r else r  # r != r is True only for NaN, so NaN rewards are skipped
                buffer_s.append(s)
                buffer_a.append(a)
                buffer_r.append((r+8)/8)    # normalize

                if total_step % UPDATE_GLOBAL_ITER == 0 or done:   # update global and assign to local net
                    if done:
                        v_s_ = 0   # terminal
                    else:
                        v_s_ = SESS.run(self.AC.v, {self.AC.s: s_[np.newaxis, :], self.AC.init_state: rnn_state_})[0, 0]
                    buffer_v_target = []
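                    # build discounted targets backwards from the bootstrap value: v_target[t] = r[t] + GAMMA * v_target[t+1]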
                    for r in buffer_r[::-1]:    # reverse buffer r
                        v_s_ = r + GAMMA * v_s_
                        buffer_v_target.append(v_s_)
                    buffer_v_target.reverse()

                    buffer_s, buffer_a, buffer_v_target = np.vstack(buffer_s), np.vstack(buffer_a), np.vstack(buffer_v_target)

                    feed_dict = {
                        self.AC.s: buffer_s,
                        self.AC.a_his: buffer_a,
                        self.AC.v_target: buffer_v_target,
                        self.AC.init_state: keep_state,
                    }

                    self.AC.update_global(feed_dict)
                    buffer_s, buffer_a, buffer_r = [], [], []
                    self.AC.pull_global()
                    keep_state = rnn_state_.copy()   # replace the keep_state as the new initial rnn state_

                s = s_
                rnn_state = rnn_state_  # renew rnn state
                total_step += 1

                if done:
                    if len(GLOBAL_RUNNING_R) == 0:  # record running episode reward
                        GLOBAL_RUNNING_R.append(ep_r)
                    else:
                        GLOBAL_RUNNING_R.append(0.9 * GLOBAL_RUNNING_R[-1] + 0.1 * ep_r)
                    try:
                        print(
                            self.name,
                            "Ep:", GLOBAL_EP,
                            "| Ep_r: " , GLOBAL_RUNNING_R[-1],
                        )
                    except:
                        pass
                    GLOBAL_EP += 1
                    break

if __name__ == "__main__":
    SESS = tf.Session()

    with tf.device("/cpu:0"):
        OPT_A = tf.train.RMSPropOptimizer(LR_A, name='RMSPropA')
        OPT_C = tf.train.RMSPropOptimizer(LR_C, name='RMSPropC')
        GLOBAL_AC = ACNet(GLOBAL_NET_SCOPE)  # we only need its params
        workers = []
        # Create worker
        for i in range(N_WORKERS):
            i_name = 'W_%i' % i   # worker name
            workers.append(Worker(i_name, GLOBAL_AC))

    COORD = tf.train.Coordinator()
    SESS.run(tf.global_variables_initializer())

    if OUTPUT_GRAPH:
        if os.path.exists(LOG_DIR):
            shutil.rmtree(LOG_DIR)
        tf.summary.FileWriter(LOG_DIR, SESS.graph)

    worker_threads = []

    for worker in workers[::-1]:  # start the background workers first; W_0 (the rendering worker) runs last, in the main thread
        if worker.name == 'W_0':
            worker.work()
        else:
            t = threading.Thread(target=worker.work)  # bind the bound method directly; a lambda would capture the loop variable
            t.start()
            worker_threads.append(t)
    COORD.join(worker_threads)

    plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
    plt.xlabel('episode')
    plt.ylabel('Total moving reward')
    plt.show()

