From c867bcaf3cc7055ecb0563c26793cc6392b2bdfc Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 18 Jul 2017 12:27:20 -0400 Subject: [PATCH] add sse for user event feed --- router/router.go | 5 +++ server/stream.go | 70 ++++++++++++++++++++++++++++++++ server/template/files/index.html | 46 +++++++++++++-------- server/ui_local.go | 2 + 4 files changed, 107 insertions(+), 16 deletions(-) diff --git a/router/router.go b/router/router.go index 456348f8..73a4a54a 100644 --- a/router/router.go +++ b/router/router.go @@ -119,6 +119,11 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { ) } + sse := e.Group("/stream") + { + sse.GET("/events", server.EventStreamSSE) + } + info := e.Group("/api/info") { info.GET("/queue", diff --git a/server/stream.go b/server/stream.go index db896a0d..a40949c7 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "io" "net/http" "strconv" "time" @@ -204,3 +205,72 @@ func EventStream(c *gin.Context) { }() reader(ws) } + +func EventStreamSSE(c *gin.Context) { + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + + rw := c.Writer + + flusher, ok := rw.(http.Flusher) + if !ok { + c.String(500, "Streaming not supported") + return + } + + logrus.Debugf("user feed: connection opened") + + user := session.User(c) + repo := map[string]bool{} + if user != nil { + repos, _ := store.FromContext(c).RepoList(user) + for _, r := range repos { + repo[r.FullName] = true + } + } + + eventc := make(chan []byte, 10) + ctx, cancel := context.WithCancel( + context.Background(), + ) + + defer func() { + cancel() + close(eventc) + logrus.Debugf("user feed: connection closed") + }() + + go func() { + // TODO remove this from global config + Config.Services.Pubsub.Subscribe(c, "topic/events", func(m pubsub.Message) { + name := m.Labels["repo"] + priv := m.Labels["private"] + if repo[name] || priv == "false" { + select { + case <-ctx.Done(): + return + default: + eventc <- m.Data + } + } + }) + cancel() + }() + + for { + select { + case <-rw.CloseNotify(): + return + case <-ctx.Done(): + return + case buf, ok := <-eventc: + if ok { + io.WriteString(rw, "data: ") + rw.Write(buf) + io.WriteString(rw, "\n\n") + flusher.Flush() + } + } + } +} diff --git a/server/template/files/index.html b/server/template/files/index.html index 248343c8..05d4e648 100644 --- a/server/template/files/index.html +++ b/server/template/files/index.html @@ -1,22 +1,36 @@ - + - - - - {{ if .csrf }}{{ end }} - - - - - + + + + + + + + + + + + + + + + -
- - - + diff --git a/server/ui_local.go b/server/ui_local.go index 9605523d..c9eca265 100644 --- a/server/ui_local.go +++ b/server/ui_local.go @@ -60,6 +60,8 @@ func (w *local) File(rw http.ResponseWriter, r *http.Request) { func (w *local) Routes() []string { return []string{ + "/favicon-32x32.png", + "/favicon-16x16.png", "/src/*filepath", "/bower_components/*filepath", }