restore queue on restart

This commit is contained in:
Brad Rydzewski 2017-04-14 13:32:36 +02:00
parent 30b561636f
commit e172de592f
13 changed files with 236 additions and 9 deletions

71
model/queue.go Normal file
View file

@ -0,0 +1,71 @@
package model
import (
"context"
"github.com/cncd/queue"
)
// Task defines scheduled pipeline Task.
type Task struct {
ID string `meddler:"task_id"`
Data []byte `meddler:"task_data"`
Labels map[string]string `meddler:"task_labels,json"`
}
// TaskStore defines storage for scheduled Tasks.
type TaskStore interface {
TaskList() ([]*Task, error)
TaskInsert(*Task) error
TaskDelete(string) error
}
// WithTaskStore returns a queue that is backed by the TaskStore. This
// ensures the task Queue can be restored when the system starts.
func WithTaskStore(q queue.Queue, s TaskStore) queue.Queue {
tasks, _ := s.TaskList()
for _, task := range tasks {
q.Push(context.Background(), &queue.Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
})
}
return &persistentQueue{q, s}
}
type persistentQueue struct {
queue.Queue
store TaskStore
}
// Push pushes an task to the tail of this queue.
func (q *persistentQueue) Push(c context.Context, task *queue.Task) error {
err := q.Queue.Push(c, task)
if err == nil {
q.store.TaskInsert(&Task{
ID: task.ID,
Data: task.Data,
Labels: task.Labels,
})
}
return err
}
// Poll retrieves and removes a task head of this queue.
func (q *persistentQueue) Poll(c context.Context, f queue.Filter) (*queue.Task, error) {
task, err := q.Queue.Poll(c, f)
if err == nil {
q.store.TaskDelete(task.ID)
}
return task, err
}
// Evict removes a pending task from the queue.
func (q *persistentQueue) Evict(c context.Context, id string) error {
err := q.Queue.Evict(c, id)
if err == nil {
q.store.TaskDelete(id)
}
return err
}

View file

@ -6,6 +6,7 @@ import (
"github.com/cncd/logging" "github.com/cncd/logging"
"github.com/cncd/pubsub" "github.com/cncd/pubsub"
"github.com/cncd/queue" "github.com/cncd/queue"
"github.com/drone/drone/model"
"github.com/drone/drone/plugins/registry" "github.com/drone/drone/plugins/registry"
"github.com/drone/drone/plugins/secrets" "github.com/drone/drone/plugins/secrets"
"github.com/drone/drone/plugins/sender" "github.com/drone/drone/plugins/sender"
@ -28,7 +29,7 @@ func Store(cli *cli.Context) gin.HandlerFunc {
server.Config.Storage.Files = v server.Config.Storage.Files = v
// services // services
server.Config.Services.Queue = queue.New() server.Config.Services.Queue = model.WithTaskStore(queue.New(), v)
server.Config.Services.Logs = logging.New() server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New() server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub.Create(context.Background(), "topic/events") server.Config.Services.Pubsub.Create(context.Background(), "topic/events")

View file

@ -0,0 +1,11 @@
-- +migrate Up
CREATE TABLE tasks (
task_id VARCHAR(255) PRIMARY KEY
,task_data MEDIUMBLOB
,task_labels MEDIUMBLOB
);
-- +migrate Down
DROP TABLE tasks;

View file

@ -0,0 +1,11 @@
-- +migrate Up
CREATE TABLE tasks (
task_id VARCHAR(255) PRIMARY KEY
,task_data BYTEA
,task_labels BYTEA
);
-- +migrate Down
DROP TABLE tasks;

View file

@ -0,0 +1,11 @@
-- +migrate Up
CREATE TABLE tasks (
task_id TEXT PRIMARY KEY
,task_data BLOB
,task_labels BLOB
);
-- +migrate Down
DROP TABLE tasks;

View file

@ -0,0 +1,11 @@
-- name: task-list
SELECT
task_id
,task_data
,task_labels
FROM tasks
-- name: task-delete
DELETE FROM tasks WHERE task_id = $1

View file

@ -26,6 +26,8 @@ var index = map[string]string{
"sender-find-repo-login": senderFindRepoLogin, "sender-find-repo-login": senderFindRepoLogin,
"sender-delete-repo": senderDeleteRepo, "sender-delete-repo": senderDeleteRepo,
"sender-delete": senderDelete, "sender-delete": senderDelete,
"task-list": taskList,
"task-delete": taskDelete,
} }
var filesFindBuild = ` var filesFindBuild = `
@ -259,3 +261,15 @@ DELETE FROM senders WHERE sender_repo_id = $1
var senderDelete = ` var senderDelete = `
DELETE FROM senders WHERE sender_id = $1 DELETE FROM senders WHERE sender_id = $1
` `
var taskList = `
SELECT
task_id
,task_data
,task_labels
FROM tasks
`
var taskDelete = `
DELETE FROM tasks WHERE task_id = $1
`

View file

@ -0,0 +1,11 @@
-- name: task-list
SELECT
task_id
,task_data
,task_labels
FROM tasks
-- name: task-delete
DELETE FROM tasks WHERE task_id = ?

View file

@ -26,6 +26,8 @@ var index = map[string]string{
"sender-find-repo-login": senderFindRepoLogin, "sender-find-repo-login": senderFindRepoLogin,
"sender-delete-repo": senderDeleteRepo, "sender-delete-repo": senderDeleteRepo,
"sender-delete": senderDelete, "sender-delete": senderDelete,
"task-list": taskList,
"task-delete": taskDelete,
} }
var filesFindBuild = ` var filesFindBuild = `
@ -259,3 +261,15 @@ DELETE FROM senders WHERE sender_repo_id = ?
var senderDelete = ` var senderDelete = `
DELETE FROM senders WHERE sender_id = ? DELETE FROM senders WHERE sender_id = ?
` `
var taskList = `
SELECT
task_id
,task_data
,task_labels
FROM tasks
`
var taskDelete = `
DELETE FROM tasks WHERE task_id = ?
`

