【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例

这篇具有很好参考价值的文章主要介绍了【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例,机器学习,人工智能

Deep Q-Learning(深度 Q 学习)是一种强化学习算法,用于解决决策问题,其中代理(agent)通过学习在不同环境中采取行动来最大化累积奖励。Lunar Lander 是一个经典的强化学习问题,其中代理的任务是控制一个着陆舱在月球表面着陆,最小化着陆过程中的燃料消耗。

以下是使用 Deep Q-Learning 解决 Lunar Lander 问题的基本步骤:

  1. 环境建模: 首先,需要对 Lunar Lander 环境进行建模。这包括定义状态空间、动作空间、奖励函数等。在 Lunar Lander 中,状态可以包括着陆舱的位置、速度、角度等信息,动作可以是推力引擎的火力等。

  2. 深度 Q 网络: 创建一个深度神经网络,该网络将输入状态,并输出每个可能动作的 Q 值。Q 值表示在给定状态下采取某个动作的累积奖励的估计。网络的目标是通过学习调整 Q 值以最大化累积奖励。

  3. 经验回放: 使用经验回放机制来改善学习稳定性。这涉及到存储代理先前的经验,并从中随机抽样进行训练。这有助于解决样本相关性问题,提高算法的收敛性。

  4. ε-贪婪策略: 引入 ε-贪婪策略,以平衡探索和利用。在一部分情况下,代理将以高概率选择当前认为最佳的动作(贪婪),而在其他情况下,它会以较高概率选择一个随机动作(探索)。

  5. 目标 Q 值计算: 使用目标 Q 值来更新网络参数。目标 Q 值通过将当前状态的即时奖励与下一个状态的最大 Q 值相结合得到。这有助于更有效地传播奖励信号。

  6. 训练: 通过与环境的交互,不断地更新深度 Q 网络的参数。代理通过学习来优化其行为,以最大化预期的累积奖励。

  7. 调优对算法的超参数进行调优,包括学习率、折扣因子、神经网络结构等。这有助于提高算法的性能和稳定性。

使用 Deep Q-Learning 解决 Lunar Lander 问题是一个复杂的任务,需要仔细调整和实验。算法的性能可能受到许多因素的影响,包括网络结构的选择、超参数的设置以及环境的建模等。

Lunar Lander环境

这个环境是 Box2D 环境的一部分,其中包含有关环境的一般信息。【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例,机器学习,人工智能

描述 Description

该环境是一个经典的火箭轨迹优化问题。根据庞特里亚金最大值原理,最佳的方法是全速点火或关闭引擎。这就是为什么这个环境具有离散动作的原因:引擎开启或关闭。

有两个环境版本:离散或连续。着陆点始终位于坐标 (0,0)。坐标是状态向量中的前两个数字。在着陆点外着陆是可能的。燃料是无限的,因此代理可以学会飞行,然后在第一次尝试时着陆。

要查看启发式着陆,请运行:

python gymnasium/envs/box2d/lunar_lander.py

动作空间 Action Space

有四个可用的离散动作:

0:什么都不做

1:点火左定向引擎

2:点火主引擎

3:点火右定向引擎

观察空间 Observation Space

状态是一个8维向量:着陆器在 x 和 y 上的坐标,它在 x 和 y 上的线速度,它的角度,它的角速度,以及两个表示每条腿是否与地面接触的布尔值。

奖励 Rewards

每一步都会获得一个奖励。一个 episode(回合)总奖励是该 episode 中所有步骤的奖励之和。

对于每一步,奖励:

着陆器离着陆点越近/远,奖励增加/减少。

着陆器移动越慢/快,奖励增加/减少。

着陆器倾斜度越大,奖励减少(角度非水平)。

每个与地面接触的腿奖励增加10分。

每帧侧引擎点火,奖励减少0.03分。

每帧主引擎点火,奖励减少0.3分。

episode 因坠毁或安全着陆而额外获得-100或+100分的奖励。

如果一个 episode 得分至少为200分,则认为它是一个解决方案。

起始状态 Starting State

着陆器位于视口的顶部中心,对其质心施加随机初始力。

回合终止条件 Episode Termination

如果满足以下条件回合结束:

着陆器坠毁(着陆器本体与月球接触);

着陆器超出视口范围(x 坐标大于1);

着陆器处于非唤醒状态。根据 Box2D 文档,处于非唤醒状态的身体是不移动且不与任何其他身体碰撞的身体:

