Source code for sdcflows.cli.parser

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2023 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
"""Standalone command line executable for estimation of fieldmaps."""

from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser
from functools import partial
from pathlib import Path

from sdcflows import config


def _parse_participant_labels(value):
    """
    Drop ``sub-`` prefix of participant labels.

    >>> _parse_participant_labels("s060")
    ['s060']
    >>> _parse_participant_labels("sub-s060")
    ['s060']
    >>> _parse_participant_labels("s060 sub-s050")
    ['s050', 's060']
    >>> _parse_participant_labels("s060 sub-s060")
    ['s060']
    >>> _parse_participant_labels("s060\tsub-s060")
    ['s060']

    """
    return sorted({item.removeprefix('sub-') for item in value.split()})


def _parser():
    class ParticipantLabelAction(Action):
        def __call__(self, parser, namespace, values, option_string=None):
            setattr(namespace, self.dest, _parse_participant_labels(' '.join(values)))

    def _path_exists(path, parser):
        """Ensure a given path exists."""
        if path is None or not Path(path).exists():
            raise parser.error(f'Path does not exist: <{path}>.')
        return Path(path).expanduser().absolute()

    def _min_one(value, parser):
        """Ensure an argument is not lower than 1."""
        value = int(value)
        if value < 1:
            raise parser.error("Argument can't be less than one.")
        return value

    def _to_gb(value):
        scale = {'G': 1, 'T': 10**3, 'M': 1e-3, 'K': 1e-6, 'B': 1e-9}
        digits = ''.join([c for c in value if c.isdigit()])
        n_digits = len(digits)
        units = value[n_digits:] or 'G'
        return int(digits) * scale[units[0]]

    def _bids_filter(value):
        from json import loads

        if value and Path(value).exists():
            return loads(Path(value).read_text())

    parser = ArgumentParser(
        description=f"""\
SDCFlows {config.environment.version}

Estimate fieldmaps available in a BIDS-compliant MRI dataset.""",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    PathExists = partial(_path_exists, parser=parser)
    PositiveInt = partial(_min_one, parser=parser)

    parser.add_argument(
        'bids_dir',
        action='store',
        type=PathExists,
        help='The root folder of a BIDS valid dataset (sub-XXXXX folders should '
        'be found at the top level in this folder).',
    )
    parser.add_argument(
        'output_dir',
        action='store',
        type=Path,
        help='The directory where the output files '
        'should be stored. If you are running group level analysis '
        'this folder should be prepopulated with the results of the '
        'participant level analysis.',
    )
    parser.add_argument(
        'analysis_level',
        action='store',
        nargs='+',
        help='Level of the analysis that will be performed. '
        'Multiple participant level analyses can be run independently '
        '(in parallel) using the same output_dir.',
        choices=['participant', 'group'],
    )

    # optional arguments
    parser.add_argument(
        '--version', action='version', version=f'SDCFlows {config.environment.version}'
    )
    parser.add_argument(
        '-v',
        '--verbose',
        dest='verbose_count',
        action='count',
        default=0,
        help='Increases log verbosity for each occurrence, debug level is -vvv.',
    )

    # main options
    g_bids = parser.add_argument_group('Options related to BIDS')
    g_bids.add_argument(
        '--participant-label',
        action=ParticipantLabelAction,
        nargs='+',
        help='A space delimited list of participant identifiers or a single '
        'identifier (the sub- prefix can be removed).',
    )
    g_bids.add_argument(
        '--session-label',
        action='store',
        nargs='*',
        type=str,
        help='Filter input dataset by session label.',
    )
    g_bids.add_argument(
        '--bids-filter-file',
        action='store',
        type=Path,
        metavar='PATH',
        help='a JSON file describing custom BIDS input filter using pybids '
        '{<suffix>:{<entity>:<filter>,...},...} '
        '(https://github.com/bids-standard/pybids/blob/master/bids/layout/config/bids.json)',
    )
    g_bids.add_argument(
        '--bids-database-dir',
        metavar='PATH',
        type=PathExists,
        help='Path to an existing PyBIDS database folder, for faster indexing '
        '(especially useful for large datasets).',
    )
    g_bids.add_argument(
        '--bids-database-wipe',
        action='store_true',
        default=False,
        help='Wipe out previously existing BIDS indexing caches, forcing re-indexing.',
    )

    # General performance
    g_perfm = parser.add_argument_group('Options to handle performance')
    g_perfm.add_argument(
        '--nprocs',
        action='store',
        type=PositiveInt,
        help="""\
Maximum number of simultaneously running parallel processes executed by *SDCFlows* \
(e.g., several instances of ANTs' registration). \
However, when ``--nprocs`` is greater or equal to the ``--omp-nthreads`` option, \
it also sets the maximum number of threads that simultaneously running processes \
may aggregate (meaning, with ``--nprocs 16 --omp-nthreads 8`` a maximum of two \
8-CPU-threaded processes will be running at a given time). \
Under this mode of operation, ``--nprocs`` sets the maximum number of processors \
that can be assigned work within an *SDCFlows* job, which includes all the processors \
used by currently running single- and multi-threaded processes. \
If ``None``, the number of CPUs available will be automatically assigned (which may \
not be what you want in, e.g., shared systems like a HPC cluster.""",
    )
    g_perfm.add_argument(
        '--omp-nthreads',
        action='store',
        type=PositiveInt,
        help="""\
Maximum number of threads that multi-threaded processes executed by *SDCFlows* \
(e.g., ANTs' registration) can use. \
If ``None``, the number of CPUs available will be automatically assigned (which may \
not be what you want in, e.g., shared systems like a HPC cluster.""",
    )
    g_perfm.add_argument(
        '--mem-gb',
        dest='memory_gb',
        action='store',
        type=_to_gb,
        help='Upper bound memory limit for SDCFlows processes.',
    )
    g_perfm.add_argument(
        '--debug',
        action='store_true',
        default=False,
        help='Enable changes to processing to aid in debugging',
    )
    g_perfm.add_argument(
        '--pdb',
        dest='pdb',
        action='store_true',
        default=False,
        help='Open Python debugger (pdb) on exceptions.',
    )
    g_perfm.add_argument(
        '--sloppy',
        action='store_true',
        default=False,
        help='Use sloppy mode for minimal footprint.',
    )

    # Control instruments
    g_outputs = parser.add_argument_group('Instrumental options')
    g_outputs.add_argument(
        '-w',
        '--work-dir',
        action='store',
        type=Path,
        default=Path('work').absolute(),
        help='Path where intermediate results should be stored.',
    )
    g_outputs.add_argument(
        '-n',
        '--dry-run',
        action='store_true',
        default=False,
        help='only find estimable fieldmaps (that is, estimation is not triggered)',
    )
    g_outputs.add_argument(
        '--no-fmapless',
        action='store_false',
        dest='fmapless',
        default=True,
        help='Allow fieldmap-less estimation',
    )
    g_outputs.add_argument(
        '--use-plugin',
        action='store',
        default=None,
        type=Path,
        help='Nipype plugin configuration file.',
    )
    g_outputs.add_argument(
        '--notrack',
        action='store_true',
        help='Opt-out of sending tracking information of this run to the NiPreps developers. '
        'This information helps to improve SDCFlows and provides an indicator of '
        'real world usage for obtaining funding.',
    )

    return parser


[docs] def parse_args(args=None, namespace=None): """Parse args and run further checks on the command line.""" from json import loads from logging import DEBUG parser = _parser() opts = parser.parse_args(args, namespace) config.execution.log_level = int(max(25 - 5 * opts.verbose_count, DEBUG)) config.from_dict(vars(opts)) # Load base plugin_settings from file if --use-plugin if opts.use_plugin is not None: from yaml import load as loadyml with open(opts.use_plugin) as f: plugin_settings = loadyml(f) _plugin = plugin_settings.get('plugin') if _plugin: config.nipype.plugin = _plugin config.nipype.plugin_args = plugin_settings.get('plugin_args', {}) config.nipype.nprocs = config.nipype.plugin_args.get('nprocs', config.nipype.nprocs) # Load BIDS filters if opts.bids_filter_file: config.execution.bids_filters = loads(opts.bids_filter_file.read_text()) bids_dir = config.execution.bids_dir output_dir = config.execution.output_dir work_dir = config.execution.work_dir version = config.environment.version # Ensure input and output folders are not the same if output_dir == bids_dir: suggested_path = bids_dir / 'derivatives' / f'sdcflows_{version.split("+")[0]}' parser.error( 'The selected output folder is the same as the input BIDS folder. ' f'Please modify the output path (suggestion: {suggested_path}).' ) if bids_dir in work_dir.parents: parser.error( 'The selected working directory is a subdirectory of the input BIDS folder. ' 'Please modify the output path.' ) # Setup directories config.execution.log_dir = output_dir / 'logs' # Check and create output and working directories config.execution.log_dir.mkdir(exist_ok=True, parents=True) output_dir.mkdir(exist_ok=True, parents=True) work_dir.mkdir(exist_ok=True, parents=True) # Force initialization of the BIDSLayout config.execution.init() participant_label = config.execution.layout.get_subjects() if config.execution.participant_label is not None: selected_label = set(config.execution.participant_label) missing_subjects = selected_label - set(participant_label) if missing_subjects: parser.error( 'One or more participant labels were not found in the BIDS directory: ' f'{", ".join(missing_subjects)}.' ) participant_label = selected_label config.execution.participant_label = sorted(participant_label) # Handle analysis_level analysis_level = set(config.workflow.analysis_level) config.workflow.analysis_level = list(analysis_level)