Source code for niworkflows.utils.misc

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""Miscellaneous utilities."""


# Names exported by ``from niworkflows.utils.misc import *``.
# NOTE(review): ``_copy_any`` is exported despite its leading underscore, while
# ``pass_dummy_scans`` and ``check_valid_fs_license`` (defined further down in
# this module) are not listed -- confirm this is intentional.
__all__ = [
    "get_template_specs",
    "fix_multi_T1w_source_name",
    "add_suffix",
    "read_crashfile",
    "splitext",
    "_copy_any",
    "clean_directory",
]


def get_template_specs(in_template, template_spec=None, default_resolution=1):
    """
    Parse template specifications.

    The specification is normalized and forwarded to ``templateflow.api.get``,
    which is required to resolve to exactly one file.

    Parameters
    ----------
    in_template : :obj:`str`
        Template identifier, passed as first argument to
        ``templateflow.api.get``.
    template_spec : :obj:`dict` or None
        Template modifiers. ``desc`` and ``atlas`` default to ``None`` when
        missing, and ``res`` is accepted as an alias of ``resolution`` (``res``
        wins if both are given). The dictionary is **not** modified.
    default_resolution : :obj:`int`
        Resolution used when neither ``res`` nor ``resolution`` is given.

    Returns
    -------
    tpl_target_path : :obj:`str`
        Path of the selected template file.
    common_spec : :obj:`dict`
        The ``resolution`` (and ``cohort``, if given) modifiers.

    Raises
    ------
    RuntimeError
        If the query returns nothing, or returns a list of files.

    Examples
    --------
    >>> get_template_specs('MNI152NLin2009cAsym', {'suffix': 'T1w'})[1]
    {'resolution': 1}

    >>> get_template_specs('MNI152NLin2009cAsym', {'res': '2', 'suffix': 'T1w'})[1]
    {'resolution': '2'}

    >>> specs = get_template_specs('MNIInfant', {'res': '2', 'cohort': '10', 'suffix': 'T1w'})[1]
    >>> sorted(specs.items())
    [('cohort', '10'), ('resolution', '2')]

    >>> get_template_specs('MNI152NLin2009cAsym',
    ...                    {'suffix': 'T1w', 'cohort': 1})[1]  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    RuntimeError:
    ...

    >>> get_template_specs('MNI152NLin2009cAsym',
    ...                    {'suffix': 'T1w', 'res': '1|2'})[1]  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    RuntimeError:
    ...

    """
    from templateflow.api import get as get_template

    # Massage a shallow copy of the spec (start creating if None), so that the
    # dictionary owned by the caller is left untouched.
    spec = dict(template_spec or {})
    spec["desc"] = spec.get("desc", None)
    spec["atlas"] = spec.get("atlas", None)
    # ``res`` is removed from the query and takes precedence over ``resolution``
    spec["resolution"] = spec.pop(
        "res", spec.get("resolution", default_resolution)
    )

    common_spec = {"resolution": spec["resolution"]}
    if "cohort" in spec:
        common_spec["cohort"] = spec["cohort"]

    tpl_target_path = get_template(in_template, **spec)
    if not tpl_target_path:
        raise RuntimeError(
            'Could not find template "{0}" with specs={1}. Please revise your '
            "template argument.".format(in_template, spec)
        )

    if isinstance(tpl_target_path, list):
        raise RuntimeError(
            "The available template modifiers ({0}) did not select a unique "
            'template (got "{1}"). Please revise your template argument.'.format(
                spec, ", ".join([str(p) for p in tpl_target_path])
            )
        )

    return str(tpl_target_path), common_spec
def fix_multi_T1w_source_name(in_files):
    """
    Build a single, generic T1w source name out of one or more T1w files.

    Only the first entry of ``in_files`` is inspected: its directory is kept
    and its subject label (taken from the leading ``sub-<label>`` chunk of the
    file name) is used to compose ``sub-<label>_T1w.nii.gz``.

    >>> fix_multi_T1w_source_name([
    ...     '/path/to/sub-045_ses-test_T1w.nii.gz',
    ...     '/path/to/sub-045_ses-retest_T1w.nii.gz'])
    '/path/to/sub-045_T1w.nii.gz'

    """
    import os
    from nipype.utils.filemanip import filename_to_list

    first_file = filename_to_list(in_files)[0]
    dirname, fname = os.path.split(first_file)
    # Everything before the first underscore is expected to be ``sub-<label>``
    leading_entity = fname.split("_", 1)[0]
    subject_label = leading_entity.split("-")[1]
    return os.path.join(dirname, "sub-%s_T1w.nii.gz" % subject_label)
def add_suffix(in_files, suffix):
    """
    Append ``suffix`` to the name of the first file in ``in_files``.

    Thin wrapper around nipype's ``fname_presuffix``; only the base name
    (without directory) of the result is returned.

    >>> add_suffix([
    ...     '/path/to/sub-045_ses-test_T1w.nii.gz',
    ...     '/path/to/sub-045_ses-retest_T1w.nii.gz'], '_test')
    'sub-045_ses-test_T1w_test.nii.gz'

    """
    import os.path as op
    from nipype.utils.filemanip import fname_presuffix, filename_to_list

    first_file = filename_to_list(in_files)[0]
    suffixed = fname_presuffix(first_file, suffix=suffix)
    return op.basename(suffixed)
def read_crashfile(path):
    """
    Read a crashfile, picking the reader from the file extension.

    Parameters
    ----------
    path : :obj:`str` or :obj:`os.PathLike`
        Location of the crashfile; ``.pklz`` files are handed to
        ``_read_pkl`` and ``.txt`` files to ``_read_txt``.

    Returns
    -------
    data : :obj:`dict`
        Whatever the selected reader returns.

    Raises
    ------
    RuntimeError
        If the extension is neither ``.pklz`` nor ``.txt``.

    """
    import os

    # Path-like objects have no ``endswith``; normalize them to their string
    # form (plain strings are returned unchanged by ``os.fspath``).
    path = os.fspath(path)
    if path.endswith(".pklz"):
        return _read_pkl(path)
    if path.endswith(".txt"):
        return _read_txt(path)
    raise RuntimeError("unknown crashfile format")
def _read_pkl(path): from nipype.utils.filemanip import loadcrash crash_data = loadcrash(path) data = {"file": path, "traceback": "".join(crash_data["traceback"])} if "node" in crash_data: data["node"] = crash_data["node"] if data["node"].base_dir: data["node_dir"] = data["node"].output_dir() else: data["node_dir"] = "Node crashed before execution" data["inputs"] = sorted(data["node"].inputs.trait_get().items()) return data def _read_txt(path): """Read a txt crashfile >>> new_path = Path(__file__).resolve().parent.parent >>> test_data_path = new_path / 'data' / 'tests' >>> info = _read_txt(test_data_path / 'crashfile.txt') >>> info['node'] # doctest: +ELLIPSIS '...func_preproc_task_machinegame_run_02_wf.carpetplot_wf.conf_plot' >>> info['traceback'] # doctest: +ELLIPSIS '...ValueError: zero-size array to reduction operation minimum which has no identity' """ from pathlib import Path lines = Path(path).read_text().splitlines() data = {"file": str(path)} traceback_start = 0 if lines[0].startswith("Node"): data["node"] = lines[0].split(": ", 1)[1].strip() data["node_dir"] = lines[1].split(": ", 1)[1].strip() inputs = [] cur_key = "" cur_val = "" for i, line in enumerate(lines[5:]): if not line.strip(): continue if line[0].isspace(): cur_val += line continue if cur_val: inputs.append((cur_key, cur_val.strip())) if line.startswith("Traceback ("): traceback_start = i + 5 break cur_key, cur_val = tuple(line.split(" = ", 1)) data["inputs"] = sorted(inputs) else: data["node_dir"] = "Node crashed before execution" data["traceback"] = "\n".join(lines[traceback_start:]).strip() return data
def splitext(fname):
    """
    Split filename in name and extension (.gz safe).

    Parameters
    ----------
    fname : :obj:`str` or :obj:`os.PathLike`
        File name or path; any leading directory is discarded.

    Returns
    -------
    stem : :obj:`str`
        Base name without its extension.
    ext : :obj:`str`
        The extension, with a trailing ``.gz`` kept attached to the
        extension that precedes it (e.g., ``.nii.gz``).

    Examples
    --------
    >>> splitext('some/file.nii.gz')
    ('file', '.nii.gz')
    >>> splitext('some/other/file.nii')
    ('file', '.nii')
    >>> splitext('otherext.tar.gz')
    ('otherext', '.tar.gz')
    >>> splitext('text.txt')
    ('text', '.txt')
    >>> splitext('some/figure.svg')
    ('figure', '.svg')
    >>> splitext('some/figure.svg.gz')
    ('figure', '.svg.gz')
    >>> splitext('some/sub-01_bold.func.gii')
    ('sub-01_bold.func', '.gii')
    >>> splitext('seg.gz')
    ('seg', '.gz')
    >>> splitext('dialog')
    ('dialog', '')

    """
    from pathlib import Path

    basename = str(Path(fname).name)
    # Drop one trailing ".gz" as a *suffix*. ``str.rstrip(".gz")`` would strip
    # any run of the characters ".", "g" and "z" instead, mangling names such
    # as "seg.gz" or "dialog".
    gz_ext = ".gz"
    trimmed = basename[: -len(gz_ext)] if basename.endswith(gz_ext) else basename
    stem = Path(trimmed).stem
    return stem, basename[len(stem):]
