Source code for niworkflows.utils.misc

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
#
"""Miscellaneous utilities."""
import os
from typing import Optional
import warnings


__all__ = [
    "get_template_specs",
    "fix_multi_T1w_source_name",
    "add_suffix",
    "read_crashfile",
    "splitext",
    "_copy_any",
    "clean_directory",
]


[docs] def get_template_specs( in_template: str, template_spec: Optional[dict] = None, default_resolution: int = 1, fallback: bool = False ): """ Parse template specifications >>> get_template_specs('MNI152NLin2009cAsym', {'suffix': 'T1w'})[1] {'resolution': 1} >>> get_template_specs('MNI152NLin2009cAsym', {'res': '2', 'suffix': 'T1w'})[1] {'resolution': '2'} >>> specs = get_template_specs('MNIInfant', {'res': '2', 'cohort': '10', 'suffix': 'T1w'})[1] >>> sorted(specs.items()) [('cohort', '10'), ('resolution', '2')] >>> get_template_specs('MNI152NLin2009cAsym', ... {'suffix': 'T1w', 'cohort': 1})[1] # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): RuntimeError: ... >>> get_template_specs('MNI152NLin2009cAsym', ... {'suffix': 'T1w', 'res': '1|2'})[1] # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): RuntimeError: ... >>> get_template_specs('UNCInfant', ... {'suffix': 'T1w', 'res': 1})[1] # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): RuntimeError: ... >>> get_template_specs('UNCInfant', ... {'cohort': 1, 'suffix': 'T1w', 'res': 1}, fallback=True)[1] {'resolution': None, 'cohort': 1} """ import templateflow.api as tf # Massage spec (start creating if None) template_spec = template_spec or {} template_spec["desc"] = template_spec.get("desc", None) template_spec["atlas"] = template_spec.get("atlas", None) template_spec["resolution"] = template_spec.pop( "res", template_spec.get("resolution", default_resolution) ) # Verify resolution is valid if fallback: res = template_spec['resolution'] if not isinstance(res, list): try: res = [int(res)] except Exception: res = None if res is None: res = [] available_resolutions = tf.TF_LAYOUT.get_resolutions(template=in_template) if not (set(res) & set(available_resolutions)): fallback_res = available_resolutions[0] if available_resolutions else None warnings.warn( f"Template {in_template} does not have resolution: {res}." f"Falling back to resolution: {fallback_res}." ) template_spec["resolution"] = fallback_res common_spec = {"resolution": template_spec["resolution"]} if "cohort" in template_spec: common_spec["cohort"] = template_spec["cohort"] tpl_target_path = tf.get(in_template, **template_spec) if not tpl_target_path: raise RuntimeError( """\ Could not find template "{0}" with specs={1}. Please revise your template \ argument.""".format( in_template, template_spec ) ) if isinstance(tpl_target_path, list): raise RuntimeError( """\ The available template modifiers ({0}) did not select a unique template \ (got "{1}"). Please revise your template argument.""".format( template_spec, ", ".join([str(p) for p in tpl_target_path]) ) ) return str(tpl_target_path), common_spec
[docs] def fix_multi_T1w_source_name(in_files): """ Make up a generic source name when there are multiple T1s >>> fix_multi_T1w_source_name([ ... '/path/to/sub-045_ses-test_T1w.nii.gz', ... '/path/to/sub-045_ses-retest_T1w.nii.gz']) '/path/to/sub-045_T1w.nii.gz' >>> fix_multi_T1w_source_name([ ... ('/path/to/sub-045-echo-1_T1w.nii.gz', 'path/to/sub-045-echo-2_T1w.nii.gz')]) '/path/to/sub-045_T1w.nii.gz' """ import os from nipype.utils.filemanip import filename_to_list in_file = filename_to_list(in_files)[0] if isinstance(in_file, (list, tuple)): in_file = in_file[0] base, in_file = os.path.split(in_file) subject_label = in_file.split("_", 1)[0].split("-")[1] return os.path.join(base, "sub-%s_T1w.nii.gz" % subject_label)
[docs] def add_suffix(in_files, suffix): """ Wrap nipype's fname_presuffix to conveniently just add a prefix >>> add_suffix([ ... '/path/to/sub-045_ses-test_T1w.nii.gz', ... '/path/to/sub-045_ses-retest_T1w.nii.gz'], '_test') 'sub-045_ses-test_T1w_test.nii.gz' """ import os.path as op from nipype.utils.filemanip import fname_presuffix, filename_to_list return op.basename(fname_presuffix(filename_to_list(in_files)[0], suffix=suffix))
[docs] def read_crashfile(path): if path.endswith(".pklz"): return _read_pkl(path) elif path.endswith(".txt"): return _read_txt(path) raise RuntimeError("unknown crashfile format")
def _read_pkl(path): from nipype.utils.filemanip import loadcrash crash_data = loadcrash(path) data = {"file": path, "traceback": "".join(crash_data["traceback"])} if "node" in crash_data: data["node"] = crash_data["node"] if data["node"].base_dir: data["node_dir"] = data["node"].output_dir() else: data["node_dir"] = "Node crashed before execution" data["inputs"] = sorted(data["node"].inputs.trait_get().items()) return data def _read_txt(path): """Read a txt crashfile >>> from niworkflows import data >>> crashfile = data.load('tests/crashfile.txt') >>> info = _read_txt(crashfile) >>> info['node'] # doctest: +ELLIPSIS '...func_preproc_task_machinegame_run_02_wf.carpetplot_wf.conf_plot' >>> info['traceback'] # doctest: +ELLIPSIS '...ValueError: zero-size array to reduction operation minimum which has no identity' """ from pathlib import Path lines = Path(path).read_text().splitlines() data = {"file": str(path)} traceback_start = 0 if lines[0].startswith("Node"): data["node"] = lines[0].split(": ", 1)[1].strip() data["node_dir"] = lines[1].split(": ", 1)[1].strip() inputs = [] cur_key = "" cur_val = "" for i, line in enumerate(lines[5:]): if not line.strip(): continue if line[0].isspace(): cur_val += line continue if cur_val: inputs.append((cur_key, cur_val.strip())) if line.startswith("Traceback ("): traceback_start = i + 5 break cur_key, cur_val = tuple(line.split(" = ", 1)) data["inputs"] = sorted(inputs) else: data["node_dir"] = "Node crashed before execution" data["traceback"] = "\n".join(lines[traceback_start:]).strip() return data
[docs] def splitext(fname): """ Split filename in name and extension (.gz safe). Examples -------- >>> splitext('some/file.nii.gz') ('file', '.nii.gz') >>> splitext('some/other/file.nii') ('file', '.nii') >>> splitext('otherext.tar.gz') ('otherext', '.tar.gz') >>> splitext('text.txt') ('text', '.txt') >>> splitext('some/figure.svg') ('figure', '.svg') >>> splitext('some/figure.svg.gz') ('figure', '.svg.gz') >>> splitext('some/sub-01_bold.func.gii') ('sub-01_bold.func', '.gii') """ from pathlib import Path basename = str(Path(fname).name) stem = Path(basename.rstrip(".gz")).stem return stem, basename[len(stem):]
def _copy_any(src, dst): import os import gzip from shutil import copyfileobj from nipype.utils.filemanip import copyfile src_isgz = os.fspath(src).endswith(".gz") dst_isgz = os.fspath(dst).endswith(".gz") if not src_isgz and not dst_isgz: copyfile(src, dst, copy=True, use_hardlink=True) return False # Make sure we do not reuse the hardlink later # Unlink target (should not exist) if os.path.exists(dst): os.unlink(dst) src_open = gzip.open if src_isgz else open with src_open(src, "rb") as f_in: with open(dst, "wb") as f_out: if dst_isgz: # Remove FNAME header from gzip (nipreps/fmriprep#1480) gz_out = gzip.GzipFile("", "wb", 9, f_out, 0.0) copyfileobj(f_in, gz_out) gz_out.close() else: copyfileobj(f_in, f_out) return True
[docs] def clean_directory(path): """ Clears a directory of all contents. Returns `True` if no content remains. If any content cannot be removed, returns `False`. Notes ----- This function is not guaranteed to work across multiple threads or processes. """ from pathlib import Path import shutil try: for f in Path(path).iterdir(): if f.is_file() or f.is_symlink(): f.unlink() elif f.is_dir(): shutil.rmtree(str(f)) except OSError: return False return True
def pass_dummy_scans(algo_dummy_scans, dummy_scans=None): """ Graft manually provided number of dummy scans, if necessary. Parameters ---------- algo_dummy_scans : int number of volumes to skip determined by an algorithm dummy_scans : int or None number of volumes to skip determined by the user Returns ------- skip_vols_num : int number of volumes to skip """ if dummy_scans is None: return algo_dummy_scans return dummy_scans def check_valid_fs_license(): """ Run ``mri_convert`` to assess FreeSurfer access to a license. Returns ------- valid : :obj:`bool` FreeSurfer successfully executed (valid license) """ from pathlib import Path import subprocess as sp from tempfile import TemporaryDirectory from .. import data with TemporaryDirectory() as tmpdir, data.load.as_path("sentinel.nii.gz") as sentinel: # quick FreeSurfer command _cmd = ("mri_convert", str(sentinel), str(Path(tmpdir) / "out.mgz")) proc = sp.run(_cmd, stdout=sp.PIPE, stderr=sp.STDOUT) return proc.returncode == 0 and "ERROR:" not in proc.stdout.decode() def unlink(pathlike, missing_ok=False): """Backport of Path.unlink from Python 3.8+ with missing_ok keyword""" # PY37 hack; drop when python_requires >= 3.8 try: os.unlink(pathlike) except FileNotFoundError: if not missing_ok: raise if __name__ == "__main__": pass