Creating flexible neuroimaging pipelines with Nipype

Luke Chang

University of Colorado Boulder

Overview

  1. Why Python?
  2. The beauty of IPython Notebooks
  3. What is Nipype?
  4. Example preprocessing pipeline

Python's Cannibalization of Scientific Computing

IPython Notebooks

  • Interactive, browser-based Python engine
  • Great for lab notebooks
  • Great for creating tutorials
  • Can create slideshows (like this one)
  • Can be easily shared (e.g., a Dropbox/GitHub link rendered in nbviewer)
  • Can run on Blanca!

How to Set Up IPython Notebook on Blanca

    1. Log on to Blanca and make sure you are using Luke's Anaconda Python distribution:

      /projects/luch0518/software/anaconda

    2. Start the IPython notebook server on Blanca, disabling the browser and specifying a port:

      ipython notebook --no-browser --port=8889

    3. On your local computer, log in to Blanca using port forwarding:

      ssh -N -f -L localhost:8889:localhost:8889 luch0518@blogin01.rc.colorado.edu

    4. Open your browser to localhost:8889/tree

    5. The IPython notebook server should shut down when you close the SSH connection, but just in case, you can also kill the process. Here is the command to find the process ID:

      ps aux | grep localhost:8889

    6. Nipype can submit jobs to Blanca automatically. You can check your queue or delete queued jobs:

      Check queue:

      qstat -u $USER

      Delete queued jobs:

      qselect -u $USER | xargs qdel

Problems With Neuroimaging Analyses

  • Lots of software packages, but all have different assumptions, philosophies, interfaces, and file formats. What if we wanted to mix and match?
  • How do we train people?
  • How do we process big data? (It is not feasible to use a GUI for 1,000 subjects.)
  • There are many pipeline packages, but have you ever tried using them? (e.g., BIRN Tools, HCP, LONI, MRN auto-analysis pipeline)
  • How do we facilitate reproducible research?
  • How do we create different lab pipelines?
  • How can we have a pipeline that runs on different platforms and computing architectures?

What is Nipype?

  • Easily interact with tools from different software packages
  • Combine processing steps from different software packages
  • Develop new workflows faster by reusing common steps from old ones
  • Process data faster by running it in parallel on many cores/machines
  • Make your research easily reproducible
  • Share your processing workflows with the community

Nipype Architecture

  • Interface
  • Engine
  • Executable Plugins

Nipype: Interface

  • Interface: Wraps a program or function

Nipype: Engine

  • Node/MapNode: Wraps an Interface for use in a Workflow

  • Workflow: A graph whose nodes are of type Node, MapNode, or Workflow and whose edges represent data flow

Nipype: Plugin

  • Plugin: A component that describes how a Workflow should be executed
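
A toy example may make the three layers concrete. A minimal sketch, assuming FSL is installed and on your path ('struct.nii' is a placeholder file):

In []:
# Interface: wraps a single program (here, FSL's BET skull-stripper)
from nipype.interfaces import fsl
from nipype.pipeline.engine import Node, Workflow

btr = fsl.BET(in_file='struct.nii', frac=0.4)
# btr.run()  # runs bet directly, outside of any workflow

# Node: wraps the Interface for use in a Workflow
skullstrip = Node(fsl.BET(frac=0.4), name='skullstrip')
skullstrip.inputs.in_file = 'struct.nii'

# Plugin: tells the engine how to execute the Workflow (serial, MultiProc, PBS, etc.)
wf = Workflow(name='example')
wf.add_nodes([skullstrip])
# wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})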

Installing environment

  • Scientific Python

    • Debian/Ubuntu/Scientific Fedora
    • Enthought Python Distribution
    • Anaconda
    • There are Python modules on Blanca, though they are old versions.
    • We have our own Anaconda build installed on Blanca.
  • Installing Nipype

    • PyPi
    • Github
    • Neurodebian
  • Running Nipype

    • Ensure tools are installed and accessible on your path
    • Remember, Nipype is simply a wrapper for other software packages (e.g., SPM, FSL, AFNI)
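
For example, a typical install from PyPI, followed by a quick check that Nipype imports and that a wrapped tool is visible on your path:

      pip install nipype
      python -c "import nipype; print nipype.__version__"
      which matlab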

Nipype Preprocessing Pipeline Example

Preprocessing Steps

  1. Slice timing
  2. Realignment to mean image
  3. Coregister anatomical to functional
  4. Segment anatomical with bias correction
  5. Normalize anatomical to MNI space
  6. Apply normalization to functionals
  7. Smooth - 8mm
  8. Artifact Detection - 3 SD of global mean (3.5 SD of root-mean-squared successive differences; Mahalanobis distance across slices? Ask Tor for more details, but ignore for now)
  9. Create nuisance covariate matrix for the GLM (note the file has headers and NaNs for missing data in the derivative columns)

Figures Generated

  • Realignment Parameters (plot_realign/Realignment_parameters.pdf)
  • MNI-space single subject overlaid on mean warped functional (plot_normalization_check/Normalized_Functional_Check.pdf)
  • Spikes identified using global signal (art/plot.rafunc.png)
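
Once the pipeline has run, PNG outputs can be displayed inline in the notebook (PDFs need an external viewer). A minimal sketch, using the data_dir and sub variables defined in the cells below; the exact path depends on the workflow's base_dir and iterables, so it is located with glob here:

In []:
import glob
from IPython.display import Image

# Find and display the artifact-detection spike plot
# (the path pattern is an assumption; adjust for your workflow's base_dir)
art_plot = glob.glob(data_dir + '/' + sub + '/Functional/Preprocessed/*/art/plot.rafunc.png')[0]
Image(filename=art_plot)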

Setup

  • Import Modules and set up paths and defaults
In []:
# Define Python Modules to import
from nipype.interfaces import spm
import nipype.interfaces.io as nio           # Data i/o
import nipype.interfaces.utility as util     # utility
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces.base import BaseInterface, TraitedSpec, File, traits
import nipype.algorithms.rapidart as ra      # artifact detection
from nipype.interfaces.nipy.preprocess import ComputeMask
import nipype.interfaces.matlab as mlab
import os
import nibabel as nib
from IPython.display import Image
import glob

# Specify various inputs files for pipeline
# spm_path = '/projects/ics/software/spm/spm8_r5236/'
spm_path = '/Users/lukechang/Documents/Matlab/spm8/'
canonical_file = spm_path + 'canonical/single_subj_T1.nii'
template_file = spm_path + 'templates/T1.nii'

# Set the way matlab should be called
# mlab.MatlabCommand.set_default_matlab_cmd("matlab -nodesktop -nosplash -nojvm -noFigureWindows")
mlab.MatlabCommand.set_default_matlab_cmd("matlab -nodesktop -nosplash")
mlab.MatlabCommand.set_default_paths(spm_path)

Define Nodes

  • Nodes are processes

  • They can refer to specific functions of an interface (e.g., coregister)

  • They can be a custom function

  • Iterables allow you to iterate over a vector of parameters (e.g., subjects, or smoothing parameters)
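
The cells below also rely on a few small helper functions (get_ta, get_n_slices, get_slice_order, get_vox_dims) that are defined elsewhere in the full notebook. Here is a minimal sketch of what they might look like, inferred from how they are used:

In []:
# Hedged sketches of the helper functions used below; signatures inferred from usage.
def get_ta(tr, n_slices):
    # Time of acquisition: TR minus the duration of one slice (the standard SPM convention)
    return tr - tr / float(n_slices)

def get_n_slices(volume):
    import nibabel as nib
    if isinstance(volume, list):
        volume = volume[0]
    return nib.load(volume).shape[2]  # z dimension = number of slices

def get_slice_order(volume):
    # Sequential ascending acquisition, as noted in the slice-timing cell
    return range(1, get_n_slices(volume) + 1)

def get_vox_dims(volume):
    import nibabel as nib
    if isinstance(volume, list):
        volume = volume[0]
    zooms = nib.load(volume).get_header().get_zooms()
    return [float(zooms[0]), float(zooms[1]), float(zooms[2])]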

In [5]:
#Setup Data Source for Input Data
#(data_dir, subject_id, and task_list are defined elsewhere, e.g., as arguments of create_preproc_func_pipeline)
ds = Node(nio.DataGrabber(infields=['subject_id', 'task_id'], outfields=['func', 'struc']), name='datasource')
ds.inputs.base_directory = os.path.abspath(data_dir + '/' + subject_id)
ds.inputs.template = '*'
ds.inputs.sort_filelist = True
ds.inputs.template_args = {'func': [['task_id']], 'struc':[]}
ds.inputs.field_template = {'func': 'Functional/Raw/%s/func.nii','struc': 'Structural/SPGR/spgr.nii'}
ds.inputs.subject_id = subject_id
ds.inputs.task_id = task_list
ds.iterables = ('task_id',task_list)

#Get Timing Acquisition for slice timing
tr = 2
ta = Node(interface=util.Function(input_names=['tr', 'n_slices'], output_names=['ta'],  function = get_ta), name="ta")
ta.inputs.tr=tr

#Slice Timing: sequential ascending 
slice_timing = Node(interface=spm.SliceTiming(), name="slice_timing")
slice_timing.inputs.time_repetition = tr
slice_timing.inputs.ref_slice = 1

#Realignment - 6 parameters - register to mean image (two-pass).
realign = Node(interface=spm.Realign(), name="realign")
realign.inputs.register_to_mean = True

#Plot Realignment
plot_realign = Node(interface=PlotRealignmentParameters(), name="plot_realign")

#Artifact Detection
art = Node(interface=ra.ArtifactDetect(), name="art")
art.inputs.use_differences      = [True,False]
art.inputs.use_norm             = True
art.inputs.norm_threshold       = 1
art.inputs.zintensity_threshold = 3
art.inputs.mask_type            = 'file'
art.inputs.parameter_source     = 'SPM'
    
#Coregister - 12 parameters, cost function = 'nmi', fwhm 7, interpolate, don't mask
#anatomical to functional mean across all available data.
coregister = Node(interface=spm.Coregister(), name="coregister")
coregister.inputs.jobtype = 'estimate'

# Segment structural, gray/white/csf,mni, 
segment = Node(interface=spm.Segment(), name="segment")
segment.inputs.save_bias_corrected = True
    
#Normalize - structural to MNI - then apply this to the coregistered functionals
normalize = Node(interface=spm.Normalize(), name = "normalize")
normalize.inputs.template = os.path.abspath(template_file)

#Plot normalization Check
plot_normalization_check = Node(interface=Plot_Coregistration_Montage(), name="plot_normalization_check")
plot_normalization_check.inputs.canonical_img = canonical_file
    
#Create Mask
compute_mask = Node(interface=ComputeMask(), name="compute_mask")
#remove lower 5% of histogram of mean image
compute_mask.inputs.m = .05
        
#Smooth
#implicit masking (.im) = 0, dtype = 0
smooth = Node(interface=spm.Smooth(), name = "smooth")
fwhmlist = [8]
smooth.iterables = ('fwhm',fwhmlist)

#Create Covariate matrix
make_covariates = Node(interface=Create_Covariates(), name="make_covariates")

Create a Workflow

  • A workflow is a processing pipeline

  • It is a directed acyclic graph that represents data flow

  • Nodes are processes

  • Edges represent the direction of data flow

  • Must define the inputs and outputs of each processing node (see the connect() sketch below)
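
Schematically, connect() links a named output of one node to a named input of another; the bundled list form used in the next cell expresses many such links at once. A minimal sketch with throwaway nodes, so the real pipeline's nodes are not claimed by a second workflow:

In []:
# A single connection: output 'mean_image' of realign feeds input 'target' of coregister
realign_demo = Node(spm.Realign(register_to_mean=True), name="realign_demo")
coreg_demo = Node(spm.Coregister(jobtype='estimate'), name="coreg_demo")

demo = Workflow(name="connect_example")
demo.connect(realign_demo, 'mean_image', coreg_demo, 'target')
# ...equivalent to one entry of the bundled list form used below:
# demo.connect([(realign_demo, coreg_demo, [('mean_image', 'target')])])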

In []:
Preprocessed = Workflow(name="Preprocessed")
Preprocessed.base_dir = os.path.abspath(data_dir + '/' + subject_id + '/Functional')

Preprocessed.connect([  
                    (ds, ta, [(('func', get_n_slices), "n_slices")]),
                    (ta, slice_timing, [("ta", "time_acquisition")]),
                    (ds, slice_timing, [('func', 'in_files'),
                                        (('func', get_n_slices), "num_slices"),
                                        (('func', get_slice_order), "slice_order"),
                                           ]),
                    (slice_timing, realign, [('timecorrected_files', 'in_files')]),
                    (realign, compute_mask, [('mean_image','mean_volume')]),
                    (realign,coregister, [('mean_image', 'target')]),
                    (ds,coregister, [('struc', 'source')]),
                    (coregister,segment, [('coregistered_source', 'data')]),
                    (segment, normalize, [('transformation_mat','parameter_file'),
                                        ('bias_corrected_image', 'source'),]),
                    (realign,normalize, [('realigned_files', 'apply_to_files'),
                                        (('realigned_files', get_vox_dims), 'write_voxel_sizes')]),
                    (normalize, smooth, [('normalized_files', 'in_files')]),
                    (compute_mask,art,[('brain_mask','mask_file')]),
                    (realign,art,[('realignment_parameters','realignment_parameters')]),
                    (realign,art,[('realigned_files','realigned_files')]),
                    (realign,plot_realign, [('realignment_parameters', 'realignment_parameters')]),
                    (normalize, plot_normalization_check, [('normalized_files', 'wra_img')]),
                    (realign, make_covariates, [('realignment_parameters', 'realignment_parameters')]),
                    (art, make_covariates, [('outlier_files', 'spike_id')]),
                      ])

Visualize the graphical pipeline

  • Each processing step in the workflow is a node in the graph
  • Because it is a DAG, you can easily run different pipelines on the same data without interfering with other pipelines.
  • All of the files you will need for subsequent analyses will be in each node's output folder.
  • If you make changes to a node, Nipype will only rerun that node and all dependent nodes.
  • You can easily iterate over a vector of parameters using iterables (e.g., different smoothing parameters)

Here is a directed acyclic graph of the preprocessing pipeline.

In [6]:
data_dir = '/Users/lukechang/Dropbox/PTSD/Data/Imaging/'
sub = 'subj46153C'
# create_preproc_func_pipeline() wraps the node and workflow definitions above into a reusable function
Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id=sub)
Preprocessed.write_graph(data_dir + sub + "/Preprocessed_Workflow.dot")
Image(filename=data_dir + sub + '/Preprocessed_Workflow.dot.png')

Execution Plugins

Allows seamless execution across many architectures

  • Local

    • serially
    • multicore
  • Clusters

    • PBS/Torque
    • SGE
    • SLURM
    • SSH (via IPython)
  • Caveats

    • Submitting jobs using PBS is really slow because Nipype waits to submit new jobs while finished jobs are still in the "C" (completed) state.
    • Haven't tested it on SLURM yet.
    • Looping over subjects on an interactive node works.
    • Should be able to submit each subject as its own job and run multiprocessing within each job submission.
In []:
# Create pipeline for a single subject
data_dir = '/Users/lukechang/Dropbox/PTSD/Data/Imaging'
subject_id = 'subj46153C'
task_list = ['s1_r1Cond','s1_r1Ext']
Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id=subject_id, task_list=task_list)

# Get list of subjects
sublist = sorted([x.split('/')[-1] for x in glob.glob(data_dir + '/subj*')])

# Loop over subjects
for sub in reversed(sublist):
    # Glob subject runs, as they vary across subjects
    runlist = [x.split('/')[-1] for x in glob.glob(data_dir + '/' + sub + '/Functional/Raw/*')]
    Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id=sub, task_list=runlist)
    print data_dir + '/' + sub

    # Write out pipeline as a DAG, then run with 4 parallel processes
    Preprocessed.write_graph(dotfilename=data_dir + '/' + sub + "/Functional/Preprocessed_Workflow.dot.svg", format='svg')
    Preprocessed.run(plugin='MultiProc', plugin_args={'n_procs': 4})
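
The same loop can hand the workflow to a cluster plugin instead. A minimal sketch for PBS/Torque; the qsub resource arguments are placeholders to adjust for your cluster:

In []:
# Run the same workflow through the PBS plugin instead of MultiProc.
# The qsub arguments are assumptions; set walltime/resources for your cluster.
Preprocessed.run(plugin='PBS', plugin_args={'qsub_args': '-l walltime=04:00:00,nodes=1:ppn=1'})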

Create your own custom functions

  • You can easily create your own Nodes running custom functions

  • Uses object-oriented Python

  • You need to define InputSpec and OutputSpec classes

  • Can wrap Python, shell scripts, MATLAB, etc.

In []:
# Example to create a custom coregistration plot using nilearn plotting tools

class Plot_Coregistration_Montage_InputSpec(TraitedSpec):
    wra_img = File(exists=True, mandatory=True) 
    canonical_img = File(exists=True, mandatory=True)
    title = traits.Str("Normalized Functional Check", usedefault=True)
    
class Plot_Coregistration_Montage_OutputSpec(TraitedSpec):
    plot = File(exists=True)

class Plot_Coregistration_Montage(BaseInterface):
    # This function creates a plot of an axial montage of the average normalized functional data 
    # with the spm MNI space single subject T1 overlaid. Useful for checking normalization
    input_spec = Plot_Coregistration_Montage_InputSpec
    output_spec = Plot_Coregistration_Montage_OutputSpec

    def _run_interface(self, runtime):
        import os
        import nibabel as nib
        from nilearn import plotting, image
        
        wra_img = nib.load(self.inputs.wra_img)
        canonical_img = nib.load(self.inputs.canonical_img)
        title = self.inputs.title
        mean_wraimg = image.mean_img(wra_img)
        
        if title != "":
            filename = title.replace(" ", "_")+".pdf"
        else:
            filename = "plot.pdf"
        
        fig = plotting.plot_anat(mean_wraimg, title="wrafunc & canonical single subject", cut_coords=range(-40, 40, 10), display_mode='z')
        fig.add_edges(canonical_img)     
        fig.savefig(filename)
        fig.close()
        
        self._plot = filename
        
        runtime.returncode=0
        return runtime
    
    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs["plot"] = os.path.abspath(self._plot)
        return outputs
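
A quick standalone test of the custom interface ('wrafunc.nii' is a placeholder path):

In []:
# Run the custom interface outside of a workflow
plotter = Plot_Coregistration_Montage(wra_img='wrafunc.nii', canonical_img=canonical_file)
res = plotter.run()
print res.outputs.plot  # absolute path to the saved PDF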

Coregistration Check