강화학습에서 유명한 PPO 논문과 이론을 공부하고 -> gym 을 이용하여 2d 환경에서 실험해보고 -> 이 후 물리엔진이 포함된 3차원 환경의 webots 시뮬레이터를 이용해 실제로 로봇에 적용 해보겠습니다.

1. 이론 : Proximal Policy Optimization Algorithms

작성중..

저자: John Schulman, Filip Wolski, Prafulla Dhariwal, Alec Radford, Oleg Klimov

Abstract

환경과의 상호작용을 통해 데이터를 샘플링 -> 확률적 경사 상승(stochastic gradient ascent)을 사용하여 “surrogate” 목적 함수를 최적화하는 새로운 정책 기울기 방법을 제안.
새로운 방법인 Proximal Policy Optimization(PPO)는 Trust Region Policy Optimization(TRPO)의 일부 이점을 가지면서도 구현이 훨씬 간단하고, 더 일반적이며, 샘플 효율성이 더 좋다(실험적으로).

강화 학습은 에이전트(agent)가 환경과 상호작용하여 보상(reward)을 최대화하는 정책(policy)을 학습하는 과정
정책은 주어진 상태(state)에서 어떤 행동(action)을 취할지 결정하는 함수. 정책은 확률적일 수 있으며, 이는 특정 상태에서 행동을 선택할 확률을 나타냄.

Policy Gradient Methods

Introduction

신경망 함수 근사기를 사용한 강화 학습에 대해 여러 가지 접근법이 제안되었습니다. 대표적으로 Deep Q-learning, “vanilla” policy gradient 방법, Trust Region/Natural Policy Gradient 방법이 있습니다. 그러나 더 확장 가능하고, 데이터 효율적이며, 강인한 방법을 개발할 여지가 있습니다. PPO는 TRPO의 데이터 효율성과 신뢰할 수 있는 성능을 달성하면서도 first-order optimization 만을 사용하여 이를 실현하고자 합니다.

정책 기울기 방법

정책 기울기 방법은 정책 기울기의 추정치를 계산하여 확률적 경사 상승 알고리즘에 적용합니다. 가장 일반적으로 사용 되는 기울기 추정치는 다음과 같습니다:

[ hat{g} = mathbb{E}t [nablatheta log pi_theta (a_t | s_t) hat{A}_t] ]

여기서 $(pi_theta)$는 확률적 정책이며 $(hat{A}_t)$는 시간 단계 $(t)$에서의 어드밴티지 함수의 추정치입니다.

정책 기울기 방법은 정책의 성능을 직접적으로 최적화하기 위해 정책의 파라미터에 대한 기울기를 계산합니다. 이 방법은 보통 확률적 경사 상승(stochastic gradient ascent) 알고리즘을 사용하여 정책 파라미터를 업데이트합니다. 기울기 추정은 정책 파라미터를 조정하는 방향과 크기를 결정하는 데 사용됩니다.

기울기 추정의 의미

[ hat{g} = mathbb{E}_t [nabla_theta log pi_theta (a_t | s_t) hat{A}_t] ]

이 식에서 각각의 요소가 무엇을 의미하는지 분석해보겠습니다.

1. 기울기 기호 $(hat{g})$: 

   – 정책 파라미터 $theta$에 대한 성능 기울기의 추정값입니다.

2. 기대값 $(mathbb{E}_t)$:

   – 여러 시간 단계에서의 평균을 나타냅니다. 즉, 여러 에피소드나 샘플을 통해 얻어진 평균 기울기를 의미합니다.

3. 로그 확률의 기울기 ((nabla_theta log pi_theta (a_t | s_t))):

   – 주어진 상태 (s_t)에서 행동 (a_t)를 선택할 확률 (pi_theta(a_t | s_t))의 로그 값에 대한 정책 파라미터 (theta)의 기울기입니다.

   – 이 값은 얼마나 정책 파라미터 (theta)를 조정해야 주어진 행동 (a_t)의 확률을 증가시킬 수 있는지를 나타냅니다.

