Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit 5a90754

Browse files
authored
Merge pull request #603 from erizocosmico/fix/process-subquery
sql/analyzer: remove QueryProcess nodes from subqueries
2 parents 92013a1 + fce9fa6 commit 5a90754

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

sql/analyzer/process.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,19 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
7171
return n, nil
7272
}
7373

74-
return plan.NewQueryProcess(n, func() { processList.Done(ctx.Pid()) }), nil
74+
// Remove QueryProcess nodes from the subqueries. Otherwise, the process
75+
// will be marked as done as soon as a subquery finishes.
76+
node, err := n.TransformUp(func(n sql.Node) (sql.Node, error) {
77+
if sq, ok := n.(*plan.SubqueryAlias); ok {
78+
if qp, ok := sq.Child.(*plan.QueryProcess); ok {
79+
return plan.NewSubqueryAlias(sq.Name(), qp.Child), nil
80+
}
81+
}
82+
return n, nil
83+
})
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
return plan.NewQueryProcess(node, func() { processList.Done(ctx.Pid()) }), nil
7589
}

sql/analyzer/process_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,37 @@ func TestTrackProcess(t *testing.T) {
7070
}
7171
}
7272

73+
func TestTrackProcessSubquery(t *testing.T) {
74+
require := require.New(t)
75+
rule := getRuleFrom(OnceAfterAll, "track_process")
76+
catalog := sql.NewCatalog()
77+
a := NewDefault(catalog)
78+
79+
node := plan.NewProject(
80+
nil,
81+
plan.NewSubqueryAlias("f",
82+
plan.NewQueryProcess(
83+
plan.NewResolvedTable(mem.NewTable("foo", nil)),
84+
nil,
85+
),
86+
),
87+
)
88+
89+
result, err := rule.Apply(sql.NewEmptyContext(), a, node)
90+
require.NoError(err)
91+
92+
expectedChild := plan.NewProject(
93+
nil,
94+
plan.NewSubqueryAlias("f",
95+
plan.NewResolvedTable(mem.NewTable("foo", nil)),
96+
),
97+
)
98+
99+
proc, ok := result.(*plan.QueryProcess)
100+
require.True(ok)
101+
require.Equal(expectedChild, proc.Child)
102+
}
103+
73104
func withoutProcessTracking(a *Analyzer) *Analyzer {
74105
afterAll := a.Batches[len(a.Batches)-1]
75106
afterAll.Rules = afterAll.Rules[1:]

0 commit comments

Comments
 (0)