Skip to content

Commit 9f0ca09

Browse files
authored
Merge pull request #2810 from oesteban/fix/test-callback
[FIX] ``status_callback`` not called with ``stop_on_first_crash``
2 parents 0fde422 + 491c123 commit 9f0ca09

File tree

2 files changed

+36
-71
lines changed

2 files changed

+36
-71
lines changed

nipype/pipeline/plugins/linear.py

+13-6
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,33 @@ def run(self, graph, config, updatehash=False):
3636
donotrun = []
3737
nodes, _ = topological_sort(graph)
3838
for node in nodes:
39+
endstatus = 'end'
3940
try:
4041
if node in donotrun:
4142
continue
4243
if self._status_callback:
4344
self._status_callback(node, 'start')
4445
node.run(updatehash=updatehash)
45-
if self._status_callback:
46-
self._status_callback(node, 'end')
4746
except:
48-
os.chdir(old_wd)
47+
endstatus = 'exception'
4948
# bare except, but i really don't know where a
5049
# node might fail
5150
crashfile = report_crash(node)
5251
if str2bool(config['execution']['stop_on_first_crash']):
5352
raise
5453
# remove dependencies from queue
5554
subnodes = [s for s in dfs_preorder(graph, node)]
56-
notrun.append(
57-
dict(node=node, dependents=subnodes, crashfile=crashfile))
55+
notrun.append({'node': node, 'dependents': subnodes,
56+
'crashfile': crashfile})
5857
donotrun.extend(subnodes)
58+
# Delay raising the crash until we cleaned the house
59+
if str2bool(config['execution']['stop_on_first_crash']):
60+
os.chdir(old_wd) # Return wherever we were before
61+
report_nodes_not_run(notrun) # report before raising
62+
raise
63+
finally:
5964
if self._status_callback:
60-
self._status_callback(node, 'exception')
65+
self._status_callback(node, endstatus)
66+
67+
os.chdir(old_wd) # Return wherever we were before
6168
report_nodes_not_run(notrun)

nipype/pipeline/plugins/tests/test_callback.py

+23-65
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
from builtins import object
88

9+
from time import sleep
910
import pytest
10-
import sys
1111
import nipype.interfaces.utility as niu
1212
import nipype.pipeline.engine as pe
1313

@@ -25,10 +25,11 @@ def __init__(self):
2525
self.statuses = []
2626

2727
def callback(self, node, status, result=None):
28-
self.statuses.append((node, status))
28+
self.statuses.append((node.name, status))
2929

3030

31-
def test_callback_normal(tmpdir):
31+
@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc'])
32+
def test_callback_normal(tmpdir, plugin):
3233
tmpdir.chdir()
3334

3435
so = Status()
@@ -37,16 +38,17 @@ def test_callback_normal(tmpdir):
3738
niu.Function(function=func, input_names=[], output_names=[]),
3839
name='f_node')
3940
wf.add_nodes([f_node])
40-
wf.config['execution'] = {'crashdump_dir': wf.base_dir}
41-
wf.run(plugin="Linear", plugin_args={'status_callback': so.callback})
42-
assert len(so.statuses) == 2
43-
for (n, s) in so.statuses:
44-
assert n.name == 'f_node'
45-
assert so.statuses[0][1] == 'start'
46-
assert so.statuses[1][1] == 'end'
41+
wf.config['execution'] = {
42+
'crashdump_dir': wf.base_dir,
43+
'poll_sleep_duration': 2
44+
}
45+
wf.run(plugin=plugin, plugin_args={'status_callback': so.callback})
46+
assert so.statuses == [('f_node', 'start'), ('f_node', 'end')]
4747

4848

49-
def test_callback_exception(tmpdir):
49+
@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc'])
50+
@pytest.mark.parametrize("stop_on_first_crash", [False, True])
51+
def test_callback_exception(tmpdir, plugin, stop_on_first_crash):
5052
tmpdir.chdir()
5153

5254
so = Status()
@@ -55,57 +57,13 @@ def test_callback_exception(tmpdir):
5557
niu.Function(function=bad_func, input_names=[], output_names=[]),
5658
name='f_node')
5759
wf.add_nodes([f_node])
58-
wf.config['execution'] = {'crashdump_dir': wf.base_dir}
59-
try:
60-
wf.run(plugin="Linear", plugin_args={'status_callback': so.callback})
61-
except:
62-
pass
63-
assert len(so.statuses) == 2
64-
for (n, s) in so.statuses:
65-
assert n.name == 'f_node'
66-
assert so.statuses[0][1] == 'start'
67-
assert so.statuses[1][1] == 'exception'
68-
69-
70-
def test_callback_multiproc_normal(tmpdir):
71-
tmpdir.chdir()
72-
73-
so = Status()
74-
wf = pe.Workflow(name='test', base_dir=tmpdir.strpath)
75-
f_node = pe.Node(
76-
niu.Function(function=func, input_names=[], output_names=[]),
77-
name='f_node')
78-
wf.add_nodes([f_node])
79-
wf.config['execution']['crashdump_dir'] = wf.base_dir
80-
wf.config['execution']['poll_sleep_duration'] = 2
81-
wf.run(plugin='MultiProc', plugin_args={'status_callback': so.callback})
82-
assert len(so.statuses) == 2
83-
for (n, s) in so.statuses:
84-
assert n.name == 'f_node'
85-
assert so.statuses[0][1] == 'start'
86-
assert so.statuses[1][1] == 'end'
87-
88-
89-
def test_callback_multiproc_exception(tmpdir):
90-
tmpdir.chdir()
91-
92-
so = Status()
93-
wf = pe.Workflow(name='test', base_dir=tmpdir.strpath)
94-
f_node = pe.Node(
95-
niu.Function(function=bad_func, input_names=[], output_names=[]),
96-
name='f_node')
97-
wf.add_nodes([f_node])
98-
wf.config['execution'] = {'crashdump_dir': wf.base_dir}
99-
100-
try:
101-
wf.run(
102-
plugin='MultiProc', plugin_args={
103-
'status_callback': so.callback
104-
})
105-
except:
106-
pass
107-
assert len(so.statuses) == 2
108-
for (n, s) in so.statuses:
109-
assert n.name == 'f_node'
110-
assert so.statuses[0][1] == 'start'
111-
assert so.statuses[1][1] == 'exception'
60+
wf.config['execution'] = {
61+
'crashdump_dir': wf.base_dir,
62+
'stop_on_first_crash': stop_on_first_crash,
63+
'poll_sleep_duration': 2
64+
}
65+
with pytest.raises(Exception):
66+
wf.run(plugin=plugin, plugin_args={'status_callback': so.callback})
67+
68+
sleep(0.5) # Wait for callback to be called (python 2.7)
69+
assert so.statuses == [('f_node', 'start'), ('f_node', 'exception')]

0 commit comments

Comments
 (0)