# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Helpers for handling BIDS-like neuroimaging structures."""
import copy
import json
import re
import warnings
from pathlib import Path
from bids import BIDSLayout
from bids.layout import Query
from bids.utils import listify
from packaging.version import Version
# Default pybids entity filters for each data group returned by ``collect_data``.
# Keys are the group names used in the returned dictionary; values are passed to
# ``BIDSLayout.get`` as keyword arguments.
# NOTE(review): ``'part': ['mag', None]`` presumably selects magnitude images as
# well as files that carry no ``part`` entity -- confirm against pybids.
DEFAULT_BIDS_QUERIES = {
    'bold': {'datatype': 'func', 'suffix': 'bold', 'part': ['mag', None]},
    'dwi': {'suffix': 'dwi'},
    'flair': {'datatype': 'anat', 'suffix': 'FLAIR', 'part': ['mag', None]},
    'fmap': {'datatype': 'fmap'},
    'pet': {'suffix': 'pet'},
    'roi': {'datatype': 'anat', 'suffix': 'roi'},
    'sbref': {'datatype': 'func', 'suffix': 'sbref', 'part': ['mag', None]},
    't1w': {'datatype': 'anat', 'suffix': 'T1w', 'part': ['mag', None]},
    't2w': {'datatype': 'anat', 'suffix': 'T2w', 'part': ['mag', None]},
    'asl': {'datatype': 'perf', 'suffix': 'asl'},
}
[docs]
class BIDSError(ValueError):
    """
    Error raised when a BIDS dataset cannot be used as requested.

    The message is wrapped in a banner that names the BIDS root; the formatted
    banner is what ``str(exc)`` shows and is also available as ``msg``.

    Parameters
    ----------
    message : :obj:`str`
        Description of the problem.
    bids_root : object
        Identifier of the dataset (typically its root folder). It is
        interpolated into the banner and stored unchanged in ``bids_root``.

    Attributes
    ----------
    msg : :obj:`str`
        The fully formatted, multi-line message.
    bids_root : object
        The ``bids_root`` argument, as given.

    """

    def __init__(self, message, bids_root):
        indent = 10
        rule = '-' * indent
        header = f'{rule} BIDS root folder: "{bids_root}" {rule}'
        padding = ' ' * (indent + 1)
        # The closing rule spans the full width of the header line.
        footer = '-' * len(header)
        self.msg = f'\n{header}\n{padding}{message}\n{footer}'
        super().__init__(self.msg)
        # Keep the raw message so the exception can be rebuilt (see __reduce__).
        self._message = message
        self.bids_root = bids_root

    def __reduce__(self):
        """
        Rebuild the exception from its two constructor arguments.

        The default ``BaseException`` reduction re-calls the class with ``args``,
        which only holds the formatted banner; that call fails because
        ``bids_root`` is missing, making the exception impossible to pickle or copy.
        """
        return (type(self), (self._message, self.bids_root))
[docs]
class BIDSWarning(RuntimeWarning):
    """Warning category used for non-fatal problems found in a BIDS dataset."""
[docs]
def collect_participants(bids_dir, participant_label=None, strict=False, bids_validate=True):
    """
    List the participants of a BIDS dataset, optionally restricted to a selection.

    The subjects found under the BIDS root are compared against
    ``participant_label`` and the labels that will finally be processed are returned.

    .. testsetup::

    >>> data_dir_canary()

    Parameters
    ----------
    bids_dir : :obj:`str`, path-like or :obj:`bids.layout.BIDSLayout`
        The BIDS root folder, or an existing layout (used as is).
    participant_label : :obj:`str`, :obj:`list` of :obj:`str`, or None
        Requested participant(s), with or without the ``sub-`` prefix.
        When empty or ``None``, all participants are returned.
    strict : :obj:`bool`
        Raise instead of warning when some requested participants are missing.
    bids_validate : :obj:`bool`
        Passed as ``validate`` when a new layout has to be created.

    Returns
    -------
    participants : :obj:`list` of :obj:`str`
        Sorted participant labels, without the ``sub-`` prefix.

    Raises
    ------
    BIDSError
        If the dataset has no subjects, if none of the requested participants
        exist, or (only with ``strict=True``) if any of them is missing.

    Warns
    -----
    BIDSWarning
        If some requested participants are missing and ``strict`` is false.

    Examples
    --------
    Requesting all subjects in a BIDS directory root:

    >>> collect_participants(str(datadir / 'ds114'), bids_validate=False)
    ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10']

    Requesting two subjects, given their IDs:

    >>> collect_participants(str(datadir / 'ds114'), participant_label=['02', '04'],
    ...                      bids_validate=False)
    ['02', '04']

    Requesting two subjects, given their IDs (works with 'sub-' prefixes):

    >>> collect_participants(str(datadir / 'ds114'), participant_label=['sub-02', 'sub-04'],
    ...                      bids_validate=False)
    ['02', '04']

    Requesting two subjects, but one does not exist:

    >>> collect_participants(str(datadir / 'ds114'), participant_label=['02', '14'],
    ...                      bids_validate=False)
    ['02']

    >>> collect_participants(
    ...     str(datadir / 'ds114'), participant_label=['02', '14'],
    ...     strict=True, bids_validate=False)  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    BIDSError:
    ...

    """
    layout = (
        bids_dir
        if isinstance(bids_dir, BIDSLayout)
        else BIDSLayout(str(bids_dir), validate=bids_validate)
    )
    available = set(layout.get_subjects())

    # An empty dataset is always an error
    if not available:
        raise BIDSError(
            'Could not find participants. Please make sure the BIDS data '
            'structure is present and correct. Datasets can be validated '
            'online using the BIDS Validator '
            '(https://bids-standard.github.io/bids-validator/).\n'
            'If you are using Docker for Mac or Docker for Windows, you '
            'may need to adjust your "File sharing" preferences.',
            bids_dir,
        )

    # Nothing requested: every participant is selected
    if not participant_label:
        return sorted(available)

    requested = [participant_label] if isinstance(participant_label, str) else participant_label
    # Strip the optional "sub-" prefix, drop duplicates, and order the labels
    requested = sorted({label.removeprefix('sub-') for label in requested})

    # ``requested`` is sorted and unique, so both partitions stay sorted
    matched = [label for label in requested if label in available]
    missing = [label for label in requested if label not in available]

    if not matched:
        listing = ', '.join(requested)
        raise BIDSError(f'Could not find participants [{listing}]', bids_dir)

    if missing:
        listing = ', '.join(missing)
        problem = BIDSError(f'Some participants were not found: {listing}', bids_dir)
        if strict:
            raise problem
        # Reuse the formatted banner of the exception as the warning text
        warnings.warn(problem.msg, BIDSWarning, stacklevel=2)

    return matched
