Warning: This document is for an old version of niworkflows.

Source code for niworkflows.interfaces.cifti

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
"""Handling connectivity: combines FreeSurfer surfaces with subcortical volumes."""
from pathlib import Path
import json
import warnings

import nibabel as nb
from nibabel import cifti2 as ci
import numpy as np
from nilearn.image import resample_to_img

from nipype.utils.filemanip import split_filename
from nipype.interfaces.base import (
    BaseInterfaceInputSpec,
    TraitedSpec,
    File,
    traits,
    SimpleInterface,
    Directory,
)
import templateflow.api as tf

CIFTI_SURFACES = ("fsaverage5", "fsaverage6", "fsLR")
CIFTI_VOLUMES = ("MNI152NLin2009cAsym", "MNI152NLin6Asym")
CIFTI_STRUCT_WITH_LABELS = {  # CITFI structures with corresponding labels
    # SURFACES
    "CIFTI_STRUCTURE_CORTEX_LEFT": None,
    "CIFTI_STRUCTURE_CORTEX_RIGHT": None,
    # SUBCORTICAL
    "CIFTI_STRUCTURE_ACCUMBENS_LEFT": (26,),
    "CIFTI_STRUCTURE_ACCUMBENS_RIGHT": (58,),
    "CIFTI_STRUCTURE_AMYGDALA_LEFT": (18,),
    "CIFTI_STRUCTURE_AMYGDALA_RIGHT": (54,),
    "CIFTI_STRUCTURE_BRAIN_STEM": (16,),
    "CIFTI_STRUCTURE_CAUDATE_LEFT": (11,),
    "CIFTI_STRUCTURE_CAUDATE_RIGHT": (50,),
    "CIFTI_STRUCTURE_CEREBELLUM_LEFT": (6, 8,),  # DKT31  # HCP MNI152
    "CIFTI_STRUCTURE_CEREBELLUM_RIGHT": (45, 47,),  # DKT31  # HCP MNI152
    "CIFTI_STRUCTURE_DIENCEPHALON_VENTRAL_LEFT": (28,),
    "CIFTI_STRUCTURE_DIENCEPHALON_VENTRAL_RIGHT": (60,),
    "CIFTI_STRUCTURE_HIPPOCAMPUS_LEFT": (17,),
    "CIFTI_STRUCTURE_HIPPOCAMPUS_RIGHT": (53,),
    "CIFTI_STRUCTURE_PALLIDUM_LEFT": (13,),
    "CIFTI_STRUCTURE_PALLIDUM_RIGHT": (52,),
    "CIFTI_STRUCTURE_PUTAMEN_LEFT": (12,),
    "CIFTI_STRUCTURE_PUTAMEN_RIGHT": (51,),
    "CIFTI_STRUCTURE_THALAMUS_LEFT": (10,),
    "CIFTI_STRUCTURE_THALAMUS_RIGHT": (49,),
}
CIFTI_VARIANTS = {
    "HCP grayordinates": ("fsLR", "MNI152NLin6Asym"),
    "fMRIPrep grayordinates": ("fsaverage", "MNI152NLin2009cAsym"),
}


class _GenerateCiftiInputSpec(BaseInterfaceInputSpec):
    bold_file = File(mandatory=True, exists=True, desc="input BOLD file")
    volume_target = traits.Enum(
        "MNI152NLin6Asym",
        "MNI152NLin2009cAsym",
        usedefault=True,
        desc="CIFTI volumetric output space",
    )
    surface_target = traits.Enum(
        "fsLR",
        "fsaverage5",
        "fsaverage6",
        usedefault=True,
        desc="CIFTI surface target space",
    )
    surface_density = traits.Enum(
        "10k", "32k", "41k", "59k", desc="Surface vertices density."
    )
    TR = traits.Float(mandatory=True, desc="Repetition time")
    surface_bolds = traits.List(
        File(exists=True),
        mandatory=True,
        desc="list of surface BOLD GIFTI files" " (length 2 with order [L,R])",
    )
    subjects_dir = Directory(mandatory=True, desc="FreeSurfer SUBJECTS_DIR")


