16.6 The Second Environment
Create a custom reinforcement learning environment class named PredatorPreyEnv2 that implements a Predator-Prey environment. This is a multi-agent environment in which predators chase prey while the prey try to avoid being caught. Unlike the traditional Predator-Prey environment, in this version the prey never disappear permanently, and each predator can only catch prey once. Environments of this kind are commonly used to study cooperation, competition, and strategy selection. The PredatorPreyEnv2 environment is described in detail below:
- Predator-Prey: the environment contains two kinds of agents, predators and prey. A predator's goal is to catch prey, while a prey's goal is to evade the predators.
- Immortal Prey: the prey in this environment are "immortal", meaning they are not removed after being caught. In a traditional Predator-Prey environment a captured prey disappears, but in this version the prey remain on the grid and can be caught again and again.
- One-time eat: each predator can catch prey only once; after a successful catch the predator becomes inactive for the rest of the episode. This rule makes the task harder, because every predator must pick the right moment and strategy for its single catch.
The PredatorPreyEnv2 class is implemented step by step as follows.
(1) Import the required Python libraries and modules, including Gymnasium, NumPy, and Pygame. The code is shown below.
import gymnasium as gym
from gymnasium import spaces
import pygame
import numpy as np
from collections import deque
(2) Define the custom Gym environment class, which inherits from gym.Env, the base class for Gym environments. The code is shown below.
class PredatorPreyEnv2(gym.Env):
metadata = {'render.modes': ['human', 'rgb_array'],
'render-fps': 4}
(3) Implement the constructor of PredatorPreyEnv2, which initializes the environment's attributes and parameters, including the grid size, the vision range, the numbers of predators and prey, the reward settings, and so on. The constructor also defines the observation space and the action spaces. The code is shown below.
def __init__(self,
render_mode=None,
size:int=10,
vision:int=5,
predator:int =3,
prey:int =1,
error_reward:float=-2,
success_reward:float=10,
living_reward:float=-1,
img_mode:bool=False,
episode_length:int=100,
history_length:int=4,
communication_bits:int=0,
cooperate:float=1):
self.size = size
self.vision = vision
self.window_size = 500
self.render_mode = render_mode
self.predator_num = predator
self.prey_num = prey
self.active_predator = [True for i in range(self.predator_num)]
self.active_prey = [True for i in range(self.prey_num)]
self.error_reward = error_reward
self.success_reward = success_reward
self.living_reward = living_reward
self.episode_length = episode_length
self.img_mode = img_mode
self.steps = 0
self.window = None
self.clock = None
self.cooperate = cooperate
self.render_scale = 1
self.observation_space = spaces.Dict({
'predator': spaces.Sequence(spaces.Box(0, size-1, shape=(2,), dtype=np.int32)),
'prey': spaces.Box(0, size-1, shape=(2,), dtype=np.int32),
})
total_actions = 5
self.action_space_predator = spaces.MultiDiscrete([total_actions]*predator)
self.action_space_prey = spaces.MultiDiscrete([total_actions]*prey)
self.single_action_space = spaces.Discrete(total_actions)
self._action_to_direction = {
0: np.array([0, 1]),
1: np.array([1, 0]),
2: np.array([0, -1]),
3: np.array([-1, 0]),
4: np.array([0, 0])
}
self.frame_history = deque(maxlen=4)
self.history_length = history_length
self.communication_bits = communication_bits
if self.communication_bits>0:
self.pred_communication = np.zeros((self.predator_num))
self.prey_communication = np.zeros((self.prey_num))
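The action spaces defined above are standard Gymnasium spaces, so a joint action for all predators (or all prey) can be sampled directly from them. The following small, self-contained sketch only illustrates how the MultiDiscrete predator action space behaves with the default total_actions=5 and three predators; the printed values are examples, not fixed outputs.
from gymnasium import spaces

total_actions = 5                     # five moves: four directions plus staying in place
predator = 3
action_space_predator = spaces.MultiDiscrete([total_actions] * predator)

sample = action_space_predator.sample()
print(sample)         # e.g. [4 0 2] -- one discrete action per predator
print(sample.shape)   # (3,)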
(4) Implement the method _get_obs(self), which returns the current observation of the environment. In image mode it delegates to _get_np_arr_obs(self); otherwise it returns a dictionary containing the predator and prey positions. The code is shown below.
def _get_obs(self):
if self.img_mode:
return self._get_np_arr_obs()
return {
'predator': self._predator_location,
'prey': self._prey_location
}
(5) Implement the method _get_np_arr_obs(self), which builds the image-style observation: it renders an egocentric frame for every predator and every prey and returns them as lists of NumPy arrays in a dictionary. The code is shown below.
def _get_np_arr_obs(self):
predator_states = []
prey_states = []
for i in range(len(self._predator_location)):
state = self._render_predator_frame(predator_id=i)
predator_states.append(state)
for i in range(len(self._prey_location)):
state = self._render_prey_frame(prey_id=i)
prey_states.append(state)
return {
"predator":predator_states,
"prey":prey_states
}
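Both reset() and step() below also call a helper _get_info(self) that is not listed in this excerpt. According to the surrounding description it only returns auxiliary information and is usually an empty dictionary; a minimal sketch under that assumption is:
def _get_info(self):
    # auxiliary information returned alongside the observation;
    # kept as an empty dictionary in this sketch
    return {}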
(6) Define the method reset(self, *, seed: int=1, options=None), which resets the environment's state and is called at the beginning of every new episode. It randomly re-initializes the predator and prey positions, resets the step counter and the active flags, and returns the stacked frame history as the initial observation together with the info dictionary; note that _get_frame_history() returns None until history_length frames have been collected. The code is shown below.
def reset(self, *, seed: int=1, options=None):
self._predator_location = np.random.randint(0, self.size, size=(self.predator_num, 2))
self._prey_location = np.random.randint(0, self.size, size=(self.prey_num, 2))
self.steps = 0
self.active_predator = [True for i in range(self.predator_num)]
self.active_prey = [True for i in range(self.prey_num)]
if self.render_mode == 'human':
self._render_frame()
self._save_frame_history()
return self._get_frame_history(self.history_length), self._get_info()
(7) Define the methods _get_reward(self) and _get_prey_reward(self), which compute the rewards for the predators and the prey. The former gives every predator the living reward by default; when an active predator lands on a prey, the catching predator receives success_reward and the other predators receive cooperate * success_reward. The latter gives each prey success_reward unless a predator currently occupies its cell, in which case its reward is 0. The code is shown below.
def _get_reward(self):
# if any predator reaches prey, success. else, living reward
rewards = [self.living_reward for i in range(self.predator_num)]
for i in range(self.predator_num):
for j in range(self.prey_num):
if self.active_predator[i]:
if np.all(self._predator_location[i]==self._prey_location[j]):
rewards = [self.cooperate*self.success_reward for i in range(self.predator_num)]
rewards[i] = self.success_reward
# print("EATEN")
return rewards
return rewards
def _get_prey_reward(self):
    # a prey gets success_reward for surviving the step;
    # its reward drops to 0 if any predator occupies the same cell
    rewards = [self.success_reward for i in range(self.prey_num)]
    for i in range(self.prey_num):
        caught = np.any(np.all(self._prey_location[i] == self._predator_location, axis=1))
        if caught:
            rewards[i] = 0
    return rewards
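For example, with success_reward=10, living_reward=-1, three predators, and a hypothetical cooperate=0.5 (the default is 1), a step in which the predator with index 1 lands on a prey yields the predator rewards [5, 10, 5]: the catching predator receives the full success_reward while the others receive cooperate * success_reward. On a step without a catch, every predator simply receives the living reward of -1.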
(8) Define the method _is_done(self), which determines whether the current episode has ended. The episode ends when the maximum number of steps is reached, when all predators have become inactive, or when all prey have been caught. The code is shown below.
def _is_done(self):
    # the episode ends when the step limit is reached,
    # when all predators are inactive, or when all prey are gone
    if self.steps >= self.episode_length:
        return True
    if np.sum(self.active_predator) == 0:
        return True
    if np.sum(self.active_prey) == 0:
        return True
    return False
(9) Define the methods _is_valid_predator(self, location, index) and _is_valid_prey(self, location, index), which check whether a proposed move is valid, i.e. whether the new location stays inside the grid and does not overlap another agent of the same type. The code is shown below.
def _is_valid_predator(self, location, index):
    # the move is valid if it stays inside the grid and does not
    # land on another predator (row-wise comparison of coordinates)
    if location[0] < 0 or location[0] >= self.size or location[1] < 0 or location[1] >= self.size:
        return False
    others = np.delete(self._predator_location, index, axis=0)
    if np.any(np.all(location == others, axis=1)):
        return False
    return True

def _is_valid_prey(self, location, index):
    # same check for the prey with the given index, against the other prey
    if location[0] < 0 or location[0] >= self.size or location[1] < 0 or location[1] >= self.size:
        return False
    others = np.delete(self._prey_location, index, axis=0)
    if np.any(np.all(location == others, axis=1)):
        return False
    return True
(10) Define the method render(self), which renders the environment. In 'rgb_array' mode it returns an RGB array produced by _render_frame(); in 'human' mode the frames are drawn into a Pygame window by _render_frame() during reset() and step(). The code is shown below.
def render(self):
if self.render_mode =='rgb_array':
return self._render_frame()
(11) Define the methods _save_frame_history(self) and _get_frame_history(self, history=4), which save and retrieve the most recent observations. The environment uses this buffer of frames to return stacked observations from reset() and step(). The code is shown below.
def _save_frame_history(self):
self.frame_history.append(self._get_obs())
def _get_frame_history(self, history=4):
if len(self.frame_history) < history:
return None
return list(self.frame_history)[-history:]
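The frame history is built on collections.deque with maxlen=4, which silently discards the oldest entry once the buffer is full. A tiny standalone illustration of that behaviour is shown below (the integers are placeholders for real observations). Note that because the deque is created with maxlen=4 in the constructor, a history_length larger than 4 could never be satisfied and _get_frame_history() would always return None.
from collections import deque

frame_history = deque(maxlen=4)
for t in range(6):
    frame_history.append(t)       # the oldest entries are dropped automatically
print(list(frame_history))         # [2, 3, 4, 5]
print(list(frame_history)[-4:])    # the last `history` frames, as in _get_frame_history()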
(12) Define the method step(), which simulates one step of interaction between the agents and the environment: it receives the predator and prey actions, updates the environment's state, and returns the new observation, the rewards, the done flag, and additional information. The code is shown below.
def step(self, action_pred, action_prey, pred_communication=None, prey_communication=None):
if self._is_done():
raise RuntimeError("Episode is done")
self.steps += 1
# move predator
for i in range(self.predator_num):
if i < len(action_pred):
action = action_pred[i]
else:
action = self.single_action_space.sample()
new_location = self._predator_location[i] + self._action_to_direction[action]
if self._is_valid_predator(new_location, i):
self._predator_location[i] = new_location
# move prey
for i in range(self.prey_num):
if self.active_prey[i] == False: # if prey is dead,
continue
if i < len(action_prey):
action = action_prey[i]
else:
action = self.single_action_space.sample()
new_location = self._prey_location[i] + self._action_to_direction[action]
if self._is_valid_prey(new_location, i):
self._prey_location[i] = new_location
# check if any predator reaches prey and give reward
pred_reward = self._get_reward()
prey_reward = self._get_prey_reward()
for i in range(self.predator_num):
for j in range (self.prey_num):
if np.all(self._predator_location[i] == self._prey_location[j]):
self.active_predator[i] = False
#save communication of agents
if self.communication_bits > 0:
self.pred_communication = pred_communication
self.prey_communication = prey_communication
done = self._is_done()
reward = {
'predator': pred_reward,
'prey': prey_reward
}
if self.render_mode == 'human':
self._render_frame()
self._save_frame_history()
return self._get_frame_history(self.history_length), reward, done, self._get_info()
The above code works as follows:
- Receive the agents' actions: the step() method takes the agents' actions as input parameters; these actions determine what each agent does in the current time step.
- Update the environment state: based on the agents' actions, step() updates the environment's internal state, for example by moving the agents, marking predators inactive after a successful catch, and storing the communication signals when communication is enabled.
- Compute the rewards: step() computes the rewards from the current environment state and the actions taken. A reward is a number that measures how well an agent performed after executing its action.
- Check for termination: the method also checks whether the current episode has ended; possible termination conditions include reaching the maximum number of steps and the task succeeding or failing.
- Return the results: step() returns a tuple containing the new observation (the environment state), the rewards, the done flag, and additional information. This information is typically used for the agents' learning and decision making.
In short, step() is the core of the interaction between agents and environment: it simulates everything that happens within a single time step and allows the agents to interact with the environment continuously, so that they can learn and improve their policies toward a goal. A minimal interaction loop built on this interface is sketched below.
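The following usage sketch shows the whole interface in one place; it is only an illustration and assumes the class above has been defined or imported, with the default constructor arguments. Both reset() and step() return the stacked frame history, which can be None until enough frames have accumulated.
# Minimal interaction sketch for PredatorPreyEnv2 (illustrative, not part of the class).
env = PredatorPreyEnv2(size=10, predator=3, prey=1, episode_length=100)

obs, info = env.reset()
done = False
while not done:
    # sample one random action per predator and per prey
    action_pred = env.action_space_predator.sample()
    action_prey = env.action_space_prey.sample()
    obs, reward, done, info = env.step(action_pred, action_prey)
    print(reward['predator'], reward['prey'])

env.close()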
(13) Define the methods _render_predator_frame(self, predator_id:int=0) and _render_prey_frame(self, prey_id:int=1), which build the egocentric image frames used as observations in image mode. Each frame is a vision x vision window centred on the given predator or prey: the predator frame has four channels (the prey, the predator itself, the other predators, and their communication signals), the prey frame has three channels (the prey, the prey itself, and the predators), and cells that fall outside the grid are filled in white. The code is shown below.
def _render_predator_frame(self, predator_id:int=0):
if predator_id==None:
return
frame = np.zeros((4, self.vision, self.vision), dtype=np.uint8)
# draw predator
pred_loc = self._predator_location[predator_id]
min_pred_loc = pred_loc - np.array([self.vision//2, self.vision//2])
max_pred_loc = pred_loc + np.array([self.vision//2, self.vision//2])
# add predator to centre of frame
frame[1, self.vision//2, self.vision//2] = self.active_predator[predator_id]
# for each predator or prey within min and max it will be added in the frame
for i in range(self.predator_num):
if i==predator_id:
continue
if (min_pred_loc[0] <= self._predator_location[i][0] <= max_pred_loc[0]
and
min_pred_loc[1] <= self._predator_location[i][1] <= max_pred_loc[1]):
loc_x = self._predator_location[i][0]-min_pred_loc[0]
loc_y = self._predator_location[i][1]-min_pred_loc[1]
# frame[2, loc_x, loc_y] = self.render_scale
frame[2, loc_x, loc_y] = int(self.active_predator[i])
# frame[2, loc_x, loc_y] = self.pred_communication[i]
if self.communication_bits > 0:
frame[3, loc_x, loc_y] = self.pred_communication[i]
for i in range(self.prey_num):
if (min_pred_loc[0] <= self._prey_location[i][0] <= max_pred_loc[0]
and
min_pred_loc[1] <= self._prey_location[i][1] <= max_pred_loc[1]):
loc_x = self._prey_location[i][0]-min_pred_loc[0]
loc_y = self._prey_location[i][1]-min_pred_loc[1]
frame[0, loc_x, loc_y] = self.render_scale
# create white for cells outside grid
if min_pred_loc[0] < 0:
frame[:, :abs(min_pred_loc[0]), :] = self.render_scale
if max_pred_loc[0] >= self.size:
frame[:, -(max_pred_loc[0]-self.size+1):, :] = self.render_scale
if min_pred_loc[1] < 0:
frame[:, :, :abs(min_pred_loc[1])] = self.render_scale
if max_pred_loc[1] >= self.size:
frame[:, :, -(max_pred_loc[1]-self.size+1):] = self.render_scale
return frame
def _render_prey_frame(self, prey_id:int=1):
if prey_id==None:
return
frame = np.zeros((3, self.vision, self.vision), dtype=np.uint8)
# draw prey
prey_loc = self._prey_location[prey_id]
min_prey_loc = prey_loc - np.array([self.vision//2, self.vision//2])
max_prey_loc = prey_loc + np.array([self.vision//2, self.vision//2])
# add prey to centre of frame
frame[1, self.vision//2, self.vision//2] = self.render_scale
# for each predator or prey within min and max it will be added in the frame
for i in range(self.predator_num):
if (min_prey_loc[0] <= self._predator_location[i][0] <= max_prey_loc[0]
and
min_prey_loc[1] <= self._predator_location[i][1] <= max_prey_loc[1]):
frame[2, self._predator_location[i][0]-min_prey_loc[0], self._predator_location[i][1]-min_prey_loc[1]] = self.render_scale
for i in range(self.prey_num):
if (min_prey_loc[0] <= self._prey_location[i][0] <= max_prey_loc[0]
and
min_prey_loc[1] <= self._prey_location[i][1] <= max_prey_loc[1]):
frame[0, self._prey_location[i][0]-min_prey_loc[0], self._prey_location[i][1]-min_prey_loc[1]] = self.render_scale
# create white for cells outside grid
if min_prey_loc[0] < 0:
frame[:, :abs(min_prey_loc[0]), :] = self.render_scale
if max_prey_loc[0] >= self.size:
frame[:, -(max_prey_loc[0]-self.size+1):, :] = self.render_scale
if min_prey_loc[1] < 0:
frame[:, :, :abs(min_prey_loc[1])] = self.render_scale
if max_prey_loc[1] >= self.size:
frame[:, :, -(max_prey_loc[1]-self.size+1):] = self.render_scale
return frame
(14) Define the method _render_frame(self), which renders a frame of the whole environment, drawing the grid, the prey as rectangles, and the predators as circles. The code is shown below.
def _render_frame(self):
if self.window is None and self.render_mode == 'human':
pygame.init()
pygame.display.init()
self.window = pygame.display.set_mode((self.window_size, self.window_size))
if self.clock is None and self.render_mode == 'human':
self.clock = pygame.time.Clock()
canvas = pygame.Surface((self.window_size, self.window_size))
canvas.fill((255, 255, 255))
pixel_size = self.window_size // self.size
# draw grid
for i in range(self.size):
pygame.draw.line(canvas, (0, 0, 0), (0, i*pixel_size), (self.window_size, i*pixel_size))
pygame.draw.line(canvas, (0, 0, 0), (i*pixel_size, 0), (i*pixel_size, self.window_size))
# draw prey as rectangle
for i in range(self.prey_num):
if self.active_prey[i]:
pygame.draw.rect(canvas, (255, 0, 0), (self._prey_location[i][1]*pixel_size, self._prey_location[i][0]*pixel_size, pixel_size, pixel_size))
# draw predator as circle
for i in range(self.predator_num):
if self.active_predator[i]:
pygame.draw.circle(canvas, (0, 0, 255), (self._predator_location[i][1]*pixel_size+pixel_size//2, self._predator_location[i][0]*pixel_size+pixel_size//2), pixel_size//2)
if self.render_mode == 'human':
self.window.blit(canvas, canvas.get_rect())
pygame.event.pump()
pygame.display.update()
self.clock.tick(self.metadata['render-fps'])
else:
return np.transpose(pygame.surfarray.array3d(canvas), (1, 0, 2))
(15) Define the method close(self), which closes the rendering window and releases resources. The code is shown below.
def close(self):
if self.window is not None:
pygame.quit()
self.window = None
self.clock = None
To be continued.