Merge pull request #2137 from bradrydzewski/master
serve logs with sse; update acme pkg
This commit is contained in:
commit
401a3fc426
8 changed files with 202 additions and 10 deletions
|
@ -122,6 +122,12 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
|
|||
sse := e.Group("/stream")
|
||||
{
|
||||
sse.GET("/events", server.EventStreamSSE)
|
||||
sse.GET("/logs/:owner/:name/:build/:number",
|
||||
session.SetRepo(),
|
||||
session.SetPerm(),
|
||||
session.MustPull,
|
||||
server.LogStreamSSE,
|
||||
)
|
||||
}
|
||||
|
||||
info := e.Group("/api/info")
|
||||
|
|
128
server/stream.go
128
server/stream.go
|
@ -206,6 +206,10 @@ func EventStream(c *gin.Context) {
|
|||
reader(ws)
|
||||
}
|
||||
|
||||
//
|
||||
// event source streaming for compatibility with quic and http2
|
||||
//
|
||||
|
||||
func EventStreamSSE(c *gin.Context) {
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
|
@ -275,3 +279,127 @@ func EventStreamSSE(c *gin.Context) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func LogStreamSSE(c *gin.Context) {
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
c.Header("Connection", "keep-alive")
|
||||
c.Header("X-Accel-Buffering", "no")
|
||||
|
||||
rw := c.Writer
|
||||
|
||||
flusher, ok := rw.(http.Flusher)
|
||||
if !ok {
|
||||
c.String(500, "Streaming not supported")
|
||||
return
|
||||
}
|
||||
|
||||
// repo := session.Repo(c)
|
||||
//
|
||||
// // parse the build number and job sequence number from
|
||||
// // the repquest parameter.
|
||||
// num, _ := strconv.Atoi(c.Params.ByName("number"))
|
||||
// ppid, _ := strconv.Atoi(c.Params.ByName("ppid"))
|
||||
// name := c.Params.ByName("proc")
|
||||
//
|
||||
// build, err := store.GetBuildNumber(c, repo, num)
|
||||
// if err != nil {
|
||||
// c.AbortWithError(404, err)
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// proc, err := store.FromContext(c).ProcChild(build, ppid, name)
|
||||
// if err != nil {
|
||||
// c.AbortWithError(404, err)
|
||||
// return
|
||||
// }
|
||||
|
||||
repo := session.Repo(c)
|
||||
buildn, _ := strconv.Atoi(c.Param("build"))
|
||||
jobn, _ := strconv.Atoi(c.Param("number"))
|
||||
|
||||
build, err := store.GetBuildNumber(c, repo, buildn)
|
||||
if err != nil {
|
||||
logrus.Debugln("stream cannot get build number.", err)
|
||||
io.WriteString(rw, "event: error\ndata: build not found\n\n")
|
||||
return
|
||||
}
|
||||
proc, err := store.FromContext(c).ProcFind(build, jobn)
|
||||
if err != nil {
|
||||
logrus.Debugln("stream cannot get proc number.", err)
|
||||
io.WriteString(rw, "event: error\ndata: process not found\n\n")
|
||||
return
|
||||
}
|
||||
if proc.State != model.StatusRunning {
|
||||
logrus.Debugln("stream not found.")
|
||||
io.WriteString(rw, "event: error\ndata: stream not found\n\n")
|
||||
return
|
||||
}
|
||||
|
||||
logc := make(chan []byte, 10)
|
||||
ctx, cancel := context.WithCancel(
|
||||
context.Background(),
|
||||
)
|
||||
|
||||
logrus.Debugf("log stream: connection opened")
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
close(logc)
|
||||
logrus.Debugf("log stream: connection closed")
|
||||
}()
|
||||
|
||||
go func() {
|
||||
// TODO remove global variable
|
||||
Config.Services.Logs.Tail(ctx, fmt.Sprint(proc.ID), func(entries ...*logging.Entry) {
|
||||
for _, entry := range entries {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
logc <- entry.Data
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
io.WriteString(rw, "event: error\ndata: eof\n\n")
|
||||
|
||||
cancel()
|
||||
}()
|
||||
|
||||
id := 1
|
||||
last, _ := strconv.Atoi(
|
||||
c.Request.Header.Get("Last-Event-ID"),
|
||||
)
|
||||
if last != 0 {
|
||||
logrus.Debugf("log stream: reconnect: last-event-id: %d", last)
|
||||
}
|
||||
|
||||
// retry: 10000\n
|
||||
|
||||
for {
|
||||
select {
|
||||
// after 1 hour of idle (no response) end the stream.
|
||||
// this is more of a safety mechanism than anything,
|
||||
// and can be removed once the code is more mature.
|
||||
case <-time.After(time.Hour):
|
||||
return
|
||||
case <-rw.CloseNotify():
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case buf, ok := <-logc:
|
||||
if ok {
|
||||
if id > last {
|
||||
io.WriteString(rw, "id: "+strconv.Itoa(id))
|
||||
io.WriteString(rw, "\n")
|
||||
io.WriteString(rw, "data: ")
|
||||
rw.Write(buf)
|
||||
io.WriteString(rw, "\n\n")
|
||||
flusher.Flush()
|
||||
}
|
||||
id++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,15 @@
|
|||
window.USER = {{ json .user }};
|
||||
{{ end }}
|
||||
</script>
|
||||
<script>
|
||||
// TODO load a polyfill for SSE for the Edge browser. Consider downloading
|
||||
// with bower instead of loading from cndjs.
|
||||
if (!window.EventSource) {
|
||||
var ssePolyfill = document.createElement("script");
|
||||
ssePolyfill.src = "https://cdnjs.cloudflare.com/ajax/libs/event-source-polyfill/0.0.9/eventsource.min.js";
|
||||
document.body.appendChild(ssePolyfill);
|
||||
}
|
||||
</script>
|
||||
<script src="/bower_components/webcomponentsjs/webcomponents-loader.js"></script>
|
||||
|
||||
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto">
|
||||
|
|
|
@ -123,6 +123,15 @@ var indexpolymer = `<!DOCTYPE html>
|
|||
window.USER = {{ json .user }};
|
||||
{{ end }}
|
||||
</script>
|
||||
<script>
|
||||
// TODO load a polyfill for SSE for the Edge browser. Consider downloading
|
||||
// with bower instead of loading from cndjs.
|
||||
if (!window.EventSource) {
|
||||
var ssePolyfill = document.createElement("script");
|
||||
ssePolyfill.src = "https://cdnjs.cloudflare.com/ajax/libs/event-source-polyfill/0.0.9/eventsource.min.js";
|
||||
document.body.appendChild(ssePolyfill);
|
||||
}
|
||||
</script>
|
||||
<script src="/bower_components/webcomponentsjs/webcomponents-loader.js"></script>
|
||||
|
||||
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto">
|
||||
|
|
8
vendor/golang.org/x/crypto/acme/acme.go
generated
vendored
8
vendor/golang.org/x/crypto/acme/acme.go
generated
vendored
|
@ -207,7 +207,7 @@ func (c *Client) CreateCert(ctx context.Context, csr []byte, exp time.Duration,
|
|||
return nil, "", responseError(res)
|
||||
}
|
||||
|
||||
curl := res.Header.Get("location") // cert permanent URL
|
||||
curl := res.Header.Get("Location") // cert permanent URL
|
||||
if res.ContentLength == 0 {
|
||||
// no cert in the body; poll until we get it
|
||||
cert, err := c.FetchCert(ctx, curl, bundle)
|
||||
|
@ -240,7 +240,7 @@ func (c *Client) FetchCert(ctx context.Context, url string, bundle bool) ([][]by
|
|||
if res.StatusCode > 299 {
|
||||
return nil, responseError(res)
|
||||
}
|
||||
d := retryAfter(res.Header.Get("retry-after"), 3*time.Second)
|
||||
d := retryAfter(res.Header.Get("Retry-After"), 3*time.Second)
|
||||
select {
|
||||
case <-time.After(d):
|
||||
// retry
|
||||
|
@ -444,7 +444,7 @@ func (c *Client) WaitAuthorization(ctx context.Context, url string) (*Authorizat
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
retry := res.Header.Get("retry-after")
|
||||
retry := res.Header.Get("Retry-After")
|
||||
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted {
|
||||
res.Body.Close()
|
||||
if err := sleep(retry, 1); err != nil {
|
||||
|
@ -703,7 +703,7 @@ func (c *Client) retryPostJWS(ctx context.Context, key crypto.Signer, url string
|
|||
// clear any nonces that we might've stored that might now be
|
||||
// considered bad
|
||||
c.clearNonces()
|
||||
retry := res.Header.Get("retry-after")
|
||||
retry := res.Header.Get("Retry-After")
|
||||
if err := sleep(retry, 1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
39
vendor/golang.org/x/crypto/acme/autocert/autocert.go
generated
vendored
39
vendor/golang.org/x/crypto/acme/autocert/autocert.go
generated
vendored
|
@ -33,6 +33,12 @@ import (
|
|||
"golang.org/x/crypto/acme"
|
||||
)
|
||||
|
||||
// createCertRetryAfter is how much time to wait before removing a failed state
|
||||
// entry due to an unsuccessful createCert call.
|
||||
// This is a variable instead of a const for testing.
|
||||
// TODO: Consider making it configurable or an exp backoff?
|
||||
var createCertRetryAfter = time.Minute
|
||||
|
||||
// pseudoRand is safe for concurrent use.
|
||||
var pseudoRand *lockedMathRand
|
||||
|
||||
|
@ -170,6 +176,12 @@ func (m *Manager) GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate,
|
|||
if name == "" {
|
||||
return nil, errors.New("acme/autocert: missing server name")
|
||||
}
|
||||
if !strings.Contains(strings.Trim(name, "."), ".") {
|
||||
return nil, errors.New("acme/autocert: server name component count invalid")
|
||||
}
|
||||
if strings.ContainsAny(name, `/\`) {
|
||||
return nil, errors.New("acme/autocert: server name contains invalid character")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
@ -363,6 +375,23 @@ func (m *Manager) createCert(ctx context.Context, domain string) (*tls.Certifica
|
|||
|
||||
der, leaf, err := m.authorizedCert(ctx, state.key, domain)
|
||||
if err != nil {
|
||||
// Remove the failed state after some time,
|
||||
// making the manager call createCert again on the following TLS hello.
|
||||
time.AfterFunc(createCertRetryAfter, func() {
|
||||
defer testDidRemoveState(domain)
|
||||
m.stateMu.Lock()
|
||||
defer m.stateMu.Unlock()
|
||||
// Verify the state hasn't changed and it's still invalid
|
||||
// before deleting.
|
||||
s, ok := m.state[domain]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if _, err := validCert(domain, s.cert, s.key); err == nil {
|
||||
return
|
||||
}
|
||||
delete(m.state, domain)
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
state.cert = der
|
||||
|
@ -411,7 +440,6 @@ func (m *Manager) certState(domain string) (*certState, error) {
|
|||
// authorizedCert starts domain ownership verification process and requests a new cert upon success.
|
||||
// The key argument is the certificate private key.
|
||||
func (m *Manager) authorizedCert(ctx context.Context, key crypto.Signer, domain string) (der [][]byte, leaf *x509.Certificate, err error) {
|
||||
// TODO: make m.verify retry or retry m.verify calls here
|
||||
if err := m.verify(ctx, domain); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -782,5 +810,10 @@ func (r *lockedMathRand) int63n(max int64) int64 {
|
|||
return n
|
||||
}
|
||||
|
||||
// for easier testing
|
||||
var timeNow = time.Now
|
||||
// For easier testing.
|
||||
var (
|
||||
timeNow = time.Now
|
||||
|
||||
// Called when a state is removed.
|
||||
testDidRemoveState = func(domain string) {}
|
||||
)
|
||||
|
|
9
vendor/golang.org/x/crypto/acme/autocert/listener.go
generated
vendored
9
vendor/golang.org/x/crypto/acme/autocert/listener.go
generated
vendored
|
@ -36,6 +36,9 @@ import (
|
|||
// operating system-specific cache or temp directory. This may not
|
||||
// be suitable for servers spanning multiple machines.
|
||||
//
|
||||
// The returned listener uses a *tls.Config that enables HTTP/2, and
|
||||
// should only be used with servers that support HTTP/2.
|
||||
//
|
||||
// The returned Listener also enables TCP keep-alives on the accepted
|
||||
// connections. The returned *tls.Conn are returned before their TLS
|
||||
// handshake has completed.
|
||||
|
@ -58,6 +61,9 @@ func NewListener(domains ...string) net.Listener {
|
|||
// Listener listens on the standard TLS port (443) on all interfaces
|
||||
// and returns a net.Listener returning *tls.Conn connections.
|
||||
//
|
||||
// The returned listener uses a *tls.Config that enables HTTP/2, and
|
||||
// should only be used with servers that support HTTP/2.
|
||||
//
|
||||
// The returned Listener also enables TCP keep-alives on the accepted
|
||||
// connections. The returned *tls.Conn are returned before their TLS
|
||||
// handshake has completed.
|
||||
|
@ -68,7 +74,8 @@ func (m *Manager) Listener() net.Listener {
|
|||
ln := &listener{
|
||||
m: m,
|
||||
conf: &tls.Config{
|
||||
GetCertificate: m.GetCertificate, // bonus: panic on nil m
|
||||
GetCertificate: m.GetCertificate, // bonus: panic on nil m
|
||||
NextProtos: []string{"h2", "http/1.1"}, // Enable HTTP/2
|
||||
},
|
||||
}
|
||||
ln.tcpListener, ln.tcpListenErr = net.Listen("tcp", ":443")
|
||||
|
|
4
vendor/vendor.json
vendored
4
vendor/vendor.json
vendored
|
@ -698,13 +698,13 @@
|
|||
"revisionTime": "2016-11-19T21:37:11Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "W0j4I7QpxXlChjyhAojZmFjU6Bg=",
|
||||
"checksumSHA1": "Ag672Laei0E45NrvTO4LV9B3Jfc=",
|
||||
"path": "golang.org/x/crypto/acme",
|
||||
"revision": "c7af5bf2638a1164f2eb5467c39c6cffbd13a02e",
|
||||
"revisionTime": "2017-04-25T18:31:00Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "/FdEtM7DVyyunjtRcW1zX4Nvj8o=",
|
||||
"checksumSHA1": "TrKJW+flz7JulXU3sqnBJjGzgQc=",
|
||||
"path": "golang.org/x/crypto/acme/autocert",
|
||||
"revision": "c7af5bf2638a1164f2eb5467c39c6cffbd13a02e",
|
||||
"revisionTime": "2017-04-25T18:31:00Z"
|
||||
|
|
Loading…
Reference in a new issue