From 71de0d94085eb92fa80d9df0e7dc3afdf7382f7d Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Wed, 13 Jul 2016 17:33:28 -0700 Subject: [PATCH] hacky fixed for race conditions on cancel --- build/pipeline.go | 53 ++++++++++++++++++++++++++++++++++++++++++----- server/build.go | 1 + 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/build/pipeline.go b/build/pipeline.go index 5f7e366a..ddeca757 100644 --- a/build/pipeline.go +++ b/build/pipeline.go @@ -3,6 +3,7 @@ package build import ( "bufio" "strconv" + "sync" "time" "github.com/Sirupsen/logrus" @@ -20,6 +21,7 @@ type Pipeline struct { conf *yaml.Config head *element tail *element + wait sync.WaitGroup pipe chan (*Line) next chan (error) done chan (error) @@ -87,6 +89,11 @@ func (p *Pipeline) Tail() *yaml.Container { // Stop stops the pipeline. func (p *Pipeline) Stop() { go func() { + defer func() { + if r := recover(); r != nil { + logrus.Errorln("recover stopping the pipeline", r) + } + }() p.done <- ErrTerm }() } @@ -98,9 +105,11 @@ func (p *Pipeline) Setup() error { // Teardown removes the pipeline environment. func (p *Pipeline) Teardown() { + for _, id := range p.containers { p.engine.ContainerRemove(id) } + close(p.next) close(p.done) @@ -114,10 +123,32 @@ func (p *Pipeline) Teardown() { func (p *Pipeline) step() { if p.head == p.tail { go func() { + defer func() { + if r := recover(); r != nil { + logrus.Errorln("recover executing step function", r) + } + }() + + // stop all containers + for _, id := range p.containers { + p.engine.ContainerStop(id) + } + + // wait for all logs to terminate + // p.wait.Done() // this is for the ambassador + p.wait.Wait() + + // signal completion p.done <- nil }() } else { go func() { + defer func() { + if r := recover(); r != nil { + logrus.Errorln("recover executing step to head function", r) + } + }() + p.head = p.head.next p.next <- nil }() @@ -137,17 +168,23 @@ func (p *Pipeline) close(err error) { } func (p *Pipeline) exec(c *yaml.Container) error { + name, err := p.engine.ContainerStart(c) if err != nil { return err } p.containers = append(p.containers, name) + logrus.Debugf("wait.add(1) for %s logs", name) + p.wait.Add(1) go func() { defer func() { if r := recover(); r != nil { logrus.Errorln("recover writing build output", r) } + + logrus.Debugf("wait.done() for %s logs", name) + p.wait.Done() }() rc, rerr := p.engine.ContainerLogs(name) @@ -179,17 +216,16 @@ func (p *Pipeline) exec(c *yaml.Container) error { if err != nil { return err } - if state.OOMKilled { - return &OomError{c.Name} - } else if state.ExitCode != 0 { - return &ExitError{c.Name, state.ExitCode} - } + logrus.Debugf("wait.add(1) for %s exit code", name) + p.wait.Add(1) go func() { defer func() { if r := recover(); r != nil { logrus.Errorln("recover writing exit code to output", r) } + p.wait.Done() + logrus.Debugf("wait.done() for %s exit code", name) }() p.pipe <- &Line{ @@ -198,5 +234,12 @@ func (p *Pipeline) exec(c *yaml.Container) error { Out: strconv.Itoa(state.ExitCode), } }() + + if state.OOMKilled { + return &OomError{c.Name} + } else if state.ExitCode != 0 { + return &ExitError{c.Name, state.ExitCode} + } + return nil } diff --git a/server/build.go b/server/build.go index 87beacf4..28571782 100644 --- a/server/build.go +++ b/server/build.go @@ -255,6 +255,7 @@ func PostBuild(c *gin.Context) { build.Finished = 0 build.Enqueued = time.Now().UTC().Unix() for _, job := range jobs { + job.Error = "" job.Status = model.StatusPending job.Started = 0 job.Finished = 0