redis canceller

This commit is contained in:
Marko Gaćeša 2021-08-02 18:47:58 +02:00
parent b91d6c4b9b
commit 3067a16de0
5 changed files with 179 additions and 22 deletions

View file

@ -15,10 +15,10 @@
package main package main
import ( import (
"github.com/drone/drone/cmd/drone-server/config"
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/drone/drone/scheduler/queue" "github.com/drone/drone/scheduler/queue"
"github.com/go-redis/redis/v8"
"github.com/google/wire" "github.com/google/wire"
) )
@ -29,6 +29,6 @@ var schedulerSet = wire.NewSet(
// provideScheduler is a Wire provider function that returns a // provideScheduler is a Wire provider function that returns a
// scheduler based on the environment configuration. // scheduler based on the environment configuration.
func provideScheduler(store core.StageStore, config config.Config) core.Scheduler { func provideScheduler(store core.StageStore, rdb *redis.Client) core.Scheduler {
return queue.New(store) return queue.New(store, rdb)
} }

View file

@ -61,7 +61,7 @@ func InitializeApplication(config2 config.Config) (application, error) {
} }
corePubsub := pubsub.New(redisClient) corePubsub := pubsub.New(redisClient)
stageStore := provideStageStore(db) stageStore := provideStageStore(db)
scheduler := provideScheduler(stageStore, config2) scheduler := provideScheduler(stageStore, redisClient)
statusService := provideStatusService(client, renewer, config2) statusService := provideStatusService(client, renewer, config2)
stepStore := step.New(db) stepStore := step.New(db)
system := provideSystem(config2) system := provideSystem(config2)

View file

