Coding Week 12

This week we will train a deep network to balance a pole on a cart (CartPole-v1) using vanilla policy gradient (REINFORCE).

[Figure: the CartPole environment, a pole balancing on a cart]

A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.

Inputs: cart position, cart velocity, pole angle, pole angular velocity.

Output: one of two discrete actions, push left (action 0, force -1) or push right (action 1, force +1).
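
For reference, these correspond to the Gym spaces (a quick sketch, assuming gym is installed; demo_env is just an illustrative name, and the environment actually used for training is created in the Setup section below):

In [ ]:
# Sketch: inspect the CartPole-v1 observation and action spaces.
import gym

demo_env = gym.make('CartPole-v1')
print(demo_env.observation_space)  # a Box with 4 entries: cart position, cart velocity, pole angle, pole angular velocity
print(demo_env.action_space)       # Discrete(2): action 0 pushes left, action 1 pushes right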

Setup

We will use OpenAI Gym in this assignment.

In [1]:
from pathlib import Path

import torch
import numpy as np
import gym

from torch.distributions import Categorical

import IPython.display as ipd
from IPython.display import Video

def sample_categorical(logits):
    return Categorical(logits=logits).sample().item()

def rollout(env, policy, sample=sample_categorical, max_steps=500):
    """
      Roll out a policy.
      env: Gym environment to roll out
      policy: A function that maps states s to distributions of actions (logits)
      sample: A sampling function that convert logits to samples
    """
    s_l_a_r = list()
    s = env.reset()

    for _ in range(max_steps):
        # Ask our policy what to do
        logit = policy(s)
        # Sample an action according to the policy
        a = sample(logit)
        # Take a step in the environment
        sp, r, done, info = env.step(a)
        # Store the result
        s_l_a_r.append((s, logit, a, r))
        # and move to the new state
        s = sp
        
        if done: break

    return s_l_a_r
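
To see what rollout returns, here is a quick sketch (demo_env and random_policy are just illustrative names) that rolls out a uniformly random policy:

In [ ]:
# Sketch: roll out a uniformly random policy to see what rollout() returns.
# Each list entry is a (state, logits, action, reward) tuple.
demo_env = gym.make('CartPole-v1')
random_policy = lambda s: torch.zeros(demo_env.action_space.n)  # equal logits -> uniform random actions
trajectory = rollout(demo_env, random_policy)
print(len(trajectory))  # episode length; random play on CartPole usually ends after a few dozen steps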

Our policy is a small two-layer network (with the ReLU commented out it is effectively linear). I know it's underwhelming :(

In [14]:
class Policy(torch.nn.Module):  
    def __init__(self, env, k=16):
        super().__init__()
        
        self.net = torch.nn.Sequential(
            torch.nn.Linear(env.observation_space.shape[0], k),
#             torch.nn.ReLU(),
            torch.nn.Linear(k, env.action_space.n))

    def forward(self, s):
        return self.net(torch.FloatTensor(s))
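
As a quick sanity check (a sketch; demo_env and demo_policy are temporary names, and the real policy is created in the training cell below), the policy maps a 4-dimensional state to 2 action logits:

In [ ]:
# Sketch: the policy maps a 4-dimensional CartPole state to 2 action logits.
demo_env = gym.make('CartPole-v1')
demo_policy = Policy(demo_env)
s = demo_env.reset()
print(demo_policy(s))                      # tensor with one logit per action
print(sample_categorical(demo_policy(s)))  # a sampled action: 0 or 1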

Recall what we're trying to do in RL: maximize the expected return of a policy $\pi$ (or, equivalently, minimize a loss $L$) $$ -L = E_{\tau \sim P_\pi}[R(\tau)], $$ where $\tau = \{s_0, a_0, s_1, a_1, \ldots\}$ is a trajectory of states and actions. The return of a trajectory is the sum of individual rewards $R(\tau) = \sum_k r(s_k)$ (we won't discount in this assignment).

Policy gradient computes the gradient of the loss $L$ using the log-derivative trick $$ \nabla_\pi L = -E_{\tau \sim P_\pi}\left[\sum_k r(s_k) \nabla_\pi \sum_i \log \pi(a_i | s_i)\right]. $$ Since the reward $r(s_k)$ only depends on actions $a_i$ taken in the past ($i < k$), we can further simplify the above equation: $$ \nabla_\pi L = -E_{\tau \sim P_\pi}\left[\sum_i \left(\nabla_\pi \log \pi(a_i | s_i)\right)\left(\sum_{k=i}^{|\tau|} r(s_k) \right)\right]. $$ We will implement an estimator for this objective below; a small sketch of the reward-to-go term $\sum_{k=i}^{|\tau|} r(s_k)$ follows the list. There are a few steps that we need to follow:

  • The expectation $E_{\tau \sim P_\pi}$ is estimated with rollouts of our policy
  • The log probability $\log \pi(a_i | s_i)$ uses Categorical.log_prob
  • The gradient is computed with the .backward() function
  • The gradient $\nabla_\pi L$ is then fed to a standard optimizer
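
Here is a minimal sketch of the reward-to-go term $\sum_{k=i}^{|\tau|} r(s_k)$: iterating a trajectory backwards and accumulating the rewards credits each step with its own reward plus everything that comes after it (the training cell below does exactly this inside the update; the rewards here are illustrative):

In [ ]:
# Sketch: reward-to-go via a backward pass over illustrative per-step rewards.
rewards = [1.0, 1.0, 1.0]
reward_to_go = []
R = 0
for r in reversed(rewards):
    R += r
    reward_to_go.append(R)
reward_to_go.reverse()
print(reward_to_go)  # [3.0, 2.0, 1.0]: step i gets the sum of rewards from step i onward
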
In [ ]:
# Let's set up the gym environment; visual_env just records a video of every other episode
env = gym.make('CartPole-v1')
visual_env = gym.wrappers.Monitor(env, './gym-results', force=True, video_callable=lambda i: i%2==0)

policy = Policy(env)
optim = torch.optim.RMSprop(policy.parameters(), lr=1e-2)

# Training starts here
for epoch in range(20):
    play_time = []
    
    for it in range(100):
        # Rollout
        trajectory = rollout(env, policy)
        play_time.append(len(trajectory))
        # Compute (future) return and gradient
        R = 0
        optim.zero_grad()
        for s, logit, a, r in reversed(trajectory):
            # Iterating backwards makes R the reward-to-go sum_{k>=i} r(s_k)
            R += r
            p = Categorical(logits=logit)
            l = p.log_prob(torch.tensor(a))
            # Accumulate gradients of the per-step loss -log pi(a|s) * R
            (-l*R).backward()
        optim.step()
    # Display how well we're doing and show a nice video (using the greedy/argmax action)
    trajectory = rollout(visual_env, policy, sample=lambda x: int(x[1] > x[0]))
    print('%d: %.2f (%.2f)' % (epoch, np.mean(play_time), len(trajectory)))
    if epoch % 2 == 1:
        ipd.display(Video(str(list(sorted(Path('./gym-results/').glob('*.mp4')))[-1])))
0: 40.26 (66.00)
1: 51.65 (35.00)
2: 54.24 (500.00)
3: 276.22 (500.00)
4: 439.64 (500.00)
5: 500.00 (500.00)
In [7]:
help(p.log_prob)
Help on method log_prob in module torch.distributions.categorical:

log_prob(value) method of torch.distributions.categorical.Categorical instance
    Returns the log of the probability density/mass function evaluated at
    `value`.
    
    Args:
        value (Tensor):
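
For example, a small sketch of how log_prob feeds the update above (the logits and the return value 3.0 are illustrative):

In [ ]:
# Sketch: log-probability of a sampled action under illustrative logits.
logits = torch.tensor([0.2, -0.5], requires_grad=True)
dist = Categorical(logits=logits)
a = dist.sample()
loss = -dist.log_prob(a) * 3.0  # 3.0 stands in for the reward-to-go R
loss.backward()                 # fills logits.grad, just like the training loop above
print(loss.item(), logits.grad)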