abstracted build execution to /agent package and hooked up to drone exec

This commit is contained in:
Brad Rydzewski 2016-05-10 15:04:19 -07:00
parent 3d05659134
commit 8f467ff5ca
23 changed files with 940 additions and 499 deletions

282
agent/agent.go Normal file
View file

@ -0,0 +1,282 @@
package agent
import (
"fmt"
"net/url"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/drone/drone/build"
"github.com/drone/drone/engine/runner"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/drone/drone/version"
"github.com/drone/drone/yaml"
"github.com/drone/drone/yaml/expander"
"github.com/drone/drone/yaml/transform"
)
type Logger interface {
Write(*build.Line)
}
type Agent struct {
Update UpdateFunc
Logger LoggerFunc
Engine build.Engine
Timeout time.Duration
Platform string
Namespace string
Disable []string
Escalate []string
Netrc []string
Local string
Pull bool
}
func (a *Agent) Poll() error {
// logrus.Infof("Starting build %s/%s#%d.%d",
// payload.Repo.Owner, payload.Repo.Name, payload.Build.Number, payload.Job.Number)
//
//
// logrus.Infof("Finished build %s/%s#%d.%d",
// payload.Repo.Owner, payload.Repo.Name, payload.Build.Number, payload.Job.Number)
return nil
}
func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error {
payload.Job.Status = model.StatusRunning
payload.Job.Started = time.Now().Unix()
spec, err := a.prep(payload)
if err != nil {
payload.Job.Error = err.Error()
payload.Job.ExitCode = 255
payload.Job.Finished = payload.Job.Started
payload.Job.Status = model.StatusError
a.Update(payload)
return err
}
err = a.exec(spec, payload, cancel)
if err != nil {
payload.Job.ExitCode = 255
}
if exitErr, ok := err.(*runner.ExitError); ok {
payload.Job.ExitCode = exitErr.Code
}
payload.Job.Finished = time.Now().Unix()
switch payload.Job.ExitCode {
case 128, 130, 137:
payload.Job.Status = model.StatusKilled
case 0:
payload.Job.Status = model.StatusSuccess
default:
payload.Job.Status = model.StatusFailure
}
a.Update(payload)
return err
}
func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) {
envs := toEnv(w)
w.Yaml = expander.ExpandString(w.Yaml, envs)
// inject the netrc file into the clone plugin if the repositroy is
// private and requires authentication.
var secrets []*model.Secret
if w.Verified {
secrets = append(secrets, w.Secrets...)
}
if w.Repo.IsPrivate {
secrets = append(secrets, &model.Secret{
Name: "DRONE_NETRC_USERNAME",
Value: w.Netrc.Login,
Images: []string{"*"},
Events: []string{"*"},
})
secrets = append(secrets, &model.Secret{
Name: "DRONE_NETRC_PASSWORD",
Value: w.Netrc.Password,
Images: []string{"*"},
Events: []string{"*"},
})
secrets = append(secrets, &model.Secret{
Name: "DRONE_NETRC_MACHINE",
Value: w.Netrc.Machine,
Images: []string{"*"},
Events: []string{"*"},
})
}
conf, err := yaml.ParseString(w.Yaml)
if err != nil {
return nil, err
}
src := "src"
if url, _ := url.Parse(w.Repo.Link); url != nil {
src = filepath.Join(src, url.Host, url.Path)
}
transform.Clone(conf, w.Repo.Kind)
transform.Environ(conf, envs)
transform.DefaultFilter(conf)
transform.ImageSecrets(conf, secrets, w.Build.Event)
transform.Identifier(conf)
transform.WorkspaceTransform(conf, "/drone", src)
if err := transform.Check(conf, w.Repo.IsTrusted); err != nil {
return nil, err
}
transform.CommandTransform(conf)
transform.ImagePull(conf, a.Pull)
transform.ImageTag(conf)
transform.ImageName(conf)
transform.ImageNamespace(conf, a.Namespace)
transform.ImageEscalate(conf, a.Escalate)
transform.PluginParams(conf)
if a.Local != "" {
transform.PluginDisable(conf, a.Disable)
transform.ImageVolume(conf, []string{a.Local + ":" + conf.Workspace.Path})
}
transform.Pod(conf)
return conf, nil
}
func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) error {
conf := build.Config{
Engine: a.Engine,
Buffer: 500,
}
pipeline := conf.Pipeline(spec)
defer pipeline.Teardown()
// setup the build environment
if err := pipeline.Setup(); err != nil {
return err
}
timeout := time.After(time.Duration(payload.Repo.Timeout) * time.Minute)
for {
select {
case <-pipeline.Done():
return pipeline.Err()
case <-cancel:
pipeline.Stop()
return fmt.Errorf("termination request received, build cancelled")
case <-timeout:
pipeline.Stop()
return fmt.Errorf("maximum time limit exceeded, build cancelled")
case <-time.After(a.Timeout):
pipeline.Stop()
return fmt.Errorf("terminal inactive for %v, build cancelled", a.Timeout)
case <-pipeline.Next():
// TODO(bradrydzewski) this entire block of code should probably get
// encapsulated in the pipeline.
status := model.StatusSuccess
if pipeline.Err() != nil {
status = model.StatusFailure
}
// updates the build status passed into each container. I realize this is
// a bit out of place and will work to resolve.
pipeline.Head().Environment["DRONE_STATUS"] = status
if !pipeline.Head().Constraints.Match(
a.Platform,
payload.Build.Deploy,
payload.Build.Event,
payload.Build.Branch,
status, payload.Job.Environment) { // TODO: fix this whole section
pipeline.Skip()
} else {
pipeline.Exec()
}
case line := <-pipeline.Pipe():
a.Logger(line)
}
}
}
func toEnv(w *queue.Work) map[string]string {
envs := map[string]string{
"CI": "drone",
"DRONE": "true",
"DRONE_ARCH": "linux/amd64",
"DRONE_REPO": w.Repo.FullName,
"DRONE_REPO_SCM": w.Repo.Kind,
"DRONE_REPO_OWNER": w.Repo.Owner,
"DRONE_REPO_NAME": w.Repo.Name,
"DRONE_REPO_LINK": w.Repo.Link,
"DRONE_REPO_AVATAR": w.Repo.Avatar,
"DRONE_REPO_BRANCH": w.Repo.Branch,
"DRONE_REPO_PRIVATE": fmt.Sprintf("%v", w.Repo.IsPrivate),
"DRONE_REPO_TRUSTED": fmt.Sprintf("%v", w.Repo.IsTrusted),
"DRONE_REMOTE_URL": w.Repo.Clone,
"DRONE_COMMIT_SHA": w.Build.Commit,
"DRONE_COMMIT_REF": w.Build.Ref,
"DRONE_COMMIT_BRANCH": w.Build.Branch,
"DRONE_COMMIT_LINK": w.Build.Link,
"DRONE_COMMIT_MESSAGE": w.Build.Message,
"DRONE_COMMIT_AUTHOR": w.Build.Author,
"DRONE_COMMIT_AUTHOR_EMAIL": w.Build.Email,
"DRONE_COMMIT_AUTHOR_AVATAR": w.Build.Avatar,
"DRONE_BUILD_NUMBER": fmt.Sprintf("%d", w.Build.Number),
"DRONE_BUILD_EVENT": w.Build.Event,
"DRONE_BUILD_STATUS": w.Build.Status,
"DRONE_BUILD_LINK": fmt.Sprintf("%s/%s/%d", w.System.Link, w.Repo.FullName, w.Build.Number),
"DRONE_BUILD_CREATED": fmt.Sprintf("%d", w.Build.Created),
"DRONE_BUILD_STARTED": fmt.Sprintf("%d", w.Build.Started),
"DRONE_BUILD_FINISHED": fmt.Sprintf("%d", w.Build.Finished),
"DRONE_YAML_VERIFIED": fmt.Sprintf("%v", w.Verified),
"DRONE_YAML_SIGNED": fmt.Sprintf("%v", w.Signed),
"DRONE_BRANCH": w.Build.Branch,
"DRONE_COMMIT": w.Build.Commit,
"DRONE_VERSION": version.Version,
}
if w.Build.Event == model.EventTag {
envs["DRONE_TAG"] = strings.TrimPrefix(w.Build.Ref, "refs/tags/")
}
if w.Build.Event == model.EventPull {
envs["DRONE_PULL_REQUEST"] = pullRegexp.FindString(w.Build.Ref)
}
if w.Build.Event == model.EventDeploy {
envs["DRONE_DEPLOY_TO"] = w.Build.Deploy
}
if w.BuildLast != nil {
envs["DRONE_PREV_BUILD_STATUS"] = w.BuildLast.Status
envs["DRONE_PREV_BUILD_NUMBER"] = fmt.Sprintf("%v", w.BuildLast.Number)
envs["DRONE_PREV_COMMIT_SHA"] = w.BuildLast.Commit
}
// inject matrix values as environment variables
for key, val := range w.Job.Environment {
envs[key] = val
}
return envs
}
var pullRegexp = regexp.MustCompile("\\d+")

