【机器学习】强化学习(四)-时序差分学习

这篇具有很好参考价值的文章主要介绍了【机器学习】强化学习(四)-时序差分学习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

蒙特卡洛算法需要使用完整的片段进行计算,这在有些问题中是不现实的,尤其是对于没有终止状态的问题。时序差分算法对此进行了改进

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

蒙特卡洛控制和时序差分学习有什么区别?

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

四、时序差分算法(Temporal Difference Learning, TD 学习)

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

4.1 时序差分(0)

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

4.2 Sarsa算法

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

4.3 Q学习(Q-learning)

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

4.4 Sarsa和Q-learning有什么区别?

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

4.5 示例代码

公共类:discrete.py  plotting.py

离散环境的类 discrete.py它继承自 gym 库的 Env 类,用于创建和管理强化学习的环境。它的主要功能是:

  • 定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,动作的空间,状态的空间等。

  • 定义了环境的基本方法,如设置随机数种子,重置环境,执行一个动作,返回下一个状态,奖励,是否结束和附加信息等。

  • 使用了 numpy 库,gym 库和 categorical_sample 函数来进行数值计算,环境管理和概率采样等操作。

# 导入 numpy 库,用于进行数值计算
import numpy as np


# 导入 gym 库,用于创建和管理强化学习的环境
from gym import Env, spaces
# 导入 gym 库的 seeding 模块,用于设置随机数种子
from gym.utils import seeding
# 导入 gym 库的 toy_text 模块的 categorical_sample 函数,用于从一个概率分布中采样一个类别
from gym.envs.toy_text.utils import categorical_sample


# 定义一个离散环境的类,继承自 gym 库的 Env 类
class DiscreteEnv(Env):


    """
    Has the following members
    - nS: number of states # 状态的数量
    - nA: number of actions # 动作的数量
    - P: transitions (*) # 状态转移的概率
    - isd: initial state distribution (**) # 初始状态的分布


    (*) dictionary of lists, where
      P[s][a] == [(probability, nextstate, reward, done), ...] # P[s][a] 是一个列表,表示在状态 s 下采取动作 a 后,可能的下一个状态,奖励和是否结束的概率
    (**) list or array of length nS # isd 是一个长度为 nS 的列表或数组,表示每个状态作为初始状态的概率
    """


    # 定义初始化方法,接受四个参数:状态的数量,动作的数量,状态转移的概率,初始状态的分布
    def __init__(self, nS, nA, P, isd):
        self.P = P # 将状态转移的概率赋值给 self.P
        self.isd = isd # 将初始状态的分布赋值给 self.isd
        self.lastaction = None  # for rendering # 定义一个属性,用于记录上一次的动作,用于渲染
        self.nS = nS # 将状态的数量赋值给 self.nS
        self.nA = nA # 将动作的数量赋值给 self.nA


        # 定义一个属性,表示动作的空间,是一个离散的空间,取值范围是 [0, nA-1]
        self.action_space = spaces.Discrete(self.nA)
        # 定义一个属性,表示状态的空间,是一个离散的空间,取值范围是 [0, nS-1]
        self.observation_space = spaces.Discrete(self.nS)


        self.seed() # 调用 seed 方法,设置随机数种子
        # 从初始状态的分布中采样一个状态,赋值给 self.s
        self.s = categorical_sample(self.isd, self.np_random)


    # 定义一个方法,用于设置随机数种子,接受一个参数:种子
    def seed(self, seed=None):
        # 调用 seeding 模块的 np_random 函数,根据种子生成一个随机数生成器,赋值给 self.np_random,并返回种子
        self.np_random, seed = seeding.np_random(seed)
        return [seed]


    # 定义一个方法,用于重置环境,返回初始状态
    def reset(self):
        # 从初始状态的分布中采样一个状态,赋值给 self.s
        self.s = categorical_sample(self.isd, self.np_random)
        self.lastaction = None # 将上一次的动作设为 None
        return int(self.s) # 返回初始状态,转换为整数类型


    # 定义一个方法,用于执行一个动作,返回下一个状态,奖励,是否结束和附加信息
    def step(self, a):
        # 根据当前状态和动作,从状态转移的概率中获取可能的转移列表,赋值给 transitions
        transitions = self.P[self.s][a]
        # 从转移列表中,根据转移的概率,采样一个转移的索引,赋值给 i
        i = categorical_sample([t[0] for t in transitions], self.np_random)
        # 根据转移的索引,获取转移的概率,下一个状态,奖励和是否结束,赋值给 p, s, r, d
        p, s, r, d = transitions[i]
        self.s = s # 将下一个状态赋值给 self.s
        self.lastaction = a # 将当前动作赋值给 self.lastaction
        # 返回下一个状态,奖励,是否结束和附加信息,其中附加信息是一个字典,包含转移的概率,下一个状态转换为整数类型
        return (int(s), r, d, {"prob": p})

用于绘制一些问题中的价值函数的图形的函数 plotting.py 。价值函数表示在不同的状态下,采取最优策略能够获得的期望回报。这些代码使用了matplotlib库,numpy库,pandas库和namedtuple来进行数据处理和图形绘制。代码中定义了三个函数,分别是:

  • plot_cost_to_go_mountain_car:这个函数用于绘制山地车问题的价值函数,山地车问题是一个连续状态空间的强化学习问题,目标是让一辆车在两座山之间来回移动,最终到达右边的山顶。这个函数接受一个环境对象,一个估计器对象和一个网格数作为参数,然后生成一个三维的曲面图,显示在不同的位置和速度下,采取最优动作的成本(负的价值)。

  • plot_value_function:这个函数用于绘制二十一点游戏的价值函数,二十一点游戏是一个离散状态空间的强化学习问题,目标是让玩家的牌的总和尽可能接近21,但不超过21,同时要比庄家的牌的总和大。这个函数接受一个价值函数字典和一个标题作为参数,然后分别绘制两个三维的曲面图,显示在不同的玩家总和和庄家显示牌下,有可用的Ace和没有可用的Ace的情况下的价值。

  • plot_episode_stats:这个函数用于绘制每个回合的统计信息,包括回合的长度,回合的奖励,回合的时间步数和回合的编号。这个函数接受一个命名元组,一个平滑窗口和一个是否显示图形的标志作为参数,然后分别绘制三个二维的折线图,显示回合的长度,回合的奖励和回合的时间步数随回合的编号的变化。这个函数返回三个图形对象。

# 导入matplotlib库,用于绘制图形
import matplotlib
# 导入numpy库,用于进行数值计算
import numpy as np
# 导入pandas库,用于进行数据分析
import pandas as pd
# 导入namedtuple,用于创建命名元组
from collections import namedtuple
# 导入pyplot模块,用于绘制二维图形
from matplotlib import pyplot as plt
# 导入Axes3D模块,用于绘制三维图形
from mpl_toolkits.mplot3d import Axes3D


# 创建一个命名元组,用于存储每个回合的长度和奖励
EpisodeStats = namedtuple("Stats",["episode_lengths", "episode_rewards"])


