# [0628] Task04 DQN 算法及进阶

  • easy-rl PDF版本 笔记整理 P6 - P8
  • joyrl 比对 补充 P7 - P8
  • 相关 代码 整理 待整理 !!

在这里插入图片描述

最新版PDF下载
地址:https://github.com/datawhalechina/easy-rl/releases
国内地址(推荐国内读者使用)
链接: https://pan.baidu.com/s/1isqQnpVRWbb3yh83Vs0kbw 提取码: us6a

easy-rl 在线版本链接 (用于 copy 代码)
参考链接 2:https://datawhalechina.github.io/joyrl-book/

其它:
【勘误记录 链接】
——————
5、深度强化学习基础 ⭐️
开源内容:https://linklearner.com/learn/summary/11
——————————
蒙特卡洛: 要等到游戏结束。方差很大
时序差分: 不用等到游戏结束。更常用

不同的方法考虑了不同的假设,所以运算结果不同。

在这里插入图片描述
在这里插入图片描述

π ( a ∣ s ) = e Q ( s , a ) T ∑ a ′ ∈ A e Q ( s , a ′ ) T \pi(a|s)=\frac{e^{\frac{Q(s, a)}{T}}}{\sum\limits_{a^\prime\in \cal A}e^{\frac{Q(s, a^\prime)}{T}}} π(as)=aAeTQ(s,a)eTQ(s,a)

T T T 大, 探索
T T T 小, 利用

经验回放 异策略

最耗时的环节: 与环境互动。

在这里插入图片描述

DQN 和 其它 Q-learning 学习方法的区别:

DQN其它 Q-learning
神经网络 拟合 动作值 函数表格
经验回放下一状态的数据

————————

7 个技巧: 深度 Q 网络、双深度 Q 网络、优先级经验回放的双深度 Q 网络、竞争深度 Q 网络、异步优势演员-评论员算法(A3C)、分布式 Q 函数、噪声网络。

序号问题解决方案其它
1数据存储效率;泛化深度 Q 网络 (DQN)经验回放 + 目标网络
Q ( s t , a t ) ↔ r t + max ⁡ a Q ( s t + 1 , a ) Q(s_t,a_t)\leftrightarrow r_t+\max\limits_aQ(s_{t+1},a) Q(st,at)rt+amaxQ(st+1,a)
2Q 值往往被高估双深度 Q 网络 (double DQN,DDQN) Q ( s t , a t ) ↔ r t + Q ′ ( s t + 1 , arg ⁡ max ⁡ a Q ( s t + 1 , a ) ) Q(s_t,a_t)\leftrightarrow r_t+\textcolor{blue}{Q^\prime\Big(s_{t+1},\arg}\max\limits_aQ(s_{t+1},a)\textcolor{blue}{\Big)} Q(st,at)rt+Q(st+1,argamaxQ(st+1,a))
3竞争深度 Q 网络(dueling DQN)第一步计算一个与输入有关的标量 V(s);
第二步计算一个向量 A(s, a) 对应每一个动作。
最后的网络将两步的结果相加,得到最终需要的 Q 值。 Q(s, a) = V(s) + A(s, a)
4使用经验回放时,考虑数据间的权重大小优先级经验回放(prioritized experience replay,PER)
5蒙特卡洛:方差大多步方法在蒙特卡洛方法和时序差分方法中取得平衡
A3C
6网络的权重等参数加上一个高斯噪声噪声网络(noisy net)类似于 ε \varepsilon ε 贪心策略,但在同一个回合里面参数是固定的 以保证 看到同一状态采取同一动作
7分布式 Q 函数(distributional Q-function)将最终网络的输出的每一 类别的动作再进行分布操作。低估 奖励
彩虹(rainbow)

主网络: 更新参数, 选动作
目标网络: 某段时间固定, 计算值

在这里插入图片描述

在这里插入图片描述

7.6 分布式 Q 函数

不易实现

Q 函数是累积奖励的期望值,所以我们算出来的 Q 值其实是一个期望值

因为环境是有随机性的,所以在某一个状态采取某一个动作后,计算后续不同轨迹得到的奖励,得到的是一个分布。

在这里插入图片描述
对这个 分布 求均值 得到 Q 值。

这个分布 可能对于选取 最优策略有好处, 当直接用 均值 建模时,可能会丢失重要的信息。

在原来的 Q 函数里面,假设我们只能采取 a 1 a_1 a1 a 2 a_2 a2 a 3 a_3 a3 这 3 个动作,我们输入一个状态,输出 3 个值。这 3 个值分别代表 3 个动作的 Q 值,但是这些 Q 值是一个分布的期望值。所以分布式 Q 函数就是直接输出分布

假设分布的值就分布在某一个范围里面,比如 −10 ~ 10,把 −10 ~ 10 拆成一个一个的长条。例如,每一个动作的奖励空间拆成 5 个长条。假设奖励空间可以拆成 5 个长条,Q 函数的输出就是要预测我们在某一个状态采取某一个动作得到的奖励,其落在某一个长条里面的概率。所以绿色长条概率的和应该是 1,其高度代表在某一个状态采取某一个动作的时候,它落在某一个长条内的概率。绿色的代表动作 a 1 a_1 a1,红色的代表动作 a 2 a_2 a2,蓝色的代表动作 a 3 a_3 a3

实际上在做测试的时候,我们选平均值最大的动作执行

在这里插入图片描述

类似地, 也可对 动作值 进行分布建模。
如果 动作值 的分布方差很大,这代表采取这个动作虽然平均而言很不错,但也许风险很高,我们可以训练一个网络来规避风险。在两个动作平均值 (动作值)都差不多的情况下,也许可以选一个风险比较小 【方差小】的动作来执行,这就是分布式 Q 函数的好处。

