From 88d0cdbaf099e55419fd9686daf74ffa8ccec927 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Mon, 18 May 2015 15:47:13 -0700 Subject: [PATCH] code re-enables build agents. needs to be more thoroughly tested --- cmd/drone-agent/main.go | 31 +- cmd/drone-agent/updater.go | 126 +++++--- cmd/drone-server/drone.go | 30 +- .../static/scripts/controllers/agents.js | 31 +- .../static/scripts/services/agents.js | 15 +- .../static/scripts/views/agents.html | 26 +- doc/setup-nginx.md | 1 + pkg/server/agent.go | 92 +++++- pkg/server/queue.go | 282 ++++++++---------- pkg/server/server.go | 40 ++- pkg/server/ws.go | 41 ++- pkg/store/builtin/agent.go | 76 +++++ pkg/store/builtin/migrate/migrate.go | 20 ++ pkg/store/builtin/repo.go | 26 -- pkg/store/builtin/store.go | 2 + pkg/store/store.go | 22 ++ pkg/types/agent.go | 11 +- pkg/types/commit.go | 1 + pkg/types/util.go | 48 +++ pkg/types/util_test.go | 19 ++ 20 files changed, 637 insertions(+), 303 deletions(-) create mode 100644 doc/setup-nginx.md create mode 100644 pkg/types/util.go create mode 100644 pkg/types/util_test.go diff --git a/cmd/drone-agent/main.go b/cmd/drone-agent/main.go index db21ae54..6fc8f9c9 100644 --- a/cmd/drone-agent/main.go +++ b/cmd/drone-agent/main.go @@ -1,13 +1,9 @@ package main import ( - "bytes" - "encoding/json" "flag" "fmt" "io" - "net/http" - "net/url" "os" "time" @@ -54,7 +50,7 @@ func main() { time.Sleep(30 * time.Second) continue } - runner_ := runner.Runner{&updater{addr, token}} + runner_ := runner.Runner{&updater{}} runner_.Run(w) } }() @@ -66,6 +62,12 @@ func main() { s.Run(":1999") } +func pull() (*queue.Work, error) { + out := &queue.Work{} + err := send("POST", "/api/queue/pull", nil, out) + return out, err +} + // ping handler returns a simple response to the // caller indicating the server is running. This // can be used for heartbeats. @@ -86,6 +88,11 @@ func about(c *gin.Context) { // stream handler is a proxy that streams the Docker // stdout and stderr for a running build to the caller. func stream(c *gin.Context) { + if c.Request.FormValue("token") != token { + c.AbortWithStatus(401) + return + } + client, err := dockerclient.NewDockerClient(DockerHost, nil) if err != nil { c.Fail(500, err) @@ -135,17 +142,3 @@ func stream(c *gin.Context) { } io.Copy(c.Writer, rc) } - -func pull() (*queue.Work, error) { - url_, _ := url.Parse(addr) - url_.Path = "/api/queue/pull" - var body bytes.Buffer - resp, err := http.Post(url_.String(), "application/json", &body) - if err != nil { - return nil, err - } - defer resp.Body.Close() - work := &queue.Work{} - err = json.NewDecoder(resp.Body).Decode(work) - return work, err -} diff --git a/cmd/drone-agent/updater.go b/cmd/drone-agent/updater.go index 9d0110c3..409b8ff8 100644 --- a/cmd/drone-agent/updater.go +++ b/cmd/drone-agent/updater.go @@ -5,71 +5,113 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "net/http" "net/url" + "strconv" + "time" //logs "github.com/Sirupsen/logrus" common "github.com/drone/drone/pkg/types" ) -type updater struct { - addr string - token string -} +type updater struct{} func (u *updater) SetCommit(user *common.User, r *common.Repo, c *common.Commit) error { - url_, err := url.Parse(addr) - if err != nil { - return err - } - url_.Path = fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence) - var body bytes.Buffer - json.NewEncoder(&body).Encode(c) - resp, err := http.Post(url_.String(), "application/json", &body) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - return fmt.Errorf("Error pushing task state. Code %d", resp.StatusCode) - } - return nil + path := fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence) + return sendBackoff("POST", path, c, nil) } func (u *updater) SetBuild(r *common.Repo, c *common.Commit, b *common.Build) error { - url_, err := url.Parse(u.addr) - if err != nil { - return err - } - - url_.Path = fmt.Sprintf("/api/queue/push/%s", r.FullName) - var body bytes.Buffer - json.NewEncoder(&body).Encode(b) - resp, err := http.Post(url_.String(), "application/json", &body) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - return fmt.Errorf("Error pushing build state. Code %d", resp.StatusCode) - } - return nil + path := fmt.Sprintf("/api/queue/push/%s", r.FullName) + return sendBackoff("POST", path, c, nil) } func (u *updater) SetLogs(r *common.Repo, c *common.Commit, b *common.Build, rc io.ReadCloser) error { - url_, err := url.Parse(u.addr) + path := fmt.Sprintf("/api/queue/push/%s/%v/%v", r.FullName, c.Sequence, b.Sequence) + return sendBackoff("POST", path, rc, nil) +} + +func sendBackoff(method, path string, in, out interface{}) error { + var err error + var attempts int + for { + err = send(method, path, in, out) + if err == nil { + break + } + if attempts > 30 { + break + } + attempts++ + time.Sleep(time.Second * 30) + } + return err +} + +// do makes an http.Request and returns the response +func send(method, path string, in, out interface{}) error { + + // create the URI + uri, err := url.Parse(addr + path) if err != nil { return err } - url_.Path = fmt.Sprintf("/api/queue/push/%s/%v/%v/logs", r.FullName, c.Sequence, b.Sequence) - resp, err := http.Post(url_.String(), "application/json", rc) + if len(uri.Scheme) == 0 { + uri.Scheme = "http" + } + + params := uri.Query() + params.Add("token", token) + uri.RawQuery = params.Encode() + + // create the request + req, err := http.NewRequest(method, uri.String(), nil) + if err != nil { + return err + } + req.ProtoAtLeast(1, 1) + req.Close = true + req.ContentLength = 0 + + // If the data is a readCloser we can attach directly + // to the request body. + // + // Else we serialize the data input as JSON. + if rc, ok := in.(io.ReadCloser); ok { + req.Body = rc + + } else if in != nil { + inJson, err := json.Marshal(in) + if err != nil { + return err + } + + buf := bytes.NewBuffer(inJson) + req.Body = ioutil.NopCloser(buf) + + req.ContentLength = int64(len(inJson)) + req.Header.Set("Content-Length", strconv.Itoa(len(inJson))) + req.Header.Set("Content-Type", "application/json") + } + + // make the request using the default http client + resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() - if resp.StatusCode != 200 { - return fmt.Errorf("Error pushing build logs. Code %d", resp.StatusCode) + + // Check for an http error status (ie not 200 StatusOK) + if resp.StatusCode > 300 { + return fmt.Errorf(resp.Status) } + + // Decode the JSON response + if out != nil { + return json.NewDecoder(resp.Body).Decode(out) + } + return nil } diff --git a/cmd/drone-server/drone.go b/cmd/drone-server/drone.go index 9128003a..bcb3c0a4 100644 --- a/cmd/drone-server/drone.go +++ b/cmd/drone-server/drone.go @@ -89,7 +89,9 @@ func main() { agents := api.Group("/agents") { agents.Use(server.MustAdmin()) - agents.GET("/token", server.GetAgentToken) + agents.GET("", server.GetAgents) + agents.POST("", server.PostAgent) + agents.DELETE("/:id", server.DeleteAgent) } repos := api.Group("/repos/:owner/:name") @@ -131,20 +133,20 @@ func main() { hooks.POST("", server.PostHook) } - // queue := api.Group("/queue") - // { - // queue.Use(server.MustAgent()) - // queue.GET("", server.GetQueue) - // queue.POST("/pull", server.PollBuild) + queue := api.Group("/queue") + { + queue.Use(server.MustAgent()) + queue.Use(server.SetUpdater(updater)) + queue.POST("/pull", server.PollBuild) - // push := queue.Group("/push/:owner/:name") - // { - // push.Use(server.SetRepo()) - // push.POST("", server.PushBuild) - // push.POST("/:build", server.PushTask) - // push.POST("/:build/:task/logs", server.PushLogs) - // } - // } + push := queue.Group("/push/:owner/:name") + { + push.Use(server.SetRepo()) + push.POST("", server.PushCommit) + push.POST("/:commit", server.PushBuild) + push.POST("/:commit/:build/logs", server.PushLogs) + } + } stream := api.Group("/stream") { diff --git a/cmd/drone-server/static/scripts/controllers/agents.js b/cmd/drone-server/static/scripts/controllers/agents.js index e261bc33..cb9627b4 100644 --- a/cmd/drone-server/static/scripts/controllers/agents.js +++ b/cmd/drone-server/static/scripts/controllers/agents.js @@ -1,19 +1,34 @@ (function () { function AgentsCtrl($scope, $window, users, agents) { - - // this is the address that agents should connect with. - $scope.addr = $window.location.origin; - - // Gets the currently authenticated user + // this is the address that agents should connect with. + $scope.addr = $window.location.origin; + + // Gets the currently authenticated user users.getCached().then(function(payload){ $scope.user = payload.data; }); - // Generages a remote token. - agents.getToken().then(function(payload){ - $scope.token = payload.data; + // Generages a remote agents. + agents.getAgents().then(function(payload){ + $scope.agents = payload.data; }); + + $scope.onDelete = function(agent) { + console.log("delete agent", agent) + agents.deleteAgent(agent).then(function(payload){ + var index = $scope.agents.indexOf(agent); + $scope.agents.splice(index, 1); + }); + } + + $scope.newAgent={address: ""}; + $scope.onAdd = function(agent) { + agents.postAgent(agent).then(function(payload){ + $scope.agents.push(payload.data); + $scope.newAgent={address: ""}; + }); + } } angular diff --git a/cmd/drone-server/static/scripts/services/agents.js b/cmd/drone-server/static/scripts/services/agents.js index 44d0372b..15dd6f47 100644 --- a/cmd/drone-server/static/scripts/services/agents.js +++ b/cmd/drone-server/static/scripts/services/agents.js @@ -9,11 +9,20 @@ function AgentService($http) { /** - * Gets an agent token. + * Gets an agent list. */ - this.getToken = function() { - return $http.get('/api/agents/token'); + this.getAgents = function() { + return $http.get('/api/agents'); }; + + this.deleteAgent = function(agent) { + return $http.delete('/api/agents/'+agent.id); + }; + + this.postAgent = function(agent) { + return $http.post('/api/agents', agent); + }; + } angular diff --git a/cmd/drone-server/static/scripts/views/agents.html b/cmd/drone-server/static/scripts/views/agents.html index 144813c8..1592f725 100644 --- a/cmd/drone-server/static/scripts/views/agents.html +++ b/cmd/drone-server/static/scripts/views/agents.html @@ -2,11 +2,27 @@ Back -
-
Token
-
{{ token }}
-
+ + + + + + + + + + + + + + + + + + +
AddressToken
{{ agent.address }}{{ agent.token }} +
 docker run -d drone/drone-agent --addr={{ addr }} --token={{ token }}
