前回はActorのニューラルネットワークを作って、観測情報obsを入力することによってactionを得る、とうところまでできました。
今回はその結果であるtransition: obs, action, reward, next_state, doneを保存するところを作ります。
これは経験再生:ReplayBufferという方法で、方策πに従って行動した結果をいったんメモリーバッファーとして保存し、ニューラルネットワークのパラメータ学習のときに、そのメモリーバッファーからランダムに取り出して入力データとして使用するために使います、これをやらないで行動の結果の順番通りに入力データとして入れてしまうと、似たようなデータばかり入れて学習することになるのでパラメータが最適化されていきません。
一旦バッファーにいれて、あとで改めてバッチ学習させます。
Contents
agent.remember(s,a,r,s’,done)メソドを作成する
#7. トラジェクトを保存する。経験再生(ReplayBuffer)
agent.remember(obs, action, reward, next_state, int(done))
#8.AgentDDPGクラスにremenberメソドを追加する
def remember(self, obs, action, reward, next_state, done):
self.memory.store_transition(self, obs, action, reward, next_state, done)
# 9.AgentDDPG.__init__()にmemoryインスタンスを追加する。
# 10.memoryインスタンスの元クラスReplayBufferクラスを作成する。
# 11.ReplayBufferクラスのメソドとしてトランジションを保存する実態であるstore_transitionメソドを作成する
ここまでのスクリプト
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import gymnasium as gym import time import torch as T import torch.nn as nn import torch.nn.functional as F import torch.optim as opitm import numpy as np # 10. ReplayBufferクラスを新規作成する class ReplayBuffer: def __init__(self, max_memory_size, n_obs_space, n_action_space): self.max_memory_size = max_memory_size self.n_obs_space = n_obs_space self.n_action_space = n_action_space self.memory_count = 0 self.state_memory = np.zeros(self.max_memory_size) self.action_memory = np.zeros(self.max_memory_size) self.reward_memory = np.zeros(self.max_memory_size) self.next_state_memory = np.zeros(self.max_memory_size) self.terminal = np.zeros(self.max_memory_size) # 11.トランジション保存のためstore_transitionメソドを作成する def store_transition(self, obs, action, reward, next_state, done): index = self.memory_count % self.max_memory_size # 最大メモリー数に到達したら、古いデータから上書きされていくギミック self.state_memory[index] = obs.detach().numpy().flatten()[0] self.action_memory[index] = action.flatten()[0] self.reward_memory[index] = reward.flatten()[0] self.next_state_memory[index] = next_state.flatten()[0] self.terminal[index] = 1 - int(done) # ゴールならterminal = 0 となるように self.memory_count += 1 # 6.ActorNNクラスを新規作成する class ActorNN(nn.Module): def __init__(self, input_dim, output_dim): print('ActorNN.__init__ is working.') super(ActorNN, self).__init__() self.fc1 = nn.Linear(input_dim, 64) self.fc2 = nn.Linear(64, output_dim) def forward(self, obs): print('AgetDDPG.ActorNN.forward is working') print('====ここまではOK1====') x = self.fc1(obs) x = F.relu(x) mu = self.fc2(x) print('action μ:', mu) print('====ここまではOK2====') action = mu return action # 3.エージェントクラスを定義する class AgentDDPG: def __init__(self, input_dim, output_dim): print('AgentDDPG.__init__ is working.') # 5.ActorNNクラスのインスタンスを生成する self.input_dim = input_dim self.output_dim = output_dim self.actor = ActorNN(input_dim=self.input_dim, output_dim=self.output_dim) # 9.memoryインスタンスを追加 MAX_MEMORY_SIZE = 1000 self.memory = ReplayBuffer(max_memory_size=MAX_MEMORY_SIZE, n_obs_space=self.input_dim, n_action_space=self.output_dim) def choose_action(self, obs): print('AgentDDPG.choose_action is working.') # 4.方策(アクター)はニューラルネットワークで表現する。 # ActorNNクラスを新規作成し、インスタンスactorとして使用する。 action = self.actor.forward(obs) action = action.detach().numpy() print('====ここまではOK3====') return action # 8.remenberメソドを追加 def remember(self, obs, action, reward, next_state, done): self.memory.store_transition(obs, action, reward, next_state, done) # 2.エージェントクラスのインスタンスを生成する agent = AgentDDPG(input_dim=17, output_dim=6) env = gym.make("HalfCheetah-v4", render_mode= 'human') EPISODES = 10 DELAY_TIME = 0.00 # sec total_rewards = [] for eposode in range(EPISODES): obs = env.reset() obs = T.tensor(obs[0], dtype=T.float) print(type(obs)) print('observation_space : ', env.observation_space) print('obs :', obs) reward: float = 0 total_reward: float = 0 done: bool = False for j in range(100): env.render() # ここをDDPGに置き換えていく action = agent.choose_action(obs) # 1.Agentクラスを定義していく print('====ここまではOK4====') print('action_space : ', env.action_space) print('action : ', action) next_state, reward, done, _, info = env.step(action) print('next_state, reward, done, _, info :', next_state, reward, done, _, info) print('====ここまではOK5====') #7. トラジェクトを保存する。経験再生(ReplayBuffer) agent.remember(obs, action, reward, next_state, int(done)) print('next_state:', next_state) obs = next_state obs = T.tensor(obs, dtype=T.float) total_reward += reward time.sleep(DELAY_TIME) print('total_reward : ', total_reward) total_rewards.append(total_reward) print('total_rewards : ', total_rewards) env.close() # 空なんですけど・・・ print('script is done.') # https://gymnasium.farama.org/ print(len(agent.memory.reward_memory)) print(agent.memory.reward_memory) |
The following two tabs change content below.
Keita N
最新記事 by Keita N (全て見る)
- 2024/1/13 ビットコインETFの取引開始:新たな時代の幕開け - 2024年1月13日
- 2024/1/5 日本ビジネスにおける変革の必要性とその方向性 - 2024年1月6日
- 2024/1/3 アメリカ債権ETFの見通しと最新動向 - 2024年1月3日