Simple Q-learning based controller for epuck robot

In this example, we try to train a Q-learning based agent that controls a robot so that it moves on a straight line. Specifically, we simulate the epuck robot from the Webots library. The world is a simple rectangular grid shown in the image below

../../../_images/simple_board.png

There are two actions available, MOVE_FWD and STOP. The agent begins at position (0, 0, 0) which is the center of the grid. The goal is to move the robot at [-2.5, 0.0] and stop there.

The controller, we will develop in this example, is very simple. Ideally, we would like to monitor some aspect of the robot e.g. the speed input to the motors. However, the controller in this example will simply select actions to be executed. The result of this, is that our robot may drift during the simulation. However, since the action space is really small and we limit the mumber of simulation episodes to a small number, this drift will not be be signigicant. We will address these points in a later example.

Recall, that the vanilla Q-learning algorithm, uses a table tp represent the state-action value function. Thus, we will digitize the grid environment using ten points in every direction. This is done in the EnvironmentWrapper class.

Let’s go over the code. As usual, we start with the needed imports

import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt

from controller import Supervisor, Node

from src.worlds.webot_environment_wrapper import EnvironmentWrapper, EnvConfig
from src.worlds.state_aggregation_webot_env import StateAggregationWebotEnv, StateAggregationWebotEnvBoundaries
from src.worlds.webot_robot_action_space import WebotRobotActionType, WebotRobotActionBase, \
    WebotRobotMoveFWDAction, WebotRobotStopAction

from src.agents.diff_drive_robot_qlearner import DiffDriveRobotQLearner
from src.algorithms.td.td_algorithm_base import TDAlgoInput
from src.policies import EpsilonGreedyPolicy, EpsilonDecayOption
from src.utils import INFO

Next we define constants that control the program execution

# Define a variable that defines the duration of each physics step.
# This macro will be used as argument to the Robot::step function,
# and it will also be used to enable the devices.
# This duration is specified in milliseconds and it must
# be a multiple of the value in the basicTimeStep field of the WorldInfo node.
TIME_STEP = 32

MAX_SPEED = 6.28
MIN_SPEED = 0.0

# threshold value for the proximity sensors
# to identify the fact that the robot crushed the wall
BUMP_THESHOLD = 90
N_EPISODES = 1000
PLOT_STEP = 10
N_ITRS_PER_EPISODE = 2000
EPS = 1.0
EPS_DECAY_OP = EpsilonDecayOption.INVERSE_STEP
ALPHA = 0.1
GAMMA = 0.99

Next, we define a function that allows us to plot a running mean of the gained rewards

def plot_running_avg(avg_rewards, step):

    running_avg = np.empty(avg_rewards.shape[0])
    for t in range(avg_rewards.shape[0]):
        running_avg[t] = np.mean(avg_rewards[max(0, t - step): (t + 1)])
    plt.plot(running_avg)
    plt.xlabel("Number of episodes")
    plt.ylabel("Reward")
    plt.title("Running average")
    plt.show()

The Policy class represents a simple policy strategy we will follow. Observe, that when at state (5, 5) i.e. the center of the grid, we always select MOVE_FWD action

class Policy(EpsilonGreedyPolicy):
        def __init__(self, n_actions: int) -> None:
                super(Policy, self).__init__(eps=EPS, decay_op=EPS_DECAY_OP, n_actions=n_actions)

        def __call__(self, q_func, state) -> int:

                # if we are at the origin always choose FWD
                if state == (5, 5):
                    return 1

                return super(Policy, self).__call__(q_func, state)

        def select_action(self, q_func, state) -> int:
                # if we are at the origin always choose FWD
                if state == (5, 5):
                    return 1

                return super(Policy, self).max_action(q_func, state=state, n_actions=self.n_actions)

The OnGoal class deals with rewards assignement. It checks whether the robot reached the goal position and which action has been chosen.