50
agent/updater.go Normal file
View file

@ -0,0 +1,50 @@
package agent
import (
"encoding/json"
"fmt"
"io"
"time"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/build"
"github.com/drone/drone/client"
"github.com/drone/drone/queue"
)
// UpdateFunc handles buid pipeline status updates.
type UpdateFunc func(*queue.Work)
// LoggerFunc handles buid pipeline logging updates.
type LoggerFunc func(*build.Line)
var NoopUpdateFunc = func(*queue.Work) {}
var TermLoggerFunc = func(line *build.Line) {
fmt.Println(line)
}
// NewClientUpdater returns an updater that sends updated build details
// to the drone server.
func NewClientUpdater(client client.Client) UpdateFunc {
return func(w *queue.Work) {
for {
err := client.Push(w)
if err == nil {
return
}
logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
logrus.Infof("Retry update in 30s")
time.Sleep(time.Second * 30)
}
}
}
func NewClientLogger(w io.Writer) LoggerFunc {
return func(line *build.Line) {
linejson, _ := json.Marshal(line)
w.Write(linejson)
w.Write([]byte{'\n'})
}
}

48
build/config.go Normal file
View file

@ -0,0 +1,48 @@
package build
import "github.com/drone/drone/yaml"
// Config defines the configuration for creating the Pipeline.
type Config struct {
Engine Engine
// Buffer defines the size of the buffer for the channel to which the
// console output is streamed.
Buffer uint
}
// Pipeline creates a build Pipeline using the specific configuration for
// the given Yaml specification.
func (c *Config) Pipeline(spec *yaml.Config) *Pipeline {
pipeline := Pipeline{
engine: c.Engine,
pipe: make(chan *Line, c.Buffer),
next: make(chan error),
done: make(chan error),
}
var containers []*yaml.Container
containers = append(containers, spec.Services...)
containers = append(containers, spec.Pipeline...)
for _, c := range containers {
if c.Disabled {
continue
}
next := &element{Container: c}
if pipeline.head == nil {
pipeline.head = next
pipeline.tail = next
} else {
pipeline.tail.next = next
pipeline.tail = next
}
}
go func() {
pipeline.next <- nil
}()
return &pipeline
}