# 定义一个函数,用于绘制山地车问题的价值函数
def plot_cost_to_go_mountain_car(env, estimator, num_tiles=20):
    # 生成一个等差数列,表示状态空间中的位置范围
    x = np.linspace(env.observation_space.low[0], env.observation_space.high[0], num=num_tiles)
    # 生成一个等差数列,表示状态空间中的速度范围
    y = np.linspace(env.observation_space.low[1], env.observation_space.high[1], num=num_tiles)
    # 生成一个网格,表示状态空间中的所有可能组合
    X, Y = np.meshgrid(x, y)
    # 对每个状态,计算估计器预测的最大动作价值,并取负数,表示成本
    Z = np.apply_along_axis(lambda _: -np.max(estimator.predict(_)), 2, np.dstack([X, Y]))


    # 创建一个图形对象,设置大小为10*5
    fig = plt.figure(figsize=(10, 5))
    # 在图形对象上添加一个子图,设置为三维投影
    ax = fig.add_subplot(111, projection='3d')
    # 在子图上绘制一个曲面,表示价值函数
    surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1,
                           cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
    # 设置子图的x轴标签为位置
    ax.set_xlabel('Position')
    # 设置子图的y轴标签为速度
    ax.set_ylabel('Velocity')
    # 设置子图的z轴标签为价值
    ax.set_zlabel('Value')
    # 设置子图的标题为山地车问题的成本函数
    ax.set_title("Mountain \"Cost To Go\" Function")
    # 在图形对象上添加一个颜色条,表示价值的范围
    fig.colorbar(surf)
    # 显示图形
    plt.show()




# 定义一个函数,用于绘制价值函数的曲面图
def plot_value_function(V, title="Value Function"):
    """
    Plots the value function as a surface plot.
    """
    # 找到价值函数中的最小和最大的玩家总和
    min_x = min(k[0] for k in V.keys())
    max_x = max(k[0] for k in V.keys())
    # 找到价值函数中的最小和最大的庄家显示牌
    min_y = min(k[1] for k in V.keys())
    max_y = max(k[1] for k in V.keys())


    # 生成一个等差数列,表示玩家总和的范围
    x_range = np.arange(min_x, max_x + 1)
    # 生成一个等差数列,表示庄家显示牌的范围
    y_range = np.arange(min_y, max_y + 1)
    # 生成一个网格,表示所有可能的状态组合
    X, Y = np.meshgrid(x_range, y_range)


    # 对每个状态,根据是否有可用的Ace,计算价值函数的值
    Z_noace = np.apply_along_axis(lambda _: V[(_[0], _[1], False)], 2, np.dstack([X, Y]))
    Z_ace = np.apply_along_axis(lambda _: V[(_[0], _[1], True)], 2, np.dstack([X, Y]))


    # 定义一个内部函数,用于绘制一个曲面图
    def plot_surface(X, Y, Z, title):
        # 创建一个图形对象,设置大小为20*10
        fig = plt.figure(figsize=(20, 10))
        # 在图形对象上添加一个子图,设置为三维投影
        ax = fig.add_subplot(111, projection='3d')
        # 在子图上绘制一个曲面,表示价值函数
        surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1,
                               cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
        # 设置子图的x轴标签为玩家总和
        ax.set_xlabel('Player Sum')
        # 设置子图的y轴标签为庄家显示牌
        ax.set_ylabel('Dealer Showing')
        # 设置子图的z轴标签为价值
        ax.set_zlabel('Value')
        # 设置子图的标题
        ax.set_title(title)
        # 设置子图的视角
        ax.view_init(ax.elev, -120)
        # 在图形对象上添加一个颜色条,表示价值的范围
        fig.colorbar(surf)
        # 显示图形
        plt.show()


    # 调用内部函数,分别绘制没有可用Ace和有可用Ace的情况下的价值函数
    plot_surface(X, Y, Z_noace, "{} (No Usable Ace)".format(title))
    plot_surface(X, Y, Z_ace, "{} (Usable Ace)".format(title))


# 定义一个函数,用于绘制每个回合的统计信息
def plot_episode_stats(stats, smoothing_window=10, noshow=False):
    # 绘制每个回合的长度随时间的变化
    fig1 = plt.figure(figsize=(10,5))
    plt.plot(stats.episode_lengths)
    plt.xlabel("Episode")
    plt.ylabel("Episode Length")
    plt.title("Episode Length over Time")
    # 如果noshow为真,不显示图形,否则显示图形
    if noshow:
        plt.close(fig1)
    else:
        plt.show()#fig1


    # 绘制每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理
    fig2 = plt.figure(figsize=(10,5))
    rewards_smoothed = pd.Series(stats.episode_rewards).rolling(smoothing_window, min_periods=smoothing_window).mean()
    plt.plot(rewards_smoothed)
    plt.xlabel("Episode")
    plt.ylabel("Episode Reward (Smoothed)")
    plt.title("Episode Reward over Time (Smoothed over window size {})".format(smoothing_window))
    # 如果noshow为真,不显示图形,否则显示图形
    if noshow:
        plt.close(fig2)
    else:
        plt.show()#fig2


    # 绘制每个回合的时间步数和回合数的关系
    fig3 = plt.figure(figsize=(10,5))
    plt.plot(np.cumsum(stats.episode_lengths), np.arange(len(stats.episode_lengths)))
    plt.xlabel("Time Steps")
    plt.ylabel("Episode")
    plt.title("Episode per time step")
    # 如果noshow为真,不显示图形,否则显示图形
    if noshow:
        plt.close(fig3)
    else:
        plt.show()#fig3


    # 返回三个图形对象
    return fig1, fig2, fig3

SARSA算法求解有风格子世界问题

有风格子世界环境的类 windy_gridworld,它继承自 discrete.DiscreteEnv 类,用于创建和管理一个强化学习的环境。它的主要功能是:

  • 定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,风的强度,动作的空间,状态的空间等。

  • 定义了环境的基本方法,如限制坐标的范围,计算转移的概率,重置环境,渲染环境等。

  • 使用了 io 库,gym 库,numpy 库,sys 库和 discrete 模块来进行输入输出,环境管理,数值计算,系统操作和离散环境的管理等操作。

# 导入io模块,这是一个内置的模块,提供了与输入输出流相关的功能
import io
# 导入gym库,这是一个用于强化学习的开源库,提供了多种环境和接口
import gym
# 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能
import numpy as np
# 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
import sys


# 从当前目录下的discrete模块中导入DiscreteEnv类,这是一个用于实现离散动作空间的环境的基类
from . import discrete


# 定义四个常量,表示四个动作的编号
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3


