integrates drone/mq into agent

This commit is contained in:
Brad Rydzewski 2016-09-26 03:29:05 -05:00
parent f2c1d46f9e
commit 0b2f1c8e51
7 changed files with 145 additions and 281 deletions

View file

@ -1,16 +1,13 @@
package agent
import (
"encoding/json"
"fmt"
"io"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/build"
"github.com/drone/drone/client"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/drone/mq/stomp"
)
// UpdateFunc handles buid pipeline status updates.
@ -27,65 +24,44 @@ var TermLoggerFunc = func(line *build.Line) {
// NewClientUpdater returns an updater that sends updated build details
// to the drone server.
func NewClientUpdater(client client.Client) UpdateFunc {
func NewClientUpdater(client *stomp.Client) UpdateFunc {
return func(w *queue.Work) {
for {
err := client.Push(w)
if err == nil {
return
}
logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s",
err := client.SendJSON("/queue/updates", w)
if err != nil {
logrus.Errorf("Error updating %s/%s#%d.%d. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
logrus.Infof("Retry update in 30s")
time.Sleep(time.Second * 30)
}
if w.Job.Status != model.StatusRunning {
var dest = fmt.Sprintf("/topic/logs.%d", w.Job.ID)
var opts = []stomp.MessageOption{
stomp.WithHeader("eof", "true"),
stomp.WithRetain("all"),
}
if err := client.Send(dest, []byte("eof"), opts...); err != nil {
logrus.Errorf("Error sending eof %s/%s#%d.%d. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
}
}
}
}
func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc {
var err error
var size int64
return func(line *build.Line) {
func NewClientLogger(client *stomp.Client, id int64, limit int64) LoggerFunc {
var size int64
var dest = fmt.Sprintf("/topic/logs.%d", id)
var opts = []stomp.MessageOption{
stomp.WithRetain("all"),
}
return func(line *build.Line) {
if size > limit {
return
}
// TODO remove this double-serialization
linejson, _ := json.Marshal(line)
w.Write(linejson)
w.Write([]byte{'\n'})
if err = stream.WriteJSON(line); err != nil {
if err := client.SendJSON(dest, line, opts...); err != nil {
logrus.Errorf("Error streaming build logs. %s", err)
}
size += int64(len(line.Out))
}
}
func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc {
var once sync.Once
var size int64
return func(line *build.Line) {
// annoying hack to only start streaming once the first line is written
once.Do(func() {
go func() {
err := client.Stream(id, rc)
if err != nil && err != io.ErrClosedPipe {
logrus.Errorf("Error streaming build logs. %s", err)
}
}()
})
if size > limit {
return
}
linejson, _ := json.Marshal(line)
wc.Write(linejson)
wc.Write([]byte{'\n'})
size += int64(len(line.Out))
}
}

View file

@ -4,7 +4,6 @@ import (
"io"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
)
// Client is used to communicate with a Drone server.
@ -103,27 +102,4 @@ type Client interface {
// AgentList returns a list of build agents.
AgentList() ([]*model.Agent, error)
//
// below items for Queue (internal use only)
//
// Pull pulls work from the server queue.
Pull(os, arch string) (*queue.Work, error)
// Push pushes an update to the server.
Push(*queue.Work) error
// Stream streams the build logs to the server.
Stream(int64, io.ReadCloser) error
LogStream(int64) (StreamWriter, error)
LogPost(int64, io.ReadCloser) error
// Wait waits for the job to the complete.
Wait(int64) *Wait
// Ping the server
Ping() error
}

View file

@ -12,10 +12,6 @@ import (
"strconv"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/oauth2"
)
@ -323,110 +319,6 @@ func (c *client) AgentList() ([]*model.Agent, error) {
return out, err
}
//
// below items for Queue (internal use only)
//
// Pull pulls work from the server queue.
func (c *client) Pull(os, arch string) (*queue.Work, error) {
out := new(queue.Work)
uri := fmt.Sprintf(pathPull, c.base, os, arch)
err := c.post(uri, nil, out)
return out, err
}
// Push pushes an update to the server.
func (c *client) Push(p *queue.Work) error {
uri := fmt.Sprintf(pathPush, c.base, p.Job.ID)
err := c.post(uri, p, nil)
return err
}
// Ping pings the server.
func (c *client) Ping() error {
uri := fmt.Sprintf(pathPing, c.base)
err := c.post(uri, nil, nil)
return err
}
// Stream streams the build logs to the server.
func (c *client) Stream(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathStream, c.base, id)
err := c.post(uri, rc, nil)
return err
}
// LogPost sends the full build logs to the server.
func (c *client) LogPost(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathLogs, c.base, id)
return c.post(uri, rc, nil)
}
// StreamWriter implements a special writer for streaming log entries to the
// central Drone server. The standard implementation is the gorilla.Connection.
type StreamWriter interface {
Close() error
WriteJSON(interface{}) error
}
// LogStream streams the build logs to the server.
func (c *client) LogStream(id int64) (StreamWriter, error) {
rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token)
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
if uri.Scheme == "https" {
uri.Scheme = "wss"
} else {
uri.Scheme = "ws"
}
// TODO need TLS client configuration
conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil)
return conn, err
}
// Wait watches and waits for the build to cancel or finish.
func (c *client) Wait(id int64) *Wait {
ctx, cancel := context.WithCancel(context.Background())
return &Wait{id, c, ctx, cancel}
}
type Wait struct {
id int64
client *client
ctx context.Context
cancel context.CancelFunc
}
func (w *Wait) Done() (*model.Job, error) {
uri := fmt.Sprintf(pathWait, w.client.base, w.id)
req, err := w.client.createRequest(uri, "POST", nil)
if err != nil {
return nil, err
}
res, err := ctxhttp.Do(w.ctx, w.client.client, req)
if err != nil {
return nil, err
}
defer res.Body.Close()
job := &model.Job{}
err = json.NewDecoder(res.Body).Decode(&job)
if err != nil {
return nil, err
}
return job, nil
}
func (w *Wait) Cancel() {
w.cancel()
}
//
// http request helper functions
//

View file

@ -7,13 +7,14 @@ import (
"syscall"
"time"
"github.com/drone/drone/client"
"github.com/drone/drone/shared/token"
"github.com/drone/drone/queue"
"github.com/drone/mq/stomp"
"github.com/samalba/dockerclient"
"strings"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"strings"
)
// AgentCmd is the exported command for starting the drone agent.
@ -67,7 +68,7 @@ var AgentCmd = cli.Command{
EnvVar: "DRONE_SERVER",
Name: "drone-server",
Usage: "drone server address",
Value: "http://localhost:8000",
Value: "ws://localhost:8000/ws/broker",
},
cli.StringFlag{
EnvVar: "DRONE_TOKEN",
@ -146,8 +147,9 @@ func start(c *cli.Context) {
var accessToken string
if c.String("drone-secret") != "" {
secretToken := c.String("drone-secret")
accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken)
// secretToken := c.String("drone-secret")
accessToken = c.String("drone-secret")
// accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken)
} else {
accessToken = c.String("drone-token")
}
@ -157,10 +159,17 @@ func start(c *cli.Context) {
accessToken,
)
client := client.NewClientToken(
strings.TrimRight(c.String("drone-server"), "/"),
accessToken,
)
server := strings.TrimRight(c.String("drone-server"), "/")
client, err := stomp.Dial(server)
if err != nil {
logrus.Fatalf("Cannot connect to host %s. %s", server, err)
}
opts := []stomp.MessageOption{
stomp.WithCredentials("x-token", accessToken),
}
if err = client.Connect(opts...); err != nil {
logrus.Fatalf("Cannot connect to host %s. %s", server, err)
}
tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
if err == nil {
@ -171,42 +180,49 @@ func start(c *cli.Context) {
logrus.Fatal(err)
}
go func() {
for {
if err := client.Ping(); err != nil {
logrus.Warnf("unable to ping the server. %s", err.Error())
}
time.Sleep(c.Duration("ping"))
}
}()
var wg sync.WaitGroup
for i := 0; i < c.Int("docker-max-procs"); i++ {
wg.Add(1)
go func() {
r := pipeline{
drone: client,
docker: docker,
config: config{
platform: c.String("docker-os") + "/" + c.String("docker-arch"),
timeout: c.Duration("timeout"),
namespace: c.String("namespace"),
privileged: c.StringSlice("privileged"),
pull: c.BoolT("pull"),
logs: int64(c.Int("max-log-size")) * 1000000,
},
}
for {
if err := r.run(); err != nil {
dur := c.Duration("backoff")
logrus.Warnf("reconnect in %v. %s", dur, err.Error())
time.Sleep(dur)
}
}
handler := func(m *stomp.Message) {
running.Add(1)
defer func() {
running.Done()
client.Ack(m.Ack)
}()
r := pipeline{
drone: client,
docker: docker,
config: config{
platform: c.String("docker-os") + "/" + c.String("docker-arch"),
timeout: c.Duration("timeout"),
namespace: c.String("namespace"),
privileged: c.StringSlice("privileged"),
pull: c.BoolT("pull"),
logs: int64(c.Int("max-log-size")) * 1000000,
},
}
work := new(queue.Work)
m.Unmarshal(work)
r.run(work)
}
_, err = client.Subscribe("/queue/pending", stomp.HandlerFunc(handler),
stomp.WithAck("client"),
stomp.WithPrefetch(
c.Int("docker-max-procs"),
),
// stomp.WithSelector(
// fmt.Sprintf("platorm == '%s/%s'",
// c.String("drone-os"),
// c.String("drone-arch"),
// ),
// ),
)
if err != nil {
logrus.Fatalf("Unable to connect to queue. %s", err)
}
handleSignals()
wg.Wait()
<-client.Done()
}
// tracks running builds

View file

@ -1,14 +1,13 @@
package agent
import (
"bytes"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/agent"
"github.com/drone/drone/build/docker"
"github.com/drone/drone/client"
"github.com/drone/drone/queue"
"github.com/drone/mq/stomp"
"github.com/samalba/dockerclient"
)
@ -23,20 +22,16 @@ type config struct {
}
type pipeline struct {
drone client.Client
drone *stomp.Client
docker dockerclient.Client
config config
}
func (r *pipeline) run() error {
w, err := r.drone.Pull("linux", "amd64")
if err != nil {
return err
}
running.Add(1)
defer func() {
running.Done()
}()
func (r *pipeline) run(w *queue.Work) {
// defer func() {
// // r.drone.Ack(id, opts)
// }()
logrus.Infof("Starting build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
@ -44,24 +39,9 @@ func (r *pipeline) run() error {
cancel := make(chan bool, 1)
engine := docker.NewClient(r.docker)
// streaming the logs
// rc, wc := io.Pipe()
// defer func() {
// wc.Close()
// rc.Close()
// }()
var buf bytes.Buffer
stream, err := r.drone.LogStream(w.Job.ID)
if err != nil {
return err
}
a := agent.Agent{
Update: agent.NewClientUpdater(r.drone),
// Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs),
Logger: agent.NewStreamLogger(stream, &buf, r.config.logs),
Update: agent.NewClientUpdater(r.drone),
Logger: agent.NewClientLogger(r.drone, w.Job.ID, r.config.logs),
Engine: engine,
Timeout: r.config.timeout,
Platform: r.config.platform,
@ -70,27 +50,34 @@ func (r *pipeline) run() error {
Pull: r.config.pull,
}
// signal for canceling the build.
wait := r.drone.Wait(w.Job.ID)
defer wait.Cancel()
go func() {
if _, err := wait.Done(); err == nil {
cancelFunc := func(m *stomp.Message) {
defer m.Release()
id := m.Header.GetInt64("job-id")
if id == w.Job.ID {
cancel <- true
logrus.Infof("Cancel build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
}
}
// signal for canceling the build.
sub, err := r.drone.Subscribe("/topic/cancels", stomp.HandlerFunc(cancelFunc))
if err != nil {
logrus.Errorf("Error subscribing to /topic/cancels. %s", err)
}
defer func() {
r.drone.Unsubscribe(sub)
}()
a.Run(w, cancel)
if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil {
logrus.Errorf("Error sending logs for %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
}
stream.Close()
// if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil {
// logrus.Errorf("Error sending logs for %s/%s#%d.%d",
// w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
// }
// stream.Close()
logrus.Infof("Finished build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
return nil
}

View file

@ -4,10 +4,12 @@ import (
"bytes"
"fmt"
"net/http"
"strconv"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/bus"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/drone/drone/remote"
@ -360,7 +362,7 @@ func HandleUpdate(c context.Context, message *stomp.Message) {
return
}
if ok && build.Status != model.StatusRunning {
if ok {
// get the user because we transfer the user form the server to agent
// and back we lose the token which does not get serialized to json.
user, uerr := store.GetUser(c, work.User.ID)
@ -372,20 +374,39 @@ func HandleUpdate(c context.Context, message *stomp.Message) {
fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
}
client := stomp.MustFromContext(c)
err = client.SendJSON("/topic/events", bus.Event{
Type: bus.Started,
Repo: *work.Repo,
Build: *build,
Job: *job,
},
stomp.WithHeader("repo", work.Repo.FullName),
stomp.WithHeader("private", strconv.FormatBool(work.Repo.IsPrivate)),
)
if err != nil {
logrus.Errorf("Unable to publish to /topic/events. %s", err)
}
if job.Status == model.StatusRunning {
return
}
var buf bytes.Buffer
var sub []byte
done := make(chan bool)
dest := fmt.Sprintf("/topic/%d", job.ID)
client, _ := stomp.FromContext(c)
dest := fmt.Sprintf("/topic/logs.%d", job.ID)
sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
if len(m.Header.Get([]byte("eof"))) != 0 {
defer m.Release()
if m.Header.GetBool("eof") {
done <- true
return
}
buf.Write(m.Body)
buf.WriteByte('\n')
m.Release()
}))
if err != nil {
logrus.Errorf("Unable to read logs from broker. %s", err)
return
@ -396,11 +417,6 @@ func HandleUpdate(c context.Context, message *stomp.Message) {
logrus.Errorf("Unable to write logs to store. %s", err)
return
}
// if build.Status == model.StatusRunning {
// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job))
// } else {
// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job))
// }
client.Unsubscribe(sub)
client.Send(dest, []byte{}, stomp.WithRetain("remove"))

View file

@ -66,15 +66,16 @@ func LogStream(c *gin.Context) {
defer ticker.Stop()
done := make(chan bool)
dest := fmt.Sprintf("/topic/%d", job.ID)
dest := fmt.Sprintf("/topic/logs.%d", job.ID)
client, _ := stomp.FromContext(c)
sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
if len(m.Header.Get([]byte("eof"))) != 0 {
defer m.Release()
if m.Header.GetBool("eof") {
done <- true
return
}
ws.SetWriteDeadline(time.Now().Add(writeWait))
ws.WriteMessage(websocket.TextMessage, m.Body)
m.Release()
}))
if err != nil {
logrus.Errorf("Unable to read logs from broker. %s", err)