[docs]
def collect_data(
    bids_dir,
    participant_label,
    session_id=None,
    task=None,
    echo=None,
    group_echos=True,
    bids_validate=True,
    bids_filters=None,
    queries=None,
):
    """
    Uses pybids to retrieve the input data for a given participant

    .. testsetup::

    >>> data_dir_canary()

    Parameters
    ----------
    bids_dir : :obj:`str` or :obj:`bids.layout.BIDSLayout`
        The BIDS directory
    participant_label : :obj:`str`
        The participant identifier
    session_id : :obj:`str`, None, or :obj:`bids.layout.Query`
        The session identifier. By default, all sessions will be used.
    task : :obj:`str` or None
        The task identifier (applied to the ``bold`` and ``pet`` queries, when present)
    echo : :obj:`int` or None
        The echo identifier (applied to the ``bold`` query, when present)
    group_echos : :obj:`bool`
        Whether multi-echo BOLD files are grouped with :func:`group_multiecho`
    bids_validate : :obj:`bool`
        Whether the `bids_dir` is validated upon initialization
    bids_filters: :obj:`dict` or None
        Custom filters to alter default queries. Keys must be names of existing
        queries; values are dictionaries of entities that update that query only.
    queries : :obj:`dict` or None
        Queries to run, keyed by data group. Defaults to ``DEFAULT_BIDS_QUERIES``.
        The mapping passed in is never modified.

    Returns
    -------
    subj_data : :obj:`dict`
        Sorted query results, keyed by data group.
    layout : :obj:`bids.layout.BIDSLayout`
        The layout that was queried.

    Raises
    ------
    ValueError
        If a filter sets ``subject`` or ``session`` to a value that conflicts with
        ``participant_label`` or an explicitly given ``session_id``.

    Examples
    --------
    >>> bids_root, _ = collect_data(str(datadir / 'ds054'), '100185',
    ...                             bids_validate=False)
    >>> bids_root['fmap']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/fmap/sub-100185_magnitude1.nii.gz', \
'.../ds054/sub-100185/fmap/sub-100185_magnitude2.nii.gz', \
'.../ds054/sub-100185/fmap/sub-100185_phasediff.nii.gz']
    >>> bids_root['bold']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/func/sub-100185_task-machinegame_run-01_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-02_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-03_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-04_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-05_bold.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-06_bold.nii.gz']
    >>> bids_root['sbref']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/func/sub-100185_task-machinegame_run-01_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-02_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-03_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-04_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-05_sbref.nii.gz', \
'.../ds054/sub-100185/func/sub-100185_task-machinegame_run-06_sbref.nii.gz']
    >>> bids_root['t1w']  # doctest: +ELLIPSIS
    ['.../ds054/sub-100185/anat/sub-100185_T1w.nii.gz']
    >>> bids_root['t2w']  # doctest: +ELLIPSIS
    []

    >>> bids_root, _ = collect_data(str(datadir / 'ds051'), '01',
    ...                             bids_validate=False,
    ...                             bids_filters={'t1w':{'run': 1, 'session': None}})
    >>> bids_root['t1w']  # doctest: +ELLIPSIS
    ['.../ds051/sub-01/anat/sub-01_run-01_T1w.nii.gz']

    >>> bids_root, _ = collect_data(
    ...     str(datadir / 'ds114'),
    ...     '01',
    ...     bids_validate=False,
    ...     session_id='retest',
    ...     bids_filters={'bold': {'session': 'madeup'}})  # doctest: +ELLIPSIS
    Traceback (most recent call last):
        ...
    ValueError: Conflicting entities for "session" found: madeup // retest

    >>> bids_root, _ = collect_data(
    ...     str(datadir / 'ds114'),
    ...     '01',
    ...     bids_validate=False,
    ...     session_id=None,
    ...     bids_filters={'t1w': {'session': 'retest'}})
    >>> bids_root['t1w']  # doctest: +ELLIPSIS
    ['.../ds114/sub-01/ses-retest/anat/sub-01_ses-retest_T1w.nii.gz']

    """
    if isinstance(bids_dir, BIDSLayout):
        layout = bids_dir
    else:
        layout = BIDSLayout(str(bids_dir), validate=bids_validate)

    # Always work on a private copy: the filter/task/echo updates below must not
    # leak into the module defaults nor into a mapping owned by the caller.
    queries = copy.deepcopy(queries or DEFAULT_BIDS_QUERIES)

    session_id = session_id or Query.OPTIONAL
    base_filters = {
        'return_type': 'file',
        'subject': participant_label,
        'extension': ['.nii', '.nii.gz'],
        'session': session_id,
    }

    reserved_entities = [('subject', participant_label), ('session', session_id)]

    for acq, entities in (bids_filters or {}).items():
        # BIDS filters will not be able to override subject / session entities
        for entity, param in reserved_entities:
            if param == Query.OPTIONAL:
                continue

            if entity in entities and listify(param) != listify(entities[entity]):
                raise ValueError(
                    f'Conflicting entities for "{entity}" found: {entities[entity]} // {param}'
                )

        queries[acq].update(entities)

    if task:
        for dtype in ('bold', 'pet'):
            if dtype in queries:
                queries[dtype]['task'] = task

    if echo and 'bold' in queries:
        queries['bold']['echo'] = echo

    # Merge the common filters with each query separately, letting the query win.
    # This avoids passing a keyword twice to ``layout.get`` while keeping an
    # override made for one data group from affecting the other groups.
    subj_data = {
        dtype: sorted(layout.get(**{**base_filters, **query}))
        for dtype, query in queries.items()
    }

    # Special case: multi-echo BOLD, grouping echos
    if group_echos and 'bold' in subj_data and any('_echo-' in bold for bold in subj_data['bold']):
        subj_data['bold'] = group_multiecho(subj_data['bold'])

    return subj_data, layout
