Tutorial 4#

March 5, 2026#

In the previous tutorials, you have familiarized yourself with PyTorch, MONAI, and Weights & Biases. In this tutorial, you will work on two topics:

  • deep learning-based denoising of (synthetic) CT images

  • deep learning-based registration of images

First, let’s take care of the necessities:

  • If you’re using Google Colab, make sure to select a GPU Runtime.

  • Connect to Weights & Biases using the code below.

  • Install a few libraries that we will use in this tutorial.

import os
import wandb

os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
wandb.login()
!pip install dival
!pip install kornia

Part 1 - Reconstruction#

In this tutorial, you will reconstruct CT images. To not use too much disk storage, we will synthetise images on the fly using the Deep Inversion Validation Library (dival). These are 2D images with \(128\times 128\) pixels that contain a random number of ellipses with random sizes and random intensities.

First, make a dataset of ellipses. This will make an object that we can call for images using a generator. Next, we take a look at what this dataset contains. We will use the generator to ask for a sample. Each sample contains a sinogram and a ground truth (original) synthetic image that we can visualize. You may recall from the lecture that the sinogram is made up of integrals along projections. The horizontal axis in the sinogram corresponds to the location \(s\) along the detector, the vertical axis to the projection angle \(\theta\).

import dival

dataset = dival.get_standard_dataset('ellipses', impl='skimage')
dat_gen = dataset.generator(part='train')

Run the cell below to show a sinogram and image in the dataset.

import numpy as np
import matplotlib.pyplot as plt

# Get a sample from the generator
sinogram, ground_truth = next(dat_gen)
fig, axs = plt.subplots(1, 2, figsize=(10, 5))

# Show the sinogram
axs[0].imshow(sinogram, cmap='gray', extent=[0, 183, -90, 90])
axs[0].set_title('Sinogram')
axs[0].set_xlabel('$s$')
axs[0].set_ylabel('$\Theta$')

# Show the ground truth image
axs[1].imshow(ground_truth, cmap='gray')
axs[1].set_title('Ground truth')
axs[1].set_xlabel('$x$')
axs[1].set_ylabel('$y$')
plt.show()   

Not only does the sinogram contain few angles, it also contains added white noise. If we simply backproject the sinogram to the image domain we end up with a low-quality image. Let’s give it a try using the standard Filtered Backprojection (FBP) algorithm for CT and its implementation in scikit-image.

import skimage.transform as sktr

# Get a sample from the generator
sinogram, ground_truth = next(dat_gen)
sinogram = np.asarray(sinogram).transpose()

# This defines the projectiona angles
theta = np.linspace(-90., 90., sinogram.shape[1], endpoint=True)

# Perform FBP
fbp_recon = sktr.iradon(sinogram, theta=theta, filter_name='ramp')[28:-27, 28:-27]
fig, axs = plt.subplots(1, 3, figsize=(12, 4))
axs[0].imshow(sinogram.transpose(), cmap='gray', extent=[0, 183, -90, 90])
axs[0].set_title('Sinogram')
axs[0].set_xlabel('$s$')
axs[0].set_ylabel('$\Theta$')
axs[1].imshow(ground_truth, cmap='gray', clim=[0, 1])
axs[1].set_title('Ground truth')
axs[1].set_xlabel('$x$')
axs[1].set_ylabel('$y$')
axs[2].imshow(fbp_recon, cmap='gray', clim=[0, 1])
axs[2].set_title('FBP')
axs[2].set_xlabel('$x$')
axs[2].set_ylabel('$y$')
plt.show()

Exercise

What do you think of the quality of the reconstructed FBP algorithm? Use the cell below to quantify the similarity between the images using the structural similarity index (SSIM). Does this reflect your intuition? Also compute the PSNR using the peak_signal_noise_ratio method in scikit-image.

import skimage.metrics as skme

print('SSIM = {:.2f}'.format(skme.structural_similarity(np.asarray(ground_truth), fbp_recon, data_range=np.max(ground_truth)-np.min(ground_truth))))
# ⌨ FILL IN

Answer key

import skimage.metrics as skme

print('SSIM = {:.2f}'.format(skme.structural_similarity(np.asarray(ground_truth), fbp_recon, data_range=np.max(ground_truth)-np.min(ground_truth))))
print('PSNR = {:.2f}'.format(skme.peak_signal_noise_ratio(np.asarray(ground_truth), fbp_recon)))

Datasets and dataloaders#

Our (or your) goal now is to obtain high(er) quality reconstructed images based on the sinogram measurements. As you have seen in the lecture, this can be done in four ways:

  1. Train a reconstruction method that directly maps from the measurement (sinogram) domain to the image domain.

  2. Preprocessing Clean up the sinogram using a neural network, then backproject to the image domain.

  3. Postprocessing First backproject to the image domain, then improve the reconstruction using a neural network.

  4. Iterative methods that integrate data consistency.