4. 어드밴티지 함수의 추정치 ((hat{A}_t)):

   – 어드밴티지 함수 (A(s_t, a_t))는 특정 상태에서 특정 행동을 선택하는 것이 얼마나 좋은지를 나타내는 함수입니다. 이는 주로 실제 얻은 보상과 기대되는 보상의 차이로 계산됩니다.

   – 어드밴티지 함수는 주어진 상태에서 특정 행동의 상대적 가치를 평가하는 데 사용됩니다.

#### 기반이 되는 기초 이론

##### REINFORCE 알고리즘

REINFORCE 알고리즘은 가장 기본적인 정책 기울기 방법 중 하나입니다. 이 알고리즘은 각 시간 단계에서 다음과 같이 정책을 업데이트합니다:

[ theta leftarrow theta + alpha hat{g} ]

여기서 (alpha)는 학습률입니다. 정책 기울기 (hat{g})는 다음과 같이 정의됩니다:

[ hat{g} = mathbb{E}_t [nabla_theta log pi_theta (a_t | s_t) hat{R}_t] ]

여기서 (hat{R}_t)는 시간 단계 (t)에서의 실제 보상의 합입니다. 그러나 REINFORCE 알고리즘은 고차원의 상태 공간에서 샘플 효율성이 낮다는 단점이 있습니다. 이를 개선하기 위해 어드밴티지 함수를 사용하는 방법이 발전되었습니다.

##### 어드밴티지 함수

어드밴티지 함수 (A(s_t, a_t))는 다음과 같이 정의됩니다:

[ A(s_t, a_t) = Q(s_t, a_t) – V(s_t) ]

여기서 (Q(s_t, a_t))는 상태-행동 값 함수로, 특정 상태에서 특정 행동을 취했을 때 기대되는 총 보상을 나타냅니다. (V(s_t))는 상태 값 함수로, 특정 상태에서 기대되는 총 보상을 나타냅니다. 어드밴티지 함수는 특정 행동이 평균보다 얼마나 더 나은지를 평가합니다.

##### PPO의 개선

PPO는 기존의 정책 기울기 방법들을 개선한 알고리즘입니다. 특히, PPO는 확률 비율을 클리핑하여 정책 업데이트가 과도하게 이루어지는 것을 방지합니다:

[ L_{CLIP} (theta) = mathbb{E}_t left[ min left( r_t(theta) hat{A}_t, text{clip}(r_t(theta), 1-epsilon, 1+epsilon) hat{A}_t right) right] ]

여기서 (r_t(theta) = frac{pi_theta(a_t | s_t)}{pi_{theta_{text{old}}}(a_t | s_t)})는 새로운 정책과 기존 정책의 확률 비율입니다. 클리핑을 통해 정책 업데이트의 안정성을 높입니다.

목적 함수:

[ L_{PG}(theta) = mathbb{E}t [log pitheta (a_t | s_t) hat{A}_t] ]

신뢰 구간 방법

TRPO에서 목적 함수(“대리” 목적 함수)는 정책 업데이트 크기에 제약을 두고 최대화됩니다:

[ text{maximize} quad mathbb{E}t left[ frac{pitheta(a_t | s_t)}{pi_{theta_{text{old}}}(a_t | s_t)} hat{A}_t right] ]

다음 제약 조건을 만족하면서:

[ mathbb{E}t [text{KL}[pi{theta_{text{old}}}(· | s_t), pi_theta(· | s_t)]] leq delta ]

클립된 대리 목적 함수

과도한 정책 업데이트를 방지하기 위해, 확률 비율을 1에서 너무 벗어나지 않도록 클리핑하여 목적 함수를 수정합니다:

[ L_{CLIP} (theta) = mathbb{E}_t left[ min left( r_t(theta) hat{A}_t, text{clip}(r_t(theta), 1-epsilon, 1+epsilon) hat{A}_t right) right] ]

여기서 ε는 하이퍼파라미터로, 일반적으로 0.2를 사용합니다.

2. 알고리즘 테스트, 2d 환경 gym [gymnasium]

설명에서 포함되어 있는 코드를 순서대로 모두 복사하면 코드가 에러없이 실행이 된다. [2024. 07. 20 마지막 확인]