当 Box2D 确定一个身体(或一组身体)已经停止时,该身体进入了一种几乎没有 CPU 开销的休眠状态。如果一个身体醒着并与休眠中的身体发生碰撞,那么休眠中的身体会醒来。如果连接到它们的关节或接触被销毁,身体也会醒来。

参数 Arguments

要使用连续环境,您需要指定 continuous=True 参数,如下所示:

 
 
import gymnasium as gym
env = gym.make(
    "LunarLander-v2",
    continuous: bool = False,
    gravity: float = -10.0,
    enable_wind: bool = False,
    wind_power: float = 15.0,
    turbulence_power: float = 1.5,
)

如果传递 continuous=True,将使用连续动作(对应于引擎的油门),并且动作空间将是 Box(-1, +1, (2,), dtype=np.float32)。动作的第一个坐标确定主引擎的油门,而第二个坐标指定侧推器的油门。给定一个动作 np.array([main, lateral]),如果 main < 0,则主引擎将完全关闭,并且油门在 0 <= main <= 1 时按比例从 50% 缩放到 100%(特别是,主引擎在功率低于 50% 时不起作用)。同样,如果 -0.5 < lateral < 0.5,则侧推器将不会点火。如果 lateral < -0.5,则左推进器将点火,如果 lateral > 0.5,则右推进器将点火。同样,油门在 -1 到 -0.5 之间(以及 0.5 到 1 之间)按比例从 50% 缩放到 100%。

gravity 确定了重力常数,它被限制在 0 和 -12 之间。

如果传递 enable_wind=True,则着陆器将受到风的影响。风是使用函数 tanh(sin(2 k (t+C)) + sin(pi k (t+C))) 生成的。k 设置为 0.01。C 在 -9999 到 9999 之间随机抽样。

wind_power 确定了施加在飞行器上的线性风的最大幅度。wind_power 的推荐值在 0.0 到 20.0 之间。turbulence_power 确定了施加在飞行器上的旋转风的最大幅度。turbulence_power 的推荐值在 0.0 到 2.0 之间。

版本历史 Version History

v2:计算能量消耗,并在 v0.24 版本中,添加了具有风力和 turbulence_power 参数的湍流

v1:在状态向量中添加了与地面接触的腿;与地面接触会奖励 +10 分,如果失去接触则减去 -10 分;奖励重新调整为 200 分;更难的初始随机推力。

v0:初始版本

备注 Notes

在环境实现中存在一些意外的错误。

着陆器身体上侧推器的位置会随着着陆器的方向而变化。这反过来导致对着陆器施加方向依赖性扭矩。

状态的单位不一致。即:

角速度以每秒 0.4 弧度为单位。为了转换为每秒弧度,需要将该值乘以 2.5 的因子。

对于 VIEWPORT_W、VIEWPORT_H、SCALE 和 FPS 的默认值,比例因子相等:'x': 10 'y': 6.666 'vx': 5 'vy': 7.5 'angle': 1 'angular velocity': 2.5

在进行更正后,状态的单位如下:'x':(单位) 'y':(单位) 'vx':(单位/秒) 'vy':(单位/秒) 'angle':(弧度) 'angular velocity':(弧度/秒)

示例代码

################ 完整代码
import gymnasium as gym
from gym.wrappers.monitoring.video_recorder import VideoRecorder
from IPython.display import HTML, display
import imageio
import base64
import io
import glob
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple


######## 创建网络架构
class Network(nn.Module):
    def __init__(self, state_size, action_size, seed=42):
        super(Network, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)


    def forward(self, state):
        x = self.fc1(state)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        return self.fc3(x)


######## 设置环境:使用Gymnasium创建了LunarLander环境
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)
######## 初始化超参数:定义了学习率、批处理大小、折扣因子等超参数。
learning_rate = 5e-4
minibatch_size = 100
discount_factor = 0.99
replay_buffer_size = int(1e5)
interpolation_parameter = 1e-3


