diff --git a/cmd/drone-server/inject_scheduler.go b/cmd/drone-server/inject_scheduler.go index efc28d9f..fa3337a1 100644 --- a/cmd/drone-server/inject_scheduler.go +++ b/cmd/drone-server/inject_scheduler.go @@ -15,10 +15,10 @@ package main import ( - "github.com/drone/drone/cmd/drone-server/config" "github.com/drone/drone/core" "github.com/drone/drone/scheduler/queue" + "github.com/go-redis/redis/v8" "github.com/google/wire" ) @@ -29,6 +29,6 @@ var schedulerSet = wire.NewSet( // provideScheduler is a Wire provider function that returns a // scheduler based on the environment configuration. -func provideScheduler(store core.StageStore, config config.Config) core.Scheduler { - return queue.New(store) +func provideScheduler(store core.StageStore, rdb *redis.Client) core.Scheduler { + return queue.New(store, rdb) } diff --git a/cmd/drone-server/wire_gen.go b/cmd/drone-server/wire_gen.go index ebc2f883..295605f2 100644 --- a/cmd/drone-server/wire_gen.go +++ b/cmd/drone-server/wire_gen.go @@ -61,7 +61,7 @@ func InitializeApplication(config2 config.Config) (application, error) { } corePubsub := pubsub.New(redisClient) stageStore := provideStageStore(db) - scheduler := provideScheduler(stageStore, config2) + scheduler := provideScheduler(stageStore, redisClient) statusService := provideStatusService(client, renewer, config2) stepStore := step.New(db) system := provideSystem(config2) diff --git a/pubsub/redis.go b/pubsub/redis.go index 8a2b2b36..9fef153c 100644 --- a/pubsub/redis.go +++ b/pubsub/redis.go @@ -27,12 +27,10 @@ import ( ) func newRedis(rdb *redis.Client) core.Pubsub { - return &hubRedis{ - rdb: rdb, - } + return &hubRedis{rdb: rdb} } -const channelPubSub = "drone-events" +const redisPubSubEvents = "drone-events" type hubRedis struct { rdb *redis.Client @@ -44,7 +42,7 @@ func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) { return } - _, err = h.rdb.Publish(ctx, channelPubSub, data).Result() + _, err = h.rdb.Publish(ctx, redisPubSubEvents, data).Result() if err != nil { return } @@ -53,22 +51,22 @@ func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) { } func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) { - messageCh := make(chan *core.Message, 100) - errCh := make(chan error) + chMessage := make(chan *core.Message, 100) + chErr := make(chan error) go func() { - pubsub := h.rdb.Subscribe(ctx, channelPubSub) + pubsub := h.rdb.Subscribe(ctx, redisPubSubEvents) ch := pubsub.Channel(redis.WithChannelSize(100)) defer func() { _ = pubsub.Close() - close(messageCh) - close(errCh) + close(chMessage) + close(chErr) }() err := pubsub.Ping(ctx) if err != nil { - errCh <- err + chErr <- err return } @@ -76,7 +74,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan select { case m, ok := <-ch: if !ok { - errCh <- fmt.Errorf("pubsub/redis: channel=%s closed", channelPubSub) + chErr <- fmt.Errorf("pubsub/redis: channel=%s closed", redisPubSubEvents) return } @@ -89,7 +87,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan continue } - messageCh <- message + chMessage <- message case <-ctx.Done(): return @@ -97,14 +95,14 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan } }() - return messageCh, errCh + return chMessage, chErr } func (h *hubRedis) Subscribers() (int, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFunc() - v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result() + v, err := h.rdb.Do(ctx, "pubsub", "numsub", redisPubSubEvents).Result() if err != nil { err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err) return 0, err diff --git a/scheduler/queue/redis.go b/scheduler/queue/redis.go new file mode 100644 index 00000000..f7507a64 --- /dev/null +++ b/scheduler/queue/redis.go @@ -0,0 +1,141 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/go-redis/redis/v8" +) + +const ( + redisPubSubCancel = "drone-cancel" + redisCancelValuePrefix = "drone-cancel-" + redisCancelValueTimeout = 5 * time.Minute + redisCancelValue = "canceled" +) + +func newRedisCanceller(rdb *redis.Client) *redisCanceller { + return &redisCanceller{rdb: rdb} +} + +type redisCanceller struct { + rdb *redis.Client +} + +func (c *redisCanceller) Cancel(ctx context.Context, id int64) (err error) { + ids := strconv.FormatInt(id, 10) + + // publish a cancel event to all subscribers (runners) waiting to + _, err = c.rdb.Publish(ctx, redisPubSubCancel, ids).Result() + if err != nil { + return + } + + // put a limited duration value in case a runner isn't listening currently. + _, err = c.rdb.Set(ctx, redisCancelValuePrefix+ids, redisCancelValue, redisCancelValueTimeout).Result() + if err != nil { + return + } + + return nil +} + +func (c *redisCanceller) Cancelled(ctx context.Context, id int64) (isCancelled bool, err error) { + ids := strconv.FormatInt(id, 10) + + // first check if the build is already cancelled + + result, err := c.rdb.Get(ctx, redisCancelValuePrefix+ids).Result() + if err != nil && err != redis.Nil { + fmt.Println("GOT AN ERROR", err.Error()) ////////////////////////////////////////////////////// + return + } + + isCancelled = err != redis.Nil && result == redisCancelValue + if isCancelled { + fmt.Println("BUILD IS CANCELED", result) //////////////////////////////////////////////// + return + } + + // if it is not cancelled, subscribe and listen to cancel build events + // until the context is cancelled or until the build is cancelled. + + chResult := make(chan interface{}) + + go func() { + pubsub := c.rdb.Subscribe(ctx, redisPubSubCancel) + ch := pubsub.Channel() + + defer func() { + _ = pubsub.Close() + close(chResult) + }() + + err := pubsub.Ping(ctx) + if err != nil { + chResult <- err + return + } + + for { + select { + case m, ok := <-ch: + if !ok { + chResult <- fmt.Errorf("canceller/redis: channel=%s closed", redisPubSubCancel) + return + } + + idMessage, err := strconv.ParseInt(m.Payload, 10, 64) + if err != nil { // should not happen + _, _ = fmt.Fprintf(os.Stderr, "canceller/redis: message is not an integer: %s\n", m.Payload) + continue // ignore data errors + } + + if id == idMessage { + fmt.Println("BUILD HAS JUST BEEN CANCELED", m.Payload) //////////////////////////////////////////////// + chResult <- true + } else { ////////////////////// + fmt.Println("SOMETHING ELSE IS CANCELLED", m.Payload) //////////////////////////////////////////////// + } ///////////////////////////// + + case <-ctx.Done(): + return + } + } + }() + + value, ok := <-chResult + + if !ok { + return + } + + err, ok = value.(error) + if ok { + return + } + + isCancelled, ok = value.(bool) + if ok { + return + } + + return +} diff --git a/scheduler/queue/scheduler.go b/scheduler/queue/scheduler.go index d0a4c9d9..2ab5c5de 100644 --- a/scheduler/queue/scheduler.go +++ b/scheduler/queue/scheduler.go @@ -19,6 +19,8 @@ import ( "errors" "github.com/drone/drone/core" + + "github.com/go-redis/redis/v8" ) type scheduler struct { @@ -26,14 +28,30 @@ type scheduler struct { *canceller } +type redisScheduler struct { + *queue + *redisCanceller +} + // New creates a new scheduler. -func New(store core.StageStore) core.Scheduler { - return &scheduler{ +func New(store core.StageStore, rdb *redis.Client) core.Scheduler { + if rdb != nil { + return redisScheduler{ + queue: newQueue(store), + redisCanceller: newRedisCanceller(rdb), + } + } + + return scheduler{ queue: newQueue(store), canceller: newCanceller(), } } -func (d *scheduler) Stats(context.Context) (interface{}, error) { +func (d scheduler) Stats(context.Context) (interface{}, error) { + return nil, errors.New("not implemented") +} + +func (d redisScheduler) Stats(context.Context) (interface{}, error) { return nil, errors.New("not implemented") }