# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""Utilities."""
import os
import re
import json
import shutil
import numpy as np
import nibabel as nb
import nilearn.image as nli
from textwrap import indent
from collections import OrderedDict
import scipy.ndimage as nd
from nipype import logging
from nipype.utils.filemanip import fname_presuffix
from nipype.utils.misc import normalize_mc_params
from nipype.interfaces.io import add_traits
from nipype.interfaces.base import (
    traits,
    isdefined,
    File,
    InputMultiPath,
    TraitedSpec,
    BaseInterfaceInputSpec,
    SimpleInterface,
    DynamicTraitedSpec,
)
from .. import __version__
LOG = logging.getLogger("nipype.interface")
class _CopyXFormInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
    hdr_file = File(exists=True, mandatory=True, desc="the file we get the header from")
class _CopyHeaderInputSpec(BaseInterfaceInputSpec):
    in_file = File(exists=True, mandatory=True, desc="the file we get the data from")
    hdr_file = File(exists=True, mandatory=True, desc="the file we get the header from")
class _CopyHeaderOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="written file path")
class _NormalizeMotionParamsInputSpec(BaseInterfaceInputSpec):
    in_file = File(exists=True, mandatory=True, desc="the input parameters file")
    format = traits.Enum(
        "FSL", "AFNI", "FSFAST", "NIPY", usedefault=True, desc="output format"
    )
class _NormalizeMotionParamsOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="written file path")
[docs]class NormalizeMotionParams(SimpleInterface):
    """
    Convert input motion parameters into the designated convention.
    """
    input_spec = _NormalizeMotionParamsInputSpec
    output_spec = _NormalizeMotionParamsOutputSpec
    def _run_interface(self, runtime):
        mpars = np.loadtxt(self.inputs.in_file)  # mpars is N_t x 6
        mpars = np.apply_along_axis(
            func1d=normalize_mc_params, axis=1, arr=mpars, source=self.inputs.format
        )
        self._results["out_file"] = os.path.join(runtime.cwd, "motion_params.txt")
        np.savetxt(self._results["out_file"], mpars)
        return runtime 
class _GenerateSamplingReferenceInputSpec(BaseInterfaceInputSpec):
    fixed_image = File(
        exists=True, mandatory=True, desc="the reference file, defines the FoV"
    )
    moving_image = File(exists=True, mandatory=True, desc="the pixel size reference")
    xform_code = traits.Enum(None, 2, 4, usedefault=True, desc="force xform code")
    fov_mask = traits.Either(
        None,
        File(exists=True),
        usedefault=True,
        desc="mask to clip field of view (in fixed_image space)",
    )
    keep_native = traits.Bool(
        True,
        usedefault=True,
        desc="calculate a grid with native resolution covering "
        "the volume extent given by fixed_image, fast forward "
        "fixed_image otherwise.",
    )
class _GenerateSamplingReferenceOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="one file with all inputs flattened")
[docs]class GenerateSamplingReference(SimpleInterface):
    """
    Generates a reference grid for resampling one image keeping original resolution,
    but moving data to a different space (e.g. MNI).
    If the `fov_mask` optional input is provided, then the abbr:`FoV (field-of-view)`
    is cropped to a bounding box containing the brain mask plus an offest of two
    voxels along all dimensions. The `fov_mask` should be to the brain mask calculated
    from the T1w, and should not contain the brain stem. The mask is resampled into
    target space, and then the bounding box is calculated. Finally, the FoV is adjusted
    to that bounding box.
    """
    input_spec = _GenerateSamplingReferenceInputSpec
    output_spec = _GenerateSamplingReferenceOutputSpec
    def _run_interface(self, runtime):
        if not self.inputs.keep_native:
            self._results["out_file"] = self.inputs.fixed_image
            return runtime
        self._results["out_file"] = _gen_reference(
            self.inputs.fixed_image,
            self.inputs.moving_image,
            fov_mask=self.inputs.fov_mask,
            force_xform_code=self.inputs.xform_code,
            message="%s (niworkflows v%s)" % (self.__class__.__name__, __version__),
        )
        return runtime 
def _copyxform(ref_image, out_image, message=None):
    # Read in reference and output
    # Use mmap=False because we will be overwriting the output image
    resampled = nb.load(out_image, mmap=False)
    orig = nb.load(ref_image)
    if not np.allclose(orig.affine, resampled.affine):
        LOG.debug(
            "Affines of input and reference images do not match, "
            "FMRIPREP will set the reference image headers. "
            "Please, check that the x-form matrices of the input dataset"
            "are correct and manually verify the alignment of results."
        )
    # Copy xform infos
    qform, qform_code = orig.header.get_qform(coded=True)
    sform, sform_code = orig.header.get_sform(coded=True)
    header = resampled.header.copy()
    header.set_qform(qform, int(qform_code))
    header.set_sform(sform, int(sform_code))
    header["descrip"] = "xform matrices modified by %s." % (message or "(unknown)")
    newimg = resampled.__class__(resampled.dataobj, orig.affine, header)
    newimg.to_filename(out_image)
def _gen_reference(
    fixed_image,
    moving_image,
    fov_mask=None,
    out_file=None,
    message=None,
    force_xform_code=None,
):
    """
    Generates a sampling reference, and makes sure xform matrices/codes are
    correct
    """
    if out_file is None:
        out_file = fname_presuffix(
            fixed_image, suffix="_reference", newpath=os.getcwd()
        )
    # Moving images may not be RAS/LPS (more generally, transverse-longitudinal-axial)
    reoriented_moving_img = nb.as_closest_canonical(nb.load(moving_image))
    new_zooms = reoriented_moving_img.header.get_zooms()[:3]
    # Avoid small differences in reported resolution to cause changes to
    # FOV. See https://github.com/nipreps/fmriprep/issues/512
    # A positive diagonal affine is RAS, hence the need to reorient above.
    new_affine = np.diag(np.round(new_zooms, 3))
    resampled = nli.resample_img(
        fixed_image, target_affine=new_affine, interpolation="nearest"
    )
    if fov_mask is not None:
        # If we have a mask, resample again dropping (empty) samples
        # out of the FoV.
        fixednii = nb.load(fixed_image)
        masknii = nb.load(fov_mask)
        if np.all(masknii.shape[:3] != fixednii.shape[:3]):
            raise RuntimeError("Fixed image and mask do not have the same dimensions.")
        if not np.allclose(masknii.affine, fixednii.affine, atol=1e-5):
            raise RuntimeError("Fixed image and mask have different affines")
        # Get mask into reference space
        masknii = nli.resample_img(
            masknii, target_affine=new_affine, interpolation="nearest"
        )
        res_shape = np.array(masknii.shape[:3])
        # Calculate a bounding box for the input mask
        # with an offset of 2 voxels per face
        bbox = np.argwhere(np.asanyarray(masknii.dataobj) > 0)
        new_origin = np.clip(bbox.min(0) - 2, a_min=0, a_max=None)
        new_end = np.clip(bbox.max(0) + 2, a_min=0, a_max=res_shape - 1)
        # Find new origin, and set into new affine
        new_affine_4 = resampled.affine.copy()
        new_affine_4[:3, 3] = new_affine_4[:3, :3].dot(new_origin) + new_affine_4[:3, 3]
        # Calculate new shapes
        new_shape = new_end - new_origin + 1
        resampled = nli.resample_img(
            fixed_image,
            target_affine=new_affine_4,
            target_shape=new_shape.tolist(),
            interpolation="nearest",
        )
    xform = resampled.affine  # nibabel will pick the best affine
    _, qform_code = resampled.header.get_qform(coded=True)
    _, sform_code = resampled.header.get_sform(coded=True)
    xform_code = sform_code if sform_code > 0 else qform_code
    if xform_code == 1:
        xform_code = 2
    if force_xform_code is not None:
        xform_code = force_xform_code
    # Keep 0, 2, 3, 4 unchanged
    resampled.header.set_qform(xform, int(xform_code))
    resampled.header.set_sform(xform, int(xform_code))
    resampled.header["descrip"] = "reference image generated by %s." % (
        message or "(unknown software)"
    )
    resampled.to_filename(out_file)
    return out_file
class _SanitizeImageInputSpec(BaseInterfaceInputSpec):
    in_file = File(exists=True, mandatory=True, desc="input image")
    n_volumes_to_discard = traits.Int(
        0, usedefault=True, desc="discard n first volumes"
    )
    max_32bit = traits.Bool(
        False,
        usedefault=True,
        desc="cast data to float32 if higher " "precision is encountered",
    )
class _SanitizeImageOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="validated image")
    out_report = File(exists=True, desc="HTML segment containing warning")
[docs]class SanitizeImage(SimpleInterface):
    """
    Check the correctness of x-form headers (matrix and code) and fixes
    problematic combinations of values. Removes any extension form the header
    if present.
    This interface implements the `following logic
    <https://github.com/nipreps/fmriprep/issues/873#issuecomment-349394544>`_:
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    | valid quaternions | `qform_code > 0` | `sform_code > 0` | `qform == sform` \
| actions                                        |
    +===================+==================+==================+==================\
+================================================+
    | True              | True             | True             | True             \
| None                                           |
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    | True              | True             | False            | *                \
| sform, scode <- qform, qcode                   |
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    | *                 | True             | *                | False            \
| sform, scode <- qform, qcode                   |
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    | *                 | False            | True             | *                \
| qform, qcode <- sform, scode                   |
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    | *                 | False            | False            | *                \
| sform, qform <- best affine; scode, qcode <- 1 |
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    | False             | *                | False            | *                \
| sform, qform <- best affine; scode, qcode <- 1 |
    +-------------------+------------------+------------------+------------------\
+------------------------------------------------+
    """
    input_spec = _SanitizeImageInputSpec
    output_spec = _SanitizeImageOutputSpec
    def _run_interface(self, runtime):
        img = nb.load(self.inputs.in_file)
        out_report = os.path.join(runtime.cwd, "report.html")
        # Retrieve xform codes
        sform_code = int(img.header._structarr["sform_code"])
        qform_code = int(img.header._structarr["qform_code"])
        # Check qform is valid
        valid_qform = False
        try:
            img.get_qform()
            valid_qform = True
        except ValueError:
            pass
        # Matching affines
        matching_affines = valid_qform and np.allclose(img.get_qform(), img.get_sform())
        save_file = False
        warning_txt = ""
        # Both match, qform valid (implicit with match), codes okay -> do nothing, empty report
        if matching_affines and qform_code > 0 and sform_code > 0:
            self._results["out_file"] = self.inputs.in_file
            open(out_report, "w").close()
        # Row 2:
        elif valid_qform and qform_code > 0:
            img.set_sform(img.get_qform(), qform_code)
            save_file = True
            warning_txt = "Note on orientation: sform matrix set"
            description = """\
<p class="elem-desc">The sform has been copied from qform.</p>
"""
        # Rows 3-4:
        # Note: if qform is not valid, matching_affines is False
        elif sform_code > 0 and (not matching_affines or qform_code == 0):
            img.set_qform(img.get_sform(), sform_code)
            save_file = True
            warning_txt = "Note on orientation: qform matrix overwritten"
            description = """\
<p class="elem-desc">The qform has been copied from sform.</p>
"""
            if not valid_qform and qform_code > 0:
                warning_txt = "WARNING - Invalid qform information"
                description = """\
<p class="elem-desc">
    The qform matrix found in the file header is invalid.
    The qform has been copied from sform.
    Checking the original qform information from the data produced
    by the scanner is advised.
</p>
"""
        # Rows 5-6:
        else:
            affine = img.affine
            img.set_sform(affine, nb.nifti1.xform_codes["scanner"])
            img.set_qform(affine, nb.nifti1.xform_codes["scanner"])
            save_file = True
            warning_txt = "WARNING - Missing orientation information"
            description = """\
<p class="elem-desc">
    Orientation information could not be retrieved from the image header.
    The qform and sform matrices have been set to a default, LAS-oriented affine.
    Analyses of this dataset MAY BE INVALID.
</p>
"""
        if (
            self.inputs.max_32bit and np.dtype(img.get_data_dtype()).itemsize > 4
        ) or self.inputs.n_volumes_to_discard:
            # force float32 only if 64 bit dtype is detected
            if self.inputs.max_32bit and np.dtype(img.get_data_dtype()).itemsize > 4:
                in_data = img.get_fdata(dtype=np.float32)
            else:
                in_data = img.dataobj
            img = nb.Nifti1Image(
                in_data[:, :, :, self.inputs.n_volumes_to_discard:],
                img.affine,
                img.header,
            )
            save_file = True
        if len(img.header.extensions) != 0:
            img.header.extensions.clear()
            save_file = True
        # Store new file
        if save_file:
            out_fname = fname_presuffix(
                self.inputs.in_file, suffix="_valid", newpath=runtime.cwd
            )
            self._results["out_file"] = out_fname
            img.to_filename(out_fname)
        if warning_txt:
            snippet = '<h3 class="elem-title">%s</h3>\n%s\n' % (
                warning_txt,
                description,
            )
            with open(out_report, "w") as fobj:
                fobj.write(indent(snippet, "\t" * 3))
        self._results["out_report"] = out_report
        return runtime 
class _TPM2ROIInputSpec(BaseInterfaceInputSpec):
    in_tpm = File(
        exists=True, mandatory=True, desc="Tissue probability map file in T1 space"
    )
    in_mask = File(
        exists=True, mandatory=True, desc="Binary mask of skull-stripped T1w image"
    )
    mask_erode_mm = traits.Float(
        xor=["mask_erode_prop"], desc="erode input mask (kernel width in mm)"
    )
    erode_mm = traits.Float(
        xor=["erode_prop"], desc="erode output mask (kernel width in mm)"
    )
    mask_erode_prop = traits.Float(
        xor=["mask_erode_mm"], desc="erode input mask (target volume ratio)"
    )
    erode_prop = traits.Float(
        xor=["erode_mm"], desc="erode output mask (target volume ratio)"
    )
    prob_thresh = traits.Float(
        0.95, usedefault=True, desc="threshold for the tissue probability maps"
    )
class _TPM2ROIOutputSpec(TraitedSpec):
    roi_file = File(exists=True, desc="output ROI file")
    eroded_mask = File(exists=True, desc="resulting eroded mask")
[docs]class TPM2ROI(SimpleInterface):
    """Convert tissue probability maps (TPMs) into ROIs
    This interface follows the following logic:
    #. Erode ``in_mask`` by ``mask_erode_mm`` and apply to ``in_tpm``
    #. Threshold masked TPM at ``prob_thresh``
    #. Erode resulting mask by ``erode_mm``
    """
    input_spec = _TPM2ROIInputSpec
    output_spec = _TPM2ROIOutputSpec
    def _run_interface(self, runtime):
        mask_erode_mm = self.inputs.mask_erode_mm
        if not isdefined(mask_erode_mm):
            mask_erode_mm = None
        erode_mm = self.inputs.erode_mm
        if not isdefined(erode_mm):
            erode_mm = None
        mask_erode_prop = self.inputs.mask_erode_prop
        if not isdefined(mask_erode_prop):
            mask_erode_prop = None
        erode_prop = self.inputs.erode_prop
        if not isdefined(erode_prop):
            erode_prop = None
        roi_file, eroded_mask = _tpm2roi(
            self.inputs.in_tpm,
            self.inputs.in_mask,
            mask_erode_mm,
            erode_mm,
            mask_erode_prop,
            erode_prop,
            self.inputs.prob_thresh,
            newpath=runtime.cwd,
        )
        self._results["roi_file"] = roi_file
        self._results["eroded_mask"] = eroded_mask
        return runtime 
class _AddTPMsInputSpec(BaseInterfaceInputSpec):
    in_files = InputMultiPath(
        File(exists=True), mandatory=True, desc="input list of ROIs"
    )
    indices = traits.List(traits.Int, desc="select specific maps")
class _AddTPMsOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="union of binarized input files")
[docs]class AddTPMs(SimpleInterface):
    """Calculate the union of several :abbr:`TPMs (tissue-probability map)`"""
    input_spec = _AddTPMsInputSpec
    output_spec = _AddTPMsOutputSpec
    def _run_interface(self, runtime):
        in_files = self.inputs.in_files
        indices = list(range(len(in_files)))
        if isdefined(self.inputs.indices):
            indices = self.inputs.indices
        if len(self.inputs.in_files) < 2:
            self._results["out_file"] = in_files[0]
            return runtime
        first_fname = in_files[indices[0]]
        if len(indices) == 1:
            self._results["out_file"] = first_fname
            return runtime
        im = nb.concat_images([in_files[i] for i in indices])
        data = im.get_fdata().sum(axis=3)
        data = np.clip(data, a_min=0.0, a_max=1.0)
        out_file = fname_presuffix(first_fname, suffix="_tpmsum", newpath=runtime.cwd)
        newnii = im.__class__(data, im.affine, im.header)
        newnii.set_data_dtype(np.float32)
        # Set visualization thresholds
        newnii.header["cal_max"] = 1.0
        newnii.header["cal_min"] = 0.0
        newnii.to_filename(out_file)
        self._results["out_file"] = out_file
        return runtime 
