From eaa6d41699585e09bcc13651671480d0abdf684e Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 20 Jul 2017 12:21:15 -0400 Subject: [PATCH] pass agent hostname to all rpc calls --- cmd/drone-agent/agent.go | 15 +++--- server/rpc.go | 8 +++ .../cncd/pipeline/pipeline/rpc/client_grpc.go | 3 ++ vendor/vendor.json | 54 +++++++++---------- 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/cmd/drone-agent/agent.go b/cmd/drone-agent/agent.go index b11745c9..44d1b758 100644 --- a/cmd/drone-agent/agent.go +++ b/cmd/drone-agent/agent.go @@ -100,6 +100,9 @@ const ( func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { log.Println("pipeline: request next execution") + meta, _ := metadata.FromOutgoingContext(ctx) + ctxmeta := metadata.NewOutgoingContext(context.Background(), meta) + // get the next job from the queue work, err := client.Next(ctx, filter) if err != nil { @@ -121,7 +124,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { timeout = time.Duration(minutes) * time.Minute } - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(ctxmeta, timeout) defer cancel() cancelled := abool.New() @@ -150,7 +153,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { state := rpc.State{} state.Started = time.Now().Unix() - err = client.Init(context.Background(), work.ID, state) + err = client.Init(ctxmeta, work.ID, state) if err != nil { log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err) } @@ -182,7 +185,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { file.Size = len(file.Data) file.Time = time.Now().Unix() - if serr := client.Upload(context.Background(), work.ID, file); serr != nil { + if serr := client.Upload(ctxmeta, work.ID, file); serr != nil { log.Printf("pipeline: cannot upload logs: %s: %s: %s", work.ID, file.Mime, serr) } else { log.Printf("pipeline: finish uploading logs: %s: step %s: %s", file.Mime, work.ID, proc.Alias) @@ -207,7 +210,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { file.Size = len(file.Data) file.Time = time.Now().Unix() - if serr := client.Upload(context.Background(), work.ID, file); serr != nil { + if serr := client.Upload(ctxmeta, work.ID, file); serr != nil { log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, file.Mime, serr) } else { log.Printf("pipeline: finish uploading artifact: %s: step %s: %s", file.Mime, work.ID, proc.Alias) @@ -224,7 +227,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { Finished: time.Now().Unix(), } defer func() { - if uerr := client.Update(context.Background(), work.ID, procState); uerr != nil { + if uerr := client.Update(ctxmeta, work.ID, procState); uerr != nil { log.Printf("Pipeine: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr) } }() @@ -285,7 +288,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error { log.Printf("pipeline: logging complete: %s", work.ID) - err = client.Done(context.Background(), work.ID, state) + err = client.Done(ctxmeta, work.ID, state) if err != nil { log.Printf("Pipeine: error signaling pipeline done: %s: %s", work.ID, err) } else { diff --git a/server/rpc.go b/server/rpc.go index f88e5552..57637d9f 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -79,6 +79,14 @@ type RPC struct { // Next implements the rpc.Next function func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) { + metadata, ok := metadata.FromContext(c) + if ok { + hostname, ok := metadata["hostname"] + if ok && len(hostname) != 0 { + logrus.Debugf("agent connected: %s: polling", hostname[0]) + } + } + fn := func(task *queue.Task) bool { for k, v := range filter.Labels { if task.Labels[k] != v { diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/client_grpc.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/client_grpc.go index f612c435..1ce1128d 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/client_grpc.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/client_grpc.go @@ -47,6 +47,9 @@ func (c *client) Next(ctx context.Context, f Filter) (*Pipeline, error) { if grpc.Code(err) == codes.Unknown { return nil, err } + if ctx.Err() != nil { + return nil, ctx.Err() + } <-time.After(backoff) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 95a9f952..d6fa7435 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -39,80 +39,80 @@ { "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", "path": "github.com/cncd/pipeline/pipeline", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "rO+djTfB4LrT+FBbpotyUUobOtU=", "path": "github.com/cncd/pipeline/pipeline/backend", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "DzP4c915B+gJTE5RCKQHzxwrUg4=", "path": "github.com/cncd/pipeline/pipeline/backend/docker", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "8Hj/OZnYZyz5N2hqENCTTaGtkNQ=", "path": "github.com/cncd/pipeline/pipeline/frontend", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "9KYIsY8WlWbrRAP7caEpWT70P9c=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "k+JVxks9V84qd+3kefHEtlqLEAA=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "L7Q5qJmPITNmvFEEaj5MPwCWFRk=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/types", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", "path": "github.com/cncd/pipeline/pipeline/interrupt", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "uOjTfke7Qxosrivgz/nVTHeIP5g=", "path": "github.com/cncd/pipeline/pipeline/multipart", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { - "checksumSHA1": "ndDase2wF8bWu83aDT8PFxVNSiU=", + "checksumSHA1": "YlIaMsvB5fFXRolugambNzkSu4I=", "path": "github.com/cncd/pipeline/pipeline/rpc", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "WAQJYKWUIFBnL1s8AnY4fePnzJ8=", "path": "github.com/cncd/pipeline/pipeline/rpc/proto", - "revision": "a117e57da055285a2385d5f09314d1f3f8b8073e", - "revisionTime": "2017-07-19T00:51:24Z" + "revision": "d98623616df20e535445b08baef41bde554454e3", + "revisionTime": "2017-07-20T13:40:08Z" }, { "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=",