harness-drone/store/cron/cron.go

245 lines
5.1 KiB
Go
Raw Permalink Normal View History

2019-02-19 23:56:41 +00:00
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Drone Non-Commercial License
// that can be found in the LICENSE file.
2019-02-28 07:07:13 +00:00
// +build !oss
2019-02-19 23:56:41 +00:00
package cron
// NewCronStore returns a new CronStore.
import (
"context"
"github.com/drone/drone/core"
2019-02-28 07:07:13 +00:00
"github.com/drone/drone/store/shared/db"
2019-02-19 23:56:41 +00:00
)
// New returns a new Cron database store.
func New(db *db.DB) core.CronStore {
return &cronStore{db}
}
type cronStore struct {
db *db.DB
}
func (s *cronStore) List(ctx context.Context, id int64) ([]*core.Cron, error) {
var out []*core.Cron
err := s.db.View(func(queryer db.Queryer, binder db.Binder) error {
params := map[string]interface{}{"cron_repo_id": id}
stmt, args, err := binder.BindNamed(queryRepo, params)
if err != nil {
return err
}
rows, err := queryer.Query(stmt, args...)
if err != nil {
return err
}
out, err = scanRows(rows)
return err
})
return out, err
}
func (s *cronStore) Ready(ctx context.Context, before int64) ([]*core.Cron, error) {
var out []*core.Cron
err := s.db.View(func(queryer db.Queryer, binder db.Binder) error {
params := map[string]interface{}{"cron_next": before}
stmt, args, err := binder.BindNamed(queryReady, params)
if err != nil {
return err
}
rows, err := queryer.Query(stmt, args...)
if err != nil {
return err
}
out, err = scanRows(rows)
return err
})
return out, err
}
func (s *cronStore) Find(ctx context.Context, id int64) (*core.Cron, error) {
out := &core.Cron{ID: id}
err := s.db.View(func(queryer db.Queryer, binder db.Binder) error {
params := toParams(out)
query, args, err := binder.BindNamed(queryKey, params)
if err != nil {
return err
}
row := queryer.QueryRow(query, args...)
return scanRow(row, out)
})
return out, err
}
func (s *cronStore) FindName(ctx context.Context, id int64, name string) (*core.Cron, error) {
out := &core.Cron{Name: name, RepoID: id}
err := s.db.View(func(queryer db.Queryer, binder db.Binder) error {
params := toParams(out)
query, args, err := binder.BindNamed(queryName, params)
if err != nil {
return err
}
row := queryer.QueryRow(query, args...)
return scanRow(row, out)
})
return out, err
}
func (s *cronStore) Create(ctx context.Context, cron *core.Cron) error {
if s.db.Driver() == db.Postgres {
return s.createPostgres(ctx, cron)
}
return s.create(ctx, cron)
}
func (s *cronStore) create(ctx context.Context, cron *core.Cron) error {
return s.db.Lock(func(execer db.Execer, binder db.Binder) error {
params := toParams(cron)
stmt, args, err := binder.BindNamed(stmtInsert, params)
if err != nil {
return err
}
res, err := execer.Exec(stmt, args...)
if err != nil {
return err
}
cron.ID, err = res.LastInsertId()
return err
})
}
func (s *cronStore) createPostgres(ctx context.Context, cron *core.Cron) error {
return s.db.Lock(func(execer db.Execer, binder db.Binder) error {
params := toParams(cron)
stmt, args, err := binder.BindNamed(stmtInsertPg, params)
if err != nil {
return err
}
return execer.QueryRow(stmt, args...).Scan(&cron.ID)
})
}
func (s *cronStore) Update(ctx context.Context, cron *core.Cron) error {
return s.db.Lock(func(execer db.Execer, binder db.Binder) error {
params := toParams(cron)
stmt, args, err := binder.BindNamed(stmtUpdate, params)
if err != nil {
return err
}
_, err = execer.Exec(stmt, args...)
return err
})
}
func (s *cronStore) Delete(ctx context.Context, cron *core.Cron) error {
return s.db.Lock(func(execer db.Execer, binder db.Binder) error {
params := toParams(cron)
stmt, args, err := binder.BindNamed(stmtDelete, params)
if err != nil {
return err
}
_, err = execer.Exec(stmt, args...)
return err
})
}
const queryBase = `
SELECT
cron_id
,cron_repo_id
,cron_name
,cron_expr
,cron_next
,cron_prev
,cron_event
,cron_branch
,cron_target
,cron_disabled
,cron_created
,cron_updated
,cron_version
`
const queryKey = queryBase + `
FROM cron
WHERE cron_id = :cron_id
LIMIT 1
`
const queryName = queryBase + `
FROM cron
WHERE cron_name = :cron_name
AND cron_repo_id = :cron_repo_id
LIMIT 1
`
const queryRepo = queryBase + `
FROM cron
WHERE cron_repo_id = :cron_repo_id
ORDER BY cron_name
`
const queryReady = queryBase + `
FROM cron
WHERE cron_next < :cron_next
ORDER BY cron_name
`
const stmtUpdate = `
UPDATE cron SET
cron_repo_id = :cron_repo_id
,cron_name = :cron_name
,cron_expr = :cron_expr
,cron_next = :cron_next
,cron_prev = :cron_prev
,cron_event = :cron_event
,cron_branch = :cron_branch
,cron_target = :cron_target
,cron_disabled = :cron_disabled
,cron_created = :cron_created
,cron_updated = :cron_updated
,cron_version = :cron_version
WHERE cron_id = :cron_id
`
const stmtDelete = `
DELETE FROM cron
WHERE cron_id = :cron_id
`
const stmtInsert = `
INSERT INTO cron (
cron_repo_id
,cron_name
,cron_expr
,cron_next
,cron_prev
,cron_event
,cron_branch
,cron_target
,cron_disabled
,cron_created
,cron_updated
,cron_version
) VALUES (
:cron_repo_id
,:cron_name
,:cron_expr
,:cron_next
,:cron_prev
,:cron_event
,:cron_branch
,:cron_target
,:cron_disabled
,:cron_created
,:cron_updated
,:cron_version
)
`
const stmtInsertPg = stmtInsert + `
RETURNING cron_id
`