강화학습에서 유명한 PPO 논문과 이론을 공부하고 -> gym 을 이용하여 2d 환경에서 실험해보고 -> 이 후 물리엔진이 포함된 3차원 환경의 webots 시뮬레이터를 이용해 실제로 로봇에 적용 해보겠습니다.
1. 이론 : Proximal Policy Optimization Algorithms
작성중..
저자: John Schulman, Filip Wolski, Prafulla Dhariwal, Alec Radford, Oleg Klimov
Abstract
환경과의 상호작용을 통해 데이터를 샘플링 -> 확률적 경사 상승(stochastic gradient ascent)을 사용하여 “surrogate” 목적 함수를 최적화하는 새로운 정책 기울기 방법을 제안.
새로운 방법인 Proximal Policy Optimization(PPO)는 Trust Region Policy Optimization(TRPO)의 일부 이점을 가지면서도 구현이 훨씬 간단하고, 더 일반적이며, 샘플 효율성이 더 좋다(실험적으로).
강화 학습은 에이전트(agent)가 환경과 상호작용하여 보상(reward)을 최대화하는 정책(policy)을 학습하는 과정
정책은 주어진 상태(state)에서 어떤 행동(action)을 취할지 결정하는 함수. 정책은 확률적일 수 있으며, 이는 특정 상태에서 행동을 선택할 확률을 나타냄.
Policy Gradient Methods
Introduction
신경망 함수 근사기를 사용한 강화 학습에 대해 여러 가지 접근법이 제안되었습니다. 대표적으로 Deep Q-learning, “vanilla” policy gradient 방법, Trust Region/Natural Policy Gradient 방법이 있습니다. 그러나 더 확장 가능하고, 데이터 효율적이며, 강인한 방법을 개발할 여지가 있습니다. PPO는 TRPO의 데이터 효율성과 신뢰할 수 있는 성능을 달성하면서도 first-order optimization 만을 사용하여 이를 실현하고자 합니다.
정책 기울기 방법
정책 기울기 방법은 정책 기울기의 추정치를 계산하여 확률적 경사 상승 알고리즘에 적용합니다. 가장 일반적으로 사용 되는 기울기 추정치는 다음과 같습니다:
[ hat{g} = mathbb{E}t [nablatheta log pi_theta (a_t | s_t) hat{A}_t] ]
여기서 $(pi_theta)$는 확률적 정책이며 $(hat{A}_t)$는 시간 단계 $(t)$에서의 어드밴티지 함수의 추정치입니다.
정책 기울기 방법은 정책의 성능을 직접적으로 최적화하기 위해 정책의 파라미터에 대한 기울기를 계산합니다. 이 방법은 보통 확률적 경사 상승(stochastic gradient ascent) 알고리즘을 사용하여 정책 파라미터를 업데이트합니다. 기울기 추정은 정책 파라미터를 조정하는 방향과 크기를 결정하는 데 사용됩니다.
기울기 추정의 의미
[ hat{g} = mathbb{E}_t [nabla_theta log pi_theta (a_t | s_t) hat{A}_t] ]
이 식에서 각각의 요소가 무엇을 의미하는지 분석해보겠습니다.
1. 기울기 기호 $(hat{g})$:
– 정책 파라미터 $theta$에 대한 성능 기울기의 추정값입니다.
2. 기대값 $(mathbb{E}_t)$:
– 여러 시간 단계에서의 평균을 나타냅니다. 즉, 여러 에피소드나 샘플을 통해 얻어진 평균 기울기를 의미합니다.
3. 로그 확률의 기울기 ((nabla_theta log pi_theta (a_t | s_t))):
– 주어진 상태 (s_t)에서 행동 (a_t)를 선택할 확률 (pi_theta(a_t | s_t))의 로그 값에 대한 정책 파라미터 (theta)의 기울기입니다.
– 이 값은 얼마나 정책 파라미터 (theta)를 조정해야 주어진 행동 (a_t)의 확률을 증가시킬 수 있는지를 나타냅니다.
4. 어드밴티지 함수의 추정치 ((hat{A}_t)):
– 어드밴티지 함수 (A(s_t, a_t))는 특정 상태에서 특정 행동을 선택하는 것이 얼마나 좋은지를 나타내는 함수입니다. 이는 주로 실제 얻은 보상과 기대되는 보상의 차이로 계산됩니다.
– 어드밴티지 함수는 주어진 상태에서 특정 행동의 상대적 가치를 평가하는 데 사용됩니다.
#### 기반이 되는 기초 이론
##### REINFORCE 알고리즘
REINFORCE 알고리즘은 가장 기본적인 정책 기울기 방법 중 하나입니다. 이 알고리즘은 각 시간 단계에서 다음과 같이 정책을 업데이트합니다:
[ theta leftarrow theta + alpha hat{g} ]
여기서 (alpha)는 학습률입니다. 정책 기울기 (hat{g})는 다음과 같이 정의됩니다:
[ hat{g} = mathbb{E}_t [nabla_theta log pi_theta (a_t | s_t) hat{R}_t] ]
여기서 (hat{R}_t)는 시간 단계 (t)에서의 실제 보상의 합입니다. 그러나 REINFORCE 알고리즘은 고차원의 상태 공간에서 샘플 효율성이 낮다는 단점이 있습니다. 이를 개선하기 위해 어드밴티지 함수를 사용하는 방법이 발전되었습니다.
##### 어드밴티지 함수
어드밴티지 함수 (A(s_t, a_t))는 다음과 같이 정의됩니다:
[ A(s_t, a_t) = Q(s_t, a_t) – V(s_t) ]
여기서 (Q(s_t, a_t))는 상태-행동 값 함수로, 특정 상태에서 특정 행동을 취했을 때 기대되는 총 보상을 나타냅니다. (V(s_t))는 상태 값 함수로, 특정 상태에서 기대되는 총 보상을 나타냅니다. 어드밴티지 함수는 특정 행동이 평균보다 얼마나 더 나은지를 평가합니다.
##### PPO의 개선
PPO는 기존의 정책 기울기 방법들을 개선한 알고리즘입니다. 특히, PPO는 확률 비율을 클리핑하여 정책 업데이트가 과도하게 이루어지는 것을 방지합니다:
[ L_{CLIP} (theta) = mathbb{E}_t left[ min left( r_t(theta) hat{A}_t, text{clip}(r_t(theta), 1-epsilon, 1+epsilon) hat{A}_t right) right] ]
여기서 (r_t(theta) = frac{pi_theta(a_t | s_t)}{pi_{theta_{text{old}}}(a_t | s_t)})는 새로운 정책과 기존 정책의 확률 비율입니다. 클리핑을 통해 정책 업데이트의 안정성을 높입니다.
목적 함수:
[ L_{PG}(theta) = mathbb{E}t [log pitheta (a_t | s_t) hat{A}_t] ]
신뢰 구간 방법
TRPO에서 목적 함수(“대리” 목적 함수)는 정책 업데이트 크기에 제약을 두고 최대화됩니다:
[ text{maximize} quad mathbb{E}t left[ frac{pitheta(a_t | s_t)}{pi_{theta_{text{old}}}(a_t | s_t)} hat{A}_t right] ]
다음 제약 조건을 만족하면서:
[ mathbb{E}t [text{KL}[pi{theta_{text{old}}}(· | s_t), pi_theta(· | s_t)]] leq delta ]
클립된 대리 목적 함수
과도한 정책 업데이트를 방지하기 위해, 확률 비율을 1에서 너무 벗어나지 않도록 클리핑하여 목적 함수를 수정합니다:
[ L_{CLIP} (theta) = mathbb{E}_t left[ min left( r_t(theta) hat{A}_t, text{clip}(r_t(theta), 1-epsilon, 1+epsilon) hat{A}_t right) right] ]
여기서 ε는 하이퍼파라미터로, 일반적으로 0.2를 사용합니다.
2. 알고리즘 테스트, 2d 환경 gym [gymnasium]
설명에서 포함되어 있는 코드를 순서대로 모두 복사하면 코드가 에러없이 실행이 된다. [2024. 07. 20 마지막 확인]
시뮬레이션 환경 구성
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributions as distributions
import matplotlib.pyplot as plt
import numpy as np
import gymnasium as gym
train_env = gym.make('CartPole-v1')
test_env = gym.make('CartPole-v1')
SEED = 12
train_env.reset(seed=SEED)
test_env.reset(seed=SEED+1)
np.random.seed(SEED)
torch.manual_seed(SEED)
env=gym.make('CartPole-v1', render_mode='human')
done = False
truc = False
state , _= env.reset(seed=SEED+1)
for i in range(5):
state , _= env.reset(seed=SEED+1)
while 1:
state, reward, done, truc ,_= env.step(env.action_space.sample())
print(f'scenario:{i}')
print(state, reward, done, truc)
if done | truc:
break
env.close()
여기까지의 코드를 실행하면 아래와 같은 기본적인 시뮬레이션이 실행되는 것을 확인 할 수 있다.
openai 의 gym 라이브러리를 사용해서 cartpole 환경을 설정한다.
seed 는 실험의 재현성을 위해 사용되는데 -> seed 를 고정함으로서 실험의 일관성을 유지할 수 있다.
train 과 test 의 seed 를 다르게 한 것은 정확한 성능 평가를 위해서
PPO 알고리즘
class MLP(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, dropout = 0.5):
super().__init__()
self.fc_1 = nn.Linear(input_dim, hidden_dim)
self.fc_2 = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
x = self.fc_1(x)
x = self.dropout(x)
x = F.relu(x)
x = self.fc_2(x)
return x
class ActorCritic(nn.Module):
def __init__(self, actor, critic):
super().__init__()
self.actor = actor
self.critic = critic
def forward(self, state):
action_pred = self.actor(state)
value_pred = self.critic(state)
return action_pred, value_pred
INPUT_DIM = train_env.observation_space.shape[0]
HIDDEN_DIM = 128
OUTPUT_DIM = train_env.action_space.n
actor = MLP(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM)
critic = MLP(INPUT_DIM, HIDDEN_DIM, 1)
policy = ActorCritic(actor, critic)
def init_weights(m):
if type(m) == nn.Linear:
torch.nn.init.xavier_normal_(m.weight)
m.bias.data.fill_(0)
policy.apply(init_weights)
LEARNING_RATE = 0.01
optimizer = optim.Adam(policy.parameters(), lr = LEARNING_RATE)
def train(env, policy, optimizer, discount_factor, ppo_steps, ppo_clip):
policy.train()
states = []
actions = []
log_prob_actions = []
values = []
rewards = []
done = False
truc = False
episode_reward = 0
state,_ = env.reset(seed=SEED)
while not (done or truc):
state = torch.FloatTensor(state).unsqueeze(0)
#append state here, not after we get the next state from env.step()
states.append(state)
action_pred, value_pred = policy(state)
action_prob = F.softmax(action_pred, dim = -1)
dist = distributions.Categorical(action_prob)
action = dist.sample()
log_prob_action = dist.log_prob(action)
state, reward, done, truc, _ = env.step(action.item())
actions.append(action)
log_prob_actions.append(log_prob_action)
values.append(value_pred)
rewards.append(reward)
episode_reward += reward
states = torch.cat(states)
actions = torch.cat(actions)
log_prob_actions = torch.cat(log_prob_actions)
values = torch.cat(values).squeeze(-1)
returns = calculate_returns(rewards, discount_factor)
advantages = calculate_advantages(returns, values)
policy_loss, value_loss = update_policy(policy, states, actions, log_prob_actions, advantages, returns, optimizer, ppo_steps, ppo_clip)
return policy_loss, value_loss, episode_reward
def calculate_returns(rewards, discount_factor, normalize = True):
returns = []
R = 0
for r in reversed(rewards):
R = r + R * discount_factor
returns.insert(0, R)
returns = torch.tensor(returns)
if normalize:
returns = (returns - returns.mean()) / returns.std()
return returns
def calculate_advantages(returns, values, normalize = True):
advantages = returns - values
if normalize:
advantages = (advantages - advantages.mean()) / advantages.std()
return advantages
def update_policy(policy, states, actions, log_prob_actions, advantages, returns, optimizer, ppo_steps, ppo_clip):
total_policy_loss = 0
total_value_loss = 0
advantages = advantages.detach()
log_prob_actions = log_prob_actions.detach()
actions = actions.detach()
for _ in range(ppo_steps):
#get new log prob of actions for all input states
action_pred, value_pred = policy(states)
value_pred = value_pred.squeeze(-1)
action_prob = F.softmax(action_pred, dim = -1)
dist = distributions.Categorical(action_prob)
#new log prob using old actions
new_log_prob_actions = dist.log_prob(actions)
policy_ratio = (new_log_prob_actions - log_prob_actions).exp()
policy_loss_1 = policy_ratio * advantages
policy_loss_2 = torch.clamp(policy_ratio, min = 1.0 - ppo_clip, max = 1.0 + ppo_clip) * advantages
policy_loss = - torch.min(policy_loss_1, policy_loss_2).sum()
value_loss = F.smooth_l1_loss(returns, value_pred).sum()
optimizer.zero_grad()
policy_loss.backward()
value_loss.backward()
optimizer.step()
total_policy_loss += policy_loss.item()
total_value_loss += value_loss.item()
return total_policy_loss / ppo_steps, total_value_loss / ppo_steps
def evaluate(env, policy):
policy.eval()
rewards = []
done = False
truc = False
episode_reward = 0
state , _= env.reset(seed=SEED+1)
while not (done or truc):
state = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
action_pred, _ = policy(state)
action_prob = F.softmax(action_pred, dim = -1)
action = torch.argmax(action_prob, dim = -1)
state, reward, done, truc ,_= env.step(action.item())
episode_reward += reward
return episode_reward
gym 을 통해 구성한 cartpole 환경은 위 사진과 같다.
action space 는 ‘Discrete(2)’, 따라서 actor 의 output 차원은 2가 된다.
환경(observation) 의 차원은 4이다.
critic 은 가치 네트워크를 나타낸다. output 차원(1) 은 상태의 가치를 출력한다.
‘action_pred, value_pred = policy(state)’ : 정책 네트워크를 통해 행동과 상태 가치를 예측
-> ‘F.softmax’ : 행동 확률을 softmax 함수를 사용하여 계산
-> ‘Categorical’ : 행동 활률 분포를 생성
-> ‘dist.sample()’ : 행동을 샘플링하고
-> ‘dist.log_prob(action)’ : 선택한 행동의 로그 확률을 계산
ex)
네트워크에서 나온 raw score 가 action 이 [0.3, 0.5] 라고 하자. (cart 를 왼쪽으로 미는 것이 0.3)
softmax 를 거치면 [0.4502, 0.5498]. 이 값들은 각 행동(왼쪽, 오른쪽) 에 대한 확률을 나타낸다. 이를 행동 확률 분포로 바꾸고 sampling 함으로써 오른쪽 값이 선택되어 1 이라는 결과가 나온다.
이후 로그 확률을 계산하면 -0.5981 값이 나온다. log PI(1) = log (0.5498) ~= -0.5981
-> ‘~ = env.step()’ : 환경에서 선택한 행동을 수행하고 다음 상태, 보상, 종료 여부를 얻는다.
-> 에피소드가 종료 후
-> 보상 리스트와 discount_factor 를 사용하여 return 값을 계산한다.
-> 반환 값과 가치 리스트를 사용하여 advantage 를 계산한다.
-> 마지막으로, 정책을 업데이트하고 loss 를 반환한다.
return 값은 위와 같이 계산된다.
return 값은 특정 시점에서 시작하여 미래의 보상을 고려한 총 보상의 추정치이다.
$$ R_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k} $$
일반적으로 return 은 위와 같이 계산된다.
gamma 는 할인 계수로서 미래 보상의 현재 가치를 조정한다. gamma 가 0에 가까우면 즉각적인 보상만 중요하고, 1에 가까우면 미래의 보상도 중요하게 평가한다.
-> ‘reversed’ : 리턴 계산은 보상 목록을 역순으로 순회하여 각 시간 스템에서의 리턴을 계산하는 방식으로 이루어진다.
이후 정규화를 수행하여 학습의 안정성과 효율성을 높인다.
[t=0, t=1, t=end] 에 대한 보상이 저장되어 있을 것이므로 이를 역순으로 하여 [t=end, t=1, t=0] 으로 바꾼다. 그렇게 하면 t=end 부터 계산하게 되고 discount_factor(=1이 아닐때) 가 여러번 곱해지게 될 것 이므로 앞서 설명했던 미래의 보상의 가치가 점점 낮아지도록 return 값이 결정되는 것이다.
보통 discount_factor 를 1로 하지 않는데,생각해보면 당장 로봇이 넘어지려고 하는데 우선 막고 봐야 할 것 아닌가?!
이 advantage 는 특정 상태-행동 쌍이 평균적인 상태-행동 쌍보다 얼마나 더 좋은지 측정하는 지표이고, 정책의 업데이트에 중요한 역할을 한다.
$$ \hat{A}_t = R_t – V(s_t) $$
위 수식에서 advantage 가 양수이면, 해당 상태-행동 쌍이 평균적이 상태-행동 쌍보다 더 나은 것으로 간주한다. 즉 현재(특정 t 에서의) return 결과가 현재(특정 t 에서의) 에서의 상태 가치 함수 결과보다 큰 것이다. 이때 상태 가치 함수 결과는 위에서 정책 네트워크를 통해 나온 결과이다.
-> ‘policy_loss_1’ : 정책 비율과 advantage 값을 곱하여 계산한다.
-> ‘policy_loss_2’ : 클리핑을 적용하여 계산된 정책 비율과 advantage 를 곱한다. 이를 통해 손실을 과도하게 증가시키는 것을 방지한다.
시뮬레이션 결과 확인
MAX_EPISODES = 1500
DISCOUNT_FACTOR = 0.99
N_TRIALS = 25
REWARD_THRESHOLD = 500# 475
PRINT_EVERY = 10
PPO_STEPS = 5
PPO_CLIP = 0.2
train_rewards = []
test_rewards = []
for episode in range(1, MAX_EPISODES+1):
policy_loss, value_loss, train_reward = train(train_env, policy, optimizer, DISCOUNT_FACTOR, PPO_STEPS, PPO_CLIP)
test_reward = evaluate(test_env, policy)
train_rewards.append(train_reward)
test_rewards.append(test_reward)
mean_train_rewards = np.mean(train_rewards[-N_TRIALS:])
mean_test_rewards = np.mean(test_rewards[-N_TRIALS:])
if episode % PRINT_EVERY == 0:
print(f'| Episode: {episode:3} | Mean Train Rewards: {mean_train_rewards:5.1f} | Mean Test Rewards: {mean_test_rewards:5.1f} |')
if mean_test_rewards >= REWARD_THRESHOLD:
print(f'Reached reward threshold in {episode} episodes')
break
plt.figure(figsize=(12,8))
plt.plot(test_rewards, label='Test Reward')
plt.plot(train_rewards, label='Train Reward')
plt.xlabel('Episode', fontsize=20)
plt.ylabel('Reward', fontsize=20)
plt.hlines(REWARD_THRESHOLD, 0, len(test_rewards), color='r')
plt.legend(loc='lower right')
plt.grid()
위와 같은 결과를 얻을 수 있다.
3. Webots 시뮬레이션
webots 시뮬레이션에 적용해보자.
0 Comments