expose agent name to pipeline
This commit is contained in:
parent
913d8701f2
commit
63001a7c66
1 changed files with 24 additions and 11 deletions
|
@ -105,7 +105,12 @@ func loop(c *cli.Context) error {
|
||||||
if sigterm.IsSet() {
|
if sigterm.IsSet() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := run(ctx, client, filter); err != nil {
|
r := runner{
|
||||||
|
client: client,
|
||||||
|
filter: filter,
|
||||||
|
hostname: hostname,
|
||||||
|
}
|
||||||
|
if err := r.run(ctx); err != nil {
|
||||||
log.Error().Err(err).Msg("pipeline done with error")
|
log.Error().Err(err).Msg("pipeline done with error")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -125,7 +130,13 @@ const (
|
||||||
maxFileUpload = 1000000
|
maxFileUpload = 1000000
|
||||||
)
|
)
|
||||||
|
|
||||||
func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
type runner struct {
|
||||||
|
client rpc.Peer
|
||||||
|
filter rpc.Filter
|
||||||
|
hostname string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *runner) run(ctx context.Context) error {
|
||||||
log.Debug().
|
log.Debug().
|
||||||
Msg("request next execution")
|
Msg("request next execution")
|
||||||
|
|
||||||
|
@ -133,7 +144,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
ctxmeta := metadata.NewOutgoingContext(context.Background(), meta)
|
ctxmeta := metadata.NewOutgoingContext(context.Background(), meta)
|
||||||
|
|
||||||
// get the next job from the queue
|
// get the next job from the queue
|
||||||
work, err := client.Next(ctx, filter)
|
work, err := r.client.Next(ctx, r.filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -181,7 +192,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
logger.Debug().
|
logger.Debug().
|
||||||
Msg("listen for cancel signal")
|
Msg("listen for cancel signal")
|
||||||
|
|
||||||
if werr := client.Wait(ctx, work.ID); werr != nil {
|
if werr := r.client.Wait(ctx, work.ID); werr != nil {
|
||||||
cancelled.SetTo(true)
|
cancelled.SetTo(true)
|
||||||
logger.Warn().
|
logger.Warn().
|
||||||
Err(werr).
|
Err(werr).
|
||||||
|
@ -206,7 +217,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
logger.Debug().
|
logger.Debug().
|
||||||
Msg("pipeline lease renewed")
|
Msg("pipeline lease renewed")
|
||||||
|
|
||||||
client.Extend(ctx, work.ID)
|
r.client.Extend(ctx, work.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -214,7 +225,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
state := rpc.State{}
|
state := rpc.State{}
|
||||||
state.Started = time.Now().Unix()
|
state.Started = time.Now().Unix()
|
||||||
|
|
||||||
err = client.Init(ctxmeta, work.ID, state)
|
err = r.client.Init(ctxmeta, work.ID, state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error().
|
logger.Error().
|
||||||
Err(err).
|
Err(err).
|
||||||
|
@ -245,7 +256,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
loglogger.Debug().Msg("log stream opened")
|
loglogger.Debug().Msg("log stream opened")
|
||||||
|
|
||||||
limitedPart := io.LimitReader(part, maxLogsUpload)
|
limitedPart := io.LimitReader(part, maxLogsUpload)
|
||||||
logstream := rpc.NewLineWriter(client, work.ID, proc.Alias, secrets...)
|
logstream := rpc.NewLineWriter(r.client, work.ID, proc.Alias, secrets...)
|
||||||
io.Copy(logstream, limitedPart)
|
io.Copy(logstream, limitedPart)
|
||||||
|
|
||||||
loglogger.Debug().Msg("log stream copied")
|
loglogger.Debug().Msg("log stream copied")
|
||||||
|
@ -261,7 +272,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
loglogger.Debug().
|
loglogger.Debug().
|
||||||
Msg("log stream uploading")
|
Msg("log stream uploading")
|
||||||
|
|
||||||
if serr := client.Upload(ctxmeta, work.ID, file); serr != nil {
|
if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil {
|
||||||
loglogger.Error().
|
loglogger.Error().
|
||||||
Err(serr).
|
Err(serr).
|
||||||
Msg("log stream upload error")
|
Msg("log stream upload error")
|
||||||
|
@ -301,7 +312,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
Str("mime", file.Mime).
|
Str("mime", file.Mime).
|
||||||
Msg("file stream uploading")
|
Msg("file stream uploading")
|
||||||
|
|
||||||
if serr := client.Upload(ctxmeta, work.ID, file); serr != nil {
|
if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil {
|
||||||
loglogger.Error().
|
loglogger.Error().
|
||||||
Err(serr).
|
Err(serr).
|
||||||
Str("file", file.Name).
|
Str("file", file.Name).
|
||||||
|
@ -335,7 +346,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
proclogger.Debug().
|
proclogger.Debug().
|
||||||
Msg("update step status")
|
Msg("update step status")
|
||||||
|
|
||||||
if uerr := client.Update(ctxmeta, work.ID, procState); uerr != nil {
|
if uerr := r.client.Update(ctxmeta, work.ID, procState); uerr != nil {
|
||||||
proclogger.Debug().
|
proclogger.Debug().
|
||||||
Err(uerr).
|
Err(uerr).
|
||||||
Msg("update step status error")
|
Msg("update step status error")
|
||||||
|
@ -350,6 +361,8 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
if state.Pipeline.Step.Environment == nil {
|
if state.Pipeline.Step.Environment == nil {
|
||||||
state.Pipeline.Step.Environment = map[string]string{}
|
state.Pipeline.Step.Environment = map[string]string{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state.Pipeline.Step.Environment["DRONE_MACHINE"] = r.hostname
|
||||||
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success"
|
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success"
|
||||||
state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
|
state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
|
||||||
state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)
|
state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)
|
||||||
|
@ -413,7 +426,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
|
||||||
Int("exit_code", state.ExitCode).
|
Int("exit_code", state.ExitCode).
|
||||||
Msg("updating pipeline status")
|
Msg("updating pipeline status")
|
||||||
|
|
||||||
err = client.Done(ctxmeta, work.ID, state)
|
err = r.client.Done(ctxmeta, work.ID, state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error().Err(err).
|
logger.Error().Err(err).
|
||||||
Msg("updating pipeline status failed")
|
Msg("updating pipeline status failed")
|
||||||
|
|
Loading…
Reference in a new issue