####### 实现经验回放:实现了经验回放(Experience Replay)的类 ReplayMemory,用于存储和采样Agent的经验
class ReplayMemory(object):
    def __init__(self, capacity):
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity
        self.memory = []


    def push(self, event):
        self.memory.append(event)
        if len(self.memory) > self.capacity:
            del self.memory[0]


    def sample(self, batch_size):
        experiences = random.sample(self.memory, k=batch_size)
        states = torch.from_numpy(np.vstack(
            [e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(
            np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)
        rewards = torch.from_numpy(np.vstack(
            [e[2] for e in experiences if e is not None])).float().to(self.device)
        next_states = torch.from_numpy(np.vstack(
            [e[3] for e in experiences if e is not None])).float().to(self.device)
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(
            np.uint8)).float().to(self.device)
        return states, next_states, actions, rewards, dones


########## 实现 DQN 代理:创建了一个Agent类,包含本地Q网络和目标Q网络,包含了采取动作、学习、软更新等方法。
class Agent():
    # 初始化函数,参数为状态大小和动作大小
    def __init__(self, state_size, action_size):
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.local_qnetwork = Network(state_size, action_size).to(self.device)
        self.target_qnetwork = Network(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(
            self.local_qnetwork.parameters(), lr=learning_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0
    # 定义一个函数,用于存储经验并决定何时从中学习
    def step(self, state, action, reward, next_state, done):
        self.memory.push((state, action, reward, next_state, done))
        self.t_step = (self.t_step + 1) % 4
        if self.t_step == 0:
            if len(self.memory.memory) > minibatch_size:
                experiences = self.memory.sample(100)
                self.learn(experiences, discount_factor)
    # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数
    def act(self, state, epsilon=0.):
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()
        if random.random() > epsilon:
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))
    # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子
    def learn(self, experiences, discount_factor):
        states, next_states, actions, rewards, dones = experiences
        next_q_targets = self.target_qnetwork(
            next_states).detach().max(1)[0].unsqueeze(1)
        q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
        q_expected = self.local_qnetwork(states).gather(1, actions)
        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.soft_update(self.local_qnetwork,
                         self.target_qnetwork, interpolation_parameter)
    # 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数
    def soft_update(self, local_model, target_model, interpolation_parameter):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(interpolation_parameter * local_param.data + (
                1.0 - interpolation_parameter) * target_param.data)


####### 训练DQN代理
agent = Agent(state_size, number_actions)


number_episodes = 2000
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen=100)


for episode in range(1, number_episodes + 1):
    state, _ = env.reset()
    score = 0
    for t in range(maximum_number_timesteps_per_episode):
        action = agent.act(state, epsilon)
        next_state, reward, done, _, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
    scores_on_100_episodes.append(score)
    epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(
        episode, np.mean(scores_on_100_episodes)), end="")
    if episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(
            episode, np.mean(scores_on_100_episodes)))
    if np.mean(scores_on_100_episodes) >= 200.0:
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(
            episode - 100, np.mean(scores_on_100_episodes)))
        torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
        break


####### 可视化结果
def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, done, _, _ = env.step(action.item())
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)




show_video_of_model(agent, 'LunarLander-v2')




def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")


show_video()

终端输出:

State shape:  (8,)
State size:  8
Number of actions:  4
Episode 100     Average Score: -174.79
Episode 200     Average Score: -102.59
Episode 300     Average Score: -68.797
Episode 400     Average Score: -38.18
Episode 500     Average Score: 24.301
Episode 600     Average Score: 149.24
Episode 700     Average Score: 134.89
Episode 800     Average Score: 185.41
Episode 826     Average Score: 200.92
Environment solved in 726 episodes!     Average Score: 200.92
IMAGEIO FFMPEG_WRITER WARNING: input image is not divisible by macro_block_size=16, resizing from (600, 400) to (608, 400) to ensure video compatibility with most codecs and players. To prevent resizing, make your input image divisible by the macro_block_size or set the macro_block_size to 1 (risking incompatibility).
[swscaler @ 000001e276cb1f00] Warning: data is not aligned! This can lead to a speed loss
<IPython.core.display.HTML object>

总结:以上代码是使用深度 Q 学习(DQN)算法训练一个智能体在月球着陆环境中控制火箭的示例。代码分为以下几个部分:

• 导入所需的库和模块,包括 gymnasium(一个开源的强化学习环境库),torch(一个开源的深度学习框架),以及一些辅助的库和模块,如 imageio(用于处理图像和视频),base64(用于编码和解码数据),deque(用于实现双端队列),namedtuple(用于创建命名元组)等。

