added code for remote queue plugin
This commit is contained in:
parent
b16759b0b6
commit
d2776bc990
7 changed files with 558 additions and 0 deletions
81
queue/builtin/queue.go
Normal file
81
queue/builtin/queue.go
Normal file
|
@ -0,0 +1,81 @@
|
|||
package builtin
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/drone/drone/queue"
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
sync.Mutex
|
||||
|
||||
acks map[*queue.Work]struct{}
|
||||
items map[*queue.Work]struct{}
|
||||
itemc chan *queue.Work
|
||||
}
|
||||
|
||||
func New() *Queue {
|
||||
return &Queue{
|
||||
acks: make(map[*queue.Work]struct{}),
|
||||
items: make(map[*queue.Work]struct{}),
|
||||
itemc: make(chan *queue.Work, 999),
|
||||
}
|
||||
}
|
||||
|
||||
// Publish inserts work at the tail of this queue, waiting for
|
||||
// space to become available if the queue is full.
|
||||
func (q *Queue) Publish(work *queue.Work) error {
|
||||
q.Lock()
|
||||
q.items[work] = struct{}{}
|
||||
q.Unlock()
|
||||
q.itemc <- work
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove removes the specified work item from this queue,
|
||||
// if it is present.
|
||||
func (q *Queue) Remove(work *queue.Work) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pull retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available.
|
||||
func (q *Queue) Pull() *queue.Work {
|
||||
work := <-q.itemc
|
||||
q.Lock()
|
||||
delete(q.items, work)
|
||||
q.Unlock()
|
||||
return work
|
||||
}
|
||||
|
||||
// PullAct retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available. Items pull from the
|
||||
// queue that aren't acknowledged will be pushed back to the queue
|
||||
// again when the default acknowledgement deadline is reached.
|
||||
func (q *Queue) PullAck() *queue.Work {
|
||||
work := q.Pull()
|
||||
q.Lock()
|
||||
q.acks[work] = struct{}{}
|
||||
q.Unlock()
|
||||
return work
|
||||
}
|
||||
|
||||
// Ack acknowledges an item in the queue was processed.
|
||||
func (q *Queue) Ack(work *queue.Work) error {
|
||||
q.Lock()
|
||||
delete(q.acks, work)
|
||||
q.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Items returns a slice containing all of the work in this
|
||||
// queue, in proper sequence.
|
||||
func (q *Queue) Items() []*queue.Work {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
items := []*queue.Work{}
|
||||
for work, _ := range q.items {
|
||||
items = append(items, work)
|
||||
}
|
||||
return items
|
||||
}
|
109
queue/plugin/client.go
Normal file
109
queue/plugin/client.go
Normal file
|
@ -0,0 +1,109 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/drone/drone/queue"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
url string
|
||||
token string
|
||||
}
|
||||
|
||||
func New(url, token string) *Client {
|
||||
return &Client{url, token}
|
||||
}
|
||||
|
||||
// Publish makes an http request to the remote queue
|
||||
// to insert work at the tail.
|
||||
func (c *Client) Publish(work *queue.Work) error {
|
||||
return c.send("POST", "/queue", work, nil)
|
||||
}
|
||||
|
||||
// Remove makes an http request to the remote queue to
|
||||
// remove the specified work item.
|
||||
func (c *Client) Remove(work *queue.Work) error {
|
||||
return c.send("DELETE", "/queue", work, nil)
|
||||
}
|
||||
|
||||
// Pull makes an http request to the remote queue to
|
||||
// retrieve work. This initiates a long poll and will
|
||||
// block until complete.
|
||||
func (c *Client) Pull() *queue.Work {
|
||||
out := &queue.Work{}
|
||||
err := c.send("POST", "/queue/pull", nil, out)
|
||||
if err != nil {
|
||||
// TODO handle error
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Pull makes an http request to the remote queue to
|
||||
// retrieve work, with an acknowldement required.
|
||||
// This initiates a long poll and will block until
|
||||
// complete.
|
||||
func (c *Client) PullAck() *queue.Work {
|
||||
out := &queue.Work{}
|
||||
err := c.send("POST", "/queue/pull?ack=true", nil, out)
|
||||
if err != nil {
|
||||
// TODO handle error
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Ack makes an http request to the remote queue
|
||||
// to acknowledge an item in the queue was processed.
|
||||
func (c *Client) Ack(work *queue.Work) error {
|
||||
return c.send("POST", "/queue/ack", nil, nil)
|
||||
}
|
||||
|
||||
// Items makes an http request to the remote queue
|
||||
// to fetch a list of all work.
|
||||
func (c *Client) Items() []*queue.Work {
|
||||
out := []*queue.Work{}
|
||||
err := c.send("GET", "/queue/items", nil, &out)
|
||||
if err != nil {
|
||||
// TODO handle error
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// send is a helper function that makes an authenticated
|
||||
// request to the remote http plugin.
|
||||
func (c *Client) send(method, path string, in interface{}, out interface{}) error {
|
||||
url_, err := url.Parse(c.url + path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var buf io.ReadWriter
|
||||
if in != nil {
|
||||
buf = new(bytes.Buffer)
|
||||
err := json.NewEncoder(buf).Encode(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(method, url_.String(), buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Authorization", "Bearer "+c.token)
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if out == nil {
|
||||
return nil
|
||||
}
|
||||
return json.NewDecoder(resp.Body).Decode(out)
|
||||
}
|
116
queue/plugin/server.go
Normal file
116
queue/plugin/server.go
Normal file
|
@ -0,0 +1,116 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Handle returns an http.Handler that enables a remote
|
||||
// client to interop with a Queue over http.
|
||||
func Handle(queue queue.Queue, token string) http.Handler {
|
||||
r := gin.New()
|
||||
|
||||
// middleware to validate the authorization token
|
||||
// and to inject the queue into the http context.
|
||||
bearer := "Bearer " + token
|
||||
r.Use(func(c *gin.Context) {
|
||||
if c.Request.Header.Get("Authorization") != bearer {
|
||||
c.AbortWithStatus(403)
|
||||
return
|
||||
}
|
||||
c.Set("queue", queue)
|
||||
c.Next()
|
||||
})
|
||||
|
||||
r.POST("/queue", publish)
|
||||
r.DELETE("/queue", remove)
|
||||
r.POST("/queue/pull", pull)
|
||||
r.POST("/queue/ack", ack)
|
||||
r.POST("/queue/items", items)
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
// publish handles an http request to the queue
|
||||
// to insert work at the tail.
|
||||
func publish(c *gin.Context) {
|
||||
q := fromContext(c)
|
||||
work := &queue.Work{}
|
||||
if !c.Bind(work) {
|
||||
c.AbortWithStatus(400)
|
||||
return
|
||||
}
|
||||
err := q.Publish(work)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
||||
|
||||
// remove handles an http request to the queue
|
||||
// to remove a work item.
|
||||
func remove(c *gin.Context) {
|
||||
q := fromContext(c)
|
||||
work := &queue.Work{}
|
||||
if !c.Bind(work) {
|
||||
c.AbortWithStatus(400)
|
||||
return
|
||||
}
|
||||
err := q.Remove(work)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
||||
|
||||
// pull handles an http request to the queue
|
||||
// to retrieve work.
|
||||
func pull(c *gin.Context) {
|
||||
q := fromContext(c)
|
||||
var work *queue.Work
|
||||
if c.Request.FormValue("ack") != "" {
|
||||
work = q.PullAck()
|
||||
} else {
|
||||
work = q.Pull()
|
||||
}
|
||||
if work == nil {
|
||||
c.AbortWithStatus(500)
|
||||
return
|
||||
}
|
||||
c.JSON(200, work)
|
||||
}
|
||||
|
||||
// ack handles an http request to the queue
|
||||
// to confirm an item was successfully pulled.
|
||||
func ack(c *gin.Context) {
|
||||
q := fromContext(c)
|
||||
work := &queue.Work{}
|
||||
if !c.Bind(work) {
|
||||
c.AbortWithStatus(400)
|
||||
return
|
||||
}
|
||||
err := q.Ack(work)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
||||
|
||||
// items handles an http request to the queue to
|
||||
// return a list of all work items.
|
||||
func items(c *gin.Context) {
|
||||
q := fromContext(c)
|
||||
items := q.Items()
|
||||
c.JSON(200, items)
|
||||
}
|
||||
|
||||
// helper function to retrieve the Queue from
|
||||
// the context and cast appropriately.
|
||||
func fromContext(c *gin.Context) queue.Queue {
|
||||
return c.MustGet("queue").(queue.Queue)
|
||||
}
|
55
queue/queue.go
Normal file
55
queue/queue.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package queue
|
||||
|
||||
type Queue interface {
|
||||
// Publish inserts work at the tail of this queue, waiting for
|
||||
// space to become available if the queue is full.
|
||||
Publish(*Work) error
|
||||
|
||||
// Remove removes the specified work item from this queue,
|
||||
// if it is present.
|
||||
Remove(*Work) error
|
||||
|
||||
// Pull retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available.
|
||||
Pull() *Work
|
||||
|
||||
// PullAck retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available. Items pull from the
|
||||
// queue that aren't acknowledged will be pushed back to the queue
|
||||
// again when the default acknowledgement deadline is reached.
|
||||
PullAck() *Work
|
||||
|
||||
// Ack acknowledges an item in the queue was processed.
|
||||
Ack(*Work) error
|
||||
|
||||
// Items returns a slice containing all of the work in this
|
||||
// queue, in proper sequence.
|
||||
Items() []*Work
|
||||
}
|
||||
|
||||
// type Manager interface {
|
||||
// // Register registers a worker that has signed
|
||||
// // up to accept work.
|
||||
// Register(*Worker)
|
||||
|
||||
// // Unregister unregisters a worker that should no
|
||||
// // longer be accepting work.
|
||||
// Unregister(*Worker)
|
||||
|
||||
// // Assign assigns work to a worker.
|
||||
// Assign(*Work, *Worker)
|
||||
|
||||
// // Unassign unassigns work from a worker.
|
||||
// Unassign(*Work, *Worker)
|
||||
|
||||
// // Work returns a list of all work that is
|
||||
// // currently in progress.
|
||||
// Work() []*Work
|
||||
|
||||
// // Worker retrieves a worker by name.
|
||||
// Worker(string) *Worker
|
||||
|
||||
// // Workers returns a slice containing all workers
|
||||
// // registered with the manager.
|
||||
// Workers() []*Worker
|
||||
// }
|
73
queue/worker.go
Normal file
73
queue/worker.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/drone/drone/common"
|
||||
)
|
||||
|
||||
// Work represents an item for work to be
|
||||
// processed by a worker.
|
||||
type Work struct {
|
||||
User *common.User `json:"user"`
|
||||
Repo *common.Repo `json:"repo"`
|
||||
Build *common.Build `json:"build"`
|
||||
Keys *common.Keypair `json:"keypair"`
|
||||
Netrc *common.Netrc `json:"netrc"`
|
||||
Yaml []byte `json:"yaml"`
|
||||
}
|
||||
|
||||
// represents a worker that has connected
|
||||
// to the system in order to perform work
|
||||
type Worker struct {
|
||||
Name string
|
||||
Addr string
|
||||
IsHealthy bool
|
||||
}
|
||||
|
||||
// Ping pings to worker to verify it is
|
||||
// available and in good health.
|
||||
func (w *Worker) Ping() (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Logs fetches the logs for a work item.
|
||||
func (w *Worker) Logs() (io.Reader, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Cancel cancels a work item.
|
||||
func (w *Worker) Cancel() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// type Monitor struct {
|
||||
// manager *Manager
|
||||
// }
|
||||
|
||||
// func NewMonitor(manager *Manager) *Monitor {
|
||||
// return &Monitor{manager}
|
||||
// }
|
||||
|
||||
// // start is a helper function that is used to monitor
|
||||
// // all registered workers and ensure they are in a
|
||||
// // healthy state.
|
||||
// func (m *Monitor) Start() {
|
||||
// ticker := time.NewTicker(1 * time.Hour)
|
||||
// go func() {
|
||||
// for {
|
||||
// select {
|
||||
// case <-ticker.C:
|
||||
// workers := m.manager.Workers()
|
||||
// for _, worker := range workers {
|
||||
// // ping the worker to make sure it is
|
||||
// // available and still accepting builds.
|
||||
// if _, err := worker.Ping(); err != nil {
|
||||
// m.manager.SetHealth(worker, false)
|
||||
// } else {
|
||||
// m.manager.SetHealth(worker, true)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
108
server/queue.go
Normal file
108
server/queue.go
Normal file
|
@ -0,0 +1,108 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strconv"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
|
||||
"github.com/drone/drone/common"
|
||||
"github.com/drone/drone/eventbus"
|
||||
)
|
||||
|
||||
// TODO (bradrydzewski) the callback URL should be signed.
|
||||
// TODO (bradrydzewski) we shouldn't need to fetch the Repo if specified in the URL path
|
||||
// TODO (bradrydzewski) use SetRepoLast to update the last repository
|
||||
|
||||
// GET /queue/pull
|
||||
func PollBuild(c *gin.Context) {
|
||||
queue := ToQueue(c)
|
||||
work := queue.PullAck()
|
||||
c.JSON(200, work)
|
||||
}
|
||||
|
||||
// GET /queue/push/:owner/:repo
|
||||
func PushBuild(c *gin.Context) {
|
||||
store := ToDatastore(c)
|
||||
repo := ToRepo(c)
|
||||
bus := ToBus(c)
|
||||
in := &common.Build{}
|
||||
if !c.BindWith(in, binding.JSON) {
|
||||
return
|
||||
}
|
||||
build, err := store.Build(repo.FullName, in.Number)
|
||||
if err != nil {
|
||||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
err = store.SetBuildState(repo.FullName, build)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
|
||||
bus.Send(&eventbus.Event{
|
||||
Build: build,
|
||||
Repo: repo,
|
||||
})
|
||||
if repo.Last != nil && repo.Last.Number > build.Number {
|
||||
c.Writer.WriteHeader(200)
|
||||
return
|
||||
}
|
||||
repo.Last = build
|
||||
store.SetRepo(repo)
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
||||
|
||||
// POST /queue/push/:owner/:repo/:build
|
||||
func PushTask(c *gin.Context) {
|
||||
store := ToDatastore(c)
|
||||
repo := ToRepo(c)
|
||||
bus := ToBus(c)
|
||||
num, _ := strconv.Atoi(c.Params.ByName("build"))
|
||||
in := &common.Task{}
|
||||
if !c.BindWith(in, binding.JSON) {
|
||||
return
|
||||
}
|
||||
err := store.SetBuildTask(repo.FullName, num, in)
|
||||
if err != nil {
|
||||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
build, err := store.Build(repo.FullName, num)
|
||||
if err != nil {
|
||||
c.Fail(404, err)
|
||||
return
|
||||
}
|
||||
bus.Send(&eventbus.Event{
|
||||
Build: build,
|
||||
Repo: repo,
|
||||
})
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
||||
|
||||
// POST /queue/push/:owner/:repo/:build/:task/logs
|
||||
func PushLogs(c *gin.Context) {
|
||||
store := ToDatastore(c)
|
||||
repo := ToRepo(c)
|
||||
bnum, _ := strconv.Atoi(c.Params.ByName("build"))
|
||||
tnum, _ := strconv.Atoi(c.Params.ByName("task"))
|
||||
|
||||
// TODO (bradrydzewski) change this interface to accept an io.Reader
|
||||
// instead of a byte array so that we can buffer the write and so that
|
||||
// we avoid unnecessary copies of the data in memory.
|
||||
logs, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, 5000000)) //5MB
|
||||
defer c.Request.Body.Close()
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
err = store.SetLogs(repo.FullName, bnum, tnum, logs)
|
||||
if err != nil {
|
||||
c.Fail(500, err)
|
||||
return
|
||||
}
|
||||
c.Writer.WriteHeader(200)
|
||||
}
|
|
@ -9,11 +9,27 @@ import (
|
|||
"github.com/drone/drone/common"
|
||||
"github.com/drone/drone/datastore"
|
||||
"github.com/drone/drone/eventbus"
|
||||
"github.com/drone/drone/queue"
|
||||
"github.com/drone/drone/remote"
|
||||
"github.com/drone/drone/server/session"
|
||||
"github.com/drone/drone/settings"
|
||||
)
|
||||
|
||||
func SetQueue(q queue.Queue) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
c.Set("queue", q)
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func ToQueue(c *gin.Context) queue.Queue {
|
||||
v, err := c.Get("queue")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v.(queue.Queue)
|
||||
}
|
||||
|
||||
func SetBus(r eventbus.Bus) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
c.Set("eventbus", r)
|
||||
|
|
Loading…
Reference in a new issue