7.7 rainbow

分数 取 中位数

在这里插入图片描述

异步优势演员-评论员(asynchronous advantage actor-critic,A3C)是演员-评论员的方法,A3C 算法又被译作 异步优势动作评价算法

在这里插入图片描述

为什么分布式深度 Q 网络会低估奖励呢?因为分布式深度 Q 网络输出的是一个分布的范围,输出的范围不可能是无限的,我们一定会设一个限制,比如最大输出范围就是从 −10 ~10。假设得到的奖励超过 10,比如 100 怎么办?我们就当作没看到这件事。

超出设定范围的奖励值被丢弃——> 分布式深度 Q 网络低估奖励

————————

DQN 比 策略梯度 稳定

DQN 比较容易训练的原因: 只要能够估计出 Q 函数,就保证一定可以找到一个 比较好的策略。

回归问题

难以处理连续动作

方案不足
采样动作采样次数有限,动作不一定精确
梯度上升运算量大;局部最优
设计网络架构函数不能随便设置。
AC演员-评论员方法 = PPO 【基于策略】+ DQN【基于价值】

在这里插入图片描述



JoyRL

梯度下降法 基于一个假设:训练集中的样本是独立同分布的。

避免训练的不稳定性

经验回放的容量不能太小,太小了会导致收集到的样本具有一定的局限性,也不能太大,太大了会失去经验本身的意义。

Double DQN 并不是每隔 C 步复制参数到目标网络,而是每次随机选择其中一个网络选择动作进行更新。

竞争Dueling DQN 算法: 优化神经网络的结构
在这里插入图片描述

在网络模型中增加噪声层

根据经验回放中的每个样本计算出的 TD 误差赋予对应的优先级,然后在采样的时候取出优先级较大的样本。

单纯根据 TD 误差进行优先采样有可能会错过对当前网络“信息量”更大的样本
过拟合

分布式 DQN 算法 C51 算法

【Code】OpenAI Gym 中的 CartPole-v0 车杆平衡 【连续状态空间】

参考链接 1: https://github.com/datawhalechina/joyrl-book/tree/main/notebooks
参考链接 2: https://github.com/datawhalechina/easy-rl/tree/master/notebooks

通过向左 (action=0) 或向右 (action=1) 推车让杆保持直立。每进行一个 step 就会给一个 +1 的 reward,如果无法保持平衡那么 done 等于 true,本次 episode 失败。

—— 基本信息查看

import gym 

env = gym.make('CartPole-v0')  # 创建环境
n_states = env.observation_space.shape[0]  # 状态数
n_actions = env.action_space.n  # 动作数
print(f"状态数:{n_states}, 动作数: {n_actions}")

在这里插入图片描述
状态数是 4 个,分别为车的位置、车的速度、杆的角度以及杆顶部的速度
动作数为 2 个,并且是离散的向左或者向右。

state = env.reset()  # 初始化环境
print(f"初始状态:{state}")

在这里插入图片描述

训练思路: 执行动作,环境反馈,智能体更新

交互采样 + 模型更新

本地环境:

pip install gymnasium==0.28.1
pip install gym==0.25.2

PyTorch 环境配置链接

在这里插入图片描述

DQN_2015

论文 链接: https://sci-hub.se/10.1038/nature14236

在这里插入图片描述

