Tutorial 6#

June 20, 2024#

In this tutorial you will develop, train, and evaluate a CNN that learns to perform deformable image registration in chest X-ray images.

First, let’s take care of the necessities:

  • If you’re using Google Colab, make sure to select a GPU Runtime.

  • Connect to Weights & Biases using the code below.

  • Install a few libraries that we will use in this tutorial.

import os
import wandb

os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
wandb.login()
!pip install monai

Part 1 - Registration#

import monai
import numpy as np
import matplotlib.pyplot as plt
import torch
import wandb

We will register chest X-ray images, reusing the data from Tutorial 3. As always, we first set the paths: data_path should be the path ending in ‘ribs’. If you no longer have the data set, you can download it using the lines below:

!wget https://surfdrive.surf.nl/files/index.php/s/Y4psc2pQnfkJuoT/download -O Tutorial_3.zip
!unzip -qo Tutorial_3.zip
data_path = "ribs"
# ONLY IF YOU USE JUPYTER: ADD PATH ⌨️
data_path = r'ribs'  # WHERE DID YOU PUT THE DATA?
# ONLY IF YOU USE COLAB: ADD PATH ⌨️
from google.colab import drive

drive.mount('/content/drive')
data_path = r'/content/drive/My Drive/Tutorial3'
# check if data_path exists:
import os

if not os.path.exists(data_path):
    print("Please update your data path to an existing folder.")
elif not set(["train", "val", "test"]).issubset(set(os.listdir(data_path))):
    print("Please update your data path to the correct folder (should contain train, val and test folders).")
else:
    print("Congrats! You selected the correct folder :)")

Data management#

In this part we prepare all the tools needed to load and visualize our samples. One thing we could do is perform inter-patient registration, i.e., register two chest X-ray images of different patients. However, this is a very challenging problem. Instead, to make our life a bit easier, we will perform intra-patient registration: register two images of the same patient. For each patient, we make a synthetic moving image by applying some random elastic deformations. To build this data set, we used the Rand2DElasticd transform on both the image and the mask. We will use a neural network to learn the deformation field between the fixed image and the moving image.

As in Tutorial 3, we make a list of dictionaries with the image file names.

import os
import numpy as np
import matplotlib.pyplot as plt
import glob
import monai
from PIL import Image
import torch

def build_dict_ribs(data_path, mode='train'):
    """
    This function returns a list of dictionaries, each containing the keys 'fixed', 'moving',
    'fixed_mask' and 'moving_mask' that map to the corresponding file paths.
    
    Args:
        data_path (str): path to the root folder of the data set.
        mode (str): subset used. Must correspond to 'train', 'val' or 'test'.
        
    Returns:
        (List[Dict[str, str]]) list of dictionaries containing the paths of the X-ray images and masks.
    """
    # test if mode is correct
    if mode not in ["train", "val", "test"]:
        raise ValueError(f"Please choose a mode in ['train', 'val', 'test']. Current mode is {mode}.")
    
    # define empty dictionary
    dicts = []
    # list all .png files in directory, including the path
    paths_xray = glob.glob(os.path.join(data_path, mode, 'img', '*.png'))
    # make a corresponding list for all the mask files
    for xray_path in paths_xray:
        if mode == 'test':
            suffix = 'val'
        else:
            suffix = mode
        # find the binary mask that belongs to the original image, based on indexing in the filename
        image_index = os.path.split(xray_path)[1].split('_')[-1].split('.')[0]
        # define path to mask file based on this index and add to list of mask paths
        mask_path = os.path.join(data_path, mode, 'mask', f'VinDr_RibCXR_{suffix}_{image_index}.png')
        if os.path.exists(mask_path):
            dicts.append({'fixed': xray_path, 'moving': xray_path, 'fixed_mask': mask_path, 'moving_mask': mask_path})
    return dicts

class LoadRibData(monai.transforms.Transform):
    """
    This custom Monai transform loads the data from the rib segmentation dataset.
    Defining a custom transform is simple; just overwrite the __init__ function and __call__ function.
    """
    def __init__(self, keys=None):
        pass

    def __call__(self, sample):
        fixed = Image.open(sample['fixed']).convert('L') # import as grayscale image
        fixed = np.array(fixed, dtype=np.uint8)
        moving = Image.open(sample['moving']).convert('L') # import as grayscale image
        moving = np.array(moving, dtype=np.uint8)        
        fixed_mask = Image.open(sample['fixed_mask']).convert('L') # import as grayscale image
        fixed_mask = np.array(fixed_mask, dtype=np.uint8)
        moving_mask = Image.open(sample['moving_mask']).convert('L') # import as grayscale image
        moving_mask = np.array(moving_mask, dtype=np.uint8)        
        # mask has value 255 on rib pixels. Convert to binary array
        fixed_mask[np.where(fixed_mask==255)] = 1
        moving_mask[np.where(moving_mask==255)] = 1        
        return {'fixed': fixed, 'moving': moving, 'fixed_mask': fixed_mask, 'moving_mask': moving_mask, 'img_meta_dict': {'affine': np.eye(2)}, 
                'mask_meta_dict': {'affine': np.eye(2)}}

Then we make a training dataset like before. The Rand2DElasticd transform here determines how much deformation is in the ‘moving’ image.

train_dict_list = build_dict_ribs(data_path, mode='train')

# construct Dataset from list of paths + transform
transform = monai.transforms.Compose(
[
    LoadRibData(),
    monai.transforms.AddChanneld(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask']),
    monai.transforms.Resized(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask'], spatial_size=(256, 256),  mode=['bilinear', 'bilinear', 'nearest', 'nearest']),
    monai.transforms.HistogramNormalized(keys=['fixed', 'moving']),
    monai.transforms.ScaleIntensityd(keys=['fixed', 'moving'], minv=0.0, maxv=1.0),
    monai.transforms.Rand2DElasticd(keys=['moving', 'moving_mask'], spacing=(64, 64), 
                                    magnitude_range=(-8, 8), prob=1, mode=['bilinear', 'nearest']),    
])
train_dataset = monai.data.Dataset(train_dict_list, transform=transform)
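
Before moving on, a quick sanity check (a small sketch, not part of the original notebook): after AddChanneld and Resized, every image and mask in a sample should be a single-channel 256 × 256 array.

sample = train_dataset[0]
for key in ['fixed', 'moving', 'fixed_mask', 'moving_mask']:
    print(key, sample[key].shape)  # expected: (1, 256, 256)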

Exercise

Visualize fixed and moving training images together with their comparison image using the visualize_fmc_sample function below.

Try different methods to create the comparison image. How well do these different methods allow you to qualitatively assess the quality of the registration?

More information on these comparison methods is available in the scikit-image documentation of compare_images.

def visualize_fmc_sample(sample, method="checkerboard"):
    """
    Plot three images: fixed, moving and comparison.
    
    Args:
        sample (dict): sample of dataset created with `build_dataset`.
        method (str): method used by `skimage.util.compare_images`.
    """
    import skimage.util as skut 
    
    skut_methods = ["diff", "blend", "checkerboard"]
    if method not in skut_methods:
        raise ValueError(f"Method must be chosen in {skut_methods}.\n"
                         f"Current value is {method}.")
    
    
    fixed = np.squeeze(sample['fixed'])
    moving = np.squeeze(sample['moving'])
    comp_checker = skut.compare_images(fixed, moving, method=method)
    axs = plt.figure(constrained_layout=True, figsize=(15, 5)).subplot_mosaic("FMC")
    axs['F'].imshow(fixed, cmap='gray')
    axs['F'].set_title('Fixed')
    axs['M'].imshow(moving, cmap='gray')
    axs['M'].set_title('Moving')
    axs['C'].imshow(comp_checker, cmap='gray')
    axs['C'].set_title('Comparison')
    plt.show()
sample = train_dataset[0]
for method in ["diff", "blend", "checkerboard"]:
    print(f"Method {method}")
    visualize_fmc_sample(sample, method=method)

Now we apply a little trick. Because applying the random deformation in each training iteration would be very costly, we apply the deformation only once and build a new dataset from the deformed images. Running the cell below may take a few minutes.

import tqdm

train_loader = monai.data.DataLoader(train_dataset, batch_size=1, shuffle=False)

samples = []
for train_batch in tqdm.tqdm(train_loader):
    samples.append(train_batch)

# Make a new dataset and dataloader using the transformed images
train_dataset = monai.data.Dataset(samples, transform=monai.transforms.SqueezeDimd(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask']))
train_loader = monai.data.DataLoader(train_dataset, batch_size=16, shuffle=False)

Exercise

Create val_dataset and val_loader, corresponding to the Dataset and DataLoader for your validation set. The transforms can be the same as for the training set.

Answer key

val_dict_list = build_dict_ribs(data_path, mode='val')

# construct Dataset from list of paths + transform
transform = monai.transforms.Compose(
[
    LoadRibData(),
    monai.transforms.AddChanneld(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask']),
    monai.transforms.Resized(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask'], spatial_size=(256, 256),  mode=['bilinear', 'bilinear', 'nearest', 'nearest']),
    monai.transforms.HistogramNormalized(keys=['fixed', 'moving']),
    monai.transforms.ScaleIntensityd(keys=['fixed', 'moving'], minv=0.0, maxv=1.0),
    monai.transforms.Rand2DElasticd(keys=['moving', 'moving_mask'], spacing=(64, 64), 
                                    magnitude_range=(-8, 8), prob=1, mode=['bilinear', 'nearest']),    
])
val_dataset = monai.data.Dataset(val_dict_list, transform=transform)
val_loader = monai.data.DataLoader(val_dataset, batch_size=1, shuffle=False)

samples = []
for val_batch in tqdm.tqdm(val_loader):
    samples.append(val_batch)

# Make a new dataset and dataloader using the transformed images
val_dataset = monai.data.Dataset(samples, transform=monai.transforms.SqueezeDimd(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask']))
val_loader = monai.data.DataLoader(val_dataset, batch_size=16, shuffle=False)

Model#

As a model, we’ll use a U-Net. The input/output structure is quite different from what we’ve seen before:

  • the network takes as input two images: the moving and fixed images.

  • it outputs one tensor representing the deformation field.


This deformation field can be applied to the moving image with the monai.networks.blocks.Warp block of Monai.
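
As a minimal illustration (a sketch, not part of the original notebook) of how the Warp block is used: it takes a (batch, channel, H, W) image and a (batch, 2, H, W) dense displacement field, and a zero displacement field should leave the image (essentially) unchanged.

warp = monai.networks.blocks.Warp()
image = torch.rand(1, 1, 256, 256)  # (batch, channel, H, W) moving image
ddf = torch.zeros(1, 2, 256, 256)   # (batch, 2, H, W) dense displacement field, in pixels
warped = warp(image, ddf)
print((warped - image).abs().max().item())  # should be (close to) 0: zero displacement is the identity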


This deformed moving image is then compared to the fixed image: if they are similar, the deformation field is correctly registering the moving image on the fixed image. Keep in mind that this is done on training data, and we want the U-Net to learn to predict a proper deformation field given two new and unseen images. So we’re not optimizing for a pair of images as would be done in conventional iterative registration, but training a model that can generalize.


Before starting, let’s check that you can work on a GPU by running the following cell:

  • if the device is “cuda” you are working on a GPU,

  • if the device is “mps” you are working on an Apple Silicon GPU,

  • if the device is “cpu” call a teacher.

if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    os.environ["PYTORCH_ENABLE_MPS_FALLBACK"]="1"
else:
    device = "cpu"
print(f'The used device is {device}')

Exercise

Construct a U-Net with suitable settings and name it model. Keep in mind that you want to be able to correctly apply its output to the input moving image with the warp_layer!

Answer key

model = monai.networks.nets.UNet(
    spatial_dims=2,
    in_channels=2,
    out_channels=2,
    channels = (8, 16, 32, 64, 128),
    strides=(2, 2, 2, 2),
    num_res_units=2,
).to(device)

warp_layer = monai.networks.blocks.Warp().to(device)

Objective function#

We evaluate the similarity between the fixed image and the deformed moving image with the MSELoss(). The L1 or SSIM losses seen in the previous section could also be used. Furthermore, the deformation field is regularized with BendingEnergyLoss. This is a penalty that takes the smoothness of the deformation field into account: if it’s not smooth enough, the bending energy is high. Thus, our model will favor smooth deformation fields.

Finally, we pick an optimizer, in this case again an Adam optimizer.

image_loss = torch.nn.MSELoss()
regularization = monai.losses.BendingEnergyLoss()
optimizer = torch.optim.Adam(model.parameters(), 1e-3)
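
To get a feel for what the bending energy penalizes, the small check below (a sketch, not part of the original notebook) evaluates the regularizer on a perfectly smooth (constant) displacement field and on a randomly varying one; the rough field should give a much larger value.

smooth_ddf = torch.zeros(1, 2, 64, 64)    # constant displacement field: no bending
rough_ddf = torch.randn(1, 2, 64, 64)     # rapidly varying displacement field
print(regularization(smooth_ddf).item())  # expected: 0
print(regularization(rough_ddf).item())   # expected: much larger than 0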

Exercise

Add a learning rate scheduler that lowers the learning rate by a factor of ten every 100 epochs. (It is stepped once per epoch in the training loop below.)

Answer key

scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 100, gamma=0.1)

To warp the moving image using the predicted deformation field and then compute the loss between the deformed image and the fixed image, we define a forward function that does all of this. It returns the predicted deformation field ddf and the deformed moving image pred_image.

def forward(batch_data, model):
    """
    Applies the model to a batch of data.
    
    Args:
        batch_data (dict): a batch of samples computed by a DataLoader.
        model (Module): a model computing the deformation field.
    
    Returns:
        ddf (Tensor): batch of deformation fields.
        pred_image (Tensor): batch of deformed moving images.
    
    """
    fixed_image = batch_data["fixed"].to(device).float()
    moving_image = batch_data["moving"].to(device).float()
    
    # predict DDF
    ddf = model(torch.cat((moving_image, fixed_image), dim=1))

    # warp moving image and label with the predicted ddf
    pred_image = warp_layer(moving_image, ddf)

    return ddf, pred_image
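
As a quick sanity check (a small sketch, not part of the original notebook), you can run forward on one training batch and inspect the shapes: the deformation field should have two channels (one per spatial dimension), the warped image one.

batch_data = next(iter(train_loader))
ddf, pred_image = forward(batch_data, model)
print(ddf.shape)         # expected: (batch_size, 2, 256, 256)
print(pred_image.shape)  # expected: (batch_size, 1, 256, 256)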

You can monitor the training process in W&B: at each epoch, a batch of validation images is used to compute the comparison images of your choice, selected with the method parameter.

def log_to_wandb(epoch, train_loss, val_loss, pred_batch, fixed_batch, method="checkerboard"):
    """ Function that logs ongoing training variables to W&B """
    import skimage.util as skut
    
    log_imgs = []
    for pred_pt, fixed_pt in zip(pred_batch, fixed_batch):
        fixed_np = np.squeeze(fixed_pt.cpu().detach())
        pred_np = np.squeeze(pred_pt.cpu().detach())
        comp_checker = skut.compare_images(fixed_np, pred_np, method=method)
        log_imgs.append(wandb.Image(comp_checker))

    # Send epoch, losses and images to W&B
    wandb.log({'epoch': epoch, 'train_loss': train_loss, 'val_loss': val_loss, 'results': log_imgs})

Training time#

Use the following cells to train your network. You may choose different parameters to improve the performance!

# Choose your parameters

max_epochs = 200
reg_weight = 0 # By default 0, but you can investigate what it does
from tqdm import tqdm

run = wandb.init(
    project='tutorial4_registration',
    config={
        'lr': optimizer.param_groups[0]["lr"],
        'batch_size': train_loader.batch_size,
        'regularization': reg_weight,
        'loss_function': str(image_loss)
    }
)
# Do not hesitate to enrich this list of settings to be able to correctly keep track of your experiments!
# For example you should add information on your model...

run_id = run.id # We remember here the run ID to be able to write the evaluation metrics

for epoch in tqdm(range(max_epochs)):    
    model.train()
    epoch_loss = 0
    for batch_data in train_loader:
        optimizer.zero_grad()

        ddf, pred_image = forward(batch_data, model)

        fixed_image = batch_data["fixed"].to(device).float()
        reg = regularization(ddf)
        loss = image_loss(pred_image, fixed_image) + reg_weight * reg
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    epoch_loss /= len(train_loader)

    model.eval()
    val_epoch_loss = 0
    with torch.no_grad():  # no gradients are needed during validation
        for batch_data in val_loader:
            ddf, pred_image = forward(batch_data, model)
            fixed_image = batch_data["fixed"].to(device).float()
            reg = regularization(ddf)
            loss = image_loss(pred_image, fixed_image) + reg_weight * reg
            val_epoch_loss += loss.item()
    val_epoch_loss /= len(val_loader)

    scheduler.step()  # lower the learning rate according to the schedule defined above

    log_to_wandb(epoch, epoch_loss, val_epoch_loss, pred_image, fixed_image)
    
run.finish()    

Evaluation of the trained model#

Now that the model has been trained, it’s time to evaluate its performance. Use the code below to visualize samples and deformation fields.

Exercise

Are you satisfied with these registration results? Do they seem anatomically plausible? Try out different regularization factors (reg_weight) and see what they do to the registration.

Answer key

Depending on the strength of the elastic deformation applied when generating the samples, registration may be more or less successful. There may be quite a bit of folding going on; in that case, setting a positive value for reg_weight will lead to more plausible deformations. If you set this value very high, you will see that registration performance drops: the deformation field is very smooth, but no longer aligns the images.

def visualize_prediction(sample, model, method="checkerboard"):
    """
    Plot fixed, moving and deformed images, a comparison image, and the deformation field magnitude.
    
    Args:
        sample (dict): sample of dataset created with `build_dataset`.
        model (Module): a model computing the deformation field.
        method (str): method used by `skimage.util.compare_images`.
    """
    import skimage.util as skut 
    
    skut_methods = ["diff", "blend", "checkerboard"]
    if method not in skut_methods:
        raise ValueError(f"Method must be chosen in {skut_methods}.\n"
                         f"Current value is {method}.")
        
    model.eval()
    
    # Compute deformation field + deformed image
    batch_data = {
        "fixed": sample["fixed"].unsqueeze(0),
        "moving": sample["moving"].unsqueeze(0),
    }
    ddf, pred_image = forward(batch_data, model)
    ddf = ddf.detach().cpu().numpy().squeeze()
    ddf = np.linalg.norm(ddf, axis=0).squeeze()
    
    # Squeeze images
    fixed = np.squeeze(sample["fixed"])
    moving = np.squeeze(sample["moving"])    
    deformed = np.squeeze(pred_image.detach().cpu())
    
    # Generate comparison image
    comp_checker = skut.compare_images(fixed, deformed, method=method, n_tiles=(4, 4))
    
    # Plot everything
    fig, axs = plt.subplots(1, 5, figsize=(18, 5))    
    axs[0].imshow(fixed, cmap='gray')
    axs[0].set_title('Fixed')
    axs[1].imshow(moving, cmap='gray')
    axs[1].set_title('Moving')
    axs[2].imshow(deformed, cmap='gray')
    axs[2].set_title('Deformed')
    axs[3].imshow(comp_checker, cmap='gray')
    axs[3].set_title('Comparison')    
    dpl = axs[4].imshow(ddf, clim=(0, 10))
    fig.colorbar(dpl, ax=axs[4])
    plt.show()   
for sample in val_dataset:
    visualize_prediction(sample, model)

Exercise

Compute the Jacobian determinant at each image pixel. How many of these are negative? Can you improve upon this?

You can use the code below to compute the Jacobian of your deformation vector field and inspect it.

def get_jacobian(sample, model):
    """
    Computes the Jacobian determinant map of the deformation field for a given sample.
    
    Args:
        sample (dict): sample of dataset created with `build_dataset`.
        model (Module): a model computing the deformation field.
        
    Returns:
        jacobian (np.ndarray): 2D map of the Jacobian determinant of the deformation field.
    """
    model.eval()
    
    batch_data = {
        "fixed": sample["fixed"].unsqueeze(0),
        "moving": sample["moving"].unsqueeze(0),
    }
    
    ddf, pred_image = forward(batch_data, model)
    ddf = ddf.detach().cpu().numpy().squeeze()
    ddf_dx = np.diff(ddf, axis=1, append=ddf[:, -1, :].reshape(2, 1, 256))/256
    ddf_dy = np.diff(ddf, axis=2, append=ddf[:, :, -1].reshape(2, 256, 1))/256
    
    jacobian = ddf_dx[0, :, :] * ddf_dy[1, :, :] - ddf_dx[1, :, :] * ddf_dy[0, :, :]
    return jacobian
for sample in val_dataset:
    jacobian = get_jacobian(sample, model)
    plt.figure()
    plt.imshow(jacobian, cmap='seismic', clim=(-0.003, 0.003))
    plt.colorbar()
    plt.show()
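
One possible way to answer the counting part of the exercise (a small sketch, not part of the original notebook) is to compute, for each validation sample, the fraction of pixels whose Jacobian value is negative:

neg_fractions = []
for sample in val_dataset:
    jacobian = get_jacobian(sample, model)
    neg_fractions.append(np.mean(jacobian < 0))  # fraction of pixels with a negative Jacobian
print(f"Mean fraction of negative Jacobian values: {np.mean(neg_fractions):.4f}")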

Part 2 - Equivariance#

In this part, we are going to use some concepts that you’ve learned in the lecture on geometric deep learning. We are going to look at the equivariance properties of a neural network architecture that you should by now be very familiar with: the U-Net. We will again use the chest X-ray segmentation problem. Because training a network is not the focus here, we have pretrained a network that you can use for these experiments.

Data loading#

We will again use the same utility functions as in Tutorial 3 to build a dictionary of files and load rib data.

import os
import numpy as np
import matplotlib.pyplot as plt
import glob
import monai
from PIL import Image
import torch

def build_dict_ribs(data_path, mode='train'):
    """
    This function returns a list of dictionaries, each dictionary containing the keys 'img' and 'mask'
    that map to the paths of the corresponding image and mask.
    
    Args:
        data_path (str): path to the root folder of the data set.
        mode (str): subset used. Must correspond to 'train', 'val' or 'test'.
        
    Returns:
        (List[Dict[str, str]]) list of the dictionaries containing the paths of X-ray images and masks.
    """
    # test if mode is correct
    if mode not in ["train", "val", "test"]:
        raise ValueError(f"Please choose a mode in ['train', 'val', 'test']. Current mode is {mode}.")
    
    # define empty dictionary
    dicts = []
    # list all .png files in directory, including the path
    paths_xray = glob.glob(os.path.join(data_path, mode, 'img', '*.png'))
    # make a corresponding list for all the mask files
    for xray_path in paths_xray:
        if mode == 'test':
            suffix = 'val'
        else:
            suffix = mode
        # find the binary mask that belongs to the original image, based on indexing in the filename
        image_index = os.path.split(xray_path)[1].split('_')[-1].split('.')[0]
        # define path to mask file based on this index and add to list of mask paths
        mask_path = os.path.join(data_path, mode, 'mask', f'VinDr_RibCXR_{suffix}_{image_index}.png')
        if os.path.exists(mask_path):
            dicts.append({'img': xray_path, 'mask': mask_path})
    return dicts

class LoadRibData(monai.transforms.Transform):
    """
    This custom Monai transform loads the data from the rib segmentation dataset.
    Defining a custom transform is simple; just overwrite the __init__ function and __call__ function.
    """
    def __init__(self, keys=None):
        pass

    def __call__(self, sample):
        image = Image.open(sample['img']).convert('L') # import as grayscale image
        image = np.array(image, dtype=np.uint8)
        mask = Image.open(sample['mask']).convert('L') # import as grayscale image
        mask = np.array(mask, dtype=np.uint8)
        # mask has value 255 on rib pixels. Convert to binary array
        mask[np.where(mask==255)] = 1
        return {'img': image, 'mask': mask, 'img_meta_dict': {'affine': np.eye(2)}, 
                'mask_meta_dict': {'affine': np.eye(2)}}

Use the cell below to make a validation loader with a single image. This is sufficient for the small experiment that you will perform.

validation_dict_list = build_dict_ribs(data_path, mode='val')
validation_transform = monai.transforms.Compose(
    [
        LoadRibData(),
        monai.transforms.AddChanneld(keys=['img', 'mask']),
        monai.transforms.HistogramNormalized(keys=['img']),     
        monai.transforms.ScaleIntensityd(keys=['img'], minv=0, maxv=1),
        monai.transforms.Zoomd(keys=['img', 'mask'], zoom=0.25, mode=['bilinear', 'nearest'], keep_size=False),
        # monai.transforms.RandSpatialCropd(keys=['img', 'mask'], roi_size=[384, 384], random_size=False)
        monai.transforms.SpatialCropd(keys=['img', 'mask'], roi_center=[300, 300], roi_size=[384 + 64, 384])        
    ]
)
validation_data = monai.data.CacheDataset([validation_dict_list[3]], transform=validation_transform)
validation_loader = monai.data.DataLoader(validation_data, batch_size=1, shuffle=False)

Loading a pretrained model#

We have already trained a model for you, the parameters of which were shared in JupyterLab as well. Note: if you downloaded the data set yourself, the model should be in the same folder as the images. If you already downloaded the data set but not the model, the model file is available here.

!wget -O trainedUNet.pt https://surfdrive.surf.nl/files/index.php/s/613zrvr0RDYZDqp/download
pretrained_file = os.path.join(data_path, "trainedUNet.pt")

Next, we initialize a standard U-Net architecture and load the parameters of the pretrained network using the load_state_dict function.

import torch
import monai
import random

# Check whether we're using a GPU
if torch.cuda.is_available():
    n_gpus = torch.cuda.device_count()  # Total number of GPUs
    gpu_idx = random.randint(0, n_gpus - 1)  # Random GPU index
    device = torch.device(f'cuda:{gpu_idx}')
    print('Using GPU: {}'.format(device))
else:
    device = torch.device('cpu')
    print('GPU not found. Using CPU.')

model = monai.networks.nets.UNet(
    spatial_dims=2,
    in_channels=1,
    out_channels=1,
    channels = (8, 16, 32, 64, 128),
    strides=(2, 2, 2, 2),
    num_res_units=2,
    dropout=0.5
).to(device)

model.load_state_dict(torch.load(pretrained_file, map_location=device))
model.eval()

Let’s use the pretrained network to segment (part of) our image. Run the cell below.

for sample in validation_loader:

    img = sample['img'][:, :, :384, :384]    
    mask = sample['mask'][:, :, :384, :384]
    output_noshift = torch.sigmoid(model(img.to(device))).detach().cpu().numpy().squeeze()   
    
    fig, ax = plt.subplots(1,2, figsize = [12, 10])    
    # Plot X-ray image
    ax[0].imshow(img.squeeze(), 'gray')
    # Plot ground truth
    mask = np.squeeze(mask)
    overlay_mask = np.ma.masked_where(mask == 0, mask == 1)
    ax[0].imshow(overlay_mask, 'Greens', alpha = 0.7, clim=[0,1], interpolation='nearest')
    ax[0].set_title('Ground truth')
    # Plot output
    overlay_output = np.ma.masked_where(output_noshift < 0.1, output_noshift > 0.99)
    ax[1].imshow(img.squeeze(), 'gray')
    ax[1].imshow(overlay_output.squeeze(), 'Reds', alpha = 0.7, clim=[0,1])
    ax[1].set_title('Prediction')
    plt.show()      

As you can see, the segmentation isn’t perfect, but that’s also not the goal of this exercise. What we are going to look into is the translation equivariance (Lecture 8) of the U-Net. That is: if you translate the image by \(d\) pixels, does the output also simply shift by \(d\) pixels? Note that this is a nice feature to have for a segmentation network: in principle we’d want our network to give us the same label for a pixel regardless of where the image was cut. The image below visualizes this principle. For segmentation of the pixels in the orange square, it shouldn’t matter if we provide the red square or the green square as input to the U-Net.
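
For intuition, the small check below (a sketch, not part of the original notebook) verifies that a single convolution layer by itself is translation equivariant: shifting the input simply shifts the output, at least away from the image borders. Whether the full U-Net behaves the same way is exactly what the experiment below investigates.

conv = torch.nn.Conv2d(1, 1, kernel_size=3, padding=1, bias=False)
x = torch.randn(1, 1, 32, 32)
shift = 5
shifted_then_conv = conv(torch.roll(x, shift, dims=2))
conv_then_shifted = torch.roll(conv(x), shift, dims=2)
# Compare an interior region that is unaffected by border and wrap-around effects
print(torch.allclose(shifted_then_conv[..., 10:22, :], conv_then_shifted[..., 10:22, :], atol=1e-5))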

Exercise

What do you think will happen to the U-Net’s prediction if we give it a slightly shifted version of the image as input?

Now we make a small script that performs the above experiment. First, we obtain the segmentation in the red box and we call this output_noshift. Then we shift the green box by an offset and each time obtain a segmentation in this box using the same model. We start small with a shift/offset of just a single pixel.

Exercise

Run the cell below and observe the outputs. Can you spot differences between the two segmentation masks?

offset = 1

for sample in validation_loader:

    # Original image
    img = sample['img'][:, :, :384, :384]    
    mask = sample['mask'][:, :, :384, :384]
    output_noshift = torch.sigmoid(model(img.to(device))).detach().cpu().numpy().squeeze()   

    # Plot X-ray image
    fig, ax = plt.subplots(1,2, figsize = [12, 10])    
    ax[0].imshow(img.squeeze(), 'gray')
    # Plot ground truth
    mask = np.squeeze(mask)
    overlay_mask = np.ma.masked_where(mask == 0, mask == 1)
    ax[0].imshow(overlay_mask, 'Greens', alpha = 0.7, clim=[0,1], interpolation='nearest')
    ax[0].set_title('Ground truth')
    # Plot output
    overlay_output = np.ma.masked_where(output_noshift < 0.1, output_noshift >0.99)
    ax[1].imshow(img.squeeze(), 'gray')
    ax[1].imshow(overlay_output.squeeze(), 'Reds', alpha = 0.7, clim=[0,1])
    ax[1].set_title('Prediction')
    plt.show()
    
    # Shifted image
    img = sample['img'][:, :, offset:offset+384, :384]
    mask = sample['mask'][:, :, offset:offset+384, :384]
    output = torch.sigmoid(model(img.to(device))).detach().cpu().numpy().squeeze()

    # Plot X-ray image
    fig, ax = plt.subplots(1,2, figsize = [12, 10])
    ax[0].imshow(img.squeeze(), 'gray')
    # Plot ground truth
    mask = np.squeeze(mask)
    overlay_mask = np.ma.masked_where(mask == 0, mask == 1)
    ax[0].imshow(overlay_mask, 'Greens', alpha = 0.7, clim=[0,1], interpolation='nearest')
    ax[0].set_title('Ground truth shifted')
    # Plot output
    overlay_output = np.ma.masked_where(output < 0.1, output >0.99)
    ax[1].imshow(img.squeeze(), 'gray')
    ax[1].imshow(overlay_output.squeeze(), 'Reds', alpha = 0.7, clim=[0,1])
    ax[1].set_title('Prediction shifted')
    plt.show()

To highlight the differences between the two segmentation masks a bit more, we make a difference image. We correct for the applied shift so that we’re not comparing apples and oranges. The next cell shows the difference between the original prediction and the prediction for the image shifted by one pixel.

Exercise

Given these results, is a U-Net translation equivariant, invariant, or neither?

plt.figure(figsize=(6, 6))
diffout = output_noshift[offset:, :384] - output[:-offset, :384]
plt.imshow(diffout, cmap='seismic', clim=[-1, 1])
plt.title('Offset {}'.format(offset))
plt.colorbar()
plt.show()

We can repeat this for larger offsets. Let’s take offsets up to 64 pixels, and each time compute the difference between the predictions for the original and shifted images, in a subimage that should be unaffected by the shift. We store the L1 norm of the difference image in an array norms and plot these as a function of the offset.

Exercise

The resulting plot shows that the U-Net is not equivariant for any of these translations. This is due to a combination of border effects and downsampling layers. However, the plot also shows a particular pattern, in which the norm dips every 16 pixels of offset. Can you explain this based on the U-Net architecture? (A small illustration follows after the plot below.)

norms = []
offsets = []
plot_differences = False  # Set to True to plot difference images for every offset

img = sample['img'][:, :, :384, :384]    
mask = sample['mask'][:, :, :384, :384]
output_noshift = torch.sigmoid(model(img.to(device))).detach().cpu().numpy().squeeze()   

for offset in range(1, 65):
    for sample in validation_loader:
        img = sample['img'][:, :, offset:offset+384, :384]
        mask = sample['mask'][:, :, offset:offset+384, :384]

        output = torch.sigmoid(model(img.to(device))).detach().cpu().numpy().squeeze()  

        diffout = (output_noshift[offset:, :384] - output[:-offset, :384])[100:284, 100:284]
        offsets.append(offset)
        norms.append(np.sum(np.abs(diffout)))
        if plot_differences:
            plt.figure()
            plt.imshow(diffout, cmap='seismic', clim=[-1, 1])
            plt.title(f"Offset {offset}")
            plt.colorbar()
            plt.show()

plt.figure()
plt.plot(offsets, norms)
plt.xlabel('Offset')
plt.ylabel('Difference')
plt.show()
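
The U-Net above downsamples four times with stride 2, i.e. by a total factor of \(2^4 = 16\). The 1D sketch below (not part of the original notebook) illustrates why strided downsampling only commutes with shifts that are a multiple of the stride, which is related to the dips you see in the plot.

x = torch.arange(32, dtype=torch.float32)
pool = torch.nn.AvgPool1d(kernel_size=2, stride=2)

def pooled(signal):
    return pool(signal.view(1, 1, -1)).squeeze()

print(pooled(x)[:4])          # pooling the original signal
print(pooled(x.roll(1))[:4])  # shift by 1: the pooled values change, the output is not just shifted
print(pooled(x.roll(2))[:4])  # shift by 2 (= the stride): the output is a shifted copy of the original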