“I no longer work out how to engineer a system; I work out how I make a system learn to do things. That’s a fundamental shift in engineering, biggest since the invention of fire.”

Dr Aldo Faisal, Professor of AI & Neuroscience at the Dept. of Computing and the Dept. of Bioengineering at ICL

 
 

This project was created as part of a submission for the Reinforcement Learning module at Imperial College London. The challenge tackled here was to solve a maze environment with an RL agent. The goal was to create conditions under which the agent (red) could learn to reach the goal state (blue) in the maze, by engineering the reward function and providing a proper data pipeline. The theoretical side of the code is based on Markov Decision Processes and the Bellman Optimality Equation. An example maze environment that the agent needs to solve is presented below.

 

[GIF: RL agent solving the maze]
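For reference, the Bellman Optimality Equation that the approach builds on relates the optimal state-action value to that of the successor state:

Q*(s, a) = E[ r + γ · max_a′ Q*(s′, a′) ]

where s′ is the state reached after taking action a in state s, r is the immediate reward and γ is the discount factor. Deep Q-Learning approximates Q* with a neural network and repeatedly pushes its predictions towards the right-hand side of this equation.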

The agent trains by choosing an epsilon-greedy action at each state, which balances exploration and exploitation. At each step it samples a batch of training data from the replay buffer and trains the neural network on it. During testing, the agent chooses the greedy action, effectively executing the optimal path. Watch the 40-second GIF below of the agent solving the environment.

[GIF: 40-second clip of the agent reaching the goal]
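To make the action-selection rule precise: an epsilon-greedy policy takes the action with the highest predicted Q-value with probability 1 − ε, and a random action with probability ε. In this implementation ε decays over episodes (roughly ε = 0.95^episode), so the agent explores heavily at the start of training and increasingly exploits its learned values later on.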

Implementation

import numpy as np
import torch
import collections
import random
import statistics

# The Network class inherits the torch.nn.Module class, which represents a neural network.
class Network(torch.nn.Module):

    # The class initialisation function. This takes as arguments the dimension of the network's input (i.e. the dimension of the state), and the dimension of the network's output (i.e. the dimension of the action).
    def __init__(self, input_dimension, output_dimension):
        # Call the initialisation function of the parent class.
        super(Network, self).__init__()
        # Define the network layers. This example network has two hidden layers, each with 100 units.
        self.layer_1 = torch.nn.Linear(in_features=input_dimension, out_features=100)
        self.layer_2 = torch.nn.Linear(in_features=100, out_features=100)
        self.output_layer = torch.nn.Linear(in_features=100, out_features=output_dimension)

    # Function which sends some input data through the network and returns the network's output. In this example, a ReLU activation function is used for both hidden layers, but the output layer has no activation function (it is just a linear layer).
    def forward(self, input):
        layer_1_output = torch.nn.functional.relu(self.layer_1(input))
        layer_2_output = torch.nn.functional.relu(self.layer_2(layer_1_output))
        output = self.output_layer(layer_2_output)
        return output

# The DQN class determines how to train the above neural network.
class DQN:

    # The class initialisation function.
    def __init__(self):
        # Create a Q-network, which predicts the q-values of each action for a particular state, and a target network with the same architecture.
        self.q_network = Network(input_dimension=2, output_dimension=4)
        self.target_network = Network(input_dimension=2, output_dimension=4)
        # Define the optimiser which is used when updating the Q-network. The learning rate determines how big each gradient step is during backpropagation.
        self.optimiser = torch.optim.Adam(self.q_network.parameters(), lr=0.005)
        self.optimiser_target = torch.optim.Adam(self.target_network.parameters(), lr=0.005)

    def _calculate_loss_bellman_minibatch_with_target(self, minibatch, gamma=0.9):
        #transition = (self.state, discrete_action, reward, next_state)
        #unzip the values in the minibatch
        states, actions, rewards, next_states = zip(*minibatch)

        state_tensor = torch.tensor(np.array(states), dtype=torch.float32)
        next_state_tensor = torch.tensor(np.array(next_states), dtype=torch.float32)
        action_tensor = torch.tensor(actions, dtype=torch.int64)
        reward_tensor = torch.tensor(rewards, dtype=torch.float32)
        predicted_q_value_tensor = self.q_network.forward(state_tensor).gather(dim=1, index=action_tensor.unsqueeze(-1)).squeeze(-1)
        # Successor state-action values come from the target network and are detached so that no gradients flow into it.
        successor_q_value_tensor = torch.amax(self.target_network.forward(next_state_tensor), 1)
        successor_q_value_tensor = successor_q_value_tensor.detach()
        label = reward_tensor + gamma * successor_q_value_tensor
        loss = torch.nn.MSELoss()(predicted_q_value_tensor, label)
        return loss

    def train_q_network_bellman_minibatch_loss_with_target(self, minibatch):
        # Set all the gradients stored in the optimiser to zero.
        self.optimiser.zero_grad()
        # Calculate the loss for this transition.
        loss = self._calculate_loss_bellman_minibatch_with_target(minibatch)
        # Compute the gradients based on this loss, i.e. the gradients of the loss with respect to the Q-network parameters.
        loss.backward()
        # Take one gradient step to update the Q-network.
        self.optimiser.step()
        # Return the loss as a scalar
        return loss.item()
    
    def update_target_network(self):
        q_dict = self.q_network.state_dict()
        self.target_network.load_state_dict(q_dict)
        
class ReplayBuffer:
    def __init__(self, maxlength):
        self.replay_buffer = collections.deque(maxlen=maxlength)
        self.counter = 0
        # Running means of the visited x and y coordinates, used only for logging
        self.x_mean = 0
        self.y_mean = 0

    def buffer_size(self):
        return len(self.replay_buffer)

    def append_transition(self, transition):
        self.replay_buffer.append(transition)
        self.counter += 1
        size = len(self.replay_buffer)
        # Every 200 appended transitions, recompute the mean visited position (used only for logging)
        if self.counter % 200 == 0:
            x_list = []
            y_list = []
            for state, action, reward, next_state in self.replay_buffer:
                x_list.append(state[0])
                y_list.append(state[1])
            self.x_mean = statistics.mean(x_list)
            self.y_mean = statistics.mean(y_list)
        return self.x_mean, self.y_mean, size
        

    def rand_mini_batch(self, size):
        # Sample a random mini-batch of transitions (with replacement), capped at the current buffer size
        batch_size = min(size, len(self.replay_buffer))
        minibatch_indices = np.random.choice(len(self.replay_buffer), batch_size)
        minibatch = [self.replay_buffer[int(index)] for index in minibatch_indices]
        return minibatch
##########################################################################################
##                                                                                      ##
##                                  A G E N T                                           ##
##                                                                                      ##
##########################################################################################

class Agent:

    # Function to initialise the agent
    def __init__(self):
        # Set the episode length
        self.episode_length = 500
        # Reset the total number of steps which the agent has taken
        self.num_steps_taken = 0
        # The state variable stores the latest state of the agent in the environment
        self.state = None
        # The action variable stores the latest action which the agent has applied to the environment
        self.action = None
        self.dqn = DQN()
        self.dqn.update_target_network()
        self.buffer = ReplayBuffer(maxlength=5000)
        self.reward = None
        self.episode_counter = -1
        self.training = True
        self.greedy_run = True
        # Placeholder distance-to-goal score, overwritten after the first greedy evaluation run
        self.last_greedy_run_score = 66

    # Function to check whether the agent has reached the end of an episode
    def has_finished_episode(self):
        if self.num_steps_taken % self.episode_length == 0:
            self.num_steps_taken = 0
            self.greedy_run = not self.greedy_run
            if not self.greedy_run:
                self.episode_counter += 1
            if self.greedy_run:
                self.episode_length = 100
            return True
        else:
            return False

    # Function to get the greedy action for a particular state
    def get_greedy_action(self, state):
        # Pick the action with the highest predicted Q-value and convert it to a continuous action
        state_tensor = torch.tensor(state).unsqueeze(0)
        discrete_action = torch.argmax(self.dqn.q_network.forward(state_tensor)).item()
        action = self._discrete_action_to_continuous(discrete_action)
        return action

    def get_greedy_action_discrete(self, state):
        # Pick the action with the highest predicted Q-value and return it in its discrete form
        state_tensor = torch.tensor(state).unsqueeze(0)
        discrete_action = torch.argmax(self.dqn.q_network.forward(state_tensor)).item()
        return discrete_action
    
    def get_epsilon_action(self):
        # epsilon greedy function
        episode = self.episode_counter
        steps = self.num_steps_taken
        # decay of epsilon over episodes
        eq = 1 * 0.95 ** episode
        if 25 > episode > 15:
            self.episode_length = 300
            epsilon = 0.1
        elif episode == 0:
            self.episode_length = 2000
            epsilon = 1
        else:
            self.episode_length = 500
            epsilon = eq
        if random.uniform(0,1) > epsilon:
            state_tensor=torch.tensor(self.state).unsqueeze(0)
            action = torch.argmax(self.dqn.q_network.forward(state_tensor)).item()
        else:
            # Random action biased towards right (35%), up (30%), down (30%) and left (5%), favouring movement towards the goal
            action_list = [0]*35 + [1]*30 + [3]*30 + [2]*5
            action = random.choice(action_list)
        return action, epsilon, episode

    def _discrete_action_to_continuous(self, discrete_action):
        # Discrete actions: 0 = right, 1 = up, 2 = left, 3 = down (step size 0.02)
        if discrete_action == 0:
            # Move right
            continuous_action = np.array([0.02, 0], dtype=np.float32)
        elif discrete_action == 1:
            # Move up
            continuous_action = np.array([0, 0.02], dtype=np.float32)
        elif discrete_action == 2:
            # Move left
            continuous_action = np.array([-0.02, 0], dtype=np.float32)
        elif discrete_action == 3:
            # Move down
            continuous_action = np.array([0, -0.02], dtype=np.float32)
        return continuous_action
    
    # Function to get the next action, using whatever method you like
    def get_next_action(self, state):
        self.state = state
        # Choose the next action.
        if self.greedy_run == True or not self.training:
            discrete_action = self.get_greedy_action_discrete(state)
            # print("GREEDY RUN" + " episode: " + str(self.episode_counter+0.5) + " step " + str(self.num_steps_taken))
        else:
            discrete_action, epsilon, episode = self.get_epsilon_action()
            # print('epsilon: '+ str(round(epsilon,3))+' episode: '+ str(episode)+' step: '+ str(self.num_steps_taken))
        # Store the action; this will be used later, when storing the transition
        self.action = discrete_action
        # Convert the discrete action into a continuous action.
        action = self._discrete_action_to_continuous(discrete_action)    
        # Update the number of steps which the agent has taken
        self.num_steps_taken += 1
        
        return action

    # Function to set the next state and distance, which resulted from applying action self.action at state self.state
    def set_next_state_and_distance(self, next_state, distance_to_goal):
        # Reward function, with an added penalty if the chosen action led to hitting a wall (the agent did not move)
        reward = 0.2 * (2 ** (1 / 2) - distance_to_goal) ** 2
        if self.state[0] == next_state[0] and self.state[1] == next_state[1]:
            reward = reward - 0.005

        transition = (self.state, self.action, reward, next_state)

        if self.greedy_run:
            self.last_greedy_run_score = distance_to_goal
            if distance_to_goal < 0.03:
                self.training = False
    
        if not self.training:
            print("Training stopped!" + " greedy_score: " + str(self.last_greedy_run_score))
        elif self.greedy_run:
            print("run run run!" + " greedy_score: " + str(round(self.last_greedy_run_score,4)))
        else:
            # adding transition to buffer
            x_mean, y_mean, size = self.buffer.append_transition(transition)
            print("reward: " + str(round(reward,3))+" x_mean: " + str(round(x_mean,3))+ " y_mean: " + str(round(y_mean,3))+ " size: " + str(size)+ " greedy_score: " + str(round(self.last_greedy_run_score,4)))

            # Sample a mini-batch from the buffer once it contains enough transitions
            if self.buffer.buffer_size() >= 100:
                minibatch = self.buffer.rand_mini_batch(2000)
                loss_value = self.dqn.train_q_network_bellman_minibatch_loss_with_target(minibatch)
            
            # Update the target network with a set frequency (every 500 steps)
            if self.num_steps_taken % 500 == 0:
                self.dqn.update_target_network()

    # Returns the training flag (True while the agent is still training)
    def training_finished(self):
        return self.training

The implemented solution is Deep Q-Learning with a replay buffer and a target network. It additionally contains an evaluation check after each training episode that can stop the training once the agent reaches the goal under the greedy policy. The hyperparameters were hand-tuned by trial and error combined with an understanding of the underlying principles. An alternative implementation included a variation of Prioritised Experience Replay; however, it was found to worsen the performance of the network due to the added computational complexity. The implementation is outlined below.
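For completeness, here is a minimal sketch of how an external environment loop might drive this agent during training. The environment object and its reset/step methods are assumptions made for illustration; only the Agent methods are taken from the code above.

# Hypothetical driver loop. The environment object and its reset()/step() methods
# are assumptions for illustration; only the Agent methods come from the code above.
agent = Agent()
state = environment.reset()  # assumed to return the agent's (x, y) position as a numpy array
for _ in range(20000):  # assumed overall step budget
    if agent.has_finished_episode():
        state = environment.reset()
    # Epsilon-greedy action during training episodes, greedy action during evaluation runs
    action = agent.get_next_action(state)
    # The environment is assumed to return the new position and the distance to the goal
    next_state, distance_to_goal = environment.step(action)
    agent.set_next_state_and_distance(next_state, distance_to_goal)
    state = next_state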

The Network class allows a neural network with a given architecture to be instantiated using the torch module. Additionally, the forward function is defined, which passes input data through the network and returns the network's output.

The DQN class is responsible for training the neural network. Two instances of the Network class are created with their respective optimisers: the regular Q-network and the target network. Further functions calculate the loss and back-propagate it. The mean-squared-error loss over the mini-batch is calculated using the Bellman equation, with the target network providing the successor state-action value prediction.
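Concretely, for each transition (s, a, r, s′) in the sampled mini-batch, the label used in the loss is the Bellman target

y = r + γ · max_a′ Q_target(s′, a′),   with γ = 0.9 in this implementation,

and the loss is the mean-squared error between y and the Q-network prediction Q(s, a). The target values are detached from the computation graph, so gradients only flow into the Q-network; the target network is instead refreshed periodically by copying over the Q-network weights.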

The ReplayBuffer class creates the container into which the agent's transitions are placed. It provides methods for appending a transition to the buffer and for sampling a random mini-batch for training.

The main body of the code is the Agent class. Firstly, instances of DQN and ReplayBuffer are created, along with several numerical and flag variables. The function has_finished_episode checks whether the episode is done and additionally initiates the greedy run that evaluates the policy every other episode. The function get_greedy_action calculates the greedy action by taking the argmax over a forward pass through the neural network. The function get_epsilon_action can either return the greedy action, using the same mechanism as the previous function, or a randomised action. The randomised action has a 35, 30, 30 and 5 percent chance of being right, up, down and left respectively; this distribution favours actions leading towards the goal state. Epsilon decays with the number of episodes, in order to ensure high exploration at the start of training and then higher specialisation in getting the right values for the optimal-path states.

The function get_next_action checks whether the agent is in training or evaluation mode and selects the corresponding epsilon-greedy or greedy action. The function set_next_state_and_distance is responsible for training the agent. Firstly, it calculates the reward based on the supplied distance to the goal; the reward function was tuned to give better value differentiation near the initial state. Next, if the agent is in training mode, the transition is appended to the buffer and a mini-batch is randomly sampled from it. Based on that batch, the prediction loss is calculated and back-propagated through the network for learning. Finally, the target network is updated with a set frequency.
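The reward shaping used in set_next_state_and_distance can be written as

r = 0.2 · (√2 − d)²

where d is the supplied distance to the goal; the (√2 − d)² term makes the reward largest close to the goal while still giving a gradient of values far from it. An additional 0.005 is subtracted whenever the chosen action leaves the agent in the same position, i.e. when it has walked into a wall.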