def _copy_any(src, dst): import os import gzip from shutil import copyfileobj from nipype.utils.filemanip import copyfile src_isgz = src.endswith(".gz") dst_isgz = dst.endswith(".gz") if not src_isgz and not dst_isgz: copyfile(src, dst, copy=True, use_hardlink=True) return False # Make sure we do not reuse the hardlink later # Unlink target (should not exist) if os.path.exists(dst): os.unlink(dst) src_open = gzip.open if src_isgz else open with src_open(src, "rb") as f_in: with open(dst, "wb") as f_out: if dst_isgz: # Remove FNAME header from gzip (nipreps/fmriprep#1480) gz_out = gzip.GzipFile("", "wb", 9, f_out, 0.0) copyfileobj(f_in, gz_out) gz_out.close() else: copyfileobj(f_in, f_out) return True
def clean_directory(path):
    """
    Clears a directory of all contents.

    Returns `True` if no content remains. If any content cannot be removed
    (or the directory cannot be listed), returns `False`.

    Parameters
    ----------
    path : :obj:`str` or :obj:`os.PathLike`
        Directory to be emptied; the directory itself is kept.

    Notes
    -----
    This function is not guaranteed to work across multiple threads or processes.

    """
    from pathlib import Path
    import shutil

    try:
        for entry in Path(path).iterdir():
            if entry.is_dir() and not entry.is_symlink():
                shutil.rmtree(str(entry))
            else:
                # Regular files, symlinks (also to directories, which must not
                # be followed) and special files such as FIFOs or sockets,
                # which would otherwise be left behind.
                entry.unlink()
    except OSError:
        return False

    return True
