Rework BoltDB util functions to use bolt.Tx instead of DB along with updates to existing API that leverage these to use Bolt's transaction wrapping functions.
This commit is contained in:
parent
779a6b46ab
commit
46114f1231
6 changed files with 136 additions and 96 deletions
|
@ -15,7 +15,11 @@ import (
|
|||
func (db *DB) GetBuild(repo string, build int) (*common.Build, error) {
|
||||
build_ := &common.Build{}
|
||||
key := []byte(repo + "/" + strconv.Itoa(build))
|
||||
err := get(db, bucketBuild, key, build_)
|
||||
|
||||
err := db.View(func(t *bolt.Tx) error {
|
||||
return get(t, bucketBuild, key, build_)
|
||||
})
|
||||
|
||||
return build_, err
|
||||
}
|
||||
|
||||
|
@ -41,7 +45,11 @@ func (db *DB) GetBuildLast(repo string) (*common.Build, error) {
|
|||
func (db *DB) GetBuildStatus(repo string, build int, status string) (*common.Status, error) {
|
||||
status_ := &common.Status{}
|
||||
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status)
|
||||
err := update(db, bucketBuildStatus, key, status)
|
||||
|
||||
err := db.Update(func(t *bolt.Tx) error {
|
||||
return update(t, bucketBuildStatus, key, status)
|
||||
})
|
||||
|
||||
return status_, err
|
||||
}
|
||||
|
||||
|
@ -68,49 +76,36 @@ func (db *DB) GetBuildStatusList(repo string, build int) ([]*common.Status, erro
|
|||
|
||||
// InsertBuild inserts a new build for the named repository
|
||||
func (db *DB) InsertBuild(repo string, build *common.Build) error {
|
||||
var seqno int
|
||||
|
||||
t, err := db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := []byte(repo)
|
||||
raw := t.Bucket(bucketBuildSeq).Get(key)
|
||||
if raw != nil {
|
||||
// convert our raw to an integer value
|
||||
seqno = int(binary.LittleEndian.Uint32(raw))
|
||||
}
|
||||
|
||||
// increment the seqno, if no record was found, this starts us at 1
|
||||
seqno += 1
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
raw, err := raw(t, bucketBuildSeq, key)
|
||||
|
||||
// convert our new seqno back to raw value
|
||||
raw = make([]byte, 4) // TODO: replace magic number 4 (uint32)
|
||||
binary.LittleEndian.PutUint32(raw, uint32(seqno))
|
||||
err = t.Bucket(bucketBuildSeq).Put(key, raw)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return err
|
||||
}
|
||||
var next_seq uint32
|
||||
switch err {
|
||||
case ErrKeyNotFound:
|
||||
next_seq = 1
|
||||
case nil:
|
||||
next_seq = 1 + binary.LittleEndian.Uint32(raw)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
||||
// fill out build structure
|
||||
build.Number = seqno
|
||||
build.Created = time.Now().UTC().Unix()
|
||||
// covert our seqno to raw value
|
||||
raw = make([]byte, 4) // TODO(benschumacher) replace magic number 4 (uint32)
|
||||
binary.LittleEndian.PutUint32(raw, next_seq)
|
||||
err = t.Bucket(bucketBuildSeq).Put(key, raw)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key = []byte(repo + "/" + strconv.Itoa(build.Number))
|
||||
raw, err = encode(build)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return err
|
||||
}
|
||||
// fill out the build structure
|
||||
build.Number = int(next_seq)
|
||||
build.Created = time.Now().UTC().Unix()
|
||||
|
||||
err = t.Bucket(bucketBuild).Put(key, raw)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
return t.Commit()
|
||||
key = []byte(repo + "/" + strconv.Itoa(build.Number))
|
||||
return insert(t, bucketBuild, key, build)
|
||||
})
|
||||
}
|
||||
|
||||
// InsertBuildStatus inserts a new build status for the
|
||||
|
@ -118,7 +113,10 @@ func (db *DB) InsertBuild(repo string, build *common.Build) error {
|
|||
// exists an error is returned.
|
||||
func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) error {
|
||||
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status.Context)
|
||||
return update(db, bucketBuildStatus, key, status)
|
||||
|
||||
return db.Update(func(t *bolt.Tx) error {
|
||||
return update(t, bucketBuildStatus, key, status)
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateBuild updates an existing build for the named
|
||||
|
@ -127,5 +125,8 @@ func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) e
|
|||
func (db *DB) UpdateBuild(repo string, build *common.Build) error {
|
||||
key := []byte(repo + "/" + strconv.Itoa(build.Number))
|
||||
build.Updated = time.Now().UTC().Unix()
|
||||
return update(db, bucketBuild, key, build)
|
||||
|
||||
return db.Update(func(t *bolt.Tx) error {
|
||||
return update(t, bucketBuild, key, build)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,13 +4,18 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/drone/drone/common"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
// GetRepo gets the repository by name.
|
||||
func (db *DB) GetRepo(repo string) (*common.Repo, error) {
|
||||
repo_ := &common.Repo{}
|
||||
key := []byte(repo)
|
||||
err := get(db, bucketRepo, key, repo_)
|
||||
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
return get(t, bucketRepo, key, repo_)
|
||||
})
|
||||
|
||||
return repo_, err
|
||||
}
|
||||
|
||||
|
@ -19,7 +24,11 @@ func (db *DB) GetRepo(repo string) (*common.Repo, error) {
|
|||
func (db *DB) GetRepoParams(repo string) (map[string]string, error) {
|
||||
params := map[string]string{}
|
||||
key := []byte(repo)
|
||||
err := get(db, bucketRepoParams, key, ¶ms)
|
||||
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
return get(t, bucketRepoParams, key, ¶ms)
|
||||
})
|
||||
|
||||
return params, err
|
||||
}
|
||||
|
||||
|
@ -28,7 +37,11 @@ func (db *DB) GetRepoParams(repo string) (map[string]string, error) {
|
|||
func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) {
|
||||
keypair := &common.Keypair{}
|
||||
key := []byte(repo)
|
||||
err := get(db, bucketRepoKeys, key, keypair)
|
||||
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
return get(t, bucketRepoKeys, key, keypair)
|
||||
})
|
||||
|
||||
return keypair, err
|
||||
}
|
||||
|
||||
|
@ -37,7 +50,10 @@ func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) {
|
|||
func (db *DB) UpdateRepo(repo *common.Repo) error {
|
||||
key := []byte(repo.FullName)
|
||||
repo.Updated = time.Now().UTC().Unix()
|
||||
return update(db, bucketRepo, key, repo)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return update(t, bucketRepo, key, repo)
|
||||
})
|
||||
}
|
||||
|
||||
// InsertRepo inserts a repository in the datastore and
|
||||
|
@ -46,27 +62,38 @@ func (db *DB) InsertRepo(user *common.User, repo *common.Repo) error {
|
|||
key := []byte(repo.FullName)
|
||||
repo.Created = time.Now().UTC().Unix()
|
||||
repo.Updated = time.Now().UTC().Unix()
|
||||
// TODO(bradrydzewski) add repo to user index
|
||||
// TODO(bradrydzewski) add user to repo index
|
||||
return insert(db, bucketRepo, key, repo)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
// TODO(bradrydzewski) add repo to user index
|
||||
// TODO(bradrydzewski) add user to repo index
|
||||
return insert(t, bucketRepo, key, repo)
|
||||
})
|
||||
}
|
||||
|
||||
// UpsertRepoParams inserts or updates the private
|
||||
// environment parameters for the named repository.
|
||||
func (db *DB) UpsertRepoParams(repo string, params map[string]string) error {
|
||||
key := []byte(repo)
|
||||
return update(db, bucketRepoParams, key, params)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return update(t, bucketRepoParams, key, params)
|
||||
})
|
||||
}
|
||||
|
||||
// UpsertRepoKeys inserts or updates the private and
|
||||
// public keypair for the named repository.
|
||||
func (db *DB) UpsertRepoKeys(repo string, keypair *common.Keypair) error {
|
||||
key := []byte(repo)
|
||||
return update(db, bucketRepoKeys, key, keypair)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return update(t, bucketRepoKeys, key, keypair)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteRepo deletes the repository.
|
||||
func (db *DB) DeleteRepo(repo *common.Repo) error {
|
||||
//TODO(benschumacher) rework this to use BoltDB's txn wrapper
|
||||
|
||||
t, err := db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"strconv"
|
||||
|
||||
"github.com/drone/drone/common"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
// GetTask gets the task at index N for the named
|
||||
|
@ -11,7 +12,9 @@ import (
|
|||
func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) {
|
||||
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task))
|
||||
task_ := &common.Task{}
|
||||
err := get(db, bucketBuildTasks, key, task_)
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
return get(t, bucketBuildTasks, key, task_)
|
||||
})
|
||||
return task_, err
|
||||
}
|
||||
|
||||
|
@ -19,7 +22,14 @@ func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) {
|
|||
// the named repository and build number.
|
||||
func (db *DB) GetTaskLogs(repo string, build int, task int) ([]byte, error) {
|
||||
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task))
|
||||
log, err := raw(db, bucketBuildLogs, key)
|
||||
|
||||
var log []byte
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
var err error
|
||||
log, err = raw(t, bucketBuildLogs, key)
|
||||
return err
|
||||
})
|
||||
|
||||
return log, err
|
||||
}
|
||||
|
||||
|
@ -62,7 +72,9 @@ func (db *DB) GetTaskList(repo string, build int) ([]*common.Task, error) {
|
|||
// repository and build number.
|
||||
func (db *DB) UpsertTask(repo string, build int, task *common.Task) error {
|
||||
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task.Number))
|
||||
return update(db, bucketBuildTasks, key, task)
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return update(t, bucketBuildTasks, key, task)
|
||||
})
|
||||
}
|
||||
|
||||
// UpsertTaskLogs inserts or updates a task logs for the
|
||||
|
|
|
@ -2,13 +2,18 @@ package bolt
|
|||
|
||||
import (
|
||||
"github.com/drone/drone/common"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
// GetToken gets a token by sha value.
|
||||
func (db *DB) GetToken(sha string) (*common.Token, error) {
|
||||
token := &common.Token{}
|
||||
key := []byte(sha)
|
||||
err := get(db, bucketTokens, key, token)
|
||||
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
return get(t, bucketTokens, key, token)
|
||||
})
|
||||
|
||||
return token, err
|
||||
}
|
||||
|
||||
|
@ -16,12 +21,16 @@ func (db *DB) GetToken(sha string) (*common.Token, error) {
|
|||
// If the token already exists and error is returned.
|
||||
func (db *DB) InsertToken(token *common.Token) error {
|
||||
key := []byte(token.Sha)
|
||||
return insert(db, bucketTokens, key, token)
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return insert(t, bucketTokens, key, token)
|
||||
})
|
||||
// TODO(bradrydzewski) add token to users_token index
|
||||
}
|
||||
|
||||
// DeleteUser deletes the token.
|
||||
func (db *DB) DeleteToken(token *common.Token) error {
|
||||
key := []byte(token.Sha)
|
||||
return delete(db, bucketUser, key)
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return delete(t, bucketUser, key)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,13 +4,18 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/drone/drone/common"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
// GetUser gets a user by user login.
|
||||
func (db *DB) GetUser(login string) (*common.User, error) {
|
||||
user := &common.User{}
|
||||
key := []byte(login)
|
||||
err := get(db, bucketUser, key, user)
|
||||
|
||||
err := db.View(func (t *bolt.Tx) error {
|
||||
return get(t, bucketUser, key, user)
|
||||
})
|
||||
|
||||
return user, err
|
||||
}
|
||||
|
||||
|
@ -120,7 +125,10 @@ func (db *DB) GetUserList() ([]*common.User, error) {
|
|||
func (db *DB) UpdateUser(user *common.User) error {
|
||||
key := []byte(user.Login)
|
||||
user.Updated = time.Now().UTC().Unix()
|
||||
return update(db, bucketUser, key, user)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return update(t, bucketUser, key, user)
|
||||
})
|
||||
}
|
||||
|
||||
// InsertUser inserts a new user into the datastore. If
|
||||
|
@ -129,7 +137,10 @@ func (db *DB) InsertUser(user *common.User) error {
|
|||
key := []byte(user.Login)
|
||||
user.Created = time.Now().UTC().Unix()
|
||||
user.Updated = time.Now().UTC().Unix()
|
||||
return insert(db, bucketUser, key, user)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return insert(t, bucketUser, key, user)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteUser deletes the user.
|
||||
|
@ -137,5 +148,8 @@ func (db *DB) DeleteUser(user *common.User) error {
|
|||
key := []byte(user.Login)
|
||||
// TODO(bradrydzewski) delete user subscriptions
|
||||
// TODO(bradrydzewski) delete user tokens
|
||||
return delete(db, bucketUser, key)
|
||||
|
||||
return db.Update(func (t *bolt.Tx) error {
|
||||
return delete(t, bucketUser, key)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package bolt
|
||||
|
||||
import "github.com/youtube/vitess/go/bson"
|
||||
import (
|
||||
"github.com/youtube/vitess/go/bson"
|
||||
"github.com/boltdb/bolt"
|
||||
)
|
||||
|
||||
func encode(v interface{}) ([]byte, error) {
|
||||
return bson.Marshal(v)
|
||||
|
@ -10,12 +13,7 @@ func decode(raw []byte, v interface{}) error {
|
|||
return bson.Unmarshal(raw, v)
|
||||
}
|
||||
|
||||
func get(db *DB, bucket, key []byte, v interface{}) error {
|
||||
t, err := db.Begin(false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer t.Rollback()
|
||||
func get(t *bolt.Tx, bucket, key []byte, v interface{}) error {
|
||||
raw := t.Bucket(bucket).Get(key)
|
||||
if raw == nil {
|
||||
return ErrKeyNotFound
|
||||
|
@ -23,12 +21,7 @@ func get(db *DB, bucket, key []byte, v interface{}) error {
|
|||
return bson.Unmarshal(raw, v)
|
||||
}
|
||||
|
||||
func raw(db *DB, bucket, key []byte) ([]byte, error) {
|
||||
t, err := db.Begin(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer t.Rollback()
|
||||
func raw(t *bolt.Tx, bucket, key []byte) ([]byte, error) {
|
||||
raw := t.Bucket(bucket).Get(key)
|
||||
if raw == nil {
|
||||
return nil, ErrKeyNotFound
|
||||
|
@ -36,11 +29,7 @@ func raw(db *DB, bucket, key []byte) ([]byte, error) {
|
|||
return raw, nil
|
||||
}
|
||||
|
||||
func update(db *DB, bucket, key []byte, v interface{}) error {
|
||||
t, err := db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func update(t *bolt.Tx, bucket, key []byte, v interface{}) error {
|
||||
raw, err := encode(v)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
|
@ -48,17 +37,12 @@ func update(db *DB, bucket, key []byte, v interface{}) error {
|
|||
}
|
||||
err = t.Bucket(bucket).Put(key, raw)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return err
|
||||
}
|
||||
return t.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func insert(db *DB, bucket, key []byte, v interface{}) error {
|
||||
t, err := db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func insert(t *bolt.Tx, bucket, key []byte, v interface{}) error {
|
||||
raw, err := encode(v)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
|
@ -67,26 +51,19 @@ func insert(db *DB, bucket, key []byte, v interface{}) error {
|
|||
// verify the key does not already exists
|
||||
// in the bucket. If exists, fail
|
||||
if t.Bucket(bucket).Get(key) != nil {
|
||||
t.Rollback()
|
||||
return ErrKeyExists
|
||||
}
|
||||
err = t.Bucket(bucket).Put(key, raw)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return err
|
||||
}
|
||||
return t.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func delete(db *DB, bucket, key []byte) error {
|
||||
t, err := db.Begin(true)
|
||||
func delete(t *bolt.Tx, bucket, key []byte) error {
|
||||
err := t.Bucket(bucket).Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = t.Bucket(bucket).Delete(key)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return err
|
||||
}
|
||||
return t.Commit()
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue