Gradient free optimization

Let's train a low-level controller using gradient-free optimization (random search). The controller will map from an aim-point to low-level controls, just like the hand-designed controller of homework 6. Unlike homework 6, we won't use the re-projection, but directly use the aim-point in 2D screen coordinates as an input ($x,y \in [-1,1]$).

To speed things up, we'll also use the ray library.

In [1]:
%matplotlib inline
import pystk
import ray
import numpy as np
import torch

{'node_ip_address': '',
 'redis_address': '',
 'object_store_address': '/tmp/ray/session_2019-11-19_22-52-47_293226_9403/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-11-19_22-52-47_293226_9403/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-11-19_22-52-47_293226_9403'}

Setup code (ignore for now)

The class below runs a controller inside of ray (for parallelization). You can ignore this part, and just remember that we can use Rollout.rollout(controller) to test how well a controller does. This might be useful for the final project though.

In [2]:
class Rollout(object):
    """Run a controller in a pystk race and report how far the kart got.

    Usage: construct, call :meth:`start` once, then call :meth:`rollout` as
    often as needed; :meth:`stop` releases the race.

    NOTE(review): the exported listing this class was recovered from had lost
    the statements that actually drive the simulator (``pystk.init``,
    ``Race.start/restart/step/stop``, ``WorldState.update``, ``Track.update``).
    They are restored here following the pystk API documentation -- confirm
    against the original notebook.
    """

    def __init__(self, config: "pystk.GraphicsConfig | None" = None):
        """
        :param config: Graphics configuration. If None, the ``ld`` preset with
                       a 200x150 screen is used.
        """
        # Set these first so __del__ is safe even if the setup below raises.
        self.config = None
        self.race = None
        self.track = None

        if config is None:
            config = pystk.GraphicsConfig.ld()
            config.screen_width = 200
            config.screen_height = 150
        self.graphics_config = config
        # NOTE(review): restored call -- without it graphics_config is never applied.
        pystk.init(config)

    def __del__(self):
        self._release_race()

    def _release_race(self):
        """Stop and drop the current race, if there is one."""
        # getattr: __del__ may run on an instance whose __init__ never finished.
        race = getattr(self, 'race', None)
        if race is not None:
            race.stop()
        self.race = None

    def start(self, config: "pystk.RaceConfig | None" = None):
        """
        Create and start a race, replacing any race that is already running.

        :param config: Race configuration. If None, a player-controlled kart on
                       the "zengarden" track with a step size of 0.1 is used.
        """
        if config is None:
            config = pystk.RaceConfig()
            config.players[0].controller = pystk.PlayerConfig.Controller.PLAYER_CONTROL
            config.track = "zengarden"
            config.step_size = 0.1

        self._release_race()

        self.config = config

        self.race = pystk.Race(config)
        self.race.start()

        self.track = pystk.Track()
        self.track.update()

    def stop(self):
        """Stop the current race (if any) and reset config, race and track to None."""
        self._release_race()

        self.config = None
        self.track = None

    @staticmethod
    def _point_on_track(distance, track, offset=0.0):
        """
        Interpolate the point at ``distance`` along ``track``.

        :param distance: Distance down the track; wrapped modulo the last entry
                         of ``track.path_distance``.
        :param track: Object with ``path_distance`` (rows of [start, end]) and
                      ``path_nodes`` (rows of [start point, end point]).
        :param offset: Added to the distance after the segment has been chosen.
        :return: ``path_nodes`` start/end of the chosen segment, linearly blended.
        """
        # Use the wrapped distance for both the lookup and the interpolation;
        # using the raw distance for the latter extrapolates far past the
        # segment once the distance exceeds the track length.
        wrapped = distance % track.path_distance[-1, 1]
        node_idx = np.searchsorted(track.path_distance[..., 1], wrapped) % len(track.path_nodes)
        seg_start, seg_end = track.path_distance[node_idx]
        nodes = track.path_nodes[node_idx]
        t = (wrapped + offset - seg_start) / (seg_end - seg_start)
        return nodes[1] * t + nodes[0] * (1 - t)

    def _to_image(self, x, proj, view):
        """
        Project the point ``x`` with ``proj @ view`` and return its 2D coordinates.

        :param x: Point coordinates; a homogeneous 1 is appended.
        :param proj: Projection matrix.
        :param view: View matrix.
        :return: ``[x, -y]`` after dividing by the last homogeneous component.
        """
        p = proj @ view @ np.array(list(x) + [1])
        return np.array([p[0] / p[-1], - p[1] / p[-1]])

    def rollout(self, c, max_step: int = 100, restart: bool = True, return_data: bool = False):
        """
        Drive the kart with controller ``c`` for ``max_step`` steps.

        :param c: A controller that maps a 2D aim point to a command
        :param max_step: Number of simulation steps to run
        :param restart: Restart the race before the rollout
        :param return_data: Return per-step data instead of the final distance
        :return: If ``return_data``, a list of ``Data(aim_point, distance, image)``
                 (one per step); otherwise the last ``overall_distance`` divided
                 by the track length (0.0 if no step was run).
        """
        import collections
        Data = collections.namedtuple('Data', 'aim_point distance image')
        assert self.race is not None, "You need to start the race before the rollout"

        if restart:
            self.race.restart()

        next_action = pystk.Action()
        result = []
        distance = 0.0  # keeps the return value defined when max_step == 0
        for _ in range(max_step):
            # Apply the action chosen in the previous iteration.
            self.race.step(next_action)

            state = pystk.WorldState()
            state.update()
            kart = state.players[0].kart
            # Transposed before use in _to_image's proj @ view @ point product.
            proj = np.array(state.players[0].camera.projection).T
            view = np.array(state.players[0].camera.view).T

            # Aim 20 distance units ahead of the kart along the track.
            aim_point_world = self._point_on_track(kart.distance_down_track + 20, self.track)
            ap = self._to_image(aim_point_world, proj, view)
            # Take an action using the 2D aim-point
            next_action = c(ap)

            # Should we return any data
            distance = kart.overall_distance / self.track.length
            if return_data:
                # 1 * ... stores a new array rather than the race's own image object.
                frame = 1 * np.asarray(self.race.render_data[0].image)
                result.append(Data(aim_point=ap, distance=distance, image=frame))
        if return_data:
            return result
        return distance

# Ray will just run SuperTuxKart in parallel
@ray.remote
class RayRollout(Rollout):
    """Rollout as a ray actor: create with ``RayRollout.remote(...)`` and call
    methods through ``.remote(...)`` / ``ray.get``."""

# Some visualization code, no need to understand this...
def dummy_control(ap):
    """Controller that ignores the aim point ``ap``: no steering, acceleration 1."""
    straight_ahead = pystk.Action(steer=0, acceleration=1)
    return straight_ahead

def draw_aimpoint(data):
    """
    Return ``data.image`` with a colored square marking ``data.aim_point``.

    The aim point is mapped from [-1, 1] to pixel coordinates (clipped so it
    stays inside the frame) and a block of up to 10x10 pixels around it is
    painted [255, 128, 128].
    """
    # Multiplying by 1 yields a new array, so the marker is not drawn into data.image.
    frame = 1 * data.image
    aim = data.aim_point
    rows, cols = frame.shape[0], frame.shape[1]

    col = int(np.clip(0.5 * (aim[0] + 1), 0, 0.999) * cols)
    row = int(np.clip(0.5 * (aim[1] + 1), 0, 0.999) * rows)

    top, left = max(0, row - 5), max(0, col - 5)
    frame[top:row + 5, left:col + 5] = [255, 128, 128]
    return frame

def show_video(frames, fps=10):
    """
    Write ``frames`` to '/tmp/test.mp4' and embed the video in the notebook output.

    :param frames: Sequence of images passed to ``imageio.mimwrite``
    :param fps: Frame rate of the written video
    """
    from IPython.display import Video, display
    import imageio

    video_path = '/tmp/test.mp4'
    imageio.mimwrite(video_path, frames, fps=fps)
    player = Video(video_path, width=480, height=360, embed=True)
    display(player)

def visualize_controller(c, max_step=100):
    """
    Run controller ``c`` in a 600x450 HD rollout, show it as a video and
    return the final distance.

    The ray actor is created on first use and cached on the function itself
    (``visualize_controller.ro``) so later calls reuse it.

    :param c: A controller that maps a 2D aim point to a command
    :param max_step: Number of simulation steps to run
    :return: ``distance`` of the last recorded step
    """
    if not hasattr(visualize_controller, 'ro'):
        hd_config = pystk.GraphicsConfig.hd()
        hd_config.screen_width = 600
        hd_config.screen_height = 450
        visualize_controller.ro = RayRollout.remote(hd_config)
        # Rollout.rollout asserts that the race has been started.
        ray.get(visualize_controller.ro.start.remote())
    data = ray.get(visualize_controller.ro.rollout.remote(c, max_step=max_step, return_data=True))
    show_video([draw_aimpoint(d) for d in data], fps=30)
    return data[-1].distance

IMAGEIO FFMPEG_WRITER WARNING: input image is not divisible by macro_block_size=16, resizing from (600, 450) to (608, 464) to ensure video compatibility with most codecs and players. To prevent resizing, make your input image divisible by the macro_block_size or set the macro_block_size to 1 (risking incompatibility).