View file

@ -1 +0,0 @@
package build

112
build/docker/docker.go Normal file
View file

@ -0,0 +1,112 @@
package docker
import (
"io"
"github.com/drone/drone/build"
"github.com/drone/drone/build/docker/internal"
"github.com/drone/drone/yaml"
"github.com/samalba/dockerclient"
)
type dockerEngine struct {
client dockerclient.Client
}
func (e *dockerEngine) ContainerStart(container *yaml.Container) (string, error) {
conf := toContainerConfig(container)
auth := toAuthConfig(container)
// pull the image if it does not exists or if the Container
// is configured to always pull a new image.
_, err := e.client.InspectImage(container.Image)
if err != nil || container.Pull {
e.client.PullImage(container.Image, auth)
}
// create and start the container and return the Container ID.
id, err := e.client.CreateContainer(conf, container.ID, auth)
if err != nil {
return id, err
}
err = e.client.StartContainer(id, &conf.HostConfig)
if err != nil {
// remove the container if it cannot be started
e.client.RemoveContainer(id, true, true)
return id, err
}
return id, nil
}
func (e *dockerEngine) ContainerStop(id string) error {
e.client.StopContainer(id, 1)
e.client.KillContainer(id, "9")
return nil
}
func (e *dockerEngine) ContainerRemove(id string) error {
e.client.StopContainer(id, 1)
e.client.KillContainer(id, "9")
e.client.RemoveContainer(id, true, true)
return nil
}
func (e *dockerEngine) ContainerWait(id string) (*build.State, error) {
// wait for the container to exit
//
// TODO(bradrydzewski) we should have a for loop here
// to re-connect and wait if this channel returns a
// result even though the container is still running.
//
<-e.client.Wait(id)
v, err := e.client.InspectContainer(id)
if err != nil {
return nil, err
}
return &build.State{
ExitCode: v.State.ExitCode,
OOMKilled: v.State.OOMKilled,
}, nil
}
func (e *dockerEngine) ContainerLogs(id string) (io.ReadCloser, error) {
opts := &dockerclient.LogOptions{
Follow: true,
Stdout: true,
Stderr: true,
}
piper, pipew := io.Pipe()
go func() {
defer pipew.Close()
// sometimes the docker logs fails due to parsing errors. this
// routine will check for such a failure and attempt to resume
// if necessary.
for i := 0; i < 5; i++ {
if i > 0 {
opts.Tail = 1
}
rc, err := e.client.ContainerLogs(id, opts)
if err != nil {
return
}
defer rc.Close()
// use Docker StdCopy
internal.StdCopy(pipew, pipew, rc)
// check to see if the container is still running. If not,
// we can safely exit and assume there are no more logs left
// to stream.
v, err := e.client.InspectContainer(id)
if err != nil || !v.State.Running {
return
}
}
}()
return piper, nil
}

View file

@ -0,0 +1 @@
package docker

25
build/docker/helper.go Normal file
View file

@ -0,0 +1,25 @@
package docker
import (
"github.com/drone/drone/build"
"github.com/samalba/dockerclient"
)
// NewClient returns a new Docker engine using the provided Docker client.
func NewClient(client dockerclient.Client) build.Engine {
return &dockerEngine{client}
}
// New returns a new Docker engine from the provided DOCKER_HOST and
// DOCKER_CERT_PATH environment variables.
func New(host, cert string, tls bool) (build.Engine, error) {
config, err := dockerclient.TLSConfigFromCertPath(cert)
if err == nil && tls {
config.InsecureSkipVerify = true
}
client, err := dockerclient.NewDockerClient(host, config)
if err != nil {
return nil, err
}
return NewClient(client), nil
}

View file

@ -0,0 +1 @@
package docker

100
build/docker/util.go Normal file
View file

