Skip to content

Commit 47ba1ef

Browse files
committed
[FIX] status_callback not called with stop_on_first_crash
Some tests were randomly failing in Travis for the Linear plugin. My suspicion is that some other test changed the configuration of ``stop_on_first_crash``, and depending on the order in which tests were actually run, this test would sometimes fail, apparently at random. The tests have been expanded to also cover LegacyMultiProc and to check under both conditions (stop_on_first_crash on/off).
1 parent 0fde422 commit 47ba1ef

File tree

2 files changed

+35
-71
lines changed

2 files changed

+35
-71
lines changed

nipype/pipeline/plugins/linear.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,32 @@ def run(self, graph, config, updatehash=False):
3636
donotrun = []
3737
nodes, _ = topological_sort(graph)
3838
for node in nodes:
39+
endstatus = 'end'
3940
try:
4041
if node in donotrun:
4142
continue
4243
if self._status_callback:
4344
self._status_callback(node, 'start')
4445
node.run(updatehash=updatehash)
45-
if self._status_callback:
46-
self._status_callback(node, 'end')
4746
except:
48-
os.chdir(old_wd)
47+
endstatus = 'exception'
4948
# bare except, but i really don't know where a
5049
# node might fail
5150
crashfile = report_crash(node)
5251
if str2bool(config['execution']['stop_on_first_crash']):
5352
raise
5453
# remove dependencies from queue
5554
subnodes = [s for s in dfs_preorder(graph, node)]
56-
notrun.append(
57-
dict(node=node, dependents=subnodes, crashfile=crashfile))
55+
notrun.append({'node': node, 'dependents': subnodes,
56+
'crashfile': crashfile})
5857
donotrun.extend(subnodes)
58+
# Delay raising the crash until we cleaned the house
59+
if str2bool(config['execution']['stop_on_first_crash']):
60+
report_nodes_not_run(notrun) # report before raising
61+
raise
62+
finally:
63+
# Return wherever we were before
64+
os.chdir(old_wd)
5965
if self._status_callback:
60-
self._status_callback(node, 'exception')
66+
self._status_callback(node, endstatus)
6167
report_nodes_not_run(notrun)

nipype/pipeline/plugins/tests/test_callback.py

+23-65
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
from builtins import object
88

9+
from time import sleep
910
import pytest
10-
import sys
1111
import nipype.interfaces.utility as niu
1212
import nipype.pipeline.engine as pe
1313

@@ -25,10 +25,11 @@ def __init__(self):
2525
self.statuses = []
2626

2727
def callback(self, node, status, result=None):
28-
self.statuses.append((node, status))
28+
self.statuses.append((node.name, status))
2929

3030

31-
def test_callback_normal(tmpdir):
31+
@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc'])
32+
def test_callback_normal(tmpdir, plugin):
3233
tmpdir.chdir()
3334

3435
so = Status()
@@ -37,16 +38,17 @@ def test_callback_normal(tmpdir):
3738
niu.Function(function=func, input_names=[], output_names=[]),
3839
name='f_node')
3940
wf.add_nodes([f_node])
40-
wf.config['execution'] = {'crashdump_dir': wf.base_dir}
41-
wf.run(plugin="Linear", plugin_args={'status_callback': so.callback})
42-
assert len(so.statuses) == 2
43-
for (n, s) in so.statuses:
44-
assert n.name == 'f_node'
45-
assert so.statuses[0][1] == 'start'
46-
assert so.statuses[1][1] == 'end'
41+
wf.config['execution'] = {
42+
'crashdump_dir': wf.base_dir,
43+
'poll_sleep_duration': 2
44+
}
45+
wf.run(plugin=plugin, plugin_args={'status_callback': so.callback})
46+
assert so.statuses == [('f_node', 'start'), ('f_node', 'end')]
4747

4848

49-
def test_callback_exception(tmpdir):
49+
@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc'])
50+
@pytest.mark.parametrize("stop_on_first_crash", [False, True])
51+
def test_callback_exception(tmpdir, plugin, stop_on_first_crash):
5052
tmpdir.chdir()
5153

5254
so = Status()
@@ -55,57 +57,13 @@ def test_callback_exception(tmpdir):
5557
niu.Function(function=bad_func, input_names=[], output_names=[]),
5658
name='f_node')
5759
wf.add_nodes([f_node])
58-
wf.config['execution'] = {'crashdump_dir': wf.base_dir}
59-
try:
60-
wf.run(plugin="Linear", plugin_args={'status_callback': so.callback})
61-
except:
62-
pass
63-
assert len(so.statuses) == 2
64-
for (n, s) in so.statuses:
65-
assert n.name == 'f_node'
66-
assert so.statuses[0][1] == 'start'
67-
assert so.statuses[1][1] == 'exception'
68-
69-
70-
def test_callback_multiproc_normal(tmpdir):
71-
tmpdir.chdir()
72-
73-
so = Status()
74-
wf = pe.Workflow(name='test', base_dir=tmpdir.strpath)
75-
f_node = pe.Node(
76-
niu.Function(function=func, input_names=[], output_names=[]),
77-
name='f_node')
78-
wf.add_nodes([f_node])
79-
wf.config['execution']['crashdump_dir'] = wf.base_dir
80-
wf.config['execution']['poll_sleep_duration'] = 2
81-
wf.run(plugin='MultiProc', plugin_args={'status_callback': so.callback})
82-
assert len(so.statuses) == 2
83-
for (n, s) in so.statuses:
84-
assert n.name == 'f_node'
85-
assert so.statuses[0][1] == 'start'
86-
assert so.statuses[1][1] == 'end'
87-
88-
89-
def test_callback_multiproc_exception(tmpdir):
90-
tmpdir.chdir()
91-
92-
so = Status()
93-
wf = pe.Workflow(name='test', base_dir=tmpdir.strpath)
94-
f_node = pe.Node(
95-
niu.Function(function=bad_func, input_names=[], output_names=[]),
96-
name='f_node')
97-
wf.add_nodes([f_node])
98-
wf.config['execution'] = {'crashdump_dir': wf.base_dir}
99-
100-
try:
101-
wf.run(
102-
plugin='MultiProc', plugin_args={
103-
'status_callback': so.callback
104-
})
105-
except:
106-
pass
107-
assert len(so.statuses) == 2
108-
for (n, s) in so.statuses:
109-
assert n.name == 'f_node'
110-
assert so.statuses[0][1] == 'start'
111-
assert so.statuses[1][1] == 'exception'
60+
wf.config['execution'] = {
61+
'crashdump_dir': wf.base_dir,
62+
'stop_on_first_crash': stop_on_first_crash,
63+
'poll_sleep_duration': 2
64+
}
65+
with pytest.raises(Exception):
66+
wf.run(plugin=plugin, plugin_args={'status_callback': so.callback})
67+
68+
sleep(0.5) # Wait for callback to be called (python 2.7)
69+
assert so.statuses == [('f_node', 'start'), ('f_node', 'exception')]

0 commit comments

Comments (0)