Here, we will follow the third approach, postprocessing. We create reconstructions from the generated sinograms using filtered backprojection and use a neural network to learn corrections on this FBP image and improve the reconstruction, as shown in the image below. The data that we need for training this network is the reconstructions from FBP, and the ground-truth reconstructions from the dival dataset.

fig

We will make a training dataset of 512 samples from the ellipses dival dataset that we store in a MONAI DataSet. The code below does this in four steps:

  1. Create a dival generator that creates sinograms and ground-truth reconstructions.

  2. Make a dictionary (like we did in the previous tutorial) that contains the ground-truth reconstructions and the reconstructions constructed by FBP as separate keys.

  3. Define the transforms for the data (also like the previous tutorial). In this case we require an additional ‘channels’ dimension, as that is what the neural network expects. We will not make use of extra data augmentation.

  4. Construct the dataset using the dictionary and the defined transform.

import tqdm
import monai

theta = np.linspace(-90., 90., sinogram.shape[1], endpoint=True)

# Make a generator for the training part of the dataset
train_gen = dataset.generator(part='train')
train_samples = []

# Make a list of (in this case) 512 random training samples. We store the filtered backprojection (FBP) and ground truth image
# in a dictionary for each sample, and add these to a list.
for ns in tqdm.tqdm(range(512)):
    sinogram, ground_truth = next(train_gen)
    sinogram = np.asarray(sinogram).transpose()
    fbp_recon = sktr.iradon(sinogram, theta=theta, filter_name='ramp')[28:-27, 28:-27]
    train_samples.append({'fbp': fbp_recon, 'ground_truth': np.asarray(ground_truth)})

# You can add or remove transforms here
train_transform = monai.transforms.Compose([
    monai.transforms.EnsureChannelFirstd(keys=['fbp', 'ground_truth'], channel_dim="no_channel")    
])    

# Use the list of dictionaries and the transform to initialize a MONAI CacheDataset
train_dataset = monai.data.CacheDataset(train_samples, transform=train_transform)    

Exercise

Also make a validation dataset and call it val_dataset. This dataset can be smaller, e.g., 64 or 128 samples.

# Your code goes here

Answer key

val_gen = dataset.generator(part='validation')
val_samples = []
val_transform = monai.transforms.Compose([
    monai.transforms.AddChanneld(keys=['fbp', 'ground_truth'])
])
for ns in tqdm.tqdm(range(128)):
    sinogram, ground_truth = next(val_gen)
    sinogram = np.asarray(sinogram).transpose()
    fbp_recon = sktr.iradon(sinogram, theta=theta, filter_name='ramp')[28:-27, 28:-27]
    val_samples.append({'fbp': fbp_recon, 'ground_truth': np.asarray(ground_truth)})
val_dataset = monai.data.CacheDataset(val_samples, transform=val_transform)  

Exercise

Now, make a dataloader for both the validation and training data, called train_loader and validation_loader, that we can use for sampling batches during training of the network. Give them a reasonable batch size, e.g., 16.

# ⌨️ FILL IN
train_loader = ...
validation_loader = ...

Answer key

train_loader = monai.data.DataLoader(train_dataset, batch_size=16)
validation_loader = monai.data.DataLoader(val_dataset, batch_size=16)

Model#

Now that we have datasets and dataloaders, the next step is to define a model, optimizer and criterion. Because we want to improve the FBP-reconstructed image, we are dealing with an image-to-image task. A standard U-Net as implemented in MONAI is therefore a good starting point. First, make sure that you are using the GPU (CUDA), otherwise training will be extremely slow.

import torch

if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = "cpu"
print(f'The used device is {device}')

Exercise

Initialize a U-Net with the correct settings, e.g. channels and dimensions, and call it model. Here, it’s convenient to use the BasicUNet as implemented in MONAI.

Answer key

model = monai.networks.nets.BasicUNet(
    spatial_dims=2,
    out_channels=1
).to(device)

# model = monai.networks.nets.SegResNet(
#     spatial_dims=2,
#     out_channels=1
# ).to(device)

Loss function#

An important aspect is the loss function that you will use to optimize the model. The problem that we are trying to solve using a neural network is a regression problem, which differs from the classification approach we covered in the segmentation tutorial. Instead of classifying each pixel as a certain class, we alter their intensities to obtain a better overall reconstruction of the image.

Because this task is substantially different, we need to change our loss function. In the previous tutorial we used the Dice loss, which measures the overlap for each of the classes to segment. In this case, an L2 (mean squared error) or L1 (mean average error) loss suits our objective. Alternatively, we can use a loss that aims to maximize the structural similarity (SSIM). For this, we use the kornia library.

