removed unused packages
This commit is contained in:
parent
584ec88b07
commit
778971eb68
31 changed files with 69 additions and 1120 deletions
|
@ -11,7 +11,6 @@ import (
|
|||
|
||||
"github.com/drone/drone/build"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/version"
|
||||
"github.com/drone/drone/yaml"
|
||||
"github.com/drone/drone/yaml/expander"
|
||||
|
@ -48,7 +47,7 @@ func (a *Agent) Poll() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error {
|
||||
func (a *Agent) Run(payload *model.Work, cancel <-chan bool) error {
|
||||
|
||||
payload.Job.Status = model.StatusRunning
|
||||
payload.Job.Started = time.Now().Unix()
|
||||
|
@ -90,7 +89,7 @@ func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) {
|
||||
func (a *Agent) prep(w *model.Work) (*yaml.Config, error) {
|
||||
|
||||
envs := toEnv(w)
|
||||
w.Yaml = expander.ExpandString(w.Yaml, envs)
|
||||
|
@ -172,7 +171,7 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) {
|
|||
return conf, nil
|
||||
}
|
||||
|
||||
func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) error {
|
||||
func (a *Agent) exec(spec *yaml.Config, payload *model.Work, cancel <-chan bool) error {
|
||||
|
||||
conf := build.Config{
|
||||
Engine: a.Engine,
|
||||
|
@ -231,7 +230,7 @@ func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool)
|
|||
}
|
||||
}
|
||||
|
||||
func toEnv(w *queue.Work) map[string]string {
|
||||
func toEnv(w *model.Work) map[string]string {
|
||||
envs := map[string]string{
|
||||
"CI": "drone",
|
||||
"DRONE": "true",
|
||||
|
|
|
@ -6,17 +6,16 @@ import (
|
|||
"github.com/Sirupsen/logrus"
|
||||
"github.com/drone/drone/build"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/mq/stomp"
|
||||
)
|
||||
|
||||
// UpdateFunc handles buid pipeline status updates.
|
||||
type UpdateFunc func(*queue.Work)
|
||||
type UpdateFunc func(*model.Work)
|
||||
|
||||
// LoggerFunc handles buid pipeline logging updates.
|
||||
type LoggerFunc func(*build.Line)
|
||||
|
||||
var NoopUpdateFunc = func(*queue.Work) {}
|
||||
var NoopUpdateFunc = func(*model.Work) {}
|
||||
|
||||
var TermLoggerFunc = func(line *build.Line) {
|
||||
fmt.Println(line)
|
||||
|
@ -25,7 +24,7 @@ var TermLoggerFunc = func(line *build.Line) {
|
|||
// NewClientUpdater returns an updater that sends updated build details
|
||||
// to the drone server.
|
||||
func NewClientUpdater(client *stomp.Client) UpdateFunc {
|
||||
return func(w *queue.Work) {
|
||||
return func(w *model.Work) {
|
||||
err := client.SendJSON("/queue/updates", w)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error updating %s/%s#%d.%d. %s",
|
||||
|
|
39
bus/bus.go
39
bus/bus.go
|
@ -1,39 +0,0 @@
|
|||
package bus
|
||||
|
||||
//go:generate mockery -name Bus -output mock -case=underscore
|
||||
|
||||
// Bus represents an event bus implementation that
|
||||
// allows a publisher to broadcast Event notifications
|
||||
// to a list of subscribers.
|
||||
type Bus interface {
|
||||
// Publish broadcasts an event to all subscribers.
|
||||
Publish(*Event)
|
||||
|
||||
// Subscribe adds the channel to the list of
|
||||
// subscribers. Each subscriber in the list will
|
||||
// receive broadcast events.
|
||||
Subscribe(chan *Event)
|
||||
|
||||
// Unsubscribe removes the channel from the list
|
||||
// of subscribers.
|
||||
Unsubscribe(chan *Event)
|
||||
}
|
||||
|
||||
//
|
||||
// // Publish broadcasts an event to all subscribers.
|
||||
// func Publish(c context.Context, event *Event) {
|
||||
// FromContext(c).Publish(event)
|
||||
// }
|
||||
//
|
||||
// // Subscribe adds the channel to the list of
|
||||
// // subscribers. Each subscriber in the list will
|
||||
// // receive broadcast events.
|
||||
// func Subscribe(c context.Context, eventc chan *Event) {
|
||||
// FromContext(c).Subscribe(eventc)
|
||||
// }
|
||||
//
|
||||
// // Unsubscribe removes the channel from the
|
||||
// // list of subscribers.
|
||||
// func Unsubscribe(c context.Context, eventc chan *Event) {
|
||||
// FromContext(c).Unsubscribe(eventc)
|
||||
// }
|
|
@ -1,46 +0,0 @@
|
|||
package bus
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type eventbus struct {
|
||||
sync.Mutex
|
||||
subs map[chan *Event]bool
|
||||
}
|
||||
|
||||
// New creates a simple event bus that manages a list of
|
||||
// subscribers to which events are published.
|
||||
func New() Bus {
|
||||
return newEventbus()
|
||||
}
|
||||
|
||||
func newEventbus() *eventbus {
|
||||
return &eventbus{
|
||||
subs: make(map[chan *Event]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *eventbus) Subscribe(c chan *Event) {
|
||||
b.Lock()
|
||||
b.subs[c] = true
|
||||
b.Unlock()
|
||||
}
|
||||
|
||||
func (b *eventbus) Unsubscribe(c chan *Event) {
|
||||
b.Lock()
|
||||
delete(b.subs, c)
|
||||
b.Unlock()
|
||||
}
|
||||
|
||||
func (b *eventbus) Publish(event *Event) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
for s := range b.subs {
|
||||
go func(c chan *Event) {
|
||||
defer recover()
|
||||
c <- event
|
||||
}(s)
|
||||
}
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
package bus
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/drone/drone/model"
|
||||
. "github.com/franela/goblin"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func TestBus(t *testing.T) {
|
||||
g := Goblin(t)
|
||||
g.Describe("Event bus", func() {
|
||||
|
||||
g.It("Should unsubscribe", func() {
|
||||
c := new(gin.Context)
|
||||
b := newEventbus()
|
||||
ToContext(c, b)
|
||||
|
||||
c1 := make(chan *Event)
|
||||
c2 := make(chan *Event)
|
||||
Subscribe(c, c1)
|
||||
Subscribe(c, c2)
|
||||
|
||||
g.Assert(len(b.subs)).Equal(2)
|
||||
})
|
||||
|
||||
g.It("Should subscribe", func() {
|
||||
c := new(gin.Context)
|
||||
b := newEventbus()
|
||||
ToContext(c, b)
|
||||
|
||||
c1 := make(chan *Event)
|
||||
c2 := make(chan *Event)
|
||||
Subscribe(c, c1)
|
||||
Subscribe(c, c2)
|
||||
|
||||
g.Assert(len(b.subs)).Equal(2)
|
||||
|
||||
Unsubscribe(c, c1)
|
||||
Unsubscribe(c, c2)
|
||||
|
||||
g.Assert(len(b.subs)).Equal(0)
|
||||
})
|
||||
|
||||
g.It("Should publish", func() {
|
||||
c := new(gin.Context)
|
||||
b := New()
|
||||
ToContext(c, b)
|
||||
|
||||
e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{})
|
||||
e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{})
|
||||
c1 := make(chan *Event)
|
||||
|
||||
Subscribe(c, c1)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
var r1, r2 *Event
|
||||
go func() {
|
||||
r1 = <-c1
|
||||
r2 = <-c1
|
||||
wg.Done()
|
||||
}()
|
||||
Publish(c, e1)
|
||||
Publish(c, e2)
|
||||
wg.Wait()
|
||||
})
|
||||
})
|
||||
|
||||
}
|
|
@ -1,21 +0,0 @@
|
|||
package bus
|
||||
|
||||
import "golang.org/x/net/context"
|
||||
|
||||
const key = "bus"
|
||||
|
||||
// Setter defines a context that enables setting values.
|
||||
type Setter interface {
|
||||
Set(string, interface{})
|
||||
}
|
||||
|
||||
// FromContext returns the Bus associated with this context.
|
||||
func FromContext(c context.Context) Bus {
|
||||
return c.Value(key).(Bus)
|
||||
}
|
||||
|
||||
// ToContext adds the Bus to this context if it supports
|
||||
// the Setter interface.
|
||||
func ToContext(c Setter, b Bus) {
|
||||
c.Set(key, b)
|
||||
}
|
|
@ -8,7 +8,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/mq/stomp"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
|
@ -190,7 +190,7 @@ func start(c *cli.Context) {
|
|||
},
|
||||
}
|
||||
|
||||
work := new(queue.Work)
|
||||
work := new(model.Work)
|
||||
m.Unmarshal(work)
|
||||
r.run(work)
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"github.com/Sirupsen/logrus"
|
||||
"github.com/drone/drone/agent"
|
||||
"github.com/drone/drone/build/docker"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/mq/stomp"
|
||||
|
||||
"github.com/samalba/dockerclient"
|
||||
|
@ -27,7 +27,7 @@ type pipeline struct {
|
|||
config config
|
||||
}
|
||||
|
||||
func (r *pipeline) run(w *queue.Work) {
|
||||
func (r *pipeline) run(w *model.Work) {
|
||||
|
||||
// defer func() {
|
||||
// // r.drone.Ack(id, opts)
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/drone/drone/agent"
|
||||
"github.com/drone/drone/build/docker"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/yaml"
|
||||
|
||||
"github.com/codegangsta/cli"
|
||||
|
@ -340,7 +339,7 @@ func exec(c *cli.Context) error {
|
|||
Pull: c.Bool("pull"),
|
||||
}
|
||||
|
||||
payload := &queue.Work{
|
||||
payload := &model.Work{
|
||||
Yaml: string(file),
|
||||
Verified: c.BoolT("yaml.verified"),
|
||||
Signed: c.BoolT("yaml.signed"),
|
||||
|
|
|
@ -288,9 +288,6 @@ func server(c *cli.Context) error {
|
|||
ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true),
|
||||
middleware.Version,
|
||||
middleware.Config(c),
|
||||
middleware.Queue(c),
|
||||
middleware.Stream(c),
|
||||
middleware.Bus(c),
|
||||
middleware.Cache(c),
|
||||
middleware.Store(c),
|
||||
middleware.Remote(c),
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
package bus
|
||||
|
||||
import "github.com/drone/drone/model"
|
||||
package model
|
||||
|
||||
// EventType defines the possible types of build events.
|
||||
type EventType string
|
||||
|
@ -14,15 +12,15 @@ const (
|
|||
|
||||
// Event represents a build event.
|
||||
type Event struct {
|
||||
Type EventType `json:"type"`
|
||||
Repo model.Repo `json:"repo"`
|
||||
Build model.Build `json:"build"`
|
||||
Job model.Job `json:"job"`
|
||||
Type EventType `json:"type"`
|
||||
Repo Repo `json:"repo"`
|
||||
Build Build `json:"build"`
|
||||
Job Job `json:"job"`
|
||||
}
|
||||
|
||||
// NewEvent creates a new Event for the build, using copies of
|
||||
// the build data to avoid possible mutation or race conditions.
|
||||
func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event {
|
||||
func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event {
|
||||
return &Event{
|
||||
Type: t,
|
||||
Repo: *r,
|
||||
|
@ -31,7 +29,7 @@ func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event {
|
|||
}
|
||||
}
|
||||
|
||||
func NewBuildEvent(t EventType, r *model.Repo, b *model.Build) *Event {
|
||||
func NewBuildEvent(t EventType, r *Repo, b *Build) *Event {
|
||||
return &Event{
|
||||
Type: t,
|
||||
Repo: *r,
|
19
model/work.go
Normal file
19
model/work.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package model
|
||||
|
||||
// Work represents an item for work to be
|
||||
// processed by a worker.
|
||||
type Work struct {
|
||||
Signed bool `json:"signed"`
|
||||
Verified bool `json:"verified"`
|
||||
Yaml string `json:"config"`
|
||||
YamlEnc string `json:"secret"`
|
||||
Repo *Repo `json:"repo"`
|
||||
Build *Build `json:"build"`
|
||||
BuildLast *Build `json:"build_last"`
|
||||
Job *Job `json:"job"`
|
||||
Netrc *Netrc `json:"netrc"`
|
||||
Keys *Key `json:"keys"`
|
||||
System *System `json:"system"`
|
||||
Secrets []*Secret `json:"secrets"`
|
||||
User *User `json:"user"`
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const key = "queue"
|
||||
|
||||
// Setter defines a context that enables setting values.
|
||||
type Setter interface {
|
||||
Set(string, interface{})
|
||||
}
|
||||
|
||||
// FromContext returns the Queue associated with this context.
|
||||
func FromContext(c context.Context) Queue {
|
||||
return c.Value(key).(Queue)
|
||||
}
|
||||
|
||||
// ToContext adds the Queue to this context if it supports
|
||||
// the Setter interface.
|
||||
func ToContext(c Setter, q Queue) {
|
||||
c.Set(key, q)
|
||||
}
|
|
@ -1,67 +0,0 @@
|
|||
package queue
|
||||
|
||||
//go:generate mockery -name Queue -output mock -case=underscore
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// ErrNotFound indicates the requested work item does not
|
||||
// exist in the queue.
|
||||
var ErrNotFound = errors.New("queue item not found")
|
||||
|
||||
type Queue interface {
|
||||
// Publish inserts work at the tail of this queue, waiting for
|
||||
// space to become available if the queue is full.
|
||||
Publish(*Work) error
|
||||
|
||||
// Remove removes the specified work item from this queue,
|
||||
// if it is present.
|
||||
Remove(*Work) error
|
||||
|
||||
// PullClose retrieves and removes the head of this queue,
|
||||
// waiting if necessary until work becomes available.
|
||||
Pull() *Work
|
||||
|
||||
// PullClose retrieves and removes the head of this queue,
|
||||
// waiting if necessary until work becomes available. The
|
||||
// CloseNotifier should be provided to clone the channel
|
||||
// if the subscribing client terminates its connection.
|
||||
PullClose(CloseNotifier) *Work
|
||||
}
|
||||
|
||||
// Publish inserts work at the tail of this queue, waiting for
|
||||
// space to become available if the queue is full.
|
||||
func Publish(c context.Context, w *Work) error {
|
||||
return FromContext(c).Publish(w)
|
||||
}
|
||||
|
||||
// Remove removes the specified work item from this queue,
|
||||
// if it is present.
|
||||
func Remove(c context.Context, w *Work) error {
|
||||
return FromContext(c).Remove(w)
|
||||
}
|
||||
|
||||
// Pull retrieves and removes the head of this queue,
|
||||
// waiting if necessary until work becomes available.
|
||||
func Pull(c context.Context) *Work {
|
||||
return FromContext(c).Pull()
|
||||
}
|
||||
|
||||
// PullClose retrieves and removes the head of this queue,
|
||||
// waiting if necessary until work becomes available. The
|
||||
// CloseNotifier should be provided to clone the channel
|
||||
// if the subscribing client terminates its connection.
|
||||
func PullClose(c context.Context, cn CloseNotifier) *Work {
|
||||
return FromContext(c).PullClose(cn)
|
||||
}
|
||||
|
||||
// CloseNotifier defines a datastructure that is capable of notifying
|
||||
// a subscriber when its connection is closed.
|
||||
type CloseNotifier interface {
|
||||
// CloseNotify returns a channel that receives a single value
|
||||
// when the client connection has gone away.
|
||||
CloseNotify() <-chan bool
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
package queue
|
||||
|
||||
import "sync"
|
||||
|
||||
type queue struct {
|
||||
sync.Mutex
|
||||
|
||||
items map[*Work]struct{}
|
||||
itemc chan *Work
|
||||
}
|
||||
|
||||
func New() Queue {
|
||||
return newQueue()
|
||||
}
|
||||
|
||||
func newQueue() *queue {
|
||||
return &queue{
|
||||
items: make(map[*Work]struct{}),
|
||||
itemc: make(chan *Work, 999),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) Publish(work *Work) error {
|
||||
q.Lock()
|
||||
q.items[work] = struct{}{}
|
||||
q.Unlock()
|
||||
q.itemc <- work
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) Remove(work *Work) error {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
_, ok := q.items[work]
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
var items []*Work
|
||||
|
||||
// loop through and drain all items
|
||||
// from the
|
||||
drain:
|
||||
for {
|
||||
select {
|
||||
case item := <-q.itemc:
|
||||
items = append(items, item)
|
||||
default:
|
||||
break drain
|
||||
}
|
||||
}
|
||||
|
||||
// re-add all items to the queue except
|
||||
// the item we're trying to remove
|
||||
for _, item := range items {
|
||||
if item == work {
|
||||
delete(q.items, work)
|
||||
continue
|
||||
}
|
||||
q.itemc <- item
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) Pull() *Work {
|
||||
work := <-q.itemc
|
||||
q.Lock()
|
||||
delete(q.items, work)
|
||||
q.Unlock()
|
||||
return work
|
||||
}
|
||||
|
||||
func (q *queue) PullClose(cn CloseNotifier) *Work {
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
return nil
|
||||
case work := <-q.itemc:
|
||||
q.Lock()
|
||||
delete(q.items, work)
|
||||
q.Unlock()
|
||||
return work
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,94 +0,0 @@
|
|||
package queue
|
||||
|
||||
//
|
||||
// import (
|
||||
// "sync"
|
||||
// "testing"
|
||||
//
|
||||
// . "github.com/franela/goblin"
|
||||
// "github.com/gin-gonic/gin"
|
||||
// )
|
||||
//
|
||||
// func TestBuild(t *testing.T) {
|
||||
// g := Goblin(t)
|
||||
// g.Describe("Queue", func() {
|
||||
//
|
||||
// g.It("Should publish item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := newQueue()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// w1 := &Work{}
|
||||
// w2 := &Work{}
|
||||
// Publish(c, w1)
|
||||
// Publish(c, w2)
|
||||
// g.Assert(len(q.items)).Equal(2)
|
||||
// g.Assert(len(q.itemc)).Equal(2)
|
||||
// })
|
||||
//
|
||||
// g.It("Should remove item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := newQueue()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// w1 := &Work{}
|
||||
// w2 := &Work{}
|
||||
// w3 := &Work{}
|
||||
// Publish(c, w1)
|
||||
// Publish(c, w2)
|
||||
// Publish(c, w3)
|
||||
// Remove(c, w2)
|
||||
// g.Assert(len(q.items)).Equal(2)
|
||||
// g.Assert(len(q.itemc)).Equal(2)
|
||||
//
|
||||
// g.Assert(Pull(c)).Equal(w1)
|
||||
// g.Assert(Pull(c)).Equal(w3)
|
||||
// g.Assert(Remove(c, w2)).Equal(ErrNotFound)
|
||||
// })
|
||||
//
|
||||
// g.It("Should pull item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := New()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// cn := new(closeNotifier)
|
||||
// cn.closec = make(chan bool, 1)
|
||||
// w1 := &Work{}
|
||||
// w2 := &Work{}
|
||||
//
|
||||
// Publish(c, w1)
|
||||
// g.Assert(Pull(c)).Equal(w1)
|
||||
//
|
||||
// Publish(c, w2)
|
||||
// g.Assert(PullClose(c, cn)).Equal(w2)
|
||||
// })
|
||||
//
|
||||
// g.It("Should cancel pulling item", func() {
|
||||
// c := new(gin.Context)
|
||||
// q := New()
|
||||
// ToContext(c, q)
|
||||
//
|
||||
// cn := new(closeNotifier)
|
||||
// cn.closec = make(chan bool, 1)
|
||||
// var wg sync.WaitGroup
|
||||
// go func() {
|
||||
// wg.Add(1)
|
||||
// g.Assert(PullClose(c, cn) == nil).IsTrue()
|
||||
// wg.Done()
|
||||
// }()
|
||||
// go func() {
|
||||
// cn.closec <- true
|
||||
// }()
|
||||
// wg.Wait()
|
||||
//
|
||||
// })
|
||||
// })
|
||||
// }
|
||||
//
|
||||
// type closeNotifier struct {
|
||||
// closec chan bool
|
||||
// }
|
||||
//
|
||||
// func (c *closeNotifier) CloseNotify() <-chan bool {
|
||||
// return c.closec
|
||||
// }
|
|
@ -1,21 +0,0 @@
|
|||
package queue
|
||||
|
||||
import "github.com/drone/drone/model"
|
||||
|
||||
// Work represents an item for work to be
|
||||
// processed by a worker.
|
||||
type Work struct {
|
||||
Signed bool `json:"signed"`
|
||||
Verified bool `json:"verified"`
|
||||
Yaml string `json:"config"`
|
||||
YamlEnc string `json:"secret"`
|
||||
Repo *model.Repo `json:"repo"`
|
||||
Build *model.Build `json:"build"`
|
||||
BuildLast *model.Build `json:"build_last"`
|
||||
Job *model.Job `json:"job"`
|
||||
Netrc *model.Netrc `json:"netrc"`
|
||||
Keys *model.Key `json:"keys"`
|
||||
System *model.System `json:"system"`
|
||||
Secrets []*model.Secret `json:"secrets"`
|
||||
User *model.User `json:"user"`
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"github.com/drone/drone/bus"
|
||||
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Bus is a middleware function that initializes the Event Bus and attaches to
|
||||
// the context of every http.Request.
|
||||
func Bus(cli *cli.Context) gin.HandlerFunc {
|
||||
v := bus.New()
|
||||
return func(c *gin.Context) {
|
||||
bus.ToContext(c, v)
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"github.com/drone/drone/queue"
|
||||
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Queue is a middleware function that initializes the Queue and attaches to
|
||||
// the context of every http.Request.
|
||||
func Queue(cli *cli.Context) gin.HandlerFunc {
|
||||
v := queue.New()
|
||||
return func(c *gin.Context) {
|
||||
queue.ToContext(c, v)
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"github.com/drone/drone/stream"
|
||||
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Stream is a middleware function that initializes the Stream and attaches to
|
||||
// the context of every http.Request.
|
||||
func Stream(cli *cli.Context) gin.HandlerFunc {
|
||||
v := stream.New()
|
||||
return func(c *gin.Context) {
|
||||
stream.ToContext(c, v)
|
||||
}
|
||||
}
|
|
@ -1,17 +1,16 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/drone/drone/bus"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/remote"
|
||||
"github.com/drone/drone/shared/httputil"
|
||||
"github.com/drone/drone/store"
|
||||
"github.com/drone/drone/stream"
|
||||
"github.com/drone/drone/yaml"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/square/go-jose"
|
||||
|
@ -114,7 +113,7 @@ func GetBuildLogs(c *gin.Context) {
|
|||
}
|
||||
|
||||
c.Header("Content-Type", "application/json")
|
||||
stream.Copy(c.Writer, r)
|
||||
copyLogs(c.Writer, r)
|
||||
}
|
||||
|
||||
func DeleteBuild(c *gin.Context) {
|
||||
|
@ -151,8 +150,8 @@ func DeleteBuild(c *gin.Context) {
|
|||
store.UpdateBuildJob(c, build, job)
|
||||
|
||||
client := stomp.MustFromContext(c)
|
||||
client.SendJSON("/topic/cancel", bus.Event{
|
||||
Type: bus.Cancelled,
|
||||
client.SendJSON("/topic/cancel", model.Event{
|
||||
Type: model.Cancelled,
|
||||
Repo: *repo,
|
||||
Build: *build,
|
||||
Job: *job,
|
||||
|
@ -328,8 +327,8 @@ func PostBuild(c *gin.Context) {
|
|||
log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified)
|
||||
|
||||
client := stomp.MustFromContext(c)
|
||||
client.SendJSON("/topic/events", bus.Event{
|
||||
Type: bus.Enqueued,
|
||||
client.SendJSON("/topic/events", model.Event{
|
||||
Type: model.Enqueued,
|
||||
Repo: *repo,
|
||||
Build: *build,
|
||||
},
|
||||
|
@ -339,7 +338,7 @@ func PostBuild(c *gin.Context) {
|
|||
|
||||
for _, job := range jobs {
|
||||
broker, _ := stomp.FromContext(c)
|
||||
broker.SendJSON("/queue/pending", &queue.Work{
|
||||
broker.SendJSON("/queue/pending", &model.Work{
|
||||
Signed: signed,
|
||||
Verified: verified,
|
||||
User: user,
|
||||
|
@ -371,3 +370,20 @@ func GetBuildQueue(c *gin.Context) {
|
|||
}
|
||||
c.JSON(200, out)
|
||||
}
|
||||
|
||||
// copyLogs copies the stream from the source to the destination in valid JSON
|
||||
// format. This converts the logs, which are per-line JSON objects, to a
|
||||
// proper JSON array.
|
||||
func copyLogs(dest io.Writer, src io.Reader) error {
|
||||
io.WriteString(dest, "[")
|
||||
|
||||
scanner := bufio.NewScanner(src)
|
||||
for scanner.Scan() {
|
||||
io.WriteString(dest, scanner.Text())
|
||||
io.WriteString(dest, ",\n")
|
||||
}
|
||||
|
||||
io.WriteString(dest, "{}]")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -9,9 +9,7 @@ import (
|
|||
"github.com/square/go-jose"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/drone/drone/bus"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/remote"
|
||||
"github.com/drone/drone/shared/httputil"
|
||||
"github.com/drone/drone/shared/token"
|
||||
|
@ -214,8 +212,8 @@ func PostHook(c *gin.Context) {
|
|||
}
|
||||
|
||||
client := stomp.MustFromContext(c)
|
||||
client.SendJSON("/topic/events", bus.Event{
|
||||
Type: bus.Enqueued,
|
||||
client.SendJSON("/topic/events", model.Event{
|
||||
Type: model.Enqueued,
|
||||
Repo: *repo,
|
||||
Build: *build,
|
||||
},
|
||||
|
@ -225,7 +223,7 @@ func PostHook(c *gin.Context) {
|
|||
|
||||
for _, job := range jobs {
|
||||
broker, _ := stomp.FromContext(c)
|
||||
broker.SendJSON("/queue/pending", &queue.Work{
|
||||
broker.SendJSON("/queue/pending", &model.Work{
|
||||
Signed: build.Signed,
|
||||
Verified: build.Verified,
|
||||
User: user,
|
||||
|
|
282
server/queue.go
282
server/queue.go
|
@ -10,289 +10,13 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/drone/drone/bus"
|
||||
"github.com/drone/drone/model"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/remote"
|
||||
"github.com/drone/drone/store"
|
||||
"github.com/drone/mq/stomp"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
//
|
||||
// // Pull is a long request that polls and attemts to pull work off the queue stack.
|
||||
// func Pull(c *gin.Context) {
|
||||
// logrus.Debugf("Agent %s connected.", c.ClientIP())
|
||||
//
|
||||
// w := queue.PullClose(c, c.Writer)
|
||||
// if w == nil {
|
||||
// logrus.Debugf("Agent %s could not pull work.", c.ClientIP())
|
||||
// } else {
|
||||
//
|
||||
// // setup the channel to stream logs
|
||||
// if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil {
|
||||
// logrus.Errorf("Unable to create stream. %s", err)
|
||||
// }
|
||||
//
|
||||
// c.JSON(202, w)
|
||||
//
|
||||
// logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d",
|
||||
// c.ClientIP(),
|
||||
// w.Repo.Owner,
|
||||
// w.Repo.Name,
|
||||
// w.Build.Number,
|
||||
// w.Job.Number,
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Wait is a long request that polls and waits for cancelled build requests.
|
||||
// func Wait(c *gin.Context) {
|
||||
// id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// if err != nil {
|
||||
// c.String(500, "Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// eventc := make(chan *bus.Event, 1)
|
||||
//
|
||||
// bus.Subscribe(c, eventc)
|
||||
// defer bus.Unsubscribe(c, eventc)
|
||||
//
|
||||
// for {
|
||||
// select {
|
||||
// case event := <-eventc:
|
||||
// if event.Job.ID == id && event.Type == bus.Cancelled {
|
||||
// c.JSON(200, event.Job)
|
||||
// return
|
||||
// }
|
||||
// case <-c.Writer.CloseNotify():
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Update handles build updates from the agent and persists to the database.
|
||||
// func Update(c *gin.Context) {
|
||||
// work := &queue.Work{}
|
||||
// if err := c.BindJSON(work); err != nil {
|
||||
// logrus.Errorf("Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// // TODO(bradrydzewski) it is really annoying that we have to do this lookup
|
||||
// // and I'd prefer not to. The reason we do this is because the Build and Job
|
||||
// // have fields that aren't serialized to json and would be reset to their
|
||||
// // empty values if we just saved what was coming in the http.Request body.
|
||||
// build, err := store.GetBuild(c, work.Build.ID)
|
||||
// if err != nil {
|
||||
// c.String(404, "Unable to find build. %s", err)
|
||||
// return
|
||||
// }
|
||||
// job, err := store.GetJob(c, work.Job.ID)
|
||||
// if err != nil {
|
||||
// c.String(404, "Unable to find job. %s", err)
|
||||
// return
|
||||
// }
|
||||
// build.Started = work.Build.Started
|
||||
// build.Finished = work.Build.Finished
|
||||
// build.Status = work.Build.Status
|
||||
// job.Started = work.Job.Started
|
||||
// job.Finished = work.Job.Finished
|
||||
// job.Status = work.Job.Status
|
||||
// job.ExitCode = work.Job.ExitCode
|
||||
// job.Error = work.Job.Error
|
||||
//
|
||||
// if build.Status == model.StatusPending {
|
||||
// build.Started = work.Job.Started
|
||||
// build.Status = model.StatusRunning
|
||||
// store.UpdateBuild(c, build)
|
||||
// }
|
||||
//
|
||||
// // if job.Status == model.StatusRunning {
|
||||
// // err := stream.Create(c, stream.ToKey(job.ID))
|
||||
// // if err != nil {
|
||||
// // logrus.Errorf("Unable to create stream. %s", err)
|
||||
// // }
|
||||
// // }
|
||||
//
|
||||
// ok, err := store.UpdateBuildJob(c, build, job)
|
||||
// if err != nil {
|
||||
// c.String(500, "Unable to update job. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// if ok && build.Status != model.StatusRunning {
|
||||
// // get the user because we transfer the user form the server to agent
|
||||
// // and back we lose the token which does not get serialized to json.
|
||||
// user, err := store.GetUser(c, work.User.ID)
|
||||
// if err != nil {
|
||||
// c.String(500, "Unable to find user. %s", err)
|
||||
// return
|
||||
// }
|
||||
// remote.Status(c, user, work.Repo, build,
|
||||
// fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
|
||||
// }
|
||||
//
|
||||
// if build.Status == model.StatusRunning {
|
||||
// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job))
|
||||
// } else {
|
||||
// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job))
|
||||
// }
|
||||
//
|
||||
// c.JSON(200, work)
|
||||
// }
|
||||
//
|
||||
// // Stream streams the logs to disk or memory for broadcasing to listeners. Once
|
||||
// // the stream is closed it is moved to permanent storage in the database.
|
||||
// func Stream(c *gin.Context) {
|
||||
// id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// if err != nil {
|
||||
// c.String(500, "Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// key := c.Param("id")
|
||||
// logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key)
|
||||
//
|
||||
// wc, err := stream.Writer(c, key)
|
||||
// if err != nil {
|
||||
// c.String(500, "Failed to create stream writer. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// defer func() {
|
||||
// wc.Close()
|
||||
// stream.Delete(c, key)
|
||||
// }()
|
||||
//
|
||||
// io.Copy(wc, c.Request.Body)
|
||||
//
|
||||
// rc, err := stream.Reader(c, key)
|
||||
// if err != nil {
|
||||
// c.String(500, "Failed to create stream reader. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// wg := sync.WaitGroup{}
|
||||
// wg.Add(1)
|
||||
//
|
||||
// go func() {
|
||||
// defer recover()
|
||||
// store.WriteLog(c, &model.Job{ID: id}, rc)
|
||||
// wg.Done()
|
||||
// }()
|
||||
//
|
||||
// wc.Close()
|
||||
// wg.Wait()
|
||||
// c.String(200, "")
|
||||
//
|
||||
// logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())
|
||||
// }
|
||||
//
|
||||
// func Ping(c *gin.Context) {
|
||||
// agent, err := store.GetAgentAddr(c, c.ClientIP())
|
||||
// if err == nil {
|
||||
// agent.Updated = time.Now().Unix()
|
||||
// err = store.UpdateAgent(c, agent)
|
||||
// } else {
|
||||
// err = store.CreateAgent(c, &model.Agent{
|
||||
// Address: c.ClientIP(),
|
||||
// Platform: "linux/amd64",
|
||||
// Capacity: 2,
|
||||
// Created: time.Now().Unix(),
|
||||
// Updated: time.Now().Unix(),
|
||||
// })
|
||||
// }
|
||||
// if err != nil {
|
||||
// logrus.Errorf("Unable to register agent. %s", err.Error())
|
||||
// }
|
||||
// c.String(200, "PONG")
|
||||
// }
|
||||
|
||||
//
|
||||
//
|
||||
// Below are alternate implementations for the Queue that use websockets.
|
||||
//
|
||||
//
|
||||
//
|
||||
// // PostLogs handles an http request from the agent to post build logs. These
|
||||
// // logs are posted at the end of the build process.
|
||||
// func PostLogs(c *gin.Context) {
|
||||
// id, _ := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// job, err := store.GetJob(c, id)
|
||||
// if err != nil {
|
||||
// c.String(404, "Cannot upload logs. %s", err)
|
||||
// return
|
||||
// }
|
||||
// if err := store.WriteLog(c, job, c.Request.Body); err != nil {
|
||||
// c.String(500, "Cannot persist logs", err)
|
||||
// return
|
||||
// }
|
||||
// c.String(200, "")
|
||||
// }
|
||||
//
|
||||
// // WriteLogs handles an http request from the agent to stream build logs from
|
||||
// // the agent to the server to enable real time streamings to the client.
|
||||
// func WriteLogs(c *gin.Context) {
|
||||
// id, err := strconv.ParseInt(c.Param("id"), 10, 64)
|
||||
// if err != nil {
|
||||
// c.String(500, "Invalid input. %s", err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
// if err != nil {
|
||||
// c.String(500, "Cannot upgrade to websocket. %s", err)
|
||||
// return
|
||||
// }
|
||||
// defer conn.Close()
|
||||
//
|
||||
// wc, err := stream.Writer(c, stream.ToKey(id))
|
||||
// if err != nil {
|
||||
// c.String(500, "Cannot create stream writer. %s", err)
|
||||
// return
|
||||
// }
|
||||
// defer func() {
|
||||
// wc.Close()
|
||||
// stream.Delete(c, stream.ToKey(id))
|
||||
// }()
|
||||
//
|
||||
// var msg []byte
|
||||
// for {
|
||||
// _, msg, err = conn.ReadMessage()
|
||||
// if err != nil {
|
||||
// break
|
||||
// }
|
||||
// wc.Write(msg)
|
||||
// wc.Write(newline)
|
||||
// }
|
||||
//
|
||||
// if err != nil && err != io.EOF {
|
||||
// c.String(500, "Error reading logs. %s", err)
|
||||
// return
|
||||
// }
|
||||
// //
|
||||
// // rc, err := stream.Reader(c, stream.ToKey(id))
|
||||
// // if err != nil {
|
||||
// // c.String(500, "Failed to create stream reader. %s", err)
|
||||
// // return
|
||||
// // }
|
||||
// //
|
||||
// // wg := sync.WaitGroup{}
|
||||
// // wg.Add(1)
|
||||
// //
|
||||
// // go func() {
|
||||
// // defer recover()
|
||||
// // store.WriteLog(c, &model.Job{ID: id}, rc)
|
||||
// // wg.Done()
|
||||
// // }()
|
||||
// //
|
||||
// // wc.Close()
|
||||
// // wg.Wait()
|
||||
//
|
||||
// }
|
||||
|
||||
// newline defines a newline constant to separate lines in the build output
|
||||
var newline = []byte{'\n'}
|
||||
|
||||
|
@ -315,7 +39,7 @@ func HandleUpdate(c context.Context, message *stomp.Message) {
|
|||
}
|
||||
}()
|
||||
|
||||
work := new(queue.Work)
|
||||
work := new(model.Work)
|
||||
if err := message.Unmarshal(work); err != nil {
|
||||
logrus.Errorf("Invalid input. %s", err)
|
||||
return
|
||||
|
@ -376,8 +100,8 @@ func HandleUpdate(c context.Context, message *stomp.Message) {
|
|||
}
|
||||
|
||||
client := stomp.MustFromContext(c)
|
||||
err = client.SendJSON("/topic/events", bus.Event{
|
||||
Type: bus.Started,
|
||||
err = client.SendJSON("/topic/events", model.Event{
|
||||
Type: model.Started,
|
||||
Repo: *work.Repo,
|
||||
Build: *build,
|
||||
Job: *job,
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
package stream
|
||||
|
||||
import "golang.org/x/net/context"
|
||||
|
||||
const key = "stream"
|
||||
|
||||
// Setter defines a context that enables setting values.
|
||||
type Setter interface {
|
||||
Set(string, interface{})
|
||||
}
|
||||
|
||||
// FromContext returns the Stream associated with this context.
|
||||
func FromContext(c context.Context) Stream {
|
||||
return c.Value(key).(Stream)
|
||||
}
|
||||
|
||||
// ToContext adds the Stream to this context if it supports the
|
||||
// Setter interface.
|
||||
func ToContext(c Setter, s Stream) {
|
||||
c.Set(key, s)
|
||||
}
|
|
@ -1,54 +0,0 @@
|
|||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type reader struct {
|
||||
w *writer
|
||||
off int
|
||||
closed uint32
|
||||
}
|
||||
|
||||
// Read reads from the Buffer
|
||||
func (r *reader) Read(p []byte) (n int, err error) {
|
||||
r.w.RLock()
|
||||
defer r.w.RUnlock()
|
||||
|
||||
var m int
|
||||
|
||||
for len(p) > 0 {
|
||||
|
||||
m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p)
|
||||
n += m
|
||||
r.off += n
|
||||
|
||||
if n > 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if r.w.Closed() {
|
||||
err = io.EOF
|
||||
break
|
||||
}
|
||||
if r.Closed() {
|
||||
err = io.EOF
|
||||
break
|
||||
}
|
||||
|
||||
r.w.Wait()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *reader) Close() error {
|
||||
atomic.StoreUint32(&r.closed, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reader) Closed() bool {
|
||||
return atomic.LoadUint32(&r.closed) != 0
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
package stream
|
||||
|
||||
import "testing"
|
||||
|
||||
func TetsReader(t *testing.T) {
|
||||
t.Skip() //TODO(bradrydzewski) implement reader tests
|
||||
}
|
|
@ -1,60 +0,0 @@
|
|||
package stream
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"strconv"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Stream manages the stream of build logs.
|
||||
type Stream interface {
|
||||
Create(string) error
|
||||
Delete(string) error
|
||||
Reader(string) (io.ReadCloser, error)
|
||||
Writer(string) (io.WriteCloser, error)
|
||||
}
|
||||
|
||||
// Create creates a new stream.
|
||||
func Create(c context.Context, key string) error {
|
||||
return FromContext(c).Create(key)
|
||||
}
|
||||
|
||||
// Reader opens the stream for reading.
|
||||
func Reader(c context.Context, key string) (io.ReadCloser, error) {
|
||||
return FromContext(c).Reader(key)
|
||||
}
|
||||
|
||||
// Writer opens the stream for writing.
|
||||
func Writer(c context.Context, key string) (io.WriteCloser, error) {
|
||||
return FromContext(c).Writer(key)
|
||||
}
|
||||
|
||||
// Delete deletes the stream by key.
|
||||
func Delete(c context.Context, key string) error {
|
||||
return FromContext(c).Delete(key)
|
||||
}
|
||||
|
||||
// ToKey is a helper function that converts a unique identifier
|
||||
// of type int64 into a string.
|
||||
func ToKey(i int64) string {
|
||||
return strconv.FormatInt(i, 10)
|
||||
}
|
||||
|
||||
// Copy copies the stream from the source to the destination in valid JSON
|
||||
// format. This converts the logs, which are per-line JSON objects, to a
|
||||
// proper JSON array.
|
||||
func Copy(dest io.Writer, src io.Reader) error {
|
||||
io.WriteString(dest, "[")
|
||||
|
||||
scanner := bufio.NewScanner(src)
|
||||
for scanner.Scan() {
|
||||
io.WriteString(dest, scanner.Text())
|
||||
io.WriteString(dest, ",\n")
|
||||
}
|
||||
|
||||
io.WriteString(dest, "{}]")
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,72 +0,0 @@
|
|||
package stream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type stream struct {
|
||||
sync.Mutex
|
||||
writers map[string]*writer
|
||||
}
|
||||
|
||||
// New returns a new in-memory implementation of Stream.
|
||||
func New() Stream {
|
||||
return &stream{
|
||||
writers: map[string]*writer{},
|
||||
}
|
||||
}
|
||||
|
||||
// Reader returns an io.Reader for reading from to the stream.
|
||||
func (s *stream) Reader(name string) (io.ReadCloser, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if !s.exists(name) {
|
||||
return nil, fmt.Errorf("stream: cannot read stream %s, not found", name)
|
||||
}
|
||||
return s.writers[name].Reader()
|
||||
}
|
||||
|
||||
// Writer returns an io.WriteCloser for writing to the stream.
|
||||
func (s *stream) Writer(name string) (io.WriteCloser, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if !s.exists(name) {
|
||||
return nil, fmt.Errorf("stream: cannot write stream %s, not found", name)
|
||||
}
|
||||
return s.writers[name], nil
|
||||
}
|
||||
|
||||
// Create creates a new stream.
|
||||
func (s *stream) Create(name string) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if s.exists(name) {
|
||||
return fmt.Errorf("stream: cannot create stream %s, already exists", name)
|
||||
}
|
||||
|
||||
s.writers[name] = newWriter()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the stream by key.
|
||||
func (s *stream) Delete(name string) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if !s.exists(name) {
|
||||
return fmt.Errorf("stream: cannot delete stream %s, not found", name)
|
||||
}
|
||||
w := s.writers[name]
|
||||
delete(s.writers, name)
|
||||
return w.Close()
|
||||
}
|
||||
|
||||
func (s *stream) exists(name string) bool {
|
||||
_, exists := s.writers[name]
|
||||
return exists
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
package stream
|
||||
|
||||
import "testing"
|
||||
|
||||
func TetsStream(t *testing.T) {
|
||||
t.Skip() //TODO(bradrydzewski) implement stream tests
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type writer struct {
|
||||
sync.RWMutex
|
||||
*sync.Cond
|
||||
|
||||
buffer bytes.Buffer
|
||||
closed uint32
|
||||
}
|
||||
|
||||
func newWriter() *writer {
|
||||
var w writer
|
||||
w.Cond = sync.NewCond(w.RWMutex.RLocker())
|
||||
return &w
|
||||
}
|
||||
|
||||
func (w *writer) Write(p []byte) (n int, err error) {
|
||||
defer w.Broadcast()
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
if w.Closed() {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return w.buffer.Write(p)
|
||||
}
|
||||
|
||||
func (w *writer) Reader() (io.ReadCloser, error) {
|
||||
return &reader{w: w}, nil
|
||||
}
|
||||
|
||||
func (w *writer) Wait() {
|
||||
if !w.Closed() {
|
||||
w.Cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) Close() error {
|
||||
atomic.StoreUint32(&w.closed, 1)
|
||||
w.Cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *writer) Closed() bool {
|
||||
return atomic.LoadUint32(&w.closed) != 0
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
package stream
|
||||
|
||||
import "testing"
|
||||
|
||||
func TetsWriter(t *testing.T) {
|
||||
t.Skip() //TODO(bradrydzewski) implement writer tests
|
||||
}
|
Loading…
Reference in a new issue