# 定义一个类,继承自DiscreteEnv类,用于实现有风格子世界问题的强化学习环境
class WindyGridworldEnv(discrete.DiscreteEnv):


    # 定义一个元数据字典,表示该环境支持的渲染模式
    metadata = {'render.modes': ['human', 'ansi']}


    # 定义一个私有方法,用于限制坐标的范围,使其不超过网格的边界
    def _limit_coordinates(self, coord):
        # 将坐标的第一个分量限制在0到网格的行数减一之间
        coord[0] = min(coord[0], self.shape[0] - 1)
        coord[0] = max(coord[0], 0)
        # 将坐标的第二个分量限制在0到网格的列数减一之间
        coord[1] = min(coord[1], self.shape[1] - 1)
        coord[1] = max(coord[1], 0)
        # 返回限制后的坐标
        return coord


    # 定义一个私有方法,用于计算状态转移的概率,根据当前位置,动作的变化量,和风的强度
    def _calculate_transition_prob(self, current, delta, winds):
        # 计算新的位置,等于当前位置加上动作的变化量,再加上风的影响
        new_position = np.array(current) + np.array(delta) + np.array([-1, 0]) * winds[tuple(current)]
        # 限制新的位置的范围,转换为整数类型
        new_position = self._limit_coordinates(new_position).astype(int)
        # 计算新的状态,将新的位置转换为一维的索引
        new_state = np.ravel_multi_index(tuple(new_position), self.shape)
        # 判断是否达到目标位置,即(3, 7)
        is_done = tuple(new_position) == (3, 7)
        # 返回一个列表,包含一个元组,表示状态转移的概率,新的状态,即时奖励,和是否结束的标志
        return [(1.0, new_state, -1.0, is_done)]


    # 定义一个构造方法,用于初始化环境的属性
    def __init__(self):
        # 定义网格的形状,为7行10列
        self.shape = (7, 10)


        # 计算状态空间的大小,为网格的元素个数
        nS = np.prod(self.shape)
        # 定义动作空间的大小,为4个动作
        nA = 4


        # 定义风的强度,为一个与网格形状相同的数组,某些列有不同的风力
        winds = np.zeros(self.shape)
        winds[:,[3,4,5,8]] = 1
        winds[:,[6,7]] = 2


        # 计算状态转移的概率,用一个字典表示,键为状态,值为另一个字典,键为动作,值为一个列表,包含状态转移的元组
        P = {}
        # 对每个状态进行循环
        for s in range(nS):
            # 将状态转换为二维的位置
            position = np.unravel_index(s, self.shape)
            # 初始化状态对应的字典,键为动作,值为一个空列表
            P[s] = { a : [] for a in range(nA) }
            # 对每个动作进行循环,分别计算状态转移的概率,调用之前定义的私有方法
            P[s][UP] = self._calculate_transition_prob(position, [-1, 0], winds)
            P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1], winds)
            P[s][DOWN] = self._calculate_transition_prob(position, [1, 0], winds)
            P[s][LEFT] = self._calculate_transition_prob(position, [0, -1], winds)


        # 定义初始状态分布,为一个与状态空间大小相同的数组,只有(3, 0)位置的概率为1,其他为0
        isd = np.zeros(nS)
        isd[np.ravel_multi_index((3,0), self.shape)] = 1.0


        # 调用父类的构造方法,传入状态空间大小,动作空间大小,状态转移概率,和初始状态分布
        super(WindyGridworldEnv, self).__init__(nS, nA, P, isd)


    # 定义一个方法,用于渲染环境,根据模式和关闭标志,调用另一个私有方法
    def render(self, mode='human', close=False):
        self._render(mode, close)


    # 定义一个私有方法,用于渲染环境,根据模式和关闭标志,输出或显示网格世界的图形界面
    def _render(self, mode='human', close=False):
        # 如果关闭标志为True,表示不需要渲染,直接返回
        if close:
            return


        # 根据模式,选择输出的文件对象,如果是ansi模式,使用io模块中的StringIO对象,如果是human模式,使用系统的标准输出
        outfile = io.StringIO() if mode == 'ansi' else sys.stdout


        # 对每个状态进行循环
        for s in range(self.nS):
            # 将状态转换为二维的位置
            position = np.unravel_index(s, self.shape)
            # print(self.s)
            # 根据位置,选择输出的符号,如果是当前状态,输出 x,如果是目标位置,输出 T,否则输出 o
            if self.s == s:
                output = " x "
            elif position == (3,7):
                output = " T "
            else:
                output = " o "


            # 如果位置在第一列,去掉输出符号的左边空格
            if position[1] == 0:
                output = output.lstrip()
            # 如果位置在最后一列,去掉输出符号的右边空格,并换行
            if position[1] == self.shape[1] - 1:
                output = output.rstrip()
                output += "\n"


            # 将输出符号写入文件对象
            outfile.write(output)
        # 在所有状态循环结束后,再换行
        outfile.write("\n")

测试程序 Cliff Environment Playground.py,用于在有风格子世界环境中进行一些动作,并打印出环境的状态和渲染结果。它的主要功能是:

  • 导入 gym 库,numpy 库,sys 库和 WindyGridworldEnv 类,用于创建和管理环境,进行数值计算,系统操作和有风格子世界的管理等操作。

  • 如果当前路径中没有 “../”,则将其添加到路径中,方便导入其他模块。

  • 创建一个有风格子世界的环境,赋值给 env。

  • 调用 env 的 reset 方法,重置环境,返回初始状态,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 调用 env 的 step 方法,执行一个向右的动作,返回下一个状态,奖励,是否结束和附加信息,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 重复上述两步,执行五次向右的动作和一次向下的动作,打印和渲染每一步的结果

import gym
import numpy as np
import sys


if "../" not in sys.path:
  sys.path.append("../") 


from lib.envs.windy_gridworld import WindyGridworldEnv


# %%
env = WindyGridworldEnv()


print(env.reset())
env.render()


print(env.step(1))
env.render()


print(env.step(1))
env.render()


print(env.step(1))
env.render()


print(env.step(2))
env.render()


print(env.step(1))
env.render()


print(env.step(1))
env.render()

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

实现SARSA算法 SARSA Solution.py,SARSA算法是一种基于时序差分学习的强化学习算法,可以找到最优的epsilon-贪婪策略。

代码使用了gym库,itertools库,matplotlib库,numpy库,pandas库和sys库来进行环境模拟,数据处理和图形绘制。代码中定义了两个函数,分别是:

make_epsilon_greedy_policy:这个函数用于根据给定的Q函数和epsilon参数,创建一个epsilon-贪婪策略。这个函数接受一个状态到动作价值的字典,一个随机选择动作的概率,和一个环境中的动作数作为参数。这个函数返回一个函数,这个函数接受一个观察作为参数,返回一个长度为动作数的numpy数组,表示每个动作的概率。

sarsa:这个函数用于实现SARSA算法,找到最优的epsilon-贪婪策略。这个函数接受一个OpenAI环境,一个回合数,一个折扣因子,一个学习率,和一个epsilon参数作为参数。这个函数返回一个元组(Q, stats)。Q是最优的动作价值函数,一个状态到动作价值的字典。stats是一个EpisodeStats对象,包含两个numpy数组,分别表示每个回合的长度和奖励。