import kornia 
import torch

# Three loss functions, turn them on or off by commenting

# loss_function = torch.nn.MSELoss()
# loss_function = torch.nn.L1Loss()
loss_function = kornia.losses.SSIMLoss(window_size=3)

As in previous tutorials, we use an adaptive SGD (Adam) optimizer to train our network. This tutorial, we add a learning rate scheduler. This scheduler lowers the learning rate every step_size steps, meaning that the optimizer will take smaller steps in the direction of the gradient after a set amount of epochs. Therefore, the optimizer can potentially find a better local minimum for the weights of the neural network.

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

Exercise

Complete the code below and train the U-Net.

What does the model learn? Look carefully at how we determine the output of the model. Can you describe what happens in the following line: outputs = model(batch_data['fbp'].float().to(device)) + batch_data["fbp"].float().to(device)?

from tqdm.notebook import tqdm
import wandb
from skimage.metrics import structural_similarity as ssim


run = wandb.init(
    project='tutorial4_reconstruction',
    name='test',
    config={
        'loss function': str(loss_function), 
        'lr': optimizer.param_groups[0]["lr"],
        'batch_size': train_loader.batch_size,
    }
)
# Do not hesitate to enrich this list of settings to be able to correctly keep track of your experiments!
# For example you should include information on your model architecture

run_id = run.id # We remember here the run ID to be able to write the evaluation metrics

def log_to_wandb(epoch, train_loss, val_loss, batch_data, outputs):
    """ Function that logs ongoing training variables to W&B """

    # Create list of images that have segmentation masks for model output and ground truth
    # log_imgs = [wandb.Image(PIL.Image.fromarray(img.detach().cpu().numpy())) for img in outputs]
    val_ssim = []
    for im_id in range(batch_data['ground_truth'].shape[0]):
        val_ssim.append(ssim(batch_data['ground_truth'].detach().cpu().numpy()[im_id, 0, :, :].squeeze(), 
                             outputs.detach().cpu().numpy()[im_id, 0, :, :].squeeze() ))
    val_ssim = np.mean(np.asarray(val_ssim))
    # Send epoch, losses and images to W&B
    wandb.log({'epoch': epoch, 'train_loss': train_loss, 'val_loss': val_loss, 'val_ssim': val_ssim}) 
    
for epoch in tqdm(range(75)):
    model.train()    
    epoch_loss = 0
    step = 0
    for batch_data in train_loader: 
        step += 1
        optimizer.zero_grad()
        outputs = model(batch_data["fbp"].float().to(device)) + batch_data["fbp"].float().to(device)
        # FILL IN
    # validation part
    step = 0
    val_loss = 0
    for batch_data in validation_loader:
        step += 1
        model.eval()
        outputs = model(batch_data['fbp'].float().to(device)) + batch_data["fbp"].float().to(device)
        # FILL IN
    log_to_wandb(epoch, train_loss, val_loss, batch_data, outputs)
    # Scheduler also needs to make a step during training
    scheduler.step()

# Store the network parameters        
torch.save(model.state_dict(), r'trainedUNet.pt')
run.finish()

Answer key

from tqdm.notebook import tqdm
import wandb
from skimage.metrics import structural_similarity as ssim


run = wandb.init(
    project='tutorial4_reconstruction',
    config={
        'loss function': str(loss_function), 
        'lr': optimizer.param_groups[0]["lr"],
        'batch_size': train_loader.batch_size,
    }
)
# Do not hesitate to enrich this list of settings to be able to correctly keep track of your experiments!
# For example you should include information on your model architecture

run_id = run.id # We remember here the run ID to be able to write the evaluation metrics

def log_to_wandb(epoch, train_loss, val_loss, batch_data, outputs):
    """ Function that logs ongoing training variables to W&B """

    # Create list of images that have segmentation masks for model output and ground truth
    # log_imgs = [wandb.Image(PIL.Image.fromarray(img.detach().cpu().numpy())) for img in outputs]
    val_ssim = []
    for im_id in range(batch_data['ground_truth'].shape[0]):
        val_ssim.append(ssim(batch_data['ground_truth'].detach().cpu().numpy()[im_id, 0, :, :].squeeze(), 
                             outputs.detach().cpu().numpy()[im_id, 0, :, :].squeeze() ))
    val_ssim = np.mean(np.asarray(val_ssim))
    # Send epoch, losses and images to W&B
    wandb.log({'epoch': epoch, 'train_loss': train_loss, 'val_loss': val_loss, 'val_ssim': val_ssim}) 
    
