package pubsub import ( "time" ) type Channel struct { record bool history []interface{} timeout time.Duration closed chan bool broadcast chan interface{} subscribe chan *Subscription unsubscribe chan *Subscription subscriptions map[*Subscription]bool } func NewChannel(opts *Opts) *Channel { return &Channel{ timeout: opts.Timeout, record: opts.Record, history: make([]interface{}, 0), closed: make(chan bool), broadcast: make(chan interface{}), subscribe: make(chan *Subscription), unsubscribe: make(chan *Subscription), subscriptions: make(map[*Subscription]bool), } } func (c *Channel) Publish(data interface{}) { c.broadcast <- data return } func (c *Channel) Subscribe() *Subscription { s := NewSubscription(c) c.subscribe <- s return s } func (c *Channel) Close() { go func() { c.closed <- true }() } func (c *Channel) start() { // make sure we don't bring down the application // if somehow we encounter a nil pointer or some // other unexpected behavior. defer func() { recover() }() // timeout the channel after N duration // ignore the timeout if set to 0 var timeout <-chan time.Time if c.timeout > 0 { timeout = time.After(c.timeout) } for { select { case sub := <-c.unsubscribe: delete(c.subscriptions, sub) close(sub.send) case sub := <-c.subscribe: c.subscriptions[sub] = true // if we are recording the output // we should send it to the subscriber // upon first connecting. if c.record && len(c.history) > 0 { history := make([]interface{}, len(c.history)) copy(history, c.history) go replay(sub, history) } case msg := <-c.broadcast: // if we are recording the output, append // the message to the history if c.record { c.history = append(c.history, msg) } // loop through each subscription and // send the message. for sub := range c.subscriptions { select { case sub.send <- msg: // do nothing default: sub.Close() } } case <-timeout: c.Close() case <-c.closed: c.stop() return } } } func replay(s *Subscription, history []interface{}) { for _, msg := range history { s.send <- msg } } func (c *Channel) stop() { for sub := range c.subscriptions { sub.Close() } }