代码的主要流程是:

        1. 创建一个默认的动作价值函数,一个统计信息对象,和一个epsilon-贪婪策略。

        2. 对于每个回合,重置环境,选择第一个动作,然后循环执行以下步骤:

                a. 执行一个动作,观察下一个状态,奖励,和是否结束。

                b. 选择下一个动作,根据当前的策略。

                c. 更新统计信息。

                d. 使用时序差分更新公式,更新动作价值函数。

                e. 如果结束,跳出循环。

                f. 更新当前的动作和状态。

        3. 返回动作价值函数和统计信息对象。

        4. 使用plotting模块,绘制统计信息的图形。

# 导入gym库,这是一个用于强化学习的开源库,提供了多种环境和接口[^1^][1]
import gym
# 导入itertools库,这是一个用于创建迭代器的标准库,提供了多种迭代工具[^2^][2]
import itertools
# 导入matplotlib库,这是一个用于绘图的开源库,提供了多种图形和图表[^3^][3]
import matplotlib
# 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能[^4^][4]
import numpy as np
# 导入pandas库,这是一个用于数据分析和处理的开源库,提供了DataFrame等数据结构[^5^][5]
import pandas as pd
# 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
import sys


# 检查当前的系统路径中是否包含上一级目录,如果不包含,则将其添加到系统路径中
# 这样做的目的是为了能够导入上一级目录中的lib文件夹中的模块
if "../" not in sys.path:
  sys.path.append("../") 


# 从lib文件夹中的envs子文件夹中导入WindyGridworldEnv类,这是一个用于实现有风网格世界问题的强化学习环境
from collections import defaultdict
from lib.envs.windy_gridworld import WindyGridworldEnv
# 从lib文件夹中导入plotting模块,这是一个用于绘制统计数据的模块
from lib import plotting


# 设置matplotlib的样式为ggplot,这是一种美观的绘图风格
matplotlib.style.use('ggplot')


# %%
# 创建一个WindyGridworldEnv的实例对象,命名为env,这是一个7x10的网格世界,有一些单元格有风向和风力,智能体需要从起点走到终点,受到风的影响
env = WindyGridworldEnv()


# %%
# 定义一个函数,用于根据给定的Q函数和epsilon值,创建一个epsilon贪婪策略
def make_epsilon_greedy_policy(Q, epsilon, nA):
    """
    根据给定的Q函数和epsilon值,创建一个epsilon贪婪策略


    参数:
        Q: 一个字典,映射从状态到动作值
            每个值是一个长度为nA的numpy数组(见下文)
        epsilon: 选择一个随机动作的概率,介于0和1之间的浮点数
        nA: 环境中的动作数量


    返回:
        一个函数,接受一个观察值作为参数,返回
        每个动作的概率,以长度为nA的numpy数组的形式


    """
    # 定义一个内部函数,用于根据观察值,返回每个动作的概率
    def policy_fn(observation):
        # 创建一个长度为nA的numpy数组,每个元素的值为epsilon/nA,表示选择一个随机动作的概率
        A = np.ones(nA, dtype=float) * epsilon / nA
        # 根据Q函数,找到当前状态下最优的动作
        best_action = np.argmax(Q[observation])
        # 将最优动作的概率增加1-epsilon,表示选择最优动作的概率
        A[best_action] += (1.0 - epsilon)
        # 返回动作概率数组
        return A
    # 返回内部函数
    return policy_fn


# %%
# 定义一个函数,用于实现SARSA算法,即基于策略的时序差分控制,寻找最优的epsilon贪婪策略
def sarsa(env, num_episodes, discount_factor=1.0, alpha=0.5, epsilon=0.1):
    """
    SARSA算法: 基于策略的时序差分控制,寻找最优的epsilon贪婪策略


    参数:
        env: OpenAI环境
        num_episodes: 运行的回合数
        discount_factor: Gamma折扣因子
        alpha: 时序差分学习率
        epsilon: 选择一个随机动作的概率,介于0和1之间的浮点数


    返回:
        一个元组 (Q, stats)
        Q是最优的动作值函数,一个字典,映射从状态到动作值
        stats是一个EpisodeStats对象,包含两个numpy数组,分别记录每个回合的长度和奖励
    """


    # 最终的动作值函数
    # 一个嵌套的字典,映射从状态到(动作到动作值)
    # 使用defaultdict,当访问不存在的键时,返回一个长度为nA的零数组
    Q = defaultdict(lambda: np.zeros(env.action_space.n))


    # 跟踪有用的统计数据
    # 使用plotting模块中的EpisodeStats类,创建一个对象,包含两个长度为  num_episodes 的零数组,分别记录每个回合的长度和奖励
    stats = plotting.EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))


    # 我们正在遵循的策略
    # 使用前面定义的函数,根据Q函数和epsilon值,创建一个epsilon贪婪策略
    policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)


    # 对于每个回合
    for i_episode in range(num_episodes):
        # 打印出当前的回合数,方便调试
        if (i_episode + 1) % 100 == 0:
            print("\rEpisode {}/{}.".format(i_episode + 1, num_episodes), end="")
            sys.stdout.flush()


        # 重置环境,选择第一个动作
        state = env.reset()
        # 根据策略,得到当前状态下每个动作的概率
        action_probs = policy(state)
        # 根据动作概率,随机选择一个动作
        action = np.random.choice(np.arange(len(action_probs)), p=action_probs)


        # 在环境中进行一步
        # 使用itertools库中的count函数,创建一个无限的计数器,表示每个回合的时间步数
        for t in itertools.count():
            # 执行一个动作,观察下一个状态,奖励,是否结束,和其他信息
            next_state, reward, done, _ = env.step(action)


            # 根据当前的策略,选择下一个动作,这是一个概率性的选择,根据每个动作的概率分布
            next_action_probs = policy(next_state)
            next_action = np.random.choice(np.arange(len(next_action_probs)), p=next_action_probs)


            # 更新统计信息,累加每个回合的奖励,记录每个回合的长度
            stats.episode_rewards[i_episode] += reward
            stats.episode_lengths[i_episode] = t


            # 使用时序差分更新公式,更新动作价值函数
            # 计算目标值,即当前的奖励加上折扣后的下一个状态和动作的价值
            td_target = reward + discount_factor * Q[next_state][next_action]
            # 计算误差,即目标值减去当前的状态和动作的价值
            td_delta = td_target - Q[state][action]
            # 用学习率乘以误差,更新当前的状态和动作的价值
            Q[state][action] += alpha * td_delta


            # 如果回合结束,跳出循环
            if done:
                break


            # 更新当前的动作和状态,为下一个时间步做准备
            action = next_action
            state = next_state




    # 返回动作价值函数和统计数据
    return Q, stats


# %%
# 调用sarsa函数,传入环境和回合数等参数,得到动作价值函数和统计数据
Q, stats = sarsa(env, 200)


# %%
# 调用plotting模块中的plot_episode_stats函数,传入统计数据,绘制回合长度和回合奖励的图形
plotting.plot_episode_stats(stats)
# print("\nQ:\r{}".format(Q)) #输出最终价值函数
# defaultdict(<function sarsa.<locals>.<lambda> at 0x000001CA7D86FF70>, {30: array([-16.62121833, -16.68188683, -16.73355158, -17.17868707]), 20: array([-15.85469654, -15.82517745, -16.65515113, -15.62910245]), 10: array([-15.38586765, -15.58275272, -15.2877975 , -15.24034802]), 0: array([-15.09297672, -15.06110493, -15.13709802, -14.7251808 ]), 1: array([-14.56234244, -14.54038227, -14.91194528, -14.92618857]), 2: array([-14.17266391, -13.80525148, -13.93608606, -13.78078067]), 3: array([-14.28053776, -13.74421705, -13.56175795, -14.14192259]), 4: array([-13.27142219, -12.79400183, -13.54938834, -13.71948143]), 5: array([-12.48366893, -11.3930307 , -12.77222323, -12.9336916 ]), 6: array([-11.91175853,  -9.91457551, -11.78255518, -11.69363854]), 7: array([-10.77910057,  -9.81671771, -10.98045998, -10.65801588]), 8: array([-10.74969529,  -8.0102353 ,  -9.77162591, -11.43791039]), 9: array([ -8.33103372,  -8.39513518,  -6.32893384, -10.33290655]), 19: array([-8.84390206, -8.1119523 , -6.72538289, -9.03481089]), 12: array([-14.20294124, -13.97901573, -13.96065147, -15.22720425]), 11: array([-15.13194653, -14.98909044, -15.48678909, -15.04447429]), 13: array([-13.37943249, -13.53181049, -13.7098475 , -13.81976352]), 29: array([-8.55710389, -6.87412944, -4.85772855, -7.17036152]), 18: array([-8.94868871, -8.0233618 , -9.06798147, -9.26537925]), 22: array([-14.52083337, -14.19290266, -14.38417474, -14.27854828]), 21: array([-15.4374521 , -15.00991466, -15.5801335 , -15.60899897]), 23: array([-13.95910955, -13.42154284, -13.83539163, -14.19399668]), 39: array([-6.4785114 , -5.94494094, -3.99613988, -6.79803347]), 28: array([-7.66699712, -6.61983091, -7.60264565, -8.3233084 ]), 32: array([-14.70999481, -14.81900147, -14.63750975, -16.3293473 ]), 31: array([-15.94781378, -16.20896192, -16.07386806, -15.91479389]), 33: array([-13.86220924, -13.99533353, -14.58005824, -14.60042878]), 40: array([-16.37138175, -15.93543669, -16.00626584, -16.10155303]), 41: array([-15.11021159, -15.0313713 , -15.12066262, -16.1427075 ]), 42: array([-14.39118224, -14.04513618, -14.11888151, -15.06676474]), 43: array([-13.47905898, -13.3280808 , -13.36766233, -14.36908711]), 14: array([-13.24084653, -12.82979917, -12.7967486 , -12.89074042]), 51: array([-15.16797579, -14.42686921, -14.48158134, -14.3608748 ]), 50: array([-16.05803884, -14.96313727, -14.8700738 , -15.28846151]), 52: array([-13.58167077, -13.26323861, -13.58650088, -14.25622422]), 53: array([-12.36202997, -12.46939028, -12.89155887, -13.90026794]), 24: array([-12.75612708, -12.31330449, -12.97263319, -13.78728591]), 61: array([-14.51254789, -13.84352851, -14.20420505, -14.03010117]), 60: array([-14.67217839, -14.48765693, -14.49793836, -14.88186542]), 62: array([-13.10041286, -13.10038901, -13.11464442, -13.2568325 ]), 34: array([-12.82238768, -12.67668025, -13.06624469, -12.75352672]), 49: array([-5.44358852, -3.80098726, -3.26364328, -4.10764147]), 38: array([-7.86199643, -6.35308864, -6.60455766, -6.3621277 ]), 17: array([ -9.21518149,  -9.06675019,  -9.02526121, -10.003373  ]), 63: array([-12.62337832, -12.26700563, -12.85346847, -12.74301769]), 15: array([-11.70362604, -11.39586923, -12.27834132, -12.50392354]), 59: array([-2.81008911, -2.54296875, -2.81005859, -2.52370968]), 69: array([-1.875     , -2.52734375, -2.3359375 , -1.87109375]), 48: array([-6.67165589, -5.06948669, -1.75      , -1.        ]), 27: array([-7.71386745, -6.34017058, -7.76830481, -6.69818919]), 58: array([-3.79243281, -2.51201346, -2.29996305, -1.76249076]), 37: array([0., 0., 0., 0.]), 44: array([-11.66811483, -11.73730811, -12.26177971, -11.80549878]), 54: array([-12.15950782, -11.33421547, -12.16753238, -12.14326109]), 25: array([-12.10995948, -10.72434757, -11.13916866, -11.16080802]), 16: array([-10.27994511,  -9.72139598, -10.71965764, -11.12713914]), 35: array([-11.3114562 , -10.77935897, -11.14367845, -10.83009369]), 68: array([-1.46875   , -2.16746892, -1.5       , -0.9375    ]), 45: array([-11.21385201, -10.38149598, -11.15339582, -11.649213  ]), 47: array([-2.82864534, -4.36609306, -0.9375    , -5.39884604]), 36: array([ -9.49267328,  -9.54858587, -10.25678092, -10.16425445]), 26: array([-10.2265589 ,  -9.85410817,  -9.97563059, -10.89997015]), 57: array([-2.66771439, -2.42248535, -0.5       , -5.24217275])})

输出结果:

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

每个回合的长度随时间的变化

说明:共200回合,随着价值函数的更新,越往后的回合执行越少的动作就能抵达终点

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理

说明:随着价值函数的更新越往后的回合得到的回报越高

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

每个回合的时间步数累加和回合数的关系

说明:每个回合的时间步数表示在一个回合中,执行了多少次动作。回合数表示完成了多少个回合。一个回合的结束条件是到达目标状态或者超过最大的时间步数。这个图反映了学习的效果和效率,如果回合数随着时间步数的增加而快速增加,说明学习的效果好,能够更快地找到最优的策略和动作。如果回合数随着时间步数的增加而缓慢增加,说明学习的效率低,需要更多的时间和尝试才能找到最优的策略和动作。

Q-Learning 算法求解悬崖行走问题

悬崖行走环境的类 cliff_walking.py,它继承自 discrete.DiscreteEnv 类,用于创建和管理一个强化学习的环境。它的主要功能是:

  • 定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,悬崖的位置,动作的空间,状态的空间等。

  • 定义了环境的基本方法,如限制坐标的范围,计算转移的概率,重置环境,执行一个动作,返回下一个状态,奖励,是否结束和附加信息,渲染环境等。

  • 使用了 io 库,numpy 库,sys 库和 discrete 模块来进行输入输出,数值计算,系统操作和离散环境的管理等操作。

# 导入io模块,这是一个内置的模块,提供了与输入输出流相关的功能
import io
# 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能
import numpy as np
# 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
import sys


# 从当前目录下的discrete模块中导入DiscreteEnv类,这是一个用于实现离散动作空间的环境的基类
from . import discrete


# 定义四个常量,表示四个动作的编号
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3


