support for limiting concurrency per-repository

This commit is contained in:
Brad Rydzewski 2020-12-11 13:57:49 -05:00
parent 27b7d29c89
commit 778d0a0aff
24 changed files with 192 additions and 0 deletions

View file

@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [unreleased]
### Added
- support for repository-level concurrency limits.
## [1.10.0]
### Added
- support for starlark scripts in core.

View file

@ -54,6 +54,7 @@ type (
CancelPulls bool `json:"auto_cancel_pull_requests"`
CancelPush bool `json:"auto_cancel_pushes"`
Timeout int64 `json:"timeout"`
Throttle int64 `json:"-"`
Counter int64 `json:"counter"`
Synced int64 `json:"synced"`
Created int64 `json:"created"`

View file

@ -36,6 +36,7 @@ type (
Variant string `json:"variant,omitempty"`
Kernel string `json:"kernel,omitempty"`
Limit int `json:"limit,omitempty"`
LimitRepo int `json:"throttle,omitempty"`
Started int64 `json:"started"`
Stopped int64 `json:"stopped"`
Created int64 `json:"created"`

View file

@ -37,6 +37,7 @@ type (
CancelPulls *bool `json:"auto_cancel_pull_requests"`
CancelPush *bool `json:"auto_cancel_pushes"`
Timeout *int64 `json:"timeout"`
Throttle *int64 `json:"throttle"`
Counter *int64 `json:"counter"`
}
)
@ -105,6 +106,9 @@ func HandleUpdate(repos core.RepositoryStore) http.HandlerFunc {
if in.Timeout != nil {
repo.Timeout = *in.Timeout
}
if in.Throttle != nil {
repo.Throttle = *in.Throttle
}
if in.Counter != nil {
repo.Counter = *in.Counter
}

View file

@ -19,6 +19,8 @@ import (
"sync"
"time"
"github.com/drone/drone-go/drone"
"github.com/drone/drone/core"
)
@ -28,6 +30,7 @@ type queue struct {
ready chan struct{}
paused bool
interval time.Duration
throttle int
store core.StageStore
workers map[*worker]struct{}
ctx context.Context
@ -144,6 +147,13 @@ func (q *queue) signal(ctx context.Context) error {
continue
}
// if the system defines concurrencly limits
// per repository we need to make sure those limits
// are not exceeded before proceeding.
if shouldThrottle(item, items, item.LimitRepo) == true {
continue
}
loop:
for w := range q.workers {
// the worker must match the resource kind and type
@ -263,6 +273,38 @@ func withinLimits(stage *core.Stage, siblings []*core.Stage) bool {
return count < stage.Limit
}
func shouldThrottle(stage *core.Stage, siblings []*core.Stage, limit int) bool {
// if no throttle limit is defined (defualt) then
// return false to indicate no throttling is needed.
if limit == 0 {
return false
}
// if the repository is running it is too late
// to skip and we can exit
if stage.Status == drone.StatusRunning {
return false
}
count := 0
// loop through running stages to count number of
// running stages for the parent repository.
for _, sibling := range siblings {
// ignore stages from other repository.
if sibling.RepoID != stage.RepoID {
continue
}
// ignore this stage and stages that were
// scheduled after this stage.
if sibling.ID >= stage.ID {
continue
}
count++
}
// if the count of running stages exceeds the
// throttle limit return true.
return count >= limit
}
// matchResource is a helper function that returns
func matchResource(kinda, typea, kindb, typeb string) bool {
if kinda == "" {

View file

@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/drone/drone-go/drone"
"github.com/drone/drone/core"
"github.com/drone/drone/mock"
@ -187,3 +188,48 @@ func TestMatchResource(t *testing.T) {
}
}
}
func TestShouldThrottle(t *testing.T) {
tests := []struct {
ID int64
RepoID int64
Status string
Limit int
Want bool
}{
// repo 1: 2 running, 1 pending
{Want: false, ID: 1, RepoID: 1, Status: drone.StatusRunning, Limit: 2},
{Want: false, ID: 2, RepoID: 1, Status: drone.StatusRunning, Limit: 2},
{Want: true, ID: 3, RepoID: 1, Status: drone.StatusPending, Limit: 2},
// repo 2: 1 running, 1 pending
{Want: false, ID: 4, RepoID: 2, Status: drone.StatusRunning, Limit: 2},
{Want: false, ID: 5, RepoID: 2, Status: drone.StatusPending, Limit: 2},
// repo 3: 3 running, 1 pending
{Want: false, ID: 6, RepoID: 3, Status: drone.StatusRunning, Limit: 2},
{Want: false, ID: 7, RepoID: 3, Status: drone.StatusRunning, Limit: 2},
{Want: false, ID: 8, RepoID: 3, Status: drone.StatusRunning, Limit: 2},
{Want: true, ID: 9, RepoID: 3, Status: drone.StatusPending, Limit: 2},
// repo 4: 2 running, 1 pending, no limit
{Want: false, ID: 10, RepoID: 4, Status: drone.StatusRunning, Limit: 0},
{Want: false, ID: 11, RepoID: 4, Status: drone.StatusRunning, Limit: 0},
{Want: false, ID: 12, RepoID: 4, Status: drone.StatusPending, Limit: 0},
}
var stages []*core.Stage
for _, test := range tests {
stages = append(stages, &core.Stage{
ID: test.ID,
RepoID: test.RepoID,
Status: test.Status,
LimitRepo: test.Limit,
})
}
for i, test := range tests {
stage := stages[i]
if got, want := shouldThrottle(stage, stages, stage.LimitRepo), test.Want; got != want {
t.Errorf("Unexpectd results at index %d", i)
}
}
}

View file

@ -179,6 +179,7 @@ const stmtInsertBase = `
,repo_counter
,repo_config
,repo_timeout
,repo_throttle
,repo_trusted
,repo_protected
,repo_no_forks
@ -208,6 +209,7 @@ const stmtInsertBase = `
,:repo_counter
,:repo_config
,:repo_timeout
,:repo_throttle
,:repo_trusted
,:repo_protected
,:repo_no_forks

View file

@ -237,6 +237,7 @@ const stmtInsertBase = `
,repo_counter
,repo_config
,repo_timeout
,repo_throttle
,repo_trusted
,repo_protected
,repo_no_forks
@ -266,6 +267,7 @@ const stmtInsertBase = `
,:repo_counter
,:repo_config
,:repo_timeout
,:repo_throttle
,:repo_trusted
,:repo_protected
,:repo_no_forks

View file

@ -668,6 +668,7 @@ INSERT INTO stages (
,stage_errignore
,stage_exit_code
,stage_limit
,stage_limit_repo
,stage_os
,stage_arch
,stage_variant
@ -694,6 +695,7 @@ INSERT INTO stages (
,:stage_errignore
,:stage_exit_code
,:stage_limit
,:stage_limit_repo
,:stage_os
,:stage_arch
,:stage_variant

View file

@ -81,6 +81,7 @@ func toStageParams(stage *core.Stage) map[string]interface{} {
"stage_errignore": stage.ErrIgnore,
"stage_exit_code": stage.ExitCode,
"stage_limit": stage.Limit,
"stage_limit_repo": stage.LimitRepo,
"stage_os": stage.OS,
"stage_arch": stage.Arch,
"stage_variant": stage.Variant,

View file

@ -283,6 +283,7 @@ SELECT
,repo_counter
,repo_config
,repo_timeout
,repo_throttle
,repo_trusted
,repo_protected
,repo_no_forks
@ -378,6 +379,7 @@ INSERT INTO repos (
,repo_counter
,repo_config
,repo_timeout
,repo_throttle
,repo_trusted
,repo_protected
,repo_no_forks
@ -407,6 +409,7 @@ INSERT INTO repos (
,:repo_counter
,:repo_config
,:repo_timeout
,:repo_throttle
,:repo_trusted
,:repo_protected
,:repo_no_forks
@ -461,6 +464,7 @@ UPDATE repos SET
,repo_cancel_pulls = :repo_cancel_pulls
,repo_cancel_push = :repo_cancel_push
,repo_timeout = :repo_timeout
,repo_throttle = :repo_throttle
,repo_counter = :repo_counter
,repo_synced = :repo_synced
,repo_created = :repo_created

View file

@ -47,6 +47,7 @@ func ToParams(v *core.Repository) map[string]interface{} {
"repo_cancel_pulls": v.CancelPulls,
"repo_cancel_push": v.CancelPush,
"repo_timeout": v.Timeout,
"repo_throttle": v.Throttle,
"repo_counter": v.Counter,
"repo_synced": v.Synced,
"repo_created": v.Created,
@ -78,6 +79,7 @@ func scanRow(scanner db.Scanner, dest *core.Repository) error {
&dest.Counter,
&dest.Config,
&dest.Timeout,
&dest.Throttle,
&dest.Trusted,
&dest.Protected,
&dest.IgnoreForks,
@ -132,6 +134,7 @@ func scanRowBuild(scanner db.Scanner, dest *core.Repository) error {
&dest.Counter,
&dest.Config,
&dest.Timeout,
&dest.Throttle,
&dest.Trusted,
&dest.Protected,
&dest.IgnoreForks,

View file

@ -32,6 +32,10 @@ var migrations = []struct {
name: "alter-table-repos-add-column-cancel-push",
stmt: alterTableReposAddColumnCancelPush,
},
{
name: "alter-table-repos-add-column-throttle",
stmt: alterTableReposAddColumnThrottle,
},
{
name: "create-table-perms",
stmt: createTablePerms,
@ -88,6 +92,10 @@ var migrations = []struct {
name: "create-trigger-stage-update",
stmt: createTriggerStageUpdate,
},
{
name: "alter-table-stages-add-column-limit-repos",
stmt: alterTableStagesAddColumnLimitRepos,
},
{
name: "create-table-steps",
stmt: createTableSteps,
@ -302,6 +310,10 @@ var alterTableReposAddColumnCancelPush = `
ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false;
`
var alterTableReposAddColumnThrottle = `
ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0;
`
//
// 003_create_table_perms.sql
//
@ -458,6 +470,10 @@ BEGIN
END;
`
var alterTableStagesAddColumnLimitRepos = `
ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0;
`
//
// 006_create_table_steps.sql
//

View file

@ -45,3 +45,7 @@ ALTER TABLE repos ADD COLUMN repo_cancel_pulls BOOLEAN NOT NULL DEFAULT false;
-- name: alter-table-repos-add-column-cancel-push
ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false;
-- name: alter-table-repos-add-column-throttle
ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0;

View file

@ -61,3 +61,7 @@ BEGIN
DELETE FROM stages_unfinished WHERE stage_id = OLD.stage_id;
END IF;
END;
-- name: alter-table-stages-add-column-limit-repos
ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0;

View file

@ -32,6 +32,10 @@ var migrations = []struct {
name: "alter-table-repos-add-column-cancel-push",
stmt: alterTableReposAddColumnCancelPush,
},
{
name: "alter-table-repos-add-column-throttle",
stmt: alterTableReposAddColumnThrottle,
},
{
name: "create-table-perms",
stmt: createTablePerms,
@ -84,6 +88,10 @@ var migrations = []struct {
name: "create-index-stages-status",
stmt: createIndexStagesStatus,
},
{
name: "alter-table-stages-add-column-limit-repos",
stmt: alterTableStagesAddColumnLimitRepos,
},
{
name: "create-table-steps",
stmt: createTableSteps,
@ -298,6 +306,10 @@ var alterTableReposAddColumnCancelPush = `
ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false;
`
var alterTableReposAddColumnThrottle = `
ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0;
`
//
// 003_create_table_perms.sql
//
@ -436,6 +448,10 @@ CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status)
WHERE stage_status IN ('pending', 'running');
`
var alterTableStagesAddColumnLimitRepos = `
ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0;
`
//
// 006_create_table_steps.sql
//

View file

@ -45,3 +45,7 @@ ALTER TABLE repos ADD COLUMN repo_cancel_pulls BOOLEAN NOT NULL DEFAULT false;
-- name: alter-table-repos-add-column-cancel-push
ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT false;
-- name: alter-table-repos-add-column-throttle
ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0;

View file

@ -38,3 +38,7 @@ CREATE INDEX IF NOT EXISTS ix_stages_build ON stages (stage_build_id);
CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status)
WHERE stage_status IN ('pending', 'running');
-- name: alter-table-stages-add-column-limit-repos
ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0;

View file

@ -32,6 +32,10 @@ var migrations = []struct {
name: "alter-table-repos-add-column-cancel-push",
stmt: alterTableReposAddColumnCancelPush,
},
{
name: "alter-table-repos-add-column-throttle",
stmt: alterTableReposAddColumnThrottle,
},
{
name: "create-table-perms",
stmt: createTablePerms,
@ -84,6 +88,10 @@ var migrations = []struct {
name: "create-index-stages-status",
stmt: createIndexStagesStatus,
},
{
name: "alter-table-stages-add-column-limit-repos",
stmt: alterTableStagesAddColumnLimitRepos,
},
{
name: "create-table-steps",
stmt: createTableSteps,
@ -298,6 +306,10 @@ var alterTableReposAddColumnCancelPush = `
ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT 0;
`
var alterTableReposAddColumnThrottle = `
ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0;
`
//
// 003_create_table_perms.sql
//
@ -436,6 +448,10 @@ CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status)
WHERE stage_status IN ('pending', 'running');
`
var alterTableStagesAddColumnLimitRepos = `
ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0;
`
//
// 006_create_table_steps.sql
//

View file

@ -45,3 +45,7 @@ ALTER TABLE repos ADD COLUMN repo_cancel_pulls BOOLEAN NOT NULL DEFAULT 0;
-- name: alter-table-repos-add-column-cancel-push
ALTER TABLE repos ADD COLUMN repo_cancel_push BOOLEAN NOT NULL DEFAULT 0;
-- name: alter-table-repos-add-column-throttle
ALTER TABLE repos ADD COLUMN repo_throttle INTEGER NOT NULL DEFAULT 0;

View file

@ -39,3 +39,7 @@ CREATE INDEX IF NOT EXISTS ix_stages_build ON stages (stage_build_id);
CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status)
WHERE stage_status IN ('pending', 'running');
-- name: alter-table-stages-add-column-limit-repos
ALTER TABLE stages ADD COLUMN stage_limit_repo INTEGER NOT NULL DEFAULT 0;

View file

@ -40,6 +40,7 @@ func toParams(stage *core.Stage) map[string]interface{} {
"stage_errignore": stage.ErrIgnore,
"stage_exit_code": stage.ExitCode,
"stage_limit": stage.Limit,
"stage_limit_repo": stage.LimitRepo,
"stage_os": stage.OS,
"stage_arch": stage.Arch,
"stage_variant": stage.Variant,
@ -85,6 +86,7 @@ func scanRow(scanner db.Scanner, dest *core.Stage) error {
&dest.ErrIgnore,
&dest.ExitCode,
&dest.Limit,
&dest.LimitRepo,
&dest.OS,
&dest.Arch,
&dest.Variant,
@ -123,6 +125,7 @@ func scanRowStep(scanner db.Scanner, stage *core.Stage, step *nullStep) error {
&stage.ErrIgnore,
&stage.ExitCode,
&stage.Limit,
&stage.LimitRepo,
&stage.OS,
&stage.Arch,
&stage.Variant,

View file

@ -229,6 +229,7 @@ SELECT
,stage_errignore
,stage_exit_code
,stage_limit
,stage_limit_repo
,stage_os
,stage_arch
,stage_variant
@ -296,6 +297,7 @@ SELECT
,stage_errignore
,stage_exit_code
,stage_limit
,stage_limit_repo
,stage_os
,stage_arch
,stage_variant
@ -368,6 +370,7 @@ INSERT INTO stages (
,stage_errignore
,stage_exit_code
,stage_limit
,stage_limit_repo
,stage_os
,stage_arch
,stage_variant
@ -394,6 +397,7 @@ INSERT INTO stages (
,:stage_errignore
,:stage_exit_code
,:stage_limit
,:stage_limit_repo
,:stage_os
,:stage_arch
,:stage_variant

View file

@ -394,6 +394,7 @@ func (t *triggerer) Trigger(ctx context.Context, repo *core.Repository, base *co
Variant: match.Platform.Variant,
Kernel: match.Platform.Version,
Limit: match.Concurrency.Limit,
LimitRepo: int(repo.Throttle),
Status: core.StatusWaiting,
DependsOn: match.DependsOn,
OnSuccess: onSuccess,