harness-drone/operator/manager/manager.go

515 lines
14 KiB
Go
Raw Normal View History

2019-03-13 21:47:47 +00:00
// Copyright 2019 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2019-02-19 23:56:41 +00:00
package manager
import (
"bytes"
"context"
"io"
"time"
"github.com/drone/drone/core"
"github.com/drone/drone/store/shared/db"
"github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus"
)
var noContext = context.Background()
var _ BuildManager = (*Manager)(nil)
type (
// Context represents the minimum amount of information
// required by the runner to execute a build.
Context struct {
Repo *core.Repository `json:"repository"`
Build *core.Build `json:"build"`
Stage *core.Stage `json:"stage"`
Config *core.File `json:"config"`
Secrets []*core.Secret `json:"secrets"`
System *core.System `json:"system"`
}
// BuildManager encapsulets complex build operations and provides
// a simplified interface for build runners.
BuildManager interface {
// Request requests the next available build stage for execution.
Request(ctx context.Context, args *Request) (*core.Stage, error)
// Accept accepts the build stage for execution.
2019-05-21 18:45:42 +00:00
Accept(ctx context.Context, stage int64, machine string) (*core.Stage, error)
2019-02-19 23:56:41 +00:00
// Netrc returns a valid netrc for execution.
Netrc(ctx context.Context, repo int64) (*core.Netrc, error)
// Details fetches build details
Details(ctx context.Context, stage int64) (*Context, error)
// Before signals the build step is about to start.
Before(ctxt context.Context, step *core.Step) error
// After signals the build step is complete.
After(ctx context.Context, step *core.Step) error
// Before signals the build stage is about to start.
BeforeAll(ctxt context.Context, stage *core.Stage) error
// After signals the build stage is complete.
AfterAll(ctx context.Context, stage *core.Stage) error
// Watch watches for build cancellation requests.
Watch(ctx context.Context, stage int64) (bool, error)
// Write writes a line to the build logs
Write(ctx context.Context, step int64, line *core.Line) error
// Upload uploads the full logs
Upload(ctx context.Context, step int64, r io.Reader) error
// UploadBytes uploads the full logs
UploadBytes(ctx context.Context, step int64, b []byte) error
}
// Request provildes filters when requesting a pending
// build from the queue. This allows an agent, for example,
// to request a build that matches its architecture and kernel.
Request struct {
Kind string `json:"kind"`
Type string `json:"type"`
OS string `json:"os"`
Arch string `json:"arch"`
Variant string `json:"variant"`
Kernel string `json:"kernel"`
Labels map[string]string `json:"labels,omitempty"`
}
)
// New returns a new Manager.
func New(
builds core.BuildStore,
config core.ConfigService,
2019-09-03 06:05:30 +00:00
converter core.ConvertService,
2019-02-19 23:56:41 +00:00
events core.Pubsub,
logs core.LogStore,
logz core.LogStream,
netrcs core.NetrcService,
repos core.RepositoryStore,
scheduler core.Scheduler,
secrets core.SecretStore,
2019-04-15 00:06:16 +00:00
globals core.GlobalSecretStore,
2019-02-19 23:56:41 +00:00
status core.StatusService,
stages core.StageStore,
steps core.StepStore,
system *core.System,
users core.UserStore,
webhook core.WebhookSender,
) BuildManager {
return &Manager{
Builds: builds,
Config: config,
2019-09-03 06:05:30 +00:00
Converter: converter,
2019-02-19 23:56:41 +00:00
Events: events,
2019-04-15 00:06:16 +00:00
Globals: globals,
2019-02-19 23:56:41 +00:00
Logs: logs,
Logz: logz,
Netrcs: netrcs,
Repos: repos,
Scheduler: scheduler,
Secrets: secrets,
Status: status,
Stages: stages,
Steps: steps,
System: system,
Users: users,
Webhook: webhook,
}
}
// Manager provides a simplified interface to the build runner so that it
// can more easily interact with the server.
type Manager struct {
Builds core.BuildStore
Config core.ConfigService
2019-09-03 06:05:30 +00:00
Converter core.ConvertService
2019-02-19 23:56:41 +00:00
Events core.Pubsub
2019-04-15 00:06:16 +00:00
Globals core.GlobalSecretStore
2019-02-19 23:56:41 +00:00
Logs core.LogStore
Logz core.LogStream
Netrcs core.NetrcService
Repos core.RepositoryStore
Scheduler core.Scheduler
Secrets core.SecretStore
Status core.StatusService
Stages core.StageStore
Steps core.StepStore
System *core.System
Users core.UserStore
Webhook core.WebhookSender
}
// Request requests the next available build stage for execution.
func (m *Manager) Request(ctx context.Context, args *Request) (*core.Stage, error) {
logger := logrus.WithFields(
logrus.Fields{
2019-05-20 16:17:50 +00:00
"kind": args.Kind,
"type": args.Type,
2019-02-19 23:56:41 +00:00
"os": args.OS,
"arch": args.Arch,
"kernel": args.Kernel,
"variant": args.Variant,
},
)
logger.Debugln("manager: request queue item")
stage, err := m.Scheduler.Request(ctx, core.Filter{
Kind: args.Kind,
Type: args.Type,
OS: args.OS,
Arch: args.Arch,
Kernel: args.Kernel,
Variant: args.Variant,
Labels: args.Labels,
})
if err != nil && ctx.Err() != nil {
logger.Debugln("manager: context canceled")
return nil, err
}
if err != nil {
logger = logrus.WithError(err)
logger.Warnln("manager: request queue item error")
return nil, err
}
return stage, nil
}
// Accept accepts the build stage for execution. It is possible for multiple
// agents to pull the same stage from the queue. The system uses optimistic
// locking at the database-level to prevent multiple agents from executing the
// same stage.
2019-05-21 18:45:42 +00:00
func (m *Manager) Accept(ctx context.Context, id int64, machine string) (*core.Stage, error) {
2019-02-19 23:56:41 +00:00
logger := logrus.WithFields(
logrus.Fields{
"stage-id": id,
"machine": machine,
},
)
logger.Debugln("manager: accept stage")
stage, err := m.Stages.Find(noContext, id)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find stage")
2019-05-21 18:45:42 +00:00
return nil, err
2019-02-19 23:56:41 +00:00
}
if stage.Machine != "" {
logger.Debugln("manager: stage already assigned. abort.")
2019-05-21 18:45:42 +00:00
return nil, db.ErrOptimisticLock
2019-02-19 23:56:41 +00:00
}
stage.Machine = machine
stage.Status = core.StatusPending
stage.Updated = time.Now().Unix()
err = m.Stages.Update(noContext, stage)
if err == db.ErrOptimisticLock {
logger = logger.WithError(err)
logger.Debugln("manager: stage processed by another agent")
} else if err != nil {
logger = logger.WithError(err)
logger.Debugln("manager: cannot update stage")
} else {
logger.Debugln("manager: stage accepted")
}
2019-05-21 18:45:42 +00:00
return stage, err
2019-02-19 23:56:41 +00:00
}
// Details fetches build details.
func (m *Manager) Details(ctx context.Context, id int64) (*Context, error) {
logger := logrus.WithField("step-id", id)
logger.Debugln("manager: fetching stage details")
stage, err := m.Stages.Find(noContext, id)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find stage")
return nil, err
}
build, err := m.Builds.Find(noContext, stage.BuildID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find build")
return nil, err
}
2019-03-14 17:16:19 +00:00
stages, err := m.Stages.List(ctx, stage.BuildID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot list stages")
return nil, err
}
build.Stages = stages
2019-02-19 23:56:41 +00:00
repo, err := m.Repos.Find(noContext, build.RepoID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find repository")
return nil, err
}
logger = logger.WithFields(
logrus.Fields{
"build": build.Number,
"repo": repo.Slug,
},
)
user, err := m.Users.Find(noContext, repo.UserID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find repository owner")
return nil, err
}
config, err := m.Config.Find(noContext, &core.ConfigArgs{
User: user,
Repo: repo,
Build: build,
})
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find configuration")
return nil, err
}
2019-09-03 06:05:30 +00:00
config, err = m.Converter.Convert(noContext, &core.ConvertArgs{
Build: build,
Config: config,
Repo: repo,
User: user,
})
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot convert configuration")
return nil, err
}
2019-02-19 23:56:41 +00:00
var secrets []*core.Secret
tmpSecrets, err := m.Secrets.List(noContext, repo.ID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot list secrets")
return nil, err
}
2019-04-15 00:06:16 +00:00
tmpGlobalSecrets, err := m.Globals.List(noContext, repo.Namespace)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot list global secrets")
return nil, err
}
2019-02-19 23:56:41 +00:00
// TODO(bradrydzewski) can we delegate filtering
// secrets to the agent? If not, we should add
// unit tests.
for _, secret := range tmpSecrets {
if secret.PullRequest == false &&
build.Event == core.EventPullRequest {
continue
}
secrets = append(secrets, secret)
}
2019-04-15 00:06:16 +00:00
for _, secret := range tmpGlobalSecrets {
if secret.PullRequest == false &&
build.Event == core.EventPullRequest {
continue
}
secrets = append(secrets, secret)
}
2019-02-19 23:56:41 +00:00
return &Context{
Repo: repo,
Build: build,
Stage: stage,
Secrets: secrets,
System: m.System,
Config: &core.File{Data: []byte(config.Data)},
}, nil
}
// Before signals the build step is about to start.
func (m *Manager) Before(ctx context.Context, step *core.Step) error {
logger := logrus.WithFields(
logrus.Fields{
"step.status": step.Status,
"step.name": step.Name,
"step.id": step.ID,
},
)
logger.Debugln("manager: updating step status")
err := m.Logz.Create(noContext, step.ID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot create log stream")
return err
}
updater := &updater{
Builds: m.Builds,
Events: m.Events,
Repos: m.Repos,
Steps: m.Steps,
Stages: m.Stages,
Webhook: m.Webhook,
}
return updater.do(ctx, step)
}
// After signals the build step is complete.
func (m *Manager) After(ctx context.Context, step *core.Step) error {
logger := logrus.WithFields(
logrus.Fields{
"step.status": step.Status,
"step.name": step.Name,
"step.id": step.ID,
},
)
logger.Debugln("manager: updating step status")
var errs error
updater := &updater{
Builds: m.Builds,
Events: m.Events,
Repos: m.Repos,
Steps: m.Steps,
Stages: m.Stages,
Webhook: m.Webhook,
}
if err := updater.do(ctx, step); err != nil {
errs = multierror.Append(errs, err)
logger = logger.WithError(err)
logger.Warnln("manager: cannot update step")
}
if err := m.Logz.Delete(noContext, step.ID); err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot teardown log stream")
}
return errs
}
// BeforeAll signals the build stage is about to start.
func (m *Manager) BeforeAll(ctx context.Context, stage *core.Stage) error {
s := &setup{
Builds: m.Builds,
Events: m.Events,
Repos: m.Repos,
Steps: m.Steps,
Stages: m.Stages,
Status: m.Status,
Users: m.Users,
}
return s.do(ctx, stage)
}
// AfterAll signals the build stage is complete.
func (m *Manager) AfterAll(ctx context.Context, stage *core.Stage) error {
t := &teardown{
Builds: m.Builds,
Events: m.Events,
Logs: m.Logz,
Repos: m.Repos,
Scheduler: m.Scheduler,
Steps: m.Steps,
Stages: m.Stages,
Status: m.Status,
Users: m.Users,
}
return t.do(ctx, stage)
}
// Netrc returns netrc file with a valid, non-expired token
// that can be used to clone the repository.
func (m *Manager) Netrc(ctx context.Context, id int64) (*core.Netrc, error) {
logger := logrus.WithField("repo.id", id)
repo, err := m.Repos.Find(ctx, id)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find repository")
return nil, err
}
user, err := m.Users.Find(ctx, repo.UserID)
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find repository owner")
return nil, err
}
netrc, err := m.Netrcs.Create(ctx, user, repo)
if err != nil {
logger = logger.WithError(err)
logger = logger.WithField("repo.name", repo.Slug)
2019-04-16 02:03:49 +00:00
logger.Warnln("manager: cannot generate netrc")
2019-02-19 23:56:41 +00:00
}
return netrc, err
}
// Watch watches for build cancellation requests.
func (m *Manager) Watch(ctx context.Context, id int64) (bool, error) {
ok, err := m.Scheduler.Cancelled(ctx, id)
if err != nil {
return ok, err
}
// if a not found error is returned we should check
// the database to see if the stage is complete. If
// complete, return true.
stage, err := m.Stages.Find(ctx, id)
2019-09-16 05:38:37 +00:00
if err != nil {
2019-02-19 23:56:41 +00:00
logger := logrus.WithError(err)
logger = logger.WithField("step-id", id)
logger.Warnln("manager: cannot find stage")
return ok, err
}
return stage.IsDone(), nil
}
// Write writes a line to the build logs.
func (m *Manager) Write(ctx context.Context, step int64, line *core.Line) error {
err := m.Logz.Write(ctx, step, line)
if err != nil {
logger := logrus.WithError(err)
logger = logger.WithField("step-id", step)
logger.Warnln("manager: cannot write to log stream")
}
return err
}
// Upload uploads the full logs.
func (m *Manager) Upload(ctx context.Context, step int64, r io.Reader) error {
err := m.Logs.Create(ctx, step, r)
if err != nil {
logger := logrus.WithError(err)
logger = logger.WithField("step-id", step)
logger.Warnln("manager: cannot upload complete logs")
}
return err
}
// UploadBytes uploads the full logs.
func (m *Manager) UploadBytes(ctx context.Context, step int64, data []byte) error {
buf := bytes.NewBuffer(data)
err := m.Logs.Create(ctx, step, buf)
if err != nil {
logger := logrus.WithError(err)
logger = logger.WithField("step-id", step)
logger.Warnln("manager: cannot upload complete logs")
}
return err
}