# 定义一个类,继承自DiscreteEnv类,用于实现悬崖行走问题的强化学习环境
class CliffWalkingEnv(discrete.DiscreteEnv):
    # 定义一个元数据字典,表示该环境支持的渲染模式
    metadata = {'render.modes': ['human', 'ansi']}


    # 定义一个私有方法,用于限制坐标的范围,使其不超过网格的边界
    def _limit_coordinates(self, coord):
        # 将坐标的第一个分量限制在0到网格的行数减一之间
        coord[0] = min(coord[0], self.shape[0] - 1)
        coord[0] = max(coord[0], 0)
        # 将坐标的第二个分量限制在0到网格的列数减一之间
        coord[1] = min(coord[1], self.shape[1] - 1)
        coord[1] = max(coord[1], 0)
        # 返回限制后的坐标
        return coord


    # 定义一个私有方法,用于计算状态转移的概率,根据当前位置和动作的变化量
    def _calculate_transition_prob(self, current, delta):
        # 计算新的位置,等于当前位置加上动作的变化量
        new_position = np.array(current) + np.array(delta)
        # 限制新的位置的范围,转换为整数类型
        new_position = self._limit_coordinates(new_position).astype(int)
        # 计算新的状态,将新的位置转换为一维的索引
        new_state = np.ravel_multi_index(tuple(new_position), self.shape)#给定一个多维数组的形状和一个多维的坐标,返回一个整数,表示该坐标在多维数组中对应的一维索引。例如,如果 self.shape 是 (4, 12),表示环境是一个 4 行 12 列的网格,那么 new_position 是 (0, 0) 对应的 new_state 是 0,表示网格的左上角,new_position 是 (3, 11) 对应的 new_state 是 47,表示网格的右下角
        # 判断是否落入悬崖,如果是,奖励为-100,否则为-1
        reward = -100.0 if self._cliff[tuple(new_position)] else -1.0
        # 判断是否达到目标位置或落入悬崖,如果是,回合结束
        is_done = self._cliff[tuple(new_position)] or (tuple(new_position) == (3,11))
        # 返回一个列表,包含一个元组,表示状态转移的概率,新的状态,即时奖励,和是否结束的标志
        return [(1.0, new_state, reward, is_done)]


    # 定义一个构造方法,用于初始化环境的属性
    def __init__(self):
        # 定义网格的形状,为4行12列
        self.shape = (4, 12)


        # 计算状态空间的大小,为网格的元素个数
        nS = np.prod(self.shape)
        # 定义动作空间的大小,为4个动作
        nA = 4


        # 定义悬崖的位置,为一个与网格形状相同的布尔数组,第四行的第二列到倒数第二列为True,表示悬崖
        self._cliff = np.zeros(self.shape, dtype=bool)
        self._cliff[3, 1:-1] = True #是悬崖


        # 计算状态转移的概率,用一个字典表示,键为状态,值为另一个字典,键为动作,值为一个列表,包含状态转移的元组
        P = {}
        # 对每个状态进行循环
        for s in range(nS):
            # 将状态转换为二维的位置
            position = np.unravel_index(s, self.shape) #给定一个多维数组的形状和一个一维的索引,返回一个元组,表示该索引在多维数组中对应的坐标。例如,如果 self.shape 是 (4, 12),表示环境是一个 4 行 12 列的网格,那么 s = 0 对应的 position 是 (0, 0),表示网格的左上角,s = 47 对应的 position 是 (3, 11),表示网格的右下角。这个函数可以方便地将状态的表示从一维转换为二维,便于进行坐标的运算和渲染。
            # 初始化状态对应的字典,键为动作,值为一个空列表
            P[s] = { a : [] for a in range(nA) }
            # 对每个动作进行循环,分别计算状态转移的概率,调用之前定义的私有方法
            P[s][UP] = self._calculate_transition_prob(position, [-1, 0]) #行-1
            P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])# 列+1
            P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])#行+1
            P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])#列-1


        # 定义初始状态分布,为一个与状态空间大小相同的数组,只有(3, 0)位置的概率为1,其他为0
        isd = np.zeros(nS)
        isd[np.ravel_multi_index((3,0), self.shape)] = 1.0


        # 调用父类的构造方法,传入状态空间大小,动作空间大小,状态转移概率,和初始状态分布
        super(CliffWalkingEnv, self).__init__(nS, nA, P, isd)


    # 定义一个方法,用于渲染环境,根据模式和关闭标志,调用另一个私有方法
    def render(self, mode='human', close=False):
        self._render(mode, close)


    # 定义一个私有方法,用于渲染环境,根据模式和关闭标志,输出或显示网格世界的图形界面
    def _render(self, mode='human', close=False):
        # 如果关闭标志为True,表示不需要渲染,直接返回
        if close:
            return


        # 根据模式,选择输出的文件对象,如果是ansi模式,使用io模块中的StringIO对象,如果是human模式,使用系统的标准输出
        outfile = io.StringIO() if mode == 'ansi' else sys.stdout


        # 对每个状态进行循环
        for s in range(self.nS):
            # 将状态转换为二维的位置
            position = np.unravel_index(s, self.shape)
            # print(self.s)
            # 根据位置,选择输出的符号,如果是当前状态,输出 x,如果是目标位置,输出 T,如果是悬崖位置,输出 C,否则输出 o
            if self.s == s:
                output = " x "
            elif position == (3,11):
                output = " T "
            elif self._cliff[position]:
                output = " C "
            else:
                output = " o "


            # 如果位置在第一列,去掉输出符号的左边空格
            if position[1] == 0:
                output = output.lstrip() 
            # 如果位置在最后一列,去掉输出符号的右边空格,并换行
            if position[1] == self.shape[1] - 1:
                output = output.rstrip() 
                output += "\n"


            # 将输出符号写入文件对象
            outfile.write(output)
        # 在所有状态循环结束后,再换行
        outfile.write("\n")

测试程序 Cliff Environment Playground.py ,用于在悬崖行走环境中进行一些动作,并打印出环境的状态和渲染结果。它的主要功能是:

  • 导入 gym 库,numpy 库,sys 库和 CliffWalkingEnv 类,用于创建和管理环境,进行数值计算,系统操作和悬崖行走的管理等操作。

  • 如果当前路径中没有 “../”,则将其添加到路径中,方便导入其他模块。

  • 创建一个悬崖行走的环境,赋值给 env。

  • 调用 env 的 reset 方法,重置环境,返回初始状态,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 调用 env 的 step 方法,执行一个向上的动作,返回下一个状态,奖励,是否结束和附加信息,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 重复上述两步,执行两次向右的动作和一次向下的动作,打印和渲染每一步的结果。

import gym
import numpy as np
import sys


if "../" not in sys.path:
  sys.path.append("../") 


from lib.envs.cliff_walking import CliffWalkingEnv


env = CliffWalkingEnv()


print(env.reset())
env.render()


print(env.step(0))
env.render()


print(env.step(1))
env.render()


print(env.step(1))
env.render()


print(env.step(2))
env.render()

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

