1. Frozen Lake 환경


import gymnasium as gym
env=gym.make('FrozenLake-v1', render_mode='human')
env.reset()
for _ in range(100):
    env.render()
    env.step(env.action_space.sample())
env.close()
obs=env.reset()
for _ in range(100):
    env.render()
    obs,re,done,tr,info=env.step(env.action_space.sample())
env.close()
image 5

frozen lake

1.1. Q-learning

decaying E-greedy



import gymnasium as gym
# from importlib_metadata import entry_points
import numpy as np
import matplotlib.pyplot as plt
# from gym.envs.registration import register
import random as pr

def rargmax(vector):
    m=np.amax(vector)
    indices=np.nonzero(vector==m)[0]
    return pr.choice(indices)

# env=gym.make('FrozenLake-v1', render_mode='human', is_slippery= False)
env=gym.make('FrozenLake-v1', render_mode=None ,is_slippery= False)

# env.render()
Q=np.zeros([env.observation_space.n,env.action_space.n])

learning_rate=0.85
num_episodes=2000
dis=0.99

rList=[]
for i in range(num_episodes):
    # e=1./((i/100)+1);
    e=1./((i/50)+10)
    state,info=env.reset()
    rAll=0
    done=False
    while not done:
        
        
        if np.random.rand(1)<e:
            action = env.action_space.sample()
        else:
            action = rargmax(Q[state,:])

        #noise
        # action=np.argmax(Q[state,:]+np.random.randn(1,env.action_space.n)/(i+1))
        
        new_state, reward,done,_,_=env.step(action)
        
        # if reward>0:
            # print("done and reward")
            # print(reward)
            
        # env.render()
        Q[state,action]=reward+dis*np.max(Q[new_state,:])
        
        # Q[state,action]=(1-learning_rate)*Q[state,action]\
        #     +learning_rate*(reward+dis*np.max(Q[new_state,:]))

        rAll += reward
        state = new_state

    rList.append(rAll)

print("sucess rate:"+str(sum(rList)/num_episodes))
print("Final Q-Table Values")
print("LEFT DOWN RIGHT UP")
print(Q)
plt.bar(range(len(rList)),rList,color="blue")
plt.show()

# env=gym.make('FrozenLake-v1', render_mode='human',is_slippery= False)
# obs=env.reset()
# for _ in range(100):
#     env.render()
#     obs,re,done,tr,info=env.step(np.argmax(Q[state,:]+np.random.randn(1,env.action_space.n)))
#     if done:
#         obs=env.reset()
# env.close()

위에 코드를 실행하면 아래와 같은 결과를 얻을 수 있다.

image 11
image 12
image 8

discount factor 는 0.99 이고 2000번의 episode 를 돌게 된다.
noise 를 추가 하는 방법으로 exploration 을 하게 된다. noise e 는 episode 가 진행됨에 따라 감소한다. 점차 학습이 진행되면서 탐험을 줄이는 것이다.

학습된 q-table 을 보면 discount factor 를 사용한 것이 들어나듯 0.98~ 와 같은 1이 아닌 값이다.

결국 Q-learning 은 MDP 환경에서 최적의 정책을 학습하는 강화학습 알고리즘이다.
MDP 는 S, A, P, R. γ 로 이루어진다.

s는 현재 상태 (state)
a는 현재 상태에서 취한 행동 (action)
r는 해당 행동을 취한 후 얻은 보상 (reward)
a를 취한 후의 새로운 상태 (new state)
γ는 미래 보상에 대한 할인율 (discount factor), 코드에서는 𝛾=0.99 로 설정된다.
α는 학습률 (learning rate), 코드에서는 α=1로 가정됨 (기본적으로 Q-값 업데이트 수식에서는 생략됨)

1.2. Q-learning (stochastic, nondeterministic)

하지만 위에 코드에서 env=gym.make(‘FrozenLake-v1’, render_mode=None ,is_slippery= False) 부분의 is_slippery=True 로 바꾸어 주면 결과가 다르게 변한다.

image 13

이는 이전 알고리즘의 단점이 들어나느 결과이다. 환경의 is slippery 를 True 로 한 것은 사람? 이 오른쪽으로 가라라고 했을 때 오른쪽으로 가지 않을 확률이 생긴 것을 의미한다. 즉 상태 전이 확률이 1 이 아니게 된 것이다.

좀 더 풀어서 설명하면 사람이 Q 에게 물어봤을 때 Q 는 오른쪽으로 가라고 한다. Q 는 거짓말을 한 것은 아니다. Q 는 실제로 자기가 했을 때 오른쪽으로 갔을 때 성공을 했으니까.
하지만, 만약 Q 가 했던 것은 위로 간 것인데 우연히 오른쪽으로 미끄러져서 성공을 한 것이라면?
이런 식으로 Q 는 사실을 말했지만 그것을 믿고 내가 따라가면 실패를 하게 된다.

해결법은 Q 의 정보를 조금만 이용하는 것이다. (100% 믿지 않는다.)

결국 얼마나 Q를 믿을 것인지를 learning rate α 로 표현하고
Q-learning 에서 Q 값의 업데이트는 다음과 같은 수식으로 표현된다.

image 14

이 수식을 α 로 묶어서 아래와 같이 표현하기도 한다.

image 6

코드는 다음과 같다


import gymnasium as gym
# from importlib_metadata import entry_points
import numpy as np
import matplotlib.pyplot as plt
# from gym.envs.registration import register
import random as pr

def rargmax(vector):
    m=np.amax(vector)
    indices=np.nonzero(vector==m)[0]
    return pr.choice(indices)

# env=gym.make('FrozenLake-v1', render_mode='human', is_slippery= False)
env=gym.make('FrozenLake-v1', render_mode=None ,is_slippery= True)

# env.render()
Q=np.zeros([env.observation_space.n,env.action_space.n])

learning_rate=0.85
num_episodes=2000
dis=0.99

rList=[]
for i in range(num_episodes):
    # e=1./((i/100)+1);
    # e=1./((i/50)+10)
    state,info=env.reset()
    rAll=0
    done=False
    while not done:
        
        
        # if np.random.rand(1)<e:
        #     action = env.action_space.sample()
        # else:
        #     action = rargmax(Q[state,:])

        #noise
        action=np.argmax(Q[state,:]+np.random.randn(1,env.action_space.n)/(i+1))
        
        new_state, reward,done,_,_=env.step(action)
        
        # if reward>0:
            # print("done and reward")
            # print(reward)
            
        # env.render()
        # Q[state,action]=reward+dis*np.max(Q[new_state,:])
        
        Q[state,action]=(1-learning_rate)*Q[state,action]\
            +learning_rate*(reward+dis*np.max(Q[new_state,:]))

        rAll += reward
        state = new_state

    rList.append(rAll)

print("sucess rate:"+str(sum(rList)/num_episodes))
print("Final Q-Table Values")
print("LEFT DOWN RIGHT UP")
print(Q)
plt.bar(range(len(rList)),rList,color="blue")
plt.show()

# env=gym.make('FrozenLake-v1', render_mode='human',is_slippery= True)
# obs=env.reset()
# for _ in range(100):
#     env.render()
#     obs,re,done,tr,info=env.step(np.argmax(Q[state,:]+np.random.randn(1,env.action_space.n)))
#     if done:
#         obs=env.reset()
# env.close()
image 15

성공 확률이 60퍼센트 정도로 오른 것을 확인 할 수 있다.
Q 를 업데이트 하는 부분이 수정되었고 추가적으로 action 을 선택할 때 탐험하는 방법이 다르게 코딩되었다.

1.3. Q-Network

이전 1.1. 과 1.2. 에 q table 방식은 복잡한 문제를 해결할 수 없다.
예를 들어 100×100 미로 문제를 해결하기 위해서는 100x100x4 의 q-table 이 필요하고
아래 그림과 같은 게임 문제에서는 80×80 pixel + 2 color(흑/백) 이라면 이 게임의 state 를 표현하기 위해서 2^(80×80) 이라는 큰 숫자가 필요해진다.

상태 공간의 크기는 모든 가능한 상태의 수를 의미합니다. 각 픽셀이 흑(0) 또는 백(1)일 수 있으므로, 하나의 픽셀은 2개의 상태(흑/백)를 가질 수 있다.

80×80 픽셀은 총 6400개의 픽셀을 의미.
각 픽셀이 독립적으로 두 가지 상태(흑/백)를 가질 수 있으므로, 전체 상태 공간은
2^(6400) 가지 가능한 상태 조합을 가질 수 있다.

image 7

따라서 이를 network 로 대체한다. 네트워크의 출력은 Approximate Q^* 가 된다.
network 를 학습시켜 Q 를 대체 한다는 것은 network 를 학습시킬 정답값이 필요하게 된다.

image 17

이전에는 Q update 를 할 때 (1-α)Q(s,a)+~~ 식을 사용하였었는데 이를 사용하지 않는다.
왜냐하면, network 를 학습하는 gradient descent 방법에서 애초에 조금씩 조금씩(learning rate) 학습이 진행되기 때문에 결과적으로 (1-α)Q(s,a)+~~ 식 과 같은 의미가 되기 때문이다.

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import random as pr
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

# GPU 사용 가능 여부 확인 및 device 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def rargmax(vector):
    m = np.amax(vector)
    indices = np.nonzero(vector == m)[0]
    return pr.choice(indices)

def one_hot(x):
    return np.identity(16)[x:x+1]

# 환경 설정
env = gym.make('FrozenLake-v1', render_mode=None, is_slippery=True)
input_size = env.observation_space.n  # 16
output_size = env.action_space.n  # 4
learning_rate = 0.2

# Q-테이블 초기화 및 학습 파라미터 설정
num_episodes = 3000
dis = 0.99
rList = []

# 가중치 텐서 초기화 (GPU로 이동)
W = torch.zeros((input_size, output_size), requires_grad=True, dtype=torch.float32, device=device)
optimizer = optim.SGD([W], lr=learning_rate)

rand_count = 0
notrand_count = 0

# 학습 루프
for i in tqdm(range(num_episodes)):
    e = 1./((i/300)+1)
    state, info = env.reset()
    rAll = 0
    done = False

    while not done:
        # 상태에 대한 one-hot 인코딩 후 GPU로 이동
        X = torch.from_numpy(one_hot(state)).to(torch.float32).to(device)
        Qs = X.matmul(W)

        # Epsilon-Greedy 방식으로 행동 선택
        if np.random.rand(1) < e:
            action = env.action_space.sample()
            rand_count += 1
        else:
            action = np.argmax(Qs.detach().cpu().numpy())  # GPU에서 계산된 값을 CPU로 이동하여 numpy 배열로 변환
            notrand_count += 1

        # 환경에 행동 적용 및 새로운 상태, 보상, 종료 정보 업데이트
        new_state, reward, done, _, _ = env.step(action)

        if done:
            Qs[0, action] = reward
        else:
            # 새로운 상태에 대한 one-hot 인코딩 후 GPU로 이동
            Y = torch.from_numpy(one_hot(new_state)).to(torch.float32).to(device)
            Qs1 = Y.matmul(W)
            Qs[0, action] = reward + dis * torch.max(Qs1)

        # 손실 함수 계산 및 업데이트
        cost = F.mse_loss(Qs, X.matmul(W))
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        rAll += reward
        state = new_state

    rList.append(rAll)

# 학습 결과 출력
print("notrand_count: " + str(notrand_count))
print("rand_count: " + str(rand_count))
print("success rate: " + str(sum(rList) / num_episodes))

# 학습 결과 시각화
plt.bar(range(len(rList)), rList, color="blue")
plt.show()

env=gym.make('FrozenLake-v1', render_mode='human',is_slippery= True)
obs=env.reset()
for _ in range(50):
    env.render()
    action = np.argmax(Qs.detach().cpu().numpy())
    obs,re,done,tr,info=env.step(action)
    if done:
        obs=env.reset()
env.close()

위에 코드는 q table 을 q network 로 한것이다. 그렇다면 Q network 는 어떤 값으로 학습을 할 것인가?

\[ \min_{\theta} \sum_{t=0}^{T} \left[ \hat{Q}(s_t, a_t \mid \theta) – \left( r_t + \gamma \max_{a’} \hat{Q}(s_{t+1}, a’ \mid \theta) \right) \right]^2 \]

즉 Q^*(y label) =r+γmaxQ(s’) 이되는 것이고 위 식을 바탕으로 network 가 state 가 입력으로 들어갈 때 Q^* 를 estimate 할 수 있게 된다.

image 19

