harness-drone/operator/manager/manager.go
2019-09-16 04:59:21 -07:00

514 lines
14 KiB
Go

// Copyright 2019 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"bytes"
"context"
"io"
"time"
"github.com/drone/drone/core"
"github.com/drone/drone/store/shared/db"
"github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus"
)
var (
	// noContext is a background context. Several store and stream calls
	// in this package pass it instead of the caller's context.
	noContext = context.Background()

	// Compile-time assertion that *Manager implements BuildManager.
	_ BuildManager = (*Manager)(nil)
)
// Context represents the minimum amount of information
// required by the runner to execute a build.
type Context struct {
	Repo    *core.Repository `json:"repository"`
	Build   *core.Build      `json:"build"`
	Stage   *core.Stage      `json:"stage"`
	Config  *core.File       `json:"config"`
	Secrets []*core.Secret   `json:"secrets"`
	System  *core.System     `json:"system"`
}

// BuildManager encapsulates complex build operations and provides
// a simplified interface for build runners.
type BuildManager interface {
	// Request requests the next available build stage for execution.
	Request(ctx context.Context, args *Request) (*core.Stage, error)

	// Accept accepts the build stage for execution.
	Accept(ctx context.Context, stage int64, machine string) (*core.Stage, error)

	// Netrc returns a valid netrc for execution.
	Netrc(ctx context.Context, repo int64) (*core.Netrc, error)

	// Details fetches build details.
	Details(ctx context.Context, stage int64) (*Context, error)

	// Before signals the build step is about to start.
	Before(ctx context.Context, step *core.Step) error

	// After signals the build step is complete.
	After(ctx context.Context, step *core.Step) error

	// BeforeAll signals the build stage is about to start.
	BeforeAll(ctx context.Context, stage *core.Stage) error

	// AfterAll signals the build stage is complete.
	AfterAll(ctx context.Context, stage *core.Stage) error

	// Watch watches for build cancellation requests.
	Watch(ctx context.Context, stage int64) (bool, error)

	// Write writes a line to the build logs.
	Write(ctx context.Context, step int64, line *core.Line) error

	// Upload uploads the full logs from a reader.
	Upload(ctx context.Context, step int64, r io.Reader) error

	// UploadBytes uploads the full logs from a byte slice.
	UploadBytes(ctx context.Context, step int64, b []byte) error
}