for epoch in tqdm(range(75)):
    model.train()    
    epoch_loss = 0
    step = 0
    for batch_data in train_loader: 
        step += 1
        optimizer.zero_grad()
        outputs = model(batch_data["fbp"].float().to(device)) + batch_data["fbp"].float().to(device)
        loss = loss_function(outputs, batch_data["ground_truth"].to(device))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    train_loss = epoch_loss/step
    # validation part
    step = 0
    val_loss = 0
    for batch_data in validation_loader:
        step += 1
        model.eval()
        outputs = model(batch_data['fbp'].float().to(device)) + batch_data["fbp"].float().to(device)
        loss = loss_function(outputs, batch_data['ground_truth'].to(device))   
        val_loss+= loss.item()
    val_loss = val_loss / step
    log_to_wandb(epoch, train_loss, val_loss, batch_data, outputs)
    scheduler.step()

# Store the network parameters        
torch.save(model.state_dict(), r'trainedUNet.pt')
run.finish()

Exercise

Now make a DataSet and DataLoader for the test set. Just a handful of images should be enough.

import tqdm

test_gen = dataset.generator(part='test')
....
test_dataset = ....

test_loader = monai.data.DataLoader(test_dataset, batch_size=1)

Answer key

import tqdm

test_gen = dataset.generator(part='test')
test_samples = []
test_transform = monai.transforms.Compose([
    monai.transforms.AddChanneld(keys=['fbp', 'ground_truth'])
])
for ns in tqdm.tqdm(range(4)):
    sinogram, ground_truth = next(test_gen)
    sinogram = np.asarray(sinogram).transpose()
    fbp_recon = sktr.iradon(sinogram, theta=theta, filter_name='ramp')[28:-27, 28:-27]
    test_samples.append({'sinogram': sinogram, 'fbp': fbp_recon, 'ground_truth': np.asarray(ground_truth)})
test_dataset = monai.data.CacheDataset(test_samples, transform=val_transform)

test_loader = monai.data.DataLoader(test_dataset, batch_size=1)

Exercise

Visualize a number of reconstructions from the neural network and compare them to the FBP reconstructed images, using the code below. The performance of the network is evaluated using the structural similarity function in scikit-image. Does the neural network improve this metric a lot compared to the filtered back projection?

model.eval()

for test_sample in test_loader:
    output = model(test_sample['fbp'].to(device)) + test_sample['fbp'].to(device)
    output = output.detach().cpu().numpy()[0, 0, :, :].squeeze()
    ground_truth = test_sample['ground_truth'][0, 0, :, :].squeeze()
    fbp_recon = test_sample['fbp'][0, 0, :, :].squeeze()
    fig, axs = plt.subplots(1, 3, figsize=(12, 4))
    axs[0].imshow(fbp_recon, cmap='gray', clim=[0, 1])
    axs[0].set_title('FBP SSIM={:.2f}'.format(ssim(ground_truth.cpu().numpy(), fbp_recon.cpu().numpy())))
    axs[0].set_xlabel('$x$')
    axs[0].set_ylabel('$y$')
    axs[1].imshow(ground_truth, cmap='gray', clim=[0, 1])
    axs[1].set_title('Ground truth')
    axs[1].set_xlabel('$x$')
    axs[1].set_ylabel('$y$')
    axs[2].imshow(output, cmap='gray', clim=[0, 1])
    axs[2].set_title('CNN SSIM={:.2f}'.format(ssim(ground_truth.cpu().numpy(), output)))
    axs[2].set_xlabel('$x$')
    axs[2].set_ylabel('$y$')
    plt.show()   

Answer key

Some observations that you could make:

  • The SSIM is definitely improved compared to the standard filtered back projection (FBP). CNN results should be in the order of ~0.8 SSIM.

  • The output images of the CNN are less noisy than the FBP reconstructions. However, they’re also a bit more blotchy/cartoonish if you use the CNN.

Exercise

Instead of a U-Net, try a different model, e.g., a SegResNet in MONAI. Evaluate how the different loss functions affect the performance of the network. Notes that the SSIM on the validation set is also written to Weights & Biases during training. Which loss leads to the best SSIM scores? Which loss results in the worst SSIM scores?

Answer:

Answer key

In general, using an SSIM loss will lead to better SSIM scores. The L1 loss is also expected to lead to better then the MSE loss, as it’s less susceptible to outliers and will smooth the resulting images less.

From post-processing to pre-processing#

So far, you have used a post-processing approach for reconstruction. In the lecture, we have discussed an alternative pre-processing approach, in which the sinogram image is improved before FBP. This additional exercise is entirely optional, but you could try to turn the current model into such a model, and see if the results that you get are better or worse than the results obtained so far. Good luck!

Part 2 - Registration#

In this part you will develop, train, and evaluate a CNN that learns to perform deformable image registration in chest X-ray images.