Q-learning 算法 求解悬崖行走问题的代码Q-Learning Solution.py

  • 该代码使用了一个名为悬崖行走的OpenAI环境,该环境是一个4x12的网格世界,其中代理人从左下角的起点开始,目标是到达右下角的终点,而不掉入悬崖中。

  • 该代码定义了一个函数make_epsilon_greedy_policy,该函数根据给定的Q函数和epsilon值,创建一个epsilon-贪婪策略。该函数返回一个函数,该函数接受一个观察值作为参数,并返回每个动作的概率,形式为一个长度为nA的numpy数组。

  • 该代码定义了一个函数q_learning,该函数实现了Q-Learning算法,即离策略的TD控制算法。该算法在遵循一个epsilon-贪婪策略的同时,寻找最优的贪婪策略。该函数接受以下参数:

    • env: OpenAI环境。

    • num_episodes: 运行的回合数。

    • discount_factor: Gamma折扣因子。

    • alpha: TD学习率。

    • epsilon: 选择随机动作的概率,介于0和1之间的浮点数。

  • 该函数返回一个元组(Q, stats)。Q是最优的动作值函数,是一个映射状态到动作值的字典。stats是一个EpisodeStats对象,包含两个numpy数组,分别记录了每个回合的长度和奖励。

  • 该函数的主要步骤如下:

    • 重置环境并选择第一个动作。

    • 对于每个时间步:

    • 执行一个动作,并观察下一个状态,奖励,是否结束,以及其他信息。

    • 更新统计信息。

    • TD更新:根据下一个状态的最优动作,计算TD目标。计算TD误差。更新Q中当前状态和动作的值。

    • 如果结束,跳出循环。

    • 更新当前状态为下一个状态。

    • 初始化一个空的Q字典,用于存储每个状态的动作值。

    • 初始化一个stats对象,用于记录有用的统计信息。

    • 根据Q和epsilon,创建一个epsilon-贪婪策略。

    • 对于每个回合:

  • 该代码使用了q_learning函数来求解悬崖行走问题,设置了500个回合,其他参数使用默认值。

  • 该代码使用了plotting.plot_episode_stats函数来绘制每个回合的长度和奖励的图表,以及每个状态的动作值的热力图。

# 导入gym库,用于提供强化学习的环境
import gym
# 导入itertools库,用于提供一些迭代器的工具函数
import itertools
# 导入matplotlib库,用于提供图形绘制的功能
import matplotlib
# 导入numpy库,用于提供数组和数学运算的功能
import numpy as np
# 导入pandas库,用于提供数据分析和处理的功能
import pandas as pd
# 导入sys库,用于提供系统相关的功能
import sys




# 判断当前的路径中是否包含"../",如果不包含,就添加到路径中,用于导入lib模块
if "../" not in sys.path:
  sys.path.append("../") 


# 导入defaultdict类,用于创建一个默认字典,即一个可以使用任意不存在的键访问的字典,如果访问一个不存在的键,它会自动创建一个默认值
from collections import defaultdict
# 导入CliffWalkingEnv类,用于创建一个悬崖行走的环境,这是一个网格世界,目标是从起点走到终点,中间有一些悬崖,如果掉入悬崖,就会返回起点并受到惩罚
from lib.envs.cliff_walking import CliffWalkingEnv
# 导入plotting模块,用于提供一些图形绘制的函数,例如plot_episode_stats函数
from lib import plotting


# 设置图形的风格为ggplot,一种流行的图形风格
matplotlib.style.use('ggplot')


# %%
# 创建一个悬崖行走的环境对象,用于与智能体进行交互
env = CliffWalkingEnv()


# %%
# 定义一个函数,叫做make_epsilon_greedy_policy,用于根据给定的Q函数和epsilon参数,创建一个epsilon-贪婪策略
def make_epsilon_greedy_policy(Q, epsilon, nA):
    """
    Creates an epsilon-greedy policy based on a given Q-function and epsilon.
    
    Args:
        Q: A dictionary that maps from state -> action-values.
            Each value is a numpy array of length nA (see below)
        epsilon: The probability to select a random action. Float between 0 and 1.
        nA: Number of actions in the environment.
    
    Returns:
        A function that takes the observation as an argument and returns
        the probabilities for each action in the form of a numpy array of length nA.
    
    """
    # 定义一个函数,叫做policy_fn,用于根据一个观察,返回一个动作的概率分布
    def policy_fn(observation):
        # 创建一个全为epsilon/nA的数组,表示每个动作的初始概率,其中epsilon是随机选择动作的概率,nA是动作数
        A = np.ones(nA, dtype=float) * epsilon / nA
        # 找到Q函数中对应于当前状态的最大动作价值的动作,即最优动作
        best_action = np.argmax(Q[observation])
        # 给最优动作的概率增加1-epsilon,表示最优动作被选择的概率更高
        A[best_action] += (1.0 - epsilon)
        # 返回这个数组,表示当前状态下的策略
        return A
    # 返回这个函数,作为epsilon-贪婪策略
    return policy_fn


# %%
# 定义一个函数,叫做q_learning,用于实现Q学习算法,找到最优的贪婪策略,同时遵循一个epsilon-贪婪策略
def q_learning(env, num_episodes, discount_factor=1.0, alpha=0.5, epsilon=0.1):
    """
    Q-Learning algorithm: Off-policy TD control. Finds the optimal greedy policy
    while following an epsilon-greedy policy
    
    Args:
        env: OpenAI environment.
        num_episodes: Number of episodes to run for.
        discount_factor: Gamma discount factor.
        alpha: TD learning rate.
        epsilon: Chance to sample a random action. Float between 0 and 1.
    
    Returns:
        A tuple (Q, episode_lengths).
        Q is the optimal action-value function, a dictionary mapping state -> action values.
        stats is an EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
    """
    
    # 创建一个默认字典,用于存储最终的动作价值函数,即在不同的状态下,每个动作能够获得的期望回报
    # 这个字典的键是状态,它的值是一个长度为动作数的零数组,表示在该状态下,每个动作的价值都是零
    # 如果访问一个不存在的状态,它会自动创建一个对应的零数组作为值
    Q = defaultdict(lambda: np.zeros(env.action_space.n))


    # 创建一个EpisodeStats对象,用于记录每个回合的长度和奖励的numpy数组
    stats = plotting.EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))    
    
    # 创建一个epsilon-贪婪策略,用于在每个状态下,以一定的概率epsilon随机选择一个动作,否则选择当前最优的动作,即具有最大的动作价值的动作
    policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)
    
    # 对于每个回合,重置环境,选择第一个动作,然后循环执行以下步骤:
    for i_episode in range(num_episodes):
        # 打印出当前的回合数,用于调试
        if (i_episode + 1) % 100 == 0:
            print("\rEpisode {}/{}.".format(i_episode + 1, num_episodes), end="")
            sys.stdout.flush()
        
        # 重置环境,返回初始状态
        state = env.reset()
        
        # 在环境中执行一个步骤
        # total_reward = 0.0
        for t in itertools.count():
            
            # 选择一个动作,根据当前的策略,这是一个概率性的选择,根据每个动作的概率分布
            action_probs = policy(state)
            action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
            # 执行一个动作,观察下一个状态,奖励,是否结束,和其他信息
            next_state, reward, done, _ = env.step(action)


            # 更新统计信息,累加每个回合的奖励,记录每个回合的长度
            stats.episode_rewards[i_episode] += reward
            stats.episode_lengths[i_episode] = t
            
            # 使用时序差分更新公式,更新动作价值函数
            # 找到下一个状态中,具有最大动作价值的动作,即最优动作
            best_next_action = np.argmax(Q[next_state])    
            # 计算目标值,即当前的奖励加上折扣后的下一个状态和最优动作的价值
            td_target = reward + discount_factor * Q[next_state][best_next_action]
            # 计算误差,即目标值减去当前的状态和动作的价值
            td_delta = td_target - Q[state][action]
            # 用学习率乘以误差,更新当前的状态和动作的价值
            Q[state][action] += alpha * td_delta
                
            # 如果回合结束,跳出循环
            if done:
                break
                
            # 更新当前的状态,为下一个步骤做准备
            state = next_state
    
    # 返回最终的动作价值函数和统计信息
    return Q, stats


