Notes on using highway-env after its 2023 update (with DDQN, Dueling DQN, DDQN + other changes): from getting started to getting buried, and if I hit one more pit I'm quitting Genshin


Foreword

While learning reinforcement learning for autonomous driving, I decided to build my environments on the simulator provided by the highway-env library. After going through plenty of tutorials (Chinese and international), I found them all outdated: highway-env was updated in 2023, and the tutorials from the previous couple of years invariably use the old functions and return values.

What is highway-env?

Installation (defaults to the latest version): pip install highway-env

First, the changes I noticed in the new version:

Previously, env.step() returned four values:

        observation, reward, done, info = env.step(action)

Now it returns five:

        observation, reward, terminated, truncated, info = env.step(action)

And as far as I can tell, env.reset() previously returned just an ndarray:

        data = env.reset()

        array([[...], [...], ..., [...]], dtype=float32)

Now env.reset() returns a tuple of (observation, info):

        data = env.reset()

        (array([[...], [...], ..., [...]], dtype=float32), {"speed": ..., "crashed": ..., "action": ..., "rewards": {...}, ...})

Because of these changes, the data-handling parts of the code have to change accordingly. Pay particular attention to version compatibility when several libraries are used together.
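Putting the two changes together, here is a minimal sketch of the new calling convention (assuming gym >= 0.26 and the current highway-env); the old single done flag becomes terminated or truncated:

import gym
import highway_env  # importing it registers the highway-env environments

env = gym.make("intersection-v0")
obs, info = env.reset()                 # reset() now returns (observation, info)
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated      # replaces the old single done flag
env.close()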

Some code I referenced

My virtual environment configuration (GPU):

What is a virtual environment? Someone go feed it a nine-turn braised intestine.

(screenshot of the full package list in the virtual environment)

The essential ones are:

Based on Python 3.8.0

pytorch

gym

highway

tqdm

matplotlib

pygame

numpy

highway-env
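If you want to double-check what is actually installed, here is a small sketch using the standard-library importlib.metadata (available since Python 3.8):

from importlib.metadata import version, PackageNotFoundError

for pkg in ["torch", "gym", "highway-env", "tqdm", "matplotlib", "pygame", "numpy"]:
    try:
        print(pkg, version(pkg))
    except PackageNotFoundError:
        print(pkg, "not installed")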

Training uses the Double DQN algorithm; the later sections build further changes on top of it.

Create a Python file double_dqn.py. The code below, concatenated in order, forms the complete file.

The comments are in English because my project is in English; translate them if you prefer.

Imports

import os
import copy
import random
import time
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F

import gym
import highway_env

Detect the device and initialize the default intersection environment

# set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Author: Da Xuanzi 2023-2-17

# Define the environment
env = gym.make("intersection-v0")
# details
env.config["duration"] = 13
env.config["vehicles_count"] = 20
env.config["vehicles_density"] = 1.3
env.config["reward_speed_range"] = [7.0, 10.0]
env.config["initial_vehicle_count"] = 10
env.config["simulation_frequency"] = 15
env.config["arrived_reward"] = 2
env.reset()

Structure of the intersection environment configuration:

env.config
{
    "observation": {
        "type": "Kinematics",
        "vehicles_count": 15,
        "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
        "features_range": {
            "x": [-100, 100],
            "y": [-100, 100],
            "vx": [-20, 20],
            "vy": [-20, 20],
        },
        "absolute": True,
        "flatten": False,
        "observe_intentions": False
    },
    "action": {
        "type": "DiscreteMetaAction",
        "longitudinal": False,
        "lateral": True
    },
    "duration": 13,  # [s]
    "destination": "o1",
    "initial_vehicle_count": 10,
    "spawn_probability": 0.6,
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.6],
    "scaling": 5.5 * 1.3,
    "collision_reward": IntersectionEnv.COLLISION_REWARD,
    "normalize_reward": False
}
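The Kinematics observation is a 15 x 7 table (one row per observed vehicle, one column per feature), and the fully connected networks below consume it flattened. That is where the state_dim=105 used later comes from; a quick sanity check:

vehicles_count = 15                              # env.config["observation"]["vehicles_count"]
features = ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"]
print(vehicles_count * len(features))            # 105, the state_dim passed to the networks below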

Building the network

The number of hidden-layer nodes can be customized.

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # super class
        super(Net, self).__init__()
        # hidden nodes define
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # define forward pass of the actor
        x = state # state
        # two ReLU-activated hidden layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out
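A quick shape check of the network (a minimal sketch, reusing the device and imports defined above; 105 and 3 match the intersection observation size and the 3 discrete actions used later):

net = Net(state_dim=105, action_dim=3).to(device)
dummy_state = torch.zeros(1, 105).to(device)     # one flattened observation
print(net(dummy_state).shape)                    # torch.Size([1, 3]): one Q value per action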

Building the experience replay buffer

class Replay: # learning
    def __init__(self,
                 buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)
    def _init_buffer(self, n):
        # fill the buffer with n transitions collected by random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            observation, reward, done, truncated, info = self.env.step(action)
            # env.step(action) returns a tuple (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the observed vehicles
            # reward: reward for the chosen action
            # terminated: bool, whether the episode ended (e.g. crash or arrival)
            # truncated: bool, whether the episode was cut off (e.g. time limit); split out of the old done flag
            # info: auxiliary logging information
            if type(state) == type((1,)):
                state = state[0]
            # env.reset() returns a tuple (ndarray, info_dict), where info_dict looks like
            # {"speed": float, "crashed": bool, "action": int, "rewards": dict, ...};
            # we only need the state data, i.e. the first element (the ndarray)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation

            if done:
                state = self.env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest transition when the buffer is full

    def buffer_sample(self, n):
        # random n samples from exp buffer
        return random.sample(self._storage, n)
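Used on its own, the buffer fills itself with init_length random-action transitions at construction time and hands back plain dictionaries when sampled (a short sketch, assuming the env created above):

buffer = Replay(buffer_size=1000, init_length=100, state_dim=105, action_dim=3, env=env)
batch = buffer.buffer_sample(5)
print(len(batch), sorted(batch[0].keys()))       # 5 ['action', 'done', 'reward', 'state', 'state_next']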

Building the agent

PATH = "your_save_folder"  # absolute or relative path of the folder used for saving results

class DOUBLEDQN(nn.Module):
    def __init__(
        self,
            env, # gym environment
            state_dim, # state size
            action_dim, # action size
        lr = 0.001, # learning rate
        gamma = 0.99, # discount factor
        batch_size = 5, # batch size for each training
        timestamp = "",):
        # super class
        super(DOUBLEDQN, self).__init__()
        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # for evaluation purpose
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)#TODO
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)#TODO
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)#TODO
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_our_action(self, state, epsilon = 0.9):
        # epsilon-greedy strategy for choosing an action
        # state: ndarray, environment state
        # epsilon: float in [0, 1]
        # return: the chosen action
        # flatten the observation into a 1D float tensor [a1, a2, a3, ..., an]
        # build one ndarray first and convert it to a tensor in a single step; converting a
        # list of ndarrays directly is very slow and noticeably drags down training
        # ndarray[[ndarray], ..., [ndarray]] => list => flat ndarray => tensor
        if type(state) == type((1,)):
            state = state[0]
        temp = [exp for exp in state]
        target = []
        target = np.array(target)
        # n dimension to 1 dimension ndarray
        for i in temp:
            target = np.append(target,i)
        state = torch.FloatTensor(target).to(device)
        # epsilon-greedy: with probability epsilon exploit the estimate net, otherwise explore
        # np.random.rand() with no arguments returns a single uniform sample in [0, 1)
        if np.random.rand() <= epsilon:
            # exploit: feed the state through the estimate net to get one Q value per action,
            # then pick the action with the largest Q value
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            # explore: choose an action at random
            action = np.random.randint(0, self.action_dim)

        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = [] # loss set
        avg_reward_list = [] # reward set
        episode_reward = 0
        rend = 0
        # tqdm: a module that displays a progress bar
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step +=1
                action = self.choose_our_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch in replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # build ndarrays first, then convert to tensors once: much faster than converting a list of ndarrays
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)


                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # estimated Q value: gather(1, action_batch) picks, per row, the Q value of the action actually taken
                estimate_Q_value = self.estimate_net(state_batch).gather(1, action_batch)
                # Double DQN target: the estimate net selects the best next action (detach stops gradients),
                # while the target net evaluates it
                max_action_index = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q_value = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_index.unsqueeze(1))  # unsqueeze(1): shape (n,) -> (n, 1) for gather

                # mse_loss: mean squared error between estimated and target Q values
                loss = F.mse_loss(estimate_Q_value, target_Q_value)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                # load parameters into model
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter +=1

            reward, count = self.eval()
            episode_reward += reward

            # you can update these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # episode_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                try:
                    os.makedirs(path)
                except OSError:
                    pass
                # saving as timestamp file
                np.save(path + "/DOUBLE_DQN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
     
        while not done:
            action = self.choose_our_action(state, epsilon = 1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count

The main script

Hyperparameters such as lr and gamma can be set to your own values.

if __name__ == "__main__":

    # timestamp
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a doubledqn object
    double_dqn_object = DOUBLEDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # your chosen train times
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")
    torch.save(double_dqn_object.state_dict(), path + "/DOUBLE_DQN_MODEL.pt")
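To reuse the trained weights later, here is a minimal sketch (it assumes the Net class, device, gym/np imports and the path variable from the script above, plus the DOUBLE_DQN_params.pkl file it writes):

# rebuild the network and load the saved estimate-net weights
loaded_net = Net(state_dim=105, action_dim=3).to(device)
loaded_net.load_state_dict(torch.load(path + "/DOUBLE_DQN_params.pkl", map_location=device))
loaded_net.eval()

# one greedy action on a fresh environment, just to confirm the weights load correctly
eval_env = gym.make("intersection-v0")
obs, info = eval_env.reset()
state = torch.FloatTensor(np.asarray(obs).reshape(-1)).to(device)
print(torch.argmax(loaded_net(state)).item())    # index of the greedy action
eval_env.close()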

Plotting the results

Create a new file draw_figures.py.

Replace the ? placeholders with your own paths.

import matplotlib.pyplot as plt
import numpy as np
Loss = r"?\?\DOUBLE_DQN_LOSS.npy"
Reward = r"?\?\DOUBLE_DQN_EACH_REWARD.npy"
avg_loss = np.load(Loss)
avg_reward_list = np.load(Reward)
# print("loss", avg_loss)
# print("reward", avg_reward_list)
plt.figure(figsize=(10, 6))
plt.plot(avg_loss)
plt.grid()
plt.title("Double DQN Loss")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.savefig(r"?\figures\double_dqn_loss.png", dpi=150)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(avg_reward_list)
plt.grid()
plt.title("Double DQN Training Reward")
plt.xlabel("epochs")
plt.ylabel("reward")
plt.savefig(r"?\figures\double_dqn_train_reward.png", dpi=150)
plt.show()

(the resulting loss and reward figures)

Tighnari's manual divider line

Dueling_DQN

The code above only needs minor changes.
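The idea: the last linear layer now has action_dim + 1 outputs, where the first action_dim entries are the advantages A(s, a) and the last entry is the state value V(s); Q values are recombined as Q(s, a) = V(s) + A(s, a) - mean_a A(s, a), which is what calculate_duelling_q_values below computes. A tiny numeric sketch of that recombination (hypothetical numbers, batch of one, three actions):

import torch

net_output = torch.tensor([[1.0, 2.0, 3.0, 5.0]])                  # [A(s,a0), A(s,a1), A(s,a2), V(s)]
advantages, state_value = net_output[:, :-1], net_output[:, -1:]
q = state_value + advantages - advantages.mean(dim=1, keepdim=True)
print(q)                                                           # tensor([[4., 5., 6.]])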

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        """
        Initialize the network

        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        """
        super(Net, self).__init__()

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim + 1)

    def forward(self, state):
        """
        Define the forward pass of the actor

        : param state: ndarray, the state of the environment
        """
        x = state
        # print(x.shape)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

class Replay: # learning
    def __init__(self,
                 buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)
    def _init_buffer(self, n):
        # fill the buffer with n transitions collected by random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            observation, reward, done, truncated, info = self.env.step(action)
            # env.step(action) returns a tuple (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the observed vehicles
            # reward: reward for the chosen action
            # terminated: bool, whether the episode ended (e.g. crash or arrival)
            # truncated: bool, whether the episode was cut off (e.g. time limit); split out of the old done flag
            # info: auxiliary logging information
            if type(state) == type((1,)):
                state = state[0]
            # env.reset() returns a tuple (ndarray, info_dict), where info_dict looks like
            # {"speed": float, "crashed": bool, "action": int, "rewards": dict, ...};
            # we only need the state data, i.e. the first element (the ndarray)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation

            if done:
                state = self.env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest transition when the buffer is full

    def buffer_sample(self, n):
        # random n samples from exp buffer
        return random.sample(self._storage, n)

class DUELDQN(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        batch_size=5,
        timestamp="",
    ):
        """
        : param env: object, a gym environment
        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        : param lr: float, learning rate
        : param gamma: float, discount factor
        : param batch_size: int, batch size for training
        """
        super(DUELDQN, self).__init__()

        self.env = env
        self.env.reset()
        self.timestamp = timestamp

        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend =False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_action(self, state, epsilon=0.9):
        # epsilon-greedy strategy for choosing an action
        # state: ndarray, environment state
        # epsilon: float in [0, 1]
        # return: the chosen action
        # flatten the observation into a 1D float tensor [a1, a2, a3, ..., an]
        # build one ndarray first and convert it to a tensor in a single step; converting a
        # list of ndarrays directly is very slow and noticeably drags down training
        # ndarray[[ndarray], ..., [ndarray]] => list => flat ndarray => tensor
        if type(state) == type((1,)):
            state = state[0]
        temp = [exp for exp in state]
        target = []
        target = np.array(target)
        # n dimension to 1 dimension ndarray
        for i in temp:
            target = np.append(target, i)
        state = torch.FloatTensor(target).to(device)
        # epsilon-greedy: with probability epsilon exploit the estimate net, otherwise explore
        # np.random.rand() with no arguments returns a single uniform sample in [0, 1)
        if np.random.rand() <= epsilon:
            # exploit: the first action_dim outputs are the advantages and the last one is the
            # state value, so drop the last entry before taking the argmax
            action_value = self.estimate_net(state)
            action_value = action_value[:-1]
            action = torch.argmax(action_value).item()
        else:
            # explore: choose an action at random
            action = np.random.randint(0, self.action_dim)

        return action

    def calculate_duelling_q_values(self, duelling_q_network_output):
        """
        Calculate the Q values using the duelling network architecture (equation (9) of the Dueling DQN paper, Wang et al. 2016).

        :param duelling_q_network_output: tensor, output of duelling q network
        :return: Q values
        """
        state_value = duelling_q_network_output[:, -1]
        avg_advantage = torch.mean(duelling_q_network_output[:, :-1], dim=1)
        q_values = state_value.unsqueeze(1) + (
            duelling_q_network_output[:, :-1] - avg_advantage.unsqueeze(1)
        )
        return q_values

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = [] # loss set
        avg_reward_list = [] # reward set
        episode_reward = 0

        # tqdm: a module that displays a progress bar
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch in replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # build ndarrays first, then convert to tensors once: much faster than converting a list of ndarrays
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)


                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_net_output = self.estimate_net(state_batch)
                estimate_Q = self.calculate_duelling_q_values(estimate_net_output)
                estimate_Q = estimate_Q.gather(1, action_batch)

                # get target Q value
                max_action_idx = (
                    self.estimate_net(state_next_batch)[:, :-1].detach().argmax(1)
                )
                target_net_output = self.target_net(state_next_batch)
                target_Q = self.calculate_duelling_q_values(target_net_output).gather(
                    1, max_action_idx.unsqueeze(1)
                )
                target_Q = reward_batch + done_batch * self.gamma * target_Q

                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1

            reward, count = self.eval()
            episode_reward += reward

            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # epoch_reward = 0
                path = PATH + "/" + self.timestamp
                # create a new directory for saving
                try:
                    os.makedirs(path)
                except OSError:
                    pass

                np.save(path + "/DUELING_DQN_LOSS.npy", loss_list)
                np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            state_next, reward, done, _, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = state_next

        return total_reward, count


if __name__ == "__main__":

    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    time_string = time.strftime(
        "%Y-%m-%d-%H-%M", named_tuple
    )  # folder name derived from the date and time, e.g. 2023-02-17-20-36

    duel_dqn_object = DUELDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    path = PATH + "/" + time_string
    # Train the policy
    iterations = 10
    avg_loss, avg_reward_list = duel_dqn_object.train(iterations)
    np.save(path + "/DUELING_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(duel_dqn_object.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")
    torch.save(duel_dqn_object.state_dict(), path + "/DUELING_DQN_MODEL.pt")

DDQN+OtherChanges

Three 2D convolutional layers
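Here the 105-dimensional observation is reshaped into a 1 x 7 x 15 "image" (1 channel, 7 features, 15 vehicles) before the convolutions. With kernel_size=3 and padding=1 each convolution keeps the spatial size, while each MaxPool2d(kernel_size=3, stride=1) shrinks it by 2 per dimension: 7 x 15 -> 5 x 13 -> 3 x 11 -> 1 x 9, which is why the first linear layer takes 4 * 1 * 9 = 36 inputs. A quick check of that arithmetic (a sketch with the same layer settings; the BatchNorm layers are omitted since they do not change the shape):

import torch
import torch.nn as nn

cnn = nn.Sequential(
    nn.Conv2d(1, 4, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=3, stride=1),
    nn.Conv2d(4, 8, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=3, stride=1),
    nn.Conv2d(8, 4, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=3, stride=1),
)
x = torch.zeros(1, 1, 7, 15)          # one observation reshaped to 1 channel x 7 features x 15 vehicles
print(cnn(x).shape)                   # torch.Size([1, 4, 1, 9]) -> 4 * 1 * 9 = 36 inputs for fc1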

# add CNN structure
class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # initialize the network
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()

        # nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # in_channels: number of input channels (input size = in_channels * H * W)
        # out_channels: number of output channels (free to choose)
        # kernel_size: size of the convolution kernel
        # stride: step length
        # padding: padding size
        # spatial output size: out_N = (in_N - kernel_size + 2 * padding) / stride + 1
        self.cnn = nn.Sequential(
            # the first 2D convolutional layer
            nn.Conv2d(1, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the second 2D convolutional layer
            nn.Conv2d(4, 8, kernel_size=3, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the third 2D convolutional layer (my own addition; feel free to try more layers)
            nn.Conv2d(8, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
        )

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(4 * 1 * 9, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # define forward pass of the actor
        x = state # state
        x = self.cnn(x)
        x = x.view(x.size(0), -1)
        # two ReLU-activated hidden layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out


class Replay:
    def __init__(self, buffer_size, init_length, state_dim, action_dim, env):

        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)

    def _init_buffer(self, n):
        # fill the buffer with n transitions collected by random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            observation, reward, done, truncated, info = self.env.step(action)
            # env.step(action) returns a tuple (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the observed vehicles
            # reward: reward for the chosen action
            # terminated: bool, whether the episode ended (e.g. crash or arrival)
            # truncated: bool, whether the episode was cut off (e.g. time limit); split out of the old done flag
            # info: auxiliary logging information
            if type(state) == type((1,)):
                state = state[0]
            # env.reset() returns a tuple (ndarray, info_dict), where info_dict looks like
            # {"speed": float, "crashed": bool, "action": int, "rewards": dict, ...};
            # we only need the state data, i.e. the first element (the ndarray)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation

            if done:
                state = self.env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0) # drop the oldest transition when the buffer is full

    def buffer_sample(self, N):
        # random n samples from exp buffer
        return random.sample(self._storage, N)



class DOUBLEDQN_CNN(nn.Module):
    def __init__(
        self,
            env,  # gym environment
            state_dim,  # state size
            action_dim,  # action size
            lr=0.001,  # learning rate
            gamma=0.99,  # discount factor
            batch_size=5,  # batch size for each training
            timestamp="", ):
        # super class
        super(DOUBLEDQN_CNN, self).__init__()

        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # for evaluation purpose
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)


    def choose_action(self, state, epsilon=0.9):
        # epsilon-greedy strategy for choosing an action
        # state: ndarray, environment state
        # epsilon: float in [0, 1]
        # return: the chosen action
        # here the observation is reshaped into a 1 x 7 x 15 "image" for the convolutional network;
        # building a single ndarray first keeps the ndarray -> tensor conversion fast
        if type(state) == type((1,)):
            state = state[0]
        #TODO
        state = (
            torch.FloatTensor(state).to(device).reshape(-1, 1, 7, self.state_dim // 7)
        )
        if np.random.rand() <= epsilon:  # uniform sample in [0, 1) for epsilon-greedy
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        count_list = []
        loss_list = []
        total_reward_list = []
        avg_reward_list = []
        episode_reward = 0
        rend = 0
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch from replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)

                # extract batch data

                action_batch = torch.LongTensor([exp["action"] for exp in exp_batch])
                reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch])
                done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch])
                # build ndarrays first, then convert to tensors once: much faster than converting a list of ndarrays
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list)
                state_batch = torch.FloatTensor(state_temp_list)

                # reshape
                state_batch = state_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                action_batch = action_batch.to(device).reshape(self.batch_size, -1)
                reward_batch = reward_batch.to(device).reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                done_batch = done_batch.to(device).reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))

                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1

            reward, count = self.eval()
            episode_reward += reward

            # you can update these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # epoch_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                try:
                    os.makedirs(path)
                except OSError:
                    pass
                # saving as timestamp file
                np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count


if __name__ == "__main__":

    # timestamp
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a doubledqn object
    double_dqn_cnn_object = DOUBLEDQN_CNN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # your chosen train times
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_cnn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_cnn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")
    torch.save(double_dqn_cnn_object.state_dict(), path + "/DOUBLE_DQN_CNN_MODEL.pt")

Prioritized experience replay
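The prioritized buffer stores a priority for each transition, p_i = (|TD error_i| + eps) ** alpha, samples transitions with probability P(i) = p_i / sum_j p_j, and corrects the resulting bias with importance-sampling weights. A small sketch of that sampling step, with hypothetical TD errors and alpha = 0.6:

import random
import numpy as np

td_errors = np.array([0.5, 0.1, 2.0, 0.05])      # hypothetical |TD errors| of four stored transitions
alpha = 0.6
priorities = (td_errors + 1e-5) ** alpha         # p_i = (|delta_i| + eps) ** alpha
probs = priorities / priorities.sum()            # P(i) = p_i / sum_j p_j
idxes = random.choices(range(len(probs)), weights=probs, k=2)
importance = 1.0 / (len(probs) * probs[idxes])   # importance-sampling weights (before the beta exponent)
print(idxes, importance)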

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # state: ndarray
        x = state
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

from collections import deque  # required for the priority deque used below

# Prioritized_Replay
class Prioritized_Replay:
    def __init__(
        self,
        buffer_size,
        init_length,
        state_dim,
        action_dim,
        est_Net,
        tar_Net,
        gamma,
    ):
        # state_dim: state space
        # action_dim: action space
        # est_Net / tar_Net: estimate and target networks, used to compute the initial priorities
        # gamma: discount factor
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.is_rend = False

        self.priority = deque(maxlen=buffer_size)
        self._storage = []
        self._init_buffer(init_length, est_Net, tar_Net)

    def _init_buffer(self, n, est_Net, tar_Net):
        # n: sample number
        state = env.reset()
        for i in range(n):
            action = env.action_space.sample()
            observation, reward, done, truncated, info = env.step(action)
            # env.step(action) returns a tuple (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the observed vehicles
            # reward: reward for the chosen action
            # terminated: bool, whether the episode ended (e.g. crash or arrival)
            # truncated: bool, whether the episode was cut off (e.g. time limit); split out of the old done flag
            # info: auxiliary logging information
            if type(state) == type((1,)):
                state = state[0]
            # env.reset() returns a tuple (ndarray, info_dict), where info_dict looks like
            # {"speed": float, "crashed": bool, "action": int, "rewards": dict, ...};
            # we only need the state data, i.e. the first element (the ndarray)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self.prioritize(est_Net, tar_Net, exp, alpha=0.6)
            self._storage.append(exp)
            state = observation

            if done:
                state = env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)
    # add prioritize
    def prioritize(self, est_Net, tar_Net, exp, alpha=0.6):
        state = torch.FloatTensor(exp["state"]).to(device).reshape(-1)

        q = est_Net(state)[exp["action"]].detach().cpu().numpy()
        q_next = exp["reward"] + self.gamma * torch.max(est_Net(state).detach())
        # TD error
        p = (np.abs(q_next.cpu().numpy() - q) + (np.e ** -10)) ** alpha
        self.priority.append(p.item())

    def get_prioritized_batch(self, N):
        prob = self.priority / np.sum(self.priority)
        # random.choices(population, weights=None, *, cum_weights=None, k=1)
        # weights: relative probability of selecting each item
        # k: number of samples to draw (with replacement)
        # cum_weights: cumulative weights, an alternative to weights
        sample_idxes = random.choices(range(len(prob)), k=N, weights=prob)
        importance = (1 / prob) * (1 / len(self.priority))
        sampled_importance = np.array(importance)[sample_idxes]
        sampled_batch = np.array(self._storage)[sample_idxes]
        return sampled_batch.tolist(), sampled_importance

    def buffer_sample(self, N):
        # random n samples from exp buffer
        return random.sample(self._storage, N)


class DDQNPB(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=50,
        beta=1,
        beta_decay=0.995,
        beta_min=0.01,
        timestamp="",
    ):
        # env: environment
        # state_dim: state space
        # action_dim: action space
        # lr: learning rate
        # gamma: discount factor
        # batch_size: training batch size
        super(DDQNPB, self).__init__()

        self.timestamp = timestamp

        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

        self.ReplayBuffer = Prioritized_Replay(
            buffer_size,
            100,
            self.state_dim,
            self.action_dim,
            self.estimate_net,
            self.target_net,
            gamma,
        )
        self.priority = self.ReplayBuffer.priority
        # NOTE: the beta here plays the role of (1 - beta) in most write-ups; it is just a notation difference
        # it starts at 1 and decays over training
        self.beta = beta
        self.beta_decay = beta_decay
        self.beta_min = beta_min

    def choose_action(self, state, epsilon=0.9):
        # state: env state
        # epsilon: [0,1]
        # return: the chosen action
        # get a 1D array
        if type(state) == type((1,)):
            state = state[0]
        temp = [exp for exp in state]
        target = []
        target = np.array(target)
        # n dimension to 1 dimension ndarray
        for i in temp:
            target = np.append(target, i)
        state = torch.FloatTensor(target).to(device)
        if np.random.rand() <= epsilon:  # uniform sample in [0, 1) for epsilon-greedy
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = []
        avg_reward_list = []
        episode_reward = 0

        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = env.reset()
            each_loss = 0
            step = 0
            rend = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                action = self.choose_action(state)
                observation, reward, done, _, info = env.step(action)
                # self.env.render()
                # store experience to replay memory
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # importance weighting
                if self.beta > self.beta_min:
                    self.beta *= self.beta_decay

                # sample random batch from replay memory
                exp_batch, importance = self.ReplayBuffer.get_prioritized_batch(
                    self.batch_size
                )
                importance = torch.FloatTensor(importance ** (1 - self.beta)).to(device)

                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # build ndarrays first, then convert to tensors once: much faster than converting a list of ndarrays
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)

                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))

                # compute mse loss
                # loss = F.mse_loss(estimate_Q, target_Q)
                loss = torch.mean(
                    torch.multiply(torch.square(estimate_Q - target_Q), importance)
                )
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                #TODO
                # update target network
                if self.learn_step_counter % 10 == 0:
                    # self.update_target_networks()
                    self.target_net.load_state_dict(self.estimate_net.state_dict())

                self.learn_step_counter += 1
                step += 1

                env.render()
                # you can update these variables
                # if episode_reward % 100 == 0:
                #     rend += 1
                #     if rend % 5 == 0:
                #         self.is_rend = True
                #     else:
                #         self.is_rend = False

            reward, count = self.eval()
            episode_reward += reward


            # save
            period = 1
            if episode % period == 0:
                # log
                each_loss /= period
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print(
                    "\nepoch: [{}/{}], \tavg loss: {:.4f}, \tavg reward: {:.3f}, \tsteps: {}".format(
                        episode, num_episode, each_loss, episode_reward, count
                    )
                )
                # episode_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                try:
                    os.makedirs(path)
                except OSError:
                    pass
                np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(),path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")

        env.close()
        return loss_list, avg_reward_list

    def eval(self):
        """
        Evaluate the policy
        """
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count


if __name__ == "__main__":

    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)

    double_dqn_prioritized_object = DDQNPB(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=64,
        timestamp=time_string,
    )

    # Train the policy
    iterations = 10000
    avg_loss, avg_reward_list = double_dqn_prioritized_object.train(iterations)

    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_prioritized_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")
    torch.save(double_dqn_prioritized_object.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_MODEL.pt")

Feel free to change things yourself; a bug you debug yourself is a good bug! (kidding)

Afterword:

On custom environments: I just spent 30 minutes digging into this. The official tutorial on the topic is pretty rough, so here is the walkthrough I came up with:

  1. Find where your highway-env package is installed. Mine is at: E:\formalFiles\Anaconda3-2020.07\envs\autodrive_38\Lib\site-packages\highway_env
  2. Inside highway_env/envs you will find the definition files for the various scenarios. I'll use intersection_env.py as the example; the others work the same way. Create a new file test_env.py and copy the entire contents of intersection_env.py into it.


  3. In test_env.py, rename the class as follows:
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }

    Delete every class definition except the first one. Here the action space is changed to 5 actions.

  4. At the end of envs/__init__.py add:
    from highway_env.envs.test_env import *
  5. In the highway_env package folder there is a separate __init__.py (not the file from the previous step!). Modify it as follows:
    def register_highway_envs():
        """Import the envs module so that envs register themselves."""
        # my test environment
        register(
            id='test-v0',  # the name you pass to gym.make()
            entry_point='highway_env.envs:test'  # module path and environment class name
        )
  6. To modify the rewards, open your environment file highway_env/envs/test_env.py and look at the _reward function and the related _agent_reward functions; you can change the terms however you like. utils.py provides the helper function lmap().
    def _reward(self, action: int) -> float:
        """Aggregated reward, for cooperative agents."""
        return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)
    
    def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
        """Per-agent reward signal."""
        rewards = self._agent_rewards(action, vehicle)
        reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
        reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
        reward *= rewards["on_road_reward"]
        if self.config["normalize_reward"]:
            reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
        return reward
    
    def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
        """Per-agent per-objective reward signal."""
        scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
        return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }
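    For reference, lmap() in highway_env/utils.py is just a linear re-mapping of a value from one interval onto another; the sketch below shows the computation it performs and how the normalization above uses it (my reading of the source, not an official excerpt):

    def lmap(v, x, y):
        """Linearly map value v from interval x = [x0, x1] onto interval y = [y0, y1]."""
        return y[0] + (v - x[0]) * (y[1] - y[0]) / (x[1] - x[0])

    # In the normalization above, this rescales the raw reward so that
    # collision_reward maps to 0 and arrived_reward maps to 1:
    # lmap(reward, [collision_reward, arrived_reward], [0, 1])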
  7. Use the custom environment as follows (a minimal rollout sketch comes right after):
    import highway_env
    import gym
    
    env = gym.make("test-v0")
    env.reset()
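    To confirm the environment runs end to end with the new five-value step() return, here is a minimal random-rollout smoke test (random actions only):

    obs, info = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # random action, just to exercise the env
        obs, reward, terminated, truncated, info = env.step(action)
        # env.render()  # uncomment to watch; depending on the gym version you may
        #               # need gym.make("test-v0", render_mode="human")
        done = terminated or truncated
    env.close()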
  8. My customized environment file. These are my personal settings and by no means the best possible ones:
    from typing import Dict, Tuple, Text
    
    import numpy as np
    from highway_env import utils
    from highway_env.envs.common.abstract import AbstractEnv, MultiAgentWrapper
    from highway_env.road.lane import LineType, StraightLane, CircularLane, AbstractLane
    from highway_env.road.regulation import RegulatedRoad
    from highway_env.road.road import RoadNetwork
    from highway_env.vehicle.kinematics import Vehicle
    from highway_env.vehicle.controller import ControlledVehicle
    
    
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }
        ACTIONS_INDEXES = {v: k for k, v in ACTIONS.items()}
    
        @classmethod
        def default_config(cls) -> dict:
            config = super().default_config()
            config.update({
                "observation": {
                    "type": "Kinematics",
                    "vehicles_count": 15,
                    "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
                    "features_range": {
                        "x": [-100, 100],
                        "y": [-100, 100],
                        "vx": [-20, 20],
                        "vy": [-20, 20],
                    },
                    "absolute": True,
                    "flatten": False,
                    "observe_intentions": False
                },
                "action": {
                    "type": "DiscreteMetaAction",
                    "longitudinal": True,
                    "lateral": True,
                    "target_speeds": [0, 4.5, 9]
                },
                "duration": 13,  # [s]
                "destination": "o1",
                "controlled_vehicles": 1,
                "initial_vehicle_count": 10,
                "spawn_probability": 0.6,
                "screen_width": 600,
                "screen_height": 600,
                "centering_position": [0.5, 0.6],
                "scaling": 5.5 * 1.3,
                "collision_reward": -10,
                "high_speed_reward": 2,
                "arrived_reward": 5,
                "reward_speed_range": [7.0, 9.0],# change
                "normalize_reward": False,
                "offroad_terminal": False
            })
            return config
    
        def _reward(self, action: int) -> float:
            """Aggregated reward, for cooperative agents."""
            return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)
    
        def _rewards(self, action: int) -> Dict[Text, float]:
            """Multi-objective rewards, for cooperative agents."""
            agents_rewards = [self._agent_rewards(action, vehicle) for vehicle in self.controlled_vehicles]
            return {
                name: sum(agent_rewards[name] for agent_rewards in agents_rewards) / len(agents_rewards)
                for name in agents_rewards[0].keys()
            }
        # edit your reward
        def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
            """Per-agent reward signal."""
            rewards = self._agent_rewards(action, vehicle)
            reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
            reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
            reward *= rewards["on_road_reward"]
            if self.config["normalize_reward"]:
                reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
            return reward
    
        def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
            """Per-agent per-objective reward signal."""
            scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
            return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }
    
        def _is_terminated(self) -> bool:
            return any(vehicle.crashed for vehicle in self.controlled_vehicles) \
                   or all(self.has_arrived(vehicle) for vehicle in self.controlled_vehicles) \
                   or (self.config["offroad_terminal"] and not self.vehicle.on_road)
    
        def _agent_is_terminal(self, vehicle: Vehicle) -> bool:
            """The episode is over when a collision occurs or when the access ramp has been passed."""
            return (vehicle.crashed or
                    self.has_arrived(vehicle) or
                    self.time >= self.config["duration"])
    
        def _is_truncated(self) -> bool:
            """The episode is truncated when the time limit is reached."""
            return self.time >= self.config["duration"]
    
        def _info(self, obs: np.ndarray, action: int) -> dict:
            info = super()._info(obs, action)
            info["agents_rewards"] = tuple(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles)
            info["agents_dones"] = tuple(self._agent_is_terminal(vehicle) for vehicle in self.controlled_vehicles)
            return info
    
        def _reset(self) -> None:
            self._make_road()
            self._make_vehicles(self.config["initial_vehicle_count"])
    
        def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]:
            obs, reward, terminated, truncated, info = super().step(action)
            self._clear_vehicles()
            self._spawn_vehicle(spawn_probability=self.config["spawn_probability"])
            return obs, reward, terminated, truncated, info
    
        def _make_road(self) -> None:
            """
            Make an 4-way intersection.
    
            The horizontal road has the right of way. More precisely, the levels of priority are:
                - 3 for horizontal straight lanes and right-turns
                - 1 for vertical straight lanes and right-turns
                - 2 for horizontal left-turns
                - 0 for vertical left-turns
    
            The code for nodes in the road network is:
            (o:outer | i:inner + [r:right, l:left]) + (0:south | 1:west | 2:north | 3:east)
    
            :return: the intersection road
            """
            lane_width = AbstractLane.DEFAULT_WIDTH
            right_turn_radius = lane_width + 5  # [m]
            left_turn_radius = right_turn_radius + lane_width  # [m]
            outer_distance = right_turn_radius + lane_width / 2
            access_length = 50 + 50  # [m]
    
            net = RoadNetwork()
            n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
            for corner in range(4):
                angle = np.radians(90 * corner)
                is_horizontal = corner % 2
                priority = 3 if is_horizontal else 1
                rotation = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]])
                # Incoming
                start = rotation @ np.array([lane_width / 2, access_length + outer_distance])
                end = rotation @ np.array([lane_width / 2, outer_distance])
                net.add_lane("o" + str(corner), "ir" + str(corner),
                             StraightLane(start, end, line_types=[s, c], priority=priority, speed_limit=10))
                # Right turn
                r_center = rotation @ (np.array([outer_distance, outer_distance]))
                net.add_lane("ir" + str(corner), "il" + str((corner - 1) % 4),
                             CircularLane(r_center, right_turn_radius, angle + np.radians(180), angle + np.radians(270),
                                          line_types=[n, c], priority=priority, speed_limit=10))
                # Left turn
                l_center = rotation @ (np.array([-left_turn_radius + lane_width / 2, left_turn_radius - lane_width / 2]))
                net.add_lane("ir" + str(corner), "il" + str((corner + 1) % 4),
                             CircularLane(l_center, left_turn_radius, angle + np.radians(0), angle + np.radians(-90),
                                          clockwise=False, line_types=[n, n], priority=priority - 1, speed_limit=10))
                # Straight
                start = rotation @ np.array([lane_width / 2, outer_distance])
                end = rotation @ np.array([lane_width / 2, -outer_distance])
                net.add_lane("ir" + str(corner), "il" + str((corner + 2) % 4),
                             StraightLane(start, end, line_types=[s, n], priority=priority, speed_limit=10))
                # Exit
                start = rotation @ np.flip([lane_width / 2, access_length + outer_distance], axis=0)
                end = rotation @ np.flip([lane_width / 2, outer_distance], axis=0)
                net.add_lane("il" + str((corner - 1) % 4), "o" + str((corner - 1) % 4),
                             StraightLane(end, start, line_types=[n, c], priority=priority, speed_limit=10))
    
            road = RegulatedRoad(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
            self.road = road
    
        def _make_vehicles(self, n_vehicles: int = 10) -> None:
            """
            Populate a road with several vehicles on the highway and on the merging lane
    
            :return: the ego-vehicle
            """
            # Configure vehicles
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle_type.DISTANCE_WANTED = 5  # Low jam distance
            vehicle_type.COMFORT_ACC_MAX = 6
            vehicle_type.COMFORT_ACC_MIN = -3
    
            # Random vehicles
            simulation_steps = 3
            for t in range(n_vehicles - 1):
                self._spawn_vehicle(np.linspace(0, 80, n_vehicles)[t])
            for _ in range(simulation_steps):
                [(self.road.act(), self.road.step(1 / self.config["simulation_frequency"])) for _ in range(self.config["simulation_frequency"])]
    
            # Challenger vehicle
            self._spawn_vehicle(60, spawn_probability=1, go_straight=True, position_deviation=0.1, speed_deviation=0)
    
            # Controlled vehicles
            self.controlled_vehicles = []
            for ego_id in range(0, self.config["controlled_vehicles"]):
                ego_lane = self.road.network.get_lane(("o{}".format(ego_id % 4), "ir{}".format(ego_id % 4), 0))
                destination = self.config["destination"] or "o" + str(self.np_random.randint(1, 4))
                ego_vehicle = self.action_type.vehicle_class(
                                 self.road,
                                 ego_lane.position(60 + 5*self.np_random.normal(1), 0),
                                 speed=ego_lane.speed_limit,
                                 heading=ego_lane.heading_at(60))
                try:
                    ego_vehicle.plan_route_to(destination)
                    ego_vehicle.speed_index = ego_vehicle.speed_to_index(ego_lane.speed_limit)
                    ego_vehicle.target_speed = ego_vehicle.index_to_speed(ego_vehicle.speed_index)
                except AttributeError:
                    pass
    
                self.road.vehicles.append(ego_vehicle)
                self.controlled_vehicles.append(ego_vehicle)
                for v in self.road.vehicles:  # Prevent early collisions
                    if v is not ego_vehicle and np.linalg.norm(v.position - ego_vehicle.position) < 20:
                        self.road.vehicles.remove(v)
    
        def _spawn_vehicle(self,
                           longitudinal: float = 0,
                           position_deviation: float = 1.,
                           speed_deviation: float = 1.,
                           spawn_probability: float = 0.6,
                           go_straight: bool = False) -> None:
            if self.np_random.uniform() > spawn_probability:
                return
    
            route = self.np_random.choice(range(4), size=2, replace=False)
            route[1] = (route[0] + 2) % 4 if go_straight else route[1]
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle = vehicle_type.make_on_lane(self.road, ("o" + str(route[0]), "ir" + str(route[0]), 0),
                                                longitudinal=(longitudinal + 5
                                                              + self.np_random.normal() * position_deviation),
                                                speed=8 + self.np_random.normal() * speed_deviation)
            for v in self.road.vehicles:
                if np.linalg.norm(v.position - vehicle.position) < 15:
                    return
            vehicle.plan_route_to("o" + str(route[1]))
            vehicle.randomize_behavior()
            self.road.vehicles.append(vehicle)
            return vehicle
    
        def _clear_vehicles(self) -> None:
            is_leaving = lambda vehicle: "il" in vehicle.lane_index[0] and "o" in vehicle.lane_index[1] \
                                         and vehicle.lane.local_coordinates(vehicle.position)[0] \
                                         >= vehicle.lane.length - 4 * vehicle.LENGTH
            self.road.vehicles = [vehicle for vehicle in self.road.vehicles if
                                  vehicle in self.controlled_vehicles or not (is_leaving(vehicle) or vehicle.route is None)]
    
        def has_arrived(self, vehicle: Vehicle, exit_distance: float = 25) -> bool:
            return "il" in vehicle.lane_index[0] \
                   and "o" in vehicle.lane_index[1] \
                   and vehicle.lane.local_coordinates(vehicle.position)[0] >= exit_distance
    
    

Oh, so everyone wants a visualization? Coming right up.

Training curves on test-v0 using double_dqn.py (action_dim == 5); a sketch of the adjusted setup follows the figure:

This is currently single-agent. A multi-agent version would require adjusting the input data and actions as well as the number of controlled vehicles; I leave that as a future improvement.

(figure: training curves on test-v0)
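
In principle, the training script only needs the environment id and the action dimension changed. A minimal sketch of the adjusted setup, reusing the DDQNPB class and hyper-parameters from the main block above (the rest of the script stays the same):

env = gym.make("test-v0")
env.reset()

agent = DDQNPB(
    env,
    state_dim=105,   # still 15 vehicles x 7 Kinematics features, flattened
    action_dim=5,    # LANE_LEFT, IDLE, LANE_RIGHT, FASTER, SLOWER
    lr=0.001,
    gamma=0.99,
    buffer_size=1000,
    batch_size=64,
    timestamp=time_string,
)
avg_loss, avg_reward_list = agent.train(10000)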

Anything else? Wait until I finish the multi-agent version 0-0!

Further additions from kind readers are welcome... after all, this is pretty much uncharted territory (sigh).


One day, I will become the god-like Mr. Tighnari!