import io  # 导入io模块,用于读写文件
import glob  # 导入glob模块,用于查找文件
import gymnasium as gym  # 导入环境库
import os  # os 是操作系统相关的库,用来处理文件和目录等。
import random  # random 是随机数生成和操作的库,用来实现 epsilon 贪心策略等。
import numpy as np  # numpy 是科学计算的库,用来处理多维数组和矩阵等。
import torch  # torch 是 PyTorch 框架的主要库,用来实现张量和神经网络等。
import torch.nn as nn  # torch.nn 是 PyTorch 框架的神经网络模块,用来定义神经网络的层和损失函数等。
import torch.optim as optim  # torch.optim 是 PyTorch 框架的优化器模块,用来定义优化算法和更新参数等。
# torch.nn.functional 是 PyTorch 框架的函数式接口,用来实现激活函数和池化等。
import torch.nn.functional as F
# torch.autograd 是 PyTorch 框架的自动微分模块,用来实现反向传播和梯度计算等。
import torch.autograd as autograd
# torch.autograd.Variable 是 PyTorch 框架的变量类,用来封装张量和梯度等。
from torch.autograd import Variable
# collections 是 Python 的内置模块,用来实现特殊的容器类型,如双端队列和命名元组等。
from collections import deque, namedtuple

• 定义 Network 类,继承自 torch.nn.Module 类,用来构建神经网络模型,包括三个全连接层和一个前向传播函数,输入是状态的维度,输出是动作的个数。

class Network(nn.Module):  # 继承nn模块中的Module类
    def __init__(self, state_size, action_size, seed=42):  # 初始化函数,参数为状态大小(8),动作大小(4),随机种子(42)
        super(Network, self).__init__()  # 调用父类的初始化函数
        self.seed = torch.manual_seed(seed)  # 设置随机种子
        # 创建第一个全连接层,输入为状态大小,输出为64个神经元,这里64是为了适应月球着陆的问题
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)  # 创建第二个全连接层,输入为64个神经元,输出为64个神经元
        self.fc3 = nn.Linear(64, action_size)  # 创建第三个全连接层,输入为64个神经元,输出为动作大小
        # 完成神经网络的构建


    def forward(self, state):  # 前向传播函数
        x = self.fc1(state)  # 从输入层接收状态
        x = F.relu(x)  # 使用激活函数
        x = self.fc2(x)  # 从第一个隐藏层接收输出
        x = F.relu(x)  # 使用激活函数
        return self.fc3(x)  # 返回最后一层的输出

• 创建月球着陆环境,使用 gym.make 函数,获取环境的状态空间和动作空间的属性,如状态的形状,状态的维度,动作的个数等。

# https://gymnasium.farama.org/environments/box2d/lunar_lander/
env = gym.make('LunarLander-v2')
# 导入月球着陆环境
state_shape = env.observation_space.shape  # 状态的形状,这里是8维向量
state_size = env.observation_space.shape[0]  # 状态的大小,这里是8个元素,包括坐标,速度等
number_actions = env.action_space.n  # 动作的数量,这里是4个
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

• 定义一些超参数,如学习率,批次大小,折扣因子,回放缓冲区的容量,软更新的插值参数等。

learning_rate = 5e-4  # 学习率,这里是0.00005
minibatch_size = 100  # 批次大小,用于更新参数
discount_factor = 0.99  # 折扣因子,用于计算未来奖励,越接近1越考虑未来,越接近0越考虑当前
replay_buffer_size = int(1e5)  # 重放缓冲区的大小,用于存储人工智能的经验,稳定和改善学习,这里是10万个经验
interpolation_parameter = 1e-3  # 插值参数,用于更新目标网络,这里是0.001
# 所有的参数都是通过实验得到的最优值

• 定义 ReplayMemory 类,用来实现经验回放机制,包括一个初始化函数,一个存储经验的函数,一个采样经验的函数,使用双端队列来存储经验元组,使用随机采样的方法来获取一批经验,将经验转换为 PyTorch 张量并发送到 cpu 或 gpu 上。

