Source code for sdcflows.workflows.fit.pepolar

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
"""Datasets with multiple phase encoded directions."""
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu

from niworkflows.engine.workflows import LiterateWorkflow as Workflow

from ... import data

INPUT_FIELDS = ("metadata", "in_data")
_PEPOLAR_DESC = """\
A *B<sub>0</sub>*-nonuniformity map (or *fieldmap*) was estimated based on two (or more)
echo-planar imaging (EPI) references """


[docs] def init_topup_wf( grid_reference=0, omp_nthreads=1, sloppy=False, debug=False, name="pepolar_estimate_wf", ): """ Create the PEPOLAR field estimation workflow based on FSL's ``topup``. Workflow Graph .. workflow :: :graph2use: orig :simple_form: yes from sdcflows.workflows.fit.pepolar import init_topup_wf wf = init_topup_wf() Parameters ---------- grid_reference : :obj:`int` Index of the volume (after flattening) that will be taken for gridding reference. sloppy : :obj:`bool` Whether a fast configuration of topup (less accurate) should be applied. debug : :obj:`bool` Run in debug mode name : :obj:`str` Name for this workflow omp_nthreads : :obj:`int` Parallelize internal tasks across the number of CPUs given by this option. Inputs ------ in_data : :obj:`list` of :obj:`str` A list of EPI files that will be fed into TOPUP. metadata : :obj:`list` of :obj:`dict` A list of dictionaries containing the metadata corresponding to each file in ``in_data``. Outputs ------- fmap : :obj:`str` The path of the estimated fieldmap. fmap_ref : :obj:`str` The path of an unwarped conversion of files in ``in_data``. fmap_mask : :obj:`str` The path of mask corresponding to the ``fmap_ref`` output. fmap_coeff : :obj:`str` or :obj:`list` of :obj:`str` The path(s) of the B-Spline coefficients supporting the fieldmap. method: :obj:`str` Short description of the estimation method that was run. """ from nipype.interfaces.fsl.epi import TOPUP from niworkflows.interfaces.nibabel import MergeSeries, ReorientImage from niworkflows.interfaces.images import RobustAverage from ...utils.misc import front as _front from ...interfaces.epi import GetReadoutTime, SortPEBlips from ...interfaces.utils import UniformGrid, PadSlices, ReorientImageAndMetadata from ...interfaces.bspline import TOPUPCoeffReorient from ..ancillary import init_brainextraction_wf workflow = Workflow(name=name) workflow.__desc__ = f"""\ {_PEPOLAR_DESC} with `topup` (@topup; FSL {TOPUP().version}). """ inputnode = pe.Node(niu.IdentityInterface(fields=INPUT_FIELDS), name="inputnode") outputnode = pe.Node( niu.IdentityInterface( fields=[ "fmap", "fmap_ref", "fmap_coeff", "fmap_mask", "jacobians", "xfms", "out_warps", "method", ] ), name="outputnode", ) outputnode.inputs.method = "PEB/PEPOLAR (phase-encoding based / PE-POLARity)" # Calculate the total readout time of each run readout_time = pe.MapNode( GetReadoutTime(), name="readout_time", iterfield=["metadata", "in_file"], run_without_submitting=True, ) # Average each run so that topup is not overwhelmed (see #279) runwise_avg = pe.MapNode( RobustAverage(num_threads=omp_nthreads), name="runwise_avg", iterfield="in_file", ) # Regrid all to the reference (grid_reference=0 means first averaged run) regrid = pe.Node(UniformGrid(reference=grid_reference), name="regrid") # Sort PE blips to ensure reproducibility sort_pe_blips = pe.Node( SortPEBlips(), name="sort_pe_blips", run_without_submitting=True ) # Merge into one 4D file concat_blips = pe.Node(MergeSeries(affine_tolerance=1e-4), name="concat_blips") # Pad dimensions so that they meet TOPUP's expectations pad_blip_slices = pe.Node(PadSlices(), name="pad_blip_slices") # Run 3dVolReg between runs: uses RobustAverage for consistency and to generate # debugging artifacts (typically, one wants to look at the average across uncorrected runs) setwise_avg = pe.Node(RobustAverage(num_threads=omp_nthreads), name="setwise_avg") # The core of the implementation # Feed the input images in LAS orientation, so FSL does not run funky reorientations to_las = pe.Node(ReorientImageAndMetadata(target_orientation="LAS"), name="to_las") topup = pe.Node( TOPUP( config=str(data.load(f"flirtsch/b02b0{'_quick' * sloppy}.cnf")) ), name="topup", ) # "Generalize" topup coefficients and store them in a spatially-correct NIfTI file fix_coeff = pe.Node( TOPUPCoeffReorient(), name="fix_coeff", run_without_submitting=True ) # Average the output ref_average = pe.Node(RobustAverage(num_threads=omp_nthreads), name="ref_average") # Sophisticated brain extraction of fMRIPrep brainextraction_wf = init_brainextraction_wf() # fmt: off workflow.connect([ (inputnode, runwise_avg, [("in_data", "in_file")]), (inputnode, readout_time, [("metadata", "metadata")]), (runwise_avg, regrid, [("out_file", "in_data")]), (regrid, readout_time, [("out_data", "in_file")]), (regrid, sort_pe_blips, [("out_data", "in_data")]), (readout_time, sort_pe_blips, [("readout_time", "readout_times"), ("pe_dir_fsl", "pe_dirs_fsl")]), (sort_pe_blips, topup, [("readout_times", "readout_times")]), (setwise_avg, fix_coeff, [("out_file", "fmap_ref")]), (sort_pe_blips, concat_blips, [("out_data", "in_files")]), (concat_blips, pad_blip_slices, [("out_file", "in_file")]), (pad_blip_slices, setwise_avg, [("out_file", "in_file")]), (setwise_avg, to_las, [("out_hmc_volumes", "in_file")]), (sort_pe_blips, to_las, [("pe_dirs_fsl", "pe_dir")]), (to_las, topup, [ ("out_file", "in_file"), ("pe_dir", "encoding_direction"), ]), (topup, fix_coeff, [("out_fieldcoef", "in_coeff")]), (to_las, fix_coeff, [(("pe_dir", _front), "pe_dir")]), (topup, outputnode, [("out_jacs", "jacobians"), ("out_mats", "xfms")]), (ref_average, brainextraction_wf, [("out_file", "inputnode.in_file")]), (brainextraction_wf, outputnode, [ ("outputnode.out_file", "fmap_ref"), ("outputnode.out_mask", "fmap_mask") ]), (fix_coeff, outputnode, [("out_coeff", "fmap_coeff")]), ]) # fmt: on if not debug: # Roll orientation back to original from_las = pe.Node(ReorientImage(), name="from_las") from_las_fmap = pe.Node(ReorientImage(), name="from_las_fmap") # fmt: off workflow.connect([ (setwise_avg, from_las, [("out_file", "target_file")]), (setwise_avg, from_las_fmap, [("out_file", "target_file")]), (topup, from_las, [("out_corrected", "in_file")]), (from_las, ref_average, [("out_file", "in_file")]), (topup, from_las_fmap, [("out_field", "in_file")]), (topup, outputnode, [("out_warps", "out_warps")]), (from_las_fmap, outputnode, [("out_file", "fmap")]), ]) # fmt: on return workflow from sdcflows.interfaces.bspline import ApplyCoeffsField # Separate the runs again, as our ApplyCoeffsField corrects them separately unwarp = pe.Node(ApplyCoeffsField(), name="unwarp") unwarp.interface._always_run = True concat_corrected = pe.Node(MergeSeries(), name="concat_corrected") # fmt:off workflow.connect([ (fix_coeff, unwarp, [("out_coeff", "in_coeff")]), (setwise_avg, unwarp, [("out_hmc_volumes", "in_data")]), (sort_pe_blips, unwarp, [("readout_times", "ro_time"), ("pe_dirs", "pe_dir")]), (unwarp, outputnode, [("out_field", "fmap")]), (unwarp, concat_corrected, [("out_corrected", "in_files")]), (concat_corrected, ref_average, [("out_file", "in_file")]), ]) # fmt:on return workflow
[docs] def init_3dQwarp_wf(omp_nthreads=1, debug=False, name="pepolar_estimate_wf"): """ Create the PEPOLAR field estimation workflow based on AFNI's ``3dQwarp``. This workflow takes in two EPI files that MUST have opposed :abbr:`PE (phase-encoding)` direction. Therefore, EPIs with orthogonal PE directions are not supported. Workflow Graph .. workflow :: :graph2use: orig :simple_form: yes from sdcflows.workflows.fit.pepolar import init_3dQwarp_wf wf = init_3dQwarp_wf() Parameters ---------- debug : :obj:`bool` Whether a fast configuration of topup (less accurate) should be applied. name : :obj:`str` Name for this workflow omp_nthreads : :obj:`int` Parallelize internal tasks across the number of CPUs given by this option. Inputs ------ in_data : :obj:`list` of :obj:`str` A list of two EPI files, the first of which will be taken as reference. Outputs ------- fmap : :obj:`str` The path of the estimated fieldmap. fmap_ref : :obj:`str` The path of an unwarped conversion of the first element of ``in_data``. """ from nipype.interfaces import afni from niworkflows.interfaces.header import CopyHeader from niworkflows.interfaces.fixes import ( FixHeaderRegistration as Registration, FixHeaderApplyTransforms as ApplyTransforms, ) from niworkflows.interfaces.freesurfer import StructuralReference from niworkflows.func.util import init_enhance_and_skullstrip_bold_wf from ...utils.misc import front as _front, last as _last from ...interfaces.utils import Flatten, ConvertWarp workflow = Workflow(name=name) workflow.__desc__ = f"""{_PEPOLAR_DESC} \ with `3dQwarp` (@afni; AFNI {''.join(['%02d' % v for v in afni.Info().version() or []])}). """ inputnode = pe.Node( niu.IdentityInterface(fields=["in_data", "metadata"]), name="inputnode" ) outputnode = pe.Node( niu.IdentityInterface(fields=["fmap", "fmap_ref"]), name="outputnode" ) flatten = pe.Node(Flatten(), name="flatten") sort_pe = pe.Node( niu.Function(function=_sorted_pe, output_names=["sorted", "qwarp_args"]), name="sort_pe", run_without_submitting=True, ) merge_pes = pe.MapNode( StructuralReference( auto_detect_sensitivity=True, initial_timepoint=1, fixed_timepoint=True, # Align to first image intensity_scaling=True, # 7-DOF (rigid + intensity) no_iteration=True, subsample_threshold=200, out_file="template.nii.gz", ), name="merge_pes", iterfield=["in_files"], ) pe0_wf = init_enhance_and_skullstrip_bold_wf( omp_nthreads=omp_nthreads, name="pe0_wf" ) pe1_wf = init_enhance_and_skullstrip_bold_wf( omp_nthreads=omp_nthreads, name="pe1_wf" ) align_pes = pe.Node( Registration( from_file=data.load("translation_rigid.json"), output_warped_image=True, ), name="align_pes", n_procs=omp_nthreads, ) qwarp = pe.Node( afni.QwarpPlusMinus( blur=[-1, -1], environ={"OMP_NUM_THREADS": f"{min(omp_nthreads, 4)}"}, minpatch=9, nopadWARP=True, noweight=True, pblur=[0.05, 0.05], ), name="qwarp", n_procs=min(omp_nthreads, 4), ) to_ants = pe.Node(ConvertWarp(), name="to_ants", mem_gb=0.01) cphdr_warp = pe.Node(CopyHeader(), name="cphdr_warp", mem_gb=0.01) unwarp_reference = pe.Node( ApplyTransforms( dimension=3, float=True, interpolation="LanczosWindowedSinc", ), name="unwarp_reference", ) # fmt: off workflow.connect([ (inputnode, flatten, [("in_data", "in_data"), ("metadata", "in_meta")]), (flatten, sort_pe, [("out_list", "inlist")]), (sort_pe, qwarp, [("qwarp_args", "args")]), (sort_pe, merge_pes, [("sorted", "in_files")]), (merge_pes, pe0_wf, [(("out_file", _front), "inputnode.in_file")]), (merge_pes, pe1_wf, [(("out_file", _last), "inputnode.in_file")]), (pe0_wf, align_pes, [("outputnode.skull_stripped_file", "fixed_image")]), (pe1_wf, align_pes, [("outputnode.skull_stripped_file", "moving_image")]), (pe0_wf, qwarp, [("outputnode.skull_stripped_file", "in_file")]), (align_pes, qwarp, [("warped_image", "base_file")]), (inputnode, cphdr_warp, [(("in_data", _front), "hdr_file")]), (qwarp, cphdr_warp, [("source_warp", "in_file")]), (cphdr_warp, to_ants, [("out_file", "in_file")]), (to_ants, unwarp_reference, [("out_file", "transforms")]), (inputnode, unwarp_reference, [("in_reference", "reference_image"), ("in_reference", "input_image")]), (unwarp_reference, outputnode, [("output_image", "fmap_ref")]), (to_ants, outputnode, [("out_file", "fmap")]), ]) # fmt: on return workflow
def _sorted_pe(inlist): """ Generate suitable inputs to ``3dQwarp``. Example ------- >>> paths, args = _sorted_pe([ ... ("dir-AP_epi.nii.gz", {"PhaseEncodingDirection": "j-"}), ... ("dir-AP_bold.nii.gz", {"PhaseEncodingDirection": "j-"}), ... ("dir-PA_epi.nii.gz", {"PhaseEncodingDirection": "j"}), ... ("dir-PA_bold.nii.gz", {"PhaseEncodingDirection": "j"}), ... ("dir-AP_sbref.nii.gz", {"PhaseEncodingDirection": "j-"}), ... ("dir-PA_sbref.nii.gz", {"PhaseEncodingDirection": "j"}), ... ]) >>> paths[0] ['dir-AP_epi.nii.gz', 'dir-AP_bold.nii.gz', 'dir-AP_sbref.nii.gz'] >>> paths[1] ['dir-PA_epi.nii.gz', 'dir-PA_bold.nii.gz', 'dir-PA_sbref.nii.gz'] >>> args '-noXdis -noZdis' >>> paths, args = _sorted_pe([ ... ("dir-AP_epi.nii.gz", {"PhaseEncodingDirection": "j-"}), ... ("dir-LR_epi.nii.gz", {"PhaseEncodingDirection": "i"}), ... ]) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ValueError: """ out_ref = [inlist[0][0]] out_opp = [] ref_pe = inlist[0][1]["PhaseEncodingDirection"] for d, m in inlist[1:]: pe = m["PhaseEncodingDirection"] if pe == ref_pe: out_ref.append(d) elif pe[0] == ref_pe[0]: out_opp.append(d) else: raise ValueError("Cannot handle orthogonal PE encodings.") return ( [out_ref, out_opp], {"i": "-noYdis -noZdis", "j": "-noXdis -noZdis", "k": "-noXdis -noYdis"}[ ref_pe[0] ], )