
Use PyTorch on a single node

This notebook demonstrates how to use PyTorch on the Spark driver node to fit a neural network on MNIST handwritten digit recognition data.


  • PyTorch installed


  • GPU-enabled cluster

The content of this notebook is copied from the PyTorch project under the license with slight modifications in comments. Thanks to the developers of PyTorch for this example.

dbutils.widgets.dropdown('USE_GPU', 'no', ['no', 'yes'])
USE_GPU = dbutils.widgets.get('USE_GPU') == 'yes'
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from collections import namedtuple

import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

Handwritten Digit Recognition

This tutorial walks through a classic computer vision application of identifying handwritten digits. We train a simple Convolutional Neural Network on the MNIST dataset.


While this notebook runs on both non-GPU and GPU clusters, we recommend using a GPU cluster. If you change the Widget USE_GPU to yes, PyTorch will take advantage of CUDA-enabled GPU and run faster.

  • batch_size: number of examples in a training mini-batch
  • test_batch_size: number of examples in a testing/inference mini-batch. This is usually larger than batch_size since we don't have to do backward pass during testing/inference.

The training algorithm goes through the training set for epochs passes. We use Stochastic Gradient Descent with learning rate lr and momentum factor momentum. Please check PyTorch SGD implementation for details.

MNIST_DIR = '/tmp/data/mnist'
use_cuda = USE_GPU and torch.cuda.is_available()

Params = namedtuple('Params', ['batch_size', 'test_batch_size', 'epochs', 'lr', 'momentum', 'seed', 'cuda', 'log_interval'])
args = Params(batch_size=64, test_batch_size=1000, epochs=10, lr=0.01, momentum=0.5, seed=1, cuda=use_cuda, log_interval=200)

Prepare MNIST Dataset

We download the dataset, shuffle the rows, create batches and standardize the features.


data_transform_fn = transforms.Compose([
    transforms.Normalize((0.1307,), (0.3081,))])

train_loader =
        datasets.MNIST(MNIST_DIR, train=True, download=True,
        batch_size=args.batch_size, shuffle=True, num_workers=1)

test_loader =
        datasets.MNIST(MNIST_DIR, train=False, 
        batch_size=args.test_batch_size, shuffle=True, num_workers=1)

Construct a CNN model

Now we create a simple CNN model with two convolutional layers (conv) and two fully connected (fc) layers. We also add a dropout layer between the conv and fc layers.

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x,
        x = self.fc2(x)
        return F.log_softmax(x)
model = Net()
model.share_memory() # gradients are allocated lazily, so they are not shared here
Out[4]: Net( (conv1): Conv2d(1, 10, kernel_size=(5, 5), stride=(1, 1)) (conv2): Conv2d(10, 20, kernel_size=(5, 5), stride=(1, 1)) (conv2_drop): Dropout2d(p=0.5) (fc1): Linear(in_features=320, out_features=50, bias=True) (fc2): Linear(in_features=50, out_features=10, bias=True) )

Model Training

To train the model, let us define a Negative Log Likelihood loss and create a Stochastic Gradient Descent optimizer with momentum. Calling loss.backward() followed by optimizer.step() updates the model parameters.

def train_epoch(epoch, args, model, data_loader, optimizer):
    for batch_idx, (data, target) in enumerate(data_loader):
        if args.cuda:
            data, target = data.cuda(), target.cuda()      
        data, target = Variable(data), Variable(target)
        output = model(data)
        loss = F.nll_loss(output, target)
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(data_loader.dataset),
                100. * batch_idx / len(data_loader),

def test_epoch(model, data_loader):
    test_loss = 0
    correct = 0
    for data, target in data_loader:
        if args.cuda:
            data, target = data.cuda(), target.cuda()      
        data, target = Variable(data, volatile=True), Variable(target)
        output = model(data)
        test_loss += F.nll_loss(output, target, size_average=False).data.item() # sum up batch loss
        pred =[1] # get the index of the max log-probability
        correct += pred.eq(

    test_loss /= len(data_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(data_loader.dataset),
        100. * correct / len(data_loader.dataset)))

# Run the training loop over the epochs (evaluate after each)
if args.cuda:
    model = model.cuda()
optimizer = optim.SGD(model.parameters(),, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
    train_epoch(epoch, args, model, train_loader, optimizer)
    test_epoch(model, test_loader)
/local_disk0/tmp/1562789836403-0/ UserWarning: Implicit dimension choice for log_softmax has been deprecated. Change the call to include dim=X as an argument. from six import StringIO # cString does not support unicode well Train Epoch: 1 [0/60000 (0%)] Loss: 2.332586 Train Epoch: 1 [12800/60000 (21%)] Loss: 1.336145 Train Epoch: 1 [25600/60000 (43%)] Loss: 0.672064 Train Epoch: 1 [38400/60000 (64%)] Loss: 0.711025 Train Epoch: 1 [51200/60000 (85%)] Loss: 0.805226 /local_disk0/tmp/1562789836403-0/ UserWarning: volatile was removed and now has no effect. Use `with torch.no_grad():` instead. import matplotlib as mpl /databricks/python/lib/python3.6/site-packages/torch/nn/ UserWarning: size_average and reduce args will be deprecated, please use reduction='sum' instead. warnings.warn(warning.format(ret)) Test set: Average loss: 0.2064, Accuracy: 9386/10000 (93%) Train Epoch: 2 [0/60000 (0%)] Loss: 0.468238 Train Epoch: 2 [12800/60000 (21%)] Loss: 0.394495 Train Epoch: 2 [25600/60000 (43%)] Loss: 0.586426 Train Epoch: 2 [38400/60000 (64%)] Loss: 0.393710 Train Epoch: 2 [51200/60000 (85%)] Loss: 0.222817 Test set: Average loss: 0.1327, Accuracy: 9601/10000 (96%) Train Epoch: 3 [0/60000 (0%)] Loss: 0.374804 Train Epoch: 3 [12800/60000 (21%)] Loss: 0.407537 Train Epoch: 3 [25600/60000 (43%)] Loss: 0.270941 Train Epoch: 3 [38400/60000 (64%)] Loss: 0.261346 Train Epoch: 3 [51200/60000 (85%)] Loss: 0.363663 Test set: Average loss: 0.1042, Accuracy: 9676/10000 (96%) Train Epoch: 4 [0/60000 (0%)] Loss: 0.282447 Train Epoch: 4 [12800/60000 (21%)] Loss: 0.706974 Train Epoch: 4 [25600/60000 (43%)] Loss: 0.122113 Train Epoch: 4 [38400/60000 (64%)] Loss: 0.376030 Train Epoch: 4 [51200/60000 (85%)] Loss: 0.229221 Test set: Average loss: 0.0906, Accuracy: 9714/10000 (97%) Train Epoch: 5 [0/60000 (0%)] Loss: 0.116464 Train Epoch: 5 [12800/60000 (21%)] Loss: 0.241802 Train Epoch: 5 [25600/60000 (43%)] Loss: 0.295382 Train Epoch: 5 [38400/60000 (64%)] Loss: 0.616222 Train Epoch: 5 [51200/60000 (85%)] Loss: 0.251576

We can see key information about training in the output:

  • At prescribed logging intervals, the training process prints out how many samples in the training set it has gone through in the current epoch, and the loss of the current training mini-batch.
  • We can see that over the epochs, on the testing set, the loss decreases and the accuracy increases, suggesting that the model is indeed improving, even though at batch level there might be fluctuations.