// Request provides filters when requesting a pending
// build from the queue. This allows an agent, for example,
// to request a build that matches its architecture and kernel.
type Request struct {
	Kind    string            `json:"kind"`
	Type    string            `json:"type"`
	OS      string            `json:"os"`
	Arch    string            `json:"arch"`
	Variant string            `json:"variant"`
	Kernel  string            `json:"kernel"`
	Labels  map[string]string `json:"labels,omitempty"`
}
// New returns a BuildManager backed by a *Manager whose fields are
// populated one-to-one from the given stores and services.
func New(
	builds core.BuildStore,
	config core.ConfigService,
	converter core.ConvertService,
	events core.Pubsub,
	logs core.LogStore,
	logz core.LogStream,
	netrcs core.NetrcService,
	repos core.RepositoryStore,
	scheduler core.Scheduler,
	secrets core.SecretStore,
	globals core.GlobalSecretStore,
	status core.StatusService,
	stages core.StageStore,
	steps core.StepStore,
	system *core.System,
	users core.UserStore,
	webhook core.WebhookSender,
) BuildManager {
	m := new(Manager)

	// persistent stores
	m.Builds = builds
	m.Repos = repos
	m.Stages = stages
	m.Steps = steps
	m.Users = users
	m.Secrets = secrets
	m.Globals = globals
	m.Logs = logs

	// services and runtime collaborators
	m.Config = config
	m.Converter = converter
	m.Events = events
	m.Logz = logz
	m.Netrcs = netrcs
	m.Scheduler = scheduler
	m.Status = status
	m.System = system
	m.Webhook = webhook

	return m
}
// Manager provides a simplified interface to the build runner so that it
// can more easily interact with the server. It is the BuildManager
// implementation returned by New; each field is a collaborator that the
// methods below delegate to.
type Manager struct {
// Builds looks up the build that owns a stage (see Details).
Builds core.BuildStore
// Config finds the build configuration (see Details).
Config core.ConfigService
// Converter converts the configuration found by Config (see Details).
Converter core.ConvertService
// Events is handed to the step updater and the stage setup/teardown helpers.
Events core.Pubsub
// Globals lists global secrets by repository namespace (see Details).
Globals core.GlobalSecretStore
// Logs receives the complete step logs (see Upload and UploadBytes).
Logs core.LogStore
// Logz is the log stream: created in Before, written to in Write and
// deleted in After.
Logz core.LogStream
// Netrcs creates the netrc returned by Netrc.
Netrcs core.NetrcService
// Repos looks up repositories by id.
Repos core.RepositoryStore
// Scheduler serves queue requests (Request) and cancellation checks (Watch).
Scheduler core.Scheduler
// Secrets lists repository secrets by repository id (see Details).
Secrets core.SecretStore
// Status is handed to the stage setup and teardown helpers.
Status core.StatusService
// Stages finds, lists and updates build stages.
Stages core.StageStore
// Steps is handed to the step updater and the stage setup/teardown helpers.
Steps core.StepStore
// System is returned to the runner as part of the Context built by Details.
System *core.System
// Users looks up the repository owner.
Users core.UserStore
// Webhook is handed to the step updater.
Webhook core.WebhookSender
}
// Request requests the next available build stage for execution.
//
// The filters in args are forwarded to the scheduler unchanged. If the
// scheduler returns an error and ctx has already ended, the failure is
// logged at debug level; any other scheduler error is logged as a
// warning. In both cases the scheduler's error is returned with a nil
// stage.
func (m *Manager) Request(ctx context.Context, args *Request) (*core.Stage, error) {
	logger := logrus.WithFields(
		logrus.Fields{
			"kind":    args.Kind,
			"type":    args.Type,
			"os":      args.OS,
			"arch":    args.Arch,
			"kernel":  args.Kernel,
			"variant": args.Variant,
		},
	)
	logger.Debugln("manager: request queue item")

	stage, err := m.Scheduler.Request(ctx, core.Filter{
		Kind:    args.Kind,
		Type:    args.Type,
		OS:      args.OS,
		Arch:    args.Arch,
		Kernel:  args.Kernel,
		Variant: args.Variant,
		Labels:  args.Labels,
	})
	if err != nil {
		if ctx.Err() != nil {
			logger.Debugln("manager: context canceled")
			return nil, err
		}
		// Derive from the existing entry (not the logrus package) so the
		// request filter fields stay attached to the warning.
		logger = logger.WithError(err)
		logger.Warnln("manager: request queue item error")
		return nil, err
	}
	return stage, nil
}
// Accept accepts the build stage for execution. It is possible for multiple
// agents to pull the same stage from the queue. The system uses optimistic
// locking at the database-level to prevent multiple agents from executing the
// same stage.
//
// A stage that already has a machine assigned is rejected with
// db.ErrOptimisticLock. Otherwise the stage is assigned to machine, marked
// pending and saved; the stage is returned together with the result of the
// save, so it is non-nil even when the save fails.
func (m *Manager) Accept(ctx context.Context, id int64, machine string) (*core.Stage, error) {
	entry := logrus.WithFields(logrus.Fields{
		"stage-id": id,
		"machine":  machine,
	})
	entry.Debugln("manager: accept stage")

	stage, err := m.Stages.Find(noContext, id)
	if err != nil {
		entry.WithError(err).Warnln("manager: cannot find stage")
		return nil, err
	}

	// another agent got there first
	if stage.Machine != "" {
		entry.Debugln("manager: stage already assigned. abort.")
		return nil, db.ErrOptimisticLock
	}

	stage.Machine = machine
	stage.Status = core.StatusPending
	stage.Updated = time.Now().Unix()

	err = m.Stages.Update(noContext, stage)
	switch {
	case err == db.ErrOptimisticLock:
		entry.WithError(err).Debugln("manager: stage processed by another agent")
	case err != nil:
		entry.WithError(err).Debugln("manager: cannot update stage")
	default:
		entry.Debugln("manager: stage accepted")
	}
	return stage, err
}
// Details fetches build details for the stage with the given id.
//
// It loads the stage, its build (with all of the build's stages
// attached), the repository and the repository owner, finds and converts
// the build configuration, and collects the repository and global
// secrets. The first lookup that fails is logged as a warning and its
// error is returned with a nil Context.
func (m *Manager) Details(ctx context.Context, id int64) (*Context, error) {
	// id identifies a stage, so it is logged under the stage-id key.
	logger := logrus.WithField("stage-id", id)
	logger.Debugln("manager: fetching stage details")

	stage, err := m.Stages.Find(noContext, id)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot find stage")
		return nil, err
	}
	build, err := m.Builds.Find(noContext, stage.BuildID)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot find build")
		return nil, err
	}
	// NOTE(review): this is the only lookup in this method bound to the
	// caller's ctx; every other call uses noContext. Confirm intent.
	stages, err := m.Stages.List(ctx, stage.BuildID)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot list stages")
		return nil, err
	}
	build.Stages = stages
	repo, err := m.Repos.Find(noContext, build.RepoID)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot find repository")
		return nil, err
	}
	logger = logger.WithFields(
		logrus.Fields{
			"build": build.Number,
			"repo":  repo.Slug,
		},
	)
	user, err := m.Users.Find(noContext, repo.UserID)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot find repository owner")
		return nil, err
	}
	config, err := m.Config.Find(noContext, &core.ConfigArgs{
		User:  user,
		Repo:  repo,
		Build: build,
	})
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot find configuration")
		return nil, err
	}
	config, err = m.Converter.Convert(noContext, &core.ConvertArgs{
		Build:  build,
		Config: config,
		Repo:   repo,
		User:   user,
	})
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot convert configuration")
		return nil, err
	}

	tmpSecrets, err := m.Secrets.List(noContext, repo.ID)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot list secrets")
		return nil, err
	}
	tmpGlobalSecrets, err := m.Globals.List(noContext, repo.Namespace)
	if err != nil {
		logger = logger.WithError(err)
		logger.Warnln("manager: cannot list global secrets")
		return nil, err
	}

	// TODO(bradrydzewski) can we delegate filtering
	// secrets to the agent? If not, we should add
	// unit tests.
	//
	// For pull request builds, only secrets flagged with PullRequest are
	// passed on. The slice is left nil when nothing qualifies so the
	// value handed to the runner is the same as before.
	var secrets []*core.Secret
	isPullRequest := build.Event == core.EventPullRequest
	for _, secret := range tmpSecrets {
		if isPullRequest && !secret.PullRequest {
			continue
		}
		secrets = append(secrets, secret)
	}
	for _, secret := range tmpGlobalSecrets {
		if isPullRequest && !secret.PullRequest {
			continue
		}
		secrets = append(secrets, secret)
	}

	return &Context{
		Repo:    repo,
		Build:   build,
		Stage:   stage,
		Secrets: secrets,
		System:  m.System,
		Config:  &core.File{Data: []byte(config.Data)},
	}, nil
}
// Before signals the build step is about to start. It creates the log
// stream for the step and then runs the step updater. If the stream
// cannot be created the error is logged and returned, and the updater
// is not run.
func (m *Manager) Before(ctx context.Context, step *core.Step) error {
	entry := logrus.WithFields(logrus.Fields{
		"step.status": step.Status,
		"step.name":   step.Name,
		"step.id":     step.ID,
	})
	entry.Debugln("manager: updating step status")

	if err := m.Logz.Create(noContext, step.ID); err != nil {
		entry.WithError(err).Warnln("manager: cannot create log stream")
		return err
	}

	return (&updater{
		Builds:  m.Builds,
		Events:  m.Events,
		Repos:   m.Repos,
		Steps:   m.Steps,
		Stages:  m.Stages,
		Webhook: m.Webhook,
	}).do(ctx, step)
}
// After signals the build step is complete. It runs the step updater and
// then deletes the step's log stream. Only an updater failure is
// returned (wrapped in a multierror); a failure to delete the log stream
// is logged as a warning but not returned.
func (m *Manager) After(ctx context.Context, step *core.Step) error {
	entry := logrus.WithFields(logrus.Fields{
		"step.status": step.Status,
		"step.name":   step.Name,
		"step.id":     step.ID,
	})
	entry.Debugln("manager: updating step status")

	u := &updater{
		Builds:  m.Builds,
		Events:  m.Events,
		Repos:   m.Repos,
		Steps:   m.Steps,
		Stages:  m.Stages,
		Webhook: m.Webhook,
	}

	var result error
	if updateErr := u.do(ctx, step); updateErr != nil {
		result = multierror.Append(result, updateErr)
		entry.WithError(updateErr).Warnln("manager: cannot update step")
	}

	// the stream is deleted whether or not the update succeeded
	if deleteErr := m.Logz.Delete(noContext, step.ID); deleteErr != nil {
		entry.WithError(deleteErr).Warnln("manager: cannot teardown log stream")
	}
	return result
}
// BeforeAll signals the build stage is about to start. The work is
// delegated to a setup helper built from the manager's collaborators.
func (m *Manager) BeforeAll(ctx context.Context, stage *core.Stage) error {
	return (&setup{
		Builds: m.Builds,
		Events: m.Events,
		Repos:  m.Repos,
		Steps:  m.Steps,
		Stages: m.Stages,
		Status: m.Status,
		Users:  m.Users,
	}).do(ctx, stage)
}
// AfterAll signals the build stage is complete. The work is delegated to
// a teardown helper built from the manager's collaborators; note that
// the helper's Logs field receives the log stream (Logz), not the log
// store.
func (m *Manager) AfterAll(ctx context.Context, stage *core.Stage) error {
	return (&teardown{
		Builds:    m.Builds,
		Events:    m.Events,
		Logs:      m.Logz,
		Repos:     m.Repos,
		Scheduler: m.Scheduler,
		Steps:     m.Steps,
		Stages:    m.Stages,
		Status:    m.Status,
		Users:     m.Users,
	}).do(ctx, stage)
}
// Netrc returns a netrc file with a valid, non-expired token
// that can be used to clone the repository.
//
// The repository is looked up by id, then its owner, and the netrc is
// created for that owner and repository. Lookup failures return a nil
// netrc; a creation failure is logged and returned along with whatever
// the netrc service produced.
func (m *Manager) Netrc(ctx context.Context, id int64) (*core.Netrc, error) {
	entry := logrus.WithField("repo.id", id)

	repo, err := m.Repos.Find(ctx, id)
	if err != nil {
		entry.WithError(err).Warnln("manager: cannot find repository")
		return nil, err
	}

	owner, err := m.Users.Find(ctx, repo.UserID)
	if err != nil {
		entry.WithError(err).Warnln("manager: cannot find repository owner")
		return nil, err
	}

	netrc, err := m.Netrcs.Create(ctx, owner, repo)
	if err != nil {
		entry.
			WithError(err).
			WithField("repo.name", repo.Slug).
			Warnln("manager: cannot generate netrc")
	}
	return netrc, err
}
// Watch watches for build cancellation requests for the stage with the
// given id.
//
// Any error from the scheduler is returned as-is together with the
// scheduler's result. When the scheduler returns without error, the
// stage is loaded from the store and the returned flag reports whether
// the stage is done; the scheduler's own boolean result is not used in
// that case.
func (m *Manager) Watch(ctx context.Context, id int64) (bool, error) {
	ok, err := m.Scheduler.Cancelled(ctx, id)
	if err != nil {
		return ok, err
	}
	// the scheduler returned without error: check the
	// database to see if the stage is complete. If
	// complete, return true.
	stage, err := m.Stages.Find(ctx, id)
	if err != nil {
		logger := logrus.WithError(err)
		// id identifies a stage, so it is logged under the stage-id key.
		logger = logger.WithField("stage-id", id)
		logger.Warnln("manager: cannot find stage")
		return ok, err
	}
	return stage.IsDone(), nil
}
// Write writes a line to the build logs by forwarding it to the log
// stream for the given step. A failure is logged as a warning and
// returned to the caller.
func (m *Manager) Write(ctx context.Context, step int64, line *core.Line) error {
	if err := m.Logz.Write(ctx, step, line); err != nil {
		logrus.WithError(err).
			WithField("step-id", step).
			Warnln("manager: cannot write to log stream")
		return err
	}
	return nil
}
// Upload uploads the full logs for the given step by handing the reader
// to the log store. A failure is logged as a warning and returned to the
// caller.
func (m *Manager) Upload(ctx context.Context, step int64, r io.Reader) error {
	if err := m.Logs.Create(ctx, step, r); err != nil {
		logrus.WithError(err).
			WithField("step-id", step).
			Warnln("manager: cannot upload complete logs")
		return err
	}
	return nil
}
// UploadBytes uploads the full logs for the given step from an in-memory
// byte slice, which is wrapped in a buffer and handed to the log store.
// A failure is logged as a warning and returned to the caller.
func (m *Manager) UploadBytes(ctx context.Context, step int64, data []byte) error {
	if err := m.Logs.Create(ctx, step, bytes.NewBuffer(data)); err != nil {
		logrus.WithError(err).
			WithField("step-id", step).
			Warnln("manager: cannot upload complete logs")
		return err
	}
	return nil
}