강화학습(Reinforcement Learning)은 어떤 임의의 존재 (Agent) 가 주어진 환경 내에서 어떻게 행동해야 하는지에 대해 학습하는 것을 말합니다. 이러한 학습 과정은 다양한 상황에서 Agent가 한 행동에 대해, 양 또는 음의 보상으로 피드백을 받음으로써 진행됩니다. 


강화학습과 정책(Policy)

강화학습의 최종 목표는 환경(Environment)과 상호작용을 하는 임의의 Agent를 학습시키는 것입니다. Agent는 상태(State)라고 부르는 다양한 상황 안에서 행동(Action)을 취하며 조금씩 학습해 나갑니다. Agent가 취한 행동은 그에 대한 응답으로 양(+)이나 음(-), 또는 0의 보상(Reward)을 돌려받습니다.

 

여기에서 Agent의 목표는 처음 시작하는 시점부터 종료시점까지 일어나는 모든 에피소드에서 받을 보상값을 최대로 끌어올리는 것입니다. 이를 위해 음(-)값의 보상을 받는 행동은 최대한 피하도록 하고, 양(+)값의 보상을 받을 수 있는 행동을 강화시킨다는 의미에서 강화학습이라는 이름이 붙게 되었습니다. 이렇게 Agent가 학습을 하는 과정에서 점점 발전하게 될 의사결정 전략을 정책(Policy)라고 합니다. 

 

강화학습의 한 예로 슈퍼 마리오를 들어보겠습니다. 마리오는 그 게임의 세상, 즉 환경(Environment)과 상호작용을 하는 Agent입니다. 상태(State)는 매 순간 스크린에 보여지는 그 이미지이고, 한 에피소드는 게임의 한 판이 시작되는 시점부터 끝나는 시점까지의 내용이라고 할 수 있겠죠. 이 때, 끝나는 시점은 그 판을 클리어하는 순간이 될 수도, 또는 마리오가 죽게되는 순간이 될 수도 있습니다.

 

Agent가 취할 수 있는 행동(Action)은 '앞으로 이동', '뒤로 이동', '점프' 등이 될 것입니다. 각각의 행동에 따라 보상은 달라질 수 있는데, 예를 들어 마리오가 코인을 먹거나 보너스 아이템을 얻은 경우 양(+)의 보상을 받고, 떨어지거나 적에게 맞아서 죽게 되는 경우 음(-)의 보상을 받도록 설계할 수 있습니다. 마리오가 그냥 아무것도 하지 않고 여기저기 돌아다니기만 한다면 그냥 특별한 것 없이, 보상은 0이 되겠죠. 

 

 

그럼 이제 본격적인 내용으로 들어가봅시다. 가장 단순한 경우부터 생각해보죠. 우리는 모든 상태에서 취하는 각각의 행동에 대해 보상을 얼마나 받을지 미리 알고 있다고 가정해보겠습니다. 이 경우 우리는 행동을 어떻게 결정하면 될까요? 간단하게, 최종적으로 가장 높은 보상을 받을 수 있는 행동들을 연속적으로 취하면 될 것입니다. 이렇게 최종적으로 받는 모든 보상의 총합을 Q-value (Quality Value의 약자입니다) 라고 합니다.


벨만 방정식

Q(s,a)=r(s,a)+γmaxaQ(s′,a)

 

 

하나하나 살펴보겠습니다. 상태 s 에서 행동 a 를 취할 때 받을 수 있는 모든 보상의 총합 Q(s,a) 는, 현재 행동을 취해서 받을 수 있는 즉각보상과 미래에 받을 미래보상의 최대값의 합으로 계산할 수 있습니다. 우변의 r(s,a) 는 현재 상태 s 에서 행동 a 를 취했을 때 받을 즉각보상값을 나타냅니다. 또한, s′ 은 현재 상태 s 에서 행동 a 를 취해 도달하는 바로 다음의 상태로, maxaQ(s′,a) 는 다음 상태 s′ 에서 받을 수 있는 보상의 최대값입니다. 이 값을 최대화 할 수 있는 행동을 선택해내는 것이 Agent의 목표입니다.

 

또한 그 앞에 붙어있는 γ 는 할인율이라고 부르는 값으로, 미래가치에 대한 중요도를 조절합니다. γ 값이 커질수록 미래에 받을 보상에 더 큰 가치를 두는 것이고, 작아질수록 즉각보상을 더 중요하게 고려하는 것입니다.

 

