# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2023 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Standalone command line executable for estimation of fieldmaps."""
from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser
from functools import partial
from pathlib import Path
from sdcflows import config
def _parse_participant_labels(value):
"""
Drop ``sub-`` prefix of participant labels.
>>> _parse_participant_labels("s060")
['s060']
>>> _parse_participant_labels("sub-s060")
['s060']
>>> _parse_participant_labels("s060 sub-s050")
['s050', 's060']
>>> _parse_participant_labels("s060 sub-s060")
['s060']
>>> _parse_participant_labels("s060\tsub-s060")
['s060']
"""
return sorted({item.removeprefix('sub-') for item in value.split()})
def _parser():
    """
    Build the argument parser of the fieldmap-estimation command line.

    Returns
    -------
    parser : :obj:`argparse.ArgumentParser`
        Parser with the positional arguments (``bids_dir``, ``output_dir``,
        ``analysis_level``) and the BIDS, performance and instrumental
        option groups registered.

    """
    import re
    from argparse import ArgumentTypeError

    class ParticipantLabelAction(Action):
        """Store participant labels normalized by ``_parse_participant_labels``."""

        def __call__(self, parser, namespace, values, option_string=None):
            # ``nargs='+'`` hands over a list; join it so the helper can split
            # on any whitespace and strip ``sub-`` prefixes uniformly.
            setattr(namespace, self.dest, _parse_participant_labels(' '.join(values)))

    def _path_exists(path, parser):
        """Ensure a given path exists and return it expanded and absolute."""
        if path is None:
            # parser.error() prints the message and exits; it never returns.
            parser.error(f'Path does not exist: <{path}>.')
        # Expand ``~`` *before* testing for existence, so that paths the shell
        # did not expand (e.g., ``--bids-database-dir=~/db``) are accepted.
        resolved = Path(path).expanduser().absolute()
        if not resolved.exists():
            parser.error(f'Path does not exist: <{path}>.')
        return resolved

    def _min_one(value, parser):
        """Ensure an argument is not lower than 1."""
        value = int(value)
        if value < 1:
            # parser.error() prints the message and exits; it never returns.
            parser.error("Argument can't be less than one.")
        return value

    def _to_gb(value):
        """
        Convert a memory specification into gigabytes.

        Accepts a non-negative number optionally followed by a unit whose
        first letter is one of ``B``, ``K``, ``M``, ``G`` or ``T``
        (case-insensitive); a bare number is interpreted as gigabytes.
        Malformed values raise :obj:`argparse.ArgumentTypeError`, which
        argparse turns into a regular usage error.
        """
        scale = {'G': 1, 'T': 10**3, 'M': 1e-3, 'K': 1e-6, 'B': 1e-9}
        match = re.fullmatch(r'\s*(\d+(?:\.\d*)?|\.\d+)\s*([A-Za-z]*)\s*', value)
        if match is None:
            raise ArgumentTypeError(f'Invalid memory specification: <{value}>.')
        number, units = match.groups()
        units = units.upper() or 'G'
        # Only the first letter is significant, so "G", "GB" and "GiB" are equivalent.
        if units[0] not in scale:
            raise ArgumentTypeError(f'Unknown memory unit in <{value}>.')
        # Integer inputs remain integers so whole-GB values are not turned into floats.
        amount = float(number) if '.' in number else int(number)
        return amount * scale[units[0]]

    parser = ArgumentParser(
        description=f"""\
SDCFlows {config.environment.version}
Estimate fieldmaps available in a BIDS-compliant MRI dataset.""",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    # Bind the parser so the validators can report problems via parser.error()
    PathExists = partial(_path_exists, parser=parser)
    PositiveInt = partial(_min_one, parser=parser)

    parser.add_argument(
        'bids_dir',
        action='store',
        type=PathExists,
        help='The root folder of a BIDS valid dataset (sub-XXXXX folders should '
        'be found at the top level in this folder).',
    )
    parser.add_argument(
        'output_dir',
        action='store',
        type=Path,
        help='The directory where the output files '
        'should be stored. If you are running group level analysis '
        'this folder should be prepopulated with the results of the '
        'participant level analysis.',
    )
    parser.add_argument(
        'analysis_level',
        action='store',
        nargs='+',
        help='Level of the analysis that will be performed. '
        'Multiple participant level analyses can be run independently '
        '(in parallel) using the same output_dir.',
        choices=['participant', 'group'],
    )

    # optional arguments
    parser.add_argument(
        '--version', action='version', version=f'SDCFlows {config.environment.version}'
    )
    parser.add_argument(
        '-v',
        '--verbose',
        dest='verbose_count',
        action='count',
        default=0,
        help='Increases log verbosity for each occurrence, debug level is -vvv.',
    )

    # main options
    g_bids = parser.add_argument_group('Options related to BIDS')
    g_bids.add_argument(
        '--participant-label',
        action=ParticipantLabelAction,
        nargs='+',
        help='A space delimited list of participant identifiers or a single '
        'identifier (the sub- prefix can be removed).',
    )
    g_bids.add_argument(
        '--session-label',
        action='store',
        nargs='*',
        type=str,
        help='Filter input dataset by session label.',
    )
    g_bids.add_argument(
        '--bids-filter-file',
        action='store',
        type=Path,
        metavar='PATH',
        help='a JSON file describing custom BIDS input filter using pybids '
        '{<suffix>:{<entity>:<filter>,...},...} '
        '(https://github.com/bids-standard/pybids/blob/master/bids/layout/config/bids.json)',
    )
    g_bids.add_argument(
        '--bids-database-dir',
        metavar='PATH',
        type=PathExists,
        help='Path to an existing PyBIDS database folder, for faster indexing '
        '(especially useful for large datasets).',
    )
    g_bids.add_argument(
        '--bids-database-wipe',
        action='store_true',
        default=False,
        help='Wipe out previously existing BIDS indexing caches, forcing re-indexing.',
    )

    # General performance
    g_perfm = parser.add_argument_group('Options to handle performance')
    g_perfm.add_argument(
        '--nprocs',
        action='store',
        type=PositiveInt,
        help="""\
Maximum number of simultaneously running parallel processes executed by *SDCFlows* \
(e.g., several instances of ANTs' registration). \
However, when ``--nprocs`` is greater or equal to the ``--omp-nthreads`` option, \
it also sets the maximum number of threads that simultaneously running processes \
may aggregate (meaning, with ``--nprocs 16 --omp-nthreads 8`` a maximum of two \
8-CPU-threaded processes will be running at a given time). \
Under this mode of operation, ``--nprocs`` sets the maximum number of processors \
that can be assigned work within an *SDCFlows* job, which includes all the processors \
used by currently running single- and multi-threaded processes. \
If ``None``, the number of CPUs available will be automatically assigned (which may \
not be what you want in, e.g., shared systems like a HPC cluster).""",
    )
    g_perfm.add_argument(
        '--omp-nthreads',
        action='store',
        type=PositiveInt,
        help="""\
Maximum number of threads that multi-threaded processes executed by *SDCFlows* \
(e.g., ANTs' registration) can use. \
If ``None``, the number of CPUs available will be automatically assigned (which may \
not be what you want in, e.g., shared systems like a HPC cluster).""",
    )
    g_perfm.add_argument(
        '--mem-gb',
        dest='memory_gb',
        action='store',
        type=_to_gb,
        help='Upper bound memory limit for SDCFlows processes.',
    )
    g_perfm.add_argument(
        '--debug',
        action='store_true',
        default=False,
        help='Enable changes to processing to aid in debugging',
    )
    g_perfm.add_argument(
        '--pdb',
        dest='pdb',
        action='store_true',
        default=False,
        help='Open Python debugger (pdb) on exceptions.',
    )
    g_perfm.add_argument(
        '--sloppy',
        action='store_true',
        default=False,
        help='Use sloppy mode for minimal footprint.',
    )

    # Control instruments
    g_outputs = parser.add_argument_group('Instrumental options')
    g_outputs.add_argument(
        '-w',
        '--work-dir',
        action='store',
        type=Path,
        default=Path('work').absolute(),
        help='Path where intermediate results should be stored.',
    )
    g_outputs.add_argument(
        '-n',
        '--dry-run',
        action='store_true',
        default=False,
        help='only find estimable fieldmaps (that is, estimation is not triggered)',
    )
    g_outputs.add_argument(
        '--no-fmapless',
        action='store_false',
        dest='fmapless',
        default=True,
        # The flag stores ``False`` into ``fmapless``, i.e., it switches the feature off.
        help='Disable fieldmap-less estimation',
    )
    g_outputs.add_argument(
        '--use-plugin',
        action='store',
        default=None,
        type=Path,
        help='Nipype plugin configuration file.',
    )
    g_outputs.add_argument(
        '--notrack',
        action='store_true',
        help='Opt-out of sending tracking information of this run to the NiPreps developers. '
        'This information helps to improve SDCFlows and provides an indicator of '
        'real world usage for obtaining funding.',
    )
    return parser
[docs]
def parse_args(args=None, namespace=None):
    """
    Parse args and run further checks on the command line.

    The parsed options are pushed into the global ``config`` object, the
    log, output and working directories are created, the execution
    settings are initialized through ``config.execution.init()``, and the
    participant labels and analysis levels are validated and normalized.

    Parameters
    ----------
    args : :obj:`list` of :obj:`str`, optional
        Forwarded to :obj:`argparse.ArgumentParser.parse_args`.
    namespace : :obj:`argparse.Namespace`, optional
        Forwarded to :obj:`argparse.ArgumentParser.parse_args`.

    Notes
    -----
    Invalid settings are reported with ``parser.error()``, which prints
    a usage message and exits the program.

    """
    from json import loads
    from logging import DEBUG

    parser = _parser()
    opts = parser.parse_args(args, namespace)

    # Each ``-v`` lowers the numeric log level by 5, bounded below by DEBUG
    config.execution.log_level = int(max(25 - 5 * opts.verbose_count, DEBUG))
    config.from_dict(vars(opts))

    # Load base plugin_settings from file if --use-plugin
    if opts.use_plugin is not None:
        # ``yaml.load`` requires an explicit Loader in PyYAML >= 6 and can build
        # arbitrary Python objects; ``safe_load`` is sufficient for a settings file.
        from yaml import safe_load

        with open(opts.use_plugin) as f:
            # An empty file parses to ``None``; treat it as "no settings".
            plugin_settings = safe_load(f) or {}

        _plugin = plugin_settings.get('plugin')
        if _plugin:
            config.nipype.plugin = _plugin
            config.nipype.plugin_args = plugin_settings.get('plugin_args', {})
            # NOTE(review): nipype's MultiProc plugin names this key ``n_procs``;
            # confirm that ``nprocs`` is the key intended here.
            config.nipype.nprocs = config.nipype.plugin_args.get('nprocs', config.nipype.nprocs)

    # Load BIDS filters
    if opts.bids_filter_file:
        config.execution.bids_filters = loads(opts.bids_filter_file.read_text())

    bids_dir = config.execution.bids_dir
    output_dir = config.execution.output_dir
    work_dir = config.execution.work_dir
    version = config.environment.version

    # Ensure input and output folders are not the same
    if output_dir == bids_dir:
        suggested_path = bids_dir / 'derivatives' / f'sdcflows_{version.split("+")[0]}'
        parser.error(
            'The selected output folder is the same as the input BIDS folder. '
            f'Please modify the output path (suggestion: {suggested_path}).'
        )

    if bids_dir in work_dir.parents:
        parser.error(
            'The selected working directory is a subdirectory of the input BIDS folder. '
            'Please modify the working directory path.'
        )

    # Setup directories
    config.execution.log_dir = output_dir / 'logs'
    # Check and create output and working directories
    config.execution.log_dir.mkdir(exist_ok=True, parents=True)
    output_dir.mkdir(exist_ok=True, parents=True)
    work_dir.mkdir(exist_ok=True, parents=True)

    # Force initialization of the BIDSLayout
    config.execution.init()
    participant_label = config.execution.layout.get_subjects()
    if config.execution.participant_label is not None:
        selected_label = set(config.execution.participant_label)
        missing_subjects = selected_label - set(participant_label)
        if missing_subjects:
            # Sort so the error message is reproducible across runs
            parser.error(
                'One or more participant labels were not found in the BIDS directory: '
                f'{", ".join(sorted(missing_subjects))}.'
            )
        participant_label = selected_label

    config.execution.participant_label = sorted(participant_label)

    # Handle analysis_level: drop duplicates while keeping the command-line order
    config.workflow.analysis_level = list(dict.fromkeys(config.workflow.analysis_level))