From 778d0a0aff2053faffbf5b174ebf07ed71257eb5 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Fri, 11 Dec 2020 13:57:49 -0500 Subject: [PATCH] support for limiting concurrency per-repository --- CHANGELOG.md | 4 ++ core/repo.go | 1 + core/stage.go | 1 + handler/api/repos/update.go | 4 ++ scheduler/queue/queue.go | 42 +++++++++++++++++ scheduler/queue/queue_test.go | 46 +++++++++++++++++++ store/batch/batch.go | 2 + store/batch2/batch.go | 2 + store/build/build.go | 2 + store/build/scan.go | 1 + store/repos/repos.go | 4 ++ store/repos/scan.go | 3 ++ store/shared/migrate/mysql/ddl_gen.go | 16 +++++++ .../mysql/files/002_create_table_repos.sql | 4 ++ .../mysql/files/005_create_table_stages.sql | 4 ++ store/shared/migrate/postgres/ddl_gen.go | 16 +++++++ .../postgres/files/002_create_table_repos.sql | 4 ++ .../files/005_create_table_stages.sql | 4 ++ store/shared/migrate/sqlite/ddl_gen.go | 16 +++++++ .../sqlite/files/002_create_table_repos.sql | 4 ++ .../sqlite/files/005_create_table_stages.sql | 4 ++ store/stage/scan.go | 3 ++ store/stage/stage.go | 4 ++ trigger/trigger.go | 1 + 24 files changed, 192 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93125f0a..c0f5b3e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [unreleased] +### Added +- support for repository-level concurrency limits. + ## [1.10.0] ### Added - support for starlark scripts in core. diff --git a/core/repo.go b/core/repo.go index 1a5c3400..b602d760 100644 --- a/core/repo.go +++ b/core/repo.go @@ -54,6 +54,7 @@ type ( CancelPulls bool `json:"auto_cancel_pull_requests"` CancelPush bool `json:"auto_cancel_pushes"` Timeout int64 `json:"timeout"` + Throttle int64 `json:"-"` Counter int64 `json:"counter"` Synced int64 `json:"synced"` Created int64 `json:"created"` diff --git a/core/stage.go b/core/stage.go index 38bf38b8..c4ca9471 100644 --- a/core/stage.go +++ b/core/stage.go @@ -36,6 +36,7 @@ type ( Variant string `json:"variant,omitempty"` Kernel string `json:"kernel,omitempty"` Limit int `json:"limit,omitempty"` + LimitRepo int `json:"throttle,omitempty"` Started int64 `json:"started"` Stopped int64 `json:"stopped"` Created int64 `json:"created"` diff --git a/handler/api/repos/update.go b/handler/api/repos/update.go index dad37f6e..489f4fcb 100644 --- a/handler/api/repos/update.go +++ b/handler/api/repos/update.go @@ -37,6 +37,7 @@ type ( CancelPulls *bool `json:"auto_cancel_pull_requests"` CancelPush *bool `json:"auto_cancel_pushes"` Timeout *int64 `json:"timeout"` + Throttle *int64 `json:"throttle"` Counter *int64 `json:"counter"` } ) @@ -105,6 +106,9 @@ func HandleUpdate(repos core.RepositoryStore) http.HandlerFunc { if in.Timeout != nil { repo.Timeout = *in.Timeout } + if in.Throttle != nil { + repo.Throttle = *in.Throttle + } if in.Counter != nil { repo.Counter = *in.Counter } diff --git a/scheduler/queue/queue.go b/scheduler/queue/queue.go index a849b4c8..9374d03d 100644 --- a/scheduler/queue/queue.go +++ b/scheduler/queue/queue.go @@ -19,6 +19,8 @@ import ( "sync" "time" + "github.com/drone/drone-go/drone" + "github.com/drone/drone/core" ) @@ -28,6 +30,7 @@ type queue struct { ready chan struct{} paused bool interval time.Duration + throttle int store core.StageStore workers map[*worker]struct{} ctx context.Context @@ -144,6 +147,13 @@ func (q *queue) signal(ctx context.Context) error { continue } + // if the system defines concurrencly limits + // per repository we need to make sure those limits + // are not exceeded before proceeding. + if shouldThrottle(item, items, item.LimitRepo) == true { + continue + } + loop: for w := range q.workers { // the worker must match the resource kind and type @@ -263,6 +273,38 @@ func withinLimits(stage *core.Stage, siblings []*core.Stage) bool { return count < stage.Limit } +func shouldThrottle(stage *core.Stage, siblings []*core.Stage, limit int) bool { + // if no throttle limit is defined (defualt) then + // return false to indicate no throttling is needed. + if limit == 0 { + return false + } + // if the repository is running it is too late + // to skip and we can exit + if stage.Status == drone.StatusRunning { + return false + } + + count := 0 + // loop through running stages to count number of + // running stages for the parent repository. + for _, sibling := range siblings { + // ignore stages from other repository. + if sibling.RepoID != stage.RepoID { + continue + } + // ignore this stage and stages that were + // scheduled after this stage. + if sibling.ID >= stage.ID { + continue + } + count++ + } + // if the count of running stages exceeds the + // throttle limit return true. + return count >= limit +} + // matchResource is a helper function that returns func matchResource(kinda, typea, kindb, typeb string) bool { if kinda == "" { diff --git a/scheduler/queue/queue_test.go b/scheduler/queue/queue_test.go index 2a1de66b..82c1238c 100644 --- a/scheduler/queue/queue_test.go +++ b/scheduler/queue/queue_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/drone/drone-go/drone" "github.com/drone/drone/core" "github.com/drone/drone/mock" @@ -187,3 +188,48 @@ func TestMatchResource(t *testing.T) { } } } + +func TestShouldThrottle(t *testing.T) { + tests := []struct { + ID int64 + RepoID int64 + Status string + Limit int + Want bool + }{ + // repo 1: 2 running, 1 pending + {Want: false, ID: 1, RepoID: 1, Status: drone.StatusRunning, Limit: 2}, + {Want: false, ID: 2, RepoID: 1, Status: drone.StatusRunning, Limit: 2}, + {Want: true, ID: 3, RepoID: 1, Status: drone.StatusPending, Limit: 2}, + + // repo 2: 1 running, 1 pending + {Want: false, ID: 4, RepoID: 2, Status: drone.StatusRunning, Limit: 2}, + {Want: false, ID: 5, RepoID: 2, Status: drone.StatusPending, Limit: 2}, + + // repo 3: 3 running, 1 pending + {Want: false, ID: 6, RepoID: 3, Status: drone.StatusRunning, Limit: 2}, + {Want: false, ID: 7, RepoID: 3, Status: drone.StatusRunning, Limit: 2}, + {Want: false, ID: 8, RepoID: 3, Status: drone.StatusRunning, Limit: 2}, + {Want: true, ID: 9, RepoID: 3, Status: drone.StatusPending, Limit: 2}, + + // repo 4: 2 running, 1 pending, no limit + {Want: false, ID: 10, RepoID: 4, Status: drone.StatusRunning, Limit: 0}, + {Want: false, ID: 11, RepoID: 4, Status: drone.StatusRunning, Limit: 0}, + {Want: false, ID: 12, RepoID: 4, Status: drone.StatusPending, Limit: 0}, + } + var stages []*core.Stage + for _, test := range tests { + stages = append(stages, &core.Stage{ + ID: test.ID, + RepoID: test.RepoID, + Status: test.Status, + LimitRepo: test.Limit, + }) + } + for i, test := range tests { + stage := stages[i] + if got, want := shouldThrottle(stage, stages, stage.LimitRepo), test.Want; got != want { + t.Errorf("Unexpectd results at index %d", i) + } + } +} diff --git a/store/batch/batch.go b/store/batch/batch.go index deea623c..849b92a1 100644 --- a/store/batch/batch.go +++ b/store/batch/batch.go @@ -179,6 +179,7 @@ const stmtInsertBase = ` ,repo_counter ,repo_config ,repo_timeout +,repo_throttle ,repo_trusted ,repo_protected ,repo_no_forks @@ -208,6 +209,7 @@ const stmtInsertBase = ` ,:repo_counter ,:repo_config ,:repo_timeout +,:repo_throttle ,:repo_trusted ,:repo_protected ,:repo_no_forks diff --git a/store/batch2/batch.go b/store/batch2/batch.go index 67cae8f3..741de770 100644 --- a/store/batch2/batch.go +++ b/store/batch2/batch.go @@ -237,6 +237,7 @@ const stmtInsertBase = ` ,repo_counter ,repo_config ,repo_timeout +,repo_throttle ,repo_trusted ,repo_protected ,repo_no_forks @@ -266,6 +267,7 @@ const stmtInsertBase = ` ,:repo_counter ,:repo_config ,:repo_timeout +,:repo_throttle ,:repo_trusted ,:repo_protected ,:repo_no_forks diff --git a/store/build/build.go b/store/build/build.go index 7ad71f79..824b5907 100644 --- a/store/build/build.go +++ b/store/build/build.go @@ -668,6 +668,7 @@ INSERT INTO stages ( ,stage_errignore ,stage_exit_code ,stage_limit +,stage_limit_repo ,stage_os ,stage_arch ,stage_variant @@ -694,6 +695,7 @@ INSERT INTO stages ( ,:stage_errignore ,:stage_exit_code ,:stage_limit +,:stage_limit_repo ,:stage_os ,:stage_arch ,:stage_variant diff --git a/store/build/scan.go b/store/build/scan.go index ad53dbbf..4377ed28 100644 --- a/store/build/scan.go +++ b/store/build/scan.go @@ -81,6 +81,7 @@ func toStageParams(stage *core.Stage) map[string]interface{} { "stage_errignore": stage.ErrIgnore, "stage_exit_code": stage.ExitCode, "stage_limit": stage.Limit, + "stage_limit_repo": stage.LimitRepo, "stage_os": stage.OS, "stage_arch": stage.Arch, "stage_variant": stage.Variant, diff --git a/store/repos/repos.go b/store/repos/repos.go index 22539913..53c32519 100644 --- a/store/repos/repos.go +++ b/store/repos/repos.go @@ -283,6 +283,7 @@ SELECT ,repo_counter ,repo_config ,repo_timeout +,repo_throttle ,repo_trusted ,repo_protected ,repo_no_forks @@ -378,6 +379,7 @@ INSERT INTO repos ( ,repo_counter ,repo_config ,repo_timeout +,repo_throttle ,repo_trusted ,repo_protected ,repo_no_forks @@ -407,6 +409,7 @@ INSERT INTO repos ( ,:repo_counter ,:repo_config ,:repo_timeout +,:repo_throttle ,:repo_trusted ,:repo_protected ,:repo_no_forks @@ -461,6 +464,7 @@ UPDATE repos SET ,repo_cancel_pulls = :repo_cancel_pulls ,repo_cancel_push = :repo_cancel_push ,repo_timeout = :repo_timeout +,repo_throttle = :repo_throttle ,repo_counter = :repo_counter ,repo_synced = :repo_synced ,repo_created = :repo_created diff --git a/store/repos/scan.go b/store/repos/scan.go index 8fe1ecc5..a6efd37e 100644 --- a/store/repos/scan.go +++ b/store/repos/scan.go @@ -47,6 +47,7 @@ func ToParams(v *core.Repository) map[string]interface{} { "repo_cancel_pulls": v.CancelPulls, "repo_cancel_push": v.CancelPush, "repo_timeout": v.Timeout, + "repo_throttle": v.Throttle, "repo_counter": v.Counter, "repo_synced": v.Synced, "repo_created": v.Created, @@ -78,6 +79,7 @@ func scanRow(scanner db.Scanner, dest *core.Repository) error { &dest.Counter, &dest.Config, &dest.Timeout, + &dest.Throttle, &dest.Trusted, &dest.Protected, &dest.IgnoreForks, @@ -132,6 +134,7 @@ func scanRowBuild(scanner db.Scanner, dest *core.Repository) error { &dest.Counter, &dest.Config, &dest.Timeout, + &dest.Throttle, &dest.Trusted, &dest.Protected, &dest.IgnoreForks, diff --git a/store/shared/migrate/mysql/ddl_gen.go b/store/shared/migrate/mysql/ddl_gen.go index 5425c34f..4adf8fa7 100644 --- a/store/shared/migrate/mysql/ddl_gen.go +++ b/store/shared/migrate/mysql/ddl_gen.go @@ -32,6 +32,10 @@ var migrations = []struct { name: "alter-table-repos-add-column-cancel-push", stmt: alterTableReposAddColumnCancelPush, }, + { + name: "alter-table-repos-add-column-throttle", + stmt: alterTableReposAddColumnThrottle, + }, { name: "create-table-perms", stmt: createTablePerms, @@ -88,6 +92,10 @@ var migrations = []struct { name: "create-trigger-stage-update", stmt: createTriggerStageUpdate, }, + { + name: "alter-table-stages-add-column-limit-repos", + stmt: alterTableStagesAddColumnLimitRepos, + }, { name: "create-table-steps", stmt: createTableSteps, @@ -302,6 +310,10 @@ var alterTableReposAddColumnCancelPush = ` ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false; ` +var alterTableReposAddColumnThrottle = ` +ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0; +` + // // 003_create_table_perms.sql // @@ -458,6 +470,10 @@ BEGIN END; ` +var alterTableStagesAddColumnLimitRepos = ` +ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0; +` + // // 006_create_table_steps.sql // diff --git a/store/shared/migrate/mysql/files/002_create_table_repos.sql b/store/shared/migrate/mysql/files/002_create_table_repos.sql index acf8b777..73132b57 100644 --- a/store/shared/migrate/mysql/files/002_create_table_repos.sql +++ b/store/shared/migrate/mysql/files/002_create_table_repos.sql @@ -45,3 +45,7 @@ ALTER TABLE repos ADD COLUMN repo_cancel_pulls BOOLEAN NOT NULL DEFAULT false; -- name: alter-table-repos-add-column-cancel-push ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false; + +-- name: alter-table-repos-add-column-throttle + +ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0; diff --git a/store/shared/migrate/mysql/files/005_create_table_stages.sql b/store/shared/migrate/mysql/files/005_create_table_stages.sql index 6d8eb11a..f835f02c 100644 --- a/store/shared/migrate/mysql/files/005_create_table_stages.sql +++ b/store/shared/migrate/mysql/files/005_create_table_stages.sql @@ -61,3 +61,7 @@ BEGIN DELETE FROM stages_unfinished WHERE stage_id = OLD.stage_id; END IF; END; + +-- name: alter-table-stages-add-column-limit-repos + +ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0; diff --git a/store/shared/migrate/postgres/ddl_gen.go b/store/shared/migrate/postgres/ddl_gen.go index 452fc233..7c1cbf56 100644 --- a/store/shared/migrate/postgres/ddl_gen.go +++ b/store/shared/migrate/postgres/ddl_gen.go @@ -32,6 +32,10 @@ var migrations = []struct { name: "alter-table-repos-add-column-cancel-push", stmt: alterTableReposAddColumnCancelPush, }, + { + name: "alter-table-repos-add-column-throttle", + stmt: alterTableReposAddColumnThrottle, + }, { name: "create-table-perms", stmt: createTablePerms, @@ -84,6 +88,10 @@ var migrations = []struct { name: "create-index-stages-status", stmt: createIndexStagesStatus, }, + { + name: "alter-table-stages-add-column-limit-repos", + stmt: alterTableStagesAddColumnLimitRepos, + }, { name: "create-table-steps", stmt: createTableSteps, @@ -298,6 +306,10 @@ var alterTableReposAddColumnCancelPush = ` ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false; ` +var alterTableReposAddColumnThrottle = ` +ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0; +` + // // 003_create_table_perms.sql // @@ -436,6 +448,10 @@ CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status) WHERE stage_status IN ('pending', 'running'); ` +var alterTableStagesAddColumnLimitRepos = ` +ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0; +` + // // 006_create_table_steps.sql // diff --git a/store/shared/migrate/postgres/files/002_create_table_repos.sql b/store/shared/migrate/postgres/files/002_create_table_repos.sql index 16722196..f0a95e0b 100644 --- a/store/shared/migrate/postgres/files/002_create_table_repos.sql +++ b/store/shared/migrate/postgres/files/002_create_table_repos.sql @@ -45,3 +45,7 @@ ALTER TABLE repos ADD COLUMN repo_cancel_pulls BOOLEAN NOT NULL DEFAULT false; -- name: alter-table-repos-add-column-cancel-push ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false; + +-- name: alter-table-repos-add-column-throttle + +ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0; diff --git a/store/shared/migrate/postgres/files/005_create_table_stages.sql b/store/shared/migrate/postgres/files/005_create_table_stages.sql index 6a775409..32505d3c 100644 --- a/store/shared/migrate/postgres/files/005_create_table_stages.sql +++ b/store/shared/migrate/postgres/files/005_create_table_stages.sql @@ -38,3 +38,7 @@ CREATE INDEX IF NOT EXISTS ix_stages_build ON stages (stage_build_id); CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status) WHERE stage_status IN ('pending', 'running'); + +-- name: alter-table-stages-add-column-limit-repos + +ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0; diff --git a/store/shared/migrate/sqlite/ddl_gen.go b/store/shared/migrate/sqlite/ddl_gen.go index c7e86b03..be13f6d6 100644 --- a/store/shared/migrate/sqlite/ddl_gen.go +++ b/store/shared/migrate/sqlite/ddl_gen.go @@ -32,6 +32,10 @@ var migrations = []struct { name: "alter-table-repos-add-column-cancel-push", stmt: alterTableReposAddColumnCancelPush, }, + { + name: "alter-table-repos-add-column-throttle", + stmt: alterTableReposAddColumnThrottle, + }, { name: "create-table-perms", stmt: createTablePerms, @@ -84,6 +88,10 @@ var migrations = []struct { name: "create-index-stages-status", stmt: createIndexStagesStatus, }, + { + name: "alter-table-stages-add-column-limit-repos", + stmt: alterTableStagesAddColumnLimitRepos, + }, { name: "create-table-steps", stmt: createTableSteps, @@ -298,6 +306,10 @@ var alterTableReposAddColumnCancelPush = ` ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT 0; ` +var alterTableReposAddColumnThrottle = ` +ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0; +` + // // 003_create_table_perms.sql // @@ -436,6 +448,10 @@ CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status) WHERE stage_status IN ('pending', 'running'); ` +var alterTableStagesAddColumnLimitRepos = ` +ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0; +` + // // 006_create_table_steps.sql // diff --git a/store/shared/migrate/sqlite/files/002_create_table_repos.sql b/store/shared/migrate/sqlite/files/002_create_table_repos.sql index 93340d01..70e0180a 100644 --- a/store/shared/migrate/sqlite/files/002_create_table_repos.sql +++ b/store/shared/migrate/sqlite/files/002_create_table_repos.sql @@ -45,3 +45,7 @@ ALTER TABLE repos ADD COLUMN repo_cancel_pulls BOOLEAN NOT NULL DEFAULT 0; -- name: alter-table-repos-add-column-cancel-push ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT 0; + +-- name: alter-table-repos-add-column-throttle + +ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0; diff --git a/store/shared/migrate/sqlite/files/005_create_table_stages.sql b/store/shared/migrate/sqlite/files/005_create_table_stages.sql index 16c91453..2c5d1b81 100644 --- a/store/shared/migrate/sqlite/files/005_create_table_stages.sql +++ b/store/shared/migrate/sqlite/files/005_create_table_stages.sql @@ -39,3 +39,7 @@ CREATE INDEX IF NOT EXISTS ix_stages_build ON stages (stage_build_id); CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status) WHERE stage_status IN ('pending', 'running'); + +-- name: alter-table-stages-add-column-limit-repos + +ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0; diff --git a/store/stage/scan.go b/store/stage/scan.go index cee0051e..a488bc2b 100644 --- a/store/stage/scan.go +++ b/store/stage/scan.go @@ -40,6 +40,7 @@ func toParams(stage *core.Stage) map[string]interface{} { "stage_errignore": stage.ErrIgnore, "stage_exit_code": stage.ExitCode, "stage_limit": stage.Limit, + "stage_limit_repo": stage.LimitRepo, "stage_os": stage.OS, "stage_arch": stage.Arch, "stage_variant": stage.Variant, @@ -85,6 +86,7 @@ func scanRow(scanner db.Scanner, dest *core.Stage) error { &dest.ErrIgnore, &dest.ExitCode, &dest.Limit, + &dest.LimitRepo, &dest.OS, &dest.Arch, &dest.Variant, @@ -123,6 +125,7 @@ func scanRowStep(scanner db.Scanner, stage *core.Stage, step *nullStep) error { &stage.ErrIgnore, &stage.ExitCode, &stage.Limit, + &stage.LimitRepo, &stage.OS, &stage.Arch, &stage.Variant, diff --git a/store/stage/stage.go b/store/stage/stage.go index 33a40715..fbf496d8 100644 --- a/store/stage/stage.go +++ b/store/stage/stage.go @@ -229,6 +229,7 @@ SELECT ,stage_errignore ,stage_exit_code ,stage_limit +,stage_limit_repo ,stage_os ,stage_arch ,stage_variant @@ -296,6 +297,7 @@ SELECT ,stage_errignore ,stage_exit_code ,stage_limit +,stage_limit_repo ,stage_os ,stage_arch ,stage_variant @@ -368,6 +370,7 @@ INSERT INTO stages ( ,stage_errignore ,stage_exit_code ,stage_limit +,stage_limit_repo ,stage_os ,stage_arch ,stage_variant @@ -394,6 +397,7 @@ INSERT INTO stages ( ,:stage_errignore ,:stage_exit_code ,:stage_limit +,:stage_limit_repo ,:stage_os ,:stage_arch ,:stage_variant diff --git a/trigger/trigger.go b/trigger/trigger.go index 078552b2..18db04d0 100644 --- a/trigger/trigger.go +++ b/trigger/trigger.go @@ -394,6 +394,7 @@ func (t *triggerer) Trigger(ctx context.Context, repo *core.Repository, base *co Variant: match.Platform.Variant, Kernel: match.Platform.Version, Limit: match.Concurrency.Limit, + LimitRepo: int(repo.Throttle), Status: core.StatusWaiting, DependsOn: match.DependsOn, OnSuccess: onSuccess,