From 12989b187c0fd9adccae1e0b6027ccc41598ba5d Mon Sep 17 00:00:00 2001 From: Alex Suraci Date: Mon, 24 Feb 2014 16:53:28 -0800 Subject: [PATCH] introduce Queue object this is an intermediate step towards pushing configuration up. Signed-off-by: Abhijit Hiremagalur --- pkg/queue/init.go | 22 ---- pkg/queue/queue.go | 246 +++++--------------------------------------- pkg/queue/worker.go | 234 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 261 insertions(+), 241 deletions(-) delete mode 100644 pkg/queue/init.go create mode 100644 pkg/queue/worker.go diff --git a/pkg/queue/init.go b/pkg/queue/init.go deleted file mode 100644 index c0b4b697..00000000 --- a/pkg/queue/init.go +++ /dev/null @@ -1,22 +0,0 @@ -package queue - -import ( - "runtime" -) - -func init() { - // get the number of CPUs. Since builds - // tend to be CPU-intensive we should only - // execute 1 build per CPU. - ncpu := runtime.NumCPU() - - // must be at least 1 - if ncpu < 1 { - ncpu = 1 - } - - // spawn a worker for each CPU - for i := 0; i < ncpu; i++ { - go work() - } -} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index cadaee93..2b69f33c 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -1,46 +1,14 @@ package queue import ( - "bytes" - "fmt" - "github.com/drone/drone/pkg/build" - "github.com/drone/drone/pkg/build/git" - r "github.com/drone/drone/pkg/build/repo" "github.com/drone/drone/pkg/build/script" - "github.com/drone/drone/pkg/channel" - "github.com/drone/drone/pkg/database" . "github.com/drone/drone/pkg/model" - "github.com/drone/drone/pkg/plugin/notify" - "github.com/drone/go-github/github" - "log" - "path/filepath" - "time" + "runtime" ) -// queue that will store all build tasks until -// they are processed by a worker. -var queue = make(chan *BuildTask) - -// work is a function that will infinitely -// run in the background waiting for tasks that -// it can pull off the queue and execute. -func work() { - var task *BuildTask - for { - // get work item (pointer) from the queue - task = <-queue - if task == nil { - continue - } - - // execute the task - task.execute() - } -} - -// Add adds the task to the build queue. -func Add(task *BuildTask) { - queue <- task +// A Queue dispatches tasks to workers. +type Queue struct { + tasks chan<- *BuildTask } // BuildTasks represents a build that is pending @@ -55,193 +23,33 @@ type BuildTask struct { Script *script.Build } -// execute will execute the build task and persist -// the results to the datastore. -func (b *BuildTask) execute() error { - // we need to be sure that we can recover - // from any sort panic that could occur - // to avoid brining down the entire application - defer func() { - if e := recover(); e != nil { - b.Build.Finished = time.Now().UTC() - b.Commit.Finished = time.Now().UTC() - b.Build.Duration = b.Build.Finished.Unix() - b.Build.Started.Unix() - b.Commit.Duration = b.Build.Finished.Unix() - b.Build.Started.Unix() - b.Commit.Status = "Error" - b.Build.Status = "Error" - database.SaveBuild(b.Build) - database.SaveCommit(b.Commit) - } - }() +var defaultQueue = Start(runtime.NumCPU()) // TEMPORARY; INJECT PLEASE - // update commit and build status - b.Commit.Status = "Started" - b.Build.Status = "Started" - b.Build.Started = time.Now().UTC() - b.Commit.Started = time.Now().UTC() +var Add = defaultQueue.Add // TEMPORARY; INJECT PLEASE - // persist the commit to the database - if err := database.SaveCommit(b.Commit); err != nil { - return err +func Start(workers int) *Queue { + // get the number of CPUs. Since builds + // tend to be CPU-intensive we should only + // execute 1 build per CPU. + // must be at least 1 + // if ncpu < 1 { + // ncpu = 1 + // } + + tasks := make(chan *BuildTask) + + queue := &Queue{tasks: tasks} + + // spawn a worker for each CPU + for i := 0; i < workers; i++ { + worker := worker{} + go worker.work(tasks) } - // persist the build to the database - if err := database.SaveBuild(b.Build); err != nil { - return err - } - - // get settings - settings, _ := database.GetSettings() - - // notification context - context := ¬ify.Context{ - Repo: b.Repo, - Commit: b.Commit, - Host: settings.URL().String(), - } - - // send all "started" notifications - if b.Script.Notifications != nil { - b.Script.Notifications.Send(context) - } - - // Send "started" notification to Github - if err := updateGitHubStatus(b.Repo, b.Commit); err != nil { - log.Printf("error updating github status: %s\n", err.Error()) - } - - // make sure a channel exists for the repository, - // the commit, and the commit output (TODO) - reposlug := fmt.Sprintf("%s/%s/%s", b.Repo.Host, b.Repo.Owner, b.Repo.Name) - commitslug := fmt.Sprintf("%s/%s/%s/commit/%s", b.Repo.Host, b.Repo.Owner, b.Repo.Name, b.Commit.Hash) - consoleslug := fmt.Sprintf("%s/%s/%s/commit/%s/builds/%s", b.Repo.Host, b.Repo.Owner, b.Repo.Name, b.Commit.Hash, b.Build.Slug) - channel.Create(reposlug) - channel.Create(commitslug) - channel.CreateStream(consoleslug) - - // notify the channels that the commit and build started - channel.SendJSON(reposlug, b.Commit) - channel.SendJSON(commitslug, b.Build) - - var buf = &bufferWrapper{channel: consoleslug} - - // append private parameters to the environment - // variable section of the .drone.yml file - if b.Repo.Params != nil { - for k, v := range b.Repo.Params { - b.Script.Env = append(b.Script.Env, k+"="+v) - } - } - - // execute the build - builder := build.New() - builder.Build = b.Script - builder.Repo = &r.Repo{Path: b.Repo.URL, Branch: b.Commit.Branch, Commit: b.Commit.Hash, PR: b.Commit.PullRequest, Dir: filepath.Join("/var/cache/drone/src", b.Repo.Slug), Depth: git.GitDepth(b.Script.Git)} - builder.Key = []byte(b.Repo.PrivateKey) - builder.Stdout = buf - builder.Timeout = 300 * time.Minute - - defer func() { - // update the status of the commit using the - // GitHub status API. - if err := updateGitHubStatus(b.Repo, b.Commit); err != nil { - log.Printf("error updating github status: %s\n", err.Error()) - } - }() - - buildErr := builder.Run() - - b.Build.Finished = time.Now().UTC() - b.Commit.Finished = time.Now().UTC() - b.Build.Duration = b.Build.Finished.UnixNano() - b.Build.Started.UnixNano() - b.Commit.Duration = b.Build.Finished.UnixNano() - b.Build.Started.UnixNano() - b.Commit.Status = "Success" - b.Build.Status = "Success" - b.Build.Stdout = buf.buf.String() - - // if exit code != 0 set to failure - if builder.BuildState == nil || builder.BuildState.ExitCode != 0 { - b.Commit.Status = "Failure" - b.Build.Status = "Failure" - if buildErr != nil && b.Build.Stdout == "" { - // TODO: If you wanted to have very friendly error messages, you could do that here - b.Build.Stdout = buildErr.Error() + "\n" - } - } - - // persist the build to the database - if err := database.SaveBuild(b.Build); err != nil { - return err - } - - // persist the commit to the database - if err := database.SaveCommit(b.Commit); err != nil { - return err - } - - // notify the channels that the commit and build finished - channel.SendJSON(reposlug, b.Commit) - channel.SendJSON(commitslug, b.Build) - channel.Close(consoleslug) - - // send all "finished" notifications - if b.Script.Notifications != nil { - b.Script.Notifications.Send(context) - } - - return nil + return queue } -// updateGitHubStatus is a helper function that will send -// the build status to GitHub using the Status API. -// see https://github.com/blog/1227-commit-status-api -func updateGitHubStatus(repo *Repo, commit *Commit) error { - - // convert from drone status to github status - var message, status string - switch commit.Status { - case "Success": - status = "success" - message = "The build succeeded on drone.io" - case "Failure": - status = "failure" - message = "The build failed on drone.io" - case "Started": - status = "pending" - message = "The build is pending on drone.io" - default: - status = "error" - message = "The build errored on drone.io" - } - - // get the system settings - settings, _ := database.GetSettings() - - // get the user from the database - // since we need his / her GitHub token - user, err := database.GetUser(repo.UserID) - if err != nil { - return err - } - - client := github.New(user.GithubToken) - client.ApiUrl = settings.GitHubApiUrl - - var url string - url = settings.URL().String() + "/" + repo.Slug + "/commit/" + commit.Hash - - return client.Repos.CreateStatus(repo.Owner, repo.Name, status, url, message, commit.Hash) -} - -type bufferWrapper struct { - buf bytes.Buffer - - // name of the channel - channel string -} - -func (b *bufferWrapper) Write(p []byte) (n int, err error) { - n, err = b.buf.Write(p) - channel.SendBytes(b.channel, p) - return +// Add adds the task to the build queue. +func (q *Queue) Add(task *BuildTask) { + q.tasks <- task } diff --git a/pkg/queue/worker.go b/pkg/queue/worker.go new file mode 100644 index 00000000..e2cf8f6d --- /dev/null +++ b/pkg/queue/worker.go @@ -0,0 +1,234 @@ +package queue + +import ( + "bytes" + "fmt" + "github.com/drone/drone/pkg/build" + "github.com/drone/drone/pkg/build/git" + r "github.com/drone/drone/pkg/build/repo" + "github.com/drone/drone/pkg/channel" + "github.com/drone/drone/pkg/database" + . "github.com/drone/drone/pkg/model" + "github.com/drone/drone/pkg/plugin/notify" + "github.com/drone/go-github/github" + "io" + "log" + "path/filepath" + "time" +) + +type worker struct{} + +// work is a function that will infinitely +// run in the background waiting for tasks that +// it can pull off the queue and execute. +func (w *worker) work(queue <-chan *BuildTask) { + var task *BuildTask + for { + // get work item (pointer) from the queue + task = <-queue + if task == nil { + continue + } + + // execute the task + w.execute(task) + } +} + +// execute will execute the build task and persist +// the results to the datastore. +func (w *worker) execute(task *BuildTask) error { + // we need to be sure that we can recover + // from any sort panic that could occur + // to avoid brining down the entire application + defer func() { + if e := recover(); e != nil { + task.Build.Finished = time.Now().UTC() + task.Commit.Finished = time.Now().UTC() + task.Build.Duration = task.Build.Finished.Unix() - task.Build.Started.Unix() + task.Commit.Duration = task.Build.Finished.Unix() - task.Build.Started.Unix() + task.Commit.Status = "Error" + task.Build.Status = "Error" + database.SaveBuild(task.Build) + database.SaveCommit(task.Commit) + } + }() + + // update commit and build status + task.Commit.Status = "Started" + task.Build.Status = "Started" + task.Build.Started = time.Now().UTC() + task.Commit.Started = time.Now().UTC() + + // persist the commit to the database + if err := database.SaveCommit(task.Commit); err != nil { + return err + } + + // persist the build to the database + if err := database.SaveBuild(task.Build); err != nil { + return err + } + + // get settings + settings, _ := database.GetSettings() + + // notification context + context := ¬ify.Context{ + Repo: task.Repo, + Commit: task.Commit, + Host: settings.URL().String(), + } + + // send all "started" notifications + if task.Script.Notifications != nil { + task.Script.Notifications.Send(context) + } + + // Send "started" notification to Github + if err := updateGitHubStatus(task.Repo, task.Commit); err != nil { + log.Printf("error updating github status: %s\n", err.Error()) + } + + // make sure a channel exists for the repository, + // the commit, and the commit output (TODO) + reposlug := fmt.Sprintf("%s/%s/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name) + commitslug := fmt.Sprintf("%s/%s/%s/commit/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name, task.Commit.Hash) + consoleslug := fmt.Sprintf("%s/%s/%s/commit/%s/builds/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name, task.Commit.Hash, task.Build.Slug) + channel.Create(reposlug) + channel.Create(commitslug) + channel.CreateStream(consoleslug) + + // notify the channels that the commit and build started + channel.SendJSON(reposlug, task.Commit) + channel.SendJSON(commitslug, task.Build) + + var buf = &bufferWrapper{channel: consoleslug} + + // append private parameters to the environment + // variable section of the .drone.yml file + if task.Repo.Params != nil { + for k, v := range task.Repo.Params { + task.Script.Env = append(task.Script.Env, k+"="+v) + } + } + + defer func() { + // update the status of the commit using the + // GitHub status API. + if err := updateGitHubStatus(task.Repo, task.Commit); err != nil { + log.Printf("error updating github status: %s\n", err.Error()) + } + }() + + // execute the build + passed, buildErr := runBuild(task, buf) //w.builder.Build(script, repo, task.Repo.PrivateKey, buf) + + task.Build.Finished = time.Now().UTC() + task.Commit.Finished = time.Now().UTC() + task.Build.Duration = task.Build.Finished.UnixNano() - task.Build.Started.UnixNano() + task.Commit.Duration = task.Build.Finished.UnixNano() - task.Build.Started.UnixNano() + task.Commit.Status = "Success" + task.Build.Status = "Success" + task.Build.Stdout = buf.buf.String() + + // if exit code != 0 set to failure + if passed { + task.Commit.Status = "Failure" + task.Build.Status = "Failure" + if buildErr != nil && task.Build.Stdout == "" { + // TODO: If you wanted to have very friendly error messages, you could do that here + task.Build.Stdout = buildErr.Error() + "\n" + } + } + + // persist the build to the database + if err := database.SaveBuild(task.Build); err != nil { + return err + } + + // persist the commit to the database + if err := database.SaveCommit(task.Commit); err != nil { + return err + } + + // notify the channels that the commit and build finished + channel.SendJSON(reposlug, task.Commit) + channel.SendJSON(commitslug, task.Build) + channel.Close(consoleslug) + + // send all "finished" notifications + if task.Script.Notifications != nil { + task.Script.Notifications.Send(context) + } + + return nil +} + +func runBuild(b *BuildTask, buf io.Writer) (bool, error) { + builder := build.New() + builder.Build = b.Script + builder.Repo = &r.Repo{Path: b.Repo.URL, Branch: b.Commit.Branch, Commit: b.Commit.Hash, PR: b.Commit.PullRequest, Dir: filepath.Join("/var/cache/drone/src", b.Repo.Slug), Depth: git.GitDepth(b.Script.Git)} + builder.Key = []byte(b.Repo.PrivateKey) + builder.Stdout = buf + builder.Timeout = 300 * time.Minute + + buildErr := builder.Run() + + return builder.BuildState == nil || builder.BuildState.ExitCode != 0, buildErr +} + +// updateGitHubStatus is a helper function that will send +// the build status to GitHub using the Status API. +// see https://github.com/blog/1227-commit-status-api +func updateGitHubStatus(repo *Repo, commit *Commit) error { + + // convert from drone status to github status + var message, status string + switch commit.Status { + case "Success": + status = "success" + message = "The build succeeded on drone.io" + case "Failure": + status = "failure" + message = "The build failed on drone.io" + case "Started": + status = "pending" + message = "The build is pending on drone.io" + default: + status = "error" + message = "The build errored on drone.io" + } + + // get the system settings + settings, _ := database.GetSettings() + + // get the user from the database + // since we need his / her GitHub token + user, err := database.GetUser(repo.UserID) + if err != nil { + return err + } + + client := github.New(user.GithubToken) + client.ApiUrl = settings.GitHubApiUrl + + var url string + url = settings.URL().String() + "/" + repo.Slug + "/commit/" + commit.Hash + + return client.Repos.CreateStatus(repo.Owner, repo.Name, status, url, message, commit.Hash) +} + +type bufferWrapper struct { + buf bytes.Buffer + + // name of the channel + channel string +} + +func (b *bufferWrapper) Write(p []byte) (n int, err error) { + n, err = b.buf.Write(p) + channel.SendBytes(b.channel, p) + return +}