From f153e02be939f2685e5244912286a1b111f8330e Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Wed, 10 Feb 2021 19:13:17 -0500 Subject: [PATCH] do not assume stage execution order in queue concurrency limiting --- scheduler/queue/queue.go | 3 +- scheduler/queue/queue_test.go | 195 +++++++++++++++++++++++++++------- 2 files changed, 161 insertions(+), 37 deletions(-) diff --git a/scheduler/queue/queue.go b/scheduler/queue/queue.go index 9374d03d..72410036 100644 --- a/scheduler/queue/queue.go +++ b/scheduler/queue/queue.go @@ -266,7 +266,8 @@ func withinLimits(stage *core.Stage, siblings []*core.Stage) bool { if sibling.Name != stage.Name { continue } - if sibling.ID < stage.ID { + if sibling.ID < stage.ID || + sibling.Status == core.StatusRunning { count++ } } diff --git a/scheduler/queue/queue_test.go b/scheduler/queue/queue_test.go index 82c1238c..f15ac604 100644 --- a/scheduler/queue/queue_test.go +++ b/scheduler/queue/queue_test.go @@ -115,42 +115,6 @@ func TestQueuePush(t *testing.T) { } } -func TestWithinLimits(t *testing.T) { - tests := []struct { - ID int64 - RepoID int64 - Name string - Limit int - Want bool - }{ - {Want: true, ID: 1, RepoID: 1, Name: "foo"}, - {Want: true, ID: 2, RepoID: 2, Name: "bar", Limit: 1}, - {Want: true, ID: 3, RepoID: 1, Name: "bar", Limit: 1}, - {Want: false, ID: 4, RepoID: 1, Name: "bar", Limit: 1}, - {Want: false, ID: 5, RepoID: 1, Name: "bar", Limit: 1}, - {Want: true, ID: 6, RepoID: 1, Name: "baz", Limit: 2}, - {Want: true, ID: 7, RepoID: 1, Name: "baz", Limit: 2}, - {Want: false, ID: 8, RepoID: 1, Name: "baz", Limit: 2}, - {Want: false, ID: 9, RepoID: 1, Name: "baz", Limit: 2}, - {Want: true, ID: 10, RepoID: 1, Name: "baz", Limit: 0}, - } - var stages []*core.Stage - for _, test := range tests { - stages = append(stages, &core.Stage{ - ID: test.ID, - RepoID: test.RepoID, - Name: test.Name, - Limit: test.Limit, - }) - } - for i, test := range tests { - stage := stages[i] - if got, want := withinLimits(stage, stages), test.Want; got != want { - t.Errorf("Unexpectd results at index %d", i) - } - } -} - func TestMatchResource(t *testing.T) { tests := []struct { kinda, typea, kindb, typeb string @@ -233,3 +197,162 @@ func TestShouldThrottle(t *testing.T) { } } } + +func TestWithinLimits(t *testing.T) { + tests := []struct { + result bool + stage *core.Stage + stages []*core.Stage + }{ + // multiple stages executing for same repository and with same + // name, but no concurrency limits exist. expect true. + { + result: true, + stage: &core.Stage{ + ID: 3, RepoID: 1, Name: "build", Limit: 0, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "running"}, + {ID: 2, RepoID: 1, Name: "build", Status: "running"}, + {ID: 3, RepoID: 1, Name: "build", Status: "pending"}, + }, + }, + + // stage with concurrency 1, no existing stages + // exist for same repositroy id. expect true. + { + result: true, + stage: &core.Stage{ + ID: 3, RepoID: 2, Name: "build", Limit: 0, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "running"}, + {ID: 2, RepoID: 1, Name: "build", Status: "running"}, + {ID: 3, RepoID: 2, Name: "build", Status: "pending"}, + }, + }, + + // stage with concurrency 1, no existing stages + // exist for same stage name. expect true. + { + result: true, + stage: &core.Stage{ + ID: 3, RepoID: 1, Name: "build", Limit: 0, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "test", Status: "running"}, + {ID: 2, RepoID: 1, Name: "test", Status: "running"}, + {ID: 3, RepoID: 1, Name: "build", Status: "pending"}, + }, + }, + + // single stage with concurrency 1, no existing stages + // exist. expect true. + { + result: true, + stage: &core.Stage{ + ID: 1, RepoID: 1, Name: "build", Limit: 1, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "pending"}, + }, + }, + + // stage with concurrency 1, other named stages + // exist in the queue, but they come after this stage. + // expect true. + { + result: true, + stage: &core.Stage{ + ID: 1, RepoID: 1, Name: "build", Limit: 1, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "pending"}, + {ID: 2, RepoID: 1, Name: "build", Status: "pending"}, + }, + }, + + // stage with concurrency 1, however, stage with same + // repository and name is already executing. expect false. + { + result: false, + stage: &core.Stage{ + ID: 2, RepoID: 1, Name: "build", Limit: 1, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "running"}, + {ID: 2, RepoID: 1, Name: "build", Status: "pending"}, + }, + }, + + // stage with concurrency 2. one existing stage in the + // queue before this stage. expect true. + { + result: true, + stage: &core.Stage{ + ID: 2, RepoID: 1, Name: "build", Limit: 2, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "running"}, + {ID: 2, RepoID: 1, Name: "build", Status: "pending"}, + {ID: 3, RepoID: 1, Name: "build", Status: "pending"}, + }, + }, + + // stage with concurrency 1. stages start out of order, and the + // second named stage starts before its predecessor. Its predecessor + // should not execute. expect false. + { + result: false, + stage: &core.Stage{ + ID: 1, RepoID: 1, Name: "build", Limit: 1, + }, + stages: []*core.Stage{ + {ID: 1, RepoID: 1, Name: "build", Status: "pending"}, + {ID: 2, RepoID: 1, Name: "build", Status: "running"}, + }, + }, + } + + for i, test := range tests { + if got, want := withinLimits(test.stage, test.stages), test.result; got != want { + t.Errorf("Unexpectd results at index %d", i) + } + } +} + +func TestWithinLimits_Old(t *testing.T) { + tests := []struct { + ID int64 + RepoID int64 + Name string + Limit int + Want bool + }{ + {Want: true, ID: 1, RepoID: 1, Name: "foo"}, + {Want: true, ID: 2, RepoID: 2, Name: "bar", Limit: 1}, + {Want: true, ID: 3, RepoID: 1, Name: "bar", Limit: 1}, + {Want: false, ID: 4, RepoID: 1, Name: "bar", Limit: 1}, + {Want: false, ID: 5, RepoID: 1, Name: "bar", Limit: 1}, + {Want: true, ID: 6, RepoID: 1, Name: "baz", Limit: 2}, + {Want: true, ID: 7, RepoID: 1, Name: "baz", Limit: 2}, + {Want: false, ID: 8, RepoID: 1, Name: "baz", Limit: 2}, + {Want: false, ID: 9, RepoID: 1, Name: "baz", Limit: 2}, + {Want: true, ID: 10, RepoID: 1, Name: "baz", Limit: 0}, + } + var stages []*core.Stage + for _, test := range tests { + stages = append(stages, &core.Stage{ + ID: test.ID, + RepoID: test.RepoID, + Name: test.Name, + Limit: test.Limit, + }) + } + for i, test := range tests { + stage := stages[i] + if got, want := withinLimits(stage, stages), test.Want; got != want { + t.Errorf("Unexpectd results at index %d", i) + } + } +}