하지만 위 방법은 diverges 하다. 학습이 잘 안된다는 뜻이다. 실제로 위 코드를 실행해보면 성공 확률이 높지 않게 나온다.
크게 두 가지 문제가 있다.
– network 학습에 사용되는 정답 값, 즉 target 이 움직이는 단점이 있다.
– correlations between samples

1.4. DQN (중요)

image 21

network 를 사용하여도 성공률이 크게 증가하지 않았었다. 때문에 DQN 알고리즘이 나왔다.

image 20

먼저 sample correlation 문제가 있었다. 빨간점이 전체 data 라고 할 때 우리가 만들 수 있는, 이 데이터들을 대표하는 선은 보라색 선일 것이다. 하지만 만약 우리가 현재 시점에서 얻은 데이터가 파랑색 범위의 데이터 밖에 없다면 우리가 얻은 선(Q)은 현실 세계에서 전혀 사용할 수 없는 녹색 선이 될 수 도 있다.

그리고, target 이 움직이는 문제(non stationary targets) 에 대해서는 화살을 쏘자마자 과녁이 움직이는 것을 생각하면 된다. 결국 target 이 움직이기 때문에 network 를 학습하는 것이 잘 안된다는 것이다.

DQN 은 이 문제들을 아래와 같이 3개의 방법으로 해결하였다.

  1. 깊은 신경망
  2. 저장해두고 replay 하는 것
  3. 네트워크를 2개 사용하는 것 (구분하는 것)

1.3.1. 깊은 신경망

먼저 신경망을 깊게 바꾸겠다.

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import random as pr
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

# GPU 사용 가능 여부 확인 및 device 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def one_hot(x):
    return np.identity(16)[x:x+1]

# 환경 설정
env = gym.make('FrozenLake-v1', render_mode=None, is_slippery=True)
input_size = env.observation_space.n  # FrozenLake는 이산적 상태 공간
output_size = env.action_space.n      # 행동 공간

learning_rate = 1e-1
env.reset()
random_episodes=0
reward_sum=0

# 신경망 클래스 정의 (GPU로 이동)
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = nn.Linear(input_size, 8)
        self.linear2 = nn.Linear(8, 16)
        self.linear3 = nn.Linear(16, output_size)

        for m in self.modules():
            if isinstance(m, nn.Linear):
                torch.nn.init.uniform_(m.weight.data)

    def forward(self, x):
        out = torch.sigmoid(self.linear1(x))
        out = torch.sigmoid(self.linear2(out))
        out = self.linear3(out)
        return out

# 모델을 GPU로 이동
model = MLP().to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
num_episodes = 2000
dis = 0.9
rList = []
rand_count = 0
notrand_count = 0

# 학습 루프
for i in tqdm(range(num_episodes)):
    e = 1. / ((i/300)+1)  # Decaying E-greedy
    rAll = 0  # 총 보상 초기화
    step_count = 0
    state, info = env.reset()
    done = False

    while not done:
        step_count += 1
        # 상태를 텐서로 변환하고 GPU로 이동
        x = torch.from_numpy(np.reshape(one_hot(state), [1, input_size])).to(torch.float32).to(device)

        # 신경망을 통해 Q값 계산
        Qs = model(x)

        # Epsilon-Greedy 방식으로 행동 선택
        if np.random.rand(1) < e:
            action = env.action_space.sample()
            rand_count += 1
        else:
            action = np.argmax(Qs.detach().cpu().numpy())  # GPU에서 CPU로 이동하여 numpy 배열로 변환
            notrand_count += 1

        # 환경에 행동 적용 및 새로운 상태, 보상, 종료 정보 업데이트
        new_state, reward, done, _, _ = env.step(action)

        # 보상을 누적
        rAll += reward

        # 종료 여부에 따른 Q값 업데이트
        if done:
            Qs[0, action] = reward  # 종료 시 보상만 반영
        else:
            Y = torch.from_numpy(np.reshape(one_hot(new_state), [1, input_size])).to(torch.float32).to(device)
            Qs1 = model(Y)
            Qs[0, action] = reward + dis * torch.max(Qs1)

        # 손실 함수 계산 및 역전파
        cost = F.mse_loss(Qs, model(x))
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        state = new_state

    rList.append(rAll)  # 총 보상 저장
    if reward > 0:
        print("Episode: {} Total Reward: {}".format(i, rAll))

    # 최근 10개의 에피소드에서 평균 보상이 0.8 이상이면 성공했다고 가정 (FrozenLake는 1이 최대 보상)
    if len(rList) > 10 and np.mean(rList[-10:]) > 0.8:
        print("Solved in episode {} with average reward {:.2f}".format(i, np.mean(rList[-10:])))
        break


