83577a7d5d
Removed amber files, replacing them with angular. Removed the queue package in favor of the worker package. Removed the channel package in favor of the pubsub package.
118 lines
2.3 KiB
Go
118 lines
2.3 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"time"
|
|
)
|
|
|
|
type Channel struct {
|
|
record bool
|
|
history []interface{}
|
|
timeout time.Duration
|
|
closed chan bool
|
|
broadcast chan interface{}
|
|
subscribe chan *Subscription
|
|
unsubscribe chan *Subscription
|
|
subscriptions map[*Subscription]bool
|
|
}
|
|
|
|
func NewChannel(opts *Opts) *Channel {
|
|
return &Channel{
|
|
timeout: opts.Timeout,
|
|
record: opts.Record,
|
|
history: make([]interface{}, 0),
|
|
closed: make(chan bool),
|
|
broadcast: make(chan interface{}),
|
|
subscribe: make(chan *Subscription),
|
|
unsubscribe: make(chan *Subscription),
|
|
subscriptions: make(map[*Subscription]bool),
|
|
}
|
|
}
|
|
|
|
func (c *Channel) Publish(data interface{}) {
|
|
c.broadcast <- data
|
|
return
|
|
}
|
|
|
|
func (c *Channel) Subscribe() *Subscription {
|
|
s := NewSubscription(c)
|
|
c.subscribe <- s
|
|
return s
|
|
}
|
|
|
|
func (c *Channel) Close() {
|
|
go func() { c.closed <- true }()
|
|
}
|
|
|
|
func (c *Channel) start() {
|
|
// make sure we don't bring down the application
|
|
// if somehow we encounter a nil pointer or some
|
|
// other unexpected behavior.
|
|
defer func() {
|
|
recover()
|
|
}()
|
|
|
|
// timeout the channel after N duration
|
|
// ignore the timeout if set to 0
|
|
var timeout <-chan time.Time
|
|
if c.timeout > 0 {
|
|
timeout = time.After(c.timeout)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
|
|
case sub := <-c.unsubscribe:
|
|
delete(c.subscriptions, sub)
|
|
close(sub.send)
|
|
|
|
case sub := <-c.subscribe:
|
|
c.subscriptions[sub] = true
|
|
|
|
// if we are recording the output
|
|
// we should send it to the subscriber
|
|
// upon first connecting.
|
|
if c.record && len(c.history) > 0 {
|
|
history := make([]interface{}, len(c.history))
|
|
copy(history, c.history)
|
|
go replay(sub, history)
|
|
}
|
|
|
|
case msg := <-c.broadcast:
|
|
// if we are recording the output, append
|
|
// the message to the history
|
|
if c.record {
|
|
c.history = append(c.history, msg)
|
|
}
|
|
|
|
// loop through each subscription and
|
|
// send the message.
|
|
for sub := range c.subscriptions {
|
|
select {
|
|
case sub.send <- msg:
|
|
// do nothing
|
|
default:
|
|
sub.Close()
|
|
}
|
|
}
|
|
|
|
case <-timeout:
|
|
c.Close()
|
|
|
|
case <-c.closed:
|
|
c.stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func replay(s *Subscription, history []interface{}) {
|
|
for _, msg := range history {
|
|
s.send <- msg
|
|
}
|
|
}
|
|
|
|
func (c *Channel) stop() {
|
|
for sub := range c.subscriptions {
|
|
sub.Close()
|
|
}
|
|
}
|