diff --git a/agent/updater.go b/agent/updater.go index 15922e0e..ba5c1aa4 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -1,16 +1,13 @@ package agent import ( - "encoding/json" "fmt" - "io" - "sync" - "time" "github.com/Sirupsen/logrus" "github.com/drone/drone/build" - "github.com/drone/drone/client" + "github.com/drone/drone/model" "github.com/drone/drone/queue" + "github.com/drone/mq/stomp" ) // UpdateFunc handles buid pipeline status updates. @@ -27,65 +24,44 @@ var TermLoggerFunc = func(line *build.Line) { // NewClientUpdater returns an updater that sends updated build details // to the drone server. -func NewClientUpdater(client client.Client) UpdateFunc { +func NewClientUpdater(client *stomp.Client) UpdateFunc { return func(w *queue.Work) { - for { - err := client.Push(w) - if err == nil { - return - } - logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s", + err := client.SendJSON("/queue/updates", w) + if err != nil { + logrus.Errorf("Error updating %s/%s#%d.%d. %s", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) - logrus.Infof("Retry update in 30s") - time.Sleep(time.Second * 30) + } + if w.Job.Status != model.StatusRunning { + var dest = fmt.Sprintf("/topic/logs.%d", w.Job.ID) + var opts = []stomp.MessageOption{ + stomp.WithHeader("eof", "true"), + stomp.WithRetain("all"), + } + + if err := client.Send(dest, []byte("eof"), opts...); err != nil { + logrus.Errorf("Error sending eof %s/%s#%d.%d. %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) + } } } } -func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc { - var err error - var size int64 - return func(line *build.Line) { +func NewClientLogger(client *stomp.Client, id int64, limit int64) LoggerFunc { + var size int64 + var dest = fmt.Sprintf("/topic/logs.%d", id) + var opts = []stomp.MessageOption{ + stomp.WithRetain("all"), + } + + return func(line *build.Line) { if size > limit { return } - - // TODO remove this double-serialization - linejson, _ := json.Marshal(line) - w.Write(linejson) - w.Write([]byte{'\n'}) - - if err = stream.WriteJSON(line); err != nil { + if err := client.SendJSON(dest, line, opts...); err != nil { logrus.Errorf("Error streaming build logs. %s", err) } size += int64(len(line.Out)) } } - -func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc { - var once sync.Once - var size int64 - return func(line *build.Line) { - // annoying hack to only start streaming once the first line is written - once.Do(func() { - go func() { - err := client.Stream(id, rc) - if err != nil && err != io.ErrClosedPipe { - logrus.Errorf("Error streaming build logs. %s", err) - } - }() - }) - - if size > limit { - return - } - - linejson, _ := json.Marshal(line) - wc.Write(linejson) - wc.Write([]byte{'\n'}) - - size += int64(len(line.Out)) - } -} diff --git a/client/client.go b/client/client.go index 8a423aad..2d849cb7 100644 --- a/client/client.go +++ b/client/client.go @@ -4,7 +4,6 @@ import ( "io" "github.com/drone/drone/model" - "github.com/drone/drone/queue" ) // Client is used to communicate with a Drone server. @@ -103,27 +102,4 @@ type Client interface { // AgentList returns a list of build agents. AgentList() ([]*model.Agent, error) - - // - // below items for Queue (internal use only) - // - - // Pull pulls work from the server queue. - Pull(os, arch string) (*queue.Work, error) - - // Push pushes an update to the server. - Push(*queue.Work) error - - // Stream streams the build logs to the server. - Stream(int64, io.ReadCloser) error - - LogStream(int64) (StreamWriter, error) - - LogPost(int64, io.ReadCloser) error - - // Wait waits for the job to the complete. - Wait(int64) *Wait - - // Ping the server - Ping() error } diff --git a/client/client_impl.go b/client/client_impl.go index ca661a88..d877c466 100644 --- a/client/client_impl.go +++ b/client/client_impl.go @@ -12,10 +12,6 @@ import ( "strconv" "github.com/drone/drone/model" - "github.com/drone/drone/queue" - "github.com/gorilla/websocket" - "golang.org/x/net/context" - "golang.org/x/net/context/ctxhttp" "golang.org/x/oauth2" ) @@ -323,110 +319,6 @@ func (c *client) AgentList() ([]*model.Agent, error) { return out, err } -// -// below items for Queue (internal use only) -// - -// Pull pulls work from the server queue. -func (c *client) Pull(os, arch string) (*queue.Work, error) { - out := new(queue.Work) - uri := fmt.Sprintf(pathPull, c.base, os, arch) - err := c.post(uri, nil, out) - return out, err -} - -// Push pushes an update to the server. -func (c *client) Push(p *queue.Work) error { - uri := fmt.Sprintf(pathPush, c.base, p.Job.ID) - err := c.post(uri, p, nil) - return err -} - -// Ping pings the server. -func (c *client) Ping() error { - uri := fmt.Sprintf(pathPing, c.base) - err := c.post(uri, nil, nil) - return err -} - -// Stream streams the build logs to the server. -func (c *client) Stream(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathStream, c.base, id) - err := c.post(uri, rc, nil) - - return err -} - -// LogPost sends the full build logs to the server. -func (c *client) LogPost(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathLogs, c.base, id) - return c.post(uri, rc, nil) -} - -// StreamWriter implements a special writer for streaming log entries to the -// central Drone server. The standard implementation is the gorilla.Connection. -type StreamWriter interface { - Close() error - WriteJSON(interface{}) error -} - -// LogStream streams the build logs to the server. -func (c *client) LogStream(id int64) (StreamWriter, error) { - rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token) - uri, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - if uri.Scheme == "https" { - uri.Scheme = "wss" - } else { - uri.Scheme = "ws" - } - - // TODO need TLS client configuration - - conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil) - return conn, err -} - -// Wait watches and waits for the build to cancel or finish. -func (c *client) Wait(id int64) *Wait { - ctx, cancel := context.WithCancel(context.Background()) - return &Wait{id, c, ctx, cancel} -} - -type Wait struct { - id int64 - client *client - - ctx context.Context - cancel context.CancelFunc -} - -func (w *Wait) Done() (*model.Job, error) { - uri := fmt.Sprintf(pathWait, w.client.base, w.id) - req, err := w.client.createRequest(uri, "POST", nil) - if err != nil { - return nil, err - } - - res, err := ctxhttp.Do(w.ctx, w.client.client, req) - if err != nil { - return nil, err - } - defer res.Body.Close() - job := &model.Job{} - err = json.NewDecoder(res.Body).Decode(&job) - if err != nil { - return nil, err - } - return job, nil -} - -func (w *Wait) Cancel() { - w.cancel() -} - // // http request helper functions // diff --git a/drone/agent/agent.go b/drone/agent/agent.go index 48f1657f..df005952 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -7,13 +7,14 @@ import ( "syscall" "time" - "github.com/drone/drone/client" - "github.com/drone/drone/shared/token" + "github.com/drone/drone/queue" + "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" + "strings" + "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" - "strings" ) // AgentCmd is the exported command for starting the drone agent. @@ -67,7 +68,7 @@ var AgentCmd = cli.Command{ EnvVar: "DRONE_SERVER", Name: "drone-server", Usage: "drone server address", - Value: "http://localhost:8000", + Value: "ws://localhost:8000/ws/broker", }, cli.StringFlag{ EnvVar: "DRONE_TOKEN", @@ -146,8 +147,9 @@ func start(c *cli.Context) { var accessToken string if c.String("drone-secret") != "" { - secretToken := c.String("drone-secret") - accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken) + // secretToken := c.String("drone-secret") + accessToken = c.String("drone-secret") + // accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken) } else { accessToken = c.String("drone-token") } @@ -157,10 +159,17 @@ func start(c *cli.Context) { accessToken, ) - client := client.NewClientToken( - strings.TrimRight(c.String("drone-server"), "/"), - accessToken, - ) + server := strings.TrimRight(c.String("drone-server"), "/") + client, err := stomp.Dial(server) + if err != nil { + logrus.Fatalf("Cannot connect to host %s. %s", server, err) + } + opts := []stomp.MessageOption{ + stomp.WithCredentials("x-token", accessToken), + } + if err = client.Connect(opts...); err != nil { + logrus.Fatalf("Cannot connect to host %s. %s", server, err) + } tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) if err == nil { @@ -171,42 +180,49 @@ func start(c *cli.Context) { logrus.Fatal(err) } - go func() { - for { - if err := client.Ping(); err != nil { - logrus.Warnf("unable to ping the server. %s", err.Error()) - } - time.Sleep(c.Duration("ping")) - } - }() - - var wg sync.WaitGroup - for i := 0; i < c.Int("docker-max-procs"); i++ { - wg.Add(1) - go func() { - r := pipeline{ - drone: client, - docker: docker, - config: config{ - platform: c.String("docker-os") + "/" + c.String("docker-arch"), - timeout: c.Duration("timeout"), - namespace: c.String("namespace"), - privileged: c.StringSlice("privileged"), - pull: c.BoolT("pull"), - logs: int64(c.Int("max-log-size")) * 1000000, - }, - } - for { - if err := r.run(); err != nil { - dur := c.Duration("backoff") - logrus.Warnf("reconnect in %v. %s", dur, err.Error()) - time.Sleep(dur) - } - } + handler := func(m *stomp.Message) { + running.Add(1) + defer func() { + running.Done() + client.Ack(m.Ack) }() + + r := pipeline{ + drone: client, + docker: docker, + config: config{ + platform: c.String("docker-os") + "/" + c.String("docker-arch"), + timeout: c.Duration("timeout"), + namespace: c.String("namespace"), + privileged: c.StringSlice("privileged"), + pull: c.BoolT("pull"), + logs: int64(c.Int("max-log-size")) * 1000000, + }, + } + + work := new(queue.Work) + m.Unmarshal(work) + r.run(work) + } + + _, err = client.Subscribe("/queue/pending", stomp.HandlerFunc(handler), + stomp.WithAck("client"), + stomp.WithPrefetch( + c.Int("docker-max-procs"), + ), + // stomp.WithSelector( + // fmt.Sprintf("platorm == '%s/%s'", + // c.String("drone-os"), + // c.String("drone-arch"), + // ), + // ), + ) + if err != nil { + logrus.Fatalf("Unable to connect to queue. %s", err) } handleSignals() - wg.Wait() + + <-client.Done() } // tracks running builds diff --git a/drone/agent/exec.go b/drone/agent/exec.go index bd538114..04948525 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -1,14 +1,13 @@ package agent import ( - "bytes" - "io/ioutil" "time" "github.com/Sirupsen/logrus" "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" - "github.com/drone/drone/client" + "github.com/drone/drone/queue" + "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" ) @@ -23,20 +22,16 @@ type config struct { } type pipeline struct { - drone client.Client + drone *stomp.Client docker dockerclient.Client config config } -func (r *pipeline) run() error { - w, err := r.drone.Pull("linux", "amd64") - if err != nil { - return err - } - running.Add(1) - defer func() { - running.Done() - }() +func (r *pipeline) run(w *queue.Work) { + + // defer func() { + // // r.drone.Ack(id, opts) + // }() logrus.Infof("Starting build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) @@ -44,24 +39,9 @@ func (r *pipeline) run() error { cancel := make(chan bool, 1) engine := docker.NewClient(r.docker) - // streaming the logs - // rc, wc := io.Pipe() - // defer func() { - // wc.Close() - // rc.Close() - // }() - - var buf bytes.Buffer - - stream, err := r.drone.LogStream(w.Job.ID) - if err != nil { - return err - } - a := agent.Agent{ - Update: agent.NewClientUpdater(r.drone), - // Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs), - Logger: agent.NewStreamLogger(stream, &buf, r.config.logs), + Update: agent.NewClientUpdater(r.drone), + Logger: agent.NewClientLogger(r.drone, w.Job.ID, r.config.logs), Engine: engine, Timeout: r.config.timeout, Platform: r.config.platform, @@ -70,27 +50,34 @@ func (r *pipeline) run() error { Pull: r.config.pull, } - // signal for canceling the build. - wait := r.drone.Wait(w.Job.ID) - defer wait.Cancel() - go func() { - if _, err := wait.Done(); err == nil { + cancelFunc := func(m *stomp.Message) { + defer m.Release() + + id := m.Header.GetInt64("job-id") + if id == w.Job.ID { cancel <- true logrus.Infof("Cancel build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) } + } + + // signal for canceling the build. + sub, err := r.drone.Subscribe("/topic/cancels", stomp.HandlerFunc(cancelFunc)) + if err != nil { + logrus.Errorf("Error subscribing to /topic/cancels. %s", err) + } + defer func() { + r.drone.Unsubscribe(sub) }() a.Run(w, cancel) - if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { - logrus.Errorf("Error sending logs for %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - } - stream.Close() + // if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { + // logrus.Errorf("Error sending logs for %s/%s#%d.%d", + // w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + // } + // stream.Close() logrus.Infof("Finished build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - - return nil } diff --git a/server/queue.go b/server/queue.go index adfbd11f..23acf811 100644 --- a/server/queue.go +++ b/server/queue.go @@ -4,10 +4,12 @@ import ( "bytes" "fmt" "net/http" + "strconv" "golang.org/x/net/context" "github.com/Sirupsen/logrus" + "github.com/drone/drone/bus" "github.com/drone/drone/model" "github.com/drone/drone/queue" "github.com/drone/drone/remote" @@ -360,7 +362,7 @@ func HandleUpdate(c context.Context, message *stomp.Message) { return } - if ok && build.Status != model.StatusRunning { + if ok { // get the user because we transfer the user form the server to agent // and back we lose the token which does not get serialized to json. user, uerr := store.GetUser(c, work.User.ID) @@ -372,20 +374,39 @@ func HandleUpdate(c context.Context, message *stomp.Message) { fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) } + client := stomp.MustFromContext(c) + err = client.SendJSON("/topic/events", bus.Event{ + Type: bus.Started, + Repo: *work.Repo, + Build: *build, + Job: *job, + }, + stomp.WithHeader("repo", work.Repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(work.Repo.IsPrivate)), + ) + if err != nil { + logrus.Errorf("Unable to publish to /topic/events. %s", err) + } + + if job.Status == model.StatusRunning { + return + } + var buf bytes.Buffer var sub []byte done := make(chan bool) - dest := fmt.Sprintf("/topic/%d", job.ID) - client, _ := stomp.FromContext(c) + dest := fmt.Sprintf("/topic/logs.%d", job.ID) sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { - if len(m.Header.Get([]byte("eof"))) != 0 { + defer m.Release() + if m.Header.GetBool("eof") { done <- true + return } buf.Write(m.Body) buf.WriteByte('\n') - m.Release() })) + if err != nil { logrus.Errorf("Unable to read logs from broker. %s", err) return @@ -396,11 +417,6 @@ func HandleUpdate(c context.Context, message *stomp.Message) { logrus.Errorf("Unable to write logs to store. %s", err) return } - // if build.Status == model.StatusRunning { - // bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) - // } else { - // bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) - // } client.Unsubscribe(sub) client.Send(dest, []byte{}, stomp.WithRetain("remove")) diff --git a/server/stream.go b/server/stream.go index 4c8786f7..9c88ebed 100644 --- a/server/stream.go +++ b/server/stream.go @@ -66,15 +66,16 @@ func LogStream(c *gin.Context) { defer ticker.Stop() done := make(chan bool) - dest := fmt.Sprintf("/topic/%d", job.ID) + dest := fmt.Sprintf("/topic/logs.%d", job.ID) client, _ := stomp.FromContext(c) sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { - if len(m.Header.Get([]byte("eof"))) != 0 { + defer m.Release() + if m.Header.GetBool("eof") { done <- true + return } ws.SetWriteDeadline(time.Now().Add(writeWait)) ws.WriteMessage(websocket.TextMessage, m.Body) - m.Release() })) if err != nil { logrus.Errorf("Unable to read logs from broker. %s", err)