@ -0,0 +1,100 @@
package docker
import (
"fmt"
"strings"
"github.com/drone/drone/yaml"
"github.com/samalba/dockerclient"
)
// helper function that converts the Continer data structure to the exepcted
// dockerclient.ContainerConfig.
func toContainerConfig(c *yaml.Container) *dockerclient.ContainerConfig {
config := &dockerclient.ContainerConfig{
Image: c.Image,
Env: toEnvironmentSlice(c.Environment),
Cmd: c.Command,
Entrypoint: c.Entrypoint,
WorkingDir: c.WorkingDir,
HostConfig: dockerclient.HostConfig{
Privileged: c.Privileged,
NetworkMode: c.Network,
Memory: c.MemLimit,
CpuShares: c.CPUShares,
CpuQuota: c.CPUQuota,
CpusetCpus: c.CPUSet,
MemorySwappiness: -1,
OomKillDisable: c.OomKillDisable,
},
}
if len(config.Entrypoint) == 0 {
config.Entrypoint = nil
}
if len(config.Cmd) == 0 {
config.Cmd = nil
}
if len(c.ExtraHosts) > 0 {
config.HostConfig.ExtraHosts = c.ExtraHosts
}
if len(c.DNS) != 0 {
config.HostConfig.Dns = c.DNS
}
if len(c.DNSSearch) != 0 {
config.HostConfig.DnsSearch = c.DNSSearch
}
if len(c.VolumesFrom) != 0 {
config.HostConfig.VolumesFrom = c.VolumesFrom
}
config.Volumes = map[string]struct{}{}
for _, path := range c.Volumes {
if strings.Index(path, ":") == -1 {
config.Volumes[path] = struct{}{}
continue
}
parts := strings.Split(path, ":")
config.Volumes[parts[1]] = struct{}{}
config.HostConfig.Binds = append(config.HostConfig.Binds, path)
}
for _, path := range c.Devices {
if strings.Index(path, ":") == -1 {
continue
}
parts := strings.Split(path, ":")
device := dockerclient.DeviceMapping{
PathOnHost: parts[0],
PathInContainer: parts[1],
CgroupPermissions: "rwm",
}
config.HostConfig.Devices = append(config.HostConfig.Devices, device)
}
return config
}
// helper function that converts the AuthConfig data structure to the exepcted
// dockerclient.AuthConfig.
func toAuthConfig(container *yaml.Container) *dockerclient.AuthConfig {
if container.AuthConfig.Username == "" &&
container.AuthConfig.Password == "" {
return nil
}
return &dockerclient.AuthConfig{
Email: container.AuthConfig.Email,
Username: container.AuthConfig.Username,
Password: container.AuthConfig.Password,
}
}
// helper function that converts a key value map of environment variables to a
// string slice in key=value format.
func toEnvironmentSlice(env map[string]string) []string {
var envs []string
for k, v := range env {
envs = append(envs, fmt.Sprintf("%s=%s", k, v))
}
return envs
}

24
build/docker/util_test.go Normal file
View file

@ -0,0 +1,24 @@
package docker
import (
"testing"
)
func Test_toContainerConfig(t *testing.T) {
t.Skip()
}
func Test_toAuthConfig(t *testing.T) {
t.Skip()
}
func Test_toEnvironmentSlice(t *testing.T) {
env := map[string]string{
"HOME": "/root",
}
envs := toEnvironmentSlice(env)
want, got := "HOME=/root", envs[0]
if want != got {
t.Errorf("Wanted envar %s got %s", want, got)
}
}

16
build/engine.go Normal file
View file

@ -0,0 +1,16 @@
package build
import (
"io"
"github.com/drone/drone/yaml"
)
// Engine defines the container runtime engine.
type Engine interface {
ContainerStart(*yaml.Container) (string, error)
ContainerStop(string) error
ContainerRemove(string) error
ContainerWait(string) (*State, error)
ContainerLogs(string) (io.ReadCloser, error)
}

View file

@ -1,49 +0,0 @@
package build
import "fmt"
// Pipe returns a buffered pipe that is connected to the console output.
type Pipe struct {
lines chan *Line
eof chan bool
}
// Next returns the next Line of console output.
func (p *Pipe) Next() *Line {
select {
case line := <-p.lines:
return line
case <-p.eof:
return nil
}
}
// Close closes the pipe of console output.
func (p *Pipe) Close() {
go func() {
p.eof <- true
}()
}
func newPipe(buffer int) *Pipe {
return &Pipe{
lines: make(chan *Line, buffer),
eof: make(chan bool),
}
}
// Line is a line of console output.
type Line struct {
Proc string `json:"proc,omitempty"`
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Pos int `json:"pos,omityempty"`
Out string `json:"out,omitempty"`
}
func (l *Line) String() string {
return fmt.Sprintf("[%s:L%v:%vs] %s", l.Proc, l.Pos, l.Time, l.Out)
}
// TODO(bradrydzewski) consider an alternate buffer impelmentation based on the
// x.crypto ssh buffer https://github.com/golang/crypto/blob/master/ssh/buffer.go

View file

@ -1,54 +0,0 @@
package build
import (
"sync"
"testing"
"github.com/franela/goblin"
)
func TestPipe(t *testing.T) {
g := goblin.Goblin(t)
g.Describe("Pipe", func() {
g.It("should get next line from buffer", func() {
line := &Line{
Proc: "redis",
Pos: 1,
Out: "starting redis server",
}
pipe := newPipe(10)
pipe.lines <- line
next := pipe.Next()
g.Assert(next).Equal(line)
})
g.It("should get null line on buffer closed", func() {
pipe := newPipe(10)
var wg sync.WaitGroup
wg.Add(1)
go func() {
next := pipe.Next()
g.Assert(next == nil).IsTrue("line should be nil")
wg.Done()
}()
pipe.Close()
wg.Wait()
})
g.Describe("Line output", func() {
g.It("should prefix string() with metadata", func() {
line := Line{
Proc: "redis",
Time: 60,
Pos: 1,
Out: "starting redis server",
}
g.Assert(line.String()).Equal("[redis:L1:60s] starting redis server")
})
})
})
}

