Coding Week 11

This week we will train a deep network that learns to drive in SuperTuxKart. Unlike HW6, we'll map input images directly to actions (steering, acceleration), but only train and test on a single level, 'lighthouse'.

In [ ]:
%matplotlib inline
from coding_11 import pytux, pystk, Autopilot

Data collection

Below is the basic code for data collection. We drive with an Autopilot (part of the solution to HW6) to collect good (image, action) pairs.

In [ ]:
from os import makedirs, path
from shutil import rmtree
from PIL import Image
import numpy as np


DATASET_PATH = 'drive_data'
track = 'lighthouse'


good_ac = None

# TODO: implement this in class.
class NoisyAutopilot(Autopilot):
    def act(self, image, aim_pt, vel):
        return super().act(image, aim_pt, vel)


def collect(_, im, a):
    """Data callback for pytux.rollout: save the current frame as a PNG and
    its (steer, acceleration) label as a CSV next to it."""
    global n
    fn = path.join(DATASET_PATH, track + '_%05d' % n)
    Image.fromarray(im).save(fn + '.png')
    with open(fn + '.csv', 'w') as f:
        f.write('%0.2f,%0.2f' % (a.steer, a.acceleration))
    n += 1

    
n_images = 10000
n = 0
if path.exists(DATASET_PATH):
    rmtree(DATASET_PATH)
makedirs(DATASET_PATH, exist_ok=True)

agent = Autopilot()


while n < n_images:
    step, _ = pytux.rollout(
        track, agent, max_frames=1000,
        verbose=(n == 0), data_callback=collect)
    
    print('%d / %d frames collected' % (n, n_images))
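
The NoisyAutopilot above is left to be implemented in class. A minimal sketch of one possibility, assuming we simply want to perturb the expert's steering with Gaussian noise so that rollouts also visit states slightly off the ideal racing line (the noise scale of 0.2 is an arbitrary choice):

import numpy as np

class NoisyAutopilot(Autopilot):
    def __init__(self, noise_std=0.2):
        super().__init__()
        self.noise_std = noise_std

    def act(self, image, aim_pt, vel):
        action = super().act(image, aim_pt, vel)
        # Add a small amount of Gaussian noise to the expert steering and keep
        # it in the valid [-1, 1] range.
        action.steer = float(np.clip(action.steer + np.random.normal(0.0, self.noise_std), -1.0, 1.0))
        return action

Collecting part of the data with agent = NoisyAutopilot() instead of the plain Autopilot() would then yield a slightly more varied dataset.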

Data loading

Now that the data is collected, we'll load it using our standard SuperTuxDataset (from HW1-3). No need to change anything here.

In [ ]:
import numpy as np
import torchvision.transforms.functional as TF

from torch.utils.data import Dataset, DataLoader


class ToTensor(object):
    def __call__(self, image, *args):
        return (TF.to_tensor(image),) + args


class SuperTuxDataset(Dataset):
    def __init__(self, dataset_path=DATASET_PATH, transform=ToTensor()):
        from PIL import Image
        from glob import glob
        from os import path
        self.data = []
        for f in glob(path.join(dataset_path, '*.csv')):
            i = Image.open(f.replace('.csv', '.png'))
            i.load()
            self.data.append((i, np.loadtxt(f, dtype=np.float32, delimiter=',')))
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        data = self.data[idx]
        data = self.transform(*data)
        return data


def load_data(dataset_path=DATASET_PATH, transform=ToTensor(),
              num_workers=8, batch_size=128):
    dataset = SuperTuxDataset(dataset_path, transform=transform)
    return DataLoader(dataset, num_workers=num_workers,
                      batch_size=batch_size, shuffle=True, drop_last=True)
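
As an optional sanity check (assuming the collection cell above has already been run), we can pull one small batch and inspect the tensor shapes:

# Optional: load a tiny batch and check the shapes of the stacked tensors.
sample_loader = load_data(batch_size=4, num_workers=0)
images, actions = next(iter(sample_loader))
print(images.shape)   # (4, 3, H, W) float images in [0, 1]
print(actions.shape)  # (4, 2) -> (steer, acceleration)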

Training

Most of the code below is copied from HW2 and HW3. We don't need data augmentation since we train and evaluate on the same level for now. However, we do need to set up a training loss and then convert the network output to SuperTuxKart actions (in act).
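
One reasonable way to fill in the two TODOs in the cell below (a sketch, not the only valid solution) is to treat driving as plain regression: an L1 (or MSE) loss over the predicted (steer, acceleration) pair for loss_func, and an act that preprocesses the incoming frame the same way as the training data, runs a forward pass, and clamps the two outputs to their valid ranges. The sketch assumes act receives the frame as a raw image (numpy array or PIL image), as in the data callback above; the specific loss and clamping ranges are choices, not requirements.

import torch
import torchvision.transforms.functional as TF

# One possible training loss: L1 regression on (steer, acceleration).
loss_func = torch.nn.L1Loss()

# One possible body for CNNClassifier.act: preprocess like the dataset,
# run a forward pass, and clamp the outputs to valid action ranges.
def act(self, image, _, current_vel):
    device = next(self.parameters()).device
    x = TF.to_tensor(image)[None].to(device)  # 1 x 3 x H x W
    with torch.no_grad():
        steer, acceleration = self(x).squeeze(0).cpu()
    action = pystk.Action()  # pystk is imported at the top of the notebook
    action.steer = float(torch.clamp(steer, -1.0, 1.0))
    action.acceleration = float(torch.clamp(acceleration, 0.0, 1.0))
    return action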

In [ ]:
import time
import torch

from torch.utils.tensorboard import SummaryWriter


class CNNClassifier(torch.nn.Module):
    def __init__(self,
                 layers=[16, 32, 64, 128],
                 n_input_channels=3, n_output_channels=6, kernel_size=5):
        super().__init__()

        L = [torch.nn.BatchNorm2d(n_input_channels)]
        c = n_input_channels
        for l in layers:
            L.append(torch.nn.Conv2d(c, l, kernel_size, stride=2, padding=kernel_size//2))
            L.append(torch.nn.ReLU())
            c = l
        self.network = torch.nn.Sequential(*L)
        self.classifier = torch.nn.Linear(c, n_output_channels)

    def forward(self, x):
        return self.classifier(self.network(x).mean(dim=[2, 3]))

    def act(self, image, _, current_vel):
        # TODO: Implement
        pass


def train(model, train_data, lr=0.001, n_epochs=20):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    writer = SummaryWriter('log/{}'.format(time.strftime('%H-%M')), flush_secs=5)
    optim = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-6)
    # TODO: Implement
    loss_func = None
    steps = 0
    
    model.to(device)
    
    for epoch in range(n_epochs):
        model.train()

        train_loss = []
        
        for image, action in train_data:
            image, action = image.to(device), action.to(device)
            action_pred = model(image)
            
            loss = loss_func(action_pred, action)
            train_loss.append(loss.item())
          
            optim.zero_grad()
            loss.backward()
            optim.step()
          
            steps += 1
            writer.add_scalar('loss/train_batch', loss.item(), steps)
            
        writer.add_scalar('loss/train_epoch', np.mean(train_loss), epoch)
        
        print('Epoch: %d, train loss: %.4f' % (epoch, np.mean(train_loss)))


model = CNNClassifier(n_output_channels=2)
train_data = load_data(batch_size=256)

train(model, train_data)
In [ ]:
model.eval()

pytux.rollout(track, model, max_frames=1000, verbose=True)
In [ ]: