# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Utilities for fieldmap estimation."""
from pathlib import Path
from enum import Enum, auto
from collections import defaultdict
import re
import attr
from json import loads
from bids.layout import parse_file_entities
from bids.utils import listify
from niworkflows.utils.bids import relative_to_root
from .utils.bimap import EstimatorRegistry
_estimators = EstimatorRegistry()
_intents = defaultdict(set)
[docs]
class EstimatorType(Enum):
"""Represents different types of fieldmap estimation approach."""
UNKNOWN = auto()
PEPOLAR = auto()
PHASEDIFF = auto()
MAPPED = auto()
ANAT = auto()
MODALITIES = {
"bold": EstimatorType.PEPOLAR,
"dwi": EstimatorType.PEPOLAR,
"epi": EstimatorType.PEPOLAR,
"asl": EstimatorType.PEPOLAR,
"m0scan": EstimatorType.PEPOLAR,
"fieldmap": EstimatorType.MAPPED,
"magnitude": None,
"magnitude1": None,
"magnitude2": None,
"phase1": EstimatorType.PHASEDIFF,
"phase2": EstimatorType.PHASEDIFF,
"phasediff": EstimatorType.PHASEDIFF,
"sbref": EstimatorType.PEPOLAR,
"T1w": EstimatorType.ANAT,
"T2w": EstimatorType.ANAT,
}
def _type_setter(obj, attribute, value):
"""Make sure the type of estimation is not changed."""
if obj.method == value:
return value
elif obj.method != EstimatorType.UNKNOWN:
raise TypeError(f"Cannot change determined method {obj.method} to {value}.")
if value not in (
EstimatorType.PEPOLAR,
EstimatorType.PHASEDIFF,
EstimatorType.MAPPED,
EstimatorType.ANAT,
):
raise ValueError(f"Invalid estimation method type {value}.")
return value
def _id_setter(obj, attribute, value):
if obj.bids_id and obj.bids_id == value:
return value
raise ValueError("Cannot edit the bids_id")
[docs]
@attr.s(slots=True)
class FieldmapFile:
"""
Represent a file that can be used in some fieldmap estimation method.
The class will read metadata from a sidecar JSON with filename matching that
of the file.
This class may receive metadata as a keyword argument at initialization.
Examples
--------
>>> f = FieldmapFile(dsA_dir / "sub-01" / "anat" / "sub-01_T1w.nii.gz")
>>> f.suffix
'T1w'
>>> FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_dir-LR_epi.nii.gz",
... find_meta=False
... ) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
MetadataError:
>>> f = FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_dir-LR_epi.nii.gz",
... )
>>> f.metadata['TotalReadoutTime']
0.005
>>> f = FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_dir-LR_epi.nii.gz",
... metadata={"TotalReadoutTime": None},
... ) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
MetadataError:
>>> f = FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_dir-LR_epi.nii.gz",
... metadata={'TotalReadoutTime': 0.006}
... )
>>> f.metadata['TotalReadoutTime']
0.006
>>> FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_phasediff.nii.gz",
... find_meta=False
... ) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
MetadataError:
>>> f = FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_phasediff.nii.gz"
... )
>>> f.metadata['EchoTime2']
0.00746
>>> FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_phase2.nii.gz",
... find_meta=False
... ) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
MetadataError:
>>> f = FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_phase2.nii.gz"
... )
>>> f.metadata['EchoTime']
0.00746
>>> FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_fieldmap.nii.gz",
... find_meta=False
... ) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
MetadataError:
>>> f = FieldmapFile(
... dsA_dir / "sub-01" / "fmap" / "sub-01_fieldmap.nii.gz"
... )
>>> f.metadata['Units']
'rad/s'
"""
path = attr.ib(converter=Path, repr=str, on_setattr=attr.setters.NO_OP)
"""Path to a fieldmap file."""
entities = attr.ib(init=False, repr=False)
"""BIDS entities extracted from filepath."""
suffix = attr.ib(init=False, repr=False)
"""Extracted suffix from input file."""
bids_root = attr.ib(init=False, default=None, repr=False)
"""Path of the BIDS root."""
metadata = attr.ib(kw_only=True, default=attr.Factory(dict))
"""
Metadata associated to this file. When provided as keyword argument in initialization,
will overwrite metadata read from sidecar JSON.
"""
find_meta = attr.ib(kw_only=True, default=True, type=bool, repr=False)
"""Enable/disable automated search for corresponding metadata."""
[docs]
@path.validator
def check_path(self, attribute, value):
"""Validate a fieldmap path."""
if not value.is_file():
raise FileNotFoundError(
f"File path <{value}> does not exist, is a broken link, or it is not a file"
)
if not value.name.endswith((".nii", ".nii.gz")):
raise ValueError(f"File path <{value}> does not look like a NIfTI file.")
suffix = re.search(r"(?<=_)\w+(?=\.nii)", value.name).group()
if suffix not in tuple(MODALITIES.keys()):
raise ValueError(
f"File path <{value}> with suffix <{suffix}> is not a valid "
"fieldmap sourcefile."
)
def __attrs_post_init__(self):
"""Validate metadata and additional checks."""
self.entities = parse_file_entities(str(self.path))
self.suffix = self.entities.pop("suffix")
extension = self.entities.pop("extension").lstrip(".")
# Automatically fill metadata in when possible
# TODO: implement BIDS hierarchy of metadata
if self.find_meta:
sidecar = Path(str(self.path).replace(extension, "json"))
if sidecar.is_file():
_meta = self.metadata or {}
self.metadata = loads(sidecar.read_text())
self.metadata.update(_meta)
# Attempt to infer a bids_root folder
relative_path = relative_to_root(self.path)
self.bids_root = (
Path(str(self.path)[: -len(str(relative_path))])
if str(relative_path) != str(self.path)
else None
)
if self.suffix in ("T1w", "T2w"):
self.metadata["TotalReadoutTime"] = 0.0
# Check for REQUIRED metadata (depends on suffix.)
if self.suffix in ("bold", "dwi", "epi", "sbref", "asl", "m0scan"):
if "PhaseEncodingDirection" not in self.metadata:
raise MetadataError(
f"Missing 'PhaseEncodingDirection' for <{self.path}>."
)
from .utils.epimanip import get_trt
try:
get_trt(self.metadata, in_file=self.path)
except ValueError as exc:
raise MetadataError(
f"Missing readout timing information for <{self.path}>."
) from exc
elif self.suffix == "fieldmap" and "Units" not in self.metadata:
raise MetadataError(f"Missing 'Units' for <{self.path}>.")
elif self.suffix == "phasediff" and (
"EchoTime1" not in self.metadata or "EchoTime2" not in self.metadata
):
raise MetadataError(
f"Missing 'EchoTime1' and/or 'EchoTime2' for <{self.path}>."
)
elif self.suffix in ("phase1", "phase2") and ("EchoTime" not in self.metadata):
raise MetadataError(f"Missing 'EchoTime' for <{self.path}>.")
[docs]
@attr.s(slots=True)
class FieldmapEstimation:
"""
Represent fieldmap estimation strategies.
This class provides a consistent interface to all types of fieldmap estimation
strategies.
The actual type of method for estimation is inferred from the ``sources`` input,
and collects all the available metadata.
"""
sources = attr.ib(
default=None,
converter=lambda v: [
FieldmapFile(f) if not isinstance(f, FieldmapFile) else f
for f in listify(v)
],
repr=lambda v: f"<{len(v)} files>",
)
"""File path or list of paths indicating the source data to estimate a fieldmap."""
method = attr.ib(init=False, default=EstimatorType.UNKNOWN, on_setattr=_type_setter)
"""Flag indicating the estimator type inferred from the input sources."""
bids_id = attr.ib(default=None, kw_only=True, type=str, on_setattr=_id_setter)
"""The unique ``B0FieldIdentifier`` field of this fieldmap."""
sanitized_id = attr.ib(init=False, repr=False)
"""Sanitized version of the bids_id with special characters replaced by underscores."""
_wf = attr.ib(init=False, default=None, repr=False)
"""Internal pointer to a workflow."""
def __attrs_post_init__(self):
"""Determine the intended fieldmap estimation type and check for data completeness."""
suffix_list = [f.suffix for f in self.sources]
suffix_set = set(suffix_list)
# Fieldmap option 1: actual field-mapping sequences
fmap_types = suffix_set.intersection(
("fieldmap", "phasediff", "phase1", "phase2")
)
if len(fmap_types) > 1 and fmap_types - set(("phase1", "phase2")):
raise TypeError(f"Incompatible suffices found: <{','.join(fmap_types)}>.")
if fmap_types:
sources = sorted(
f.path
for f in self.sources
if f.suffix in ("fieldmap", "phasediff", "phase1", "phase2")
)
# Automagically add the corresponding phase2 file if missing as argument
missing_phases = ("phase1" not in fmap_types, "phase2" not in fmap_types)
if sum(missing_phases) == 1:
mis_ph = "phase1" if missing_phases[0] else "phase2"
hit_ph = "phase2" if missing_phases[0] else "phase1"
new_source = sources[0].parent / sources[0].name.replace(hit_ph, mis_ph)
self.sources.append(FieldmapFile(new_source))
sources.insert(int(missing_phases[1]), new_source)
# Set method, this cannot be undone
self.method = MODALITIES[fmap_types.pop()]
# Determine the name of the corresponding (first) magnitude file(s)
magnitude = f"magnitude{'' if self.method == EstimatorType.MAPPED else '1'}"
if magnitude not in suffix_set:
try:
new_path = (
sources[0].parent / sources[0].name
.replace("_fieldmap", "_magnitude")
.replace("_phasediff", "_phase1")
.replace("_phase", "_magnitude")
)
self.sources.append(FieldmapFile(new_path))
except Exception:
raise ValueError(
"A fieldmap or phase-difference estimation type was found, "
f"but an anatomical reference ({magnitude} file) is missing."
)
# Check presence and try to find (if necessary) the second magnitude file
if (
self.method == EstimatorType.PHASEDIFF
and "magnitude2" not in suffix_set
):
try:
new_path = (
sources[-1].parent / sources[-1].name
.replace("diff", "2")
.replace("phase", "magnitude")
)
self.sources.append(FieldmapFile(new_path))
except Exception:
if "phase2" in suffix_set:
raise ValueError(
"A phase-difference estimation (phase1/2) type was found, "
"but an anatomical reference (magnitude2 file) is missing."
)
# Fieldmap option 2: PEPOLAR (and fieldmap-less or ANAT)
# IMPORTANT NOTE: fieldmap-less approaches can be considered PEPOLAR with RO = 0.0s
pepolar_types = suffix_set.intersection(("bold", "dwi", "epi", "sbref", "asl", "m0scan"))
anat_types = suffix_set.intersection(("T1w", "T2w"))
_pepolar_estimation = (
len(
[f for f in suffix_list if f in ("bold", "dwi", "epi", "sbref", "asl", "m0scan")]
) > 1
)
if _pepolar_estimation and not anat_types:
self.method = MODALITIES[pepolar_types.pop()]
_pe = set(f.metadata["PhaseEncodingDirection"] for f in self.sources)
if len(_pe) == 1:
raise ValueError(
f"Only one phase-encoding direction <{_pe.pop()}> found across sources."
)
elif anat_types:
self.method = MODALITIES[anat_types.pop()]
if not pepolar_types:
raise ValueError(
"Only anatomical sources were found, cannot estimate fieldmap."
)
if self.method == EstimatorType.UNKNOWN:
# No method has been identified -> fail.
raise ValueError("Insufficient sources to estimate a fieldmap.")
intents_meta = set(
el
for f in self.sources
for el in listify(f.metadata.get("IntendedFor") or [])
)
# Register this estimation method
if not self.bids_id:
# If not manually set, try to get it from BIDS metadata
b0_ids = [
listify(f.metadata.get("B0FieldIdentifier"))
for f in self.sources
if f.metadata.get("B0FieldIdentifier")
]
if b0_ids:
# Find common IDs
bids_ids = set(b0_ids[0]).intersection(*b0_ids[1:])
if not bids_ids:
raise ValueError(
f"No common ``B0FieldIdentifier`` found: <{', '.join(map(str, b0_ids))}>"
)
elif len(bids_ids) > 1:
raise ValueError(
f"Multiple common ``B0FieldIdentifier``s found: <{', '.join(bids_ids)}>"
)
object.__setattr__(self, "bids_id", bids_ids.pop())
if self.bids_id:
_estimators[self.bids_id] = self.paths()
else:
bids_id = _estimators.add(self.paths())
object.__setattr__(self, "bids_id", bids_id)
for intent_file in intents_meta:
_intents[intent_file].add(self.bids_id)
# Provide a sanitized identifier that can be used in cases where
# special characters are not allowed.
self.sanitized_id = re.sub(r'[^a-zA-Z0-9]', '_', self.bids_id)
[docs]
def paths(self):
"""Return a tuple of paths that are sorted."""
return tuple(sorted(str(f.path) for f in self.sources))
[docs]
def get_workflow(self, set_inputs=True, **kwargs):
"""Build the estimation workflow corresponding to this instance."""
if self._wf is not None:
return self._wf
# Override workflow name
kwargs["name"] = f"wf_{self.sanitized_id}"
if self.method in (EstimatorType.MAPPED, EstimatorType.PHASEDIFF):
from .workflows.fit.fieldmap import init_fmap_wf
kwargs["mode"] = str(self.method).rpartition(".")[-1].lower()
self._wf = init_fmap_wf(**kwargs)
if set_inputs:
self._wf.inputs.inputnode.magnitude = [
str(f.path.absolute())
for f in self.sources if f.suffix.startswith("magnitude")
]
self._wf.inputs.inputnode.fieldmap = [
(str(f.path.absolute()), f.metadata)
for f in self.sources
if f.suffix in ("fieldmap", "phasediff", "phase2", "phase1")
]
elif self.method == EstimatorType.PEPOLAR:
from .workflows.fit.pepolar import init_topup_wf
self._wf = init_topup_wf(**kwargs)
if set_inputs:
self._wf.inputs.inputnode.in_data = [
str(f.path.absolute()) for f in self.sources
]
self._wf.inputs.inputnode.metadata = [
f.metadata for f in self.sources
]
elif self.method == EstimatorType.ANAT:
from .workflows.fit.syn import init_syn_sdc_wf
self._wf = init_syn_sdc_wf(**kwargs)
return self._wf
[docs]
def get_identifier(filename, by="IntendedFor"):
"""
Return the fieldmap identifier for a given filename.
Parameters
----------
filename : :obj:`str`
The file we want to search in indexes.
by : :obj:`str` (``IntendedFor`` or ``sources``)
Index table in which the search should be performed.
"""
if by == "IntendedFor":
return tuple(sorted(_intents.get(filename, ())))
if by == "sources":
return _estimators.get_key(filename)
raise KeyError(r"'{by}'")
[docs]
def clear_registry():
"""Empty registries."""
_estimators.clear()
_intents.clear()