View file

@ -61,7 +61,6 @@ func open(driver, config string) *sql.DB {
logrus.Errorln(err) logrus.Errorln(err)
logrus.Fatalln("migration failed") logrus.Fatalln("migration failed")
} }
cleanupDatabase(db)
return db return db
} }
@ -126,13 +125,6 @@ func setupDatabase(driver string, db *sql.DB) error {
return err return err
} }
// helper function to avoid stuck jobs when Drone unexpectedly
// restarts. This is a temp fix for https://github.com/drone/drone/issues/1195
func cleanupDatabase(db *sql.DB) {
db.Exec("update builds set build_status = 'error' where build_status IN ('pending','running')")
db.Exec("update jobs set job_status = 'error' where job_status IN ('pending','running')")
}
// helper function to setup the meddler default driver // helper function to setup the meddler default driver
// based on the selected driver name. // based on the selected driver name.
func setupMeddler(driver string) { func setupMeddler(driver string) {

24
store/datastore/task.go Normal file
View file

@ -0,0 +1,24 @@
package datastore
import (
"github.com/drone/drone/model"
"github.com/drone/drone/store/datastore/sql"
"github.com/russross/meddler"
)
func (db *datastore) TaskList() ([]*model.Task, error) {
stmt := sql.Lookup(db.driver, "task-list")
data := []*model.Task{}
err := meddler.QueryAll(db, &data, stmt)
return data, err
}
func (db *datastore) TaskInsert(task *model.Task) error {
return meddler.Insert(db, "tasks", task)
}
func (db *datastore) TaskDelete(id string) error {
stmt := sql.Lookup(db.driver, "task-delete")
_, err := db.Exec(stmt, id)
return err
}

View file

@ -0,0 +1,52 @@
package datastore
import (
"testing"
"github.com/drone/drone/model"
)
func TestTaskList(t *testing.T) {
s := newTest()
defer func() {
s.Exec("delete from tasks")
s.Close()
}()
s.TaskInsert(&model.Task{
ID: "some_random_id",
Data: []byte("foo"),
Labels: map[string]string{"foo": "bar"},
})
list, err := s.TaskList()
if err != nil {
t.Error(err)
return
}
if got, want := len(list), 1; got != want {
t.Errorf("Want %d task, got %d", want, got)
return
}
if got, want := list[0].ID, "some_random_id"; got != want {
t.Errorf("Want task id %s, got %s", want, got)
}
if got, want := list[0].Data, "foo"; string(got) != want {
t.Errorf("Want task data %s, got %s", want, string(got))
}
err = s.TaskDelete("some_random_id")
if err != nil {
t.Error(err)
return
}
list, err = s.TaskList()
if err != nil {
t.Error(err)
return
}
if got, want := len(list), 0; got != want {
t.Errorf("Want empty task list after delete")
}
}

View file

@ -125,6 +125,10 @@ type Store interface {
FileFind(*model.Proc, string) (*model.File, error) FileFind(*model.Proc, string) (*model.File, error)
FileRead(*model.Proc, string) (io.ReadCloser, error) FileRead(*model.Proc, string) (io.ReadCloser, error)
FileCreate(*model.File, io.Reader) error FileCreate(*model.File, io.Reader) error
TaskList() ([]*model.Task, error)
TaskInsert(*model.Task) error
TaskDelete(string) error
} }
// GetUser gets a user by unique ID. // GetUser gets a user by unique ID.