Skip to content

Commit 0372982

Browse files
Fix double connect on nested workflows
1 parent 771ab93 commit 0372982

File tree

2 files changed

+43
-5
lines changed

2 files changed

+43
-5
lines changed

nipype/pipeline/engine/tests/test_workflows.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,22 @@ def test_doubleconnect():
8383
assert "Trying to connect" in str(excinfo.value)
8484

8585

86+
def test_nested_workflow_doubleconnect():
87+
# double input with nested workflows
88+
a = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="a")
89+
b = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="b")
90+
c = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="c")
91+
flow1 = pe.Workflow(name="test1")
92+
flow2 = pe.Workflow(name="test2")
93+
flow3 = pe.Workflow(name="test3")
94+
flow1.add_nodes([b])
95+
flow2.connect(a, "a", flow1, "b.a")
96+
with pytest.raises(Exception) as excinfo:
97+
flow3.connect(c, "a", flow2, "test1.b.a")
98+
assert "Some connections were not found" in str(excinfo.value)
99+
flow3.connect(c, "b", flow2, "test1.b.b")
100+
101+
86102
def test_duplicate_node_check():
87103

88104
wf = pe.Workflow(name="testidentity")

nipype/pipeline/engine/workflows.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -771,11 +771,25 @@ def _has_attr(self, parameter, subtype="in"):
771771
"""Checks if a parameter is available as an input or output
772772
"""
773773
hierarchy = parameter.split(".")
774+
775+
# Connecting to a workflow needs at least two values,
776+
# the name of the child node and the name of the input/output
777+
if len(hierarchy) < 2:
778+
return False
779+
774780
attrname = hierarchy.pop()
775781
nodename = hierarchy.pop()
776782

783+
def _check_is_already_connected(workflow, node, attrname):
784+
for _, _, d in workflow._graph.in_edges(nbunch=node, data=True):
785+
for cd in d["connect"]:
786+
if attrname == cd[1]:
787+
return False
788+
return True
789+
777790
targetworkflow = self
778-
for workflowname in hierarchy:
791+
while hierarchy:
792+
workflowname = hierarchy.pop(0)
779793
workflow = None
780794
for node in targetworkflow._graph.nodes():
781795
if node.name == workflowname:
@@ -784,6 +798,13 @@ def _has_attr(self, parameter, subtype="in"):
784798
break
785799
if workflow is None:
786800
return False
801+
# Verify input does not already have an incoming connection
802+
# in the hierarchy of workflows
803+
if subtype == "in":
804+
hierattrname = ".".join(hierarchy + [nodename, attrname])
805+
if not _check_is_already_connected(
806+
targetworkflow, workflow, hierattrname):
807+
return False
787808
targetworkflow = workflow
788809

789810
targetnode = None
@@ -804,11 +825,12 @@ def _has_attr(self, parameter, subtype="in"):
804825
if not hasattr(targetnode.outputs, attrname):
805826
return False
806827

828+
# Verify input does not already have an incoming connection
829+
# in the target workflow
807830
if subtype == "in":
808-
for _, _, d in targetworkflow._graph.in_edges(nbunch=targetnode, data=True):
809-
for cd in d["connect"]:
810-
if attrname == cd[1]:
811-
return False
831+
if not _check_is_already_connected(
832+
targetworkflow, targetnode, attrname):
833+
return False
812834

813835
return True
814836

0 commit comments

Comments
 (0)