이 방정식을 벨만 방정식이라고 합니다. 더 자세한 설명과 수학적 유도 과정은 위키피디아에서 확인할 수 있습니다. 이 우아한 공식은 다음 두 가지 성질때문에 강화학습에서 굉장히 유용합니다. 

  • Markov States 가정이 유효하다면, 벨만 방정식의 재귀적인 성질은 미래에 받을 수 있는 보상을 멀리 떨어진 과거로까지 전파될 수 있게 한다.
  • 벨만 방정식의 재귀적인 성질은 또한, 처음에 실제  Q 값이 얼마인지 알지 못해도 괜찮도록 한다. 즉, 처음에는 추측으로 값을 정하지만, 점점 수렴해서 마지막에는 정답 값에 도달할 것이다.

 


DQN 비용 함수

 

 

DQN코드

import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from collections import deque
from time import time

seed = 1546847731  # or try a new seed by using: seed = int(time())
random.seed(seed)
print('Seed: {}'.format(seed))

class Game:
    board = None
    board_size = 0
    
    def __init__(self, board_size=4):
        self.board_size = board_size
        self.reset()
    
    def reset(self):
        self.board = np.zeros(self.board_size)
    
    def play(self, cell):
        # returns a tuple: (reward, game_over?)
        if self.board[cell] == 0:
            self.board[cell] = 1
            game_over = len(np.where(self.board == 0)[0]) == 0
            return (1,game_over)
        else:
            return (-1,False)
            
 
 def state_to_str(state):
    return str(list(map(int,state.tolist())))

all_states = list()
for i in range(2):
    for j in range(2):
        for k in range(2):
            for l in range(2):
                s = np.array([i,j,k,l])
                all_states.append(state_to_str(s))
                
print('All possible states:')
for s in all_states:
    print(s)
    
game = Game()


num_of_games = 2000
epsilon = 0.1
gamma = 1

q_table = pd.DataFrame(0, index=np.arange(4), columns=all_states)

r_list = []  # store the total reward of each game so we can plot it later

for g in range(num_of_games):
    game_over = False
    game.reset()
    total_reward = 0
    while not game_over:
        state = np.copy(game.board)
        if random.random() < epsilon:
            action = random.randint(0,3)
        else:
            action = q_table[state_to_str(state)].idxmax()
        reward, game_over = game.play(action)
        total_reward += reward
        if np.sum(game.board) == 4:  # terminal state
            next_state_max_q_value = 0
        else:
          # 벨만방정식
            next_state = np.copy(game.board)
            next_state_max_q_value = q_table[state_to_str(next_state)].max()
            # 이게 진짜 벨만방정식
        q_table.loc[action,state_to_str(state)] = reward + gamma * next_state_max_q_value
    r_list.append(total_reward)
q_table

for i in range(2):
    for j in range(2):
        for k in range(2):
            for l in range(2):
                b = np.array([i,j,k,l])
                if len(np.where(b == 0)[0]) != 0:
                    action = q_table[state_to_str(b)].idxmax()
                    pred = q_table[state_to_str(b)].tolist()
                    print('board: {b}\tpredicted Q values: {p} \tbest action: {a}\tcorrect action? {s}'
                          .format(b=b,p=pred,a=action,s=b[action]==0))
                          
plt.figure(figsize=(14,7))
plt.plot(range(len(r_list)),r_list)
plt.xlabel('Games played')
plt.ylabel('Reward')
plt.show()


# e greedy
class QNetwork:
    def __init__(self, hidden_layers_size, gamma, learning_rate, input_size=4, output_size=4):
        self.q_target = tf.placeholder(shape=(None,output_size), dtype=tf.float32)
        self.r = tf.placeholder(shape=None,dtype=tf.float32)
        self.states = tf.placeholder(shape=(None, input_size), dtype=tf.float32)
        self.enum_actions = tf.placeholder(shape=(None,2), dtype=tf.int32) 
        layer = self.states
        for l in hidden_layers_size:
            layer = tf.layers.dense(inputs=layer, units=l, activation=tf.nn.relu,
                                    kernel_initializer=tf.contrib.layers.xavier_initializer(seed=seed))
        self.output = tf.layers.dense(inputs=layer, units=output_size,
                                      kernel_initializer=tf.contrib.layers.xavier_initializer(seed=seed))
        self.predictions = tf.gather_nd(self.output,indices=self.enum_actions)
        self.labels = self.r + gamma * tf.reduce_max(self.q_target, axis=1)
        self.cost = tf.reduce_mean(tf.losses.mean_squared_error(labels=self.labels, predictions=self.predictions))
        self.optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(self.cost)
        
        
        class ReplayMemory:
    memory = None
    counter = 0

    def __init__(self, size):
        self.memory = deque(maxlen=size)

    def append(self, element):
        self.memory.append(element)
        self.counter += 1

    def sample(self, n):
        return random.sample(self.memory, n)
        
        
