solve build.go conflict
This commit is contained in:
commit
abc83303ee
8 changed files with 181 additions and 41 deletions
|
@ -212,3 +212,4 @@ func (db *DB) SetBuildTask(repo string, build int, task *common.Task) error {
|
|||
return update(t, bucketBuild, key, build_)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
15
drone.go
15
drone.go
|
@ -6,10 +6,13 @@ import (
|
|||
"github.com/gin-gonic/gin"
|
||||
|
||||
"github.com/drone/drone/datastore/bolt"
|
||||
"github.com/drone/drone/eventbus"
|
||||
"github.com/drone/drone/remote/github"
|
||||
"github.com/drone/drone/server"
|
||||
"github.com/drone/drone/server/session"
|
||||
"github.com/drone/drone/settings"
|
||||
|
||||
queue "github.com/drone/drone/queue/builtin"
|
||||
)
|
||||
|
||||
var path = flag.String("config", "drone.toml", "")
|
||||
|
@ -31,8 +34,10 @@ func main() {
|
|||
|
||||
api := r.Group("/api")
|
||||
api.Use(server.SetHeaders())
|
||||
api.Use(server.SetBus(eventbus.New()))
|
||||
api.Use(server.SetDatastore(ds))
|
||||
api.Use(server.SetRemote(remote))
|
||||
api.Use(server.SetQueue(queue.New()))
|
||||
api.Use(server.SetSettings(settings))
|
||||
api.Use(server.SetUser(session))
|
||||
|
||||
|
@ -99,6 +104,16 @@ func main() {
|
|||
hooks.POST("", server.PostHook)
|
||||
}
|
||||
|
||||
queue := api.Group("/queue")
|
||||
{
|
||||
queue.Use(server.SetRepo())
|
||||
queue.GET("", server.GetQueue)
|
||||
queue.POST("/pull", server.PollBuild)
|
||||
queue.POST("/push/:owner/:name", server.PushBuild)
|
||||
queue.POST("/push/:owner/:name/:build", server.PushTask)
|
||||
queue.POST("/push/:owner/:name/:build/:task/logs", server.PushLogs)
|
||||
}
|
||||
|
||||
auth := r.Group("/authorize")
|
||||
{
|
||||
auth.Use(server.SetHeaders())
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package builtin
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/drone/drone/queue"
|
||||
)
|
||||
|
||||
var ErrNotFound = errors.New("work item not found")
|
||||
|
||||
type Queue struct {
|
||||
sync.Mutex
|
||||
|
||||
|
@ -35,6 +38,37 @@ func (q *Queue) Publish(work *queue.Work) error {
|
|||
// Remove removes the specified work item from this queue,
|
||||
// if it is present.
|
||||
func (q *Queue) Remove(work *queue.Work) error {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
_, ok := q.items[work]
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
var items []*queue.Work
|
||||
|
||||
// loop through and drain all items
|
||||
// from the queue.
|
||||
drain:
|
||||
for {
|
||||
select {
|
||||
case item := <-q.itemc:
|
||||
items = append(items, item)
|
||||
default:
|
||||
break drain
|
||||
}
|
||||
}
|
||||
|
||||
// re-add all items to the queue except
|
||||
// the item we're trying to remove
|
||||
for _, item := range items {
|
||||
if item == work {
|
||||
delete(q.items, work)
|
||||
delete(q.acks, work)
|
||||
continue
|
||||
}
|
||||
q.itemc <- item
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -48,7 +82,7 @@ func (q *Queue) Pull() *queue.Work {
|
|||
return work
|
||||
}
|
||||
|
||||
// PullAct retrieves and removes the head of this queue, waiting
|
||||
// PullAck retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available. Items pull from the
|
||||
// queue that aren't acknowledged will be pushed back to the queue
|
||||
// again when the default acknowledgement deadline is reached.
|
||||
|
@ -74,7 +108,7 @@ func (q *Queue) Items() []*queue.Work {
|
|||
q.Lock()
|
||||
defer q.Unlock()
|
||||
items := []*queue.Work{}
|
||||
for work, _ := range q.items {
|
||||
for work := range q.items {
|
||||
items = append(items, work)
|
||||
}
|
||||
return items
|
||||
|
|
76
queue/builtin/queue_test.go
Normal file
76
queue/builtin/queue_test.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package builtin
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/drone/drone/queue"
|
||||
. "github.com/franela/goblin"
|
||||
)
|
||||
|
||||
func TestBuild(t *testing.T) {
|
||||
g := Goblin(t)
|
||||
g.Describe("Queue", func() {
|
||||
|
||||
g.It("Should publish item", func() {
|
||||
w1 := &queue.Work{}
|
||||
w2 := &queue.Work{}
|
||||
q := New()
|
||||
q.Publish(w1)
|
||||
q.Publish(w2)
|
||||
g.Assert(len(q.items)).Equal(2)
|
||||
g.Assert(len(q.itemc)).Equal(2)
|
||||
})
|
||||
|
||||
g.It("Should remove item", func() {
|
||||
w1 := &queue.Work{}
|
||||
w2 := &queue.Work{}
|
||||
w3 := &queue.Work{}
|
||||
q := New()
|
||||
q.Publish(w1)
|
||||
q.Publish(w2)
|
||||
q.Publish(w3)
|
||||
q.Remove(w2)
|
||||
g.Assert(len(q.items)).Equal(2)
|
||||
g.Assert(len(q.itemc)).Equal(2)
|
||||
g.Assert(q.Pull()).Equal(w1)
|
||||
g.Assert(q.Pull()).Equal(w3)
|
||||
g.Assert(q.Remove(w2)).Equal(ErrNotFound)
|
||||
})
|
||||
|
||||
g.It("Should pull item", func() {
|
||||
w1 := &queue.Work{}
|
||||
w2 := &queue.Work{}
|
||||
q := New()
|
||||
q.Publish(w1)
|
||||
q.Publish(w2)
|
||||
g.Assert(q.Pull()).Equal(w1)
|
||||
g.Assert(q.Pull()).Equal(w2)
|
||||
})
|
||||
|
||||
g.It("Should pull item with ack", func() {
|
||||
w := &queue.Work{}
|
||||
q := New()
|
||||
q.Publish(w)
|
||||
g.Assert(q.PullAck()).Equal(w)
|
||||
g.Assert(q.acks[w]).Equal(struct{}{})
|
||||
})
|
||||
|
||||
g.It("Should ack item", func() {
|
||||
w := &queue.Work{}
|
||||
q := New()
|
||||
q.Publish(w)
|
||||
g.Assert(q.PullAck()).Equal(w)
|
||||
g.Assert(len(q.acks)).Equal(1)
|
||||
g.Assert(q.Ack(w)).Equal(nil)
|
||||
g.Assert(len(q.acks)).Equal(0)
|
||||
})
|
||||
|
||||
g.It("Should get all items", func() {
|
||||
q := New()
|
||||
q.Publish(&queue.Work{})
|
||||
q.Publish(&queue.Work{})
|
||||
q.Publish(&queue.Work{})
|
||||
g.Assert(len(q.Items())).Equal(3)
|
||||
})
|
||||
})
|
||||
}
|
|
@ -26,30 +26,3 @@ type Queue interface {
|
|||
// queue, in proper sequence.
|
||||
Items() []*Work
|
||||
}
|
||||
|
||||
// type Manager interface {
|
||||
// // Register registers a worker that has signed
|
||||
// // up to accept work.
|
||||
// Register(*Worker)
|
||||
|
||||
// // Unregister unregisters a worker that should no
|
||||
// // longer be accepting work.
|
||||
// Unregister(*Worker)
|
||||
|
||||
// // Assign assigns work to a worker.
|
||||
// Assign(*Work, *Worker)
|
||||
|
||||
// // Unassign unassigns work from a worker.
|
||||
// Unassign(*Work, *Worker)
|
||||
|
||||
// // Work returns a list of all work that is
|
||||
// // currently in progress.
|
||||
// Work() []*Work
|
||||
|
||||
// // Worker retrieves a worker by name.
|
||||
// Worker(string) *Worker
|
||||
|
||||
// // Workers returns a slice containing all workers
|
||||
// // registered with the manager.
|
||||
// Workers() []*Worker
|
||||
// }
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"strconv"
|
||||
|
||||
"github.com/drone/drone/common"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
)
|
||||
|
@ -102,6 +103,7 @@ func PostBuildStatus(c *gin.Context) {
|
|||
//
|
||||
func RunBuild(c *gin.Context) {
|
||||
store := ToDatastore(c)
|
||||
queue_ := ToQueue(c)
|
||||
repo := ToRepo(c)
|
||||
num, err := strconv.Atoi(c.Params.ByName("number"))
|
||||
if err != nil {
|
||||
|
@ -114,6 +116,18 @@ func RunBuild(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
keys, err := store.RepoKeypair(repo.FullName)
|
||||
if err != nil {
|
||||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
user, err := store.User(repo.User.Login)
|
||||
if err != nil {
|
||||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
// must not restart a running build
|
||||
if build.State == common.StatePending || build.State == common.StateRunning {
|
||||
c.Fail(409, err)
|
||||
|
@ -143,7 +157,14 @@ func RunBuild(c *gin.Context) {
|
|||
// raw = []byte(inject.InjectSafe(string(raw), params))
|
||||
// }
|
||||
|
||||
// TODO push build to queue
|
||||
queue_.Publish(&queue.Work{
|
||||
User: user,
|
||||
Repo: repo,
|
||||
Build: build,
|
||||
Keys: keys,
|
||||
Netrc: &common.Netrc{}, //TODO create netrc
|
||||
Yaml: nil, // TODO fetch yaml
|
||||
})
|
||||
|
||||
c.JSON(202, build)
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"github.com/drone/drone/common"
|
||||
"github.com/drone/drone/parser/inject"
|
||||
"github.com/drone/drone/parser/matrix"
|
||||
// "github.com/bradrydzewski/drone/worker"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
|
@ -19,6 +19,7 @@ import (
|
|||
func PostHook(c *gin.Context) {
|
||||
remote := ToRemote(c)
|
||||
store := ToDatastore(c)
|
||||
queue_ := ToQueue(c)
|
||||
|
||||
hook, err := remote.Hook(c.Request)
|
||||
if err != nil {
|
||||
|
@ -107,19 +108,13 @@ func PostHook(c *gin.Context) {
|
|||
Environment: axis,
|
||||
})
|
||||
}
|
||||
|
||||
err = store.SetBuild(repo.FullName, build)
|
||||
keys, err := store.RepoKeypair(repo.FullName)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
log.Errorf("failure to fetch keypair for %s. %s", repo.FullName, err)
|
||||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
|
||||
// w := worker.Work{
|
||||
// User: user,
|
||||
// Repo: repo,
|
||||
// Build: build,
|
||||
// }
|
||||
|
||||
// verify the branches can be built vs skipped
|
||||
// s, _ := script.ParseBuild(string(yml))
|
||||
// if len(hook.PullRequest) == 0 && !s.MatchBranch(hook.Branch) {
|
||||
|
@ -127,5 +122,20 @@ func PostHook(c *gin.Context) {
|
|||
// return
|
||||
// }
|
||||
|
||||
err = store.SetBuild(repo.FullName, build)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
|
||||
queue_.Publish(&queue.Work{
|
||||
User: user,
|
||||
Repo: repo,
|
||||
Build: build,
|
||||
Keys: keys,
|
||||
Netrc: &common.Netrc{}, // TODO
|
||||
Yaml: raw,
|
||||
})
|
||||
|
||||
c.JSON(200, build)
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
// GET /queue/pull
|
||||
func PollBuild(c *gin.Context) {
|
||||
queue := ToQueue(c)
|
||||
work := queue.PullAck()
|
||||
work := queue.Pull()
|
||||
c.JSON(200, work)
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,10 @@ func PushBuild(c *gin.Context) {
|
|||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
build.Duration = in.Duration
|
||||
build.Started = in.Started
|
||||
build.Finished = in.Finished
|
||||
build.State = in.State
|
||||
err = store.SetBuildState(repo.FullName, build)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
|
@ -106,3 +110,9 @@ func PushLogs(c *gin.Context) {
|
|||
}
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
||||
|
||||
func GetQueue(c *gin.Context) {
|
||||
queue := ToQueue(c)
|
||||
items := queue.Items()
|
||||
c.JSON(200, items)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue