From 7aedd78015309da30b77351393b7040bdafd3e6b Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Mon, 18 May 2015 20:44:44 -0700 Subject: [PATCH] remote agent can now pull builds and push results --- cmd/drone-agent/main.go | 11 +++- cmd/drone-agent/updater.go | 16 ++--- cmd/drone-server/drone.go | 20 +++--- pkg/server/agent.go | 94 ---------------------------- pkg/server/queue.go | 27 +++++--- pkg/server/server.go | 29 +++------ pkg/server/ws.go | 30 ++++----- pkg/settings/settings.go | 5 ++ pkg/store/builtin/agent.go | 66 +++++++------------ pkg/store/builtin/migrate/migrate.go | 14 +---- pkg/store/store.go | 16 +---- pkg/types/build.go | 2 +- pkg/types/commit.go | 3 +- pkg/types/task.go | 14 ----- pkg/types/user.go | 2 +- 15 files changed, 105 insertions(+), 244 deletions(-) delete mode 100644 pkg/server/agent.go delete mode 100644 pkg/types/task.go diff --git a/cmd/drone-agent/main.go b/cmd/drone-agent/main.go index 6fc8f9c9..ae883d66 100644 --- a/cmd/drone-agent/main.go +++ b/cmd/drone-agent/main.go @@ -7,6 +7,7 @@ import ( "os" "time" + log "github.com/Sirupsen/logrus" "github.com/drone/drone/pkg/queue" runner "github.com/drone/drone/pkg/runner/builtin" @@ -47,11 +48,19 @@ func main() { for { w, err := pull() if err != nil { + log.Errorln(err) time.Sleep(30 * time.Second) continue } + + log.Infof("Pulled and running build %s / %d", + w.Repo.FullName, w.Commit.Sequence) + runner_ := runner.Runner{&updater{}} - runner_.Run(w) + err = runner_.Run(w) + if err != nil { + log.Errorln(err) + } } }() diff --git a/cmd/drone-agent/updater.go b/cmd/drone-agent/updater.go index 409b8ff8..33376fc1 100644 --- a/cmd/drone-agent/updater.go +++ b/cmd/drone-agent/updater.go @@ -11,20 +11,20 @@ import ( "strconv" "time" - //logs "github.com/Sirupsen/logrus" + logs "github.com/Sirupsen/logrus" common "github.com/drone/drone/pkg/types" ) type updater struct{} func (u *updater) SetCommit(user *common.User, r *common.Repo, c *common.Commit) error { - path := fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence) + path := fmt.Sprintf("/api/queue/push/%s", r.FullName) return sendBackoff("POST", path, c, nil) } func (u *updater) SetBuild(r *common.Repo, c *common.Commit, b *common.Build) error { - path := fmt.Sprintf("/api/queue/push/%s", r.FullName) - return sendBackoff("POST", path, c, nil) + path := fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence) + return sendBackoff("POST", path, b, nil) } func (u *updater) SetLogs(r *common.Repo, c *common.Commit, b *common.Build, rc io.ReadCloser) error { @@ -40,7 +40,7 @@ func sendBackoff(method, path string, in, out interface{}) error { if err == nil { break } - if attempts > 30 { + if attempts > 99 { break } attempts++ @@ -99,19 +99,21 @@ func send(method, path string, in, out interface{}) error { // make the request using the default http client resp, err := http.DefaultClient.Do(req) if err != nil { + logs.Errorf("Error posting request. %s", err) return err } defer resp.Body.Close() // Check for an http error status (ie not 200 StatusOK) if resp.StatusCode > 300 { + logs.Errorf("Error status code %d", resp.StatusCode) return fmt.Errorf(resp.Status) } // Decode the JSON response if out != nil { - return json.NewDecoder(resp.Body).Decode(out) + err = json.NewDecoder(resp.Body).Decode(out) } - return nil + return err } diff --git a/cmd/drone-server/drone.go b/cmd/drone-server/drone.go index bcb3c0a4..96169bd0 100644 --- a/cmd/drone-server/drone.go +++ b/cmd/drone-server/drone.go @@ -13,6 +13,7 @@ import ( "github.com/drone/drone/pkg/settings" "github.com/elazarl/go-bindata-assetfs" + log "github.com/Sirupsen/logrus" eventbus "github.com/drone/drone/pkg/bus/builtin" queue "github.com/drone/drone/pkg/queue/builtin" runner "github.com/drone/drone/pkg/runner/builtin" @@ -48,7 +49,15 @@ func main() { queue_ := queue.New() updater := runner.NewUpdater(eventbus_, store, remote) runner_ := runner.Runner{Updater: updater} - go run(&runner_, queue_) + + // launch the local queue runner if the system + // is not conifugred to run in agent mode + if settings.Agents != nil && settings.Agents.Secret != "" { + log.Infof("Run builds using remote build agents") + } else { + log.Infof("Run builds using the embedded build runner") + go run(&runner_, queue_) + } r := gin.Default() @@ -86,14 +95,6 @@ func main() { users.DELETE("/:name", server.DeleteUser) } - agents := api.Group("/agents") - { - agents.Use(server.MustAdmin()) - agents.GET("", server.GetAgents) - agents.POST("", server.PostAgent) - agents.DELETE("/:id", server.DeleteAgent) - } - repos := api.Group("/repos/:owner/:name") { repos.POST("", server.PostRepo) @@ -136,6 +137,7 @@ func main() { queue := api.Group("/queue") { queue.Use(server.MustAgent()) + queue.Use(server.SetSettings(settings)) queue.Use(server.SetUpdater(updater)) queue.POST("/pull", server.PollBuild) diff --git a/pkg/server/agent.go b/pkg/server/agent.go deleted file mode 100644 index 107a82b4..00000000 --- a/pkg/server/agent.go +++ /dev/null @@ -1,94 +0,0 @@ -package server - -import ( - "strconv" - - "github.com/drone/drone/pkg/types" - "github.com/gin-gonic/gin" - "github.com/gin-gonic/gin/binding" - - common "github.com/drone/drone/pkg/types" -) - -// GetAgents accepts a request to retrieve all build -// agents from the datastore and return encoded in JSON -// format. -// -// GET /api/agents -// -func GetAgents(c *gin.Context) { - store := ToDatastore(c) - agents, err := store.AgentList() - if err != nil { - c.Fail(400, err) - } else { - c.JSON(200, agents) - } -} - -// PostAgent accepts a request to register a new build -// agent with the system. The registered agent is returned -// from the datastore and return encoded in JSON format. -// -// POST /api/agents -// -func PostAgent(c *gin.Context) { - store := ToDatastore(c) - - in := &common.Agent{} - if !c.BindWith(in, binding.JSON) { - return - } - - // attept to fetch the agent from the - // datastore. If the agent already exists we - // should re-activate - agent, err := store.AgentAddr(in.Addr) - if err != nil { - agent = &common.Agent{} - agent.Addr = in.Addr - agent.Token = types.GenerateToken() - agent.Active = true - agent.IsHealthy = true - err = store.AddAgent(agent) - if err != nil { - c.Fail(400, err) - } else { - c.JSON(200, agent) - } - return - } - - agent.Active = true - err = store.SetAgent(agent) - if err != nil { - c.Fail(400, err) - } else { - c.JSON(200, agent) - } -} - -// DeleteAgent accepts a request to delete a build agent -// from the system. -// -// DELETE /api/agents/:id -// -func DeleteAgent(c *gin.Context) { - store := ToDatastore(c) - idstr := c.Params.ByName("id") - id, _ := strconv.Atoi(idstr) - - agent, err := store.Agent(int64(id)) - if err != nil { - c.Fail(404, err) - return - } - agent.Active = false - err = store.SetAgent(agent) - if err != nil { - c.Fail(400, err) - return - } - - c.Writer.WriteHeader(200) -} diff --git a/pkg/server/queue.go b/pkg/server/queue.go index 12f945bd..7741067f 100644 --- a/pkg/server/queue.go +++ b/pkg/server/queue.go @@ -1,6 +1,7 @@ package server import ( + "net" "strconv" "github.com/gin-gonic/gin" @@ -14,9 +15,17 @@ import ( func PollBuild(c *gin.Context) { queue := ToQueue(c) store := ToDatastore(c) - agent := ToAgent(c) - log.Infof("agent connected and polling builds at %s", agent.Addr) + // extract the IP address from the agent that is + // polling for builds. + host := c.Request.RemoteAddr + addr, _, err := net.SplitHostPort(host) + if err != nil { + addr = host + } + addr = net.JoinHostPort(addr, "1999") + + log.Infof("agent connected and polling builds at %s", addr) // pull an item from the queue work := queue.PullClose(c.Writer) @@ -25,14 +34,14 @@ func PollBuild(c *gin.Context) { return } - // store the agent details with the commit - work.Commit.AgentID = agent.ID - err := store.SetCommit(work.Commit) + // persist the relationship between agent and commit. + err = store.SetAgent(work.Commit, addr) if err != nil { - log.Errorf("unable to associate agent with commit. %s", err) - // IMPORTANT: this should never happen, and even if it does - // it is an error scenario that will only impact live streaming - // so we choose it log and ignore. + // note the we are ignoring and just logging the error here. + // we consider this an acceptible failure because it doesn't + // impact anything other than live-streaming output. + log.Errorf("unable to store the agent address %s for build %s %v", + addr, work.Repo.FullName, work.Commit.Sequence) } c.JSON(200, work) diff --git a/pkg/server/server.go b/pkg/server/server.go index 5daf095c..94c83c71 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -130,14 +130,6 @@ func ToRepo(c *gin.Context) *common.Repo { return v.(*common.Repo) } -func ToAgent(c *gin.Context) *common.Agent { - v, ok := c.Get("agent") - if !ok { - return nil - } - return v.(*common.Agent) -} - func ToDatastore(c *gin.Context) store.Store { return c.MustGet("datastore").(store.Store) } @@ -254,22 +246,19 @@ func MustAdmin() gin.HandlerFunc { func MustAgent() gin.HandlerFunc { return func(c *gin.Context) { - store := ToDatastore(c) + conf := ToSettings(c) + + // verify remote agents are enabled + if conf.Agents == nil || len(conf.Agents.Secret) == 0 { + c.AbortWithStatus(405) + return + } + // verify the agent token matches token := c.Request.FormValue("token") - if len(token) == 0 { + if token != conf.Agents.Secret { c.AbortWithStatus(401) return } - agent, err := store.AgentToken(token) - if err != nil { - c.Fail(401, err) - return - } - if agent.Active == false { - c.AbortWithStatus(403) - return - } - c.Set("agent", agent) c.Next() } } diff --git a/pkg/server/ws.go b/pkg/server/ws.go index 126bc59f..08b4ec86 100644 --- a/pkg/server/ws.go +++ b/pkg/server/ws.go @@ -71,21 +71,12 @@ func GetStream(c *gin.Context) { var rc io.ReadCloser - // if no agent is assigned to the build we - // should stream the local logs - if commit.AgentID == 0 { - rc, err = runner.Logs(build) - if err != nil { - c.Fail(404, err) - return - } - } else { - agent, err := store.Agent(commit.AgentID) - if err != nil { - c.Fail(404, err) - return - } - resp, err := http.Get("http://" + agent.Addr) + addr, err := store.Agent(commit) + // if the commit is being executed by an agent + // we'll proxy the build output directly to the + // remote Docker client, through the agent. + if err == nil { + resp, err := http.Get("http://" + addr) if err != nil { c.Fail(500, err) return @@ -95,6 +86,15 @@ func GetStream(c *gin.Context) { return } rc = resp.Body + + } else { + // else if the commit is not being executed + // by the build agent we can use the local runner + rc, err = runner.Logs(build) + if err != nil { + c.Fail(404, err) + return + } } defer func() { diff --git a/pkg/settings/settings.go b/pkg/settings/settings.go index 62d19f39..9b68b0a0 100644 --- a/pkg/settings/settings.go +++ b/pkg/settings/settings.go @@ -91,6 +91,10 @@ type Database struct { Datasource string `toml:"datasource"` } +type Agents struct { + Secret string `toml:"secret"` +} + // Settings defines global settings for the Drone system. type Settings struct { Database *Database `toml:"database"` @@ -98,6 +102,7 @@ type Settings struct { Service *Service `toml:"service"` Server *Server `toml:"server"` Session *Session `toml:"session"` + Agents *Agents `toml:"agents"` Plugins map[string]interface{} `toml:"plugins"` } diff --git a/pkg/store/builtin/agent.go b/pkg/store/builtin/agent.go index 73e30566..946a640f 100644 --- a/pkg/store/builtin/agent.go +++ b/pkg/store/builtin/agent.go @@ -16,62 +16,38 @@ func NewAgentstore(db *sql.DB) *Agentstore { } // Agent returns an agent by ID. -func (db *Agentstore) Agent(id int64) (*common.Agent, error) { +func (db *Agentstore) Agent(commit *common.Commit) (string, error) { var agent = new(common.Agent) - var err = meddler.Load(db, agentTable, agent, id) - return agent, err -} - -// AgentAddr returns an agent by address. -func (db *Agentstore) AgentAddr(addr string) (*common.Agent, error) { - var agent = new(common.Agent) - var err = meddler.QueryRow(db, agent, rebind(agentAddrQuery), addr) - return agent, err -} - -// AgentToken returns an agent by token. -func (db *Agentstore) AgentToken(token string) (*common.Agent, error) { - var agent = new(common.Agent) - var err = meddler.QueryRow(db, agent, rebind(agentTokenQuery), token) - return agent, err -} - -// AgentList returns a list of all build agents. -func (db *Agentstore) AgentList() ([]*common.Agent, error) { - var agents []*common.Agent - var err = meddler.QueryAll(db, &agents, rebind(agentListQuery), true) - return agents, err -} - -// AddAgent inserts an agent in the datastore. -func (db *Agentstore) AddAgent(agent *common.Agent) error { - return meddler.Insert(db, agentTable, agent) + var err = meddler.QueryRow(db, agent, rebind(agentQuery), commit.ID) + return agent.Addr, err } // SetAgent updates an agent in the datastore. -func (db *Agentstore) SetAgent(agent *common.Agent) error { - return meddler.Update(db, agentTable, agent) +func (db *Agentstore) SetAgent(commit *common.Commit, addr string) error { + agent := &agent{} + agent.Addr = addr + agent.CommitID = commit.ID + db.Exec(rebind(deleteAgentQuery), commit.ID) + return meddler.Insert(db, agentTable, agent) } -// Agent table name in database. +type agent struct { + ID int64 `meddler:"agent_id,pk"` + Addr string `meddler:"agent_addr"` + CommitID int64 `meddler:"commit_id"` +} + +// Build table name in database. const agentTable = "agents" -const agentTokenQuery = ` +const agentQuery = ` SELECT * FROM agents -WHERE agent_token = ? +WHERE commit_id = ? LIMIT 1; ` -const agentAddrQuery = ` -SELECT * -FROM agents -WHERE agent_addr = ? -LIMIT 1; -` - -const agentListQuery = ` -SELECT * -FROM agents -WHERE agent_active = ?; +const deleteAgentQuery = ` +DELETE FROM agents +WHERE commit_id = ?; ` diff --git a/pkg/store/builtin/migrate/migrate.go b/pkg/store/builtin/migrate/migrate.go index ba6dcc7a..a7f16ae9 100644 --- a/pkg/store/builtin/migrate/migrate.go +++ b/pkg/store/builtin/migrate/migrate.go @@ -125,7 +125,6 @@ var commitTable = ` CREATE TABLE IF NOT EXISTS commits ( commit_id INTEGER PRIMARY KEY AUTOINCREMENT ,repo_id INTEGER - ,agent_id INTEGER ,commit_seq INTEGER ,commit_state VARCHAR(255) ,commit_started INTEGER @@ -152,10 +151,6 @@ var commitRepoIndex = ` CREATE INDEX commits_repo_idx ON commits (repo_id); ` -var agentRepoIndex = ` -CREATE INDEX commits_agent_idx ON commits (agent_id); -` - var tokenTable = ` CREATE TABLE IF NOT EXISTS tokens ( token_id INTEGER PRIMARY KEY AUTOINCREMENT @@ -222,13 +217,8 @@ CREATE TABLE IF NOT EXISTS blobs ( var agentTable = ` CREATE TABLE IF NOT EXISTS agents ( agent_id INTEGER PRIMARY KEY AUTOINCREMENT - ,agent_kind VARCHAR(255) + ,commit_id INTEGER ,agent_addr VARCHAR(2000) - ,agent_token VARCHAR(2000) - ,agent_active BOOL - ,agent_cert BLOB - ,agent_key BLOB - ,UNIQUE(agent_addr) - ,UNIQUE(agent_token) + ,UNIQUE(commit_id) ); ` diff --git a/pkg/store/store.go b/pkg/store/store.go index f3d9335c..bbf44b8c 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -152,20 +152,8 @@ type Store interface { // // Agent returns an agent by ID. - Agent(int64) (*common.Agent, error) - - // AgentAddr returns an agent by address. - AgentAddr(string) (*common.Agent, error) - - // AgentToken returns an agent by token. - AgentToken(string) (*common.Agent, error) - - // AgentList returns a list of all build agents. - AgentList() ([]*common.Agent, error) - - // AddAgent inserts an agent in the datastore. - AddAgent(*common.Agent) error + Agent(*common.Commit) (string, error) // SetAgent updates an agent in the datastore. - SetAgent(*common.Agent) error + SetAgent(*common.Commit, string) error } diff --git a/pkg/types/build.go b/pkg/types/build.go index 9d8c948d..ed0b10f7 100644 --- a/pkg/types/build.go +++ b/pkg/types/build.go @@ -1,7 +1,7 @@ package types type Build struct { - ID int64 `meddler:"build_id,pk" json:"-"` + ID int64 `meddler:"build_id,pk" json:"id"` CommitID int64 `meddler:"commit_id" json:"-"` State string `meddler:"build_state" json:"state"` ExitCode int `meddler:"build_exit" json:"exit_code"` diff --git a/pkg/types/commit.go b/pkg/types/commit.go index 3d3c27e9..b46ed535 100644 --- a/pkg/types/commit.go +++ b/pkg/types/commit.go @@ -10,9 +10,8 @@ const ( ) type Commit struct { - ID int64 `meddler:"commit_id,pk" json:"-"` + ID int64 `meddler:"commit_id,pk" json:"id"` RepoID int64 `meddler:"repo_id" json:"-"` - AgentID int64 `meddler:"agent_id" json:"-"` Sequence int `meddler:"commit_seq" json:"sequence"` State string `meddler:"commit_state" json:"state"` Started int64 `meddler:"commit_started" json:"started_at"` diff --git a/pkg/types/task.go b/pkg/types/task.go deleted file mode 100644 index a16afa7e..00000000 --- a/pkg/types/task.go +++ /dev/null @@ -1,14 +0,0 @@ -package types - -type Task struct { - Number int `json:"number"` - State string `json:"state"` - ExitCode int `json:"exit_code"` - Duration int64 `json:"duration"` - Started int64 `json:"started_at"` - Finished int64 `json:"finished_at"` - - // Environment represents the build environment - // combination from the matrix. - Environment map[string]string `json:"environment,omitempty"` -} diff --git a/pkg/types/user.go b/pkg/types/user.go index 56f73340..a7f846a6 100644 --- a/pkg/types/user.go +++ b/pkg/types/user.go @@ -1,7 +1,7 @@ package types type User struct { - ID int64 `meddler:"user_id,pk" json:"-"` + ID int64 `meddler:"user_id,pk" json:"id"` Login string `meddler:"user_login" json:"login,omitempty"` Token string `meddler:"user_token" json:"-"` Secret string `meddler:"user_secret" json:"-"`