537 lines
14 KiB
Go
537 lines
14 KiB
Go
// Copyright 2019 Drone IO, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package manager
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/drone/drone-yaml/yaml/converter"
|
|
"github.com/drone/drone/core"
|
|
"github.com/drone/drone/store/shared/db"
|
|
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var noContext = context.Background()
|
|
|
|
var _ BuildManager = (*Manager)(nil)
|
|
|
|
type (
|
|
// Context represents the minimum amount of information
|
|
// required by the runner to execute a build.
|
|
Context struct {
|
|
Repo *core.Repository `json:"repository"`
|
|
Build *core.Build `json:"build"`
|
|
Stage *core.Stage `json:"stage"`
|
|
Config *core.File `json:"config"`
|
|
Secrets []*core.Secret `json:"secrets"`
|
|
System *core.System `json:"system"`
|
|
}
|
|
|
|
// BuildManager encapsulets complex build operations and provides
|
|
// a simplified interface for build runners.
|
|
BuildManager interface {
|
|
// Request requests the next available build stage for execution.
|
|
Request(ctx context.Context, args *Request) (*core.Stage, error)
|
|
|
|
// Accept accepts the build stage for execution.
|
|
Accept(ctx context.Context, stage int64, machine string) (*core.Stage, error)
|
|
|
|
// Netrc returns a valid netrc for execution.
|
|
Netrc(ctx context.Context, repo int64) (*core.Netrc, error)
|
|
|
|
// Details fetches build details
|
|
Details(ctx context.Context, stage int64) (*Context, error)
|
|
|
|
// Before signals the build step is about to start.
|
|
Before(ctxt context.Context, step *core.Step) error
|
|
|
|
// After signals the build step is complete.
|
|
After(ctx context.Context, step *core.Step) error
|
|
|
|
// Before signals the build stage is about to start.
|
|
BeforeAll(ctxt context.Context, stage *core.Stage) error
|
|
|
|
// After signals the build stage is complete.
|
|
AfterAll(ctx context.Context, stage *core.Stage) error
|
|
|
|
// Watch watches for build cancellation requests.
|
|
Watch(ctx context.Context, stage int64) (bool, error)
|
|
|
|
// Write writes a line to the build logs
|
|
Write(ctx context.Context, step int64, line *core.Line) error
|
|
|
|
// Upload uploads the full logs
|
|
Upload(ctx context.Context, step int64, r io.Reader) error
|
|
|
|
// UploadBytes uploads the full logs
|
|
UploadBytes(ctx context.Context, step int64, b []byte) error
|
|
}
|
|
|
|
// Request provildes filters when requesting a pending
|
|
// build from the queue. This allows an agent, for example,
|
|
// to request a build that matches its architecture and kernel.
|
|
Request struct {
|
|
Kind string `json:"kind"`
|
|
Type string `json:"type"`
|
|
OS string `json:"os"`
|
|
Arch string `json:"arch"`
|
|
Variant string `json:"variant"`
|
|
Kernel string `json:"kernel"`
|
|
Labels map[string]string `json:"labels,omitempty"`
|
|
}
|
|
)
|
|
|
|
// New returns a new Manager.
|
|
func New(
|
|
builds core.BuildStore,
|
|
config core.ConfigService,
|
|
converter core.ConvertService,
|
|
events core.Pubsub,
|
|
logs core.LogStore,
|
|
logz core.LogStream,
|
|
netrcs core.NetrcService,
|
|
repos core.RepositoryStore,
|
|
scheduler core.Scheduler,
|
|
secrets core.SecretStore,
|
|
globals core.GlobalSecretStore,
|
|
status core.StatusService,
|
|
stages core.StageStore,
|
|
steps core.StepStore,
|
|
system *core.System,
|
|
users core.UserStore,
|
|
webhook core.WebhookSender,
|
|
) BuildManager {
|
|
return &Manager{
|
|
Builds: builds,
|
|
Config: config,
|
|
Converter: converter,
|
|
Events: events,
|
|
Globals: globals,
|
|
Logs: logs,
|
|
Logz: logz,
|
|
Netrcs: netrcs,
|
|
Repos: repos,
|
|
Scheduler: scheduler,
|
|
Secrets: secrets,
|
|
Status: status,
|
|
Stages: stages,
|
|
Steps: steps,
|
|
System: system,
|
|
Users: users,
|
|
Webhook: webhook,
|
|
}
|
|
}
|
|
|
|
// Manager provides a simplified interface to the build runner so that it
|
|
// can more easily interact with the server.
|
|
type Manager struct {
|
|
Builds core.BuildStore
|
|
Config core.ConfigService
|
|
Converter core.ConvertService
|
|
Events core.Pubsub
|
|
Globals core.GlobalSecretStore
|
|
Logs core.LogStore
|
|
Logz core.LogStream
|
|
Netrcs core.NetrcService
|
|
Repos core.RepositoryStore
|
|
Scheduler core.Scheduler
|
|
Secrets core.SecretStore
|
|
Status core.StatusService
|
|
Stages core.StageStore
|
|
Steps core.StepStore
|
|
System *core.System
|
|
Users core.UserStore
|
|
Webhook core.WebhookSender
|
|
}
|
|
|
|
// Request requests the next available build stage for execution.
|
|
func (m *Manager) Request(ctx context.Context, args *Request) (*core.Stage, error) {
|
|
logger := logrus.WithFields(
|
|
logrus.Fields{
|
|
"kind": args.Kind,
|
|
"type": args.Type,
|
|
"os": args.OS,
|
|
"arch": args.Arch,
|
|
"kernel": args.Kernel,
|
|
"variant": args.Variant,
|
|
},
|
|
)
|
|
logger.Debugln("manager: request queue item")
|
|
|
|
stage, err := m.Scheduler.Request(ctx, core.Filter{
|
|
Kind: args.Kind,
|
|
Type: args.Type,
|
|
OS: args.OS,
|
|
Arch: args.Arch,
|
|
Kernel: args.Kernel,
|
|
Variant: args.Variant,
|
|
Labels: args.Labels,
|
|
})
|
|
if err != nil && ctx.Err() != nil {
|
|
logger.Debugln("manager: context canceled")
|
|
return nil, err
|
|
}
|
|
if err != nil {
|
|
logger = logrus.WithError(err)
|
|
logger.Warnln("manager: request queue item error")
|
|
return nil, err
|
|
}
|
|
return stage, nil
|
|
}
|
|
|
|
// Accept accepts the build stage for execution. It is possible for multiple
|
|
// agents to pull the same stage from the queue. The system uses optimistic
|
|
// locking at the database-level to prevent multiple agents from executing the
|
|
// same stage.
|
|
func (m *Manager) Accept(ctx context.Context, id int64, machine string) (*core.Stage, error) {
|
|
logger := logrus.WithFields(
|
|
logrus.Fields{
|
|
"stage-id": id,
|
|
"machine": machine,
|
|
},
|
|
)
|
|
logger.Debugln("manager: accept stage")
|
|
|
|
stage, err := m.Stages.Find(noContext, id)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find stage")
|
|
return nil, err
|
|
}
|
|
if stage.Machine != "" {
|
|
logger.Debugln("manager: stage already assigned. abort.")
|
|
return nil, db.ErrOptimisticLock
|
|
}
|
|
|
|
stage.Machine = machine
|
|
stage.Status = core.StatusPending
|
|
stage.Updated = time.Now().Unix()
|
|
|
|
err = m.Stages.Update(noContext, stage)
|
|
if err == db.ErrOptimisticLock {
|
|
logger = logger.WithError(err)
|
|
logger.Debugln("manager: stage processed by another agent")
|
|
} else if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Debugln("manager: cannot update stage")
|
|
} else {
|
|
logger.Debugln("manager: stage accepted")
|
|
}
|
|
return stage, err
|
|
}
|
|
|
|
// Details fetches build details.
|
|
func (m *Manager) Details(ctx context.Context, id int64) (*Context, error) {
|
|
logger := logrus.WithField("step-id", id)
|
|
logger.Debugln("manager: fetching stage details")
|
|
|
|
stage, err := m.Stages.Find(noContext, id)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find stage")
|
|
return nil, err
|
|
}
|
|
build, err := m.Builds.Find(noContext, stage.BuildID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find build")
|
|
return nil, err
|
|
}
|
|
stages, err := m.Stages.List(ctx, stage.BuildID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot list stages")
|
|
return nil, err
|
|
}
|
|
build.Stages = stages
|
|
repo, err := m.Repos.Find(noContext, build.RepoID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find repository")
|
|
return nil, err
|
|
}
|
|
logger = logger.WithFields(
|
|
logrus.Fields{
|
|
"build": build.Number,
|
|
"repo": repo.Slug,
|
|
},
|
|
)
|
|
user, err := m.Users.Find(noContext, repo.UserID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find repository owner")
|
|
return nil, err
|
|
}
|
|
config, err := m.Config.Find(noContext, &core.ConfigArgs{
|
|
User: user,
|
|
Repo: repo,
|
|
Build: build,
|
|
})
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find configuration")
|
|
return nil, err
|
|
}
|
|
|
|
// this code is temporarily in place to detect and convert
|
|
// the legacy yaml configuration file to the new format.
|
|
config.Data, _ = converter.ConvertString(config.Data, converter.Metadata{
|
|
Filename: repo.Config,
|
|
URL: repo.Link,
|
|
Ref: build.Ref,
|
|
})
|
|
|
|
config, err = m.Converter.Convert(noContext, &core.ConvertArgs{
|
|
Build: build,
|
|
Config: config,
|
|
Repo: repo,
|
|
User: user,
|
|
})
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot convert configuration")
|
|
return nil, err
|
|
}
|
|
var secrets []*core.Secret
|
|
tmpSecrets, err := m.Secrets.List(noContext, repo.ID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot list secrets")
|
|
return nil, err
|
|
}
|
|
tmpGlobalSecrets, err := m.Globals.List(noContext, repo.Namespace)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot list global secrets")
|
|
return nil, err
|
|
}
|
|
// TODO(bradrydzewski) can we delegate filtering
|
|
// secrets to the agent? If not, we should add
|
|
// unit tests.
|
|
for _, secret := range tmpSecrets {
|
|
if secret.PullRequest == false &&
|
|
build.Event == core.EventPullRequest {
|
|
continue
|
|
}
|
|
secrets = append(secrets, secret)
|
|
}
|
|
for _, secret := range tmpGlobalSecrets {
|
|
if secret.PullRequest == false &&
|
|
build.Event == core.EventPullRequest {
|
|
continue
|
|
}
|
|
secrets = append(secrets, secret)
|
|
}
|
|
return &Context{
|
|
Repo: repo,
|
|
Build: build,
|
|
Stage: stage,
|
|
Secrets: secrets,
|
|
System: m.System,
|
|
Config: &core.File{Data: []byte(config.Data)},
|
|
}, nil
|
|
}
|
|
|
|
// Before signals the build step is about to start.
|
|
func (m *Manager) Before(ctx context.Context, step *core.Step) error {
|
|
logger := logrus.WithFields(
|
|
logrus.Fields{
|
|
"step.status": step.Status,
|
|
"step.name": step.Name,
|
|
"step.id": step.ID,
|
|
},
|
|
)
|
|
logger.Debugln("manager: updating step status")
|
|
|
|
err := m.Logz.Create(noContext, step.ID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot create log stream")
|
|
return err
|
|
}
|
|
updater := &updater{
|
|
Builds: m.Builds,
|
|
Events: m.Events,
|
|
Repos: m.Repos,
|
|
Steps: m.Steps,
|
|
Stages: m.Stages,
|
|
Webhook: m.Webhook,
|
|
}
|
|
return updater.do(ctx, step)
|
|
}
|
|
|
|
// After signals the build step is complete.
|
|
func (m *Manager) After(ctx context.Context, step *core.Step) error {
|
|
logger := logrus.WithFields(
|
|
logrus.Fields{
|
|
"step.status": step.Status,
|
|
"step.name": step.Name,
|
|
"step.id": step.ID,
|
|
},
|
|
)
|
|
logger.Debugln("manager: updating step status")
|
|
|
|
var errs error
|
|
updater := &updater{
|
|
Builds: m.Builds,
|
|
Events: m.Events,
|
|
Repos: m.Repos,
|
|
Steps: m.Steps,
|
|
Stages: m.Stages,
|
|
Webhook: m.Webhook,
|
|
}
|
|
|
|
if err := updater.do(ctx, step); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot update step")
|
|
}
|
|
|
|
if err := m.Logz.Delete(noContext, step.ID); err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot teardown log stream")
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// BeforeAll signals the build stage is about to start.
|
|
func (m *Manager) BeforeAll(ctx context.Context, stage *core.Stage) error {
|
|
s := &setup{
|
|
Builds: m.Builds,
|
|
Events: m.Events,
|
|
Repos: m.Repos,
|
|
Steps: m.Steps,
|
|
Stages: m.Stages,
|
|
Status: m.Status,
|
|
Users: m.Users,
|
|
}
|
|
return s.do(ctx, stage)
|
|
}
|
|
|
|
// AfterAll signals the build stage is complete.
|
|
func (m *Manager) AfterAll(ctx context.Context, stage *core.Stage) error {
|
|
t := &teardown{
|
|
Builds: m.Builds,
|
|
Events: m.Events,
|
|
Logs: m.Logz,
|
|
Repos: m.Repos,
|
|
Scheduler: m.Scheduler,
|
|
Steps: m.Steps,
|
|
Stages: m.Stages,
|
|
Status: m.Status,
|
|
Users: m.Users,
|
|
Webhook: m.Webhook,
|
|
}
|
|
return t.do(ctx, stage)
|
|
}
|
|
|
|
// Netrc returns netrc file with a valid, non-expired token
|
|
// that can be used to clone the repository.
|
|
func (m *Manager) Netrc(ctx context.Context, id int64) (*core.Netrc, error) {
|
|
logger := logrus.WithField("repo.id", id)
|
|
|
|
repo, err := m.Repos.Find(ctx, id)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find repository")
|
|
return nil, err
|
|
}
|
|
|
|
user, err := m.Users.Find(ctx, repo.UserID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("manager: cannot find repository owner")
|
|
return nil, err
|
|
}
|
|
|
|
netrc, err := m.Netrcs.Create(ctx, user, repo)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger = logger.WithField("repo.name", repo.Slug)
|
|
logger.Warnln("manager: cannot generate netrc")
|
|
}
|
|
return netrc, err
|
|
}
|
|
|
|
// Watch watches for build cancellation requests.
|
|
func (m *Manager) Watch(ctx context.Context, id int64) (bool, error) {
|
|
ok, err := m.Scheduler.Cancelled(ctx, id)
|
|
// we expect a context cancel error here which
|
|
// indicates a polling timeout. The subscribing
|
|
// client should look for the context cancel error
|
|
// and resume polling.
|
|
if err != nil {
|
|
return ok, err
|
|
}
|
|
|
|
// // TODO (bradrydzewski) we should be able to return
|
|
// // immediately if Cancelled returns true. This requires
|
|
// // some more testing but would avoid the extra database
|
|
// // call.
|
|
// if ok {
|
|
// return ok, err
|
|
// }
|
|
|
|
// if no error is returned we should check
|
|
// the database to see if the build is complete. If
|
|
// complete, return true.
|
|
build, err := m.Builds.Find(ctx, id)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger = logger.WithField("build-id", id)
|
|
logger.Warnln("manager: cannot find build")
|
|
return ok, err
|
|
}
|
|
return build.IsDone(), nil
|
|
}
|
|
|
|
// Write writes a line to the build logs.
|
|
func (m *Manager) Write(ctx context.Context, step int64, line *core.Line) error {
|
|
err := m.Logz.Write(ctx, step, line)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger = logger.WithField("step-id", step)
|
|
logger.Warnln("manager: cannot write to log stream")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Upload uploads the full logs.
|
|
func (m *Manager) Upload(ctx context.Context, step int64, r io.Reader) error {
|
|
err := m.Logs.Create(ctx, step, r)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger = logger.WithField("step-id", step)
|
|
logger.Warnln("manager: cannot upload complete logs")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// UploadBytes uploads the full logs.
|
|
func (m *Manager) UploadBytes(ctx context.Context, step int64, data []byte) error {
|
|
buf := bytes.NewBuffer(data)
|
|
err := m.Logs.Create(ctx, step, buf)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger = logger.WithField("step-id", step)
|
|
logger.Warnln("manager: cannot upload complete logs")
|
|
}
|
|
return err
|
|
}
|