// Copyright 2019 Drone IO, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package events import ( "context" "io" "net/http" "time" "github.com/drone/drone/core" "github.com/drone/drone/handler/api/render" "github.com/drone/drone/logger" "github.com/sirupsen/logrus" "github.com/go-chi/chi" ) // interval at which the client is pinged to prevent // reverse proxy and load balancers from closing the // connection. var pingInterval = time.Second * 30 // implements a 24-hour timeout for connections. This // should not be necessary, but is put in place just // in case we encounter dangling connections. var timeout = time.Hour * 24 // HandleEvents creates an http.HandlerFunc that streams builds events // to the http.Response in an event stream format. func HandleEvents( repos core.RepositoryStore, events core.Pubsub, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var ( namespace = chi.URLParam(r, "owner") name = chi.URLParam(r, "name") ) logger := logger.FromRequest(r).WithFields( logrus.Fields{ "namespace": namespace, "name": name, }, ) repo, err := repos.FindName(r.Context(), namespace, name) if err != nil { render.NotFound(w, err) logger.WithError(err).Debugln("events: cannot find repository") return } h := w.Header() h.Set("Content-Type", "text/event-stream") h.Set("Cache-Control", "no-cache") h.Set("Connection", "keep-alive") h.Set("X-Accel-Buffering", "no") f, ok := w.(http.Flusher) if !ok { return } io.WriteString(w, ": ping\n\n") f.Flush() ctx, cancel := context.WithCancel(r.Context()) defer cancel() events, errc := events.Subscribe(ctx) logger.Debugln("events: stream opened") L: for { select { case <-ctx.Done(): logger.Debugln("events: stream cancelled") break L case <-errc: logger.Debugln("events: stream error") break L case <-time.After(time.Hour): logger.Debugln("events: stream timeout") break L case <-time.After(pingInterval): io.WriteString(w, ": ping\n\n") f.Flush() case event := <-events: if event.Repository == repo.Slug { io.WriteString(w, "data: ") w.Write(event.Data) io.WriteString(w, "\n\n") f.Flush() } } } io.WriteString(w, "event: error\ndata: eof\n\n") f.Flush() logger.Debugln("events: stream closed") } }