시뮬레이션 환경 구성

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributions as distributions

import matplotlib.pyplot as plt
import numpy as np
import gymnasium as gym

train_env = gym.make('CartPole-v1')
test_env = gym.make('CartPole-v1')
SEED = 12
train_env.reset(seed=SEED)
test_env.reset(seed=SEED+1)
np.random.seed(SEED)
torch.manual_seed(SEED)


env=gym.make('CartPole-v1', render_mode='human')

done = False
truc = False

state , _= env.reset(seed=SEED+1)

for i in range(5):
    state , _= env.reset(seed=SEED+1)
    while 1:
        state, reward, done, truc ,_= env.step(env.action_space.sample())
        print(f'scenario:{i}')
        print(state, reward, done, truc)
        if done | truc:
            break

env.close() 

여기까지의 코드를 실행하면 아래와 같은 기본적인 시뮬레이션이 실행되는 것을 확인 할 수 있다.
image 74

openai 의 gym 라이브러리를 사용해서 cartpole 환경을 설정한다.
seed 는 실험의 재현성을 위해 사용되는데 -> seed 를 고정함으로서 실험의 일관성을 유지할 수 있다.
train 과 test 의 seed 를 다르게 한 것은 정확한 성능 평가를 위해서

PPO 알고리즘

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout = 0.5):
        super().__init__()

        self.fc_1 = nn.Linear(input_dim, hidden_dim)
        self.fc_2 = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        x = self.fc_1(x)
        x = self.dropout(x)
        x = F.relu(x)
        x = self.fc_2(x)
        return x

class ActorCritic(nn.Module):
    def __init__(self, actor, critic):
        super().__init__()
        
        self.actor = actor
        self.critic = critic
        
    def forward(self, state):
        
        action_pred = self.actor(state)
        value_pred = self.critic(state)
        
        return action_pred, value_pred

INPUT_DIM = train_env.observation_space.shape[0]
HIDDEN_DIM = 128
OUTPUT_DIM = train_env.action_space.n

actor = MLP(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM)
critic = MLP(INPUT_DIM, HIDDEN_DIM, 1)

policy = ActorCritic(actor, critic)

def init_weights(m):
    if type(m) == nn.Linear:
        torch.nn.init.xavier_normal_(m.weight)
        m.bias.data.fill_(0)

policy.apply(init_weights)

LEARNING_RATE = 0.01

optimizer = optim.Adam(policy.parameters(), lr = LEARNING_RATE)

def train(env, policy, optimizer, discount_factor, ppo_steps, ppo_clip):
        
    policy.train()
        
    states = []
    actions = []
    log_prob_actions = []
    values = []
    rewards = []
    done = False
    truc = False
    episode_reward = 0

    state,_ = env.reset(seed=SEED)

    while not (done or truc):

        state = torch.FloatTensor(state).unsqueeze(0)

        #append state here, not after we get the next state from env.step()
        states.append(state)
        
        action_pred, value_pred = policy(state)
                
        action_prob = F.softmax(action_pred, dim = -1)
                
        dist = distributions.Categorical(action_prob)
        
        action = dist.sample()
        
        log_prob_action = dist.log_prob(action)
        
        state, reward, done, truc, _ = env.step(action.item())

        actions.append(action)
        log_prob_actions.append(log_prob_action)
        values.append(value_pred)
        rewards.append(reward)
        
        episode_reward += reward
    
    states = torch.cat(states)
    actions = torch.cat(actions)    
    log_prob_actions = torch.cat(log_prob_actions)
    values = torch.cat(values).squeeze(-1)
    
    returns = calculate_returns(rewards, discount_factor)
    advantages = calculate_advantages(returns, values)
    
    policy_loss, value_loss = update_policy(policy, states, actions, log_prob_actions, advantages, returns, optimizer, ppo_steps, ppo_clip)

    return policy_loss, value_loss, episode_reward

def calculate_returns(rewards, discount_factor, normalize = True):
    
    returns = []
    R = 0
    
    for r in reversed(rewards):
        R = r + R * discount_factor
        returns.insert(0, R)
        
    returns = torch.tensor(returns)
    
    if normalize:
        returns = (returns - returns.mean()) / returns.std()
        
    return returns

def calculate_advantages(returns, values, normalize = True):
    
    advantages = returns - values
    
    if normalize:
        
        advantages = (advantages - advantages.mean()) / advantages.std()
        
    return advantages

def update_policy(policy, states, actions, log_prob_actions, advantages, returns, optimizer, ppo_steps, ppo_clip):
    
    total_policy_loss = 0 
    total_value_loss = 0
    
    advantages = advantages.detach()
    log_prob_actions = log_prob_actions.detach()
    actions = actions.detach()
    
    for _ in range(ppo_steps):
                
        #get new log prob of actions for all input states
        action_pred, value_pred = policy(states)
        value_pred = value_pred.squeeze(-1)
        action_prob = F.softmax(action_pred, dim = -1)
        dist = distributions.Categorical(action_prob)
        
        #new log prob using old actions
        new_log_prob_actions = dist.log_prob(actions)
        
        policy_ratio = (new_log_prob_actions - log_prob_actions).exp()
                
        policy_loss_1 = policy_ratio * advantages
        policy_loss_2 = torch.clamp(policy_ratio, min = 1.0 - ppo_clip, max = 1.0 + ppo_clip) * advantages
        
        policy_loss = - torch.min(policy_loss_1, policy_loss_2).sum()
        
        value_loss = F.smooth_l1_loss(returns, value_pred).sum()
    
        optimizer.zero_grad()

        policy_loss.backward()
        value_loss.backward()

        optimizer.step()
    
        total_policy_loss += policy_loss.item()
        total_value_loss += value_loss.item()
    
    return total_policy_loss / ppo_steps, total_value_loss / ppo_steps

def evaluate(env, policy):
    
    policy.eval()
    
    rewards = []
    done = False
    truc = False
    episode_reward = 0

    state , _= env.reset(seed=SEED+1)

    while not (done or truc):

        state = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
        
            action_pred, _ = policy(state)

            action_prob = F.softmax(action_pred, dim = -1)
                
        action = torch.argmax(action_prob, dim = -1)
                
        state, reward, done, truc ,_= env.step(action.item())

        episode_reward += reward
        
    return episode_reward
image 77
image 75

gym 을 통해 구성한 cartpole 환경은 위 사진과 같다.
action space 는 ‘Discrete(2)’, 따라서 actor 의 output 차원은 2가 된다.
환경(observation) 의 차원은 4이다.
critic 은 가치 네트워크를 나타낸다. output 차원(1) 은 상태의 가치를 출력한다.

image 78

‘action_pred, value_pred = policy(state)’ : 정책 네트워크를 통해 행동과 상태 가치를 예측

-> ‘F.softmax’ : 행동 확률을 softmax 함수를 사용하여 계산
-> ‘Categorical’ : 행동 활률 분포를 생성
-> ‘dist.sample()’ : 행동을 샘플링하고
-> ‘dist.log_prob(action)’ : 선택한 행동의 로그 확률을 계산

ex)
네트워크에서 나온 raw score 가 action 이 [0.3, 0.5] 라고 하자. (cart 를 왼쪽으로 미는 것이 0.3)
softmax 를 거치면 [0.4502, 0.5498]. 이 값들은 각 행동(왼쪽, 오른쪽) 에 대한 확률을 나타낸다. 이를 행동 확률 분포로 바꾸고 sampling 함으로써 오른쪽 값이 선택되어 1 이라는 결과가 나온다.
이후 로그 확률을 계산하면 -0.5981 값이 나온다. log PI(1) = log (0.5498) ~= -0.5981

-> ‘~ = env.step()’ : 환경에서 선택한 행동을 수행하고 다음 상태, 보상, 종료 여부를 얻는다.
-> 에피소드가 종료 후
-> 보상 리스트와 discount_factor 를 사용하여 return 값을 계산한다.
-> 반환 값과 가치 리스트를 사용하여 advantage 를 계산한다.
-> 마지막으로, 정책을 업데이트하고 loss 를 반환한다.

image 79

