upgraded to latest version of gin to take advantage of SSE

This commit is contained in:
Brad Rydzewski 2015-05-16 19:46:12 -07:00
parent e8a39a3b14
commit 27aadca029
4 changed files with 40 additions and 67 deletions

View file

@ -5,6 +5,7 @@ env:
- GOROOT=/usr/local/go - GOROOT=/usr/local/go
- PATH=$PATH:$GOROOT/bin:$GOPATH/bin - PATH=$PATH:$GOROOT/bin:$GOPATH/bin
script: script:
- git clone git://github.com/gin-gonic/gin.git $GOPATH/src/github.com/gin-gonic/gin
- go get -u github.com/jteeuwen/go-bindata/... - go get -u github.com/jteeuwen/go-bindata/...
- make bindata deps - make bindata deps
- make build - make build

View file

@ -24,8 +24,8 @@ func SetQueue(q queue.Queue) gin.HandlerFunc {
} }
func ToQueue(c *gin.Context) queue.Queue { func ToQueue(c *gin.Context) queue.Queue {
v, err := c.Get("queue") v, ok := c.Get("queue")
if err != nil { if !ok {
return nil return nil
} }
return v.(queue.Queue) return v.(queue.Queue)
@ -39,16 +39,16 @@ func SetBus(r eventbus.Bus) gin.HandlerFunc {
} }
func ToBus(c *gin.Context) eventbus.Bus { func ToBus(c *gin.Context) eventbus.Bus {
v, err := c.Get("eventbus") v, ok := c.Get("eventbus")
if err != nil { if !ok {
return nil return nil
} }
return v.(eventbus.Bus) return v.(eventbus.Bus)
} }
func ToRemote(c *gin.Context) remote.Remote { func ToRemote(c *gin.Context) remote.Remote {
v, err := c.Get("remote") v, ok := c.Get("remote")
if err != nil { if !ok {
return nil return nil
} }
return v.(remote.Remote) return v.(remote.Remote)
@ -62,8 +62,8 @@ func SetRemote(r remote.Remote) gin.HandlerFunc {
} }
func ToRunner(c *gin.Context) runner.Runner { func ToRunner(c *gin.Context) runner.Runner {
v, err := c.Get("runner") v, ok := c.Get("runner")
if err != nil { if !ok {
return nil return nil
} }
return v.(runner.Runner) return v.(runner.Runner)
@ -77,8 +77,8 @@ func SetRunner(r runner.Runner) gin.HandlerFunc {
} }
func ToSettings(c *gin.Context) *settings.Settings { func ToSettings(c *gin.Context) *settings.Settings {
v, err := c.Get("settings") v, ok := c.Get("settings")
if err != nil { if !ok {
return nil return nil
} }
return v.(*settings.Settings) return v.(*settings.Settings)
@ -92,24 +92,24 @@ func SetSettings(s *settings.Settings) gin.HandlerFunc {
} }
func ToPerm(c *gin.Context) *common.Perm { func ToPerm(c *gin.Context) *common.Perm {
v, err := c.Get("perm") v, ok := c.Get("perm")
if err != nil { if !ok {
return nil return nil
} }
return v.(*common.Perm) return v.(*common.Perm)
} }
func ToUser(c *gin.Context) *common.User { func ToUser(c *gin.Context) *common.User {
v, err := c.Get("user") v, ok := c.Get("user")
if err != nil { if !ok {
return nil return nil
} }
return v.(*common.User) return v.(*common.User)
} }
func ToRepo(c *gin.Context) *common.Repo { func ToRepo(c *gin.Context) *common.Repo {
v, err := c.Get("repo") v, ok := c.Get("repo")
if err != nil { if !ok {
return nil return nil
} }
return v.(*common.Repo) return v.(*common.Repo)

View file

@ -75,7 +75,7 @@
var callback, var callback,
websocket, events,
token = localStorage.getItem('access_token'); token = localStorage.getItem('access_token');
/** /**
@ -86,25 +86,23 @@
this.subscribe = function(repo, _callback) { this.subscribe = function(repo, _callback) {
callback = _callback; callback = _callback;
var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), events = new EventSource("/api/stream/" + repo + "?access_token=" + token, { withCredentials: true });
route = [proto, "://", $window.location.host, '/api/stream/'+ repo +'?access_token=', token].join(''); events.onmessage = function (event) {
console.log(event);
websocket = new WebSocket(route);
websocket.onmessage = function (event) {
if (callback !== undefined) { if (callback !== undefined) {
callback(angular.fromJson(event.data)); callback(angular.fromJson(event.data));
} }
}; };
websocket.onclose = function (event) { events.onerror = function (event) {
console.log('user websocket closed'); console.log('user event stream closed due to error.', event);
}; };
}; };
this.unsubscribe = function() { this.unsubscribe = function() {
callback = undefined; callback = undefined;
if (websocket !== undefined) { if (events !== undefined) {
websocket.close(); events.close();
websocket = undefined; events = undefined;
} }
}; };
} }

View file

@ -2,6 +2,7 @@ package server
import ( import (
"bufio" "bufio"
"io"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -37,54 +38,27 @@ var upgrader = websocket.Upgrader{
func GetRepoEvents(c *gin.Context) { func GetRepoEvents(c *gin.Context) {
bus := ToBus(c) bus := ToBus(c)
repo := ToRepo(c) repo := ToRepo(c)
c.Writer.Header().Set("Content-Type", "text/event-stream")
// upgrade the websocket eventc := make(chan *eventbus.Event, 1)
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.Fail(400, err)
return
}
ticker := time.NewTicker(pingPeriod)
eventc := make(chan *eventbus.Event)
bus.Subscribe(eventc) bus.Subscribe(eventc)
defer func() { defer func() {
bus.Unsubscribe(eventc) bus.Unsubscribe(eventc)
ticker.Stop() log.Infof("closed event stream")
ws.Close()
close(eventc)
log.Infof("closed websocket")
}() }()
go func() { c.Stream(func(w io.Writer) bool {
for { event := <-eventc
select {
case <-c.Writer.CloseNotify():
ws.Close()
return
case event := <-eventc:
if event == nil { if event == nil {
log.Infof("closed websocket") return false
ws.Close()
return
} }
if event.Kind == eventbus.EventRepo && event.Name == repo.FullName { if event.Kind == eventbus.EventRepo &&
ws.WriteMessage(websocket.TextMessage, event.Msg) event.Name == repo.FullName {
break c.SSEvent("message", event.Msg)
} }
case <-ticker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
log.Infof("closed websocket")
ws.Close()
return
}
}
}
}()
readWebsocket(ws) return true
})
} }
func GetStream(c *gin.Context) { func GetStream(c *gin.Context) {