# 定义一个重放缓冲区的类
class ReplayMemory(object):
    def __init__(self, capacity):  # 初始化函数,参数为缓冲区的容量
        # 判断是否有cuda可用,如果有则使用cuda,否则使用cpu
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity  # 设置缓冲区的容量
        self.memory = []  # 创建一个列表,用于存储经验,每个经验包括状态,动作,奖励,下一个状态,是否结束等


    def push(self, event):  # 定义一个函数,用于向缓冲区中添加经验
        self.memory.append(event)  # 将经验添加到列表中
        if len(self.memory) > self.capacity:  # 如果列表的长度超过了容量
            del self.memory[0]  # 删除最旧的经验


    def sample(self, batch_size):  # 定义一个函数,用于从缓冲区中随机抽取一批经验
        experiences = random.sample(self.memory, k=batch_size)  # 从列表中随机抽取k个经验
        # 从经验中提取每个元素,并将它们堆叠在一起
        # 使用列表推导式,从每个经验中提取状态,使用np.vstack将它们垂直堆叠,然后转换为pytorch张量,使用.float()将它们转换为浮点数,使用.to(self.device)将它们发送到cpu或gpu
        states = torch.from_numpy(np.vstack(
            [e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long(
        ).to(self.device)  # 同理,从每个经验中提取动作,使用.long()将它们转换为整数
        rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float(
        ).to(self.device)  # 同理,从每个经验中提取奖励,使用.float()将它们转换为浮点数
        next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float(
        ).to(self.device)  # 同理,从每个经验中提取下一个状态,使用.float()将它们转换为浮点数
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float(
        ).to(self.device)  # 同理,从每个经验中提取是否结束的标志,使用.astype(np.uint8)将它们转换为无符号整数,使用.float()将它们转换为浮点数
        return states, next_states, actions, rewards, dones  # 返回这些元素,注意顺序要一致

• 定义 Agent 类,用来实现深度 Q 学习的智能体,包括一个初始化函数,一个执行一步操作的函数,一个选择动作的函数,一个学习的函数,一个软更新的函数,使用两个神经网络模型,一个是本地 Q 网络,用来选择和评估动作,一个是目标 Q 网络,用来计算目标 Q 值,使用 Adam 优化器来更新本地 Q 网络的参数,使用 ReplayMemory 类的实例来存储和采样经验,使用时间步来控制何时学习,使用 epsilon 贪心策略来平衡探索和利用,使用均方误差损失函数来计算预期 Q 值和目标 Q 值之间的差异,使用软更新的方法来更新目标 Q 网络的参数。

# 定义一个代理类
class Agent():
    def __init__(self, state_size, action_size):  # 初始化函数,参数为状态大小和动作大小
        # 判断是否有cuda可用,如果有则使用cuda,否则使用cpu
        self.device = torch.device(
            "cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size  # 创建对象变量,存储状态大小
        self.action_size = action_size  # 创建对象变量,存储动作大小
        # Q学习 --
        self.local_qnetwork = Network(state_size, action_size).to(
            self.device)  # 创建一个本地网络,用于选择动作,将其发送到设备上,随机种子已经在之前提供
        self.target_qnetwork = Network(state_size, action_size).to(
            self.device)  # 创建一个目标网络,用于计算目标值,将其发送到设备上


        # 创建一个优化器,用于更新本地网络的参数,使用Adam算法,学习率为之前定义的值
        self.optimizer = optim.Adam(
            self.local_qnetwork.parameters(), lr=learning_rate)
        # 创建一个重放缓冲区,用于存储人工智能的经验,容量为之前定义的值
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0  # 时间步数


    def step(self, state, action, reward, next_state, done):  # 定义一个函数,用于存储经验并决定何时从中学习
        self.memory.push((state, action, reward, next_state, done))  # 将经验推入缓冲区
        self.t_step = (self.t_step + 1) % 4  # 时间步数计数器(每4步学习一次)
        if self.t_step == 0:  # 如果时间步数为0
            # 如果缓冲区中的经验数量大于批次大小,self.memory是重放缓冲区的实例,memory属性是__init__中定义的列表
            if len(self.memory.memory) > minibatch_size:
                experiences = self.memory.sample(100)  # 从缓冲区中随机抽取100个经验
                self.learn(experiences, discount_factor)  # 从经验中学习,使用之前定义的折扣因子


    def act(self, state, epsilon=0.):  # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数
        # 将状态转换为pytorch张量,增加一个维度,表示批次的维度,0表示批次维度的索引,将其放在最前面,将张量发送到设备上
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.local_qnetwork.eval()  # 将本地网络设置为评估模式,不更新梯度,本地网络是代理类的属性,继承自nn.Module类,有eval()方法
        with torch.no_grad():  # 不计算梯度,检查是否处于推理模式而不是训练模式
            # 使用本地网络对状态进行前向传播,得到每个动作的价值,这些价值将被epsilon贪婪策略选择(这里我们得到的不是最终的值,而是对应于状态的q值)
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()  # 将本地网络设置为训练模式,更新梯度,本地网络是代理类的属性,继承自nn.Module类,有train()方法
        # Eplison贪婪动作选择策略 --(用于探索和利用的平衡)
        if random.random() > epsilon:  # 如果随机数大于epsilon,random.random()是random库的方法,返回一个0到1之间的随机数
            # 返回价值最大的动作,np.argmax是numpy库的方法,返回最大值的索引,action_values发送到cpu上,因为它是简单的,data.numpy()将格式转换为numpy的数据格式
            return np.argmax(action_values.cpu().data.numpy())
        else:  # 否则
            # 返回随机的动作,random.choice是random库的方法,从给定的列表中随机选择一个元素,np.arange是numpy库的方法,返回一个从0到动作大小的数组
            return random.choice(np.arange(self.action_size))


    def learn(self, experiences, discount_factor):  # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子
        states, next_states, actions, rewards, dones = experiences  # 从经验中解包元素
        # 从目标网络中获取下一个状态的最大q值,self.target_qnetwork(next_states)返回每个动作的价值,.detach()将张量从计算图中分离,我们不会在反向传播中使用这些值,.max(1)沿着第一个维度取最大值,得到两个张量(最大值和索引),因此我们添加.max[1][0]只获取最大值,.unsqueeze(1)增加一个维度,表示批次的维度,但这次在第一个位置
        next_q_targets = self.target_qnetwork(
            next_states).detach().max(1)[0].unsqueeze(1) # 从目标Q网络的输出中,选择每个下一个状态对应的最大Q值,然后将其整理成一个列向量 next_q_targets。这个向量将用于计算Q-learning的目标值,以便更新本地Q网络
        q_targets = rewards + discount_factor * next_q_targets * \
            (1 - dones)  # 计算当前状态的目标值,使用公式:奖励加上折扣后的未来最大值,乘以1减去结束标志
        q_expected = self.local_qnetwork(states).gather(
            1, actions)  # 获取当前状态的预期值,.gather(1, actions)根据动作选择对应的价值
        loss = F.mse_loss(q_expected, q_targets)  # 计算预期值和目标值之间的均方误差
        self.optimizer.zero_grad()  # 清空优化器的梯度,zero_grad()是Adam的方法
        loss.backward()  # 反向传播误差
        self.optimizer.step()  # 更新本地网络的参数,step()是优化器的方法
        # 使用软更新的方法更新目标网络的参数,参数为本地网络,目标网络和插值参数
        self.soft_update(self.local_qnetwork,
                         self.target_qnetwork, interpolation_parameter)


    # 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数
    def soft_update(self, local_model, target_model, interpolation_parameter):
        # 遍历目标模型和本地模型的参数,zip()是一个函数,用于将参数打包成元组,parameters()是nn.Module的方法,返回模型的参数
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            # 使用插值参数更新目标模型的参数,将本地模型的参数乘以插值参数,再加上目标模型的参数乘以1减去插值参数,.copy_()是一个方法,用于复制张量的数据
            target_param.data.copy_(interpolation_parameter * local_param.data + (
                1.0 - interpolation_parameter) * target_param.data)

• 创建 Agent 类的实例,传入状态维度和动作个数。

agent = Agent(state_size, number_actions)  # 创建一个代理或人工智能,参数为状态大小和动作数量

• 定义训练过程,包括回合数,每个回合的最大时间步数,epsilon 值的初始值,终止值,衰减值,记录最近 100 个回合的分数,遍历每个回合,重置环境,初始化分数,遍历每个时间步,选择动作,执行动作,存储经验,更新状态,累加奖励,判断是否结束,更新 epsilon 值,打印当前回合和平均分数,判断是否达到目标分数,保存模型参数。

# ### 训练DQN代理
# %%
number_episodes = 2000  # 我们想要训练的次数
# 我们不想让一个回合卡住,所以设置一个最大的时间步数(在月球上着陆的尝试最多为1000个时间步)
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0  # epsilon的初始值
epsilon_ending_value = 0.01  # epsilon的结束值
epsilon_decay_value = 0.995  # epsilon的衰减率,按照(1*0.995 , 1*0.995*0.995,...)的方式递减
epsilon = epsilon_starting_value  # epsilon的变量
scores_on_100_episodes = deque(maxlen=100)  # 最近100个回合的分数(列表) 创建了一个双端队列,最大长度为100


for episode in range(1, number_episodes + 1):  # 循环直到2000次(上限固定)
    state, _ = env.reset()  # 重置环境(这里返回状态和观察值)
    score = 0  # 每个回合的累积分数
    for t in range(maximum_number_timesteps_per_episode):  # 循环每个时间步
        action = agent.act(state, epsilon)  # 根据当前状态和epsilon贪婪策略选择一个动作
        # 根据动作执行环境的步骤,返回下一个状态,奖励,是否结束等值,_是丢弃不需要的值
        next_state, reward, done, _, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)  # 调用代理的学习方法
        state = next_state  # 更新状态
        score += reward  # 累加奖励
        if done:  # 如果回合结束
            break  # 跳出循环
    scores_on_100_episodes.append(score)  # 将最近的回合分数添加到列表中
    epsilon = max(epsilon_ending_value, epsilon_decay_value *
                  epsilon)  # 更新epsilon的值,使其按照衰减率递减,但不低于结束值
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(
        scores_on_100_episodes)), end="")  # \r覆盖之前的输出,\t制表符,.2f保留两位小数,打印当前的回合数和平均分数,end表示不换行
    if episode % 100 == 0:  # 每100个回合
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(
            episode, np.mean(scores_on_100_episodes)))  # \r覆盖之前的输出,打印当前的回合数和平均分数,换行
    if np.mean(scores_on_100_episodes) >= 200.0:  # 如果达到胜利的条件
        print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(
            episode - 100, np.mean(scores_on_100_episodes)))  # \n换行,:d表示整数,打印环境在多少个回合内解决,以及平均分数
        torch.save(agent.local_qnetwork.state_dict(),
                   'checkpoint.pth')  # 将当前Agent的本地Q网络(agent.local_qnetwork)的参数保存到文件中
        break  # 跳出循环

