import gymnasium as gym
import time
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
# 10. Create the ReplayBuffer class (experience replay memory)
class ReplayBuffer:
    def __init__(self, max_memory_size, n_obs_space, n_action_space):
        self.max_memory_size = max_memory_size
        self.n_obs_space = n_obs_space
        self.n_action_space = n_action_space
        self.memory_count = 0
        # One row per transition; states and actions keep their full dimensionality.
        self.state_memory = np.zeros((self.max_memory_size, self.n_obs_space))
        self.action_memory = np.zeros((self.max_memory_size, self.n_action_space))
        self.reward_memory = np.zeros(self.max_memory_size)
        self.next_state_memory = np.zeros((self.max_memory_size, self.n_obs_space))
        self.terminal = np.zeros(self.max_memory_size)
    # 11. Create the store_transition method to save transitions
    def store_transition(self, obs, action, reward, next_state, done):
        index = self.memory_count % self.max_memory_size  # once the memory is full, the oldest entries get overwritten
        self.state_memory[index] = obs.detach().numpy().flatten()
        self.action_memory[index] = action.flatten()
        self.reward_memory[index] = reward
        self.next_state_memory[index] = next_state.flatten()
        self.terminal[index] = 1 - int(done)  # terminal = 0 once the episode has ended
        self.memory_count += 1
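    # A minimal sketch (not one of the numbered tutorial steps): DDPG would later
    # draw random mini-batches from this memory when it learns. The method name
    # sample_buffer and the batch_size parameter are assumptions for illustration.
    def sample_buffer(self, batch_size):
        # Only sample from slots that have actually been filled so far.
        filled = min(self.memory_count, self.max_memory_size)
        batch = np.random.choice(filled, batch_size)
        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        next_states = self.next_state_memory[batch]
        terminals = self.terminal[batch]
        return states, actions, rewards, next_states, terminals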
# 6. Create the ActorNN class
class ActorNN(nn.Module):
    def __init__(self, input_dim, output_dim):
        print('ActorNN.__init__ is working.')
        super(ActorNN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, output_dim)
    def forward(self, obs):
        print('ActorNN.forward is working.')
        print('==== OK up to here (1) ====')
        x = self.fc1(obs)
        x = F.relu(x)
        mu = self.fc2(x)
        print('action μ:', mu)
        print('==== OK up to here (2) ====')
        action = mu
        return action
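# A minimal sketch (an assumption, not part of the original code): a complete DDPG
# actor usually squashes the raw output mu into the environment's action range
# (e.g. with tanh) and adds exploration noise outside the network. The helper name
# bounded_noisy_action and the parameters action_bound / noise_std are illustrative only.
def bounded_noisy_action(mu, action_bound=1.0, noise_std=0.1):
    # Squash to [-action_bound, action_bound], then add Gaussian exploration noise.
    action = T.tanh(mu) * action_bound
    action = action + noise_std * T.randn_like(action)
    return T.clamp(action, -action_bound, action_bound)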
# 3. Define the agent class
class AgentDDPG:
    def __init__(self, input_dim, output_dim):
        print('AgentDDPG.__init__ is working.')
        # 5. Create an instance of the ActorNN class
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.actor = ActorNN(input_dim=self.input_dim, output_dim=self.output_dim)
        # 9. Add a memory (ReplayBuffer) instance
        MAX_MEMORY_SIZE = 1000
        self.memory = ReplayBuffer(max_memory_size=MAX_MEMORY_SIZE,
                                   n_obs_space=self.input_dim,
                                   n_action_space=self.output_dim)
    def choose_action(self, obs):
        print('AgentDDPG.choose_action is working.')
        # 4. The policy (the actor) is represented by a neural network.
        #    Create the ActorNN class and use an instance of it as the actor.
        action = self.actor(obs)
        action = action.detach().numpy()
        print('==== OK up to here (3) ====')
        return action
    # 8. Add the remember method
    def remember(self, obs, action, reward, next_state, done):
        self.memory.store_transition(obs, action, reward, next_state, done)
# 2. Create an instance of the agent class
agent = AgentDDPG(input_dim=17, output_dim=6)
env = gym.make("HalfCheetah-v4", render_mode='human')
EPISODES = 10
DELAY_TIME = 0.00 # sec
total_rewards = []
for episode in range(EPISODES):
    obs, _ = env.reset()
    obs = T.tensor(obs, dtype=T.float)
    print(type(obs))
    print('observation_space : ', env.observation_space)
    print('obs :', obs)
    reward: float = 0
    total_reward: float = 0
    done: bool = False
    for j in range(100):
        env.render()
        # Replace this part with DDPG
        action = agent.choose_action(obs) # 1. Define the Agent class
        print('==== OK up to here (4) ====')
        print('action_space : ', env.action_space)
        print('action : ', action)
        # Gymnasium's step() returns (obs, reward, terminated, truncated, info)
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        print('next_state, reward, terminated, truncated, info :', next_state, reward, terminated, truncated, info)
        print('==== OK up to here (5) ====')
        # 7. Store the transition: experience replay (ReplayBuffer)
        agent.remember(obs, action, reward, next_state, int(done))
        print('next_state:', next_state)
        obs = next_state
        obs = T.tensor(obs, dtype=T.float)
        total_reward += reward
        time.sleep(DELAY_TIME)
    print('total_reward : ', total_reward)
    total_rewards.append(total_reward)
    print('total_rewards : ', total_rewards)
env.close()  # it's empty, though...
print('script is done.')
# https://gymnasium.farama.org/
print(len(agent.memory.reward_memory))
print(agent.memory.reward_memory)
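# Optional visualization sketch (assumes matplotlib is installed; not one of the
# original tutorial steps): plot the per-episode returns collected above.
import matplotlib.pyplot as plt
plt.plot(total_rewards)
plt.xlabel('episode')
plt.ylabel('total reward')
plt.title('HalfCheetah-v4: total reward per episode')
plt.show()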