// Copyright 2019 Drone IO, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package events import ( "context" "io" "net/http" "time" "github.com/drone/drone/core" "github.com/drone/drone/handler/api/request" "github.com/drone/drone/logger" ) // HandleGlobal creates an http.HandlerFunc that streams builds events // to the http.Response in an event stream format. func HandleGlobal( repos core.RepositoryStore, events core.Pubsub, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { logger := logger.FromRequest(r) h := w.Header() h.Set("Content-Type", "text/event-stream") h.Set("Cache-Control", "no-cache") h.Set("Connection", "keep-alive") h.Set("X-Accel-Buffering", "no") f, ok := w.(http.Flusher) if !ok { return } access := map[string]struct{}{} user, authenticated := request.UserFrom(r.Context()) if authenticated { list, _ := repos.List(r.Context(), user.ID) for _, repo := range list { access[repo.Slug] = struct{}{} } } io.WriteString(w, ": ping\n\n") f.Flush() ctx, cancel := context.WithCancel(r.Context()) defer cancel() events, errc := events.Subscribe(ctx) logger.Debugln("events: stream opened") timeoutChan := time.After(24 * time.Hour) L: for { select { case <-ctx.Done(): logger.Debugln("events: stream cancelled") break L case <-errc: logger.Debugln("events: stream error") break L case <-timeoutChan: logger.Debugln("events: stream timeout") break L case <-time.After(pingInterval): io.WriteString(w, ": ping\n\n") f.Flush() case event := <-events: _, authorized := access[event.Repository] if event.Visibility == core.VisibilityPublic { authorized = true } if event.Visibility == core.VisibilityInternal && authenticated { authorized = true } if authorized { io.WriteString(w, "data: ") w.Write(event.Data) io.WriteString(w, "\n\n") f.Flush() } } } io.WriteString(w, "event: error\ndata: eof\n\n") f.Flush() logger.Debugln("events: stream closed") } }