View file

@ -2,15 +2,9 @@ package build
import (
"bufio"
"fmt"
"io"
"strings"
"time"
"github.com/drone/drone/build/internal"
"github.com/drone/drone/yaml"
"github.com/samalba/dockerclient"
)
// element represents a link in the linked list.
@ -29,46 +23,11 @@ type Pipeline struct {
done chan (error)
err error
ambassador string
containers []string
volumes []string
networks []string
client dockerclient.Client
}
// Load loads the pipeline from the Yaml configuration file.
func Load(conf *yaml.Config, client dockerclient.Client) *Pipeline {
pipeline := Pipeline{
client: client,
pipe: make(chan *Line, 500), // buffer 500 lines of logs
next: make(chan error),
done: make(chan error),
}
var containers []*yaml.Container
containers = append(containers, conf.Services...)
containers = append(containers, conf.Pipeline...)
for _, c := range containers {
if c.Disabled {
continue
}
next := &element{Container: c}
if pipeline.head == nil {
pipeline.head = next
pipeline.tail = next
} else {
pipeline.tail.next = next
pipeline.tail = next
}
}
go func() {
pipeline.next <- nil
}()
return &pipeline
engine Engine
}
// Done returns when the process is done executing.
@ -132,19 +91,15 @@ func (p *Pipeline) Setup() error {
// Teardown removes the pipeline environment.
func (p *Pipeline) Teardown() {
for _, id := range p.containers {
p.client.StopContainer(id, 1)
p.client.KillContainer(id, "9")
p.client.RemoveContainer(id, true, true)
}
for _, id := range p.networks {
p.client.RemoveNetwork(id)
}
for _, id := range p.volumes {
p.client.RemoveVolume(id)
p.engine.ContainerRemove(id)
}
close(p.next)
close(p.done)
close(p.pipe)
// TODO we have a race condition here where the program can try to async
// write to a closed pipe channel. This package, in general, needs to be
// tested for race conditions.
// close(p.pipe)
}
// step steps through the pipeline to head.next
@ -169,34 +124,14 @@ func (p *Pipeline) close(err error) {
}
func (p *Pipeline) exec(c *yaml.Container) error {
conf := toContainerConfig(c)
auth := toAuthConfig(c)
// check for the image and pull if not exists or if configured to always
// pull the latest version.
_, err := p.client.InspectImage(c.Image)
if err != nil || c.Pull {
err = p.client.PullImage(c.Image, auth)
if err != nil {
return err
}
}
// creates and starts the container.
id, err := p.client.CreateContainer(conf, c.ID, auth)
name, err := p.engine.ContainerStart(c)
if err != nil {
return err
}
p.containers = append(p.containers, id)
p.containers = append(p.containers, name)
err = p.client.StartContainer(c.ID, &conf.HostConfig)
if err != nil {
return err
}
// stream the container logs
go func() {
rc, rerr := toLogs(p.client, c.ID)
rc, rerr := p.engine.ContainerLogs(name)
if rerr != nil {
return
}
@ -216,152 +151,19 @@ func (p *Pipeline) exec(c *yaml.Container) error {
}
}()
// if the container is run in detached mode we can exit without waiting
// for execution to complete.
// exit when running container in detached mode in background
if c.Detached {
return nil
}
<-p.client.Wait(c.ID)
res, err := p.client.InspectContainer(c.ID)
state, err := p.engine.ContainerWait(name)
if err != nil {
return err
}
if res.State.OOMKilled {
if state.OOMKilled {
return &OomError{c.Name}
} else if res.State.ExitCode != 0 {
return &ExitError{c.Name, res.State.ExitCode}
} else if state.ExitCode != 0 {
return &ExitError{c.Name, state.ExitCode}
}
return nil
}
func toLogs(client dockerclient.Client, id string) (io.ReadCloser, error) {
opts := &dockerclient.LogOptions{
Follow: true,
Stdout: true,
Stderr: true,
}
piper, pipew := io.Pipe()
go func() {
defer pipew.Close()
// sometimes the docker logs fails due to parsing errors. this routine will
// check for such a failure and attempt to resume if necessary.
for i := 0; i < 5; i++ {
if i > 0 {
opts.Tail = 1
}
rc, err := client.ContainerLogs(id, opts)
if err != nil {
return
}
defer rc.Close()
// use Docker StdCopy
internal.StdCopy(pipew, pipew, rc)
// check to see if the container is still running. If not, we can safely
// exit and assume there are no more logs left to stream.
v, err := client.InspectContainer(id)
if err != nil || !v.State.Running {
return
}
}
}()
return piper, nil
}
// helper function that converts the Continer data structure to the exepcted
// dockerclient.ContainerConfig.
func toContainerConfig(c *yaml.Container) *dockerclient.ContainerConfig {
config := &dockerclient.ContainerConfig{
Image: c.Image,
Env: toEnvironmentSlice(c.Environment),
Cmd: c.Command,
Entrypoint: c.Entrypoint,
WorkingDir: c.WorkingDir,
HostConfig: dockerclient.HostConfig{
Privileged: c.Privileged,
NetworkMode: c.Network,
Memory: c.MemLimit,
CpuShares: c.CPUShares,
CpuQuota: c.CPUQuota,
CpusetCpus: c.CPUSet,
MemorySwappiness: -1,
OomKillDisable: c.OomKillDisable,
},
}
if len(config.Entrypoint) == 0 {
config.Entrypoint = nil
}
if len(config.Cmd) == 0 {
config.Cmd = nil
}
if len(c.ExtraHosts) > 0 {
config.HostConfig.ExtraHosts = c.ExtraHosts
}
if len(c.DNS) != 0 {
config.HostConfig.Dns = c.DNS
}
if len(c.DNSSearch) != 0 {
config.HostConfig.DnsSearch = c.DNSSearch
}
if len(c.VolumesFrom) != 0 {
config.HostConfig.VolumesFrom = c.VolumesFrom
}
config.Volumes = map[string]struct{}{}
for _, path := range c.Volumes {
if strings.Index(path, ":") == -1 {
config.Volumes[path] = struct{}{}
continue
}
parts := strings.Split(path, ":")
config.Volumes[parts[1]] = struct{}{}
config.HostConfig.Binds = append(config.HostConfig.Binds, path)
}
for _, path := range c.Devices {
if strings.Index(path, ":") == -1 {
continue
}
parts := strings.Split(path, ":")
device := dockerclient.DeviceMapping{
PathOnHost: parts[0],
PathInContainer: parts[1],
CgroupPermissions: "rwm",
}
config.HostConfig.Devices = append(config.HostConfig.Devices, device)
}
return config
}
// helper function that converts the AuthConfig data structure to the exepcted
// dockerclient.AuthConfig.
func toAuthConfig(c *yaml.Container) *dockerclient.AuthConfig {
if c.AuthConfig.Username == "" &&
c.AuthConfig.Password == "" {
return nil
}
return &dockerclient.AuthConfig{
Email: c.AuthConfig.Email,
Username: c.AuthConfig.Username,
Password: c.AuthConfig.Password,
}
}
// helper function that converts a key value map of environment variables to a
// string slice in key=value format.
func toEnvironmentSlice(env map[string]string) []string {
var envs []string
for k, v := range env {
envs = append(envs, fmt.Sprintf("%s=%s", k, v))
}
return envs
}

View file

@ -1,38 +1,5 @@
package build
import (
"fmt"
"testing"
"github.com/drone/drone/yaml"
)
func TestInterpreter(t *testing.T) {
conf, err := yaml.ParseString(sampleYaml)
if err != nil {
t.Fatal(err)
}
pipeline := Load(conf, nil)
pipeline.pipe <- &Line{Out: "foo"}
pipeline.pipe <- &Line{Out: "bar"}
pipeline.pipe <- &Line{Out: "baz"}
for {
select {
case <-pipeline.Done():
fmt.Println("GOT DONE")
return
case line := <-pipeline.Pipe():
fmt.Println(line.String())
case <-pipeline.Next():
pipeline.Exec()
}
}
}
var sampleYaml = `
image: hello-world
build:

22
build/types.go Normal file
View file

@ -0,0 +1,22 @@
package build
import "fmt"
// Line is a line of console output.
type Line struct {
Proc string `json:"proc,omitempty"`
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Pos int `json:"pos,omityempty"`
Out string `json:"out,omitempty"`
}
func (l *Line) String() string {
return fmt.Sprintf("[%s:L%v:%vs] %s", l.Proc, l.Pos, l.Time, l.Out)
}
// State defines the state of the container.
type State struct {
ExitCode int // container exit code
OOMKilled bool // container exited due to oom error
}

23
build/types_test.go Normal file
View file

@ -0,0 +1,23 @@
package build
import (
"testing"
"github.com/franela/goblin"
)
func TestLine(t *testing.T) {
g := goblin.Goblin(t)
g.Describe("Line output", func() {
g.It("should prefix string() with metadata", func() {
line := Line{
Proc: "redis",
Time: 60,
Pos: 1,
Out: "starting redis server",
}
g.Assert(line.String()).Equal("[redis:L1:60s] starting redis server")
})
})
}

View file

@ -1,24 +1,20 @@
package main
import (
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"os/signal"
"path/filepath"
"strings"
"time"
"github.com/drone/drone/build"
"github.com/drone/drone/agent"
"github.com/drone/drone/build/docker"
"github.com/drone/drone/model"
"github.com/drone/drone/yaml"
"github.com/drone/drone/yaml/expander"
"github.com/drone/drone/yaml/transform"
"github.com/drone/drone/queue"
"github.com/codegangsta/cli"
"github.com/samalba/dockerclient"
)
var execCmd = cli.Command{
@ -52,15 +48,15 @@ var execCmd = cli.Command{
},
cli.DurationFlag{
Name: "timeout",
Usage: "build timeout for inactivity",
Usage: "build timeout",
Value: time.Hour,
EnvVar: "DRONE_TIMEOUT",
},
cli.DurationFlag{
Name: "duration",
Usage: "build duration",
Value: time.Hour,
EnvVar: "DRONE_DURATION",
Name: "timeout.inactivity",
Usage: "build timeout for inactivity",
Value: time.Minute * 15,
EnvVar: "DRONE_TIMEOUT_INACTIVITY",
},
cli.BoolFlag{
EnvVar: "DRONE_PLUGIN_PULL",
@ -248,12 +244,12 @@ var execCmd = cli.Command{
Usage: "build deployment target",
EnvVar: "DRONE_DEPLOY_TO",
},
cli.BoolFlag{
cli.BoolTFlag{
Name: "yaml.verified",
Usage: "build yaml is verified",
EnvVar: "DRONE_YAML_VERIFIED",
},
cli.BoolFlag{
cli.BoolTFlag{
Name: "yaml.signed",
Usage: "build yaml is signed",
EnvVar: "DRONE_YAML_SIGNED",
@ -293,53 +289,13 @@ var execCmd = cli.Command{
}
func exec(c *cli.Context) error {
// get environment variables from flags
var envs = map[string]string{}
for _, flag := range c.Command.Flags {
switch f := flag.(type) {
case cli.StringFlag:
envs[f.EnvVar] = c.String(f.Name)
case cli.IntFlag:
envs[f.EnvVar] = c.String(f.Name)
case cli.BoolFlag:
envs[f.EnvVar] = c.String(f.Name)
}
}
// get matrix variales from flags
for _, s := range c.StringSlice("matrix") {
parts := strings.SplitN(s, "=", 2)
if len(parts) != 2 {
continue
}
k := parts[0]
v := parts[1]
envs[k] = v
}
// get secret variales from flags
for _, s := range c.StringSlice("secret") {
parts := strings.SplitN(s, "=", 2)
if len(parts) != 2 {
continue
}
k := parts[0]
v := parts[1]
envs[k] = v
}
// builtin.NewFilterOp(
// c.String("prev.build.status"),
// c.String("commit.branch"),
// c.String("build.event"),
// c.String("build.deploy"),
// envs,
// ),
// }
sigterm := make(chan os.Signal, 1)
cancelc := make(chan bool, 1)
signal.Notify(sigterm, os.Interrupt)
go func() {
<-sigterm
cancelc <- true
}()
path := c.Args().First()
if path == "" {
@ -353,101 +309,116 @@ func exec(c *cli.Context) error {
return err
}
// unmarshal the Yaml file with expanded environment variables.
conf, err := yaml.Parse(expander.Expand(file, envs))
engine, err := docker.New(
c.String("docker-host"),
c.String("docker-cert-path"),
c.Bool("docker-tls-verify"),
)
if err != nil {
return err
}
tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
if err == nil {
tls.InsecureSkipVerify = c.Bool("docker-tls-verify")
}
client, err := dockerclient.NewDockerClient(c.String("docker-host"), tls)
if err != nil {
return err
a := agent.Agent{
Update: agent.NoopUpdateFunc,
Logger: agent.TermLoggerFunc,
Engine: engine,
Timeout: c.Duration("timeout.inactivity"),
Platform: "linux/amd64",
Namespace: c.String("namespace"),
Disable: c.StringSlice("plugin"),
Escalate: c.StringSlice("privileged"),
Netrc: []string{},
Local: dir,
Pull: c.Bool("pull"),
}
src := "src"
if url, _ := url.Parse(c.String("repo.link")); url != nil {
src = filepath.Join(src, url.Host, url.Path)
payload := queue.Work{
Yaml: string(file),
Verified: c.BoolT("yaml.verified"),
Signed: c.BoolT("yaml.signed"),
Repo: &model.Repo{
FullName: c.String("repo.fullname"),
Owner: c.String("repo.owner"),
Name: c.String("repo.name"),
Kind: c.String("repo.type"),
Link: c.String("repo.link"),
Branch: c.String("repo.branch"),
Avatar: c.String("repo.avatar"),
Timeout: int64(c.Duration("timeout").Minutes()),
IsPrivate: c.Bool("repo.private"),
IsTrusted: c.Bool("repo.trusted"),
Clone: c.String("remote.url"),
},
System: &model.System{
Link: c.GlobalString("server"),
},
Secrets: getSecrets(c),
Netrc: &model.Netrc{
Login: c.String("netrc.username"),
Password: c.String("netrc.password"),
Machine: c.String("netrc.machine"),
},
Build: &model.Build{
Commit: c.String("commit.sha"),
Branch: c.String("commit.branch"),
Ref: c.String("commit.ref"),
Link: c.String("commit.link"),
Message: c.String("commit.message"),
Author: c.String("commit.author.name"),
Email: c.String("commit.author.email"),
Avatar: c.String("commit.author.avatar"),
Number: c.Int("build.number"),
Event: c.String("build.event"),
Deploy: c.String("build.deploy"),
},
BuildLast: &model.Build{
Number: c.Int("prev.build.number"),
Status: c.String("prev.build.status"),
Commit: c.String("prev.commit.sha"),
},
Job: &model.Job{
Environment: getMatrix(c),
},
}
transform.Clone(conf, "git")
transform.Environ(conf, envs)
transform.DefaultFilter(conf)
transform.PluginDisable(conf, c.StringSlice("plugin"))
// transform.Secret(conf, secrets)
transform.Identifier(conf)
transform.WorkspaceTransform(conf, "/drone", src)
if err := transform.Check(conf, c.Bool("repo.trusted")); err != nil {
return err
}
transform.CommandTransform(conf)
transform.ImagePull(conf, c.Bool("pull"))
transform.ImageTag(conf)
transform.ImageName(conf)
transform.ImageNamespace(conf, c.String("namespace"))
transform.ImageEscalate(conf, c.StringSlice("privileged"))
if c.BoolT("local") {
transform.ImageVolume(conf, []string{dir + ":" + conf.Workspace.Path})
}
transform.PluginParams(conf)
transform.Pod(conf)
timeout := time.After(c.Duration("duration"))
// load the Yaml into the pipeline
pipeline := build.Load(conf, client)
defer pipeline.Teardown()
// setup the build environment
err = pipeline.Setup()
if err != nil {
return err
}
for {
select {
case <-pipeline.Done():
return pipeline.Err()
case <-sigterm:
pipeline.Stop()
return fmt.Errorf("interrupt received, build cancelled")
case <-timeout:
pipeline.Stop()
return fmt.Errorf("maximum time limit exceeded, build cancelled")
case <-time.After(c.Duration("timeout")):
pipeline.Stop()
return fmt.Errorf("terminal inactive for %v, build cancelled", c.Duration("timeout"))
case <-pipeline.Next():
// TODO(bradrydzewski) this entire block of code should probably get
// encapsulated in the pipeline.
status := model.StatusSuccess
if pipeline.Err() != nil {
status = model.StatusFailure
}
if !pipeline.Head().Constraints.Match(
"linux/amd64",
c.String("build.deploy"),
c.String("build.event"),
c.String("commit.branch"),
status, envs) {
pipeline.Skip()
} else {
pipeline.Exec()
pipeline.Head().Environment["DRONE_STATUS"] = status
}
case line := <-pipeline.Pipe():
println(line.String())
}
}
return a.Run(&payload, cancelc)
}
// helper function to retrieve matrix variables.
func getMatrix(c *cli.Context) map[string]string {
envs := map[string]string{}
for _, s := range c.StringSlice("matrix") {
parts := strings.SplitN(s, "=", 2)
if len(parts) != 2 {
continue
}
k := parts[0]
v := parts[1]
envs[k] = v
}
return envs
}
// helper function to retrieve secret variables.
func getSecrets(c *cli.Context) []*model.Secret {
var secrets []*model.Secret
for _, s := range c.StringSlice("secret") {
parts := strings.SplitN(s, "=", 2)
if len(parts) != 2 {
continue
}
secret := &model.Secret{
Name: parts[0],
Value: parts[1],
Events: []string{
model.EventPull,
model.EventPush,
model.EventTag,
model.EventDeploy,
},
Images: []string{"*"},
}
secrets = append(secrets, secret)
}
return secrets
}

View file

@ -48,3 +48,104 @@ func Test_pull(t *testing.T) {
})
})
}
func Test_escalate(t *testing.T) {
g := goblin.Goblin(t)
g.Describe("privileged transform", func() {
g.It("should handle matches", func() {
c := newConfig(&yaml.Container{
Image: "plugins/docker",
})
ImageEscalate(c, []string{"plugins/docker"})
g.Assert(c.Pipeline[0].Privileged).IsTrue()
})
g.It("should handle glob matches", func() {
c := newConfig(&yaml.Container{
Image: "plugins/docker:latest",
})
ImageEscalate(c, []string{"plugins/docker:*"})
g.Assert(c.Pipeline[0].Privileged).IsTrue()
})
g.It("should handle non matches", func() {
c := newConfig(&yaml.Container{
Image: "plugins/git:latest",
})
ImageEscalate(c, []string{"plugins/docker:*"})
g.Assert(c.Pipeline[0].Privileged).IsFalse()
})
g.It("should handle non glob matches", func() {
c := newConfig(&yaml.Container{
Image: "plugins/docker:latest",
})
ImageEscalate(c, []string{"plugins/docker"})
g.Assert(c.Pipeline[0].Privileged).IsFalse()
})
})
}
func Test_normalize(t *testing.T) {
g := goblin.Goblin(t)
g.Describe("normalizing", func() {
g.Describe("images", func() {
g.It("should append tag if empty", func() {
c := newConfig(&yaml.Container{
Image: "golang",
})
ImageTag(c)
g.Assert(c.Pipeline[0].Image).Equal("golang:latest")
})
g.It("should not override existing tag", func() {
c := newConfig(&yaml.Container{
Image: "golang:1.5",
})
ImageTag(c)
g.Assert(c.Pipeline[0].Image).Equal("golang:1.5")
})
})
g.Describe("plugins", func() {
g.It("should prepend namespace", func() {
c := newConfig(&yaml.Container{
Image: "slack",
})
ImageNamespace(c, "plugins")
g.Assert(c.Pipeline[0].Image).Equal("plugins/slack")
})
g.It("should not override existing namespace", func() {
c := newConfig(&yaml.Container{
Image: "index.docker.io/drone/git",
})
ImageNamespace(c, "plugins")
g.Assert(c.Pipeline[0].Image).Equal("index.docker.io/drone/git")
})
g.It("should replace underscores with dashes", func() {
c := newConfig(&yaml.Container{
Image: "gh_pages",
})
ImageName(c)
g.Assert(c.Pipeline[0].Image).Equal("gh-pages")
})
})
})
}

View file

@ -29,7 +29,7 @@ func PluginDisable(conf *yaml.Config, patterns []string) error {
}
// PluginParams is a transform function that alters the Yaml configuration to
// include plugin parameters as environment variables.
// include plugin vargs parameters as environment variables.
func PluginParams(conf *yaml.Config) error {
for _, container := range conf.Pipeline {
if len(container.Vargs) == 0 {