算法: 带 经验回放 的 deep Q-learning
初始化容量为 N N N 的回放存储 D D D
用随机权重 θ \theta θ 初始化 动作-值 函数 Q Q Q
用权重 θ − = θ \theta^-=\theta θ=θ 初始化 目标动作-值函数 Q ^ \hat Q Q^
遍历 episode = 1,M:
       ~~~~~~        初始化 序列 s 1 = { x 1 } s_1=\{x_1\} s1={x1},预处理序列 ϕ 1 = ϕ ( s 1 )      \phi_1=\phi(s_1)~~~~ ϕ1=ϕ(s1)     【预处理的目的:降低输入维数并处理 Atari 2600 模拟器的一些伪影。】
       ~~~~~~        对于 t = 1 ,   T t=1, ~T t=1, T:
             ~~~~~~~~~~~~              以概率 ε \varepsilon ε 选择随机动作 a t a_t at
             ~~~~~~~~~~~~              否则 a t = arg ⁡ max ⁡ a Q ( ϕ ( s t ) , a ; θ ) a_t=\arg\max\limits_aQ(\phi(s_t),a;\theta) at=argamaxQ(ϕ(st),a;θ)
             ~~~~~~~~~~~~              在仿真器 执行动作 a t a_t at,观察 奖励 r t r_t rt 和 图像 x t + 1 x_{t+1} xt+1
             ~~~~~~~~~~~~              s t + 1 = { s t , a t , x t + 1 } s_{t+1}=\{s_t,a_t,x_{t+1}\} st+1={st,at,xt+1},并预处理 ϕ t + 1 = ϕ ( s t + 1 ) \phi_{t+1}=\phi(s_{t+1}) ϕt+1=ϕ(st+1)
             ~~~~~~~~~~~~              将 transition ( ϕ t , a t , r t , ϕ t + 1 ) (\phi_t,a_t,r_t,\phi_{t+1}) (ϕt,at,rt,ϕt+1) 存到 D D D
             ~~~~~~~~~~~~              D D D 中随机抽样 小批次 transitions ( ϕ j , a j , r j , ϕ j + 1 ) (\phi_j,a_j,r_j,\phi_{j+1}) (ϕj,aj,rj,ϕj+1)
             ~~~~~~~~~~~~                 y j = { r j              在第  j + 1  步回合终止 r j + γ max ⁡ a ′ Q ^ ( ϕ j + 1 , a ′ ; θ − ) 其它 ~~y_j=\left\{\begin{aligned}&r_j~~~~~~~~~~~~~在 第 ~j+1~步回合终止\\ &r_j+\gamma\max\limits_{a^\prime}\hat Q(\phi_{j+1}, a^\prime;\theta^-)&其它\end{aligned}\right.   yj= rj             在第 j+1 步回合终止rj+γamaxQ^(ϕj+1,a;θ)其它
             ~~~~~~~~~~~~              梯度下降 ( y j − Q ( ϕ j , a j ; θ ) ) 2 \Big(y_j-Q(\phi_j,a_j;\theta)\Big)^2 (yjQ(ϕj,aj;θ))2,针对网络参数 θ \theta θ
             ~~~~~~~~~~~~              C C C 步 重置 Q ^ = Q \hat Q=Q Q^=Q

test.py

————————————
✔ 版本 1

# pip install seaborn
# pip install gym==0.25.2

import torch.backends
import torch.nn as nn 
import torch.nn.functional as F 

from collections import deque 
import random 

import torch 
import torch.optim as optim 
import math 
import numpy as np 

import gym 
import os 

import argparse
import matplotlib.pyplot as plt 
import seaborn as sns 


# 
###### 1、算法模块
# 定义 用于 拟合 Q 函数 的 神经网络  Q(s,a; θ) 输入状态, 输出动作
class MLP(nn.Module):
    def __init__(self, n_states, n_actions, hidden_dim=128): # hidden_dim=128 隐藏层维度
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(n_states, hidden_dim)  # 输入层
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # 隐藏层
        self.fc3 = nn.Linear(hidden_dim, n_actions)  # 输出层 
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
    
# 定义 经验回放
class ReplayBuffer(object):
    def __init__(self, capacity):
        self.capacity = capacity 
        self.buffer = deque(maxlen=self.capacity)

    def push(self, transitions):  # 存储  (s, a, r, s') 到 buffer 中
        self.buffer.append(transitions)

    def sample(self, batch_size, sequential=False):  # 抽取 小批次 buffer 数据用于 更新 主网络
        if batch_size > len(self.buffer):
            batch_size = len(self.buffer)
        if sequential:  # 顺序采样
            rand = random.randint(0, len(self.buffer) - batch_size)
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)]
            return zip(*batch)
        else: # 随机采样
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch)
    
    def __len__(self):   # 返回 buffer中 存储的经验 数量
        return len(self.buffer)

class DQN:
    def __init__(self, model, memory, cfg): # cfg 定义了一些 超参数
        self.n_actions = cfg['n_actions']
        self.device = torch.device(cfg['device'])
        self.gamma = cfg['gamma']  # 奖励 的 折扣因子

        # ε 贪心策略 参数
        self.sample_count = 0  # 用于 ε 衰减计数
        self.epsilon = cfg['epsilon_start']
        self.epsilon_start = cfg['epsilon_start']
        self.epsilon_end = cfg['epsilon_end']
        self.epsilon_decay = cfg['epsilon_decay']

        self.batch_size = cfg['batch_size']

        self.policy_net = model.to(self.device)
        self.target_net = model.to(self.device)  

        # 复制参数到 目标网络
        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(param.data)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg['lr'])  # 优化器
        self.memory = memory  # 经验回放

    def sample_action(self, state):  # 状态 s 采取 动作 a 是否是 好的策略
        self.sample_count += 1

        # 让 ε 随着步数指数衰减。 权衡 探索和利用 
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                        math.exp(-1. * self.sample_count / self.epsilon_decay)
        if random.random() > self.epsilon:  # ε 已经很小
            with torch.no_grad():
                state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
                q_values = self.policy_net(state)
                action = q_values.max(1)[1].item()  # 选择 最大动作值 对应的动作
        else:
            action = random.randrange(self.n_actions)
        return action 
    
    ## 用于 测试
    @torch.no_grad()
    def predict_action(self, state):  # 策略已学好。 遇到状态 s 应输出 a
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        q_values = self.policy_net(state)  # 学到的 Q 函数网络 输出 相应的 q 值
        action = q_values.max(1)[1].item() 
        return action 
    
    def update(self):  # 基于数据  不断更新  Q函数 的拟合模型
        if len(self.memory) < self.batch_size:    # 缓存器 中样本数不足 一批,不更新策 
            return 
        
        # 在 缓存器 中抽样 batch (s, a, r, s')
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)

        # 将数据 转为 tensor 
        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
        done_batch = torch.tensor(np.float32(done_batch), device=self.device)

        q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)
        next_q_values = self.target_net(next_state_batch).max(1)[0].detach() 

        expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)
        loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1))  # 计算均方根损失

        # 优化
        self.optimizer.zero_grad()
        loss.backward()

        # Clip 防止 梯度爆炸
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1,1)
        self.optimizer.step() 

###### 2、训练模块 和 测试模块  定义
def train(cfg, env, agent):  # 训练模块
    print("------开始训练------")
    rewards = []
    for i_ep in range(cfg['train_eps']):
        ep_reward = 0   # 一个 回合 内的奖励
        state = env.reset(seed=cfg['seed'])   # 重置环境
        for _ in range(cfg['ep_max_steps']):
            action = agent.sample_action(state)  
            next_state, reward, done, _ = env.step(action)
            agent.memory.push((state, action, reward, next_state, done))
            state = next_state  # 更新下一个状态
            agent.update()  # 更新智能体
            ep_reward += reward  # 累加奖励
            if done:
                break 
        if (i_ep + 1) % cfg['C'] == 0:  # 智能体 目标网络更新
            agent.target_net.load_state_dict(agent.policy_net.state_dict())

        rewards.append(ep_reward)
        if (i_ep + 1) % 10 == 0:  # 每 10 个 回合 打印
            print(f"回合: {i_ep + 1} % {cfg['train_eps']}, 奖励:  {ep_reward:.2f}, Epsolon: {agent.epsilon:.3f}")

    print("------  完成训练 ! ------")
    env.close() 
    return {'rewards': rewards}

