niworkflows.engine.plugin module

A lightweight NiPype MultiProc execution plugin.

class niworkflows.engine.plugin.DistributedPluginBase(plugin_args=None)[source]

Bases: PluginBase

Execute workflow with a distribution engine.

Combinations of proc_done and proc_pending: +————+—————+——————————–+ | proc_done | proc_pending | outcome | +============+===============+================================+ | True | False | Process is finished | +————+—————+——————————–+ | True | True | Process is currently being run | +————+—————+——————————–+ | False | False | Process is queued | +————+—————+——————————–+ | False | True | INVALID COMBINATION | +————+—————+——————————–+

Attributes:
  • procs (list) – list (N) of underlying interface elements to be processed

  • proc_done (numpy.ndarray) – a boolean numpy array (N,) signifying whether a process has been submitted for execution

  • proc_pending (numpy.ndarray) – a boolean numpy array (N,) signifying whether a process is currently running.

  • depidx (numpy.matrix) – a boolean matrix (NxN) storing the dependency structure across processes. Process dependencies are derived from each column.

run(graph, config, updatehash=False)[source]

Execute a pre-defined pipeline using distributed approaches.

class niworkflows.engine.plugin.MultiProcPlugin(pool=None, plugin_args=None)[source]

Bases: DistributedPluginBase

A lightweight re-implementation of NiPype’s MultiProc plugin.

Execute workflow with multiprocessing, not sending more jobs at once than the system can support. The plugin_args input to run can be used to control the multiprocessing execution and defining the maximum amount of memory and threads that should be used. When those parameters are not specified, the number of threads and memory of the system is used. System consuming nodes should be tagged:

memory_consuming_node.mem_gb = 8
thread_consuming_node.n_procs = 16

The default number of threads and memory are set at node creation, and are 1 and 0.25GB respectively.

class niworkflows.engine.plugin.PluginBase(plugin_args=None)[source]

Bases: object

Base class for plugins.

run(graph, config, updatehash=False)[source]

Instruct the plugin to execute the workflow graph.

The core plugin member that should be implemented by all plugins.

Parameters:
  • graph – a networkx, flattened DAG to be executed

  • config (config) – a nipype.config object

  • updatehash (bool) – whether cached nodes with stale hash should be just updated.

niworkflows.engine.plugin.run_node(node, updatehash, taskid)[source]

Execute node.run(), catch and log any errors and get a result.

Parameters:
  • node (nipype Node instance) – the node to run

  • updatehash (boolean) – flag for updating hash

  • taskid (int) – an identifier for this task

Returns:

result – dictionary containing the node runtime results and stats

Return type:

dictionary