# 调用q_learning函数,传入环境对象,回合数,折扣因子,学习率,和epsilon参数,返回最终的动作价值函数和统计信息
Q, stats = q_learning(env, 500)


# %%
# 调用plotting模块中的plot_episode_stats函数,传入统计信息,绘制每个回合的长度,每个回合的奖励,和每个回合的时间步数与回合数的关系的图形
plotting.plot_episode_stats(stats)

输出结果:

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

每个回合的长度随时间的变化

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理

【机器学习】强化学习(四)-时序差分学习,机器学习,学习,人工智能

每个回合的时间步数和回合数的关系

最终价值函数

最终价值函数Q:在学习过程中,通过不断地更新和优化,最终收敛到一个稳定的价值函数,即最接近真实的价值函数的价值函数。训练最终的价值函数有以下的用途:

  • 训练最终的价值函数可以反映出最优的策略,即在每个状态下,选择哪个动作能够获得最大的价值。我们可以根据训练最终的价值函数,制定出最优的决策规则,从而在环境中表现出最佳的行为。

  • 训练最终的价值函数可以评估出不同的状态的重要性,即哪些状态能够带来更高的回报,哪些状态应该避免。我们可以根据训练最终的价值函数,分析出环境中的特征和规律,从而提高我们对环境的理解和掌握。

  • 训练最终的价值函数可以作为一种性能指标,即我们可以通过比较训练最终的价值函数和真实的价值函数,或者不同的训练方法和参数下的价值函数,来评估我们的学习效果和效率,从而优化我们的学习过程和方法。

The End文章来源地址https://www.toymoban.com/news/detail-811371.html

到了这里,关于【机器学习】强化学习(四)-时序差分学习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 人工智能课程笔记(7)强化学习(基本概念 Q学习 深度强化学习 附有大量例题)

    强化学习和深度学习都是机器学习的分支,但是两者在方法和应用场景上有所不同。 强化学习 : 强化学习概述 :强化学习是一种通过智能体与环境进行交互来学习最优行动策略的算法。在强化学习中,智能体与环境不断交互,观察环境的状态并采取不同的行动,从而获得奖

    2024年01月17日
    浏览(52)
  • 通用人工智能之路:什么是强化学习?如何结合深度学习?

    2015年, OpenAI 由马斯克、美国创业孵化器Y Combinator总裁阿尔特曼、全球在线支付平台PayPal联合创始人彼得·蒂尔等硅谷科技大亨创立,公司核心宗旨在于 实现安全的通用人工智能(AGI) ,使其有益于人类。 ChatGPT 是 OpenAI 推出的一个基于对话的原型 AI 聊天机器人,2022年12 月 1

    2024年02月16日
    浏览(56)
  • 通用人工智能技术(深度学习,大模型,Chatgpt,多模态,强化学习,具身智能)

    目录 前言 1.通用人工智能 1.1 生物学分析 1.2具身智能 1.2.1当前的人工智能的局限 1.2.2 具身智能实现的基础 1.2.3 强化学习(决策大模型) 2.结论 往期文章 参考文献       目前的人工智能实质上只是强人工智能,或者说单个领域的通用人工智能。比方说Chatgpt它属于自然语言

    2024年02月07日
    浏览(86)
  • 走进人工智能|强化学习 AI发展的未来引擎

    前言: 强化学习是一种通过智能体与环境交互,通过尝试最大化累计奖励来学习最优行为策略的机器学习方法。 本篇带你走进强化学习!一起来学习了解吧!!! 强化学习是一种机器学习方法,旨在通过试错来学习正确的行为。与其他机器学习方法不同,强化学习的主要目

    2024年02月10日
    浏览(83)
  • 强化学习系列--时序差分学习方法(SARSA算法)

    SARSA(State-Action-Reward-State-Action)是一种强化学习算法,用于解决马尔可夫决策过程(MDP)中的问题。 SARSA算法属于基于值的强化学习算法 ,用于学习最优策略。 在SARSA算法中,智能体通过与环境进行交互来学习。它基于 当前状态、选择的动作、获得的奖励、下一个状态和下

    2024年02月11日
    浏览(36)
  • 深度强化学习与人工智能:如何实现高效的资源分配

    深度强化学习(Deep Reinforcement Learning, DRL)是一种人工智能技术,它结合了深度学习和强化学习两个领域的优点,以解决复杂的决策问题。在过去的几年里,DRL已经取得了显著的成果,例如在游戏、机器人控制、自动驾驶等领域的应用。在资源分配方面,DRL可以帮助企业更有效地

    2024年02月21日
    浏览(69)
  • 强化学习:时序差分算法 TD-learning

       首先,我们考虑简单的平均估计计算: w = E [ X ] w=E[X] w = E [ X ] ,根据 RM算法 计算过程如下:   接着上面的例子,我们现在考虑一个较为复杂的问题,估计函数 v ( X ) v(X) v ( X ) 的平均值,根据 RM算法 计算过程如下:   接着上面的例子,我们现在考虑一个更复杂的

    2024年02月10日
    浏览(38)
  • 强化学习:原理与Python实战||一分钟秒懂人工智能对齐

    人工智能对齐(AI Alignment)指让人工智能的行为符合人的意图和价值观。 人工智能系统可能会出现“不对齐”(misalign)的问题。以ChatGPT这样的问答系统为例,ChatGPT的回答可能会含有危害祖国统一、侮辱先烈、丑化中华民族、教唆暴力、出口成“脏”等违法或不符合社会主

    2024年02月05日
    浏览(48)
  • 强化学习在人工智能的发展中的重要性与前景

    强化学习(Reinforcement Learning, RL)是一种人工智能(Artificial Intelligence, AI)技术,它通过在环境中进行交互,学习如何取得最大化的奖励。在过去的几年里,强化学习技术取得了显著的进展,并在许多领域得到了广泛应用,如游戏、自动驾驶、机器人控制、语音识别等。 在本文中,

    2024年02月20日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包