From 9e8aa205e410d9e9ecca5aa62a8f3715e02815fb Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 7 Apr 2015 22:46:05 -0700 Subject: [PATCH] added event bus for managing websocket events --- eventbus/bus.go | 15 ++++++++++++++ eventbus/eventbus.go | 48 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 eventbus/bus.go create mode 100644 eventbus/eventbus.go diff --git a/eventbus/bus.go b/eventbus/bus.go new file mode 100644 index 00000000..35a7c224 --- /dev/null +++ b/eventbus/bus.go @@ -0,0 +1,15 @@ +package eventbus + +import "github.com/bradrydzewski/drone/common" + +type Event struct { + Build *common.Build + Repo *common.Repo + Task *common.Task +} + +type Bus interface { + Subscribe(chan *Event) + Unsubscribe(chan *Event) + Send(*Event) +} diff --git a/eventbus/eventbus.go b/eventbus/eventbus.go new file mode 100644 index 00000000..c08666d3 --- /dev/null +++ b/eventbus/eventbus.go @@ -0,0 +1,48 @@ +package eventbus + +import ( + "sync" +) + +type EventBus struct { + sync.Mutex + subs map[chan *Event]bool +} + +// New creates a new EventBus that manages a list of +// subscribers to which events are published. +func New() *EventBus { + return &EventBus{ + subs: make(map[chan *Event]bool), + } +} + +// Subscribe adds the channel to the list of +// subscribers. Each subscriber in the list will +// receive broadcast events. +func (b *EventBus) Subscribe(c chan *Event) { + b.Lock() + b.subs[c] = true + b.Unlock() +} + +// Unsubscribe removes the channel from the +// list of subscribers. +func (b *EventBus) Unsubscribe(c chan *Event) { + b.Lock() + delete(b.subs, c) + b.Unlock() +} + +// Send dispatches a message to all subscribers. +func (b *EventBus) Send(event *Event) { + b.Lock() + defer b.Unlock() + + for s, _ := range b.subs { + go func(c chan *Event) { + defer recover() + c <- event + }(s) + } +}