We will register chest X-ray images. We will reuse the data of Tutorial 3. As always, we first set the paths. This should be the path ending in ‘ribs’. If you don’t have the data set anymore, you can download it using the lines below:

!wget https://surfdrive.surf.nl/files/index.php/s/Y4psc2pQnfkJuoT/download -O Tutorial_3.zip
!unzip -qo Tutorial_3.zip
data_path = "ribs"
# ONLY IF YOU USE JUPYTER: ADD PATH ⌨️
data_path = r'ribs'# WHEREDIDYOUPUTTHEDATA?
# ONLY IF YOU USE COLAB: ADD PATH ⌨️
from google.colab import drive

drive.mount('/content/drive')
data_path = r'/content/drive/My Drive/Tutorial3'
# check if data_path exists:
import os

if not os.path.exists(data_path):
    print("Please update your data path to an existing folder.")
elif not set(["train", "val", "test"]).issubset(set(os.listdir(data_path))):
    print("Please update your data path to the correct folder (should contain train, val and test folders).")
else:
    print("Congrats! You selected the correct folder :)")

Data management#

In this part we prepare all the tools needed to load and visualize our samples. One thing we could do is perform inter-patient registration, i.e., register two chest X-ray images of different patients. However, this is a very challenging problem. Instead, to make our life a bit easier, we will perform intra-patient registration: register two images of the same patient. For each patient, we make a synthetic moving image by applying some random elastic deformations. To build this data set, we we used the Rand2DElasticd transform on both the image and the mask. We will use a neural network to learn the deformation field between the fixed image and the moving image.

fig

Similarly as in Tutorial 3, make a dictionary of the image file names.

import os
import numpy as np
import matplotlib.pyplot as plt
import glob
import monai
from PIL import Image
import torch

def build_dict_ribs(data_path, mode='train'):
    """
    This function returns a list of dictionaries, each dictionary containing the keys 'img' and 'mask' 
    that returns the path to the corresponding image.
    
    Args:
        data_path (str): path to the root folder of the data set.
        mode (str): subset used. Must correspond to 'train', 'val' or 'test'.
        
    Returns:
        (List[Dict[str, str]]) list of the dictionnaries containing the paths of X-ray images and masks.
    """
    # test if mode is correct
    if mode not in ["train", "val", "test"]:
        raise ValueError(f"Please choose a mode in ['train', 'val', 'test']. Current mode is {mode}.")
    
    # define empty dictionary
    dicts = []
    # list all .png files in directory, including the path
    paths_xray = glob.glob(os.path.join(data_path, mode, 'img', '*.png'))
    # make a corresponding list for all the mask files
    for xray_path in paths_xray:
        if mode == 'test':
            suffix = 'val'
        else:
            suffix = mode
        # find the binary mask that belongs to the original image, based on indexing in the filename
        image_index = os.path.split(xray_path)[1].split('_')[-1].split('.')[0]
        # define path to mask file based on this index and add to list of mask paths
        mask_path = os.path.join(data_path, mode, 'mask', f'VinDr_RibCXR_{suffix}_{image_index}.png')
        if os.path.exists(mask_path):
            dicts.append({'fixed': xray_path, 'moving': xray_path, 'fixed_mask': mask_path, 'moving_mask': mask_path})
    return dicts

class LoadRibData(monai.transforms.Transform):
    """
    This custom Monai transform loads the data from the rib segmentation dataset.
    Defining a custom transform is simple; just overwrite the __init__ function and __call__ function.
    """
    def __init__(self, keys=None):
        pass

    def __call__(self, sample):
        fixed = Image.open(sample['fixed']).convert('L') # import as grayscale image
        fixed = np.array(fixed, dtype=np.uint8)
        moving = Image.open(sample['moving']).convert('L') # import as grayscale image
        moving = np.array(moving, dtype=np.uint8)        
        fixed_mask = Image.open(sample['fixed_mask']).convert('L') # import as grayscale image
        fixed_mask = np.array(fixed_mask, dtype=np.uint8)
        moving_mask = Image.open(sample['moving_mask']).convert('L') # import as grayscale image
        moving_mask = np.array(moving_mask, dtype=np.uint8)        
        # mask has value 255 on rib pixels. Convert to binary array
        fixed_mask[np.where(fixed_mask==255)] = 1
        moving_mask[np.where(moving_mask==255)] = 1        
        return {'fixed': fixed, 'moving': moving, 'fixed_mask': fixed_mask, 'moving_mask': moving_mask, 'img_meta_dict': {'affine': np.eye(2)}, 
                'mask_meta_dict': {'affine': np.eye(2)}}

Then we make a training dataset like before. The Rand2DElasticd transform here determines how much deformation is in the ‘moving’ image.

train_dict_list = build_dict_ribs(data_path, mode='train')

# constructDataset from list of paths + transform
transform = monai.transforms.Compose(
[
    LoadRibData(),
    monai.transforms.EnsureChannelFirstd(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask'], channel_dim='no_channel'),
    monai.transforms.Resized(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask'], spatial_size=(256, 256),  mode=['bilinear', 'bilinear', 'nearest', 'nearest']),
    monai.transforms.HistogramNormalized(keys=['fixed', 'moving']),
    monai.transforms.ScaleIntensityd(keys=['fixed', 'moving'], minv=0.0, maxv=1.0),
    monai.transforms.Rand2DElasticd(keys=['moving', 'moving_mask'], spacing=(64, 64), 
                                    magnitude_range=(-8, 8), prob=1, mode=['bilinear', 'nearest']),    
])
train_dataset = monai.data.Dataset(train_dict_list, transform=transform)

Exercise

Visualize fixed and moving training images associated to their comparison image with the visualize_fmc_sample function below.

Try different methods to create the comparison image. How well do these different methods allow you to qualitatively assess the quality of the registration?

More information on this method is available in the scikit-image documentation.

def visualize_fmc_sample(sample, method="checkerboard"):
    """
    Plot three images: fixed, moving and comparison.
    
    Args:
        sample (dict): sample of dataset created with `build_dataset`.
        method (str): method used by `skimage.util.compare_image`.
    """
    import skimage.util as skut 
    
    skut_methods = ["diff", "blend", "checkerboard"]
    if method not in skut_methods:
        raise ValueError(f"Method must be chosen in {skut_methods}.\n"
                         f"Current value is {method}.")
    
    
    fixed = np.squeeze(sample['fixed'])
    moving = np.squeeze(sample['moving'])
    comp_checker = skut.compare_images(fixed, moving, method=method)
    axs = plt.figure(constrained_layout=True, figsize=(15, 5)).subplot_mosaic("FMC")
    axs['F'].imshow(fixed, cmap='gray')
    axs['F'].set_title('Fixed')
    axs['M'].imshow(moving, cmap='gray')
    axs['M'].set_title('Moving')
    axs['C'].imshow(comp_checker, cmap='gray')
    axs['C'].set_title('Comparison')
    plt.show()
sample = train_dataset[0]
for method in ["diff", "blend", "checkerboard"]:
    print(f"Method {method}")
    visualize_fmc_sample(sample, method=method)

Now we apply a little trick. Because applying the random deformation in each training iteration will be very costly, we only apply the deformation once and we make a new dataset based on the deformed images. Running the cell below may take a few minutes.

import tqdm

train_loader = monai.data.DataLoader(train_dataset, batch_size=1, shuffle=False)

samples = []
for train_batch in tqdm.tqdm(train_loader):
    samples.append(train_batch)

# Make a new dataset and dataloader using the transformed images
train_dataset = monai.data.Dataset(samples, transform=monai.transforms.SqueezeDimd(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask']))
train_loader = monai.data.DataLoader(train_dataset, batch_size=16, shuffle=False)

Exercise

Create val_dataset and val_loader, corresponding to the DataSet and DataLoader for your validation set. The transforms can be the same as in the training set.

# Your code goes here

Answer key

val_dict_list = build_dict_ribs(data_path, mode='val')

# constructDataset from list of paths + transform
transform = monai.transforms.Compose(
[
    LoadRibData(),
    monai.transforms.EnsureChannelFirstd(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask'], channel_dim='no_channel'),
    monai.transforms.Resized(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask'], spatial_size=(256, 256),  mode=['bilinear', 'bilinear', 'nearest', 'nearest']),
    monai.transforms.HistogramNormalized(keys=['fixed', 'moving']),
    monai.transforms.ScaleIntensityd(keys=['fixed', 'moving'], minv=0.0, maxv=1.0),
    monai.transforms.Rand2DElasticd(keys=['moving', 'moving_mask'], spacing=(64, 64), 
                                    magnitude_range=(-8, 8), prob=1, mode=['bilinear', 'nearest']),    
])
val_dataset = monai.data.Dataset(val_dict_list, transform=transform)
val_loader = monai.data.DataLoader(val_dataset, batch_size=1, shuffle=False)

samples = []
for val_batch in tqdm.tqdm(val_loader):
    samples.append(val_batch)

# Make a new dataset and dataloader using the transformed images
val_dataset = monai.data.Dataset(samples, transform=monai.transforms.SqueezeDimd(keys=['fixed', 'moving', 'fixed_mask', 'moving_mask']))
val_loader = monai.data.DataLoader(val_dataset, batch_size=16, shuffle=False)

Model#

As model, we’ll use a U-Net. The input/output structure is quite different from what we’ve seen before:

  • the network takes as input two images: the moving and fixed images.

  • it outputs one tensor representing the deformation field.

img

This deformation field can be applied to the moving image with the monai.networks.blocks.Warp block of Monai.

fig

This deformed moving image is then compared to the fixed image: if they are similar, the deformation field is correctly registering the moving image on the fixed image. Keep in mind that this is done on training data, and we want the U-Net to learn to predict a proper deformation field given two new and unseen images. So we’re not optimizing for a pair of images as would be done in conventional iterative registration, but training a model that can generalize.

fig

Before starting, let’s check that you can work on a GPU by runnning the following cell:

  • if the device is “cuda” you are working on a GPU,

  • if the device is “cpu” call a teacher.

if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    os.environ["PYTORCH_ENABLE_MPS_FALLBACK"]="1"
else:
    device = "cpu"
print(f'The used device is {device}')

Exercise

Construct a U-Net with suitable settings and name it model. Keep in mind that you want to be able to correctly apply its output to the input moving image with the warp_layer!

model = # FILL IN

warp_layer = monai.networks.blocks.Warp().to(device)

Answer key

model = monai.networks.nets.UNet(
    spatial_dims=2,
    in_channels=2,
    out_channels=2,
    channels = (8, 16, 32, 64, 128),
    strides=(2, 2, 2, 2),
    num_res_units=2,
).to(device)

warp_layer = monai.networks.blocks.Warp().to(device)

Objective function#

We evaluate the similarity between the fixed image and the deformed moving image with the MSELoss(). The L1 or SSIM losses seen in the previous section could also be used. Furthermore, the deformation field is regularized with BendingEnergyLoss. This is a penalty that takes the smoothness of the deformation field into account: if it’s not smooth enough, the bending energy is high. Thus, our model will favor smooth deformation fields.

Finally, we pick an optimizer, in this case again an Adam optimizer.

image_loss = torch.nn.MSELoss()
regularization = monai.losses.BendingEnergyLoss()
optimizer = torch.optim.Adam(model.parameters(), 1e-3)

Exercise

Add a learning rate scheduler that lowers the learning rate by a factor ten every 100 epochs.

# Your code goes here

Answer key

scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 100, gamma=0.1)

To warp the moving image using the predicted deformation field and then compute the loss between the deformed image and the fixed image, we define a forward function which does all this. The output of this function is pred_image.

def forward(batch_data, model):
    """
    Applies the model to a batch of data.
    
    Args:
        batch_data (dict): a batch of samples computed by a DataLoader.
        model (Module): a model computing the deformation field.
    
    Returns:
        ddf (Tensor): batch of deformation fields.
        pred_image (Tensor): batch of deformed moving images.
    
    """
    fixed_image = batch_data["fixed"].to(device).float()
    moving_image = batch_data["moving"].to(device).float()
    
    # predict DDF
    ddf = model(torch.cat((moving_image, fixed_image), dim=1))

    # warp moving image and label with the predicted ddf
    pred_image = warp_layer(moving_image, ddf)

    return ddf, pred_image

You can supervise the training process in W&B, in which at each epoch a batch of validation images are used to compute the comparison images of your choice, based on the parameter method.

def log_to_wandb(epoch, train_loss, val_loss, pred_batch, fixed_batch, method="checkerboard"):
    """ Function that logs ongoing training variables to W&B """
    import skimage.util as skut
    
    log_imgs = []
    for fixed_pt, pred_pt in zip(pred_batch, fixed_batch):
        fixed_np = np.squeeze(fixed_pt.cpu().detach())
        pred_np = np.squeeze(pred_pt.cpu().detach())
        comp_checker = skut.compare_images(fixed_np, pred_np, method=method)
        log_imgs.append(wandb.Image(comp_checker))

    # Send epoch, losses and images to W&B
    wandb.log({'epoch': epoch, 'train_loss': train_loss, 'val_loss': val_loss, 'results': log_imgs})

Training time#

Use the following cells to train your network. You may choose different parameters to improve the performance!

# Choose your parameters

max_epochs = 200
reg_weight = 0 # By default 0, but you can investigate what it does
from tqdm import tqdm

run = wandb.init(
    project='tutorial4_registration',
    config={
        'lr': optimizer.param_groups[0]["lr"],
        'batch_size': train_loader.batch_size,
        'regularization': reg_weight,
        'loss_function': str(image_loss)
    }
)
# Do not hesitate to enrich this list of settings to be able to correctly keep track of your experiments!
# For example you should add information on your model...

run_id = run.id # We remember here the run ID to be able to write the evaluation metrics

for epoch in tqdm(range(max_epochs)):    
    model.train()
    epoch_loss = 0
    for batch_data in train_loader:
        optimizer.zero_grad()

        ddf, pred_image = forward(batch_data, model)

        fixed_image = batch_data["fixed"].to(device).float()
        reg = regularization(ddf)
        loss = image_loss(pred_image, fixed_image) + reg_weight * reg
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    epoch_loss /= len(train_loader)

    model.eval()
    val_epoch_loss = 0
    for batch_data in val_loader:
        ddf, pred_image = forward(batch_data, model)
        fixed_image = batch_data["fixed"].to(device).float()
        reg = regularization(ddf)
        loss = image_loss(pred_image, fixed_image) + reg_weight * reg
        val_epoch_loss += loss.item()
    val_epoch_loss /= len(val_loader)

    log_to_wandb(epoch, epoch_loss, val_epoch_loss, pred_image, fixed_image)
    
run.finish()    

Evaluation of the trained model#

Now that the model has been trained, it’s time to evaluate its performance. Use the code below to visualize samples and deformation fields.

Exercise

Are you satisfied with these registration results? Do they seem anatomically plausible? Try out different regularization factors (reg_weight) and see what they do to the registration.

Answer:

Answer key

Depending on the strength of the elastic deformation that was applied when generating the samples, registration may be successful. It could be that there is quite a bit of folding going on. In that case, setting a non-zero positive value for reg_weight will lead to more plausible deformations. If you set this value very high, you will see that the registration performance drops: the deformation vector field is very smooth, but doesn’t align the image any more.

def visualize_prediction(sample, model, method="checkerboard"):
    """
    Plot three images: fixed, moving and comparison.
    
    Args:
        sample (dict): sample of dataset created with `build_dataset`.
        model (Module): a model computing the deformation field.
        method (str): method used by `skimage.util.compare_image`.
    """
    import skimage.util as skut 
    
    skut_methods = ["diff", "blend", "checkerboard"]
    if method not in skut_methods:
        raise ValueError(f"Method must be chosen in {skut_methods}.\n"
                         f"Current value is {method}.")
        
    model.eval()
    
    # Compute deformation field + deformed image
    batch_data = {
        "fixed": sample["fixed"].unsqueeze(0),
        "moving": sample["moving"].unsqueeze(0),
    }
    ddf, pred_image = forward(batch_data, model)
    ddf = ddf.detach().cpu().numpy().squeeze()
    ddf = np.linalg.norm(ddf, axis=0).squeeze()
    
    # Squeeze images
    fixed = np.squeeze(sample["fixed"])
    moving = np.squeeze(sample["moving"])    
    deformed = np.squeeze(pred_image.detach().cpu())
    
    # Generate comparison image
    comp_checker = skut.compare_images(fixed, deformed, method=method, n_tiles=(4, 4))
    
    # Plot everything
    fig, axs = plt.subplots(1, 5, figsize=(18, 5))    
    axs[0].imshow(fixed, cmap='gray')
    axs[0].set_title('Fixed')
    axs[1].imshow(moving, cmap='gray')
    axs[1].set_title('Moving')
    axs[2].imshow(deformed, cmap='gray')
    axs[2].set_title('Deformed')
    axs[3].imshow(comp_checker, cmap='gray')
    axs[3].set_title('Comparison')    
    dpl = axs[4].imshow(ddf, clim=(0, 10))
    fig.colorbar(dpl, ax=axs[4])
    plt.show()   
    plt.show()
for sample in val_dataset:
    visualize_prediction(sample, model)

Exercise

Compute the Jacobian determinant at each image voxel. How many of these are negative? Can you improve upon this?

You can use the code below to compute the Jacobian of your deformation vector field and inspect it.

def get_jacobian(sample, model):
    """
    Computes the jacobian of the deformation field for a given sample
    
    Args:
        sample (dict): sample of dataset created with `build_dataset`.
        model (Module): a model computing the deformation field.
        
    Returns:
        TODO
    """
    model.eval()
    
    batch_data = {
        "fixed": sample["fixed"].unsqueeze(0),
        "moving": sample["moving"].unsqueeze(0),
    }
    
    ddf, pred_image = forward(batch_data, model)
    ddf = ddf.detach().cpu().numpy().squeeze()
    ddf_dx = np.diff(ddf, axis=1, append=ddf[:, -1, :].reshape(2, 1, 256))/256
    ddf_dy = np.diff(ddf, axis=2, append=ddf[:, :, -1].reshape(2, 256, 1))/256
    
    jacobian = ddf_dx[0, :, :] * ddf_dy[1, :, :] - ddf_dx[1, :, :] * ddf_dy[0, :, :]
    return jacobian
for sample in val_dataset:
    jacobian = get_jacobian(sample, model)
    plt.figure()
    plt.imshow(jacobian, cmap='seismic', clim=(-0.003, 0.003))
    plt.colorbar()
    plt.show()