return 값은 위와 같이 계산된다.
return 값은 특정 시점에서 시작하여 미래의 보상을 고려한 총 보상의 추정치이다.
$$ R_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k} $$
일반적으로 return 은 위와 같이 계산된다.
gamma 는 할인 계수로서 미래 보상의 현재 가치를 조정한다. gamma 가 0에 가까우면 즉각적인 보상만 중요하고, 1에 가까우면 미래의 보상도 중요하게 평가한다.

-> ‘reversed’ : 리턴 계산은 보상 목록을 역순으로 순회하여 각 시간 스템에서의 리턴을 계산하는 방식으로 이루어진다.
이후 정규화를 수행하여 학습의 안정성과 효율성을 높인다.

[t=0, t=1, t=end] 에 대한 보상이 저장되어 있을 것이므로 이를 역순으로 하여 [t=end, t=1, t=0] 으로 바꾼다. 그렇게 하면 t=end 부터 계산하게 되고 discount_factor(=1이 아닐때) 가 여러번 곱해지게 될 것 이므로 앞서 설명했던 미래의 보상의 가치가 점점 낮아지도록 return 값이 결정되는 것이다.
보통 discount_factor 를 1로 하지 않는데,생각해보면 당장 로봇이 넘어지려고 하는데 우선 막고 봐야 할 것 아닌가?!

image 81

이 advantage 는 특정 상태-행동 쌍이 평균적인 상태-행동 쌍보다 얼마나 더 좋은지 측정하는 지표이고, 정책의 업데이트에 중요한 역할을 한다.

$$ \hat{A}_t = R_t – V(s_t) $$

위 수식에서 advantage 가 양수이면, 해당 상태-행동 쌍이 평균적이 상태-행동 쌍보다 더 나은 것으로 간주한다. 즉 현재(특정 t 에서의) return 결과가 현재(특정 t 에서의) 에서의 상태 가치 함수 결과보다 큰 것이다. 이때 상태 가치 함수 결과는 위에서 정책 네트워크를 통해 나온 결과이다.

image 82

-> ‘policy_loss_1’ : 정책 비율과 advantage 값을 곱하여 계산한다.
-> ‘policy_loss_2’ : 클리핑을 적용하여 계산된 정책 비율과 advantage 를 곱한다. 이를 통해 손실을 과도하게 증가시키는 것을 방지한다.

시뮬레이션 결과 확인

MAX_EPISODES = 1500
DISCOUNT_FACTOR = 0.99
N_TRIALS = 25
REWARD_THRESHOLD = 500# 475
PRINT_EVERY = 10
PPO_STEPS = 5
PPO_CLIP = 0.2

train_rewards = []
test_rewards = []

for episode in range(1, MAX_EPISODES+1):
    
    policy_loss, value_loss, train_reward = train(train_env, policy, optimizer, DISCOUNT_FACTOR, PPO_STEPS, PPO_CLIP)
    
    test_reward = evaluate(test_env, policy)
    
    train_rewards.append(train_reward)
    test_rewards.append(test_reward)
    
    mean_train_rewards = np.mean(train_rewards[-N_TRIALS:])
    mean_test_rewards = np.mean(test_rewards[-N_TRIALS:])
    
    if episode % PRINT_EVERY == 0:
    
        print(f'| Episode: {episode:3} | Mean Train Rewards: {mean_train_rewards:5.1f} | Mean Test Rewards: {mean_test_rewards:5.1f} |')
    
    if mean_test_rewards >= REWARD_THRESHOLD:
        
        print(f'Reached reward threshold in {episode} episodes')
        
        break

plt.figure(figsize=(12,8))
plt.plot(test_rewards, label='Test Reward')
plt.plot(train_rewards, label='Train Reward')
plt.xlabel('Episode', fontsize=20)
plt.ylabel('Reward', fontsize=20)
plt.hlines(REWARD_THRESHOLD, 0, len(test_rewards), color='r')
plt.legend(loc='lower right')
plt.grid()
image 73

위와 같은 결과를 얻을 수 있다.

3. Webots 시뮬레이션

webots 시뮬레이션에 적용해보자.


0 Comments

Leave a Reply