
[WIP,ENH] Revision to the resource profiler #2193


Closed · wants to merge 43 commits
Commits
da6681f
[WIP,ENH] Revision to the resource profiler
oesteban Sep 21, 2017
32c2f39
fix tests
oesteban Sep 22, 2017
0e2c581
Python 2 compatibility
oesteban Sep 22, 2017
5a8e7fe
add nipype_mprof
oesteban Sep 22, 2017
7d953cc
implement monitor in a parallel process
oesteban Sep 22, 2017
306c4ec
set profiling outputs to runtime object, read it from node execution
oesteban Sep 22, 2017
8a903f0
revise profiler callback
oesteban Sep 22, 2017
02fdbda
Merge remote-tracking branch 'upstream/master' into enh/ReviseResourc…
oesteban Sep 24, 2017
e3982d7
robuster constructor
oesteban Sep 25, 2017
48f87af
remove unused import
oesteban Sep 25, 2017
46dde32
various fixes
oesteban Sep 25, 2017
9d70a2f
cleaning up code
oesteban Sep 25, 2017
1fabd25
remove comment
oesteban Sep 25, 2017
ecedfcf
interface.base cleanup
oesteban Sep 25, 2017
2d35959
update new config settings
oesteban Sep 25, 2017
3f34711
make naming consistent across tests
oesteban Sep 25, 2017
99ded42
implement raise_insufficient
oesteban Sep 26, 2017
b0d25bd
fix test
oesteban Sep 26, 2017
2a37693
fix test (amend previous commit)
oesteban Sep 26, 2017
10d0f39
address review comments
oesteban Sep 26, 2017
62a6593
fix typo
oesteban Sep 26, 2017
d6401f3
fixes to the tear-up section of interfaces
oesteban Sep 26, 2017
ce3f08a
fix NoSuchProcess exception
oesteban Sep 26, 2017
ffb7509
making monitor robuster
oesteban Sep 26, 2017
7b7846b
Merge remote-tracking branch 'upstream/master' into enh/ReviseResourc…
oesteban Sep 26, 2017
c9b474b
first functional prototype
oesteban Sep 26, 2017
117924c
Merge remote-tracking branch 'upstream/master' into enh/ReviseResourc…
oesteban Sep 27, 2017
cf1f15b
add warning to old filemanip logger
oesteban Sep 27, 2017
4b7ab93
do not search for filemanip_level in config
oesteban Sep 27, 2017
c7a1992
fix CommandLine interface doctest
oesteban Sep 27, 2017
8d02397
update specs
oesteban Sep 27, 2017
c789b17
fix tests
oesteban Sep 27, 2017
a9824f1
fix location of use_resources
oesteban Sep 27, 2017
30d79e9
fix attribute error when input spec is not standard
oesteban Sep 27, 2017
49d4843
re-include filemanip logger into config documentation
oesteban Sep 27, 2017
ff94a4b
minor additions to resource_monitor option
oesteban Sep 27, 2017
55acde0
fix resource_monitor tests
oesteban Sep 27, 2017
7cd02ee
run build 2 (the shortest) with the resource monitor on
oesteban Sep 27, 2017
a42ef60
fix unbound variable
oesteban Sep 27, 2017
10865f1
collect resource_monitor info after run
oesteban Sep 27, 2017
e0e341b
reduce resource_monitor_frequency on tests (and we test it works)
oesteban Sep 27, 2017
06c9f20
store a new trace before exit
oesteban Sep 27, 2017
03c8d2e
run resource_monitor only for level2 of fmri_spm_nested, switch pytho…
oesteban Sep 27, 2017
Files changed
4 changes: 2 additions & 2 deletions .circle/tests.sh
@@ -30,8 +30,8 @@ case ${CIRCLE_NODE_INDEX} in
     exitcode=$?
     ;;
   2)
-    docker run --rm=false -it -e NIPYPE_NUMBER_OF_CPUS=4 -v $HOME/examples:/data/examples:ro -v $WORKDIR:/work -w /work nipype/nipype:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /data/examples/ level1 && \
-    docker run --rm=false -it -e NIPYPE_NUMBER_OF_CPUS=4 -v $HOME/examples:/data/examples:ro -v $WORKDIR:/work -w /work nipype/nipype:py36 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /data/examples/ l2pipeline
+    docker run --rm=false -it -e NIPYPE_NUMBER_OF_CPUS=4 -v $HOME/examples:/data/examples:ro -v $WORKDIR:/work -w /work nipype/nipype:py36 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /data/examples/ level1 && \
+    docker run --rm=false -it -e NIPYPE_NUMBER_OF_CPUS=4 -e NIPYPE_RESOURCE_MONITOR=1 -v $HOME/examples:/data/examples:ro -v $WORKDIR:/work -w /work nipype/nipype:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /data/examples/ l2pipeline
     exitcode=$?
     ;;
   3)
140 changes: 76 additions & 64 deletions doc/users/config_file.rst
@@ -14,93 +14,97 @@ Logging
~~~~~~~

*workflow_level*
   How detailed the logs regarding workflow should be (possible values:
   ``INFO`` and ``DEBUG``; default value: ``INFO``)
-*filemanip_level*
-   How detailed the logs regarding file operations (for example overwriting
-   warning) should be (possible values: ``INFO`` and ``DEBUG``; default value:
-   ``INFO``)
+*utils_level*
+   How detailed the logs regarding nipype utils, like file operations
+   (for example overwriting warning) or the resource profiler, should be
+   (possible values: ``INFO`` and ``DEBUG``; default value:
+   ``INFO``)
*interface_level*
   How detailed the logs regarding interface execution should be (possible
   values: ``INFO`` and ``DEBUG``; default value: ``INFO``)
+*filemanip_level* (deprecated as of 0.13.2)
+   How detailed the logs regarding file operations (for example overwriting
+   warning) should be (possible values: ``INFO`` and ``DEBUG``)
*log_to_file*
   Indicates whether logging should also send the output to a file (possible
   values: ``true`` and ``false``; default value: ``false``)
*log_directory*
   Where to store logs. (string, default value: home directory)
*log_size*
   Size of a single log file. (integer, default value: 254000)
*log_rotate*
   How many rotations should the log file make. (integer, default value: 4)

Execution
~~~~~~~~~

*plugin*
   This defines which execution plugin to use. (possible values: ``Linear``,
   ``MultiProc``, ``SGE``, ``IPython``; default value: ``Linear``)

*stop_on_first_crash*
   Should the workflow stop upon first node crashing or try to execute as many
   nodes as possible? (possible values: ``true`` and ``false``; default value:
   ``false``)

*stop_on_first_rerun*
   Should the workflow stop upon first node trying to recompute (by that we
   mean rerunning a node that has been run before - this can happen due to
   changed inputs and/or hash_method since the last run). (possible values:
   ``true`` and ``false``; default value: ``false``)

*hash_method*
   Should the input files be checked for changes using their content (slow, but
   100% accurate) or just their size and modification date (fast, but
   potentially prone to errors)? (possible values: ``content`` and
   ``timestamp``; default value: ``timestamp``)

*keep_inputs*
   Ensures that all inputs that are created in the nodes working directory are
   kept after node execution (possible values: ``true`` and ``false``; default
   value: ``false``)

*single_thread_matlab*
   Should all of the Matlab interfaces (including SPM) use only one thread?
   This is useful if you are parallelizing your workflow using MultiProc or
   IPython on a single multicore machine. (possible values: ``true`` and
   ``false``; default value: ``true``)

*display_variable*
   What ``DISPLAY`` variable should all command line interfaces be
   run with. This is useful if you are using `xnest
   <http://www.x.org/archive/X11R7.5/doc/man/man1/Xnest.1.html>`_
   or `Xvfb <http://www.x.org/archive/X11R6.8.1/doc/Xvfb.1.html>`_
   and you would like to redirect all spawned windows to
   it. (possible values: any X server address; default value: not
   set)

*remove_unnecessary_outputs*
   This will remove any interface outputs not needed by the workflow. If the
   required outputs from a node change, rerunning the workflow will rerun the
   node. Outputs of leaf nodes (nodes whose outputs are not connected to any
   other nodes) will never be deleted independent of this parameter. (possible
   values: ``true`` and ``false``; default value: ``true``)

*try_hard_link_datasink*
   When the DataSink is used to produce an organized output file outside
   of nipype's internal cache structure, a file system hard link will be
   attempted first. A hard link allows multiple file paths to point to the
   same physical storage location on disk if the conditions allow. By
   referring to the same physical file on disk (instead of copying files
   byte-by-byte) we can avoid unnecessary data duplication. If hard links
   are not supported for the source or destination paths specified, then
   a standard byte-by-byte copy is used. (possible values: ``true`` and
   ``false``; default value: ``true``)

*use_relative_paths*
   Should the paths stored in results (and used to look for inputs)
   be relative or absolute. Relative paths allow moving the whole
   working directory around but may cause problems with
   symlinks. (possible values: ``true`` and ``false``; default
   value: ``false``)

*local_hash_check*
Perform the hash check on the job submission machine. This option minimizes
@@ -115,10 +119,10 @@ Execution
done after a job finish is detected. (float in seconds; default value: 5)

*remove_node_directories (EXPERIMENTAL)*
   Removes directories whose outputs have already been used
   up. Doesn't work with IdentityInterface or any node that passes
   data through (without copying) (possible values: ``true`` and
   ``false``; default value: ``false``)

*stop_on_unknown_version*
If this is set to True, an underlying interface will raise an error, when no
@@ -146,18 +150,26 @@ Execution
crashfiles allow portability across machines and shorter load time.
(possible values: ``pklz`` and ``txt``; default value: ``pklz``)

+*resource_monitor*
+   Enables monitoring of resource occupation (possible values: ``true`` and
+   ``false``; default value: ``false``)
+
+*resource_monitor_frequency*
+   Sampling period (in seconds) between measurements of resources (memory,
+   cpus) being used by an interface. Requires ``resource_monitor`` to be
+   ``true``.

Example
~~~~~~~

::

   [logging]
   workflow_level = DEBUG

   [execution]
   stop_on_first_crash = true
   hash_method = timestamp
   display_variable = :1

Workflow.config property has a form of a nested dictionary reflecting the
structure of the .cfg file.
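As a concrete illustration, the example above maps onto that nested dictionary as follows (a minimal sketch; the ``resource_monitor`` keys assume the options introduced by this PR):

    # Sketch: the .cfg example expressed through the Workflow.config
    # nested dictionary; values are strings, exactly as in the .cfg file.
    import nipype.pipeline.engine as pe

    wf = pe.Workflow(name='demo')
    wf.config['logging'] = {'workflow_level': 'DEBUG'}
    wf.config['execution'] = {
        'stop_on_first_crash': 'true',
        'hash_method': 'timestamp',
        'display_variable': ':1',
        'resource_monitor': 'true',          # new option from this PR
        'resource_monitor_frequency': '3',   # sample every 3 seconds
    }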
8 changes: 8 additions & 0 deletions doc/users/plugins.rst
@@ -74,6 +74,14 @@ Optional arguments::

    n_procs : Number of processes to launch in parallel, if not set number of
              processors/threads will be automatically detected

+   memory_gb : Total memory available to be shared by all simultaneous tasks
+             currently running, if not set it will be automatically set to 90%
+             of system RAM.
+
+   raise_insufficient : Raise exception when the estimated resources of a node
+             exceed the total amount of resources available (memory and threads),
+             when ``False`` (default), only a warning will be issued.
+
To distribute processing on a multicore machine, simply call::

    workflow.run(plugin='MultiProc')
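Put together, the new options might be passed like this (a sketch; assumes an already-built ``workflow`` object, as in the snippet above):

    # Sketch: cap workers and shared memory, and raise instead of warning
    # when a node's resource estimate cannot be satisfied.
    workflow.run(plugin='MultiProc',
                 plugin_args={'n_procs': 4,
                              'memory_gb': 8,
                              'raise_insufficient': True})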
4 changes: 2 additions & 2 deletions doc/users/resource_sched_profiler.rst
@@ -82,7 +82,7 @@ by setting the ``status_callback`` parameter to point to this function in the

::

-    from nipype.pipeline.plugins.callback_log import log_nodes_cb
+    from nipype.utils.profiler import log_nodes_cb
     args_dict = {'n_procs' : 8, 'memory_gb' : 10, 'status_callback' : log_nodes_cb}

To set the filepath for the callback log the ``'callback'`` logger must be
configured.

@@ -141,7 +141,7 @@ The pandas_ Python package is required to use this feature.

::

-    from nipype.pipeline.plugins.callback_log import log_nodes_cb
+    from nipype.utils.profiler import log_nodes_cb
     args_dict = {'n_procs' : 8, 'memory_gb' : 10, 'status_callback' : log_nodes_cb}
     workflow.run(plugin='MultiProc', plugin_args=args_dict)
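For reference, configuring the ``'callback'`` logger mentioned above uses the standard ``logging`` module (a sketch; the log file path is arbitrary and ``workflow`` is assumed to exist):

    # Sketch: route the 'callback' logger consumed by log_nodes_cb to a
    # file, then run the workflow with the callback attached.
    import logging
    from nipype.utils.profiler import log_nodes_cb

    logger = logging.getLogger('callback')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.FileHandler('run_stats.log'))

    workflow.run(plugin='MultiProc',
                 plugin_args={'n_procs': 8, 'memory_gb': 10,
                              'status_callback': log_nodes_cb})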
8 changes: 7 additions & 1 deletion docker/files/run_examples.sh
@@ -12,10 +12,16 @@ mkdir -p ${HOME}/.nipype ${WORKDIR}/logs/example_${example_id} ${WORKDIR}/tests
 echo "[logging]" > ${HOME}/.nipype/nipype.cfg
 echo "workflow_level = DEBUG" >> ${HOME}/.nipype/nipype.cfg
 echo "interface_level = DEBUG" >> ${HOME}/.nipype/nipype.cfg
-echo "filemanip_level = DEBUG" >> ${HOME}/.nipype/nipype.cfg
+echo "utils_level = DEBUG" >> ${HOME}/.nipype/nipype.cfg
 echo "log_to_file = true" >> ${HOME}/.nipype/nipype.cfg
 echo "log_directory = ${WORKDIR}/logs/example_${example_id}" >> ${HOME}/.nipype/nipype.cfg

+if [[ "${NIPYPE_RESOURCE_MONITOR:-0}" == "1" ]]; then
+  echo '[execution]' >> ${HOME}/.nipype/nipype.cfg
+  echo 'resource_monitor = true' >> ${HOME}/.nipype/nipype.cfg
+  echo 'resource_monitor_frequency = 3' >> ${HOME}/.nipype/nipype.cfg
+fi
+
 # Set up coverage
 export COVERAGE_FILE=${WORKDIR}/tests/.coverage.${example_id}
 if [ "$2" == "MultiProc" ]; then
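The cfg block the script writes has a runtime equivalent on nipype's global config object (a sketch; ``config.set`` is the standard setter, and the option names are the ones this PR adds):

    # Sketch: Python equivalent of the cfg lines emitted when
    # NIPYPE_RESOURCE_MONITOR=1.
    from nipype import config

    config.set('execution', 'resource_monitor', 'true')
    config.set('execution', 'resource_monitor_frequency', '3')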
8 changes: 4 additions & 4 deletions docker/files/run_pytests.sh
@@ -17,10 +17,10 @@ echo '[logging]' > ${HOME}/.nipype/nipype.cfg
 echo 'log_to_file = true' >> ${HOME}/.nipype/nipype.cfg
 echo "log_directory = ${WORKDIR}/logs/py${PYTHON_VERSION}" >> ${HOME}/.nipype/nipype.cfg

-# Enable profile_runtime tests only for python 2.7
+# Enable resource_monitor tests only for python 2.7
 if [[ "${PYTHON_VERSION}" -lt "30" ]]; then
   echo '[execution]' >> ${HOME}/.nipype/nipype.cfg
-  echo 'profile_runtime = true' >> ${HOME}/.nipype/nipype.cfg
+  echo 'resource_monitor = true' >> ${HOME}/.nipype/nipype.cfg
 fi

 # Run tests using pytest
@@ -31,9 +31,9 @@ exit_code=$?

 # Workaround: run here the profiler tests in python 3
 if [[ "${PYTHON_VERSION}" -ge "30" ]]; then
   echo '[execution]' >> ${HOME}/.nipype/nipype.cfg
-  echo 'profile_runtime = true' >> ${HOME}/.nipype/nipype.cfg
+  echo 'resource_monitor = true' >> ${HOME}/.nipype/nipype.cfg
   export COVERAGE_FILE=${WORKDIR}/tests/.coverage.py${PYTHON_VERSION}_extra
-  py.test -v --junitxml=${WORKDIR}/tests/pytests_py${PYTHON_VERSION}_extra.xml --cov nipype --cov-report xml:${WORKDIR}/tests/coverage_py${PYTHON_VERSION}_extra.xml /src/nipype/nipype/interfaces/tests/test_runtime_profiler.py /src/nipype/nipype/pipeline/plugins/tests/test_multiproc*.py
+  py.test -v --junitxml=${WORKDIR}/tests/pytests_py${PYTHON_VERSION}_extra.xml --cov nipype --cov-report xml:${WORKDIR}/tests/coverage_py${PYTHON_VERSION}_extra.xml /src/nipype/nipype/utils/tests/test_profiler.py /src/nipype/nipype/pipeline/plugins/tests/test_multiproc*.py
   exit_code=$(( $exit_code + $? ))
 fi
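A test module can guard on the same flag, which is presumably how the renamed profiler tests skip themselves when the monitor is off (a sketch; the test name is hypothetical):

    # Sketch: skip resource-monitor tests unless the config flag written
    # by the CI scripts above is enabled.
    import pytest
    from nipype import config

    run_profile = config.getboolean('execution', 'resource_monitor')

    @pytest.mark.skipif(not run_profile, reason='resource_monitor disabled')
    def test_monitor_enabled():
        assert run_profile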
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_ACompCor.py
@@ -30,6 +30,9 @@ def test_ACompCor_inputs():
     regress_poly_degree=dict(usedefault=True,
     ),
     repetition_time=dict(),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     save_pre_filter=dict(),
     use_regress_poly=dict(deprecated='0.15.0',
     new_name='pre_filter',
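The same three lines recur in every spec test below: each interface input spec gains a ``resource_monitor`` trait, and ``nohash=True`` keeps it out of the input hash so toggling it never invalidates cached results. In practice that looks like this (a sketch; the default tracks the global config):

    # Sketch: resource_monitor is an ordinary input trait on any interface.
    from nipype.algorithms.confounds import ACompCor

    acc = ACompCor()
    print(acc.inputs.resource_monitor)   # usedefault=True: follows global config
    acc.inputs.resource_monitor = False  # opt out for this interface alone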
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_AddCSVRow.py
@@ -11,6 +11,9 @@ def test_AddCSVRow_inputs():
     ),
     in_file=dict(mandatory=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     )
     inputs = AddCSVRow.input_spec()
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_ArtifactDetect.py
@@ -27,6 +27,9 @@ def test_ArtifactDetect_inputs():
     ),
     realignment_parameters=dict(mandatory=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     rotation_threshold=dict(mandatory=True,
     xor=['norm_threshold'],
     ),
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_CalculateMedian.py
@@ -11,6 +11,9 @@ def test_CalculateMedian_inputs():
     median_file=dict(),
     median_per_file=dict(usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     )
     inputs = CalculateMedian.input_spec()
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_ComputeDVARS.py
@@ -21,6 +21,9 @@ def test_ComputeDVARS_inputs():
     ),
     remove_zerovariance=dict(usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     save_all=dict(usedefault=True,
     ),
     save_nstd=dict(usedefault=True,
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_ComputeMeshWarp.py
@@ -13,6 +13,9 @@ def test_ComputeMeshWarp_inputs():
     ),
     out_warp=dict(usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     surface1=dict(mandatory=True,
     ),
     surface2=dict(mandatory=True,
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_CreateNifti.py
@@ -12,6 +12,9 @@ def test_CreateNifti_inputs():
     ignore_exception=dict(nohash=True,
     usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     )
     inputs = CreateNifti.input_spec()
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_Distance.py
@@ -10,6 +10,9 @@ def test_Distance_inputs():
     mask_volume=dict(),
     method=dict(usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     volume1=dict(mandatory=True,
     ),
     volume2=dict(mandatory=True,
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_FramewiseDisplacement.py
@@ -23,6 +23,9 @@ def test_FramewiseDisplacement_inputs():
     ),
     radius=dict(usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     save_plot=dict(usedefault=True,
     ),
     series_tr=dict(),
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_FuzzyOverlap.py
@@ -13,6 +13,9 @@ def test_FuzzyOverlap_inputs():
     ),
     out_file=dict(usedefault=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     weighting=dict(usedefault=True,
     ),
     )
3 changes: 3 additions & 0 deletions nipype/algorithms/tests/test_auto_Gunzip.py
@@ -9,6 +9,9 @@ def test_Gunzip_inputs():
     ),
     in_file=dict(mandatory=True,
     ),
+    resource_monitor=dict(nohash=True,
+    usedefault=True,
+    ),
     )
     inputs = Gunzip.input_spec()