# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Writing out outputs."""
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu
from niworkflows.interfaces.bids import DerivativesDataSink as _DDS
from niworkflows.interfaces.nibabel import MergeSeries
[docs]
class DerivativesDataSink(_DDS):
"""Overload the ``out_path_base`` setting."""
out_path_base = "sdcflows"
del _DDS
[docs]
def init_fmap_reports_wf(
*,
output_dir,
fmap_type,
bids_fmap_id=None,
custom_entities=None,
name="fmap_reports_wf",
):
"""
Set up a battery of datasinks to store reports in the right location.
Parameters
----------
fmap_type : :obj:`str`
The fieldmap estimator type.
output_dir : :obj:`str`
Directory in which to save derivatives
bids_fmap_id : :obj:`str`
Sets the ``B0FieldIdentifier`` metadata into the outputs.
custom_entities : :obj:`dict`
Define extra entities that will be written out in filenames.
name : :obj:`str`
Workflow name (default: ``"fmap_reports_wf"``)
Inputs
------
source_files
One or more fieldmap file(s) of the BIDS dataset that will serve for naming reference.
fieldmap
The preprocessed fieldmap, in its original space with Hz units.
fmap_ref
An anatomical reference (e.g., magnitude file)
fmap_mask
A brain mask in the fieldmap's space.
"""
from sdcflows.interfaces.reportlets import FieldmapReportlet
from sdcflows.utils.misc import front as _pop
custom_entities = custom_entities or {}
if bids_fmap_id:
custom_entities["fmapid"] = bids_fmap_id.replace("_", "")
workflow = pe.Workflow(name=name)
inputnode = pe.Node(
niu.IdentityInterface(
fields=["source_files", "fieldmap", "fmap_ref", "fmap_mask"]
),
name="inputnode",
)
fmap_rpt = pe.Node(FieldmapReportlet(), "fmap_rpt")
fmap_rpt.interface._always_run = True
ds_fmap_report = pe.Node(
DerivativesDataSink(
base_directory=str(output_dir),
datatype="figures",
suffix="fieldmap",
desc=fmap_type,
dismiss_entities=("fmap",),
allowed_entities=tuple(custom_entities.keys()),
),
name="ds_fmap_report",
)
for k, v in custom_entities.items():
setattr(ds_fmap_report.inputs, k, v)
# fmt:off
workflow.connect([
(inputnode, fmap_rpt, [(("fieldmap", _pop), "fieldmap"),
("fmap_ref", "reference"),
("fmap_mask", "mask")]),
(fmap_rpt, ds_fmap_report, [("out_report", "in_file")]),
(inputnode, ds_fmap_report, [("source_files", "source_file")]),
])
# fmt:on
return workflow
[docs]
def init_fmap_derivatives_wf(
*,
output_dir,
bids_fmap_id=None,
custom_entities=None,
name="fmap_derivatives_wf",
write_coeff=False,
):
"""
Set up datasinks to store derivatives in the right location.
Parameters
----------
output_dir : :obj:`str`
Directory in which to save derivatives
bids_fmap_id : :obj:`str`
Sets the ``B0FieldIdentifier`` metadata into the outputs.
custom_entities : :obj:`dict`
Define extra entities that will be written out in filenames.
name : :obj:`str`
Workflow name (default: ``"fmap_derivatives_wf"``)
write_coeff : :obj:`bool`
Build the workflow path to map coefficients into target space.
Inputs
------
source_files
One or more fieldmap file(s) of the BIDS dataset that will serve for naming reference.
fieldmap
The preprocessed fieldmap, in its original space with Hz units.
fmap_coeff
Field coefficient(s) file(s)
fmap_ref
An anatomical reference (e.g., magnitude file)
"""
custom_entities = custom_entities or {}
if bids_fmap_id:
custom_entities["fmapid"] = bids_fmap_id.replace("_", "")
workflow = pe.Workflow(name=name)
inputnode = pe.Node(
niu.IdentityInterface(
fields=["source_files", "fieldmap", "fmap_coeff", "fmap_ref", "fmap_meta"]
),
name="inputnode",
)
merge_fmap = pe.Node(MergeSeries(), name="merge_fmap")
ds_reference = pe.Node(
DerivativesDataSink(
base_directory=output_dir,
compress=True,
suffix="fieldmap",
datatype="fmap",
dismiss_entities=("fmap",),
allowed_entities=tuple(custom_entities.keys()),
),
name="ds_reference",
)
ds_fieldmap = pe.Node(
DerivativesDataSink(
base_directory=output_dir,
desc="preproc",
suffix="fieldmap",
datatype="fmap",
compress=True,
allowed_entities=tuple(custom_entities.keys()),
),
name="ds_fieldmap",
)
ds_fieldmap.inputs.Units = "Hz"
if bids_fmap_id:
ds_fieldmap.inputs.B0FieldIdentifier = bids_fmap_id
for k, v in custom_entities.items():
setattr(ds_reference.inputs, k, v)
setattr(ds_fieldmap.inputs, k, v)
# fmt:off
workflow.connect([
(inputnode, merge_fmap, [("fieldmap", "in_files")]),
(inputnode, ds_reference, [("source_files", "source_file"),
("fmap_ref", "in_file"),
(("source_files", _getsourcetype), "desc")]),
(inputnode, ds_fieldmap, [("source_files", "source_file"),
("source_files", "RawSources")]),
(merge_fmap, ds_fieldmap, [("out_file", "in_file")]),
(ds_reference, ds_fieldmap, [
(("out_file", _getname), "AnatomicalReference"),
]),
(inputnode, ds_fieldmap, [(("fmap_meta", _selectintent), "IntendedFor")]),
])
# fmt:on
if not write_coeff:
return workflow
ds_coeff = pe.MapNode(
DerivativesDataSink(
base_directory=output_dir,
suffix="fieldmap",
datatype="fmap",
compress=True,
allowed_entities=tuple(custom_entities.keys()),
),
name="ds_coeff",
iterfield=("in_file", "desc"),
)
gen_desc = pe.Node(niu.Function(function=_gendesc), name="gen_desc")
for k, v in custom_entities.items():
setattr(ds_coeff.inputs, k, v)
# fmt:off
workflow.connect([
(inputnode, ds_coeff, [("source_files", "source_file"),
("fmap_coeff", "in_file")]),
(inputnode, gen_desc, [("fmap_coeff", "infiles")]),
(gen_desc, ds_coeff, [("out", "desc")]),
(ds_coeff, ds_fieldmap, [(("out_file", _getname), "AssociatedCoefficients")]),
])
# fmt:on
return workflow
def _gendesc(infiles):
"""
Generate a desc entity value.
Examples
--------
>>> _gendesc("f")
'coeff'
>>> _gendesc(list("ab"))
['coeff0', 'coeff1']
"""
if isinstance(infiles, (str, bytes)):
infiles = [infiles]
if len(infiles) == 1:
return "coeff"
return [f"coeff{i}" for i, _ in enumerate(infiles)]
def _getname(infile):
"""
Get file names only.
Examples
--------
>>> _getname("drop/path/filename.txt")
'filename.txt'
>>> _getname(["drop/path/filename.txt", "other/path/filename2.txt"])
['filename.txt', 'filename2.txt']
"""
from pathlib import Path
if isinstance(infile, (list, tuple)):
return [Path(f).name for f in infile]
return Path(infile).name
def _getsourcetype(infiles):
"""
Determine the type of fieldmap estimation strategy.
Example
-------
>>> _getsourcetype(["path/some_epi.nii.gz"])
'epi'
>>> _getsourcetype(["path/some_notepi.nii.gz"])
'magnitude'
"""
from pathlib import Path
fname = Path(infiles[0]).name
return "epi" if fname.endswith(("_epi.nii.gz", "_epi.nii")) else "magnitude"
def _selectintent(metadata):
"""
Extract the IntendedFor metadata.
Example
-------
>>> _selectintent({})
[]
>>> _selectintent({"IntendedFor": "just/one/file.txt"})
['just/one/file.txt']
>>> _selectintent({"IntendedFor": ["file2.txt", "file1.txt"]})
['file1.txt', 'file2.txt']
>>> _selectintent([{"IntendedFor": "just/one/file.txt"}] * 2)
['just/one/file.txt']
>>> _selectintent([
... {"IntendedFor": "just/one/file.txt"},
... {"IntendedFor": ["file2.txt", "file1.txt"]},
... ])
['file1.txt', 'file2.txt', 'just/one/file.txt']
"""
from bids.utils import listify
return sorted(
set([el for m in listify(metadata) for el in listify(m.get("IntendedFor", []))])
)