diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go index bd444573..6256cad2 100644 --- a/queue/builtin/queue.go +++ b/queue/builtin/queue.go @@ -35,6 +35,37 @@ func (q *Queue) Publish(work *queue.Work) error { // Remove removes the specified work item from this queue, // if it is present. func (q *Queue) Remove(work *queue.Work) error { + q.Lock() + defer q.Unlock() + + _, ok := q.items[work] + if !ok { + return nil + } + var items []*queue.Work + + // loop through and drain all items + // from the queue. +drain: + for { + select { + case item := <-q.itemc: + items = append(items, item) + default: + break drain + } + } + + // re-add all items to the queue except + // the item we're trying to remove + for _, item := range items { + if item == work { + delete(q.items, work) + delete(q.acks, work) + continue + } + q.itemc <- item + } return nil } @@ -48,7 +79,7 @@ func (q *Queue) Pull() *queue.Work { return work } -// PullAct retrieves and removes the head of this queue, waiting +// PullAck retrieves and removes the head of this queue, waiting // if necessary until work becomes available. Items pull from the // queue that aren't acknowledged will be pushed back to the queue // again when the default acknowledgement deadline is reached. @@ -74,7 +105,7 @@ func (q *Queue) Items() []*queue.Work { q.Lock() defer q.Unlock() items := []*queue.Work{} - for work, _ := range q.items { + for work := range q.items { items = append(items, work) } return items diff --git a/queue/builtin/queue_test.go b/queue/builtin/queue_test.go new file mode 100644 index 00000000..24cb8033 --- /dev/null +++ b/queue/builtin/queue_test.go @@ -0,0 +1,75 @@ +package builtin + +import ( + "testing" + + "github.com/drone/drone/queue" + . "github.com/franela/goblin" +) + +func TestBuild(t *testing.T) { + g := Goblin(t) + g.Describe("Queue", func() { + + g.It("Should publish item", func() { + w1 := &queue.Work{} + w2 := &queue.Work{} + q := New() + q.Publish(w1) + q.Publish(w2) + g.Assert(len(q.items)).Equal(2) + g.Assert(len(q.itemc)).Equal(2) + }) + + g.It("Should remove item", func() { + w1 := &queue.Work{} + w2 := &queue.Work{} + w3 := &queue.Work{} + q := New() + q.Publish(w1) + q.Publish(w2) + q.Publish(w3) + q.Remove(w2) + g.Assert(len(q.items)).Equal(2) + g.Assert(len(q.itemc)).Equal(2) + g.Assert(q.Pull()).Equal(w1) + g.Assert(q.Pull()).Equal(w3) + }) + + g.It("Should pull item", func() { + w1 := &queue.Work{} + w2 := &queue.Work{} + q := New() + q.Publish(w1) + q.Publish(w2) + g.Assert(q.Pull()).Equal(w1) + g.Assert(q.Pull()).Equal(w2) + }) + + g.It("Should pull item with ack", func() { + w := &queue.Work{} + q := New() + q.Publish(w) + g.Assert(q.PullAck()).Equal(w) + g.Assert(q.acks[w]).Equal(struct{}{}) + }) + + g.It("Should ack item", func() { + w := &queue.Work{} + q := New() + q.Publish(w) + g.Assert(q.PullAck()).Equal(w) + g.Assert(len(q.acks)).Equal(1) + g.Assert(q.Ack(w)).Equal(nil) + g.Assert(len(q.acks)).Equal(0) + }) + + g.It("Should get all items", func() { + q := New() + q.Publish(&queue.Work{}) + q.Publish(&queue.Work{}) + q.Publish(&queue.Work{}) + g.Assert(len(q.Items())).Equal(3) + }) + }) +}