-
+ \ No newline at end of file diff --git a/doc/setup-nginx.md b/doc/setup-nginx.md new file mode 100644 index 00000000..bdb0c02b --- /dev/null +++ b/doc/setup-nginx.md @@ -0,0 +1 @@ +setup-nginx.md \ No newline at end of file diff --git a/pkg/server/agent.go b/pkg/server/agent.go index 82664c73..107a82b4 100644 --- a/pkg/server/agent.go +++ b/pkg/server/agent.go @@ -1,20 +1,94 @@ package server import ( + "strconv" + "github.com/drone/drone/pkg/types" "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + + common "github.com/drone/drone/pkg/types" ) -// GET /api/agents/token -func GetAgentToken(c *gin.Context) { - sess := ToSession(c) - token := &types.Token{} - token.Kind = types.TokenAgent - token.Label = "drone-agent" - tokenstr, err := sess.GenerateToken(token) +// GetAgents accepts a request to retrieve all build +// agents from the datastore and return encoded in JSON +// format. +// +// GET /api/agents +// +func GetAgents(c *gin.Context) { + store := ToDatastore(c) + agents, err := store.AgentList() if err != nil { - c.Fail(500, err) + c.Fail(400, err) } else { - c.JSON(200, tokenstr) + c.JSON(200, agents) } } + +// PostAgent accepts a request to register a new build +// agent with the system. The registered agent is returned +// from the datastore and return encoded in JSON format. +// +// POST /api/agents +// +func PostAgent(c *gin.Context) { + store := ToDatastore(c) + + in := &common.Agent{} + if !c.BindWith(in, binding.JSON) { + return + } + + // attept to fetch the agent from the + // datastore. If the agent already exists we + // should re-activate + agent, err := store.AgentAddr(in.Addr) + if err != nil { + agent = &common.Agent{} + agent.Addr = in.Addr + agent.Token = types.GenerateToken() + agent.Active = true + agent.IsHealthy = true + err = store.AddAgent(agent) + if err != nil { + c.Fail(400, err) + } else { + c.JSON(200, agent) + } + return + } + + agent.Active = true + err = store.SetAgent(agent) + if err != nil { + c.Fail(400, err) + } else { + c.JSON(200, agent) + } +} + +// DeleteAgent accepts a request to delete a build agent +// from the system. +// +// DELETE /api/agents/:id +// +func DeleteAgent(c *gin.Context) { + store := ToDatastore(c) + idstr := c.Params.ByName("id") + id, _ := strconv.Atoi(idstr) + + agent, err := store.Agent(int64(id)) + if err != nil { + c.Fail(404, err) + return + } + agent.Active = false + err = store.SetAgent(agent) + if err != nil { + c.Fail(400, err) + return + } + + c.Writer.WriteHeader(200) +} diff --git a/pkg/server/queue.go b/pkg/server/queue.go index c9a4f991..12f945bd 100644 --- a/pkg/server/queue.go +++ b/pkg/server/queue.go @@ -1,182 +1,144 @@ package server -// import ( -// "encoding/json" -// "io" -// "net" -// "strconv" +import ( + "strconv" -// log "github.com/Sirupsen/logrus" -// "github.com/gin-gonic/gin" -// "github.com/gin-gonic/gin/binding" + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" -// "github.com/drone/drone/common" -// "github.com/drone/drone/eventbus" -// ) + log "github.com/Sirupsen/logrus" + common "github.com/drone/drone/pkg/types" +) -// // TODO (bradrydzewski) the callback URL should be signed. -// // TODO (bradrydzewski) we shouldn't need to fetch the Repo if specified in the URL path -// // TODO (bradrydzewski) use SetRepoLast to update the last repository +// GET /queue/pull +func PollBuild(c *gin.Context) { + queue := ToQueue(c) + store := ToDatastore(c) + agent := ToAgent(c) -// // GET /queue/pull -// func PollBuild(c *gin.Context) { -// queue := ToQueue(c) -// store := ToDatastore(c) -// agent := &common.Agent{ -// Addr: c.Request.RemoteAddr, -// } + log.Infof("agent connected and polling builds at %s", agent.Addr) -// // extact the host port and name and -// // replace with the default agent port (1999) -// host, _, err := net.SplitHostPort(agent.Addr) -// if err == nil { -// agent.Addr = host -// } -// agent.Addr = net.JoinHostPort(agent.Addr, "1999") + // pull an item from the queue + work := queue.PullClose(c.Writer) + if work == nil { + c.AbortWithStatus(500) + return + } -// log.Infof("agent connected and polling builds at %s", agent.Addr) + // store the agent details with the commit + work.Commit.AgentID = agent.ID + err := store.SetCommit(work.Commit) + if err != nil { + log.Errorf("unable to associate agent with commit. %s", err) + // IMPORTANT: this should never happen, and even if it does + // it is an error scenario that will only impact live streaming + // so we choose it log and ignore. + } -// work := queue.PullClose(c.Writer) -// if work == nil { -// c.AbortWithStatus(500) -// return -// } + c.JSON(200, work) -// // TODO (bradrydzewski) decide how we want to handle a failure here -// // still not sure exact behavior we want ... -// err = store.SetBuildAgent(work.Repo.FullName, work.Build.Number, agent) -// if err != nil { -// log.Errorf("error persisting build agent. %s", err) -// } + // acknowledge work received by the client + queue.Ack(work) +} -// c.JSON(200, work) +// POST /queue/push/:owner/:repo +func PushCommit(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) -// // acknowledge work received by the client -// queue.Ack(work) -// } + in := &common.Commit{} + if !c.BindWith(in, binding.JSON) { + return + } + user, err := store.User(repo.UserID) + if err != nil { + c.Fail(404, err) + return + } + commit, err := store.CommitSeq(repo, in.Sequence) + if err != nil { + c.Fail(404, err) + return + } -// // GET /queue/push/:owner/:repo -// func PushBuild(c *gin.Context) { -// store := ToDatastore(c) -// repo := ToRepo(c) -// bus := ToBus(c) -// in := &common.Build{} -// if !c.BindWith(in, binding.JSON) { -// return -// } -// build, err := store.Build(repo.FullName, in.Number) -// if err != nil { -// c.Fail(404, err) -// return -// } + commit.Started = in.Started + commit.Finished = in.Finished + commit.State = in.State -// if in.State != common.StatePending && in.State != common.StateRunning { -// store.DelBuildAgent(repo.FullName, build.Number) -// } + updater := ToUpdater(c) + err = updater.SetCommit(user, repo, commit) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} -// build.Duration = in.Duration -// build.Started = in.Started -// build.Finished = in.Finished -// build.State = in.State -// err = store.SetBuildState(repo.FullName, build) -// if err != nil { -// c.Fail(500, err) -// return -// } +// POST /queue/push/:owner/:repo/:commit +func PushBuild(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + cnum, _ := strconv.Atoi(c.Params.ByName("commit")) -// if build.State != common.StatePending && build.State != common.StateRunning { -// if repo.Last == nil || build.Number >= repo.Last.Number { -// repo.Last = build -// store.SetRepo(repo) -// } -// } + in := &common.Build{} + if !c.BindWith(in, binding.JSON) { + return + } -// // <-- FIXME -// // for some reason the Repo and Build fail to marshal to JSON. -// // It has something to do with memory / pointers. So it goes away -// // if I just refetch these items. Needs to be fixed in the future, -// // but for now should be ok -// repo, err = store.Repo(repo.FullName) -// if err != nil { -// c.Fail(500, err) -// return -// } -// build, err = store.Build(repo.FullName, in.Number) -// if err != nil { -// c.Fail(404, err) -// return -// } -// // END FIXME --> + commit, err := store.CommitSeq(repo, cnum) + if err != nil { + c.Fail(404, err) + return + } + build, err := store.BuildSeq(commit, in.Sequence) + if err != nil { + c.Fail(404, err) + return + } -// msg, err := json.Marshal(build) -// if err == nil { -// c.String(200, err.Error()) // we can ignore this error -// return -// } + build.Duration = in.Duration + build.Started = in.Started + build.Finished = in.Finished + build.ExitCode = in.ExitCode + build.State = in.State -// bus.Send(&eventbus.Event{ -// Name: repo.FullName, -// Kind: eventbus.EventRepo, -// Msg: msg, -// }) + updater := ToUpdater(c) + err = updater.SetBuild(repo, commit, build) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} -// c.Writer.WriteHeader(200) -// } +// POST /queue/push/:owner/:repo/:comimt/:build/logs +func PushLogs(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + cnum, _ := strconv.Atoi(c.Params.ByName("commit")) + bnum, _ := strconv.Atoi(c.Params.ByName("build")) -// // POST /queue/push/:owner/:repo/:build -// func PushTask(c *gin.Context) { -// store := ToDatastore(c) -// repo := ToRepo(c) -// bus := ToBus(c) -// num, _ := strconv.Atoi(c.Params.ByName("build")) -// in := &common.Task{} -// if !c.BindWith(in, binding.JSON) { -// return -// } -// err := store.SetBuildTask(repo.FullName, num, in) -// if err != nil { -// c.Fail(404, err) -// return -// } -// build, err := store.Build(repo.FullName, num) -// if err != nil { -// c.Fail(404, err) -// return -// } + commit, err := store.CommitSeq(repo, cnum) + if err != nil { + c.Fail(404, err) + return + } + build, err := store.BuildSeq(commit, bnum) + if err != nil { + c.Fail(404, err) + return + } + updater := ToUpdater(c) + err = updater.SetLogs(repo, commit, build, c.Request.Body) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} -// msg, err := json.Marshal(build) -// if err == nil { -// c.String(200, err.Error()) // we can ignore this error -// return -// } - -// bus.Send(&eventbus.Event{ -// Name: repo.FullName, -// Kind: eventbus.EventRepo, -// Msg: msg, -// }) - -// c.Writer.WriteHeader(200) -// } - -// // POST /queue/push/:owner/:repo/:build/:task/logs -// func PushLogs(c *gin.Context) { -// store := ToDatastore(c) -// repo := ToRepo(c) -// bnum, _ := strconv.Atoi(c.Params.ByName("build")) -// tnum, _ := strconv.Atoi(c.Params.ByName("task")) - -// const maxBuffToRead int64 = 5000000 // 5MB. -// err := store.SetLogs(repo.FullName, bnum, tnum, io.LimitReader(c.Request.Body, maxBuffToRead)) -// if err != nil { -// c.Fail(500, err) -// return -// } -// c.Writer.WriteHeader(200) -// } - -// func GetQueue(c *gin.Context) { -// queue := ToQueue(c) -// items := queue.Items() -// c.JSON(200, items) -// } +func GetQueue(c *gin.Context) { + queue := ToQueue(c) + items := queue.Items() + c.JSON(200, items) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index a0f69dfb..5daf095c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -76,6 +76,21 @@ func SetRunner(r runner.Runner) gin.HandlerFunc { } } +func ToUpdater(c *gin.Context) runner.Updater { + v, ok := c.Get("updater") + if !ok { + return nil + } + return v.(runner.Updater) +} + +func SetUpdater(u runner.Updater) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("updater", u) + c.Next() + } +} + func ToSettings(c *gin.Context) *settings.Settings { v, ok := c.Get("settings") if !ok { @@ -115,6 +130,14 @@ func ToRepo(c *gin.Context) *common.Repo { return v.(*common.Repo) } +func ToAgent(c *gin.Context) *common.Agent { + v, ok := c.Get("agent") + if !ok { + return nil + } + return v.(*common.Agent) +} + func ToDatastore(c *gin.Context) store.Store { return c.MustGet("datastore").(store.Store) } @@ -231,15 +254,22 @@ func MustAdmin() gin.HandlerFunc { func MustAgent() gin.HandlerFunc { return func(c *gin.Context) { - sess := ToSession(c) - token := sess.GetLogin(c.Request) - if token == nil { + store := ToDatastore(c) + token := c.Request.FormValue("token") + if len(token) == 0 { c.AbortWithStatus(401) return - } else if token.Kind != common.TokenAgent { - c.AbortWithStatus(500) + } + agent, err := store.AgentToken(token) + if err != nil { + c.Fail(401, err) return } + if agent.Active == false { + c.AbortWithStatus(403) + return + } + c.Set("agent", agent) c.Next() } } diff --git a/pkg/server/ws.go b/pkg/server/ws.go index 3a49f4f7..126bc59f 100644 --- a/pkg/server/ws.go +++ b/pkg/server/ws.go @@ -2,6 +2,7 @@ package server import ( "io" + "net/http" "strconv" "github.com/drone/drone/pkg/bus" @@ -68,11 +69,38 @@ func GetStream(c *gin.Context) { return } - rc, err := runner.Logs(build) - if err != nil { - c.Fail(404, err) - return + var rc io.ReadCloser + + // if no agent is assigned to the build we + // should stream the local logs + if commit.AgentID == 0 { + rc, err = runner.Logs(build) + if err != nil { + c.Fail(404, err) + return + } + } else { + agent, err := store.Agent(commit.AgentID) + if err != nil { + c.Fail(404, err) + return + } + resp, err := http.Get("http://" + agent.Addr) + if err != nil { + c.Fail(500, err) + return + } else if resp.StatusCode != 200 { + resp.Body.Close() + c.AbortWithStatus(resp.StatusCode) + return + } + rc = resp.Body } + + defer func() { + rc.Close() + }() + go func() { <-c.Writer.CloseNotify() rc.Close() @@ -80,11 +108,6 @@ func GetStream(c *gin.Context) { rw := &StreamWriter{c.Writer, 0} - defer func() { - log.Infof("closed log stream") - rc.Close() - }() - docker.StdCopy(rw, rw, rc) } diff --git a/pkg/store/builtin/agent.go b/pkg/store/builtin/agent.go index 5d9ee613..73e30566 100644 --- a/pkg/store/builtin/agent.go +++ b/pkg/store/builtin/agent.go @@ -1 +1,77 @@ package builtin + +import ( + "database/sql" + + common "github.com/drone/drone/pkg/types" + "github.com/russross/meddler" +) + +type Agentstore struct { + *sql.DB +} + +func NewAgentstore(db *sql.DB) *Agentstore { + return &Agentstore{db} +} + +// Agent returns an agent by ID. +func (db *Agentstore) Agent(id int64) (*common.Agent, error) { + var agent = new(common.Agent) + var err = meddler.Load(db, agentTable, agent, id) + return agent, err +} + +// AgentAddr returns an agent by address. +func (db *Agentstore) AgentAddr(addr string) (*common.Agent, error) { + var agent = new(common.Agent) + var err = meddler.QueryRow(db, agent, rebind(agentAddrQuery), addr) + return agent, err +} + +// AgentToken returns an agent by token. +func (db *Agentstore) AgentToken(token string) (*common.Agent, error) { + var agent = new(common.Agent) + var err = meddler.QueryRow(db, agent, rebind(agentTokenQuery), token) + return agent, err +} + +// AgentList returns a list of all build agents. +func (db *Agentstore) AgentList() ([]*common.Agent, error) { + var agents []*common.Agent + var err = meddler.QueryAll(db, &agents, rebind(agentListQuery), true) + return agents, err +} + +// AddAgent inserts an agent in the datastore. +func (db *Agentstore) AddAgent(agent *common.Agent) error { + return meddler.Insert(db, agentTable, agent) +} + +// SetAgent updates an agent in the datastore. +func (db *Agentstore) SetAgent(agent *common.Agent) error { + return meddler.Update(db, agentTable, agent) +} + +// Agent table name in database. +const agentTable = "agents" + +const agentTokenQuery = ` +SELECT * +FROM agents +WHERE agent_token = ? +LIMIT 1; +` + +const agentAddrQuery = ` +SELECT * +FROM agents +WHERE agent_addr = ? +LIMIT 1; +` + +const agentListQuery = ` +SELECT * +FROM agents +WHERE agent_active = ?; +` diff --git a/pkg/store/builtin/migrate/migrate.go b/pkg/store/builtin/migrate/migrate.go index 04be1433..ba6dcc7a 100644 --- a/pkg/store/builtin/migrate/migrate.go +++ b/pkg/store/builtin/migrate/migrate.go @@ -24,6 +24,7 @@ func Setup(tx migration.LimitedTx) error { statusTable, statusCommitIndex, blobTable, + agentTable, } for _, stmt := range stmts { _, err := tx.Exec(transform(stmt)) @@ -124,6 +125,7 @@ var commitTable = ` CREATE TABLE IF NOT EXISTS commits ( commit_id INTEGER PRIMARY KEY AUTOINCREMENT ,repo_id INTEGER + ,agent_id INTEGER ,commit_seq INTEGER ,commit_state VARCHAR(255) ,commit_started INTEGER @@ -150,6 +152,10 @@ var commitRepoIndex = ` CREATE INDEX commits_repo_idx ON commits (repo_id); ` +var agentRepoIndex = ` +CREATE INDEX commits_agent_idx ON commits (agent_id); +` + var tokenTable = ` CREATE TABLE IF NOT EXISTS tokens ( token_id INTEGER PRIMARY KEY AUTOINCREMENT @@ -212,3 +218,17 @@ CREATE TABLE IF NOT EXISTS blobs ( ,UNIQUE(blob_path) ); ` + +var agentTable = ` +CREATE TABLE IF NOT EXISTS agents ( + agent_id INTEGER PRIMARY KEY AUTOINCREMENT + ,agent_kind VARCHAR(255) + ,agent_addr VARCHAR(2000) + ,agent_token VARCHAR(2000) + ,agent_active BOOL + ,agent_cert BLOB + ,agent_key BLOB + ,UNIQUE(agent_addr) + ,UNIQUE(agent_token) +); +` diff --git a/pkg/store/builtin/repo.go b/pkg/store/builtin/repo.go index dc4090f9..950ea9f9 100644 --- a/pkg/store/builtin/repo.go +++ b/pkg/store/builtin/repo.go @@ -40,22 +40,6 @@ func (db *Repostore) RepoList(user *common.User) ([]*common.Repo, error) { return repos, err } -// // RepoKeys retrieves a set of repository keys from -// // the datastore for the specified name. -// func (db *Repostore) RepoKeypair(repo *common.Repo) (*common.Keypair, error) { -// var keypair = new(common.Keypair) -// var err = meddler.QueryRow(db, keypair, rebind(repoKeysQuery), repo.ID) -// return keypair, err -// } - -// // RepoParams retrieves a set of repository params from -// // the datastore for the specified name. -// func (db *Repostore) RepoParams(repo *common.Repo) (*common.Params, error) { -// var params = new(common.Params) -// var err = meddler.QueryRow(db, params, rebind(repoParamsQuery), repo.ID) -// return params, err -// } - // AddRepo inserts a repo in the datastore. func (db *Repostore) AddRepo(repo *common.Repo) error { repo.Created = time.Now().UTC().Unix() @@ -69,16 +53,6 @@ func (db *Repostore) SetRepo(repo *common.Repo) error { return meddler.Update(db, repoTable, repo) } -// // SetRepoKeypair upserts a keypair in the datastore. -// func (db *Repostore) SetRepoKeypair(keys *common.Keypair) error { -// return meddler.Save(db, repoKeyTable, keys) -// } - -// // SetRepoKeypair upserts a param set in the datastore. -// func (db *Repostore) SetRepoParams(params *common.Params) error { -// return meddler.Save(db, repoParamTable, params) -// } - // DelRepo removes the repo from the datastore. func (db *Repostore) DelRepo(repo *common.Repo) error { var _, err = db.Exec(rebind(repoDeleteStmt), repo.ID) diff --git a/pkg/store/builtin/store.go b/pkg/store/builtin/store.go index 8ff575f6..ea080062 100644 --- a/pkg/store/builtin/store.go +++ b/pkg/store/builtin/store.go @@ -80,6 +80,7 @@ func New(db *sql.DB) store.Store { *Blobstore *Starstore *Tokenstore + *Agentstore }{ NewUserstore(db), NewRepostore(db), @@ -88,5 +89,6 @@ func New(db *sql.DB) store.Store { NewBlobstore(db), NewStarstore(db), NewTokenstore(db), + NewAgentstore(db), } } diff --git a/pkg/store/store.go b/pkg/store/store.go index 08973c2a..f3d9335c 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -34,6 +34,8 @@ type Store interface { // DelUser removes the user from the datastore. DelUser(*common.User) error + // + // Token returns a token by ID. Token(int64) (*common.Token, error) @@ -146,4 +148,24 @@ type Store interface { // Del removes an object from the blobstore. DelBlob(path string) error + + // + + // Agent returns an agent by ID. + Agent(int64) (*common.Agent, error) + + // AgentAddr returns an agent by address. + AgentAddr(string) (*common.Agent, error) + + // AgentToken returns an agent by token. + AgentToken(string) (*common.Agent, error) + + // AgentList returns a list of all build agents. + AgentList() ([]*common.Agent, error) + + // AddAgent inserts an agent in the datastore. + AddAgent(*common.Agent) error + + // SetAgent updates an agent in the datastore. + SetAgent(*common.Agent) error } diff --git a/pkg/types/agent.go b/pkg/types/agent.go index 42a41d0e..7fd87147 100644 --- a/pkg/types/agent.go +++ b/pkg/types/agent.go @@ -3,7 +3,12 @@ package types // Agent represents a worker that has connected // to the system in order to perform work type Agent struct { - Name string `json:"name"` - Addr string `json:"addr"` - IsHealthy bool `json:"is_healthy"` + ID int64 `meddler:"agent_id,pk" json:"id,omitempty"` + Kind string `meddler:"agent_kind" json:"kind,omitempty"` + Addr string `meddler:"agent_addr" json:"address"` + Token string `meddler:"agent_token" json:"token"` + Cert string `meddler:"agent_cert" json:"-"` + Key string `meddler:"agent_key" json:"-"` + Active bool `meddler:"agent_active" json:"is_active"` + IsHealthy bool `meddler:"-" json:"is_healthy,omitempty"` } diff --git a/pkg/types/commit.go b/pkg/types/commit.go index 09fe0ed8..3d3c27e9 100644 --- a/pkg/types/commit.go +++ b/pkg/types/commit.go @@ -12,6 +12,7 @@ const ( type Commit struct { ID int64 `meddler:"commit_id,pk" json:"-"` RepoID int64 `meddler:"repo_id" json:"-"` + AgentID int64 `meddler:"agent_id" json:"-"` Sequence int `meddler:"commit_seq" json:"sequence"` State string `meddler:"commit_state" json:"state"` Started int64 `meddler:"commit_started" json:"started_at"` diff --git a/pkg/types/util.go b/pkg/types/util.go new file mode 100644 index 00000000..894a17f1 --- /dev/null +++ b/pkg/types/util.go @@ -0,0 +1,48 @@ +package types + +import ( + "crypto/md5" + "crypto/rand" + "fmt" + "io" + "strings" +) + +// standard characters allowed in token string. +var chars = []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789") + +// default token length +var length = 40 + +// GenerateToken generates random strings good for use in URIs to +// identify unique objects. +func GenerateToken() string { + b := make([]byte, length) + r := make([]byte, length+(length/4)) // storage for random bytes. + clen := byte(len(chars)) + maxrb := byte(256 - (256 % len(chars))) + i := 0 + for { + io.ReadFull(rand.Reader, r) + for _, c := range r { + if c >= maxrb { + // Skip this number to avoid modulo bias. + continue + } + b[i] = chars[c%clen] + i++ + if i == length { + return string(b) + } + } + } +} + +// helper function to create a Gravatar Hash +// for the given Email address. +func CreateGravatar(email string) string { + email = strings.ToLower(strings.TrimSpace(email)) + hash := md5.New() + hash.Write([]byte(email)) + return fmt.Sprintf("%x", hash.Sum(nil)) +} diff --git a/pkg/types/util_test.go b/pkg/types/util_test.go new file mode 100644 index 00000000..2a2f3e35 --- /dev/null +++ b/pkg/types/util_test.go @@ -0,0 +1,19 @@ +package types + +import ( + "testing" +) + +func Test_CreateGravatar(t *testing.T) { + var got, want = CreateGravatar("dr_cooper@caltech.edu"), "2b77ba83e2216ddcd11fe8c24b70c2a3" + if got != want { + t.Errorf("Got gravatar hash %s, want %s", got, want) + } +} + +func Test_GenerateToken(t *testing.T) { + token := GenerateToken() + if len(token) != length { + t.Errorf("Want token length %d, got %d", length, len(token)) + } +}