class _GenerateCiftiOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="generated CIFTI file")
    out_metadata = File(exists=True, desc="variant metadata JSON")
    variant = traits.Str(desc="Name of variant space")
    density = traits.Str(desc="Total number of grayordinates")


[docs]class GenerateCifti(SimpleInterface): """ Generate CIFTI image from BOLD file in target spaces. Currently supports ``fsLR``, ``fsaverage5``, or ``fsaverage6`` for template surfaces and ``MNI152NLin6Asym`` or ``MNI152NLin2009cAsym`` as template volumes. """ input_spec = _GenerateCiftiInputSpec output_spec = _GenerateCiftiOutputSpec def _run_interface(self, runtime): annotation_files, label_file = _get_cifti_data( self.inputs.surface_target, self.inputs.volume_target, self.inputs.subjects_dir, self.inputs.surface_density, ) out_metadata, variant, density = _get_cifti_variant( self.inputs.surface_target, self.inputs.volume_target, self.inputs.surface_density, ) self._results.update({"out_metadata": out_metadata, "variant": variant}) if density: self._results["density"] = density self._results["out_file"] = _create_cifti_image( self.inputs.bold_file, label_file, self.inputs.surface_bolds, annotation_files, self.inputs.TR, (self.inputs.surface_target, self.inputs.volume_target), ) return runtime
class _CiftiNameSourceInputSpec(BaseInterfaceInputSpec): variant = traits.Str( mandatory=True, desc="unique label of spaces used in combination to generate CIFTI file", ) density = traits.Str(desc="density label") class _CiftiNameSourceOutputSpec(TraitedSpec): out_name = traits.Str(desc="(partial) filename formatted according to template")
[docs]class CiftiNameSource(SimpleInterface): """ Construct new filename based on unique label of spaces used to generate a CIFTI file. Examples -------- >>> namer = CiftiNameSource() >>> namer.inputs.variant = 'HCP grayordinates' >>> res = namer.run() >>> res.outputs.out_name 'space-fsLR_bold.dtseries' >>> namer.inputs.density = '32k' >>> res = namer.run() >>> res.outputs.out_name 'space-fsLR_den-32k_bold.dtseries' """ input_spec = _CiftiNameSourceInputSpec output_spec = _CiftiNameSourceOutputSpec def _run_interface(self, runtime): suffix = "" if "hcp" in self.inputs.variant.lower(): suffix += "space-fsLR_" if self.inputs.density: suffix += "den-{}_".format(self.inputs.density) suffix += "bold.dtseries" self._results["out_name"] = suffix return runtime
def _get_cifti_data(surface, volume, subjects_dir=None, density=None): """ Fetch surface and volumetric label files for CIFTI creation. Parameters ---------- surface : str Target surface space volume : str Target volume space subjects_dir : str, optional Path to FreeSurfer subjects directory (required `fsaverage5`/`fsaverage6` surfaces) density : str, optional Surface density (required for `fsLR` surfaces) Returns ------- annotation_files : list Surface annotation files to allow removal of medial wall label_file : str Volumetric label file of subcortical structures Examples -------- >>> annots, label = _get_cifti_data('fsLR', 'MNI152NLin6Asym', density='32k') >>> annots # doctest: +ELLIPSIS ['.../tpl-fsLR_hemi-L_den-32k_desc-nomedialwall_dparc.label.gii', \ '.../tpl-fsLR_hemi-R_den-32k_desc-nomedialwall_dparc.label.gii'] >>> label # doctest: +ELLIPSIS '.../tpl-MNI152NLin6Asym_res-02_atlas-HCP_dseg.nii.gz' """ if surface not in CIFTI_SURFACES or volume not in CIFTI_VOLUMES: raise NotImplementedError( "Variant (surface: {0}, volume: {1}) is not supported".format( surface, volume ) ) tpl_kwargs = {"suffix": "dseg"} # fMRIPrep grayordinates if volume == "MNI152NLin2009cAsym": tpl_kwargs.update({"resolution": "2", "desc": "DKT31"}) annotation_files = sorted( (subjects_dir / surface / "label").glob("*h.aparc.annot") ) # HCP grayordinates elif volume == "MNI152NLin6Asym": # templateflow specific resolutions (2mm, 1.6mm) res = {"32k": "2", "59k": "6"}[density] tpl_kwargs.update({"atlas": "HCP", "resolution": res}) annotation_files = [ str(f) for f in tf.get( "fsLR", density=density, desc="nomedialwall", suffix="dparc" ) ] if len(annotation_files) != 2: raise IOError("Invalid number of surface annotation files") label_file = str(tf.get(volume, **tpl_kwargs)) return annotation_files, label_file def _get_cifti_variant(surface, volume, density=None): """ Identify CIFTI variant and return metadata. Parameters ---------- surface : str Target surface space volume : str Target volume space density : str, optional Surface density (required for `fsLR` surfaces) Returns ------- out_metadata : str JSON file with variant metadata variant : str Name of CIFTI variant Examples -------- >>> metafile, variant, _ = _get_cifti_variant('fsaverage5', 'MNI152NLin2009cAsym') >>> str(metafile) # doctest: +ELLIPSIS '.../dtseries_variant.json' >>> variant 'fMRIPrep grayordinates' >>> _, variant, grayords = _get_cifti_variant('fsLR', 'MNI152NLin6Asym', density='59k') >>> variant 'HCP grayordinates' >>> grayords '170k' """ if surface in ("fsaverage5", "fsaverage6"): density = {"fsaverage5": "10k", "fsaverage6": "41k"}[surface] surface = "fsaverage" for variant, targets in CIFTI_VARIANTS.items(): if all(target in targets for target in (surface, volume)): break variant = None if variant is None: raise NotImplementedError( "No corresponding variant for (surface: {0}, volume: {1})".format( surface, volume ) ) grayords = None out_metadata = Path.cwd() / "dtseries_variant.json" out_json = { "space": variant, "surface": surface, "volume": volume, "surface_density": density, } if surface == "fsLR": grayords = {"32k": "91k", "59k": "170k"}[density] out_json["grayordinates"] = grayords out_metadata.write_text(json.dumps(out_json, indent=2)) return out_metadata, variant, grayords def _create_cifti_image( bold_file, label_file, bold_surfs, annotation_files, tr, targets ): """ Generate CIFTI image in target space. Parameters ---------- bold_file : str BOLD volumetric timeseries label_file : str Subcortical label file bold_surfs : list BOLD surface timeseries [L,R] annotation_files : list Surface label files used to remove medial wall tr : float BOLD repetition time targets : tuple or list Surface and volumetric output spaces Returns ------- out : BOLD data saved as CIFTI dtseries """ bold_img = nb.load(bold_file) label_img = nb.load(label_file) if label_img.shape != bold_img.shape[:3]: warnings.warn("Resampling bold volume to match label dimensions") bold_img = resample_to_img(bold_img, label_img) # ensure images match HCP orientation (LAS) bold_img = _reorient_image(bold_img, orientation="LAS") label_img = _reorient_image(label_img, orientation="LAS") bold_data = bold_img.get_fdata(dtype="float32") timepoints = bold_img.shape[3] label_data = np.asanyarray(label_img.dataobj).astype("int16") # Create brain models idx_offset = 0 brainmodels = [] bm_ts = np.empty((timepoints, 0), dtype="float32") for structure, labels in CIFTI_STRUCT_WITH_LABELS.items(): if labels is None: # surface model model_type = "CIFTI_MODEL_TYPE_SURFACE" # use the corresponding annotation hemi = structure.split("_")[-1] # currently only supports L/R cortex surf_ts = nb.load(bold_surfs[hemi == "RIGHT"]) surf_verts = len(surf_ts.darrays[0].data) if annotation_files[0].endswith(".annot"): annot = nb.freesurfer.read_annot(annotation_files[hemi == "RIGHT"]) # remove medial wall medial = np.nonzero(annot[0] != annot[2].index(b"unknown"))[0] else: annot = nb.load(annotation_files[hemi == "RIGHT"]) medial = np.nonzero(annot.darrays[0].data)[0] # extract values across volumes ts = np.array([tsarr.data[medial] for tsarr in surf_ts.darrays]) vert_idx = ci.Cifti2VertexIndices(medial) bm = ci.Cifti2BrainModel( index_offset=idx_offset, index_count=len(vert_idx), model_type=model_type, brain_structure=structure, vertex_indices=vert_idx, n_surface_vertices=surf_verts, ) idx_offset += len(vert_idx) bm_ts = np.column_stack((bm_ts, ts)) else: model_type = "CIFTI_MODEL_TYPE_VOXELS" vox = [] ts = None for label in labels: ijk = np.nonzero(label_data == label) if ijk[0].size == 0: # skip label if nothing matches continue ts = ( bold_data[ijk] if ts is None else np.concatenate((ts, bold_data[ijk])) ) vox += [ [ijk[0][idx], ijk[1][idx], ijk[2][idx]] for idx in range(len(ts)) ] vox = ci.Cifti2VoxelIndicesIJK(vox) bm = ci.Cifti2BrainModel( index_offset=idx_offset, index_count=len(vox), model_type=model_type, brain_structure=structure, voxel_indices_ijk=vox, ) idx_offset += len(vox) bm_ts = np.column_stack((bm_ts, ts.T)) # add each brain structure to list brainmodels.append(bm) # add volume information brainmodels.append( ci.Cifti2Volume( bold_img.shape[:3], ci.Cifti2TransformationMatrixVoxelIndicesIJKtoXYZ(-3, bold_img.affine), ) ) # generate Matrix information series_map = ci.Cifti2MatrixIndicesMap( (0,), "CIFTI_INDEX_TYPE_SERIES", number_of_series_points=timepoints, series_exponent=0, series_start=0.0, series_step=tr, series_unit="SECOND", ) geometry_map = ci.Cifti2MatrixIndicesMap( (1,), "CIFTI_INDEX_TYPE_BRAIN_MODELS", maps=brainmodels ) # provide some metadata to CIFTI matrix meta = { "surface": targets[0], "volume": targets[1], } # generate and save CIFTI image matrix = ci.Cifti2Matrix() matrix.append(series_map) matrix.append(geometry_map) matrix.metadata = ci.Cifti2MetaData(meta) hdr = ci.Cifti2Header(matrix) img = ci.Cifti2Image(dataobj=bm_ts, header=hdr) img.set_data_dtype(bold_img.get_data_dtype()) img.nifti_header.set_intent("NIFTI_INTENT_CONNECTIVITY_DENSE_SERIES") out_file = "{}.dtseries.nii".format(split_filename(bold_file)[1]) ci.save(img, out_file) return Path.cwd() / out_file def _reorient_image(img, *, target_img=None, orientation=None): """ Coerce an image to a target orientation. .. note:: Only RAS -> LAS conversion is currently supported Parameters ---------- img : :obj:`SpatialImage` image to be reoriented target_img : :obj:`SpatialImage`, optional target in desired orientation orientation : :obj:`str` or :obj:`tuple`, optional desired orientation, if no target image is provided .. testsetup:: >>> img = nb.load(Path(test_data) / 'testSpatialNormalizationRPTMovingWarpedImage.nii.gz') >>> las_img = img.as_reoriented([[0, -1], [1, 1], [2, 1]]) Examples -------- >>> nimg = _reorient_image(img, target_img=img) >>> nb.aff2axcodes(nimg.affine) ('R', 'A', 'S') >>> nimg = _reorient_image(img, target_img=las_img) >>> nb.aff2axcodes(nimg.affine) ('L', 'A', 'S') >>> nimg = _reorient_image(img, orientation='LAS') >>> nb.aff2axcodes(nimg.affine) ('L', 'A', 'S') >>> _reorient_image(img, orientation='LPI') Traceback (most recent call last): ... NotImplementedError: Cannot reorient ... >>> _reorient_image(img) Traceback (most recent call last): ... RuntimeError: No orientation ... """ orient0 = nb.aff2axcodes(img.affine) if target_img is not None: orient1 = nb.aff2axcodes(target_img.affine) elif orientation is not None: orient1 = tuple(orientation) else: raise RuntimeError("No orientation to reorient to!") if orient0 == orient1: # already in desired orientation return img elif orient0 == tuple("RAS") and orient1 == tuple("LAS"): # RAS -> LAS return img.as_reoriented([[0, -1], [1, 1], [2, 1]]) else: raise NotImplementedError( "Cannot reorient {0} to {1}.".format(orient0, orient1) )