diff --git a/builder/pool/pool.go b/cluster/cluster.go similarity index 61% rename from builder/pool/pool.go rename to cluster/cluster.go index 31061029..ff2d740e 100644 --- a/builder/pool/pool.go +++ b/cluster/cluster.go @@ -1,4 +1,4 @@ -package pool +package cluster import ( "sync" @@ -9,14 +9,14 @@ import ( // TODO (bradrydzewski) ability to cancel work. // TODO (bradrydzewski) ability to remove a worker. -type Pool struct { +type Cluster struct { sync.Mutex clients map[dockerclient.Client]bool clientc chan dockerclient.Client } -func New() *Pool { - return &Pool{ +func New() *Cluster { + return &Cluster{ clients: make(map[dockerclient.Client]bool), clientc: make(chan dockerclient.Client, 999), } @@ -24,26 +24,26 @@ func New() *Pool { // Allocate allocates a client to the pool to // be available to accept work. -func (p *Pool) Allocate(c dockerclient.Client) bool { - if p.IsAllocated(c) { +func (c *Cluster) Allocate(cli dockerclient.Client) bool { + if c.IsAllocated(cli) { return false } - p.Lock() - p.clients[c] = true - p.Unlock() + c.Lock() + c.clients[cli] = true + c.Unlock() - p.clientc <- c + c.clientc <- cli return true } // IsAllocated is a helper function that returns // true if the client is currently allocated to // the Pool. -func (p *Pool) IsAllocated(c dockerclient.Client) bool { - p.Lock() - defer p.Unlock() - _, ok := p.clients[c] +func (c *Cluster) IsAllocated(cli dockerclient.Client) bool { + c.Lock() + defer c.Unlock() + _, ok := c.clients[cli] return ok } @@ -51,21 +51,21 @@ func (p *Pool) IsAllocated(c dockerclient.Client) bool { // available clients. If the client is currently // reserved and performing work it will finish, // but no longer be given new work. -func (p *Pool) Deallocate(c dockerclient.Client) { - p.Lock() - defer p.Unlock() - delete(p.clients, c) +func (c *Cluster) Deallocate(cli dockerclient.Client) { + c.Lock() + defer c.Unlock() + delete(c.clients, cli) } // List returns a list of all Workers currently // allocated to the Pool. -func (p *Pool) List() []dockerclient.Client { - p.Lock() - defer p.Unlock() +func (c *Cluster) List() []dockerclient.Client { + c.Lock() + defer c.Unlock() var clients []dockerclient.Client - for c := range p.clients { - clients = append(clients, c) + for cli := range c.clients { + clients = append(clients, cli) } return clients } @@ -73,17 +73,17 @@ func (p *Pool) List() []dockerclient.Client { // Reserve reserves the next available worker to // start doing work. Once work is complete, the // worker should be released back to the pool. -func (p *Pool) Reserve() <-chan dockerclient.Client { +func (p *Cluster) Reserve() <-chan dockerclient.Client { return p.clientc } // Release releases the worker back to the pool // of available workers. -func (p *Pool) Release(c dockerclient.Client) bool { - if !p.IsAllocated(c) { +func (c *Cluster) Release(cli dockerclient.Client) bool { + if !c.IsAllocated(cli) { return false } - p.clientc <- c + c.clientc <- cli return true } diff --git a/common/token.go b/common/token.go index e183806f..85b0967b 100644 --- a/common/token.go +++ b/common/token.go @@ -6,7 +6,7 @@ const ( ) type Token struct { - Kind string `json:"-"` + Kind string `json:"kind"` Login string `json:"-"` Label string `json:"label"` Repos []string `json:"repos,omitempty"` diff --git a/datastore/bolt/bolt.go b/datastore/bolt/bolt.go index 018ab30f..cd0186e3 100644 --- a/datastore/bolt/bolt.go +++ b/datastore/bolt/bolt.go @@ -24,7 +24,6 @@ var ( bucketRepoUsers = []byte("repo_users") bucketBuild = []byte("build") bucketBuildStatus = []byte("build_status") - bucketBuildTasks = []byte("build_tasks") bucketBuildLogs = []byte("build_logs") bucketBuildSeq = []byte("build_seq") ) @@ -51,7 +50,6 @@ func New(path string) (*DB, error) { tx.CreateBucketIfNotExists(bucketRepoUsers) tx.CreateBucketIfNotExists(bucketBuild) tx.CreateBucketIfNotExists(bucketBuildStatus) - tx.CreateBucketIfNotExists(bucketBuildTasks) tx.CreateBucketIfNotExists(bucketBuildLogs) tx.CreateBucketIfNotExists(bucketBuildSeq) return nil diff --git a/datastore/bolt/repo.go b/datastore/bolt/repo.go index e1f70217..9d2daa70 100644 --- a/datastore/bolt/repo.go +++ b/datastore/bolt/repo.go @@ -153,20 +153,25 @@ func (db *DB) DelRepo(repo *common.Repo) error { // deleteTracesOfRepo cleans up build leftovers when a repo is removed func (db *DB) deleteTracesOfRepo(t *bolt.Tx, repoKey []byte) error { - err := error(nil) - // bucketBuildSeq uses the repoKey directly - t.Bucket(bucketBuildSeq).Delete(repoKey) + err := t.Bucket(bucketBuildSeq).Delete(repoKey) + if err != nil { + // only error here is if our Tx is read-only + return err + } // the other buckets use repoKey with '/buildNumber', at least. // validating that an additiona '/' is there ensures that we don't // match 'github.com/drone/droney' when we're cleaning up after // 'github.com/drone/drone'. prefix := append(repoKey, '/') - deleteWithPrefix(t, bucketBuildLogs, prefix, true) - deleteWithPrefix(t, bucketBuildStatus, prefix, true) - deleteWithPrefix(t, bucketBuildTasks, prefix, true) - deleteWithPrefix(t, bucketBuild, prefix, true) + buckets := [][]byte{bucketBuildStatus, bucketBuildLogs, bucketBuild} + for _, b := range buckets { + err = deleteWithPrefix(t, b, prefix) + if err != nil { + break + } + } return err } diff --git a/datastore/bolt/repo_test.go b/datastore/bolt/repo_test.go index ed53a10f..a8797058 100644 --- a/datastore/bolt/repo_test.go +++ b/datastore/bolt/repo_test.go @@ -14,12 +14,17 @@ func TestRepo(t *testing.T) { testUser := "octocat" testRepo := "github.com/octopod/hq" testRepo2 := "github.com/octopod/avengers" + commUser := &common.User{Login: "freya"} var db *DB // Temp database - // create a new database before each unit - // test and destroy afterwards. + // create a new database before each unit test and destroy afterwards. g.BeforeEach(func() { - db = Must("/tmp/drone.test.db") + file, err := ioutil.TempFile(os.TempDir(), "drone-bolt") + if err != nil { + panic(err) + } + + db = Must(file.Name()) }) g.AfterEach(func() { os.Remove(db.Path()) @@ -42,7 +47,7 @@ func TestRepo(t *testing.T) { g.Assert(repo.FullName).Equal(testRepo) }) - g.It("Should del Repo", func() { + g.It("Should be deletable", func() { db.SetRepo(&common.Repo{FullName: testRepo}) db.Repo(testRepo) @@ -50,6 +55,33 @@ func TestRepo(t *testing.T) { g.Assert(err_).Equal(nil) }) + g.It("Should cleanup builds when deleted", func() { + repo := &common.Repo{FullName: testRepo} + err := db.SetRepoNotExists(commUser, repo) + g.Assert(err).Equal(nil) + + db.SetBuild(testRepo, &common.Build{State: "success"}) + db.SetBuild(testRepo, &common.Build{State: "success"}) + db.SetBuild(testRepo, &common.Build{State: "pending"}) + + db.SetBuildStatus(testRepo, 1, &common.Status{Context: "success"}) + db.SetBuildStatus(testRepo, 2, &common.Status{Context: "success"}) + db.SetBuildStatus(testRepo, 3, &common.Status{Context: "pending"}) + + // first a little sanity to validate our test conditions + _, err = db.BuildLast(testRepo) + g.Assert(err).Equal(nil) + + // now run our specific test suite + // 1. ensure that we can delete the repo + err = db.DelRepo(repo) + g.Assert(err).Equal(nil) + + // 2. ensure that deleting the repo cleans up other references + _, err = db.Build(testRepo, 1) + g.Assert(err).Equal(ErrKeyNotFound) + }) + g.It("Should get RepoList", func() { db.SetRepoNotExists(&common.User{Login: testUser}, &common.Repo{FullName: testRepo}) db.SetRepoNotExists(&common.User{Login: testUser}, &common.Repo{FullName: testRepo2}) diff --git a/datastore/bolt/util.go b/datastore/bolt/util.go index 1d992f3f..7affb98e 100644 --- a/datastore/bolt/util.go +++ b/datastore/bolt/util.go @@ -85,16 +85,17 @@ func splice(t *bolt.Tx, bucket, index, value []byte) error { return update(t, bucket, index, &keys) } -func deleteWithPrefix(t *bolt.Tx, bucket, prefix []byte, ignoreErr bool) error { +func deleteWithPrefix(t *bolt.Tx, bucket, prefix []byte) error { var err error c := t.Bucket(bucket).Cursor() for k, _ := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = c.Next() { err = c.Delete() - if !ignoreErr && err != nil { + if err != nil { break } } + // only error here is if our Tx is read-only return err } diff --git a/datastore/datastore.go b/datastore/datastore.go index ed4f121c..359884d5 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -101,11 +101,19 @@ type Datastore interface { // named repository. BuildLast(string) (*common.Build, error) + // BuildConf gets the build configuration file (yaml) + // for the named repository and build number. + // BuildConf(string, int) ([]byte, error) + // SetBuild inserts or updates a build for the named // repository. The build number is incremented and // assigned to the provided build. SetBuild(string, *common.Build) error + // SetBuildConf persists the build configuration file (yaml) + // for the named repository and build number. + // SetBuildConf(string, int) ([]byte, error) + // Status returns the status for the given repository // and build number. ////Status(string, int, string) (*common.Status, error) diff --git a/parser/parse.go b/parser/parse.go index f1066474..7543efc2 100644 --- a/parser/parse.go +++ b/parser/parse.go @@ -16,7 +16,7 @@ type Opts struct { Privileged bool } -var defaultOpts = &Opts{ +var DefaultOpts = &Opts{ Volumes: false, Network: false, Privileged: false, @@ -26,42 +26,13 @@ var defaultOpts = &Opts{ // a list of build configurations for each axis // using the default parsing options. func Parse(raw string) ([]*common.Config, error) { - return ParseOpts(raw, defaultOpts) + return ParseOpts(raw, DefaultOpts) } // ParseOpts parses a build matrix and returns // a list of build configurations for each axis // using the provided parsing options. func ParseOpts(raw string, opts *Opts) ([]*common.Config, error) { - confs, err := parse(raw) - if err != nil { - return nil, err - } - for _, conf := range confs { - err := Lint(conf) - if err != nil { - return nil, err - } - transformSetup(conf) - transformClone(conf) - transformBuild(conf) - transformImages(conf) - transformDockerPlugin(conf) - if !opts.Network { - rmNetwork(conf) - } - if !opts.Volumes { - rmVolumes(conf) - } - if !opts.Privileged { - rmPrivileged(conf) - } - } - return confs, nil -} - -// helper function to parse a matrix configuraiton file. -func parse(raw string) ([]*common.Config, error) { axis, err := matrix.Parse(raw) if err != nil { return nil, err @@ -71,7 +42,7 @@ func parse(raw string) ([]*common.Config, error) { // when no matrix values exist we should return // a single config value with an empty label. if len(axis) == 0 { - conf, err := parseYaml(raw) + conf, err := ParseSingle(raw, opts) if err != nil { return nil, err } @@ -81,19 +52,43 @@ func parse(raw string) ([]*common.Config, error) { for _, ax := range axis { // inject the matrix values into the raw script injected := inject.Inject(raw, ax) - conf, err := parseYaml(injected) + conf, err := ParseSingle(injected, opts) if err != nil { return nil, err } conf.Axis = common.Axis(ax) confs = append(confs, conf) } + return confs, nil } // helper funtion to parse a yaml configuration file. -func parseYaml(raw string) (*common.Config, error) { +func ParseSingle(raw string, opts *Opts) (*common.Config, error) { conf := &common.Config{} err := yaml.Unmarshal([]byte(raw), conf) + if err != nil { + return nil, err + } + // lint the yaml file + err = Lint(conf) + if err != nil { + return nil, err + } + // apply rules / transofms + transformSetup(conf) + transformClone(conf) + transformBuild(conf) + transformImages(conf) + transformDockerPlugin(conf) + if !opts.Network { + rmNetwork(conf) + } + if !opts.Volumes { + rmVolumes(conf) + } + if !opts.Privileged { + rmPrivileged(conf) + } return conf, err } diff --git a/queue/builtin/queue.go b/queue/builtin/queue.go new file mode 100644 index 00000000..bd444573 --- /dev/null +++ b/queue/builtin/queue.go @@ -0,0 +1,81 @@ +package builtin + +import ( + "sync" + + "github.com/drone/drone/queue" +) + +type Queue struct { + sync.Mutex + + acks map[*queue.Work]struct{} + items map[*queue.Work]struct{} + itemc chan *queue.Work +} + +func New() *Queue { + return &Queue{ + acks: make(map[*queue.Work]struct{}), + items: make(map[*queue.Work]struct{}), + itemc: make(chan *queue.Work, 999), + } +} + +// Publish inserts work at the tail of this queue, waiting for +// space to become available if the queue is full. +func (q *Queue) Publish(work *queue.Work) error { + q.Lock() + q.items[work] = struct{}{} + q.Unlock() + q.itemc <- work + return nil +} + +// Remove removes the specified work item from this queue, +// if it is present. +func (q *Queue) Remove(work *queue.Work) error { + return nil +} + +// Pull retrieves and removes the head of this queue, waiting +// if necessary until work becomes available. +func (q *Queue) Pull() *queue.Work { + work := <-q.itemc + q.Lock() + delete(q.items, work) + q.Unlock() + return work +} + +// PullAct retrieves and removes the head of this queue, waiting +// if necessary until work becomes available. Items pull from the +// queue that aren't acknowledged will be pushed back to the queue +// again when the default acknowledgement deadline is reached. +func (q *Queue) PullAck() *queue.Work { + work := q.Pull() + q.Lock() + q.acks[work] = struct{}{} + q.Unlock() + return work +} + +// Ack acknowledges an item in the queue was processed. +func (q *Queue) Ack(work *queue.Work) error { + q.Lock() + delete(q.acks, work) + q.Unlock() + return nil +} + +// Items returns a slice containing all of the work in this +// queue, in proper sequence. +func (q *Queue) Items() []*queue.Work { + q.Lock() + defer q.Unlock() + items := []*queue.Work{} + for work, _ := range q.items { + items = append(items, work) + } + return items +} diff --git a/queue/plugin/client.go b/queue/plugin/client.go new file mode 100644 index 00000000..d0f7706b --- /dev/null +++ b/queue/plugin/client.go @@ -0,0 +1,109 @@ +package plugin + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + + "github.com/drone/drone/queue" +) + +type Client struct { + url string + token string +} + +func New(url, token string) *Client { + return &Client{url, token} +} + +// Publish makes an http request to the remote queue +// to insert work at the tail. +func (c *Client) Publish(work *queue.Work) error { + return c.send("POST", "/queue", work, nil) +} + +// Remove makes an http request to the remote queue to +// remove the specified work item. +func (c *Client) Remove(work *queue.Work) error { + return c.send("DELETE", "/queue", work, nil) +} + +// Pull makes an http request to the remote queue to +// retrieve work. This initiates a long poll and will +// block until complete. +func (c *Client) Pull() *queue.Work { + out := &queue.Work{} + err := c.send("POST", "/queue/pull", nil, out) + if err != nil { + // TODO handle error + } + return out +} + +// Pull makes an http request to the remote queue to +// retrieve work, with an acknowldement required. +// This initiates a long poll and will block until +// complete. +func (c *Client) PullAck() *queue.Work { + out := &queue.Work{} + err := c.send("POST", "/queue/pull?ack=true", nil, out) + if err != nil { + // TODO handle error + } + return out +} + +// Ack makes an http request to the remote queue +// to acknowledge an item in the queue was processed. +func (c *Client) Ack(work *queue.Work) error { + return c.send("POST", "/queue/ack", nil, nil) +} + +// Items makes an http request to the remote queue +// to fetch a list of all work. +func (c *Client) Items() []*queue.Work { + out := []*queue.Work{} + err := c.send("GET", "/queue/items", nil, &out) + if err != nil { + // TODO handle error + } + return out +} + +// send is a helper function that makes an authenticated +// request to the remote http plugin. +func (c *Client) send(method, path string, in interface{}, out interface{}) error { + url_, err := url.Parse(c.url + path) + if err != nil { + return err + } + + var buf io.ReadWriter + if in != nil { + buf = new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(in) + if err != nil { + return err + } + } + + req, err := http.NewRequest(method, url_.String(), buf) + if err != nil { + return err + } + req.Header.Add("Authorization", "Bearer "+c.token) + req.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if out == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(out) +} diff --git a/queue/plugin/server.go b/queue/plugin/server.go new file mode 100644 index 00000000..baf35507 --- /dev/null +++ b/queue/plugin/server.go @@ -0,0 +1,116 @@ +package plugin + +import ( + "net/http" + + "github.com/drone/drone/queue" + "github.com/gin-gonic/gin" +) + +// Handle returns an http.Handler that enables a remote +// client to interop with a Queue over http. +func Handle(queue queue.Queue, token string) http.Handler { + r := gin.New() + + // middleware to validate the authorization token + // and to inject the queue into the http context. + bearer := "Bearer " + token + r.Use(func(c *gin.Context) { + if c.Request.Header.Get("Authorization") != bearer { + c.AbortWithStatus(403) + return + } + c.Set("queue", queue) + c.Next() + }) + + r.POST("/queue", publish) + r.DELETE("/queue", remove) + r.POST("/queue/pull", pull) + r.POST("/queue/ack", ack) + r.POST("/queue/items", items) + + return r +} + +// publish handles an http request to the queue +// to insert work at the tail. +func publish(c *gin.Context) { + q := fromContext(c) + work := &queue.Work{} + if !c.Bind(work) { + c.AbortWithStatus(400) + return + } + err := q.Publish(work) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} + +// remove handles an http request to the queue +// to remove a work item. +func remove(c *gin.Context) { + q := fromContext(c) + work := &queue.Work{} + if !c.Bind(work) { + c.AbortWithStatus(400) + return + } + err := q.Remove(work) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} + +// pull handles an http request to the queue +// to retrieve work. +func pull(c *gin.Context) { + q := fromContext(c) + var work *queue.Work + if c.Request.FormValue("ack") != "" { + work = q.PullAck() + } else { + work = q.Pull() + } + if work == nil { + c.AbortWithStatus(500) + return + } + c.JSON(200, work) +} + +// ack handles an http request to the queue +// to confirm an item was successfully pulled. +func ack(c *gin.Context) { + q := fromContext(c) + work := &queue.Work{} + if !c.Bind(work) { + c.AbortWithStatus(400) + return + } + err := q.Ack(work) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} + +// items handles an http request to the queue to +// return a list of all work items. +func items(c *gin.Context) { + q := fromContext(c) + items := q.Items() + c.JSON(200, items) +} + +// helper function to retrieve the Queue from +// the context and cast appropriately. +func fromContext(c *gin.Context) queue.Queue { + return c.MustGet("queue").(queue.Queue) +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 00000000..e0832d60 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,55 @@ +package queue + +type Queue interface { + // Publish inserts work at the tail of this queue, waiting for + // space to become available if the queue is full. + Publish(*Work) error + + // Remove removes the specified work item from this queue, + // if it is present. + Remove(*Work) error + + // Pull retrieves and removes the head of this queue, waiting + // if necessary until work becomes available. + Pull() *Work + + // PullAck retrieves and removes the head of this queue, waiting + // if necessary until work becomes available. Items pull from the + // queue that aren't acknowledged will be pushed back to the queue + // again when the default acknowledgement deadline is reached. + PullAck() *Work + + // Ack acknowledges an item in the queue was processed. + Ack(*Work) error + + // Items returns a slice containing all of the work in this + // queue, in proper sequence. + Items() []*Work +} + +// type Manager interface { +// // Register registers a worker that has signed +// // up to accept work. +// Register(*Worker) + +// // Unregister unregisters a worker that should no +// // longer be accepting work. +// Unregister(*Worker) + +// // Assign assigns work to a worker. +// Assign(*Work, *Worker) + +// // Unassign unassigns work from a worker. +// Unassign(*Work, *Worker) + +// // Work returns a list of all work that is +// // currently in progress. +// Work() []*Work + +// // Worker retrieves a worker by name. +// Worker(string) *Worker + +// // Workers returns a slice containing all workers +// // registered with the manager. +// Workers() []*Worker +// } diff --git a/queue/worker.go b/queue/worker.go new file mode 100644 index 00000000..912ef8b5 --- /dev/null +++ b/queue/worker.go @@ -0,0 +1,73 @@ +package queue + +import ( + "io" + + "github.com/drone/drone/common" +) + +// Work represents an item for work to be +// processed by a worker. +type Work struct { + User *common.User `json:"user"` + Repo *common.Repo `json:"repo"` + Build *common.Build `json:"build"` + Keys *common.Keypair `json:"keypair"` + Netrc *common.Netrc `json:"netrc"` + Yaml []byte `json:"yaml"` +} + +// represents a worker that has connected +// to the system in order to perform work +type Worker struct { + Name string + Addr string + IsHealthy bool +} + +// Ping pings to worker to verify it is +// available and in good health. +func (w *Worker) Ping() (bool, error) { + return false, nil +} + +// Logs fetches the logs for a work item. +func (w *Worker) Logs() (io.Reader, error) { + return nil, nil +} + +// Cancel cancels a work item. +func (w *Worker) Cancel() error { + return nil +} + +// type Monitor struct { +// manager *Manager +// } + +// func NewMonitor(manager *Manager) *Monitor { +// return &Monitor{manager} +// } + +// // start is a helper function that is used to monitor +// // all registered workers and ensure they are in a +// // healthy state. +// func (m *Monitor) Start() { +// ticker := time.NewTicker(1 * time.Hour) +// go func() { +// for { +// select { +// case <-ticker.C: +// workers := m.manager.Workers() +// for _, worker := range workers { +// // ping the worker to make sure it is +// // available and still accepting builds. +// if _, err := worker.Ping(); err != nil { +// m.manager.SetHealth(worker, false) +// } else { +// m.manager.SetHealth(worker, true) +// } +// } +// } +// } +// } diff --git a/remote/remote.go b/remote/remote.go index 64e5609f..4f110e67 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -12,16 +12,13 @@ type Remote interface { Login(token, secret string) (*common.User, error) // Orgs fetches the organizations for the given user. - // - // TODO(bradrydzewski) consider consolidating this to return - // the list of organizations along with - // the user Login info. Orgs(u *common.User) ([]string, error) // Repo fetches the named repository from the remote system. Repo(u *common.User, owner, repo string) (*common.Repo, error) - // Perm fetches the named repository from the remote system. + // Perm fetches the named repository permissions from + // the remote system for the specified user. Perm(u *common.User, owner, repo string) (*common.Perm, error) // Script fetches the build script (.drone.yml) from the remote diff --git a/server/builds.go b/server/builds.go index 513cf70c..8535737a 100644 --- a/server/builds.go +++ b/server/builds.go @@ -138,6 +138,11 @@ func RunBuild(c *gin.Context) { return } + // params, _ := store.RepoParams(repo.FullName) + // if params != nil && len(params) != 0 { + // raw = []byte(inject.InjectSafe(string(raw), params)) + // } + // TODO push build to queue c.JSON(202, build) diff --git a/server/hooks.go b/server/hooks.go index 70b3120b..df7a9ae2 100644 --- a/server/hooks.go +++ b/server/hooks.go @@ -5,6 +5,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/drone/drone/common" + "github.com/drone/drone/parser/inject" "github.com/drone/drone/parser/matrix" // "github.com/bradrydzewski/drone/worker" "github.com/gin-gonic/gin" @@ -72,6 +73,8 @@ func PostHook(c *gin.Context) { return } + params, _ := store.RepoParams(repo.FullName) + build := &common.Build{} build.State = common.StatePending build.Commit = hook.Commit @@ -84,7 +87,10 @@ func PostHook(c *gin.Context) { c.Fail(404, err) return } - + // inject any private parameters into the .drone.yml + if params != nil && len(params) != 0 { + raw = []byte(inject.InjectSafe(string(raw), params)) + } axes, err := matrix.Parse(string(raw)) if err != nil { log.Errorf("failure to calculate matrix for %s. %s", repo.FullName, err) diff --git a/server/queue.go b/server/queue.go new file mode 100644 index 00000000..b87c0641 --- /dev/null +++ b/server/queue.go @@ -0,0 +1,108 @@ +package server + +import ( + "io" + "io/ioutil" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + + "github.com/drone/drone/common" + "github.com/drone/drone/eventbus" +) + +// TODO (bradrydzewski) the callback URL should be signed. +// TODO (bradrydzewski) we shouldn't need to fetch the Repo if specified in the URL path +// TODO (bradrydzewski) use SetRepoLast to update the last repository + +// GET /queue/pull +func PollBuild(c *gin.Context) { + queue := ToQueue(c) + work := queue.PullAck() + c.JSON(200, work) +} + +// GET /queue/push/:owner/:repo +func PushBuild(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + bus := ToBus(c) + in := &common.Build{} + if !c.BindWith(in, binding.JSON) { + return + } + build, err := store.Build(repo.FullName, in.Number) + if err != nil { + c.Fail(404, err) + return + } + err = store.SetBuildState(repo.FullName, build) + if err != nil { + c.Fail(500, err) + return + } + + bus.Send(&eventbus.Event{ + Build: build, + Repo: repo, + }) + if repo.Last != nil && repo.Last.Number > build.Number { + c.Writer.WriteHeader(200) + return + } + repo.Last = build + store.SetRepo(repo) + c.Writer.WriteHeader(200) +} + +// POST /queue/push/:owner/:repo/:build +func PushTask(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + bus := ToBus(c) + num, _ := strconv.Atoi(c.Params.ByName("build")) + in := &common.Task{} + if !c.BindWith(in, binding.JSON) { + return + } + err := store.SetBuildTask(repo.FullName, num, in) + if err != nil { + c.Fail(404, err) + return + } + build, err := store.Build(repo.FullName, num) + if err != nil { + c.Fail(404, err) + return + } + bus.Send(&eventbus.Event{ + Build: build, + Repo: repo, + }) + c.Writer.WriteHeader(200) +} + +// POST /queue/push/:owner/:repo/:build/:task/logs +func PushLogs(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + bnum, _ := strconv.Atoi(c.Params.ByName("build")) + tnum, _ := strconv.Atoi(c.Params.ByName("task")) + + // TODO (bradrydzewski) change this interface to accept an io.Reader + // instead of a byte array so that we can buffer the write and so that + // we avoid unnecessary copies of the data in memory. + logs, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, 5000000)) //5MB + defer c.Request.Body.Close() + if err != nil { + c.Fail(500, err) + return + } + err = store.SetLogs(repo.FullName, bnum, tnum, logs) + if err != nil { + c.Fail(500, err) + return + } + c.Writer.WriteHeader(200) +} diff --git a/server/server.go b/server/server.go index 08f48d99..55ef51d2 100644 --- a/server/server.go +++ b/server/server.go @@ -9,11 +9,27 @@ import ( "github.com/drone/drone/common" "github.com/drone/drone/datastore" "github.com/drone/drone/eventbus" + "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/server/session" "github.com/drone/drone/settings" ) +func SetQueue(q queue.Queue) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("queue", q) + c.Next() + } +} + +func ToQueue(c *gin.Context) queue.Queue { + v, err := c.Get("queue") + if err != nil { + return nil + } + return v.(queue.Queue) +} + func SetBus(r eventbus.Bus) gin.HandlerFunc { return func(c *gin.Context) { c.Set("eventbus", r) diff --git a/server/static/scripts/controllers/builds.js b/server/static/scripts/controllers/builds.js index 6b271b1d..2c75c92a 100644 --- a/server/static/scripts/controllers/builds.js +++ b/server/static/scripts/controllers/builds.js @@ -98,6 +98,15 @@ $scope.error = err; }); }; + + // var convert = new Filter({stream:true,newline:false}); + // var term = document.getElementById("term") + // var stdout = document.getElementById("stdout").innerText.split("\n") + // stdout.forEach(function(line, i) { + // setTimeout(function () { + // term.innerHTML += convert.toHtml(line+"\n"); + // }, i*i); + // }); } angular