h/a stuff is !oss, simplified redis pubsub subscribe

This commit is contained in:
Marko Gaćeša 2021-08-09 11:56:33 +02:00
parent d2ba72ad40
commit fb29636608
17 changed files with 653 additions and 357 deletions

View file

@ -15,12 +15,9 @@
package main package main
import ( import (
"context"
"fmt"
"github.com/drone/drone/cmd/drone-server/config" "github.com/drone/drone/cmd/drone-server/config"
"github.com/drone/drone/service/redisdb"
"github.com/go-redis/redis/v8"
"github.com/google/wire" "github.com/google/wire"
) )
@ -29,23 +26,6 @@ var externalSet = wire.NewSet(
provideRedisClient, provideRedisClient,
) )
func provideRedisClient(config config.Config) (rdb *redis.Client, err error) { func provideRedisClient(config config.Config) (rdb redisdb.RedisDB, err error) {
if config.Redis.ConnectionString == "" { return redisdb.New(config)
return
}
options, err := redis.ParseURL(config.Redis.ConnectionString)
if err != nil {
return
}
rdb = redis.NewClient(options)
_, err = rdb.Ping(context.Background()).Result()
if err != nil {
err = fmt.Errorf("redis not accessibe: %w", err)
return
}
return
} }

View file

@ -17,8 +17,8 @@ package main
import ( import (
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/drone/drone/scheduler/queue" "github.com/drone/drone/scheduler/queue"
"github.com/drone/drone/service/redisdb"
"github.com/go-redis/redis/v8"
"github.com/google/wire" "github.com/google/wire"
) )
@ -29,6 +29,6 @@ var schedulerSet = wire.NewSet(
// provideScheduler is a Wire provider function that returns a // provideScheduler is a Wire provider function that returns a
// scheduler based on the environment configuration. // scheduler based on the environment configuration.
func provideScheduler(store core.StageStore, rdb *redis.Client) core.Scheduler { func provideScheduler(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
return queue.New(store, rdb) return queue.New(store, r)
} }

View file

@ -55,13 +55,13 @@ func InitializeApplication(config2 config.Config) (application, error) {
cronStore := cron.New(db) cronStore := cron.New(db)
repositoryStore := provideRepoStore(db) repositoryStore := provideRepoStore(db)
buildStore := provideBuildStore(db) buildStore := provideBuildStore(db)
redisClient, err := provideRedisClient(config2) redisDB, err := provideRedisClient(config2)
if err != nil { if err != nil {
return application{}, err return application{}, err
} }
corePubsub := pubsub.New(redisClient) corePubsub := pubsub.New(redisDB)
stageStore := provideStageStore(db) stageStore := provideStageStore(db)
scheduler := provideScheduler(stageStore, redisClient) scheduler := provideScheduler(stageStore, redisDB)
statusService := provideStatusService(client, renewer, config2) statusService := provideStatusService(client, renewer, config2)
stepStore := step.New(db) stepStore := step.New(db)
system := provideSystem(config2) system := provideSystem(config2)
@ -78,7 +78,7 @@ func InitializeApplication(config2 config.Config) (application, error) {
coreLicense := provideLicense(client, config2) coreLicense := provideLicense(client, config2)
datadog := provideDatadog(userStore, repositoryStore, buildStore, system, coreLicense, config2) datadog := provideDatadog(userStore, repositoryStore, buildStore, system, coreLicense, config2)
logStore := provideLogStore(db, config2) logStore := provideLogStore(db, config2)
logStream := livelog.New(redisClient) logStream := livelog.New(redisDB)
netrcService := provideNetrcService(client, renewer, config2) netrcService := provideNetrcService(client, renewer, config2)
secretStore := secret.New(db, encrypter) secretStore := secret.New(db, encrypter)
globalSecretStore := global.New(db, encrypter) globalSecretStore := global.New(db, encrypter)

View file

@ -12,19 +12,20 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// +build !oss
package livelog package livelog
import ( import (
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
"github.com/go-redis/redis/v8"
) )
// New creates a new log streamer. If Redis client passed as parameter is not nil it uses // New creates a new log streamer. If Redis client passed as parameter is not nil it uses
// a Redis implementation, otherwise it uses an in-memory implementation. // a Redis implementation, otherwise it uses an in-memory implementation.
func New(rdb *redis.Client) core.LogStream { func New(rdb redisdb.RedisDB) core.LogStream {
if rdb != nil { if rdb != nil {
return newRedis(rdb) return newStreamRedis(rdb)
} }
return newStreamer() return newStreamer()

27
livelog/livelogOSS.go Normal file
View file

@ -0,0 +1,27 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build oss
package livelog
import (
"github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
)
// New creates a new in-memory log streamer.
func New(r redisdb.RedisDB) core.LogStream {
return newStreamer()
}

View file

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// +build !oss
package livelog package livelog
import ( import (
@ -22,13 +24,14 @@ import (
"time" "time"
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
func newRedis(rdb *redis.Client) core.LogStream { func newStreamRedis(r redisdb.RedisDB) core.LogStream {
return &redisStream{ return streamRedis{
client: rdb, rdb: r,
} }
} }
@ -40,18 +43,20 @@ const (
redisStreamPrefix = "drone-log-" redisStreamPrefix = "drone-log-"
) )
type redisStream struct { type streamRedis struct {
client redis.Cmdable rdb redisdb.RedisDB
} }
// Create creates a redis stream and sets an expiry on it. // Create creates a redis stream and sets an expiry on it.
func (r *redisStream) Create(ctx context.Context, id int64) error { func (r streamRedis) Create(ctx context.Context, id int64) error {
// Delete if a stream already exists with the same key // Delete if a stream already exists with the same key
_ = r.Delete(ctx, id) _ = r.Delete(ctx, id)
client := r.rdb.Client()
key := redisStreamPrefix + strconv.FormatInt(id, 10) key := redisStreamPrefix + strconv.FormatInt(id, 10)
addResp := r.client.XAdd(ctx, &redis.XAddArgs{ addResp := client.XAdd(ctx, &redis.XAddArgs{
Stream: key, Stream: key,
ID: "*", // auto-generate a unique incremental ID ID: "*", // auto-generate a unique incremental ID
MaxLen: bufferSize, MaxLen: bufferSize,
@ -62,7 +67,7 @@ func (r *redisStream) Create(ctx context.Context, id int64) error {
return fmt.Errorf("livelog/redis: could not create stream with key %s", key) return fmt.Errorf("livelog/redis: could not create stream with key %s", key)
} }
res := r.client.Expire(ctx, key, redisKeyExpiryTime) res := client.Expire(ctx, key, redisKeyExpiryTime)
if err := res.Err(); err != nil { if err := res.Err(); err != nil {
return fmt.Errorf("livelog/redis: could not set expiry for key %s", key) return fmt.Errorf("livelog/redis: could not set expiry for key %s", key)
} }
@ -71,14 +76,16 @@ func (r *redisStream) Create(ctx context.Context, id int64) error {
} }
// Delete deletes a stream // Delete deletes a stream
func (r *redisStream) Delete(ctx context.Context, id int64) error { func (r streamRedis) Delete(ctx context.Context, id int64) error {
client := r.rdb.Client()
key := redisStreamPrefix + strconv.FormatInt(id, 10) key := redisStreamPrefix + strconv.FormatInt(id, 10)
if err := r._exists(ctx, key); err != nil { if err := r._exists(ctx, key); err != nil {
return err return err
} }
deleteResp := r.client.Del(ctx, key) deleteResp := client.Del(ctx, key)
if err := deleteResp.Err(); err != nil { if err := deleteResp.Err(); err != nil {
return fmt.Errorf("livelog/redis: could not delete stream for step %d", id) return fmt.Errorf("livelog/redis: could not delete stream for step %d", id)
} }
@ -87,7 +94,9 @@ func (r *redisStream) Delete(ctx context.Context, id int64) error {
} }
// Write writes information into the Redis stream // Write writes information into the Redis stream
func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) error { func (r streamRedis) Write(ctx context.Context, id int64, line *core.Line) error {
client := r.rdb.Client()
key := redisStreamPrefix + strconv.FormatInt(id, 10) key := redisStreamPrefix + strconv.FormatInt(id, 10)
if err := r._exists(ctx, key); err != nil { if err := r._exists(ctx, key); err != nil {
@ -95,7 +104,7 @@ func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) erro
} }
lineJsonData, _ := json.Marshal(line) lineJsonData, _ := json.Marshal(line)
addResp := r.client.XAdd(ctx, &redis.XAddArgs{ addResp := client.XAdd(ctx, &redis.XAddArgs{
Stream: key, Stream: key,
ID: "*", // auto-generate a unique incremental ID ID: "*", // auto-generate a unique incremental ID
MaxLen: bufferSize, MaxLen: bufferSize,
@ -110,7 +119,9 @@ func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) erro
} }
// Tail returns back all the lines in the stream. // Tail returns back all the lines in the stream.
func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) { func (r streamRedis) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) {
client := r.rdb.Client()
key := redisStreamPrefix + strconv.FormatInt(id, 10) key := redisStreamPrefix + strconv.FormatInt(id, 10)
if err := r._exists(ctx, key); err != nil { if err := r._exists(ctx, key); err != nil {
@ -135,7 +146,7 @@ func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-
case <-timeout: case <-timeout:
return return
default: default:
readResp := r.client.XRead(ctx, &redis.XReadArgs{ readResp := client.XRead(ctx, &redis.XReadArgs{
Streams: append([]string{key}, lastID), Streams: append([]string{key}, lastID),
Block: redisPollTime, // periodically check for ctx.Done Block: redisPollTime, // periodically check for ctx.Done
}) })
@ -171,12 +182,14 @@ func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-
} }
// Info returns info about log streams present in redis // Info returns info about log streams present in redis
func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) { func (r streamRedis) Info(ctx context.Context) (info *core.LogStreamInfo) {
client := r.rdb.Client()
info = &core.LogStreamInfo{ info = &core.LogStreamInfo{
Streams: make(map[int64]int), Streams: make(map[int64]int),
} }
keysResp := r.client.Keys(ctx, redisStreamPrefix+"*") keysResp := client.Keys(ctx, redisStreamPrefix+"*")
if err := keysResp.Err(); err != nil { if err := keysResp.Err(); err != nil {
return return
} }
@ -188,7 +201,7 @@ func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) {
continue continue
} }
lenResp := r.client.XLen(ctx, key) lenResp := client.XLen(ctx, key)
if err := lenResp.Err(); err != nil { if err := lenResp.Err(); err != nil {
continue continue
} }
@ -201,8 +214,10 @@ func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) {
return return
} }
func (r *redisStream) _exists(ctx context.Context, key string) error { func (r streamRedis) _exists(ctx context.Context, key string) error {
exists := r.client.Exists(ctx, key) client := r.rdb.Client()
exists := client.Exists(ctx, key)
if exists.Err() != nil || exists.Val() == 0 { if exists.Err() != nil || exists.Val() == 0 {
return fmt.Errorf("livelog/redis: log stream %s not found", key) return fmt.Errorf("livelog/redis: log stream %s not found", key)
} }

124
pubsub/hubRedis.go Normal file
View file

@ -0,0 +1,124 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !oss
package pubsub
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
)
const (
redisPubSubEvents = "drone-events"
redisPubSubCapacity = 100
)
func newHubRedis(r redisdb.RedisDB) core.Pubsub {
h := &hubRedis{
rdb: r,
subscribers: make(map[chan<- *core.Message]struct{}),
}
go r.Subscribe(context.Background(), redisPubSubEvents, redisPubSubCapacity, h)
return h
}
type hubRedis struct {
sync.Mutex
rdb redisdb.RedisDB
subscribers map[chan<- *core.Message]struct{}
}
// Publish publishes a new message. All subscribers will get it.
func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) {
client := h.rdb.Client()
data, err := json.Marshal(e)
if err != nil {
return
}
_, err = client.Publish(ctx, redisPubSubEvents, data).Result()
if err != nil {
return
}
return
}
// Subscribe add a new subscriber. The subscriber gets event until its context is not finished.
func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) {
chMessage := make(chan *core.Message, redisPubSubCapacity)
chErr := make(chan error)
h.Lock()
h.subscribers[chMessage] = struct{}{}
h.Unlock()
go func() {
<-ctx.Done()
h.Lock()
delete(h.subscribers, chMessage)
h.Unlock()
close(chMessage)
close(chErr)
}()
return chMessage, chErr
}
// Subscribers returns number of subscribers.
func (h *hubRedis) Subscribers() (int, error) {
h.Lock()
n := len(h.subscribers)
h.Unlock()
return n, nil
}
// ProcessMessage relays the message to all subscribers listening to drone events.
// It is a part of redisdb.PubSubProcessor implementation and it's called internally by redisdb.Subscribe.
func (h *hubRedis) ProcessMessage(s string) {
message := &core.Message{}
err := json.Unmarshal([]byte(s), message)
if err != nil {
// Ignore invalid messages. This is a "should not happen" situation,
// because messages are encoded as json in Publish().
_, _ = fmt.Fprintf(os.Stderr, "pubsub/redis: failed to unmarshal a message. %s\n", err)
return
}
h.Lock()
for ss := range h.subscribers {
select {
case ss <- message:
default: // messages are lost if a subscriber channel reaches its capacity
}
}
h.Unlock()
}
// ProcessError is a part of redisdb.PubSubProcessor implementation.
func (h *hubRedis) ProcessError(error) {}

View file

@ -12,19 +12,20 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// +build !oss
package pubsub package pubsub
import ( import (
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
"github.com/go-redis/redis/v8"
) )
// New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses // New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses
// a Redis implementation, otherwise it uses an in-memory implementation. // a Redis implementation, otherwise it uses an in-memory implementation.
func New(rdb *redis.Client) core.Pubsub { func New(r redisdb.RedisDB) core.Pubsub {
if rdb != nil { if r != nil {
return newRedis(rdb) return newHubRedis(r)
} }
return newHub() return newHub()

27
pubsub/pubsubOSS.go Normal file
View file

@ -0,0 +1,27 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build oss
package pubsub
import (
"github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
)
// New creates a new in-memory publish subscriber.
func New(r redisdb.RedisDB) core.Pubsub {
return newHub()
}

View file

@ -1,134 +0,0 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
"github.com/drone/drone/core"
"github.com/go-redis/redis/v8"
)
func newRedis(rdb *redis.Client) core.Pubsub {
return &hubRedis{rdb: rdb}
}
const redisPubSubEvents = "drone-events"
type hubRedis struct {
rdb *redis.Client
}
func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) {
data, err := json.Marshal(e)
if err != nil {
return
}
_, err = h.rdb.Publish(ctx, redisPubSubEvents, data).Result()
if err != nil {
return
}
return
}
func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) {
chMessage := make(chan *core.Message, 100)
chErr := make(chan error)
go func() {
pubsub := h.rdb.Subscribe(ctx, redisPubSubEvents)
ch := pubsub.Channel(redis.WithChannelSize(100))
defer func() {
_ = pubsub.Close()
close(chMessage)
close(chErr)
}()
err := pubsub.Ping(ctx)
if err != nil {
chErr <- err
return
}
for {
select {
case m, ok := <-ch:
if !ok {
chErr <- fmt.Errorf("pubsub/redis: channel=%s closed", redisPubSubEvents)
return
}
message := &core.Message{}
err = json.Unmarshal([]byte(m.Payload), message)
if err != nil {
// This is a "should not happen" situation,
// because messages are encoded as json above in Publish().
_, _ = fmt.Fprintf(os.Stderr, "pubsub/redis: failed to unmarshal a message. %s\n", err)
continue
}
chMessage <- message
case <-ctx.Done():
return
}
}
}()
return chMessage, chErr
}
func (h *hubRedis) Subscribers() (int, error) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
v, err := h.rdb.Do(ctx, "pubsub", "numsub", redisPubSubEvents).Result()
if err != nil {
err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err)
return 0, err
}
values, ok := v.([]interface{}) // the result should be: [<channel_name:string>, <subscriber_count:int64>]
if !ok || len(values) != 2 {
err = fmt.Errorf("pubsub/redis: failed to extarct number of subscribers from: %v", values)
return 0, err
}
switch n := values[1].(type) {
case int:
return n, nil
case uint:
return int(n), nil
case int32:
return int(n), nil
case uint32:
return int(n), nil
case int64:
return int(n), nil
case uint64:
return int(n), nil
default:
err = fmt.Errorf("pubsub/redis: unsupported type for number of subscribers: %T", values[1])
return 0, err
}
}

View file

@ -0,0 +1,160 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !oss
package queue
import (
"context"
"fmt"
"os"
"strconv"
"sync"
"time"
"github.com/drone/drone/service/redisdb"
"github.com/go-redis/redis/v8"
)
const (
redisPubSubCancel = "drone-cancel"
redisCancelValuePrefix = "drone-cancel-"
redisCancelValueTimeout = 5 * time.Minute
redisCancelValue = "canceled"
)
func newCancellerRedis(r redisdb.RedisDB) *cancellerRedis {
h := &cancellerRedis{
rdb: r,
subscribers: make(map[*cancelSubscriber]struct{}),
}
go r.Subscribe(context.Background(), redisPubSubCancel, 1, h)
return h
}
type cancellerRedis struct {
rdb redisdb.RedisDB
subscribers map[*cancelSubscriber]struct{}
sync.Mutex
}
type cancelSubscriber struct {
id int64
ch chan<- error
}
// Cancel informs all subscribers that a build with the provided id is cancelled.
func (c *cancellerRedis) Cancel(ctx context.Context, id int64) (err error) {
client := c.rdb.Client()
ids := strconv.FormatInt(id, 10)
// publish a cancel event to all subscribers (runners) waiting to
_, err = client.Publish(ctx, redisPubSubCancel, ids).Result()
if err != nil {
return
}
// put a limited duration value in case a runner isn't listening currently.
_, err = client.Set(ctx, redisCancelValuePrefix+ids, redisCancelValue, redisCancelValueTimeout).Result()
if err != nil {
return
}
return
}
// Cancelled waits until it gets info that a build with the provided id is cancelled.
// The waiting is aborted when the provided context is done.
func (c *cancellerRedis) Cancelled(ctx context.Context, id int64) (isCancelled bool, err error) {
client := c.rdb.Client()
ids := strconv.FormatInt(id, 10)
// first check if the build is already cancelled
result, err := client.Get(ctx, redisCancelValuePrefix+ids).Result()
if err != nil && err != redis.Nil {
return
}
isCancelled = err != redis.Nil && result == redisCancelValue
if isCancelled {
return
}
// if it is not cancelled, subscribe and listen to cancel build events
// until the context is cancelled or until the build is cancelled.
ch := make(chan error)
sub := &cancelSubscriber{id: id, ch: ch}
c.Lock()
c.subscribers[sub] = struct{}{}
c.Unlock()
select {
case err = <-ch:
// If the build is cancelled or an error happened,
// than the subscriber is removed from the set by other go routine
isCancelled = err != nil
case <-ctx.Done():
// If the context is cancelled then the subscriber must be be removed here.
c.Lock()
delete(c.subscribers, sub)
c.Unlock()
}
return
}
// ProcessMessage informs all subscribers listening to cancellation that the build with this id is cancelled.
// It is a part of redisdb.PubSubProcessor implementation and it's called internally by Subscribe.
func (c *cancellerRedis) ProcessMessage(s string) {
id, err := strconv.ParseInt(s, 10, 64)
if err != nil {
// Ignore invalid messages. This is a "should not happen" situation,
// because all messages are integers as strings in method Cancel().
_, _ = fmt.Fprintf(os.Stderr, "canceller/redis: message is not an integer: %s\n", s)
return
}
c.Lock()
for ss := range c.subscribers {
if ss.id == id {
ss.ch <- nil
close(ss.ch)
delete(c.subscribers, ss)
}
}
c.Unlock()
}
// ProcessError informs all subscribers that an error happened and clears the set of subscribers.
// The set of subscribers is cleared because each subscriber receives only one message,
// so an error could cause that the message is missed - it's safer to return an error.
// It is a part of redisdb.PubSubProcessor implementation and it's called internally by Subscribe.
func (c *cancellerRedis) ProcessError(err error) {
c.Lock()
for ss := range c.subscribers {
ss.ch <- err
close(ss.ch)
delete(c.subscribers, ss)
}
c.Unlock()
}

View file

@ -1,136 +0,0 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"context"
"fmt"
"os"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
const (
redisPubSubCancel = "drone-cancel"
redisCancelValuePrefix = "drone-cancel-"
redisCancelValueTimeout = 5 * time.Minute
redisCancelValue = "canceled"
)
func newRedisCanceller(rdb *redis.Client) *redisCanceller {
return &redisCanceller{rdb: rdb}
}
type redisCanceller struct {
rdb *redis.Client
}
func (c *redisCanceller) Cancel(ctx context.Context, id int64) (err error) {
ids := strconv.FormatInt(id, 10)
// publish a cancel event to all subscribers (runners) waiting to
_, err = c.rdb.Publish(ctx, redisPubSubCancel, ids).Result()
if err != nil {
return
}
// put a limited duration value in case a runner isn't listening currently.
_, err = c.rdb.Set(ctx, redisCancelValuePrefix+ids, redisCancelValue, redisCancelValueTimeout).Result()
if err != nil {
return
}
return nil
}
func (c *redisCanceller) Cancelled(ctx context.Context, id int64) (isCancelled bool, err error) {
ids := strconv.FormatInt(id, 10)
// first check if the build is already cancelled
result, err := c.rdb.Get(ctx, redisCancelValuePrefix+ids).Result()
if err != nil && err != redis.Nil {
return
}
isCancelled = err != redis.Nil && result == redisCancelValue
if isCancelled {
return
}
// if it is not cancelled, subscribe and listen to cancel build events
// until the context is cancelled or until the build is cancelled.
chResult := make(chan interface{})
go func() {
pubsub := c.rdb.Subscribe(ctx, redisPubSubCancel)
ch := pubsub.Channel()
defer func() {
_ = pubsub.Close()
close(chResult)
}()
err := pubsub.Ping(ctx)
if err != nil {
chResult <- err
return
}
for {
select {
case m, ok := <-ch:
if !ok {
chResult <- fmt.Errorf("canceller/redis: channel=%s closed", redisPubSubCancel)
return
}
idMessage, err := strconv.ParseInt(m.Payload, 10, 64)
if err != nil { // should not happen
_, _ = fmt.Fprintf(os.Stderr, "canceller/redis: message is not an integer: %s\n", m.Payload)
continue // ignore data errors
}
if id == idMessage {
chResult <- true
}
case <-ctx.Done():
return
}
}
}()
value, ok := <-chResult
if !ok {
return
}
err, ok = value.(error)
if ok {
return
}
isCancelled, ok = value.(bool)
if ok {
return
}
return
}

View file

@ -1,4 +1,4 @@
// Copyright 2019 Drone IO, Inc. // Copyright 2021 Drone IO, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -17,10 +17,6 @@ package queue
import ( import (
"context" "context"
"errors" "errors"
"github.com/drone/drone/core"
"github.com/go-redis/redis/v8"
) )
type scheduler struct { type scheduler struct {
@ -28,30 +24,6 @@ type scheduler struct {
*canceller *canceller
} }
type redisScheduler struct {
*queue
*redisCanceller
}
// New creates a new scheduler.
func New(store core.StageStore, rdb *redis.Client) core.Scheduler {
if rdb != nil {
return redisScheduler{
queue: newQueue(store),
redisCanceller: newRedisCanceller(rdb),
}
}
return scheduler{
queue: newQueue(store),
canceller: newCanceller(),
}
}
func (d scheduler) Stats(context.Context) (interface{}, error) { func (d scheduler) Stats(context.Context) (interface{}, error) {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }
func (d redisScheduler) Stats(context.Context) (interface{}, error) {
return nil, errors.New("not implemented")
}

View file

@ -0,0 +1,37 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !oss
package queue
import (
"github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
)
// New creates a new scheduler.
func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
if r == nil {
return scheduler{
queue: newQueue(store),
canceller: newCanceller(),
}
}
return schedulerRedis{
queue: newQueue(store),
cancellerRedis: newCancellerRedis(r),
}
}

View file

@ -0,0 +1,30 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build oss
package queue
import (
"github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
)
// New creates a new scheduler.
func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
return scheduler{
queue: newQueue(store),
canceller: newCanceller(),
}
}

View file

@ -0,0 +1,31 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !oss
package queue
import (
"context"
"errors"
)
type schedulerRedis struct {
*queue
*cancellerRedis
}
func (d schedulerRedis) Stats(context.Context) (interface{}, error) {
return nil, errors.New("not implemented")
}

161
service/redisdb/redisdb.go Normal file
View file

@ -0,0 +1,161 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redisdb
import (
"context"
"fmt"
"time"
"github.com/drone/drone/cmd/drone-server/config"
"github.com/go-redis/redis/v8"
"github.com/sirupsen/logrus"
)
func New(config config.Config) (srv RedisDB, err error) {
if config.Redis.ConnectionString == "" {
return
}
options, err := redis.ParseURL(config.Redis.ConnectionString)
if err != nil {
return
}
rdb := redis.NewClient(options)
_, err = rdb.Ping(context.Background()).Result()
if err != nil {
err = fmt.Errorf("redis not accessibe: %w", err)
return
}
srv = redisService{
rdb: rdb,
}
return
}
type RedisDB interface {
Client() redis.Cmdable
Subscribe(ctx context.Context, channelName string, channelSize int, proc PubSubProcessor)
}
type redisService struct {
rdb *redis.Client
}
// Client exposes redis.Cmdable interface
func (r redisService) Client() redis.Cmdable {
return r.rdb
}
type PubSubProcessor interface {
ProcessMessage(s string)
ProcessError(err error)
}
var backoffDurations = []time.Duration{
0, time.Second, 3 * time.Second, 5 * time.Second, 10 * time.Second, 20 * time.Second,
}
// Subscribe subscribes to a redis pub-sub channel. The messages are processed with the supplied PubSubProcessor.
// In case of en error the function will automatically reconnect with an increasing back of delay.
// The only way to exit this function is to terminate or expire the supplied context.
func (r redisService) Subscribe(ctx context.Context, channelName string, channelSize int, proc PubSubProcessor) {
var connectTry int
for {
err := func() (err error) {
defer func() {
// panic recovery because external PubSubProcessor methods might cause panics.
if p := recover(); p != nil {
err = fmt.Errorf("redis pubsub: panic: %v", p)
}
}()
var options []redis.ChannelOption
if channelSize > 1 {
options = append(options, redis.WithChannelSize(channelSize))
}
pubsub := r.rdb.Subscribe(ctx, channelName)
ch := pubsub.Channel(options...)
defer func() {
_ = pubsub.Close()
}()
// make sure the connection is successful
err = pubsub.Ping(ctx)
if err != nil {
return
}
connectTry = 0 // successfully connected, reset the counter
logrus.
WithField("try", connectTry+1).
WithField("channel", channelName).
Trace("redis pubsub: subscribed")
for {
select {
case m, ok := <-ch:
if !ok {
err = fmt.Errorf("redis pubsub: channel=%s closed", channelName)
return
}
proc.ProcessMessage(m.Payload)
case <-ctx.Done():
err = ctx.Err()
return
}
}
}()
if err == nil {
// should not happen, the function should always exit with an error
continue
}
proc.ProcessError(err)
if err == context.Canceled || err == context.DeadlineExceeded {
logrus.
WithField("channel", channelName).
Trace("redis pubsub: finished")
return
}
dur := backoffDurations[connectTry]
logrus.
WithError(err).
WithField("try", connectTry+1).
WithField("pause", dur.String()).
WithField("channel", channelName).
Error("redis pubsub: connection failed, reconnecting")
time.Sleep(dur)
if connectTry < len(backoffDurations)-1 {
connectTry++
}
}
}