1. Dependency imports

First, import the third-party libraries and custom modules the project needs, covering numerical computation, deep learning, logging, and visualization.

import torch
import numpy as np
from tensorboardX import SummaryWriter
import matplotlib.pyplot as plt
import time
import random
import sys, os
curr_path = os.path.dirname(__file__)     # directory of this script
parent_path = os.path.dirname(curr_path)  # project root
sys.path.append(parent_path)              # make sibling packages importable
from python_enviroment.pyenv_area import TF_threat_area

2. Configuration class Config

The Config class is the project's "parameter hub": it gathers every configurable item (scenario settings, network parameters, training parameters, and so on) in one place, so parameters are not scattered through the code and can be modified and debugged centrally.

Scenario settings:
class Config:
    num_agents = 3          # number of agents
    num_adversaries = 2     # number of adversaries
    good_policy = "TD3"     # our side's policy, choices=["TD3", "MASAC"]
    adv_policy = "PN"       # adversary policy, choices=["ddpg", "PN"]
Network parameter settings:
    StartLearnStep = 120000     # number of environment steps before learning starts
    steps_per_update = 50       # steps between network updates
    Batchsize = 128             # batch size
    # ---- TD3 ----
    init_noise_scale = 0.8
    final_noise_scale = 0.03
    # ---- SAC ----
    gamma_sac = 0.99            # SAC discount factor (trades off current vs. future reward)
    reward_scale = 1
    repeat_times = 2 ** 0
    target_step = 2 ** 10
    max_memo = 2 ** 17
    soft_update_tau = 2 ** -8
    batch_size_sac = 256
    net_dim = 2 ** 8
Prediction parameter settings:
    pred_trajectory = False     # whether to run trajectory prediction
    trajectory_pred_size = 2    # prediction sample interval
    his_time_step = 10          # number of historical trajectory steps (prediction input)
    pre_traj_step = 1           # number of predicted steps
    batch_size_predict = 64     # batch size of the prediction network
Meta parameter settings:
    meta = True                 # whether to run online (meta) optimization
    MetaLearnStep = 1024        # step at which meta updates start; should preferably exceed the MATD3 batch size
    update_step = 3             # number of update steps
Output settings:
    save_dir = "./policy/"      # directory in which networks are saved
    load_dir = ''               # directory in which training state and model are loaded
    log_dir = "./log/"          # directory in which logs are saved
    save_rate = 100             # saving interval
Training settings:
    Episodes = 450              # number of training episodes
    EpisodeLength = 5000        # maximum steps per episode
    display = True              # True: test / visualization mode, False: training mode
    Save_origin = True          # save raw data
    State_Noise = False         # inject observation noise
    threat_area = True          # whether the obstacles are static threat areas
    max_mark = 4                # number of scenarios for meta multi-task training
    use_env = False             # whether to couple with Unity
    seed = 0                    # random seed
    use_RND = False
    Err_U = True                # actuator-fault constraint
    adv_restore = True
    good_restore = True
    display_adv_policy = True

3. Utility function

setup_seed fixes all random seeds so that experiments are reproducible (results do not fluctuate because of random factors):

def setup_seed(seed=0):
    torch.manual_seed(seed)                    # PyTorch CPU seed
    torch.cuda.manual_seed_all(seed)           # PyTorch GPU seed (all devices)
    np.random.seed(seed)                       # NumPy seed
    random.seed(seed)                          # Python built-in random seed
    torch.backends.cudnn.deterministic = True  # force deterministic cuDNN kernels
    torch.backends.cudnn.benchmark = False     # disable cuDNN autotuning (avoids nondeterminism)

4. Core logic (training / meta-learning / visualization)

(1) train (regular training)

The train function implements the standard multi-agent reinforcement-learning training flow. Its core is the loop "collect experience → update networks → log results", targeting policy learning in a fixed scenario.

def train(config):
    setup_seed()
1) Initialization

Log and path setup: the log directory (for SummaryWriter) and the model save directory are named with the current date, so different runs do not overwrite each other's files.

    nowt = time.strftime("%Y-%m-%d", time.localtime())  # format the current date
    train_log_dir = config.log_dir + '/' + nowt + '_train_logs'
    # if not os.path.isdir(train_log_dir):
    #     os.makedirs(str(train_log_dir))
    train_logger = SummaryWriter(str(train_log_dir) + '_yl')  # create the log writer

Training-result save directory:

    file = './policy/policy' + str(nowt)
    isExists = os.path.exists(file)
    if not isExists:
        os.makedirs(file)
        print(file + ' created')
    else:
        print(file + ' already exists')  # do not recreate an existing directory
    filename = file + '/' + str(config.Episodes)

Unity environment (TODO: the package is not imported yet):

    if config.use_env:
        env = Unity_env()
        env_info = env.get_env_info()
        #print(env_info)

Environment, action space, and state space (Python environment setup):

Load the custom environment TF_threat_area (a scenario containing threat areas);
Define the action space action_n (our agent: 3 dims for angle of attack / bank angle / thrust; each adversary: 2 dims for lateral / longitudinal overload);
Define the observation space obs_shape_n (our agent: 24 dims = own state 6 + adversary-1 state 6 + threat radius 1 + adversary-2 state 6 + threat radius 1 + target point 3 + fault information 1, where the last dimension exists only when Err_U=True; each adversary: 12 dims = own state 6 + our state 6). One assumed index layout is sketched after the code block below.

    if config.threat_area:
        action_n = [[0, 0, 0], [0, 0], [0, 0]]  # our agent: angle of attack, bank angle, thrust; each adversary: lateral / longitudinal overload
        act_shape_n = [3, 2, 2]
        obs_shape_n = [23, 12, 12]              # 23 = own 6 + adv 6 + radius + adv 6 + radius + target 3;  adversary: 12 = own 6 + ours 6
        if config.Err_U:
            obs_shape_n = [24, 12, 12]          # 24 = the 23 dims above + fault information u
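For orientation, one plausible reading of the 24 observation dimensions, inferred from the index slicing used later in the plotting code, is sketched below; the names and exact boundaries are assumptions, not taken from the environment source.

import numpy as np

# Assumed index layout of the 24-dim observation (illustration only, not authoritative)
OWN_STATE   = slice(0, 6)    # own state: altitude, longitude, latitude, velocity, angles
ADV1_STATE  = slice(6, 12)   # adversary-1 state
ADV1_RADIUS = 12             # adversary-1 threat radius
ADV2_STATE  = slice(13, 19)  # adversary-2 state
ADV2_RADIUS = 19             # adversary-2 threat radius
TARGET      = slice(20, 23)  # target point
FAULT_U     = 23             # fault information u (present only when Err_U=True)

obs = np.zeros((1, 24))                               # dummy observation
print(obs[0, OWN_STATE].shape, obs[0, TARGET].shape)  # (6,) (3,)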

Agent network construction:

Our agents: the TD3 or MASAC networks are loaded according to good_policy (the algorithm classes are imported from the trainers package);
Adversary agents: PN or DDPG is loaded according to adv_policy (PN is classical proportional-navigation guidance and needs no training; DDPG is a reinforcement-learning policy and is trained).

    trainers = []
    for i in range(config.num_agents - config.num_adversaries):
        if config.good_policy == 'TD3':
            if config.use_RND:
                from trainers.MATD3.MATD3 import MATD3RND
                trainers.append(MATD3RND(state_dim=obs_shape_n[i], action_dim=act_shape_n[i],agent_index=i,
                                        num_agents=config.num_agents - config.num_adversaries,
                                        arglist=config))
            else:
                from trainers.MATD3.MATD3 import MATD3
                trainers.append(MATD3(state_dim=obs_shape_n[i], action_dim=act_shape_n[i],agent_index=i,
                                        num_agents=config.num_agents - config.num_adversaries,
                                        arglist=config))
    for i in range(config.num_agents - config.num_adversaries, config.num_agents):
        if config.adv_policy == "ddpg":
            from trainers.DDPG.ddpg import ddpg_agent
            trainers.append(ddpg_agent(obs_shape_n[1], act_shape_n[1], arglist=config, Parallel=Parallel))  # NOTE: Parallel is not defined in this excerpt
            ##TODO:
        if config.adv_policy == 'PN':  # proportional navigation (PN) guidance
            from trainers.PN.PNN import PNN
            trainers.append(PNN())

Bookkeeping variables:

    step_total = 0        # total step count across episodes, used to decide when learning starts
    done_episodes_50 = 0  # counter for computing the success rate
    collect_data = {"tf_reward": [], "done": [],"done_rate": [], "episode_steps": []}  # collected statistics
    ori_data = np.zeros((1, obs_shape_n[0]))  # buffer for the raw (un-normalized) states
    mark = 0              # scenario selector
    his_traj_n = None
    his_done_rate = 0
    # print("StartLearnStep: ", config.StartLearnStep)
2) Main training loop

The loop runs for Episodes episodes; each episode proceeds as follows:

Environment reset: py_env.reset(mark=mark) initializes the scenario (mark selects the task scenario) and returns the initial observation obs_n.
Step and data initialization: the per-episode step counter step_episode and the reward list reward_coll are reset, and the initial observation is appended to the raw-data buffer ori_data (used to save the state trajectory).

    for episode in range(config.Episodes):  # episode loop
        py_env = TF_threat_area(num_agents=config.num_agents, random_num=config.num_adversaries, threat_area=config.threat_area, Err_U=config.Err_U)
        step_episode = 0                    # per-episode step counter, used for the reward
        reward_coll = []
        # print("episode: {}".format(episode))
        obs_n = py_env.reset(mark=mark)                  # initialize the environment, obs_n has shape (1, obs_dim)
        if config.use_env:  # TODO: Unity package not imported -------------------
            env.reset()  # ndarray[1, :]
            unity_obs = obs_n
            a = env.step(unity_obs)
        if config.Save_origin:
            ori_data = np.append(ori_data, obs_n, axis=0)  # record the raw initial state
        if config.State_Noise:
            obs_n = py_env.noise(obs_n)                    # add observation noise
        obs_std, reward_std = py_env.max_min_standard(obs_n, 0)  # max-min normalization
        while True:
            step_total += 1
            step_episode += 1
3) Per-step loop (one interaction per step)

Within each episode, up to EpisodeLength steps are executed; the core sequence is "agent decision → environment interaction → experience storage → network update":

Agent decision:

While step_total ≤ StartLearnStep: our agent explores randomly (actions sampled from a uniform distribution);
Once step_total exceeds that threshold: our agent's action comes from the trained network and is clipped to [-1, 1] to keep it in range;
Adversaries: with static threat areas the adversary action is fixed to [0, 0] (no movement); with dynamic obstacles it comes from PN or DDPG.

            for i in range(config.num_agents - config.num_adversaries):
                if step_total <= config.StartLearnStep:
                    a = np.random.uniform(-1, 1, size=(act_shape_n[i],))
                else:
                    # a = trainers[i].select_action(obs_std) + np.random.normal(size=(3,))
                    # a, debug = trainers[i].act(obs_std)
                    a = trainers[i].select_action(obs_std)
                    a = a.clip(-1, 1)
                action_n[i] = a.reshape(-1)

            for i in range(config.num_agents - config.num_adversaries, config.num_agents):
                if config.threat_area:  # static obstacles
                    action_n[i] = [0, 0]
                else:                   # dynamic obstacles
                    if i == 1:
                        if config.adv_policy == 'PN':
                            action_n[i] = trainers[i].choose_action(obs_n[:, :12])
                        # TODO: ddpg
                    else:
                        if config.adv_policy == 'PN':
                            action_n[i] = trainers[i].choose_action(np.hstack((obs_n[:, :6], obs_n[:, 14:20])))
                        # TODO: ddpg

Environment interaction: py_env.step(action_n, step_episode) executes the actions and returns the next observation obs_n_, the immediate reward reward, the done flag done, and the minimum distances (dis_2D_min1 / dis_2D_min2 to adversaries 1 and 2, dis_2D_p to the target point).

            obs_n_, reward, done, dis_2D_min1, dis_2D_min2, dis_2D_p = py_env.step(action_n, step_episode)  # state transition

            if config.use_env:  # TODO: Unity package not imported -------------------
                a = env.step(obs_n_)
            if config.Save_origin:
                ori_data = np.append(ori_data, obs_n_, axis=0)  # record the raw state
            reward_coll.append(reward)                          # record the reward
            if config.State_Noise:
                obs_n_ = py_env.noise(obs_n_)  # add observation noise
            if done:
                obs_n_ = obs_n
                train_logger.add_scalar('reward_end', reward, episode)
                reward_mean = sum(reward_coll) / step_episode
                train_logger.add_scalar('reward_coll', reward_mean, episode)
                train_logger.add_scalar('dis_2D_min1', dis_2D_min1, episode)
                train_logger.add_scalar('dis_2D_min2', dis_2D_min2, episode)
                train_logger.add_scalar('dis_p', dis_2D_p, episode)
                collect_data["tf_reward"].append(reward_mean)
                # collect_data["adv_reward"].append(reward)
                collect_data["episode_steps"].append(step_episode)

Data preprocessing:
State normalization: py_env.max_min_standard applies max-min normalization to the next state and the reward, so that differing value ranges do not destabilize network training (a minimal sketch of this kind of normalization follows below);

            obs_std_, reward_std_ = py_env.max_min_standard(obs_n_, reward)  # normalization

Raw-data saving: if Save_origin=True, the next state obs_n_ is appended to ori_data for later analysis.
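As a reference for what max_min_standard presumably does, here is a minimal max-min normalization sketch; the function name, bounds, and reward scaling below are placeholders, since the real environment keeps its own limits.

import numpy as np

def max_min_normalize(obs, reward, obs_low, obs_high, reward_scale=1.0):
    """Map each observation dimension into [-1, 1] and rescale the reward (illustrative only)."""
    obs_std = 2.0 * (obs - obs_low) / (obs_high - obs_low) - 1.0
    return obs_std, reward / reward_scale

obs_low = np.zeros(24)         # placeholder lower bounds
obs_high = np.full(24, 100.0)  # placeholder upper bounds
obs_std_demo, r_demo = max_min_normalize(np.random.uniform(0, 100, (1, 24)), 5.0, obs_low, obs_high)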

Experience storage:

The tuple "current state → action → reward → next state → done flag" is pushed into our agent's replay buffer via add_experience, to be sampled later for training; a compact buffer illustration follows, ahead of the actual storage loop.
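To make the storage call concrete, a compact replay buffer with an add_experience-style interface is sketched below; this is an illustration, not the project's MATD3 buffer, which also records the scenario mark and the step index (hence those extra arguments in the real call).

import random
from collections import deque

class SimpleReplayBuffer:
    """Minimal illustration of the buffer behind add_experience."""
    def __init__(self, max_size=2 ** 17):
        self.buffer = deque(maxlen=max_size)  # oldest transitions are discarded first

    def add_experience(self, obs, action, reward, obs_next, done, mark=0, step=0):
        self.buffer.append((obs, action, reward, obs_next, done, mark, step))

    def sample(self, batch_size=128):
        return random.sample(self.buffer, batch_size)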

            for i, agent in enumerate(trainers):
                done_ = 0 if done == 0 else 1  # collapse the done flag to 0/1
                if i in range(config.num_agents - config.num_adversaries):
                    if config.good_policy in ["maddpg", "TD3", "MASAC"]:
                        agent.add_experience(obs_std, action_n[0], reward_std_, obs_std_, done_, mark, step_episode)
                elif i in range(config.num_agents - config.num_adversaries, config.num_agents):
                    if config.threat_area:
                        continue

Network update:
Once step_total > StartLearnStep, our agent calls updata, which samples a batch from the replay buffer and updates the TD3/MASAC networks (a generic TD3-style update step is sketched after the code below);

            loss_n = []
            if step_total == config.StartLearnStep:
                print('startlearn')
            if step_total >config.StartLearnStep:
                loss = trainers[0].updata(episode, 0, trainers[0:config.num_agents - config.num_adversaries],
                                    logger=train_logger)
                loss_n.append(loss)

If the adversary policy is DDPG, trainers[1].learn(done) and trainers[2].learn(done) update the adversary networks, and their losses are written to the log.

                if config.adv_policy == "ddpg":
                    trainers[1].learn(done)
                    trainers[2].learn(done)
                    train_logger.add_scalar('M1actor_loss', trainers[1].a_loss, step_total)
                    train_logger.add_scalar('M1critic_loss',trainers[1].q_loss, step_total)
                    train_logger.add_scalar('M2actor_loss',trainers[2].a_loss, step_total)
                    train_logger.add_scalar('M2critic_loss', trainers[2].q_loss, step_total)
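For reference, the core of a TD3-style update (twin critics with target-policy smoothing, a delayed actor update, and soft target updates) can be sketched as below. This is a generic illustration with assumed network and batch arguments, not the internals of the project's updata method.

import torch
import torch.nn.functional as F

def td3_update_step(actor, actor_targ, critic1, critic2, critic1_targ, critic2_targ,
                    actor_opt, critic_opt, batch, step,
                    gamma=0.99, tau=0.005, policy_noise=0.2, noise_clip=0.5, policy_delay=2):
    obs, act, rew, obs_next, done = batch                        # tensors sampled from the replay buffer
    with torch.no_grad():
        noise = (torch.randn_like(act) * policy_noise).clamp(-noise_clip, noise_clip)
        act_next = (actor_targ(obs_next) + noise).clamp(-1.0, 1.0)       # target-policy smoothing
        q_next = torch.min(critic1_targ(obs_next, act_next),
                           critic2_targ(obs_next, act_next))             # clipped double-Q target
        q_target = rew + gamma * (1.0 - done) * q_next
    critic_loss = F.mse_loss(critic1(obs, act), q_target) + F.mse_loss(critic2(obs, act), q_target)
    critic_opt.zero_grad(); critic_loss.backward(); critic_opt.step()
    if step % policy_delay == 0:                                 # delayed actor update
        actor_loss = -critic1(obs, actor(obs)).mean()
        actor_opt.zero_grad(); actor_loss.backward(); actor_opt.step()
        for net, targ in ((actor, actor_targ), (critic1, critic1_targ), (critic2, critic2_targ)):
            for p, p_t in zip(net.parameters(), targ.parameters()):
                p_t.data.mul_(1.0 - tau).add_(tau * p.data)      # soft target update
    return critic_loss.item()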
4) End-of-episode handling

Model saving: the collected training data is saved every 50 episodes (np.save); if the current success rate exceeds the best seen so far, our agent's network parameters are saved as well (trainers[0].save).

            if done:
                # print("step_episode: {}".format(step_episode), reward, done)
                if done == 1:
                    # print("step_episode: {}".format(step_episode), reward, done)
                    done_episodes_50 += 1
                if episode % 50 == 0:
                    train_logger.add_scalar('done_rate', done_episodes_50 / 50, episode / 50)
                    collect_data["done_rate"].append(done_episodes_50 / 50)
                    # print(done_episodes_50 / 50)
                    if done_episodes_50 / 50 > his_done_rate:
                        trainers[0].save(filename)
                        his_done_rate = done_episodes_50 / 50
                    done_episodes_50 = 0
                break
            obs_n = obs_n_
            obs_std = obs_std_
        if episode % 50 == 0:             # save the collected data every 50 episodes
            np.save(filename + "_collect_data", collect_data)

Result logging: when done is true (the episode ends, e.g. the target is reached or a collision occurs), the episode's mean reward is computed, and the reward, step count, and success rate are stored in collect_data and written to TensorBoard through SummaryWriter.

    trainers[0].save(filename + "_final_")
    # print('game over')
    if config.Save_origin:
        np.save(filename + "ori_data.npy", ori_data)  # save the raw data
    train_logger.export_scalars_to_json('./log/_train_logs/summary.json')
    train_logger.close()
(2) Meta-learning training function train_meta

train_meta is the online optimization module built on top of regular training. Its core is "load the pretrained model → adapt to the new scenario → quickly update the policy", and it targets scenarios that change dynamically (e.g. the threat-area positions move).

The key differences from train are listed here, with a sketch of the adaptation pattern after the list:

Model initialization: the regularly trained model is loaded (trainers[0].load(filename), where filename is the pretrained-model path) instead of training from scratch;
Update trigger: updates start once step_total > MetaLearnStep, run every 10 steps, and are limited to T = 7 updates in total (fast adaptation to the new scenario while avoiding overfitting);
Update function: updata_meta is called instead of updata (a meta-learning-specific update that adjusts the parameters with a small amount of fresh data via a few fast gradient steps);
Episode count: only 1 episode is run (meta-learning here aims at fast adaptation to a single scenario rather than accumulation over many episodes).
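The adaptation pattern itself is simple enough to sketch on its own; agent below is assumed to expose an add_experience / update interface like the project's MATD3 class, with update_meta standing in for the real updata_meta call.

def online_adapt(agent, transitions, meta_learn_step=1024, update_every=10, max_updates=7):
    """Store freshly collected transitions and run at most max_updates gradient updates."""
    updates_done = 0
    for step_total, transition in enumerate(transitions, start=1):
        agent.add_experience(*transition)        # keep filling the buffer with new-scenario data
        enough_data = step_total > meta_learn_step
        if enough_data and step_total % update_every == 0 and updates_done < max_updates:
            agent.update_meta()                  # assumed few-step fine-tuning call
            updates_done += 1                    # capping updates avoids overfitting the new scene
    return updates_done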

def train_meta(config):
    print('-------------meta_begin------------')
    setup_seed(0)
    time_all = []  # wall-clock time per episode (sample-utilization cycle)
    time_start = time.time()
    # ------------------- set up the training log -------------------
    nowt = time.strftime("%Y-%m-%d", time.localtime())  # format the current date
    train_log_dir = config.log_dir + '/' + nowt + '_train_logs'

    train_logger = SummaryWriter(str(train_log_dir))  # create the log writer
    # ------------------- choose the pretrained network to adapt -------------------
    filename = './policy/policy2024-01-18/450'
    # ------------------- Unity environment  # TODO: package not imported -------------------
    if config.use_env:
        env = Unity_env()
        env_info = env.get_env_info()
        # print(env_info)
    # -------------------- Python environment setup -----------
    if config.threat_area:
        action_n = [[0, 0, 0], [0, 0], [0, 0]]  # our agent: angle of attack, bank angle, thrust; adversaries: lateral / longitudinal overload
        act_shape_n = [3, 2, 2]
        obs_shape_n = [23, 12, 12]  # 23 = own 6 + adv 6 + radius + adv 6 + radius + target 3;  adversary: 12 = own 6 + ours 6
        if config.Err_U:
            obs_shape_n = [24, 12, 12]
    # ------------------- build the networks ---------------------------
    trainers = []
    for i in range(config.num_agents - config.num_adversaries):
        if config.good_policy == 'TD3':
            if config.use_RND:
                from trainers.MATD3.MATD3 import MATD3RND
                trainers.append(MATD3RND(state_dim=obs_shape_n[i], action_dim=act_shape_n[i], agent_index=i,
                                         num_agents=config.num_agents - config.num_adversaries,
                                         arglist=config))
            else:
                from trainers.MATD3.MATD3 import MATD3
                trainers.append(MATD3(state_dim=obs_shape_n[i], action_dim=act_shape_n[i], agent_index=i,
                                      num_agents=config.num_agents - config.num_adversaries,
                                      arglist=config))
        if config.good_policy == 'TD3_pred':
            from trainers.TD3.TD3 import TD3
            trainers.append(TD3(obs_shape_n[0], act_shape_n[0], arglist=config))
        if config.good_policy == "MASAC":
            from trainers.MASAC.MASAC import AgentModSAC
            trainers.append(AgentModSAC(net_dim=config.net_dim, state_dim=obs_shape_n[i], agent_index=i,
                                        num_agents=config.num_agents - config.num_adversaries,
                                        action_dim=act_shape_n[i],
                                        arglist=config))
        trainers[0].load(filename)  # load the pretrained parameters
    for i in range(config.num_agents - config.num_adversaries, config.num_agents):
        if config.adv_policy == 'PN':
            from trainers.PN.PNN import PNN
            trainers.append(PNN())
    # ------------------- bookkeeping variables -------------------
    step_total = 0        # total step count, used to decide when learning starts
    done_episodes_50 = 0  # counter for computing the success rate
    collect_data = {"tf_reward": [], "done": [], "done_rate": [], "episode_steps": []}  # collected statistics
    ori_data = np.zeros((1, 24))  # buffer for the raw (un-normalized) states
    mark = 1              # scenario selector
    std_state = np.zeros((1, 25))
    std_a = np.zeros((1, 3))
    dis1 = np.array([0])
    dis2 = np.array([0])
    disp = np.array([0])
    q_coll = []
    T=7
The main loop of train_meta then runs a single episode of interaction, storing each new transition and applying the bounded number of online updates described above:

    for episode in range(1):  # episode loop (a single episode for online adaptation)
        py_env = TF_threat_area(num_agents=config.num_agents, random_num=config.num_adversaries,
                                threat_area=config.threat_area)
        step_episode = 0  # per-episode step counter, used for the reward
        reward_coll = []
        obs_n = py_env.reset(mark=mark)  # initialize the environment
        if config.use_env:  # TODO: Unity package not imported -------------------
            env.reset()  # ndarray[1, :]
            unity_obs = obs_n
            a = env.step(unity_obs)
        if config.Save_origin:
            ori_data = np.append(ori_data, obs_n, axis=0)  # record the raw initial state
        if config.State_Noise:
            obs_n = py_env.noise(obs_n)  # add observation noise
        obs_std, reward_std = py_env.max_min_standard(obs_n, 0)  # max-min normalization
        t = 0
        while True:
            step_total += 1
            step_episode += 1
            for i in range(config.num_agents - config.num_adversaries):
                a = trainers[i].select_action(obs_std)
                action_n[i] = a.reshape(-1)
            for i in range(config.num_agents - config.num_adversaries, config.num_agents):
                if config.threat_area:  # static obstacles
                    action_n[i] = [0, 0]
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            q_act = trainers[0].q_critic(torch.FloatTensor(obs_std).to(device),
                                         torch.FloatTensor(action_n[0].reshape(1, -1)).to(device))
            q_coll.append(q_act)
            obs_n_, reward, done, dis_2D_min1, dis_2D_min2, dis_2D_p = py_env.step(action_n, step_episode)  # state transition
            if config.use_env:  # TODO: Unity package not imported -------------------
                a = env.step(obs_n_)
            if config.Save_origin:
                ori_data = np.append(ori_data, obs_n_, axis=0)  # record the raw state
            reward_coll.append(reward)  # record the reward
            train_logger.add_scalar('reward', reward, step_total)
            train_logger.add_scalar('action_u1', action_n[0][0], step_total)
            train_logger.add_scalar('action_u2', action_n[0][1], step_total)
            if config.State_Noise:
                obs_n_ = py_env.noise(obs_n_)  # add observation noise
            if done:

                print("step_episode: {}".format(step_episode), reward, done)
                obs_n_ = obs_n
                dis1 = np.append(dis1, dis_2D_min1)
                dis2 = np.append(dis2, dis_2D_min2)
                disp = np.append(disp, dis_2D_p)
                train_logger.add_scalar('reward_end', reward, episode)
                reward_mean = sum(reward_coll) / step_episode
                train_logger.add_scalar('reward_coll', reward_mean, episode)
                train_logger.add_scalar('dis_2D_min1', dis_2D_min1, episode)
                train_logger.add_scalar('dis_2D_min2', dis_2D_min2, episode)
                train_logger.add_scalar('dis_p', dis_2D_p, episode)
                collect_data["tf_reward"].append(reward_mean)
                # collect_data["adv_reward"].append(reward)
                collect_data["episode_steps"].append(step_episode)
            obs_std_, reward_std_ = py_env.max_min_standard(obs_n_, reward)  # normalization
            # ------------------- store the new transition -----------------------
            done_ = 0 if done == 0 else 1  # collapse the done flag to 0/1
            if config.good_policy in ["maddpg", "TD3", "MASAC"]:
                trainers[0].add_experience(obs_std, action_n[0], reward_std_, obs_std_, done_, 0, step_episode)

            # ------------------ update the parameters ------------------------
            loss_n = []
            if step_total > config.MetaLearnStep and step_total%10==0 and t<T:
                loss = trainers[0].updata_meta(episode, 0, trainers[0:config.num_agents - config.num_adversaries],
                                               logger=train_logger)
                loss_n.append(loss)
                t+=1
            # --------------------------- track the success rate ---------------
            if done:
                time_end = time.time()
                time_all.append(time_end - time_start)
                print('sample-utilization cycle (s):', time_all)
                if done == 1:
                    done_episodes_50 += 1
                if episode % 50 == 0:
                    train_logger.add_scalar('done_rate', done_episodes_50 / 50, episode / 50)
                    collect_data["done_rate"].append(done_episodes_50 / 50)
                    # print(done_episodes_50 / 50)
                    done_episodes_50 = 0
                break
            obs_n = obs_n_
            obs_std = obs_std_
        if episode % 50 == 0:  # save the collected data every 50 episodes (episode 0 included)
            np.save(filename + nowt + 'meta' + "_collect_data", collect_data)
    # ------------------- end of training -------------------
    trainers[0].save(filename + nowt + 'meta')  # save the network parameters
    # print('game over')
    np.save(filename + nowt + 'meta' + "dis1.npy", dis1)
    np.save(filename + nowt + 'meta' + "dis2.npy", dis2)
    np.save(filename + nowt + 'meta' + "disp.npy", disp)
    if config.Save_origin:
        np.save(filename + nowt + 'meta' + "ori_data.npy", ori_data)  # save the raw data
        np.save(filename + nowt + 'meta' + "std_state_data.npy", std_state)
        np.save(filename + nowt + 'meta' + "std_a_data.npy", std_a)
    train_logger.export_scalars_to_json(train_log_dir + 'summary.json')
    train_logger.close()
    print('-------------meta_end-------------')

(3) Testing and visualization function display

display verifies and visualizes the training results. Its core is "load the trained model → run multiple test episodes → visualize trajectories and threat areas", giving an intuitive assessment of policy performance.

The key differences from train are:

Model loading: the meta-optimized model is loaded (trainers[0].load(filename + "2024-03-22meta")) and used for inference only; no networks are updated;
Number of test episodes: 51 episodes are run (enough to estimate the success rate);
Visualization logic:
3D trajectory plot: our agent's flight trajectory (red), the adversary positions (blue markers), the target point (green star), and the threat areas (colored surfaces) are drawn to show the obstacle-avoidance behavior;
2D longitude-latitude plot: our trajectory (line), the target point (green star), and the initial threat areas (blue circles) are drawn to judge whether the navigation path is reasonable;
Success-rate statistics: every 50 episodes the success rate (done_episodes_50 / 50) is computed and printed to quantify policy performance.

def display(config):
    print('-------------display_begin-------------')
    setup_seed()
    # ------------------- set up the test log -------------------
    nowt = time.strftime("%Y-%m-%d", time.localtime())  # format the current date
    train_log_dir = config.log_dir + '/' + nowt + '_train_logs'
    train_logger = SummaryWriter(str(train_log_dir))  # create the log writer
    # ------------------- choose the network parameters to test -------------------
    filename = './policy/policy2024-01-18/450'
    # ------------------- Unity environment  # TODO: package not imported -------------------
    if config.use_env:
        env = Unity_env()
        env_info = env.get_env_info()
        # print(env_info)
    # -------------------- Python environment setup -----------
    if config.threat_area:
        action_n = [[0, 0, 0], [0, 0], [0, 0]]  # our agent: angle of attack, bank angle, thrust; adversaries: lateral / longitudinal overload
        act_shape_n = [3, 2, 2]
        obs_shape_n = [23, 12, 12]  # 23 = own 6 + adv 6 + radius + adv 6 + radius + target 3;  adversary: 12 = own 6 + ours 6
        if config.Err_U:
            obs_shape_n = [24, 12, 12]
    # ------------------- build the networks ---------------------------
    trainers = []
    for i in range(config.num_agents - config.num_adversaries):
        if config.good_policy == 'TD3':
            if config.use_RND:
                from trainers.MATD3.MATD3 import MATD3RND
                trainers.append(MATD3RND(state_dim=obs_shape_n[i], action_dim=act_shape_n[i], agent_index=i,
                                         num_agents=config.num_agents - config.num_adversaries,
                                         arglist=config))
            else:
                from trainers.MATD3.MATD3 import MATD3
                trainers.append(MATD3(state_dim=obs_shape_n[i], action_dim=act_shape_n[i], agent_index=i,
                                      num_agents=config.num_agents - config.num_adversaries,
                                      arglist=config))
        if config.good_policy == 'TD3_pred':
            from trainers.TD3.TD3 import TD3
            trainers.append(TD3(obs_shape_n[0], act_shape_n[0], arglist=config))
        if config.good_policy == "MASAC":
            from trainers.MASAC.MASAC import AgentModSAC
            trainers.append(AgentModSAC(net_dim=config.net_dim, state_dim=obs_shape_n[i], agent_index=i,
                                        num_agents=config.num_agents - config.num_adversaries,
                                        action_dim=act_shape_n[i],
                                        arglist=config))
        trainers[0].load(filename+"2024-03-22meta")   #放入参数
        # trainers[0].load(filename)  # 放入参数
    for i in range(config.num_agents - config.num_adversaries, config.num_agents):
        if config.adv_policy == "ddpg":
            from trainers.DDPG.ddpg import ddpg_agent
            trainers.append(ddpg_agent(obs_shape_n[1], act_shape_n[1], arglist=config, Parallel=Parallel))
            ##TODO:
        if config.adv_policy == 'PN':
            from trainers.PN.PNN import PNN
            trainers.append(PNN())
    # ------------------- bookkeeping variables -------------------
    step_total = 0        # total step count
    done_episodes_50 = 0  # counter for computing the success rate
    collect_data = {"tf_reward": [], "done": [], "done_rate": [], "episode_steps": []}  # collected statistics
    ori_data = np.zeros((1, 24))  # buffer for the raw (un-normalized) states
    mark = 1              # scenario selector
    std_state = np.zeros((1, 25))
    std_a = np.zeros((1, 3))
    dis1 = np.array([0])
    dis2 = np.array([0])
    disp = np.array([0])
    q_coll = []
    # ------------------- testing starts -------------------
    for episode in range(51):  # test-episode loop
        action_collect = np.zeros((1, 3))
        py_env = TF_threat_area(num_agents=config.num_agents, random_num=config.num_adversaries,
                                threat_area=config.threat_area)
        step_episode = 0  # per-episode step counter, used for the reward
        reward_coll = []
        obs_n = py_env.reset(mark=mark)  # initialize the environment
        if config.use_env:  # TODO: Unity package not imported -------------------
            env.reset()  # ndarray[1, :]
            unity_obs = obs_n
            a = env.step(unity_obs)
        if config.Save_origin:
            ori_data = np.append(ori_data, obs_n, axis=0)  # record the raw initial state
        if config.State_Noise:
            obs_n = py_env.noise(obs_n)  # add observation noise
        obs_std, reward_std = py_env.max_min_standard(obs_n, 0)  # max-min normalization
        while True:
            step_total += 1
            step_episode += 1
            for i in range(config.num_agents - config.num_adversaries):
                a = trainers[i].select_action(obs_std)
                action_collect= np.append(action_collect, a, axis=0)
                action_n[i] = a.reshape(-1)
            for i in range(config.num_agents - config.num_adversaries, config.num_agents):
                if config.threat_area:  # static obstacles
                    action_n[i] = [0, 0]
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            q_act = trainers[0].q_critic(torch.FloatTensor(obs_std).to(device),
                                         torch.FloatTensor(action_n[0].reshape(1, -1)).to(device))
            q_coll.append(q_act)
            obs_n_, reward, done, dis_2D_min1, dis_2D_min2, dis_2D_p = py_env.step(action_n, step_episode)  # state transition
            if config.Save_origin:
                ori_data = np.append(ori_data, obs_n_, axis=0)  # record the raw state
            reward_coll.append(reward)  # record the reward
            train_logger.add_scalar('reward', reward, step_total)
            train_logger.add_scalar('action_u1', action_n[0][0], step_total)
            train_logger.add_scalar('action_u2', action_n[0][1], step_total)
            if config.State_Noise:
                obs_n_ = py_env.noise(obs_n_)  # add observation noise
            if done:
                obs_n_ = obs_n
                dis1 = np.append(dis1, dis_2D_min1)
                dis2 = np.append(dis2, dis_2D_min2)
                disp = np.append(disp, dis_2D_p)
                train_logger.add_scalar('reward_end', reward, episode)
                reward_mean = sum(reward_coll) / step_episode
                train_logger.add_scalar('reward_coll', reward_mean, episode)
                train_logger.add_scalar('dis_2D_min1', dis_2D_min1, episode)
                train_logger.add_scalar('dis_2D_min2', dis_2D_min2, episode)
                train_logger.add_scalar('dis_p', dis_2D_p, episode)
                collect_data["tf_reward"].append(reward_mean)
                # collect_data["adv_reward"].append(reward)
                collect_data["episode_steps"].append(step_episode)
            obs_std_, reward_std_ = py_env.max_min_standard(obs_n_, reward)  # normalization

            if done:

                print("step_episode: {}".format(step_episode), reward, done)
                fig = plt.figure(episode)
                ax = fig.add_subplot(projection='3d')
                ax.plot(ori_data[-step_episode:-1, 1], ori_data[-step_episode:-1, 2], ori_data[-step_episode:-1, 0] - py_env.Re, color="r")
                ax.scatter(ori_data[-1, 7], ori_data[-1, 8], ori_data[-1, 6] - py_env.Re, color="b")
                ax.scatter(ori_data[-1, 14], ori_data[-1, 15], ori_data[-1, 13] - py_env.Re, color="b")
                ax.scatter(py_env.target[1], py_env.target[2], py_env.target[0] - py_env.Re, color="g", marker="*",
                            s=100)
                u = np.linspace(0, 2 * np.pi, 50)   # 50 angular samples around the circle
                h1 = np.linspace(0, 110e3, 20)      # 20 height samples for threat cylinder 1
                x1 = np.outer(ori_data[-1, 7] + 60e3 / py_env.Re / np.cos(ori_data[-1, 8]) * np.sin(u), np.ones(len(h1)))  # circle longitudes (60 km radius), tiled over height
                z1 = np.outer(ori_data[-1, 8] + 60e3 / py_env.Re * np.cos(u), np.ones(len(h1)))                            # circle latitudes, tiled over height
                y1 = np.outer(np.ones(len(u)), h1)  # height value for each (lon, lat) point
                h2 = np.linspace(0, 110e3, 20)      # 20 height samples for threat cylinder 2
                x2 = np.outer(ori_data[-1, 14] + 80e3 / py_env.Re / np.cos(ori_data[-1, 15]) * np.sin(u), np.ones(len(h2)))  # circle longitudes (80 km radius), tiled over height
                z2 = np.outer(ori_data[-1, 15] + 80e3 / py_env.Re * np.cos(u), np.ones(len(h2)))                             # circle latitudes, tiled over height
                y2 = np.outer(np.ones(len(u)), h2)  # height value for each (lon, lat) point
                ax.plot_surface(x1, z1, y1, cmap=plt.get_cmap('rainbow'), alpha=0.5)
                ax.plot_surface(x2, z2, y2, cmap=plt.get_cmap('rainbow'), alpha=0.5)
                ax.set_xlabel("lon Axis/°")
                ax.set_ylabel("lat Axis/°")
                ax.set_zlabel("Y Axis/m")
                ax.set_title("Mid-TF simulation")
                plt.show()
                u = np.linspace(0, 2 * np.pi, 50)  # 50 angular samples around the circle
                a1 = ori_data[-step_episode, 12] / py_env.Re / np.cos(ori_data[-step_episode, 8])
                b1 = ori_data[-step_episode, 12] / py_env.Re
                a2 = ori_data[-step_episode, 19] / py_env.Re / np.cos(ori_data[-step_episode, 16])
                b2 = ori_data[-step_episode, 19] / py_env.Re
                x1 = ori_data[-step_episode, 7] + a1 * np.sin(u)   # longitude samples of threat circle 1
                z1 = ori_data[-step_episode, 8] + b1 * np.cos(u)   # latitude samples of threat circle 1
                x2 = ori_data[-step_episode, 14] + a2 * np.sin(u)  # longitude samples of threat circle 2
                z2 = ori_data[-step_episode, 15] + b2 * np.cos(u)  # latitude samples of threat circle 2
                plt.plot(ori_data[-step_episode:-1, 1], ori_data[-step_episode:-1, 2])  # longitude-latitude trajectory
                plt.scatter(220 * np.pi / 180, 65 * np.pi / 180, color="g", marker="*", s=100)
                plt.plot(x1, z1, color="b")
                plt.plot(x2, z2, color="b")
                plt.ylabel('lat')
                plt.xlabel('lon')
                plt.title("TF-area lon-lat")
                plt.show()
                if done == 1:
                    done_episodes_50 += 1
                if episode % 50 == 0:
                    train_logger.add_scalar('done_rate', done_episodes_50 / 50, episode / 50)
                    collect_data["done_rate"].append(done_episodes_50 / 50)
                    print('Success rate over 50 TF trials:', done_episodes_50 / 50)
                    done_episodes_50 = 0
                break
            obs_n = obs_n_
            obs_std = obs_std_
        if episode % 50 == 0:  # save the collected data every 50 episodes
            np.save(filename + "_collect_data", collect_data)
    # ------------------- testing ends -------------------
    np.save(filename + "dis1.npy", dis1)
    np.save(filename + "dis2.npy", dis2)
    np.save(filename + "disp.npy", disp)
    if config.Save_origin:
        np.save(filename + "11ori_data.npy", ori_data)  # 保存原始数据
        np.save(filename + "std_state_data.npy", std_state)
        np.save(filename + "std_a_data.npy", std_a)
    train_logger.export_scalars_to_json(train_log_dir + 'summary.json')
    train_logger.close()
    print('-------------display_end-------------')
# Press the green button in the gutter to run the script.
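As an aside, the colored threat-area surfaces in the 3D plot above are cylinders built from outer products of a circle parameterization and a set of heights. A self-contained version of that construction, with made-up center coordinates and radius, looks like this:

import numpy as np
import matplotlib.pyplot as plt

# Hypothetical threat-area center (longitude, latitude in radians) and angular radius.
center_lon, center_lat, radius = 3.8, 1.1, 0.01

u = np.linspace(0, 2 * np.pi, 50)   # angle samples around the circle
h = np.linspace(0, 110e3, 20)       # height samples along the cylinder axis
lon = np.outer(center_lon + radius * np.sin(u), np.ones(len(h)))  # circle longitudes, tiled over height
lat = np.outer(center_lat + radius * np.cos(u), np.ones(len(h)))  # circle latitudes, tiled over height
alt = np.outer(np.ones(len(u)), h)                                # height for each circle point

fig = plt.figure()
ax = fig.add_subplot(projection='3d')
ax.plot_surface(lon, lat, alt, cmap=plt.get_cmap('rainbow'), alpha=0.5)
plt.show()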

5. Entry point and execution flow

The entry point is the if __name__ == '__main__': block; the display (and meta) flags of Config control which path runs:

if __name__ == '__main__':
    config = Config()          # instantiate the configuration
    if config.display:
        if config.meta:
            train_meta(config) # 1. if meta-learning is enabled, run the online optimization first
        display(config)        # 2. then run testing and visualization
    else:
        train(config)          # 3. otherwise run regular training

Result:

Success rate over 50 TF trials: 0.96