def pass_dummy_scans(algo_dummy_scans, dummy_scans=None):
    """
    Let a user-provided number of dummy scans override the estimated one.

    Parameters
    ----------
    algo_dummy_scans : int
        number of volumes to skip determined by an algorithm
    dummy_scans : int or None
        number of volumes to skip determined by the user

    Returns
    -------
    skip_vols_num : int
        ``dummy_scans`` unless it is ``None``, in which case
        ``algo_dummy_scans``

    """
    return algo_dummy_scans if dummy_scans is None else dummy_scans


def check_valid_fs_license():
    """
    Run ``mri_convert`` to assess FreeSurfer access to a license.

    The ``data/sentinel.nii.gz`` file packaged with niworkflows is converted
    into a throw-away ``out.mgz`` within a temporary directory.

    Returns
    -------
    valid : :obj:`bool`
        ``True`` if the command exited with code zero and printed no
        ``ERROR:`` message (FreeSurfer successfully executed, valid license)

    """
    from pathlib import Path
    import subprocess as sp
    from tempfile import TemporaryDirectory
    from pkg_resources import resource_filename

    sentinel = resource_filename("niworkflows", "data/sentinel.nii.gz")
    with TemporaryDirectory() as tmpdir:
        out_file = str(Path(tmpdir) / "out.mgz")
        # quick FreeSurfer command; stderr is folded into stdout
        proc = sp.run(
            ("mri_convert", sentinel, out_file),
            stdout=sp.PIPE,
            stderr=sp.STDOUT,
        )

    if proc.returncode != 0:
        return False
    return "ERROR:" not in proc.stdout.decode()


if __name__ == "__main__":
    pass