print("notrand_count: " + str(notrand_count))
print("rand_count: " + str(rand_count))
print("episode {} with average reward {:.2f}".format(i, np.mean(rList[-10:])))
# 결과 시각화 (선택사항)
plt.bar(range(len(rList)), rList, color="blue")
plt.show()

하지만 여전히 학습은 잘 되지 않는다

1.3.2. replay buffer

결국 이전에 결과들을 다시 재사용 하도록 하는 것이다. 코드는 다음과 같다.

import gymnasium as gym
import numpy as np
import math
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque

# GPU 사용 가능 여부 확인 및 device 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Replay Buffer 클래스 정의
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)  # 버퍼 최대 크기

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))  # 경험 추가

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def size(self):
        return len(self.buffer)  # 현재 버퍼에 저장된 경험의 수

# DQN 에이전트 클래스 정의
class DQNAgent:
    def __init__(self):
        self.model = nn.Sequential(
            nn.Linear(16, 128),  # FrozenLake는 4x4 grid, one-hot encoded
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 4)  # 행동 4가지 (왼쪽, 아래, 오른쪽, 위)
        ).to(device)

        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.memory = ReplayBuffer(10000)  # Replay Buffer 생성
        self.batch_size = 64
        self.gamma = 0.99  # 할인율
        self.steps_done = 0

    def memorize(self, state, action, reward, next_state, done):
        """경험을 메모리에 저장"""
        self.memory.add(state, action, reward, next_state, done)

    def act(self, state):
        """Epsilon-Greedy 방식으로 행동 선택"""
        eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * self.steps_done / EPS_DECAY)
        self.steps_done += 1

        if random.random() > eps_threshold:  # 신경망 기반 행동
            with torch.no_grad():
                return self.model(state).max(1)[1].view(1, 1)
        else:  # 랜덤 행동
            return torch.tensor([[random.randrange(4)]], device=device, dtype=torch.long)

    def learn(self):
        """Replay Buffer에서 경험 샘플링 후 학습"""
        if self.memory.size() < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # 텐서로 변환 (states 차원 수정)
        states = torch.FloatTensor(states).to(device).squeeze(1)  # [batch_size, input_size]
        actions = torch.LongTensor(actions).to(device).view(-1, 1)  # [batch_size, 1]
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).to(device).squeeze(1)  # [batch_size, input_size]
        dones = torch.FloatTensor(dones).to(device)

        # Q(s, a) 계산 (현 상태에서의 Q값)
        current_q_values = self.model(states).gather(1, actions).squeeze(1)

        # Q(s', a') 계산 (다음 상태에서 최대 Q값)
        next_q_values = self.model(next_states).max(1)[0]

        # done이 True일 경우, 다음 상태 Q값은 0으로 처리
        expected_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        # MSE 손실 계산 및 신경망 업데이트
        loss = F.mse_loss(current_q_values, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()




# FrozenLake 환경 설정
env = gym.make('FrozenLake-v1', render_mode=None, is_slippery=True)
input_size = env.observation_space.n  # 상태는 16개의 one-hot encoded 벡터
output_size = env.action_space.n      # 행동은 4가지

# 파라미터 설정
EPISODES = 1000
EPS_START = 1.0
EPS_END = 0.01
EPS_DECAY = 500
LR = 0.001

# DQN 에이전트 생성
agent = DQNAgent()

def one_hot(x):
    """상태를 one-hot 인코딩"""
    return np.identity(input_size)[x:x+1]

# 학습 루프
score_history = []  # 점수 저장용
for e in range(EPISODES):
    state, _ = env.reset()
    state = torch.FloatTensor(one_hot(state)).to(device)  # 상태를 텐서로 변환
    done = False
    steps = 0
    total_reward = 0

    while not done:
        action = agent.act(state)  # Epsilon-Greedy 방식으로 행동 선택
        next_state, reward, done, _, _ = env.step(action.item())
        next_state = torch.FloatTensor(one_hot(next_state)).to(device)  # 다음 상태를 텐서로 변환

        # 게임에서 실패 시 보상을 -1로 설정
        if done and reward == 0:
            reward = -1

        total_reward += reward
        agent.memorize(state.cpu().numpy(), action.cpu().numpy(), reward, next_state.cpu().numpy(), done)
        agent.learn()  # 경험 학습

        state = next_state
        steps += 1

    score_history.append(total_reward)
    print(f"에피소드 {e+1}, 점수: {total_reward}, 최근 10게임 평균 점수: {np.mean(score_history[-10:])}")

    if np.mean(score_history[-10:]) > 0.8:  # 평균 보상이 0.8 이상이면 종료
        print(f"성공! {e+1} 에피소드에서 해결됨.")
        break

# 결과 출력
print("학습 완료")


# 학습된 모델로 테스트 실행
test_episodes = 2
success_count = 0
env=gym.make('FrozenLake-v1', render_mode='human',is_slippery= True)
obs=env.reset()
for episode in range(test_episodes):
    state, _ = env.reset()
    state = torch.FloatTensor(one_hot(state)).to(device)
    done = False
    total_reward = 0

    while not done:
        with torch.no_grad():
            action = agent.act(state)  # 학습된 신경망을 사용하여 행동 선택
        next_state, reward, done, _, _ = env.step(action.item())
        total_reward += reward
        state = torch.FloatTensor(one_hot(next_state)).to(device)

    if total_reward > 0:
        success_count += 1
    print(f"테스트 에피소드 {episode + 1}: 보상 {total_reward}")

print(f"50번 테스트 중 성공 횟수: {success_count}/50")


# 평균 점수 시각화
plt.plot(score_history)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Reward over Episodes')
plt.show()

# 학습된 모델 저장
torch.save(agent.model.state_dict(), "frozenlake_dqn_model.pth")
image 22

점점 학습이 진행 되면서 reward 를 발생 시키는 것을 볼 수 있다.

1.3.3. target network (네트워크 구분, 네트워크 2개 사용)

import gymnasium as gym
import numpy as np
import math
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import matplotlib.pyplot as plt

# GPU 사용 가능 여부 확인 및 device 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Replay Buffer 클래스 정의
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)  # 버퍼 최대 크기

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))  # 경험 추가

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def size(self):
        return len(self.buffer)  # 현재 버퍼에 저장된 경험의 수