def _init_layout(in_file=None, bids_dir=None, validate=True, database_path=None):
    """
    Return a :obj:`bids.layout.BIDSLayout`, inferring the BIDS root if necessary.

    Parameters
    ----------
    in_file : :obj:`str` or path-like, optional
        A file inside the dataset; only used when ``bids_dir`` is ``None``.
    bids_dir : :obj:`str`, path-like, :obj:`bids.layout.BIDSLayout`, or None
        The BIDS root. An existing layout is returned unchanged.
    validate : :obj:`bool`
        Forwarded to ``BIDSLayout``.
    database_path : :obj:`str`, path-like, or None
        Forwarded to ``BIDSLayout``.

    Raises
    ------
    RuntimeError
        If ``bids_dir`` is ``None`` and no ancestor of ``in_file`` is named ``sub-*``.

    """
    if isinstance(bids_dir, BIDSLayout):
        return bids_dir

    if bids_dir is None:
        # The closest ancestor named "sub-*" is taken as the subject folder,
        # and the directory containing it as the BIDS root.
        subject_dir = next(
            (ancestor for ancestor in Path(in_file).parents if ancestor.name.startswith('sub-')),
            None,
        )
        if subject_dir is None:
            raise RuntimeError('Could not infer BIDS root')
        bids_dir = subject_dir.parent.resolve()

    return BIDSLayout(
        str(bids_dir),
        validate=validate,
        database_path=database_path,
    )
[docs]
def group_multiecho(bold_sess):
    """
    Multiplex multi-echo EPIs into arrays.

    Consecutive file names that differ only in their ``_echo-<index>`` entity are
    considered one run. Runs with three or more echoes are returned as a nested list.
    Dual-echo is a special case of multi-echo, which is treated as single-echo data
    (i.e., its files are left flat in the output).

    Examples
    --------
    >>> bold_sess = ["sub-01_task-rest_echo-1_run-01_bold.nii.gz",
    ...              "sub-01_task-rest_echo-2_run-01_bold.nii.gz",
    ...              "sub-01_task-rest_echo-1_run-02_bold.nii.gz",
    ...              "sub-01_task-rest_echo-2_run-02_bold.nii.gz",
    ...              "sub-01_task-rest_echo-3_run-02_bold.nii.gz",
    ...              "sub-01_task-rest_run-03_bold.nii.gz"]
    >>> group_multiecho(bold_sess)  # doctest: +NORMALIZE_WHITESPACE
    ['sub-01_task-rest_echo-1_run-01_bold.nii.gz',
     'sub-01_task-rest_echo-2_run-01_bold.nii.gz',
    ['sub-01_task-rest_echo-1_run-02_bold.nii.gz',
     'sub-01_task-rest_echo-2_run-02_bold.nii.gz',
     'sub-01_task-rest_echo-3_run-02_bold.nii.gz'],
     'sub-01_task-rest_run-03_bold.nii.gz']

    >>> bold_sess.insert(2, "sub-01_task-rest_echo-3_run-01_bold.nii.gz")
    >>> group_multiecho(bold_sess)  # doctest: +NORMALIZE_WHITESPACE
    [['sub-01_task-rest_echo-1_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-01_bold.nii.gz'],
     ['sub-01_task-rest_echo-1_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-02_bold.nii.gz'],
      'sub-01_task-rest_run-03_bold.nii.gz']

    >>> bold_sess += ["sub-01_task-beh_echo-1_run-01_bold.nii.gz",
    ...               "sub-01_task-beh_echo-2_run-01_bold.nii.gz",
    ...               "sub-01_task-beh_echo-1_run-02_bold.nii.gz",
    ...               "sub-01_task-beh_echo-2_run-02_bold.nii.gz",
    ...               "sub-01_task-beh_echo-3_run-02_bold.nii.gz",
    ...               "sub-01_task-beh_run-03_bold.nii.gz"]
    >>> group_multiecho(bold_sess)  # doctest: +NORMALIZE_WHITESPACE
    [['sub-01_task-rest_echo-1_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-01_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-01_bold.nii.gz'],
     ['sub-01_task-rest_echo-1_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-2_run-02_bold.nii.gz',
      'sub-01_task-rest_echo-3_run-02_bold.nii.gz'],
      'sub-01_task-rest_run-03_bold.nii.gz',
      'sub-01_task-beh_echo-1_run-01_bold.nii.gz',
      'sub-01_task-beh_echo-2_run-01_bold.nii.gz',
     ['sub-01_task-beh_echo-1_run-02_bold.nii.gz',
      'sub-01_task-beh_echo-2_run-02_bold.nii.gz',
      'sub-01_task-beh_echo-3_run-02_bold.nii.gz'],
      'sub-01_task-beh_run-03_bold.nii.gz']

    Some tests from https://neurostars.org/t/fmriprep-from-singularity-unboundlocalerror/3299/7

    >>> bold_sess = ['sub-01_task-AudLoc_echo-1_bold.nii',
    ...              'sub-01_task-AudLoc_echo-2_bold.nii',
    ...              'sub-01_task-FJT_echo-1_bold.nii',
    ...              'sub-01_task-FJT_echo-2_bold.nii',
    ...              'sub-01_task-LDT_echo-1_bold.nii',
    ...              'sub-01_task-LDT_echo-2_bold.nii',
    ...              'sub-01_task-MotLoc_echo-1_bold.nii',
    ...              'sub-01_task-MotLoc_echo-2_bold.nii']
    >>> group_multiecho(bold_sess) == bold_sess
    True

    >>> bold_sess += ['sub-01_task-MotLoc_echo-3_bold.nii']
    >>> groups = group_multiecho(bold_sess)
    >>> len(groups[:-1])
    6
    >>> [isinstance(g, list) for g in groups]
    [False, False, False, False, False, False, True]
    >>> len(groups[-1])
    3

    """
    from itertools import groupby

    def _echo_agnostic(fname):
        # Grouping key: the file name with its echo index masked out
        found = re.search(r'_echo-\d*', fname)
        if found is None:
            return fname
        return fname.replace(found.group(0), '_echo-?')

    grouped = []
    # groupby only merges adjacent items, so echoes of a run must be contiguous
    for _, run in groupby(bold_sess, key=_echo_agnostic):
        files = list(run)
        if len(files) > 2:
            # Three or more echoes: keep the run together as a nested list
            grouped.append(files)
        else:
            # Single- or dual-echo: leave the files flat
            grouped.extend(files)
    return grouped