def test(cfg, env, agent):  # 训练模块
    print("------ 开始测试: ------")
    rewards = []
    for i_ep in range(cfg['test_eps']):
        ep_reward = 0
        state = env.reset(seed=cfg['seed'])  # 重置环境     
        for _ in range(cfg['ep_max_steps']):
            env.render()  # pygame 将直接可视化
            action = agent.predict_action(state) 
            next_state, reward, done, _ = env.step(action)
            ep_reward += reward   # 累加奖励
            if done:
                break 
        rewards.append(ep_reward)
        print(f"回合: {i_ep + 1} / {cfg['test_eps']}, 奖励: {ep_reward :.2f}")
    print("------ 完成测试 ------")
    env.close()
    return {'rewards': rewards}  # 字典


###### 3、环境
def all_seed(env, seed = 1):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)  # CPU 
    torch.cuda.manual_seed(seed)  # GPU 
    os.environ['PYTHONHASHSEED'] = str(seed)  # python

    torch.backends.cudnn.deterministic = True 
    torch.backends.cudnn.benchmark = False 
    torch.backends.cudnn.enabled = False 

def env_agent_config(cfg):
    env = gym.make(cfg['env_name'])
    if cfg['seed'] != 0:
        all_seed(env, seed=cfg['seed'])

    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.n 
    print(f"状态数: {n_states}, 动作数: {n_actions}")
    cfg.update({"n_states": n_states, "n_actions": n_actions})  # 参数配置字典 更新
    model = MLP(n_states, n_actions, hidden_dim=cfg['hidden_dim'])
    memory = ReplayBuffer(cfg['memory_capacity'])
    agent = DQN(model, memory, cfg)
    return env, agent 


###### 4、参数 设置  可视化模块
def get_args():  # 超参数
    parser = argparse.ArgumentParser(description="hyperparameters")
    parser.add_argument('--algorithm_name', default='DQN', type=str, help='name of algorithm')  # 算法名称
    parser.add_argument('--env_name', default='CartPole-v0', type=str, help="name of environment")
    parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")
    parser.add_argument('--test_eps', default = 20, type=int, help="episodes of testing")
    parser.add_argument('--ep_max_steps',default = 100000,type=int,help="steps per episode, much larger value can simulate infinite steps")
    parser.add_argument('--gamma',default=0.95,type=float,help="discounted factor")
    parser.add_argument('--epsilon_start',default=0.95,type=float,help="initial value of epsilon")
    parser.add_argument('--epsilon_end',default=0.01,type=float,help="final value of epsilon")
    parser.add_argument('--epsilon_decay',default=500,type=int,help="decay rate of epsilon, the higher value, the slower decay")
    parser.add_argument('--lr',default=0.0001,type=float,help="learning rate")
    parser.add_argument('--memory_capacity',default=100000,type=int,help="memory capacity")
    parser.add_argument('--batch_size',default=64,type=int)
    parser.add_argument('--C',default=4,type=int,help="the Q of target network update after C steps of a episode")
    parser.add_argument('--hidden_dim',default=256,type=int)
    parser.add_argument('--device',default='cpu',type=str,help="cpu or cuda") 
    parser.add_argument('--seed',default=10,type=int,help="seed") 

    args = parser.parse_args([])
    args = {**vars(args)} 

    # 打印 超参数
    print("超参数")
    print(''.join(['=']*80))
    tplt = "{:^20}\t{:^20}\t{:^20}"
    print(tplt.format("Name", "Value", "Type"))
    for k,v in args.items():
        print(tplt.format(k,v,str(type(v))))   
    print(''.join(['=']*80))      
    return args



def smooth(data, weight=0.9): # 平滑曲线
    last = data[0]
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值
        smoothed.append(smoothed_val)
        last = smoothed_val
    return smoothed  # 返回计算好的平滑数据

def plot_rewards(rewards, cfg, tag='train'):  # 训练过程的奖励 可视化
    sns.set(style='whitegrid')  #  可设置 seaborn 绘图风格
    plt.figure()
    plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algorithm_name']} for {cfg['env_name']}") # 算法名称  环境名称
    plt.xlabel('episodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    # plt.legend() 
    plt.show() 



###### 5、训练 和 测试

cfg = get_args()  # 获取参数
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)

plot_rewards(res_dic['rewards'], cfg, tag = "train")

# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")


在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

待改版本

拟改成 gymnasium 版本, 目前版本仍有报错

import torch.nn as nn 
import torch.nn.functional as F 

from collections import deque 
import random 

import torch 
import torch.optim as optim 
import math 
import numpy as np 

import gymnasium as gym
import os 

import argparse
import matplotlib.pyplot as plt 
import seaborn as sns 


# 
###### 1、算法模块
# 定义 用于 拟合 Q 函数 的 神经网络  Q(s,a; θ) 输入状态, 输出动作
class MLP(nn.Module):
    def __init__(self, n_states, n_actions, hidden_dim=128): # hidden_dim=128 隐藏层维度
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(n_states, hidden_dim)  # 输入层
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # 隐藏层
        self.fc3 = nn.Linear(hidden_dim, n_actions)  # 输出层 
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
    