• 定义展示模型表现的函数,使用 imageio 库将每一帧的图像保存为视频文件,使用 HTML 标签和 base64 编码在网页中显示视频。

• 调用展示模型表现的函数,传入智能体和环境的名称。

from gym.wrappers.monitoring.video_recorder import VideoRecorder  # 导入gym模块,用于录制视频
from IPython.display import HTML, display  # 导入IPython模块,用于显示HTML和视频
import imageio  # 导入imageio模块,用于处理图像
import base64  # 导入base64模块,用于编码和解码
import io  # 导入io模块,用于读写文件
import glob  # 导入glob模块,用于查找文件
import gymnasium as gym  # 导入环境库
# %%




def show_video_of_model(agent, env_name):  # 定义一个函数,用于展示代理在环境中的表现
    env = gym.make(env_name, render_mode='rgb_array')  # 创建一个环境,渲染模式为rgb数组
    state, _ = env.reset()  # 重置环境,获取初始状态
    done = False  # 设置结束标志为False
    frames = []  # 创建一个列表,用于存储每一帧的图像
    while not done:  # 循环直到结束
        frame = env.render()  # 渲染环境,获取当前帧的图像
        frames.append(frame)  # 将图像添加到列表中
        action = agent.act(state)  # 根据当前状态选择一个动作
        state, reward, done, _, _ = env.step(
            action.item())  # 根据动作执行环境的步骤,获取下一个状态,奖励,是否结束等值
    env.close()  # 关闭环境
    # 使用imageio模块,将列表中的图像保存为视频,帧率为30
    imageio.mimsave('video.mp4', frames, fps=30)




show_video_of_model(agent, 'LunarLander-v2')  # 调用函数,展示代理在月球着陆环境中的表现




def show_video():  # 定义一个函数,用于显示视频
    mp4list = glob.glob('*.mp4')  # 使用glob模块,查找当前目录下的所有mp4文件
    if len(mp4list) > 0:  # 如果找到了
        mp4 = mp4list[0]  # 取第一个文件
        video = io.open(mp4, 'r+b').read()  # 使用io模块,以二进制模式读取文件
        encoded = base64.b64encode(video)  # 使用base64模块,对文件进行编码
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))  # 使用IPython模块,显示HTML格式的视频,使用base64编码的数据作为源
    else:  # 如果没有找到
        print("Could not find video")  # 打印提示信息
show_video()  # 调用函数,显示视频

笔记

【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例,机器学习,人工智能

【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例,机器学习,人工智能

【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例,机器学习,人工智能

参考网址:

https://gymnasium.farama.org/environments/box2d/lunar_lander/文章来源地址https://www.toymoban.com/news/detail-823299.html

到了这里,关于【机器学习】强化学习(六)-DQN(Deep Q-Learning)训练月球着陆器示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 强化学习Q-learning实践

    前篇文章介绍了强化学习系统红的基本概念和重要组成部分,并解释了 Q-learning 算法相关的理论知识。本文的目标是在 Python3 中实现该算法,并将其应用于实际的实验中。 闲话少说,我们直接开始吧! 为了使本文具有实际具体的意义,特意选择了一个简单而基本的环境,可

    2024年02月08日
    浏览(63)
  • 强化学习 - Q-learning(Q学习)

    强化学习中的 Q-learning (Q学习)是一种用于 学习在未知环境中做出决策的方法 。它是基于值函数的方法,通过学习一个值函数 Q,该函数表示在给定状态和动作下,期望的累积奖励。 以下是一个简单的 Q-learning 的实现教程,使用 Python 进行演示。这里我们考虑一个简单的驾

    2024年01月24日
    浏览(60)
  • 【强化学习】Q-Learning算法详解

    1 Q-Learning算法简介 1.1 行为准则 我们做很多事情都有自己的行为准则,比如小时候爸妈常说:不写完作业就不准看电视。所以我们在写作业这种状态下,写的好的行为就是继续写作业,知道写完他,我们还可以得到奖励。不好的行为就是没写完就跑去看电视了,被爸妈发现,

    2024年01月16日
    浏览(70)
  • 强化学习——Q-Learning算法原理

    一、Q-Learning :异策略时序差分控制 从决策方式来看,强化学习可以分为 基于策略 的方法( policy-based )和 基于价值 的方法( value-based )。基于策略的方法直接对策略进行优化,使制定的的策略能够获得最大的奖励。基于价值的强化学习方法中,智能体不需要制定显式的策略,

    2024年01月23日
    浏览(56)
  • 【强化学习】——Q-learning算法为例入门Pytorch强化学习

    🤵‍♂️ 个人主页:@Lingxw_w的个人主页 ✍🏻作者简介:计算机研究生在读,研究方向复杂网络和数据挖掘,阿里云专家博主,华为云云享专家,CSDN专家博主、人工智能领域优质创作者,安徽省优秀毕业生 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话

    2024年02月10日
    浏览(72)
  • 【强化学习】常用算法之一 “Q-learning”

      作者主页: 爱笑的男孩。的博客_CSDN博客-深度学习,活动,python领域博主 爱笑的男孩。擅长深度学习,活动,python,等方面的知识,爱笑的男孩。关注算法,python,计算机视觉,图像处理,深度学习,pytorch,神经网络,opencv领域. https://blog.csdn.net/Code_and516?type=blog 个人简介:打工人。 持续分

    2024年02月11日
    浏览(58)
  • 强化学习基础篇[2]:SARSA、Q-learning算法简介、应用举例、优缺点分析

    【强化学习原理+项目专栏】必看系列:单智能体、多智能体算法原理+项目实战、相关技巧(调参、画图等、趣味项目实现、学术应用项目实现 专栏详细介绍 :【强化学习原理+项目专栏】必看系列:单智能体、多智能体算法原理+项目实战、相关技巧(调参、画图等、趣味项

    2024年02月07日
    浏览(46)
  • 强化学习应用(二):基于Q-learning的物流配送路径规划研究(提供Python代码)

    Q-learning是一种强化学习算法,用于解决基于马尔可夫决策过程(MDP)的问题。它通过学习一个值函数来指导智能体在环境中做出决策,以最大化累积奖励。 Q-learning算法的核心思想是使用一个Q值函数来估计每个状态动作对的价值。Q值表示在特定状态下采取某个动作所能获得

    2024年01月21日
    浏览(66)
  • 强化学习应用(一):基于Q-learning的无人机物流路径规划研究(提供Python代码)

    Q-learning是一种强化学习算法,用于解决基于马尔可夫决策过程(MDP)的问题。它通过学习一个价值函数来指导智能体在环境中做出决策,以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的表格来学习最优策略。Q值表示在给定状态下采取某个动作所能

    2024年02月02日
    浏览(50)
  • 强化学习应用(八):基于Q-learning的无人机物流路径规划研究(提供Python代码)

    Q-learning是一种强化学习算法,用于解决基于马尔可夫决策过程(MDP)的问题。它通过学习一个价值函数来指导智能体在环境中做出决策,以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的表格来学习最优策略。Q值表示在给定状态下采取某个动作所能

    2024年01月17日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包