[docs]
def relative_to_root(path):
    """
    Express the path of a BIDS file relative to its (inferred) BIDS root.

    The path is cut at the closest ancestor folder whose name starts with ``sub-``,
    which becomes the first component of the result. If no such ancestor exists,
    the path is returned as given.

    Parameters
    ----------
    path : :obj:`str` or path-like
        Path to a file whose name starts with ``sub-``.

    Returns
    -------
    :obj:`pathlib.Path`
        The path relative to the inferred BIDS root.

    Raises
    ------
    ValueError
        If the file name does not start with ``sub-``.

    Examples
    --------
    >>> str(relative_to_root(
    ...     "/sub-03/sourcedata/sub-01/anat/sub-01_T1.nii.gz"
    ... ))
    'sub-01/anat/sub-01_T1.nii.gz'

    >>> str(relative_to_root(
    ...     "/sub-03/anat/sourcedata/sub-01/ses-preop/anat/sub-01_ses-preop_T1.nii.gz"
    ... ))
    'sub-01/ses-preop/anat/sub-01_ses-preop_T1.nii.gz'

    >>> str(relative_to_root(
    ...     "sub-01/anat/sub-01_T1.nii.gz"
    ... ))
    'sub-01/anat/sub-01_T1.nii.gz'

    >>> str(relative_to_root("anat/sub-01_T1.nii.gz"))
    'anat/sub-01_T1.nii.gz'

    """
    path = Path(path)
    if not path.name.startswith('sub-'):
        raise ValueError(
            f'Could not determine the BIDS root of <{path}>. '
            'Only files under a subject directory are currently supported.'
        )

    # Walk upwards collecting component names (innermost first) until the
    # first "sub-*" folder is reached.
    trail = [path.name]
    for ancestor in path.parents:
        trail.append(ancestor.name)
        if ancestor.name.startswith('sub-'):
            return Path(*reversed(trail))

    # No subject folder above the file
    return path