# DQN 에이전트 클래스 정의
class DQNAgent:
    def __init__(self, batch_size, gamma, update_target_frequency):
        self.model = nn.Sequential(
            nn.Linear(16, 128),  # FrozenLake는 4x4 grid, one-hot encoded
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 4)  # 행동 4가지 (왼쪽, 아래, 오른쪽, 위)
        ).to(device)

        self.target_model = nn.Sequential(  # Target Network 정의
            nn.Linear(16, 128),  # FrozenLake는 4x4 grid, one-hot encoded
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 4)  # 행동 4가지 (왼쪽, 아래, 오른쪽, 위)
        ).to(device)

        self.target_model.load_state_dict(self.model.state_dict())  # Target Network 초기화
        self.target_model.eval()  # 학습하지 않음

        self.optimizer = optim.Adam(self.model.parameters(), lr=LR)
        self.memory = ReplayBuffer(10000)  # Replay Buffer 생성
        self.batch_size = batch_size
        self.gamma = gamma
        self.update_target_frequency = update_target_frequency

    def memorize(self, state, action, reward, next_state, done):
        """경험을 메모리에 저장"""
        self.memory.add(state, action, reward, next_state, done)

    def act(self, state):
        """Epsilon-Greedy 방식으로 행동 선택"""
        eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * agent.steps_done / EPS_DECAY)
        agent.steps_done += 1

        if random.random() > eps_threshold:  # 신경망 기반 행동
            with torch.no_grad():
                return self.model(state).max(1)[1].view(1, 1)
        else:  # 랜덤 행동
            return torch.tensor([[random.randrange(4)]], device=device, dtype=torch.long)

    def learn(self):
        """Replay Buffer에서 경험 샘플링 후 학습"""
        if self.memory.size() < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # 텐서로 변환 (states 차원 수정)
        states = torch.FloatTensor(states).to(device).squeeze(1)  # [batch_size, input_size]
        actions = torch.LongTensor(actions).to(device).view(-1, 1)  # [batch_size, 1]
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).to(device).squeeze(1)  # [batch_size, input_size]
        dones = torch.FloatTensor(dones).to(device)

        # Q(s, a) 계산 (현 상태에서의 Q값)
        current_q_values = self.model(states).gather(1, actions).squeeze(1)

        # Q(s', a') 계산 (다음 상태에서 Target Network를 사용하여 최대 Q값)
        next_q_values = self.target_model(next_states).max(1)[0]

        # done이 True일 경우, 다음 상태 Q값은 0으로 처리
        expected_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        # MSE 손실 계산 및 신경망 업데이트
        loss = F.mse_loss(current_q_values, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Target Network를 Policy Network로 업데이트"""
        self.target_model.load_state_dict(self.model.state_dict())

# 하이퍼파라미터 설정
EPISODES = 1000
EPS_START = 1.0
EPS_END = 0.01
EPS_DECAY = 500
LR = 0.001
BATCH_SIZE = 64
GAMMA = 0.99
UPDATE_TARGET_FREQUENCY = 10

# DQN 에이전트 생성
agent = DQNAgent(batch_size=BATCH_SIZE, gamma=GAMMA, update_target_frequency=UPDATE_TARGET_FREQUENCY)

def one_hot(x):
    """상태를 one-hot 인코딩"""
    return np.identity(env.observation_space.n)[x:x+1]

# 학습 루프
score_history = []  # 점수 저장용
agent.steps_done = 0  # steps_done은 클래스 외부에서 관리
env = gym.make('FrozenLake-v1', render_mode=None, is_slippery=True)

for e in range(EPISODES):
    state, _ = env.reset()
    state = torch.FloatTensor(one_hot(state)).to(device)  # 상태를 텐서로 변환
    done = False
    steps = 0
    total_reward = 0

    while not done:
        action = agent.act(state)  # Epsilon-Greedy 방식으로 행동 선택
        next_state, reward, done, _, _ = env.step(action.item())
        next_state = torch.FloatTensor(one_hot(next_state)).to(device)  # 다음 상태를 텐서로 변환

        # 게임에서 실패 시 보상을 -1로 설정
        if done and reward == 0:
            reward = -1

        total_reward += reward
        agent.memorize(state.cpu().numpy(), action.cpu().numpy(), reward, next_state.cpu().numpy(), done)
        agent.learn()  # 경험 학습

        state = next_state
        steps += 1

    score_history.append(total_reward)
    print(f"에피소드 {e+1}, 점수: {total_reward}, 최근 10게임 평균 점수: {np.mean(score_history[-10:])}")

    # 일정 주기로 Target Network 업데이트
    if e % agent.update_target_frequency == 0:
        agent.update_target_network()

    if np.mean(score_history[-10:]) > 0.8:  # 평균 보상이 0.8 이상이면 종료
        print(f"성공! {e+1} 에피소드에서 해결됨.")
        break

# 결과 출력
print("학습 완료")

# 평균 점수 시각화
plt.plot(score_history)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Reward over Episodes')
plt.show()


# 학습된 모델로 테스트 실행

test_episodes = 50
success_count = 0
env=gym.make('FrozenLake-v1', render_mode='human' ,is_slippery= True)
obs=env.reset()

for episode in range(test_episodes):
    state, _ = env.reset()
    state = torch.FloatTensor(one_hot(state)).to(device)
    done = False
    total_reward = 0

    while not done:
        with torch.no_grad():
            action = agent.act(state)  # 학습된 신경망을 사용하여 행동 선택
        next_state, reward, done, _, _ = env.step(action.item())
        total_reward += reward
        state = torch.FloatTensor(one_hot(next_state)).to(device)

    if total_reward > 0:
        success_count += 1
    print(f"테스트 에피소드 {episode + 1}: 보상 {total_reward}")

print(f"50번 테스트 중 성공 횟수: {success_count}/50")


# 학습된 모델 저장
torch.save(agent.model.state_dict(), "frozenlake_dqn_model.pth")
image 24
image 23

결국 최근 10 episode 의 reward 가 평균 0.8을 넘으면서 179 episode 만에 학습을 종료하는 것을 볼 수 있다. 실제로 openai gym 의 render 모드를 ‘human’ 으로 하여 실행시켜 보면 아래 영상과 같이 리워드를 얻는 것을 확인 할 수 있다.


0 Comments

Leave a Reply