class OnGoal(object):

        def __init__(self, goal_position: list) -> None:

                # radius away from the goal
                self.goal_radius: float = 0.1
                self.robot_radius = 7.4 / 100.0
                self.goal_position = np.array(goal_position)
                self.start_position = np.array([0., 0., 0., ])

        def check(self, robot_node: Node, action: WebotRobotActionBase) -> tuple:

                position = robot_node.getPosition()
                position = np.array(position)

                # compute l2 norm from goal
                l2_norm = np.linalg.norm(position - self.start_position)

                # we don't want to be stacked where we started
                # we want to make progress. If we are at the
                # start position and decide to STOP then exit the game
                if l2_norm < 1.0e-4 and action.action_type == WebotRobotActionType.STOP:
                    return True, -2.0, l2_norm

                # compute l2 norm from goal
                l2_norm = np.linalg.norm(position - self.goal_position)

                if l2_norm < self.goal_radius:

                    # we reached the goal but we also want
                    # the robot to stop

                    if action.action_type == WebotRobotActionType.STOP:
                        return True, 10.0, l2_norm
                    else:
                        # otherwise punish the agent
                        return False, -2.0, l2_norm

                # goal has not been reached. No reason to stop
                # so penalize this choice
                if action.action_type == WebotRobotActionType.STOP:
                    return False, -2.0, l2_norm

                return False, 0.0, l2_norm

Finally, the controller_main function puts everything together. We need a Supervisor Webot Node so that we can reset the environment. We obtain the robot node using the supervisor.getFromDef(name='qlearn_e_puck') function.

def controller_main():

        # number of steps to play
        supervisor = Supervisor()

        robot_node = supervisor.getFromDef(name='qlearn_e_puck')

        if robot_node is None:
                raise ValueError("Robot node is None")

        robot_node.enablePoseTracking(TIME_STEP)

        goal_position = [0.0, 0.0, -2.5]
        on_goal_criterion = OnGoal(goal_position=goal_position)

        robot = supervisor
        env_config = EnvConfig()
        env_config.dt = TIME_STEP
        env_config.robot_name = "qlearn_e_puck"
        env_config.bump_threshold = BUMP_THESHOLD
        env_config.on_goal_criterion = on_goal_criterion
        env_config.reward_on_wall_crush = -5.0
        environment = EnvironmentWrapper(robot=robot, robot_node=robot_node, config=env_config)

        environment.add_action(action=WebotRobotStopAction())
        environment.add_action(action=WebotRobotMoveFWDAction(motor_speed=MAX_SPEED))

        # position aggregation environment
        boundaries = StateAggregationWebotEnvBoundaries(xcoords=(-3.0, 3.0),
                                                        ycoords=(-3.0, 3.0))

        state_aggregation_env = StateAggregationWebotEnv(env=environment,
                                             boundaries=boundaries, states=(10, 10))

        agent_config = TDAlgoInput()
        agent_config.n_episodes = N_EPISODES
        agent_config.n_itrs_per_episode = N_ITRS_PER_EPISODE
        agent_config.gamma = GAMMA
        agent_config.alpha = ALPHA
        agent_config.output_freq = 1
        agent_config.train_env = state_aggregation_env
        agent_config.policy = Policy(n_actions=state_aggregation_env.n_actions)

        agent = DiffDriveRobotQLearner(algo_in=agent_config)
        agent.train()

        agent.save_q_function(filename=Path("q_learner.json"))
        plot_running_avg(agent.total_rewards, step=PLOT_STEP)

        print("{0} Finished training".format(INFO))
        # once the agent is trained let's play
        agent.training_finished = True
        agent.load_q_function(filename=Path("q_learner.json"))
        agent.play(env=state_aggregation_env, n_games=1)
if __name__ == '__main__':

        controller_main()

In order to run this example, we will need to launch the environment on Webots. Running the driver code above produces the following running average rewards

../../../_images/rewards_q_learn_simple_controller.png