num_of_games = 2000
epsilon = 0.1
gamma = 0.99
batch_size = 10
memory_size = 2000

tf.reset_default_graph()
tf.set_random_seed(seed)
qnn = QNetwork(hidden_layers_size=[20,20], gamma=gamma, learning_rate=0.001)
memory = ReplayMemory(memory_size)
sess = tf.Session()
sess.run(tf.global_variables_initializer())


r_list = []  
c_list = []  # same as r_list, but for the cost

counter = 0  # will be used to trigger network training

# e greedy 후보2
for g in range(num_of_games):
    game_over = False
    game.reset()
    total_reward = 0
    while not game_over:
        counter += 1
        state = np.copy(game.board)
        if random.random() < epsilon:
            action = random.randint(0,3)
        else:
            pred = np.squeeze(sess.run(qnn.output,feed_dict={qnn.states: np.expand_dims(game.board,axis=0)}))
            action = np.argmax(pred)
        reward, game_over = game.play(action)
        total_reward += reward
        next_state = np.copy(game.board)
        memory.append({'state':state,'action':action,'reward':reward,'next_state':next_state,'game_over':game_over})
        if counter % batch_size == 0:
            # Network training
            batch = memory.sample(batch_size)
            q_target = sess.run(qnn.output,feed_dict={qnn.states: np.array(list(map(lambda x: x['next_state'], batch)))})
            terminals = np.array(list(map(lambda x: x['game_over'], batch)))
            for i in range(terminals.size):
                if terminals[i]:
                    # Remember we use the network's own predictions for the next state while calculatng loss.
                    # Terminal states have no Q-value, and so we manually set them to 0, as the network's predictions
                    # for these states is meaningless
                    q_target[i] = np.zeros(game.board_size)
            _, cost = sess.run([qnn.optimizer, qnn.cost], 
                               feed_dict={qnn.states: np.array(list(map(lambda x: x['state'], batch))),
                               qnn.r: np.array(list(map(lambda x: x['reward'], batch))),
                               qnn.enum_actions: np.array(list(enumerate(map(lambda x: x['action'], batch)))),
                               qnn.q_target: q_target})
            c_list.append(cost)
    r_list.append(total_reward)
print('Final cost: {}'.format(c_list[-1]))



for i in range(2):
    for j in range(2):
        for k in range(2):
            for l in range(2):
                b = np.array([i,j,k,l])
                if len(np.where(b == 0)[0]) != 0:
                    pred = np.squeeze(sess.run(qnn.output,feed_dict={qnn.states: np.expand_dims(b,axis=0)}))
                    pred = list(map(lambda x: round(x,3),pred))
                    action = np.argmax(pred)
                    print('board: {b}\tpredicted Q values: {p} \tbest action: {a}\tcorrect action? {s}'
                          .format(b=b,p=pred,a=action,s=b[action]==0))
                          
plt.figure(figsize=(14,7))
plt.plot(range(len(r_list)),r_list)
plt.xlabel('Games played')
plt.ylabel('Reward')
plt.show()

plt.figure(figsize=(14,7))
plt.plot(range(len(r_list)),r_list)
plt.xlabel('Game played')
plt.ylabel('Reward')
plt.ylim(-2,4.5)
plt.show()


plt.figure(figsize=(14,7))
plt.plot(range(len(c_list)),c_list)
plt.xlabel('Trainings')
plt.ylabel('Cost')
plt.show()

sess.close()  # Don't forget to close tf.session

728x90
반응형
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 라이프코리아트위터 공유하기
  • shared
  • 카카오스토리 공유하기