[docs]
def check_pipeline_version(cvers, data_desc):
    """
    Compare the current pipeline version against that of pre-existing outputs.

    The previous version is read from the ``Version`` field under
    ``PipelineDescription`` in the given ``dataset_description.json``
    (``0+unknown`` when absent). Only the public part of both versions is compared,
    so local version labels (e.g., ``+129.gbe0e5158``) are ignored.

    .. testsetup::

    >>> import json
    >>> data = {"PipelineDescription": {"Version": "1.1.1rc5"}}
    >>> desc_file = Path('sample_dataset_description.json')
    >>> _ = desc_file.write_text(json.dumps(data))

    Parameters
    ----------
    cvers : :obj:`str`
        Current pipeline version
    data_desc : :obj:`str` or :obj:`os.PathLike`
        Path to pipeline output's ``dataset_description.json``

    Examples
    --------
    >>> check_pipeline_version('1.1.1rc5', 'sample_dataset_description.json') is None
    True
    >>> check_pipeline_version('1.1.1rc5+129.gbe0e5158', 'sample_dataset_description.json')
    >>> check_pipeline_version('1.2', 'sample_dataset_description.json')  # doctest: +ELLIPSIS
    'Previous output generated ...'

    Returns
    -------
    message : :obj:`str` or :obj:`None`
        A warning string if there is a difference between versions, otherwise ``None``.
        ``None`` is also returned when ``data_desc`` does not exist.

    """
    desc_file = Path(data_desc)
    if desc_file.exists():
        metadata = json.loads(desc_file.read_text())
        previous = metadata.get('PipelineDescription', {}).get('Version', '0+unknown')
        if Version(cvers).public != Version(previous).public:
            return f'Previous output generated by version {previous} found.'
    return None
def _find_nearest_path(path_dict, input_path):
"""Find the nearest relative path from an input path to a dictionary of paths.
If ``input_path`` is not relative to any of the paths in ``path_dict``,
the absolute path string is returned.
If ``input_path`` is already a BIDS-URI, then it will be returned unmodified.
Parameters
----------
path_dict : dict of (str, Path)
A dictionary of paths.
input_path : Path
The input path to match.
Returns
-------
matching_path : str
The nearest relative path from the input path to a path in the dictionary.
This is either the concatenation of the associated key from ``path_dict``
and the relative path from the associated value from ``path_dict`` to ``input_path``,
or the absolute path to ``input_path`` if no matching path is found from ``path_dict``.
Examples
--------
>>> from pathlib import Path
>>> path_dict = {
... 'bids::': Path('/data/derivatives/fmriprep'),
... 'bids:raw:': Path('/data'),
... 'bids:deriv-0:': Path('/data/derivatives/source-1'),
... }
>>> input_path = Path('/data/derivatives/source-1/sub-01/func/sub-01_task-rest_bold.nii.gz')
>>> _find_nearest_path(path_dict, input_path) # match to 'bids:deriv-0:'
'bids:deriv-0:sub-01/func/sub-01_task-rest_bold.nii.gz'
>>> input_path = Path('/out/sub-01/func/sub-01_task-rest_bold.nii.gz')
>>> _find_nearest_path(path_dict, input_path) # no match- absolute path
'/out/sub-01/func/sub-01_task-rest_bold.nii.gz'
>>> input_path = Path('/data/sub-01/func/sub-01_task-rest_bold.nii.gz')
>>> _find_nearest_path(path_dict, input_path) # match to 'bids:raw:'
'bids:raw:sub-01/func/sub-01_task-rest_bold.nii.gz'
>>> input_path = 'bids::sub-01/func/sub-01_task-rest_bold.nii.gz'
>>> _find_nearest_path(path_dict, input_path) # already a BIDS-URI
'bids::sub-01/func/sub-01_task-rest_bold.nii.gz'
>>> input_path = '/data/sub-01/func/sub-01_task-rest_bold.nii.gz'
>>> _find_nearest_path(path_dict, input_path) # Is relative
'bids:raw:sub-01/func/sub-01_task-rest_bold.nii.gz'
"""
# Don't modify BIDS-URIs
if isinstance(input_path, str) and input_path.startswith('bids:'):
return input_path
input_path = Path(input_path)
matching_path = []
for key, path in path_dict.items():
if input_path.is_relative_to(path):
relative_path = input_path.relative_to(path)
matching_path.append((len(relative_path.parts), f'{key}{relative_path}'))
return str(input_path.absolute()) if not matching_path else sorted(matching_path)[0][1]