@ -27,12 +27,10 @@ import (
) )
func newRedis(rdb *redis.Client) core.Pubsub { func newRedis(rdb *redis.Client) core.Pubsub {
return &hubRedis{ return &hubRedis{rdb: rdb}
rdb: rdb,
}
} }
const channelPubSub = "drone-events" const redisPubSubEvents = "drone-events"
type hubRedis struct { type hubRedis struct {
rdb *redis.Client rdb *redis.Client
@ -44,7 +42,7 @@ func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) {
return return
} }
_, err = h.rdb.Publish(ctx, channelPubSub, data).Result() _, err = h.rdb.Publish(ctx, redisPubSubEvents, data).Result()
if err != nil { if err != nil {
return return
} }
@ -53,22 +51,22 @@ func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) {
} }
func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) { func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) {
messageCh := make(chan *core.Message, 100) chMessage := make(chan *core.Message, 100)
errCh := make(chan error) chErr := make(chan error)
go func() { go func() {
pubsub := h.rdb.Subscribe(ctx, channelPubSub) pubsub := h.rdb.Subscribe(ctx, redisPubSubEvents)
ch := pubsub.Channel(redis.WithChannelSize(100)) ch := pubsub.Channel(redis.WithChannelSize(100))
defer func() { defer func() {
_ = pubsub.Close() _ = pubsub.Close()
close(messageCh) close(chMessage)
close(errCh) close(chErr)
}() }()
err := pubsub.Ping(ctx) err := pubsub.Ping(ctx)
if err != nil { if err != nil {
errCh <- err chErr <- err
return return
} }
@ -76,7 +74,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan
select { select {
case m, ok := <-ch: case m, ok := <-ch:
if !ok { if !ok {
errCh <- fmt.Errorf("pubsub/redis: channel=%s closed", channelPubSub) chErr <- fmt.Errorf("pubsub/redis: channel=%s closed", redisPubSubEvents)
return return
} }
@ -89,7 +87,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan
continue continue
} }
messageCh <- message chMessage <- message
case <-ctx.Done(): case <-ctx.Done():
return return
@ -97,14 +95,14 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan
} }
}() }()
return messageCh, errCh return chMessage, chErr
} }
func (h *hubRedis) Subscribers() (int, error) { func (h *hubRedis) Subscribers() (int, error) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc() defer cancelFunc()
v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result() v, err := h.rdb.Do(ctx, "pubsub", "numsub", redisPubSubEvents).Result()
if err != nil { if err != nil {
err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err) err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err)
return 0, err return 0, err

141
scheduler/queue/redis.go Normal file
View file

@ -0,0 +1,141 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"context"
"fmt"
"os"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
const (
redisPubSubCancel = "drone-cancel"
redisCancelValuePrefix = "drone-cancel-"
redisCancelValueTimeout = 5 * time.Minute
redisCancelValue = "canceled"
)
func newRedisCanceller(rdb *redis.Client) *redisCanceller {
return &redisCanceller{rdb: rdb}
}
type redisCanceller struct {
rdb *redis.Client
}
func (c *redisCanceller) Cancel(ctx context.Context, id int64) (err error) {
ids := strconv.FormatInt(id, 10)
// publish a cancel event to all subscribers (runners) waiting to
_, err = c.rdb.Publish(ctx, redisPubSubCancel, ids).Result()
if err != nil {
return
}
// put a limited duration value in case a runner isn't listening currently.
_, err = c.rdb.Set(ctx, redisCancelValuePrefix+ids, redisCancelValue, redisCancelValueTimeout).Result()
if err != nil {
return
}
return nil
}
func (c *redisCanceller) Cancelled(ctx context.Context, id int64) (isCancelled bool, err error) {
ids := strconv.FormatInt(id, 10)
// first check if the build is already cancelled
result, err := c.rdb.Get(ctx, redisCancelValuePrefix+ids).Result()
if err != nil && err != redis.Nil {
fmt.Println("GOT AN ERROR", err.Error()) //////////////////////////////////////////////////////
return
}
isCancelled = err != redis.Nil && result == redisCancelValue
if isCancelled {
fmt.Println("BUILD IS CANCELED", result) ////////////////////////////////////////////////
return
}
// if it is not cancelled, subscribe and listen to cancel build events
// until the context is cancelled or until the build is cancelled.
chResult := make(chan interface{})
go func() {
pubsub := c.rdb.Subscribe(ctx, redisPubSubCancel)
ch := pubsub.Channel()
defer func() {
_ = pubsub.Close()
close(chResult)
}()
err := pubsub.Ping(ctx)
if err != nil {
chResult <- err
return
}
for {
select {
case m, ok := <-ch:
if !ok {
chResult <- fmt.Errorf("canceller/redis: channel=%s closed", redisPubSubCancel)
return
}
idMessage, err := strconv.ParseInt(m.Payload, 10, 64)
if err != nil { // should not happen
_, _ = fmt.Fprintf(os.Stderr, "canceller/redis: message is not an integer: %s\n", m.Payload)
continue // ignore data errors
}
if id == idMessage {
fmt.Println("BUILD HAS JUST BEEN CANCELED", m.Payload) ////////////////////////////////////////////////
chResult <- true
} else { //////////////////////
fmt.Println("SOMETHING ELSE IS CANCELLED", m.Payload) ////////////////////////////////////////////////
} /////////////////////////////
case <-ctx.Done():
return
}
}
}()
value, ok := <-chResult
if !ok {
return
}
err, ok = value.(error)
if ok {
return
}
isCancelled, ok = value.(bool)
if ok {
return
}
return
}

View file

@ -19,6 +19,8 @@ import (
"errors" "errors"
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/go-redis/redis/v8"
) )
type scheduler struct { type scheduler struct {
@ -26,14 +28,30 @@ type scheduler struct {
*canceller *canceller
} }
type redisScheduler struct {
*queue
*redisCanceller
}
// New creates a new scheduler. // New creates a new scheduler.
func New(store core.StageStore) core.Scheduler { func New(store core.StageStore, rdb *redis.Client) core.Scheduler {
return &scheduler{ if rdb != nil {
return redisScheduler{
queue: newQueue(store),
redisCanceller: newRedisCanceller(rdb),
}
}
return scheduler{
queue: newQueue(store), queue: newQueue(store),
canceller: newCanceller(), canceller: newCanceller(),
} }
} }
func (d *scheduler) Stats(context.Context) (interface{}, error) { func (d scheduler) Stats(context.Context) (interface{}, error) {
return nil, errors.New("not implemented")
}
func (d redisScheduler) Stats(context.Context) (interface{}, error) {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }