added redis implemenatation for core.LogStream

This commit is contained in:
Marko Gaćeša 2021-08-02 13:30:52 +02:00
parent ff70133341
commit b91d6c4b9b
10 changed files with 387 additions and 121 deletions

View file

@ -0,0 +1,51 @@
// Copyright 2021 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"github.com/drone/drone/cmd/drone-server/config"
"github.com/go-redis/redis/v8"
"github.com/google/wire"
)
// wire set for loading the external services.
var externalSet = wire.NewSet(
provideRedisClient,
)
func provideRedisClient(config config.Config) (rdb *redis.Client, err error) {
if config.Redis.ConnectionString == "" {
return
}
options, err := redis.ParseURL(config.Redis.ConnectionString)
if err != nil {
return
}
rdb = redis.NewClient(options)
_, err = rdb.Ping(context.Background()).Result()
if err != nil {
err = fmt.Errorf("redis not accessibe: %w", err)
return
}
return
}

View file

@ -34,7 +34,6 @@ import (
"github.com/drone/drone/store/template" "github.com/drone/drone/store/template"
"github.com/drone/drone/store/user" "github.com/drone/drone/store/user"
"github.com/go-redis/redis/v8"
"github.com/google/wire" "github.com/google/wire"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -56,7 +55,6 @@ var storeSet = wire.NewSet(
global.New, global.New,
step.New, step.New,
template.New, template.New,
provideRedisClient,
) )
// provideDatabase is a Wire provider function that provides a // provideDatabase is a Wire provider function that provides a
@ -179,18 +177,3 @@ func provideUserStore(db *db.DB, enc encrypt.Encrypter, config config.Config) co
metric.UserCount(users) metric.UserCount(users)
return users return users
} }
func provideRedisClient(config config.Config) (rdb *redis.Client, err error) {
if config.Redis.ConnectionString == "" {
return
}
options, err := redis.ParseURL(config.Redis.ConnectionString)
if err != nil {
return
}
rdb = redis.NewClient(options)
return
}

View file

@ -32,6 +32,7 @@ func InitializeApplication(config config.Config) (application, error) {
serverSet, serverSet,
serviceSet, serviceSet,
storeSet, storeSet,
externalSet,
newApplication, newApplication,
) )
return application{}, nil return application{}, nil

View file

@ -59,10 +59,7 @@ func InitializeApplication(config2 config.Config) (application, error) {
if err != nil { if err != nil {
return application{}, err return application{}, err
} }
corePubsub, err := pubsub.New(redisClient) corePubsub := pubsub.New(redisClient)
if err != nil {
return application{}, err
}
stageStore := provideStageStore(db) stageStore := provideStageStore(db)
scheduler := provideScheduler(stageStore, config2) scheduler := provideScheduler(stageStore, config2)
statusService := provideStatusService(client, renewer, config2) statusService := provideStatusService(client, renewer, config2)
@ -81,7 +78,7 @@ func InitializeApplication(config2 config.Config) (application, error) {
coreLicense := provideLicense(client, config2) coreLicense := provideLicense(client, config2)
datadog := provideDatadog(userStore, repositoryStore, buildStore, system, coreLicense, config2) datadog := provideDatadog(userStore, repositoryStore, buildStore, system, coreLicense, config2)
logStore := provideLogStore(db, config2) logStore := provideLogStore(db, config2)
logStream := livelog.New() logStream := livelog.New(redisClient)
netrcService := provideNetrcService(client, renewer, config2) netrcService := provideNetrcService(client, renewer, config2)
secretStore := secret.New(db, encrypter) secretStore := secret.New(db, encrypter)
globalSecretStore := global.New(db, encrypter) globalSecretStore := global.New(db, encrypter)

View file

@ -1,4 +1,4 @@
// Copyright 2019 Drone IO, Inc. // Copyright 2021 Drone IO, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -15,80 +15,17 @@
package livelog package livelog
import ( import (
"context"
"errors"
"sync"
"github.com/drone/drone/core" "github.com/drone/drone/core"
"github.com/go-redis/redis/v8"
) )
// error returned when a stream is not registered with // New creates a new log streamer. If Redis client passed as parameter is not nil it uses
// the streamer. // a Redis implementation, otherwise it uses an in-memory implementation.
var errStreamNotFound = errors.New("stream: not found") func New(rdb *redis.Client) core.LogStream {
if rdb != nil {
type streamer struct { return newRedis(rdb)
sync.Mutex
streams map[int64]*stream
}
// New returns a new in-memory log streamer.
func New() core.LogStream {
return &streamer{
streams: make(map[int64]*stream),
} }
}
func (s *streamer) Create(ctx context.Context, id int64) error { return newStreamer()
s.Lock()
s.streams[id] = newStream()
s.Unlock()
return nil
}
func (s *streamer) Delete(ctx context.Context, id int64) error {
s.Lock()
stream, ok := s.streams[id]
if ok {
delete(s.streams, id)
}
s.Unlock()
if !ok {
return errStreamNotFound
}
return stream.close()
}
func (s *streamer) Write(ctx context.Context, id int64, line *core.Line) error {
s.Lock()
stream, ok := s.streams[id]
s.Unlock()
if !ok {
return errStreamNotFound
}
return stream.write(line)
}
func (s *streamer) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) {
s.Lock()
stream, ok := s.streams[id]
s.Unlock()
if !ok {
return nil, nil
}
return stream.subscribe(ctx)
}
func (s *streamer) Info(ctx context.Context) *core.LogStreamInfo {
s.Lock()
defer s.Unlock()
info := &core.LogStreamInfo{
Streams: map[int64]int{},
}
for id, stream := range s.streams {
stream.Lock()
info.Streams[id] = len(stream.list)
stream.Unlock()
}
return info
} }

211
livelog/redis.go Normal file
View file

@ -0,0 +1,211 @@
// Copyright 2019 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package livelog
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/drone/drone/core"
"github.com/go-redis/redis/v8"
)
func newRedis(rdb *redis.Client) core.LogStream {
return &redisStream{
client: rdb,
}
}
const (
redisKeyExpiryTime = 5 * time.Hour // How long each key exists in redis
redisPollTime = 100 * time.Millisecond // should not be too large to avoid redis clients getting occupied for long
redisTailMaxTime = 1 * time.Hour // maximum duration a tail can last
redisEntryKey = "line"
redisStreamPrefix = "drone-log-"
)
type redisStream struct {
client redis.Cmdable
}
// Create creates a redis stream and sets an expiry on it.
func (r *redisStream) Create(ctx context.Context, id int64) error {
// Delete if a stream already exists with the same key
_ = r.Delete(ctx, id)
key := redisStreamPrefix + strconv.FormatInt(id, 10)
addResp := r.client.XAdd(ctx, &redis.XAddArgs{
Stream: key,
ID: "*", // auto-generate a unique incremental ID
MaxLen: bufferSize,
Approx: true,
Values: map[string]interface{}{redisEntryKey: []byte{}},
})
if err := addResp.Err(); err != nil {
return fmt.Errorf("livelog/redis: could not create stream with key %s", key)
}
res := r.client.Expire(ctx, key, redisKeyExpiryTime)
if err := res.Err(); err != nil {
return fmt.Errorf("livelog/redis: could not set expiry for key %s", key)
}
return nil
}
// Delete deletes a stream
func (r *redisStream) Delete(ctx context.Context, id int64) error {
key := redisStreamPrefix + strconv.FormatInt(id, 10)
if err := r._exists(ctx, key); err != nil {
return err
}
deleteResp := r.client.Del(ctx, key)
if err := deleteResp.Err(); err != nil {
return fmt.Errorf("livelog/redis: could not delete stream for step %d", id)
}
return nil
}
// Write writes information into the Redis stream
func (r *redisStream) Write(ctx context.Context, id int64, line *core.Line) error {
key := redisStreamPrefix + strconv.FormatInt(id, 10)
if err := r._exists(ctx, key); err != nil {
return err
}
lineJsonData, _ := json.Marshal(line)
addResp := r.client.XAdd(ctx, &redis.XAddArgs{
Stream: key,
ID: "*", // auto-generate a unique incremental ID
MaxLen: bufferSize,
Approx: true,
Values: map[string]interface{}{redisEntryKey: lineJsonData},
})
if err := addResp.Err(); err != nil {
return err
}
return nil
}
// Tail returns back all the lines in the stream.
func (r *redisStream) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) {
key := redisStreamPrefix + strconv.FormatInt(id, 10)
if err := r._exists(ctx, key); err != nil {
return nil, nil
}
chLines := make(chan *core.Line, bufferSize)
chErr := make(chan error, 1)
go func() {
defer close(chErr)
defer close(chLines)
timeout := time.After(redisTailMaxTime) // polling should not last for longer than tailMaxTime
// Keep reading from the stream and writing to the channel
lastID := "0"
for {
select {
case <-ctx.Done():
return
case <-timeout:
return
default:
readResp := r.client.XRead(ctx, &redis.XReadArgs{
Streams: append([]string{key}, lastID),
Block: redisPollTime, // periodically check for ctx.Done
})
if readResp.Err() != nil && readResp.Err() != redis.Nil { // readResp.Err() is sometimes set to "redis: nil" instead of nil
chErr <- readResp.Err()
return
}
for _, msg := range readResp.Val() {
messages := msg.Messages
if len(messages) > 0 {
lastID = messages[len(messages)-1].ID
} else { // should not happen
return
}
for _, message := range messages {
values := message.Values
if val, ok := values[redisEntryKey]; ok {
var line *core.Line
if err := json.Unmarshal([]byte(val.(string)), &line); err != nil {
continue // ignore errors in the stream
}
chLines <- line
}
}
}
}
}
}()
return chLines, chErr
}
// Info returns info about log streams present in redis
func (r *redisStream) Info(ctx context.Context) (info *core.LogStreamInfo) {
info = &core.LogStreamInfo{
Streams: make(map[int64]int),
}
keysResp := r.client.Keys(ctx, redisStreamPrefix+"*")
if err := keysResp.Err(); err != nil {
return
}
for _, key := range keysResp.Val() {
ids := key[len(redisStreamPrefix):]
id, err := strconv.ParseInt(ids, 10, 64)
if err != nil {
continue
}
lenResp := r.client.XLen(ctx, key)
if err := lenResp.Err(); err != nil {
continue
}
size := int(lenResp.Val())
info.Streams[id] = size
}
return
}
func (r *redisStream) _exists(ctx context.Context, key string) error {
exists := r.client.Exists(ctx, key)
if exists.Err() != nil || exists.Val() == 0 {
return fmt.Errorf("livelog/redis: log stream %s not found", key)
}
return nil
}

94
livelog/streamer.go Normal file
View file

@ -0,0 +1,94 @@
// Copyright 2019 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package livelog
import (
"context"
"errors"
"sync"
"github.com/drone/drone/core"
)
// error returned when a stream is not registered with
// the streamer.
var errStreamNotFound = errors.New("stream: not found")
type streamer struct {
sync.Mutex
streams map[int64]*stream
}
// New returns a new in-memory log streamer.
func newStreamer() core.LogStream {
return &streamer{
streams: make(map[int64]*stream),
}
}
func (s *streamer) Create(ctx context.Context, id int64) error {
s.Lock()
s.streams[id] = newStream()
s.Unlock()
return nil
}
func (s *streamer) Delete(ctx context.Context, id int64) error {
s.Lock()
stream, ok := s.streams[id]
if ok {
delete(s.streams, id)
}
s.Unlock()
if !ok {
return errStreamNotFound
}
return stream.close()
}
func (s *streamer) Write(ctx context.Context, id int64, line *core.Line) error {
s.Lock()
stream, ok := s.streams[id]
s.Unlock()
if !ok {
return errStreamNotFound
}
return stream.write(line)
}
func (s *streamer) Tail(ctx context.Context, id int64) (<-chan *core.Line, <-chan error) {
s.Lock()
stream, ok := s.streams[id]
s.Unlock()
if !ok {
return nil, nil
}
return stream.subscribe(ctx)
}
func (s *streamer) Info(ctx context.Context) *core.LogStreamInfo {
s.Lock()
defer s.Unlock()
info := &core.LogStreamInfo{
Streams: map[int64]int{},
}
for id, stream := range s.streams {
stream.Lock()
info.Streams[id] = len(stream.list)
stream.Unlock()
}
return info
}

View file

@ -17,7 +17,7 @@ import (
) )
func TestStreamer(t *testing.T) { func TestStreamer(t *testing.T) {
s := New().(*streamer) s := newStreamer().(*streamer)
err := s.Create(context.Background(), 1) err := s.Create(context.Background(), 1)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -57,7 +57,7 @@ func TestStreamer(t *testing.T) {
} }
func TestStreamerDelete(t *testing.T) { func TestStreamerDelete(t *testing.T) {
s := New().(*streamer) s := newStreamer().(*streamer)
err := s.Create(context.Background(), 1) err := s.Create(context.Background(), 1)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -75,7 +75,7 @@ func TestStreamerDelete(t *testing.T) {
} }
func TestStreamerDeleteErr(t *testing.T) { func TestStreamerDeleteErr(t *testing.T) {
s := New() s := newStreamer()
err := s.Delete(context.Background(), 1) err := s.Delete(context.Background(), 1)
if err != errStreamNotFound { if err != errStreamNotFound {
t.Errorf("Want errStreamNotFound") t.Errorf("Want errStreamNotFound")
@ -83,7 +83,7 @@ func TestStreamerDeleteErr(t *testing.T) {
} }
func TestStreamerWriteErr(t *testing.T) { func TestStreamerWriteErr(t *testing.T) {
s := New() s := newStreamer()
err := s.Write(context.Background(), 1, &core.Line{}) err := s.Write(context.Background(), 1, &core.Line{})
if err != errStreamNotFound { if err != errStreamNotFound {
t.Errorf("Want errStreamNotFound") t.Errorf("Want errStreamNotFound")
@ -91,7 +91,7 @@ func TestStreamerWriteErr(t *testing.T) {
} }
func TestStreamTailNotFound(t *testing.T) { func TestStreamTailNotFound(t *testing.T) {
s := New() s := newStreamer()
outc, errc := s.Tail(context.Background(), 0) outc, errc := s.Tail(context.Background(), 0)
if outc != nil && errc != nil { if outc != nil && errc != nil {
t.Errorf("Expect nil channel when stream not found") t.Errorf("Expect nil channel when stream not found")
@ -99,7 +99,7 @@ func TestStreamTailNotFound(t *testing.T) {
} }
func TestStreamerInfo(t *testing.T) { func TestStreamerInfo(t *testing.T) {
s := New().(*streamer) s := newStreamer().(*streamer)
s.streams[1] = &stream{list: map[*subscriber]struct{}{{}: struct{}{}, {}: struct{}{}}} s.streams[1] = &stream{list: map[*subscriber]struct{}{{}: struct{}{}, {}: struct{}{}}}
s.streams[2] = &stream{list: map[*subscriber]struct{}{{}: struct{}{}}} s.streams[2] = &stream{list: map[*subscriber]struct{}{{}: struct{}{}}}
s.streams[3] = &stream{list: map[*subscriber]struct{}{}} s.streams[3] = &stream{list: map[*subscriber]struct{}{}}

View file

@ -22,10 +22,10 @@ import (
// New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses // New creates a new publish subscriber. If Redis client passed as parameter is not nil it uses
// a Redis implementation, otherwise it uses an in-memory implementation. // a Redis implementation, otherwise it uses an in-memory implementation.
func New(rdb *redis.Client) (core.Pubsub, error) { func New(rdb *redis.Client) core.Pubsub {
if rdb != nil { if rdb != nil {
return newRedis(rdb) return newRedis(rdb)
} }
return newHub(), nil return newHub()
} }

View file

@ -26,6 +26,12 @@ import (
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
func newRedis(rdb *redis.Client) core.Pubsub {
return &hubRedis{
rdb: rdb,
}
}
const channelPubSub = "drone-events" const channelPubSub = "drone-events"
type hubRedis struct { type hubRedis struct {
@ -70,7 +76,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan
select { select {
case m, ok := <-ch: case m, ok := <-ch:
if !ok { if !ok {
errCh <- fmt.Errorf("redis pubsub channel=%s closed", channelPubSub) errCh <- fmt.Errorf("pubsub/redis: channel=%s closed", channelPubSub)
return return
} }
@ -79,7 +85,7 @@ func (h *hubRedis) Subscribe(ctx context.Context) (<-chan *core.Message, <-chan
if err != nil { if err != nil {
// This is a "should not happen" situation, // This is a "should not happen" situation,
// because messages are encoded as json above in Publish(). // because messages are encoded as json above in Publish().
_, _ = fmt.Fprintf(os.Stderr, "error@pubsub: failed to unmarshal a message. %s\n", err) _, _ = fmt.Fprintf(os.Stderr, "pubsub/redis: failed to unmarshal a message. %s\n", err)
continue continue
} }
@ -100,13 +106,13 @@ func (h *hubRedis) Subscribers() (int, error) {
v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result() v, err := h.rdb.Do(ctx, "pubsub", "numsub", channelPubSub).Result()
if err != nil { if err != nil {
err = fmt.Errorf("error@pubsub: failed to get number of subscribers. %w", err) err = fmt.Errorf("pubsub/redis: failed to get number of subscribers. %w", err)
return 0, err return 0, err
} }
values, ok := v.([]interface{}) // the result should be: [<channel_name:string>, <subscriber_count:int64>] values, ok := v.([]interface{}) // the result should be: [<channel_name:string>, <subscriber_count:int64>]
if !ok || len(values) != 2 { if !ok || len(values) != 2 {
err = fmt.Errorf("error@pubsub: failed to extarct number of subscribers from: %v", values) err = fmt.Errorf("pubsub/redis: failed to extarct number of subscribers from: %v", values)
return 0, err return 0, err
} }
@ -124,21 +130,7 @@ func (h *hubRedis) Subscribers() (int, error) {
case uint64: case uint64:
return int(n), nil return int(n), nil
default: default:
err = fmt.Errorf("error@pubsub: unsupported type for number of subscribers: %T", values[1]) err = fmt.Errorf("pubsub/redis: unsupported type for number of subscribers: %T", values[1])
return 0, err return 0, err
} }
} }
func newRedis(rdb *redis.Client) (ps core.Pubsub, err error) {
_, err = rdb.Ping(context.Background()).Result()
if err != nil {
err = fmt.Errorf("redis not accessibe: %w", err)
return
}
ps = &hubRedis{
rdb: rdb,
}
return
}