# 定义 经验回放
class ReplayBuffer(object):
    def __init__(self, capacity):
        self.capacity = capacity 
        self.buffer = deque(maxlen=self.capacity)

    def push(self, transitions):  # 存储  (s, a, r, s') 到 buffer 中
        self.buffer.append(transitions)

    def sample(self, batch_size, sequential=False):  # 抽取 小批次 buffer 数据用于 更新 主网络
        if batch_size > len(self.buffer):
            batch_size = len(self.buffer)
        if sequential:  # 顺序采样
            rand = random.randint(0, len(self.buffer) - batch_size)
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)]
            return zip(*batch)
        else: # 随机采样
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch)
    
    def __len__(self):   # 返回 buffer中 存储的经验/数据 数量
        return len(self.buffer)

class DQN:
    def __init__(self, model, memory, cfg): # cfg 定义了一些 超参数
        self.n_actions = cfg['n_actions']
        self.device = torch.device(cfg['device'])
        self.gamma = cfg['gamma']  # 奖励 的 折扣因子

        # ε 贪心策略 参数
        self.sample_count = 0  # 用于 ε 衰减计数
        self.epsilon_start = cfg['epsilon_start']
        self.epsilon_end = cfg['epsilon_end']
        self.epsilon_decay = cfg['epsilon_decay']

        self.batch_size = cfg['batch_size']

        self.policy_net = model.to(self.device)
        self.target_net = model.to(self.device)  

        # 复制参数到 目标网络
        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(param.data)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg['lr'])  # 优化器
        self.memory = memory  # 经验回放

    def sample_action(self, state):  # 状态 s 执行 动作 a 是否是 好的策略
        self.sample_count += 1

        # 让 ε 随着步数指数衰减。 权衡 探索和利用 
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                        math.exp(-1. * self.sample_count / self.epsilon_decay)
        if random.random() > self.epsilon:  # ε 已经很小
            with torch.no_grad():
                state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
                q_values = self.policy_net(state)
                action = q_values.max(1)[1].item()  # 选择 最大动作值 对应的动作
        else:
            action = random.randrange(self.n_actions)
        return action 
    
    ## 用于 测试
    @torch.no_grad()
    def predict_action(self, state):  # 策略已学好。 遇到状态 s 应输出 a
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        q_values = self.policy_net(state)  # 学到的 Q 函数网络 输出 相应的 q 值
        action = q_values.max(1)[1].item() 
        return action 
    
    def update(self):  # 基于数据  不断更新  Q函数 的拟合模型
        if len(self.memory) < self.batch_size:    # 缓存器 中样本数不足 一批,不更新策 
            return 
        
        # 在 缓存器 中抽样 batch (s, a, r, s')
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)

        # 将数据 转为 tensor 
        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
        done_batch = torch.tensor(np.float32(done_batch), device=self.device)

        q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)
        next_q_values = self.target_net(next_state_batch).max(1)[0].detach() # 最大的 q

        expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)  # done_batch 记录 是否是批次的结束。若是,要更新 主网络
        loss = nn.MSELoss()(q_values, expected_q_values)  # 计算均方根损失

        # 优化
        self.optimizer.zero_grad()
        loss.backward()

        # Clip 防止 梯度爆炸
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1,1)
        self.optimizer.step() 

###### 2、训练模块 和 测试模块  定义
def train(cfg, env, agent):  # 训练模块
    print("------开始训练------")
    rewards = []
    steps = []
    for i_ep in range(cfg['train_eps']):
        ep_reward = 0   # 一个 回合 内的奖励
        ep_step = 0 
        state, info = env.reset(seed=cfg['seed'])   # 重置环境
        for _ in range(cfg['ep_max_steps']):
            ep_step += 1
            action = agent.sample_action(state)  
            next_state, reward, terminated, truncated, info = env.step(action)
            agent.memory.push((state, action, reward, next_state, terminated))
            state = next_state  # 更新下一个状态
            agent.update()  # 更新智能体
            ep_reward += reward  # 累加奖励
            if terminated:
                break 
        if (i_ep + 1) % cfg['target_update'] == 0:  # 智能体 目标网络更新  每 C 步 更新 目标网络
            agent.target_net.load_state_dict(agent.policy_net.state_dict())

        steps.append(ep_reward)
        rewards.append(ep_reward)
        if (i_ep + 1) % 10 == 0:  # 每 10 个 回合 打印
            print(f"回合: {i_ep + 1} % {cfg['train_eps']}, 奖励:  {ep_reward:.2f}, Epsolon: {agent.epsilon:.3f}")

    print("------  完成训练 ! ------")
    env.close() 
    return {'rewards': rewards}

def test(cfg, env, agent):  # 训练模块
    print("------ 开始测试: ------")
    # 测试时 不用 计步数了
    rewards = []
    for i_ep in range(cfg['test_eps']):
        ep_reward = 0
        state, info = env.reset(seed=cfg['seed'])   # 重置环境 
        for _ in range(cfg['ep_max_steps']):
            action = agent.predict_action(state) 
            next_state, reward, terminated, truncated, info = env.step(action)
            ep_reward += reward   # 累加奖励
            if terminated:
                break 
        rewards.append(ep_reward)
        print(f"回合: {i_ep + 1} / {cfg['test_eps']}, 奖励: {ep_reward :.2f}")
    print("------ 完成测试 ------")
    env.close()
    return {'rewards': rewards}  # 字典


###### 3、环境
def all_seed(env, seed = 1):
    # env.seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)  # CPU 
    torch.cuda.manual_seed(seed)  # GPU 
    os.environ['PYTHONHASHSEED'] = str(seed)  # python

    torch.backends.cudnn.deterministic = True 
    torch.backends.cudnn.benchmark = False 
    torch.backends.cudnn.enabled = False 

def env_agent_config(cfg):
    env = gym.make(cfg['env_name'])
    if cfg['seed'] != 0:
        all_seed(env, seed=cfg['seed'])

    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.n 
    print(f"状态数: {n_states}, 动作数: {n_actions}")
    cfg.update({"n_states": n_states, "n_actions": n_actions})  # 参数配置字典 更新
    model = MLP(n_states, n_actions, hidden_dim=cfg['hidden_dim'])
    memory = ReplayBuffer(cfg['memory_capacity'])
    agent = DQN(model, memory, cfg)
    return env, agent 


###### 4、参数 设置  可视化模块
def get_args():  # 超参数
    parser = argparse.ArgumentParser(description="hyperparameters")
    parser.add_argument('--algorithm_name', default='DQN', type=str, help='name of algorithm')  # 算法名称
    parser.add_argument('--env_name', default='CartPole-v1', type=str, help="name of environment")
    parser.add_argument('--train_eps', default=200, type=int, help="episodes of training")
    parser.add_argument('--test_eps', default = 20, type=int, help="episodes of testing")
    parser.add_argument('--ep_max_steps',default = 100000,type=int,help="steps per episode, much larger value can simulate infinite steps")
    parser.add_argument('--gamma',default=0.95,type=float,help="discounted factor")
    parser.add_argument('--epsilon_start',default=0.95,type=float,help="initial value of epsilon")
    parser.add_argument('--epsilon_end',default=0.01,type=float,help="final value of epsilon")
    parser.add_argument('--epsilon_decay',default=500,type=int,help="decay rate of epsilon, the higher value, the slower decay")
    parser.add_argument('--lr',default=0.0001,type=float,help="learning rate")
    parser.add_argument('--memory_capacity',default=100000,type=int,help="memory capacity")
    parser.add_argument('--batch_size',default=64,type=int)
    parser.add_argument('--target_update',default=4,type=int)  # 伪代码里的 C
    parser.add_argument('--hidden_dim',default=256,type=int)
    parser.add_argument('--device',default='cpu',type=str,help="CPU or GPU") 
    parser.add_argument('--seed',default=10,type=int,help="seed") 

    args = parser.parse_args([])
    args = {**vars(args)} 

    # 打印 超参数
    print("超参数")
    print(''.join(['=']*80))
    tplt = "{:^20}\t{:^20}\t{:^20}"
    print(tplt.format("Name", "Value", "Type"))
    for k, v in args.items():
        print(tplt.format(k,v,str(type(v))))   
    print(''.join(['=']*80))      
    return args



def smooth(data, weight=0.9): # 平滑曲线
    last = data[0]
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值
        smoothed.append(smoothed_val)
        last = smoothed_val
    return smoothed  # 返回计算好的平滑数据

def plot_rewards(rewards, cfg, tag='train'):  # 训练过程的奖励 可视化
    sns.set(style='whitegrid') #  可设置 seaborn 绘图风格
    plt.figure()
    plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algorithm_name']} for {cfg['env_name']}") # 算法名称  环境名称
    plt.xlabel('episodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.show() 



###### 5、训练 和 测试

cfg = get_args()  # 获取参数
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)

plot_rewards(res_dic['rewards'], cfg, tag = "train")

# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], cfg, tag="test")

▢ Double DQN_2016

论文 PDF 链接:Deep Reinforcement Learning with Double Q-learning

来自 https://arxiv.org/pdf/1511.06581 的伪代码

在这里插入图片描述

算法: Double DQN
输入:空的回放缓存 replay buffer D \cal D D、初始网络权重 θ \theta θ、目标网络权重 θ − = θ \theta^-=\theta θ=θ
          ~~~~~~~~~           回放缓存 replay buffer 的最大容量 N r N_r Nr,训练 batch-size N b N_b Nb 、目标网络的替换频次 N − N^- N
