diff --git a/model/queue.go b/model/queue.go new file mode 100644 index 00000000..0b527d61 --- /dev/null +++ b/model/queue.go @@ -0,0 +1,71 @@ +package model + +import ( + "context" + + "github.com/cncd/queue" +) + +// Task defines scheduled pipeline Task. +type Task struct { + ID string `meddler:"task_id"` + Data []byte `meddler:"task_data"` + Labels map[string]string `meddler:"task_labels,json"` +} + +// TaskStore defines storage for scheduled Tasks. +type TaskStore interface { + TaskList() ([]*Task, error) + TaskInsert(*Task) error + TaskDelete(string) error +} + +// WithTaskStore returns a queue that is backed by the TaskStore. This +// ensures the task Queue can be restored when the system starts. +func WithTaskStore(q queue.Queue, s TaskStore) queue.Queue { + tasks, _ := s.TaskList() + for _, task := range tasks { + q.Push(context.Background(), &queue.Task{ + ID: task.ID, + Data: task.Data, + Labels: task.Labels, + }) + } + return &persistentQueue{q, s} +} + +type persistentQueue struct { + queue.Queue + store TaskStore +} + +// Push pushes an task to the tail of this queue. +func (q *persistentQueue) Push(c context.Context, task *queue.Task) error { + err := q.Queue.Push(c, task) + if err == nil { + q.store.TaskInsert(&Task{ + ID: task.ID, + Data: task.Data, + Labels: task.Labels, + }) + } + return err +} + +// Poll retrieves and removes a task head of this queue. +func (q *persistentQueue) Poll(c context.Context, f queue.Filter) (*queue.Task, error) { + task, err := q.Queue.Poll(c, f) + if err == nil { + q.store.TaskDelete(task.ID) + } + return task, err +} + +// Evict removes a pending task from the queue. +func (q *persistentQueue) Evict(c context.Context, id string) error { + err := q.Queue.Evict(c, id) + if err == nil { + q.store.TaskDelete(id) + } + return err +} diff --git a/router/middleware/store.go b/router/middleware/store.go index 6640c347..50ff877b 100644 --- a/router/middleware/store.go +++ b/router/middleware/store.go @@ -6,6 +6,7 @@ import ( "github.com/cncd/logging" "github.com/cncd/pubsub" "github.com/cncd/queue" + "github.com/drone/drone/model" "github.com/drone/drone/plugins/registry" "github.com/drone/drone/plugins/secrets" "github.com/drone/drone/plugins/sender" @@ -28,7 +29,7 @@ func Store(cli *cli.Context) gin.HandlerFunc { server.Config.Storage.Files = v // services - server.Config.Services.Queue = queue.New() + server.Config.Services.Queue = model.WithTaskStore(queue.New(), v) server.Config.Services.Logs = logging.New() server.Config.Services.Pubsub = pubsub.New() server.Config.Services.Pubsub.Create(context.Background(), "topic/events") diff --git a/store/datastore/ddl/mysql/15.sql b/store/datastore/ddl/mysql/15.sql new file mode 100644 index 00000000..fe01a9dd --- /dev/null +++ b/store/datastore/ddl/mysql/15.sql @@ -0,0 +1,11 @@ +-- +migrate Up + +CREATE TABLE tasks ( + task_id VARCHAR(255) PRIMARY KEY +,task_data MEDIUMBLOB +,task_labels MEDIUMBLOB +); + +-- +migrate Down + +DROP TABLE tasks; diff --git a/store/datastore/ddl/postgres/15.sql b/store/datastore/ddl/postgres/15.sql new file mode 100644 index 00000000..d450430e --- /dev/null +++ b/store/datastore/ddl/postgres/15.sql @@ -0,0 +1,11 @@ +-- +migrate Up + +CREATE TABLE tasks ( + task_id VARCHAR(255) PRIMARY KEY +,task_data BYTEA +,task_labels BYTEA +); + +-- +migrate Down + +DROP TABLE tasks; diff --git a/store/datastore/ddl/sqlite3/15.sql b/store/datastore/ddl/sqlite3/15.sql new file mode 100644 index 00000000..93f5d143 --- /dev/null +++ b/store/datastore/ddl/sqlite3/15.sql @@ -0,0 +1,11 @@ +-- +migrate Up + +CREATE TABLE tasks ( + task_id TEXT PRIMARY KEY +,task_data BLOB +,task_labels BLOB +); + +-- +migrate Down + +DROP TABLE tasks; diff --git a/store/datastore/sql/postgres/files/tasks.sql b/store/datastore/sql/postgres/files/tasks.sql new file mode 100644 index 00000000..896c66e5 --- /dev/null +++ b/store/datastore/sql/postgres/files/tasks.sql @@ -0,0 +1,11 @@ +-- name: task-list + +SELECT + task_id +,task_data +,task_labels +FROM tasks + +-- name: task-delete + +DELETE FROM tasks WHERE task_id = $1 diff --git a/store/datastore/sql/postgres/sql_gen.go b/store/datastore/sql/postgres/sql_gen.go index 654f1e75..7fc85bec 100644 --- a/store/datastore/sql/postgres/sql_gen.go +++ b/store/datastore/sql/postgres/sql_gen.go @@ -26,6 +26,8 @@ var index = map[string]string{ "sender-find-repo-login": senderFindRepoLogin, "sender-delete-repo": senderDeleteRepo, "sender-delete": senderDelete, + "task-list": taskList, + "task-delete": taskDelete, } var filesFindBuild = ` @@ -259,3 +261,15 @@ DELETE FROM senders WHERE sender_repo_id = $1 var senderDelete = ` DELETE FROM senders WHERE sender_id = $1 ` + +var taskList = ` +SELECT + task_id +,task_data +,task_labels +FROM tasks +` + +var taskDelete = ` +DELETE FROM tasks WHERE task_id = $1 +` diff --git a/store/datastore/sql/sqlite/files/task.sql b/store/datastore/sql/sqlite/files/task.sql new file mode 100644 index 00000000..5b61c7c5 --- /dev/null +++ b/store/datastore/sql/sqlite/files/task.sql @@ -0,0 +1,11 @@ +-- name: task-list + +SELECT + task_id +,task_data +,task_labels +FROM tasks + +-- name: task-delete + +DELETE FROM tasks WHERE task_id = ? diff --git a/store/datastore/sql/sqlite/sql_gen.go b/store/datastore/sql/sqlite/sql_gen.go index 61698f07..2ed2de0a 100644 --- a/store/datastore/sql/sqlite/sql_gen.go +++ b/store/datastore/sql/sqlite/sql_gen.go @@ -26,6 +26,8 @@ var index = map[string]string{ "sender-find-repo-login": senderFindRepoLogin, "sender-delete-repo": senderDeleteRepo, "sender-delete": senderDelete, + "task-list": taskList, + "task-delete": taskDelete, } var filesFindBuild = ` @@ -259,3 +261,15 @@ DELETE FROM senders WHERE sender_repo_id = ? var senderDelete = ` DELETE FROM senders WHERE sender_id = ? ` + +var taskList = ` +SELECT + task_id +,task_data +,task_labels +FROM tasks +` + +var taskDelete = ` +DELETE FROM tasks WHERE task_id = ? +` diff --git a/store/datastore/store.go b/store/datastore/store.go index 28798176..4e24ced3 100644 --- a/store/datastore/store.go +++ b/store/datastore/store.go @@ -61,7 +61,6 @@ func open(driver, config string) *sql.DB { logrus.Errorln(err) logrus.Fatalln("migration failed") } - cleanupDatabase(db) return db } @@ -126,13 +125,6 @@ func setupDatabase(driver string, db *sql.DB) error { return err } -// helper function to avoid stuck jobs when Drone unexpectedly -// restarts. This is a temp fix for https://github.com/drone/drone/issues/1195 -func cleanupDatabase(db *sql.DB) { - db.Exec("update builds set build_status = 'error' where build_status IN ('pending','running')") - db.Exec("update jobs set job_status = 'error' where job_status IN ('pending','running')") -} - // helper function to setup the meddler default driver // based on the selected driver name. func setupMeddler(driver string) { diff --git a/store/datastore/task.go b/store/datastore/task.go new file mode 100644 index 00000000..d82e2380 --- /dev/null +++ b/store/datastore/task.go @@ -0,0 +1,24 @@ +package datastore + +import ( + "github.com/drone/drone/model" + "github.com/drone/drone/store/datastore/sql" + "github.com/russross/meddler" +) + +func (db *datastore) TaskList() ([]*model.Task, error) { + stmt := sql.Lookup(db.driver, "task-list") + data := []*model.Task{} + err := meddler.QueryAll(db, &data, stmt) + return data, err +} + +func (db *datastore) TaskInsert(task *model.Task) error { + return meddler.Insert(db, "tasks", task) +} + +func (db *datastore) TaskDelete(id string) error { + stmt := sql.Lookup(db.driver, "task-delete") + _, err := db.Exec(stmt, id) + return err +} diff --git a/store/datastore/task_test.go b/store/datastore/task_test.go new file mode 100644 index 00000000..a1550765 --- /dev/null +++ b/store/datastore/task_test.go @@ -0,0 +1,52 @@ +package datastore + +import ( + "testing" + + "github.com/drone/drone/model" +) + +func TestTaskList(t *testing.T) { + s := newTest() + defer func() { + s.Exec("delete from tasks") + s.Close() + }() + + s.TaskInsert(&model.Task{ + ID: "some_random_id", + Data: []byte("foo"), + Labels: map[string]string{"foo": "bar"}, + }) + + list, err := s.TaskList() + if err != nil { + t.Error(err) + return + } + if got, want := len(list), 1; got != want { + t.Errorf("Want %d task, got %d", want, got) + return + } + if got, want := list[0].ID, "some_random_id"; got != want { + t.Errorf("Want task id %s, got %s", want, got) + } + if got, want := list[0].Data, "foo"; string(got) != want { + t.Errorf("Want task data %s, got %s", want, string(got)) + } + + err = s.TaskDelete("some_random_id") + if err != nil { + t.Error(err) + return + } + + list, err = s.TaskList() + if err != nil { + t.Error(err) + return + } + if got, want := len(list), 0; got != want { + t.Errorf("Want empty task list after delete") + } +} diff --git a/store/store.go b/store/store.go index a19deceb..4ba35832 100644 --- a/store/store.go +++ b/store/store.go @@ -125,6 +125,10 @@ type Store interface { FileFind(*model.Proc, string) (*model.File, error) FileRead(*model.Proc, string) (io.ReadCloser, error) FileCreate(*model.File, io.Reader) error + + TaskList() ([]*model.Task, error) + TaskInsert(*model.Task) error + TaskDelete(string) error } // GetUser gets a user by unique ID.