class _AddTSVHeaderInputSpec(BaseInterfaceInputSpec):
    in_file = File(exists=True, mandatory=True, desc="input file")
    columns = traits.List(traits.Str, mandatory=True, desc="header for columns")
class _AddTSVHeaderOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="output average file")
class _JoinTSVColumnsInputSpec(BaseInterfaceInputSpec):
    in_file = File(exists=True, mandatory=True, desc="input file")
    join_file = File(exists=True, mandatory=True, desc="file to be adjoined")
    side = traits.Enum("right", "left", usedefault=True, desc="where to join")
    columns = traits.List(traits.Str, desc="header for columns")
class _JoinTSVColumnsOutputSpec(TraitedSpec):
    out_file = File(exists=True, desc="output TSV file")
[docs]class JoinTSVColumns(SimpleInterface):
    r"""Add a header row to a TSV file
    .. testsetup::
    >>> cwd = os.getcwd()
    >>> os.chdir(tmpdir)
    .. doctest::
    An example TSV:
    >>> data = np.arange(30).reshape((6, 5))
    >>> np.savetxt('data.tsv', data[:, :3], delimiter='\t')
    >>> np.savetxt('add.tsv', data[:, 3:], delimiter='\t')
    Join without naming headers:
    >>> join = JoinTSVColumns()
    >>> join.inputs.in_file = 'data.tsv'
    >>> join.inputs.join_file = 'add.tsv'
    >>> res = join.run()
    >>> df = pd.read_csv(res.outputs.out_file, delim_whitespace=True,
    ...                  index_col=None, dtype=float, header=None)
    >>> df.columns.ravel().tolist() == list(range(5))
    True
    >>> np.all(df.values.astype(int) == data)
    True
    Adding column names:
    >>> join = JoinTSVColumns()
    >>> join.inputs.in_file = 'data.tsv'
    >>> join.inputs.join_file = 'add.tsv'
    >>> join.inputs.columns = ['a', 'b', 'c', 'd', 'e']
    >>> res = join.run()
    >>> res.outputs.out_file  # doctest: +ELLIPSIS
    '...data_joined.tsv'
    >>> df = pd.read_csv(res.outputs.out_file, delim_whitespace=True,
    ...                  index_col=None)
    >>> df.columns.ravel().tolist()
    ['a', 'b', 'c', 'd', 'e']
    >>> np.all(df.values == np.arange(30).reshape((6, 5)))
    True
    >>> join = JoinTSVColumns()
    >>> join.inputs.in_file = 'data.tsv'
    >>> join.inputs.join_file = 'add.tsv'
    >>> join.inputs.side = 'left'
    >>> join.inputs.columns = ['a', 'b', 'c', 'd', 'e']
    >>> res = join.run()
    >>> df = pd.read_csv(res.outputs.out_file, delim_whitespace=True,
    ...                  index_col=None)
    >>> df.columns.ravel().tolist()
    ['a', 'b', 'c', 'd', 'e']
    >>> np.all(df.values == np.hstack((data[:, 3:], data[:, :3])))
    True
    .. testcleanup::
    >>> os.chdir(cwd)
    """
    input_spec = _JoinTSVColumnsInputSpec
    output_spec = _JoinTSVColumnsOutputSpec
    def _run_interface(self, runtime):
        out_file = fname_presuffix(
            self.inputs.in_file,
            suffix="_joined.tsv",
            newpath=runtime.cwd,
            use_ext=False,
        )
        header = ""
        if isdefined(self.inputs.columns) and self.inputs.columns:
            header = "\t".join(self.inputs.columns)
        with open(self.inputs.in_file) as ifh:
            data = ifh.read().splitlines(keepends=False)
        with open(self.inputs.join_file) as ifh:
            join = ifh.read().splitlines(keepends=False)
        if len(data) != len(join):
            raise ValueError("Number of columns in datasets do not match")
        merged = []
        for d, j in zip(data, join):
            line = "%s\t%s" % ((j, d) if self.inputs.side == "left" else (d, j))
            merged.append(line)
        if header:
            merged.insert(0, header)
        with open(out_file, "w") as ofh:
            ofh.write("\n".join(merged))
        self._results["out_file"] = out_file
        return runtime 
class _DictMergeInputSpec(BaseInterfaceInputSpec):
    in_dicts = traits.List(
        traits.Either(traits.Dict, traits.Instance(OrderedDict)),
        desc="Dictionaries to be merged. In the event of a collision, values "
        "from dictionaries later in the list receive precedence.",
    )
class _DictMergeOutputSpec(TraitedSpec):
    out_dict = traits.Dict(desc="Merged dictionary")
[docs]class DictMerge(SimpleInterface):
    """Merge (ordered) dictionaries."""
    input_spec = _DictMergeInputSpec
    output_spec = _DictMergeOutputSpec
    def _run_interface(self, runtime):
        out_dict = {}
        for in_dict in self.inputs.in_dicts:
            out_dict.update(in_dict)
        self._results["out_dict"] = out_dict
        return runtime 
class _TSV2JSONInputSpec(BaseInterfaceInputSpec):
    in_file = File(exists=True, mandatory=True, desc="Input TSV file")
    index_column = traits.Str(
        mandatory=True,
        desc="Name of the column in the TSV to be used "
        "as the top-level key in the JSON. All "
        "remaining columns will be assigned as "
        "nested keys.",
    )
    output = traits.Either(
        None,
        File,
        desc="Path where the output file is to be saved. "
        "If this is `None`, then a JSON-compatible "
        "dictionary is returned instead.",
    )
    additional_metadata = traits.Either(
        None,
        traits.Dict,
        traits.Instance(OrderedDict),
        usedefault=True,
        desc="Any additional metadata that "
        "should be applied to all "
        "entries in the JSON.",
    )
    drop_columns = traits.Either(
        None,
        traits.List(),
        usedefault=True,
        desc="List of columns in the TSV to be " "dropped from the JSON.",
    )
    enforce_case = traits.Bool(
        True,
        usedefault=True,
        desc="Enforce snake case for top-level keys " "and camel case for nested keys",
    )
class _TSV2JSONOutputSpec(TraitedSpec):
    output = traits.Either(
        traits.Dict,
        File(exists=True),
        traits.Instance(OrderedDict),
        desc="Output dictionary or JSON file",
    )
[docs]class TSV2JSON(SimpleInterface):
    """Convert metadata from TSV format to JSON format.
    """
    input_spec = _TSV2JSONInputSpec
    output_spec = _TSV2JSONOutputSpec
    def _run_interface(self, runtime):
        if not isdefined(self.inputs.output):
            output = fname_presuffix(
                self.inputs.in_file, suffix=".json", newpath=runtime.cwd, use_ext=False
            )
        else:
            output = self.inputs.output
        self._results["output"] = _tsv2json(
            in_tsv=self.inputs.in_file,
            out_json=output,
            index_column=self.inputs.index_column,
            additional_metadata=self.inputs.additional_metadata,
            drop_columns=self.inputs.drop_columns,
            enforce_case=self.inputs.enforce_case,
        )
        return runtime 
def _tsv2json(
    in_tsv,
    out_json,
    index_column,
    additional_metadata=None,
    drop_columns=None,
    enforce_case=True,
):
    """
    Convert metadata from TSV format to JSON format.
    Parameters
    ----------
    in_tsv: str
        Path to the metadata in TSV format.
    out_json: str
        Path where the metadata should be saved in JSON format after
        conversion. If this is None, then a dictionary is returned instead.
    index_column: str
        Name of the column in the TSV to be used as an index (top-level key in
        the JSON).
    additional_metadata: dict
        Any additional metadata that should be applied to all entries in the
        JSON.
    drop_columns: list
        List of columns from the input TSV to be dropped from the JSON.
    enforce_case: bool
        Indicates whether BIDS case conventions should be followed. Currently,
        this means that index fields (column names in the associated data TSV)
        use snake case and other fields use camel case.
    Returns
    -------
    str
        Path to the metadata saved in JSON format.
    """
    import pandas as pd
    # Adapted from https://dev.to/rrampage/snake-case-to-camel-case-and- ...
    # back-using-regular-expressions-and-python-m9j
    re_to_camel = r"(.*?)_([a-zA-Z0-9])"
    re_to_snake = r"(^.+?|.*?)((?<![_A-Z])[A-Z]|(?<![_0-9])[0-9]+)"
    def snake(match):
        return "{}_{}".format(match.group(1).lower(), match.group(2).lower())
    def camel(match):
        return "{}{}".format(match.group(1), match.group(2).upper())
    # from fmriprep
    def less_breakable(a_string):
        """hardens the string to different envs (i.e. case insensitive, no
        whitespace, '#'"""
        return "".join(a_string.split()).strip("#")
    drop_columns = drop_columns or []
    additional_metadata = additional_metadata or {}
    try:
        tsv_data = pd.read_csv(in_tsv, sep="\t")
    except pd.errors.EmptyDataError:
        tsv_data = pd.DataFrame()
    for k, v in additional_metadata.items():
        tsv_data[k] = [v] * len(tsv_data.index)
    for col in drop_columns:
        tsv_data.drop(labels=col, axis="columns", inplace=True)
    if index_column in tsv_data:
        tsv_data.set_index(index_column, drop=True, inplace=True)
    if enforce_case:
        tsv_data.index = [
            re.sub(re_to_snake, snake, less_breakable(i), 0).lower()
            for i in tsv_data.index
        ]
        tsv_data.columns = [
            re.sub(re_to_camel, camel, less_breakable(i).title(), 0).replace(
                "Csf", "CSF"
            )
            for i in tsv_data.columns
        ]
    json_data = tsv_data.to_json(orient="index")
    json_data = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(json_data)
    for i in json_data:
        json_data[i].update(additional_metadata)
    if out_json is None:
        return json_data
    with open(out_json, "w") as f:
        json.dump(json_data, f, indent=4)
    return out_json
def _tpm2roi(
    in_tpm,
    in_mask,
    mask_erosion_mm=None,
    erosion_mm=None,
    mask_erosion_prop=None,
    erosion_prop=None,
    pthres=0.95,
    newpath=None,
):
    """
    Generate a mask from a tissue probability map
    """
    tpm_img = nb.load(in_tpm)
    roi_mask = (tpm_img.get_fdata() >= pthres).astype(np.uint8)
    eroded_mask_file = None
    erode_in = (mask_erosion_mm is not None and mask_erosion_mm > 0) or (
        mask_erosion_prop is not None and mask_erosion_prop < 1
    )
    if erode_in:
        eroded_mask_file = fname_presuffix(in_mask, suffix="_eroded", newpath=newpath)
        mask_img = nb.load(in_mask)
        mask_data = np.asanyarray(mask_img.dataobj).astype(np.uint8)
        if mask_erosion_mm:
            iter_n = max(int(mask_erosion_mm / max(mask_img.header.get_zooms())), 1)
            mask_data = nd.binary_erosion(mask_data, iterations=iter_n)
        else:
            orig_vol = np.sum(mask_data > 0)
            while np.sum(mask_data > 0) / orig_vol > mask_erosion_prop:
                mask_data = nd.binary_erosion(mask_data, iterations=1)
        # Store mask
        eroded = nb.Nifti1Image(mask_data, mask_img.affine, mask_img.header)
        eroded.set_data_dtype(np.uint8)
        eroded.to_filename(eroded_mask_file)
        # Mask TPM data (no effect if not eroded)
        roi_mask[~mask_data] = 0
    # shrinking
    erode_out = (erosion_mm is not None and erosion_mm > 0) or (
        erosion_prop is not None and erosion_prop < 1
    )
    if erode_out:
        if erosion_mm:
            iter_n = max(int(erosion_mm / max(tpm_img.header.get_zooms())), 1)
            iter_n = int(erosion_mm / max(tpm_img.header.get_zooms()))
            roi_mask = nd.binary_erosion(roi_mask, iterations=iter_n)
        else:
            orig_vol = np.sum(roi_mask > 0)
            while np.sum(roi_mask > 0) / orig_vol > erosion_prop:
                roi_mask = nd.binary_erosion(roi_mask, iterations=1)
    # Create image to resample
    roi_fname = fname_presuffix(in_tpm, suffix="_roi", newpath=newpath)
    roi_img = nb.Nifti1Image(roi_mask, tpm_img.affine, tpm_img.header)
    roi_img.set_data_dtype(np.uint8)
    roi_img.to_filename(roi_fname)
    return roi_fname, eroded_mask_file or in_mask