遍历 episode e ∈ { 1 , 2 , ⋯   , M } e\in\{1, 2,\cdots,M\} e{1,2,,M}
       ~~~~~~        初始化 帧序列 x ← ( ) \bf x\leftarrow() x()
       ~~~~~~        对于 t ∈ { 0 , 1 , ⋯   } t\in\{0,1, \cdots\} t{0,1,}:
             ~~~~~~~~~~~~              令 状态 s ← x s\leftarrow\bf x sx,抽样动作 a ∼ π B     a\sim\pi_{\cal B}~~~ aπB    【必须从 buffer里抽取?——>模拟 均匀分布】
             ~~~~~~~~~~~~              给定 ( s , a ) (s, a) (s,a), 根据 E \mathcal{E} E 抽取下一个环境 x t x^t xt, 得到奖励 r r r, 将 x t x^t xt 添加到 x \bf x x 中。
             ~~~~~~~~~~~~              如果 ∣ x ∣ > N f |{\bf x}|>N_f x>Nf, 删掉 x {\bf x} x 中最旧的帧 x t min ⁡     x_{t_{\min}}~~~ xtmin    【设的阈值?】
             ~~~~~~~~~~~~              s ′ ← x s^\prime\leftarrow\bf x sx, 将 ( s , a , r , s ′ ) (s, a, r, s^\prime) (s,a,r,s) 添加到 D \cal D D,若是 ∣ D ∣ ≥ N r |{\cal D}|\geq N_r DNr, 替换最旧的数据
             ~~~~~~~~~~~~              抽样长度为 N b N_b Nb 的小批量元组 ( s , a , r , s ′ ) ∼ U n i f ( D )      (s, a,r,s^\prime)\sim {\rm Unif}(\cal D)~~~~ (s,a,r,s)Unif(D)     【近似 均匀分布】
             ~~~~~~~~~~~~             对于长度为 N b N_b Nb 的元组中的每一个样本计算 目标值
             ~~~~~~~~~~~~                 y j = { r              s ′  是  e p i s o d e  终点 r + γ Q ( s ′ , arg ⁡ max ⁡ a ′ Q ( s ′ , a ′ ; θ ) ; θ − )       其它 ~~y_j=\left\{\begin{aligned}&r ~~~~~~~~~~~~s^\prime ~是~{\rm episode}~ 终点\\ &r+\gamma Q\Big(s^\prime,\arg\max\limits_{a^\prime}Q(s^\prime, a^\prime;\theta);\theta^-\Big)~~~~~~其它\end{aligned}\right.   yj= r            s  episode 终点r+γQ(s,argamaxQ(s,a;θ);θ)      其它
             ~~~~~~~~~~~~              对损失   ∣ ∣ y j − Q ( s , a ; θ ) ∣ ∣ 2   ~||y_j-Q(s,a;\theta)||^2~  ∣∣yjQ(s,a;θ)2  梯度下降
             ~~~~~~~~~~~~              N − N^- N 步 替换目标参数   θ − ← θ ~\theta^-\leftarrow\theta  θθ

$\mathcal{E}$     E ~~~\mathcal{E}    E

test.py

在这里插入代码片

▢ 竞争 Dueling DQN 算法_2016

论文 PDF 链接: Dueling Network Architectures for Deep Reinforcement Learning

Q ( s , a ; θ , α , β ) = V ( s ; θ , β ) + ( A ( s , a ; θ , α ) − 1 ∣ A ∣ ∑ a ′ A ( s , a ′ ; θ , α ) ) Q(s,a;\theta,\alpha,\beta)=V(s;\theta,\beta)+\Big(A(s,a;\theta,\alpha)-\frac{1}{|\mathcal A|}\sum\limits_{a^\prime}A(s,a^\prime;\theta,\alpha)\Big) Q(s,a;θ,α,β)=V(s;θ,β)+(A(s,a;θ,α)A1aA(s,a;θ,α))

  • θ \theta θ 是卷积层参数。 α \alpha α β \beta β 分别是两个输出全连接层的参数
  • 状态-动作值函数 Q ( s , a ; θ , α , β ) Q(s,a;\theta,\alpha,\beta) Q(s,a;θ,α,β) 是真实 Q Q Q 函数的参数化估计
  • V ( s ; θ , β ) V(s;\theta,\textcolor{blue}{\beta}) V(s;θ,β) 不是 状态值 函数好的估计
  • 取决于状态的动作优势函数 A ( s , a ; θ , α ) A(s,a;\theta,\textcolor{blue}{\alpha}) A(s,a;θ,α) 也不是 优势函数的合理估计。每个行动重要性的度量

▢ Noisy DQN_2018

论文 PDF 链接:Noisy Networks for Exploration

在这里插入图片描述

▢ PER DQN_2016

论文 PDF 链接:Prioritized Experience Replay

▢ 分布式 DQN_2017

论文 PDF 链接:A Distributional Perspective on Reinforcement Learning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/757429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue3 window.location 获取正在访问的地址,也可以通过useRoute来获取相关信息。

1、一般我们在开发的vue3项目的时候&#xff0c;地址是这样&#xff1a;http://192.168.1.101:3100/#/login 然后我们在布署完成以后一般是这样https://xxx.yyyyy.com/uusys/#/login 其实xxx可以是www&#xff0c;也可以是一个二级域名 yyyyy.com是域名&#xff0c;uusys一般…

Kafka~消息发送过程与ISR机制了解

消息发送过程 使用Kafka发送消息时&#xff0c;一般有两种方式分别是&#xff1a; 同步发送异步发送 同步发送时&#xff0c;可以在发送消息后&#xff0c;通过get方法等待消息结果&#xff0c;这种情况能够准确的拿到消息最终的发送结果&#xff0c;要么是成功、要么是失败…

前端路由管理

前端路由管理简介&#xff1a; 当谈到前端路由管理时&#xff0c;通常指的是在单页面应用程序&#xff08;SPA&#xff09;中管理页面间导航和URL的过程。路由管理器是一个工具&#xff0c;可以帮助前端开发者定义应用程序的不同视图之间的关系&#xff0c;同时能够响应URL的改…

Attention (注意力机制)

1. 背景&#xff1a; 字面的意思&#xff1a;给你一些东西(看见一个美女:).....)&#xff0c;你会注意什么&#xff1f; 大数据的时代下&#xff0c;有太多的数据&#xff0c;我们又该如何选择重要的数据呢&#xff1f; Attention 诞生了&#xff0c;但是又该如何去做呢(i.e., …

springboot在线考试 LW +PPT+源码+讲解

第三章 系统分析 3.1 可行性分析 一个完整的系统&#xff0c;可行性分析是必须要有的&#xff0c;因为他关系到系统生存问题&#xff0c;对开发的意义进行分析&#xff0c;能否通过本系统来补充线下在线考试管理模式中的缺限&#xff0c;去解决其中的不足等&#xff0c;通过对…

[OtterCTF 2018]Play Time

还是这个程序 。。要找到游戏名字查看 进程 psscan pstree pslist 0x000000007d686b30 Rick And Morty 3820 2728 0x000000000b59a000 2018-08-04 19:32:55 UTC0000 0x000000007d7cb740 LunarMS.exe 708 2728 0x00000000731cb000 2018-08-04 19:27:39 UTC0000…

嵌入式Linux系统编程 — 4.7 regcomp、regexec、regfree正则表达式函数

目录 1 为什么需要正则表达式 2 正则表达式简介 3 正则表达式规则 4 regcomp、regexec、regfree函数 4.1 函数介绍 4.2 URL格式案例 1 为什么需要正则表达式 在许多的应用程序当中&#xff0c; 有这样的应用场景&#xff1a; 给定一个字符串&#xff0c;检查该字符串是否…

Spring学习01-[Spring实现IOC的几种方式]

Spring实现IOC的几种方式 基于xml实现Spring的IOC基于注解实现Spring的IOC基于JavaConfig实现的Spring的IOC基于SpringBoot实现Spring的IOC 基于xml实现Spring的IOC 引入spring核心依赖 <!--spring核心容器--><dependency><groupId>org.springframework<…

14 卡尔曼滤波及代码实现

文章目录 14 卡尔曼滤波及代码实现14.0 基本概念14.1 公式推导14.2 代码实现 14 卡尔曼滤波及代码实现 14.0 基本概念 卡尔曼滤波是一种利用线性系统状态方程&#xff0c;通过系统输入输出观测数据&#xff0c;对系统状态进行最优估计的算法。由于观测数据包括系统中的噪声和…

【智能制造-4】机器人控制器

机器人控制器中分哪几个模块&#xff1f; 机器人控制器通常由以下几个主要模块组成: 运动控制模块: 负责机器人各轴电机的位置、速度、加速度等控制 实现机器人末端执行器的精确定位和运动控制传感器接口模块: 负责机器人各种传感器信号的采集和处理 为运动控制、环境感知等提…

实用的vueuseHooks,提高编码效率

文章目录 写在前面vueuse 官网安装HooksuseStorage [地址](https://vueuse.org/core/useStorage/)传统方法数据持久化 举例子传统持久化的弊端useStorage 数据持久化 举例子使用useStorage 更改存储数据使用useStorage 删除存储数据 useScriptTag [地址](https://vueuse.org/co…

Detailed Steps for Troubleshooting ORA-00600 [kdsgrp1] (文档 ID 1492150.1)

Detailed Steps for Troubleshooting ORA-00600 [kdsgrp1] (文档 ID 1492150.1)​编辑转到底部 In this Document Purpose Troubleshooting Steps References APPLIES TO: Oracle Database - Enterprise Edition Oracle Database Cloud Schema Service - Version N/A and lat…

鸿蒙开发Ability Kit(程序框架服务):【选择申请权限的方式】

选择申请权限的方式 应用在访问数据或者执行操作时&#xff0c;需要评估该行为是否需要应用具备相关的权限。如果确认需要目标权限&#xff0c;则需要在应用安装包中申请目标权限。 每一个权限的权限等级、授权方式不同&#xff0c;申请权限的方式也不同&#xff0c;开发者在…

41割队伍

上海市计算机学会竞赛平台 | YACSYACS 是由上海市计算机学会于2019年发起的活动,旨在激发青少年对学习人工智能与算法设计的热情与兴趣,提升青少年科学素养,引导青少年投身创新发现和科研实践活动。https://www.iai.sh.cn/problem/387 题目描述 给定 𝑛n 个数字 𝑎1,�…

MySQL高级-MVCC-基本概念(当前读、快照读)

文章目录 1、MVCC基本概念1.1、当前读1.1.1、创建表 stu1.1.2、测试 1.2、快照读 1、MVCC基本概念 全称Multi-Version Concurrency Control&#xff0c;多版本并发控制。指维护一个数据的多个版本&#xff0c;使得读写操作没有冲突&#xff0c;快照读为MySQL实现MVCC提供了一个…

网易云音乐数据爬取与可视化分析系统

摘要 本系统采用Python语言&#xff0c;基于网易云音乐&#xff0c;通过数据挖掘技术对该平台的音乐数据进行了深入的研究和分析&#xff0c;旨在挖掘出音乐市场的规律&#xff0c;为音乐人、唱片公司、音乐爱好者等提供数据支持。系统的开发意义在于&#xff1a;一方面为音乐…

flink 处理函数和流转换

目录 处理函数分类 概览介绍 KeydProcessFunction和ProcessFunction 定时器TimeService 窗口处理函数 多流转换 分流-侧输出流 合流 联合&#xff08;Uniion&#xff09; 连接&#xff08;connect&#xff09; 广播连接流&#xff08;BroadcatConnectedStream&#xf…

大模型微调实战之基于星火大模型的群聊对话分角色要素提取挑战赛:Task01:跑通Baseline

目录 0 背景1 环境配置1.1 下载包1.2 配置密钥1.3 测试模型 2 解决问题2.1 获取数据2.2 设计Prompt2.2 设计处理函数2.3 开始提取 附全流程代码 0 背景 Datawhale AI夏令营第二期开始啦&#xff0c;去年有幸参与过第一期&#xff0c;收获很多&#xff0c;这次也立马参与了第二…

昇思MindSpore学习笔记5--数据变换Transforms

摘要&#xff1a; 昇思MindSpore的数据变换&#xff0c;包括通用变换Common Transforms、图像变换Vision Transforms、标准化Normalize、文本变换Text Transforms、匿名函数变换Lambda Transforms。 一、数据变换Transforms概念 原始数据需预处理后才能送入神经网络进行训练…

【网络】计算机网络-基本知识

目录 概念计算机网络功能计算机网络的组成计算机网络的分类 网络地址网络地址的分类 计算机网络相关性能指标速率带宽吞吐量时延时延的种类&#xff1a; 时延带宽积往返时延RTT利用率 概念 计算机网络是指将多台计算机通过通信设备连接起来&#xff0c;实现数据和资源的共享。…