From 40d09839f7bf198f053afd135e7fa7577a692917 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Thu, 22 Jul 2021 15:03:57 +0200 Subject: [PATCH] pubsub: redis implementation --- cmd/drone-server/config/config.go | 6 ++ cmd/drone-server/inject_store.go | 17 ++++ cmd/drone-server/wire_gen.go | 9 ++- pubsub/hub.go | 4 +- pubsub/pubsub.go | 31 ++++++++ pubsub/redis.go | 126 ++++++++++++++++++++++++++++++ 6 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 pubsub/pubsub.go create mode 100644 pubsub/redis.go diff --git a/cmd/drone-server/config/config.go b/cmd/drone-server/config/config.go index fb14fe1c..29b7eae0 100644 --- a/cmd/drone-server/config/config.go +++ b/cmd/drone-server/config/config.go @@ -63,6 +63,7 @@ type ( Logging Logging Prometheus Prometheus Proxy Proxy + Redis Redis Registration Registration Registries Registries Repository Repository @@ -166,6 +167,11 @@ type ( EnableAnonymousAccess bool `envconfig:"DRONE_PROMETHEUS_ANONYMOUS_ACCESS" default:"false"` } + // Redis provides the redis configuration. + Redis struct { + ConnectionString string `envconfig:"DRONE_REDIS_CONNECTION"` + } + // Repository provides the repository configuration. Repository struct { Filter []string `envconfig:"DRONE_REPOSITORY_FILTER"` diff --git a/cmd/drone-server/inject_store.go b/cmd/drone-server/inject_store.go index 0e06e9b0..d9116856 100644 --- a/cmd/drone-server/inject_store.go +++ b/cmd/drone-server/inject_store.go @@ -34,6 +34,7 @@ import ( "github.com/drone/drone/store/template" "github.com/drone/drone/store/user" + "github.com/go-redis/redis/v8" "github.com/google/wire" "github.com/sirupsen/logrus" ) @@ -55,6 +56,7 @@ var storeSet = wire.NewSet( global.New, step.New, template.New, + provideRedisClient, ) // provideDatabase is a Wire provider function that provides a @@ -177,3 +179,18 @@ func provideUserStore(db *db.DB, enc encrypt.Encrypter, config config.Config) co metric.UserCount(users) return users } + +func provideRedisClient(config config.Config) (rdb *redis.Client, err error) { + if config.Redis.ConnectionString == "" { + return + } + + options, err := redis.ParseURL(config.Redis.ConnectionString) + if err != nil { + return + } + + rdb = redis.NewClient(options) + + return +} diff --git a/cmd/drone-server/wire_gen.go b/cmd/drone-server/wire_gen.go index 94eaefc9..d65d65b7 100644 --- a/cmd/drone-server/wire_gen.go +++ b/cmd/drone-server/wire_gen.go @@ -55,7 +55,14 @@ func InitializeApplication(config2 config.Config) (application, error) { cronStore := cron.New(db) repositoryStore := provideRepoStore(db) buildStore := provideBuildStore(db) - corePubsub := pubsub.New() + redisClient, err := provideRedisClient(config2) + if err != nil { + return application{}, err + } + corePubsub, err := pubsub.New(redisClient) + if err != nil { + return application{}, err + } stageStore := provideStageStore(db) scheduler := provideScheduler(stageStore, config2) statusService := provideStatusService(client, renewer, config2) diff --git a/pubsub/hub.go b/pubsub/hub.go index 6249a771..3445e7bc 100644 --- a/pubsub/hub.go +++ b/pubsub/hub.go @@ -27,8 +27,8 @@ type hub struct { subs map[*subscriber]struct{} } -// New creates a new publish subscriber. -func New() core.Pubsub { +// newHub creates a new publish subscriber. +func newHub() core.Pubsub { return &hub{ subs: map[*subscriber]struct{}{}, } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go new file mode 100644 index 00000000..201f42e0 --- /dev/null +++ b/pubsub/pubsub.go @@ -0,0 +1,31 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "github.com/drone/drone/core" + + "github.com/go-redis/redis/v8" +) + +// New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses +// a Redis implementation, otherwise it uses an in-memory implementation. +func New(rdb *redis.Client) (core.Pubsub, error) { + if rdb != nil { + return newRedis(rdb) + } + + return newHub(), nil +} diff --git a/pubsub/redis.go b/pubsub/redis.go new file mode 100644 index 00000000..c322b118 --- /dev/null +++ b/pubsub/redis.go @@ -0,0 +1,126 @@ +// Copyright 2021 Drone IO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/drone/drone/core" + + "github.com/go-redis/redis/v8" +) + +const channelPubSub = "drone-events" + +type hubRedis struct { + rdb *redis.Client +} + +func (h *hubRedis) Publish(ctx context.Context, e *core.Message) (err error) { + data, err := json.Marshal(e) + if err != nil { + return + } + + _, err = h.rdb.Publish(ctx, channelPubSub, data).Result() + if err != nil { + return + } + + return +} + +func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan error) { + messageCh := make(chan *core.Message, 100) + errCh := make(chan error) + + go func() { + pubsub := h.rdb.Subscribe(ctx, channelPubSub) + ch := pubsub.Channel(redis.WithChannelSize(100)) + + defer func() { + _ = pubsub.Close() + close(messageCh) + close(errCh) + }() + + err := pubsub.Ping(ctx) + if err != nil { + errCh <- err + return + } + + for { + select { + case m, ok := <-ch: + if ok { + message := &core.Message{} + err = json.Unmarshal([]byte(m.Payload), message) + if err != nil { + fmt.Printf("error@pubsub: failed to unmarshal a message. %s\n", err) + continue + } + messageCh <- message + } else { + errCh <- fmt.Errorf("redis pubsub channel=%s closed", channelPubSub) + return + } + case <-ctx.Done(): + return + } + } + }() + + return messageCh, errCh +} + +func (h *hubRedis) Subscribers() int { + ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFunc() + + v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result() + if err != nil { + fmt.Printf("error@pubsub: failed to get number of subscribers. %s\n", err) + return -1 + } + + values, ok := v.([]interface{}) // the result should be: [, ] + if !ok || len(values) != 2 { + return 0 + } + + if subscriberCount, ok := values[1].(int64); ok { + return int(subscriberCount) + } + + return 0 +} + +func newRedis(rdb *redis.Client) (ps core.Pubsub, err error) { + _, err = rdb.Ping(context.Background()).Result() + if err != nil { + err = fmt.Errorf("redis not accessibe: %w", err) + return + } + + ps = &hubRedis{ + rdb: rdb, + } + + return +}