Merge pull request #970 from benschumacher/issue-drone/drone#969

Rework BoltDB util functions to use bolt.Tx instead of DB
This commit is contained in:
Brad Rydzewski 2015-04-10 14:12:03 -07:00
commit 840ea6d595
6 changed files with 136 additions and 96 deletions

View file

@ -15,7 +15,11 @@ import (
func (db *DB) GetBuild(repo string, build int) (*common.Build, error) { func (db *DB) GetBuild(repo string, build int) (*common.Build, error) {
build_ := &common.Build{} build_ := &common.Build{}
key := []byte(repo + "/" + strconv.Itoa(build)) key := []byte(repo + "/" + strconv.Itoa(build))
err := get(db, bucketBuild, key, build_)
err := db.View(func(t *bolt.Tx) error {
return get(t, bucketBuild, key, build_)
})
return build_, err return build_, err
} }
@ -41,7 +45,11 @@ func (db *DB) GetBuildLast(repo string) (*common.Build, error) {
func (db *DB) GetBuildStatus(repo string, build int, status string) (*common.Status, error) { func (db *DB) GetBuildStatus(repo string, build int, status string) (*common.Status, error) {
status_ := &common.Status{} status_ := &common.Status{}
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status) key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status)
err := update(db, bucketBuildStatus, key, status)
err := db.Update(func(t *bolt.Tx) error {
return update(t, bucketBuildStatus, key, status)
})
return status_, err return status_, err
} }
@ -68,49 +76,36 @@ func (db *DB) GetBuildStatusList(repo string, build int) ([]*common.Status, erro
// InsertBuild inserts a new build for the named repository // InsertBuild inserts a new build for the named repository
func (db *DB) InsertBuild(repo string, build *common.Build) error { func (db *DB) InsertBuild(repo string, build *common.Build) error {
var seqno int
t, err := db.Begin(true)
if err != nil {
return err
}
key := []byte(repo) key := []byte(repo)
raw := t.Bucket(bucketBuildSeq).Get(key)
if raw != nil {
// convert our raw to an integer value
seqno = int(binary.LittleEndian.Uint32(raw))
}
// increment the seqno, if no record was found, this starts us at 1 return db.Update(func (t *bolt.Tx) error {
seqno += 1 raw, err := raw(t, bucketBuildSeq, key)
// convert our new seqno back to raw value var next_seq uint32
raw = make([]byte, 4) // TODO: replace magic number 4 (uint32) switch err {
binary.LittleEndian.PutUint32(raw, uint32(seqno)) case ErrKeyNotFound:
err = t.Bucket(bucketBuildSeq).Put(key, raw) next_seq = 1
if err != nil { case nil:
t.Rollback() next_seq = 1 + binary.LittleEndian.Uint32(raw)
return err default:
} return err
}
// fill out build structure // covert our seqno to raw value
build.Number = seqno raw = make([]byte, 4) // TODO(benschumacher) replace magic number 4 (uint32)
build.Created = time.Now().UTC().Unix() binary.LittleEndian.PutUint32(raw, next_seq)
err = t.Bucket(bucketBuildSeq).Put(key, raw)
if err != nil {
return err
}
key = []byte(repo + "/" + strconv.Itoa(build.Number)) // fill out the build structure
raw, err = encode(build) build.Number = int(next_seq)
if err != nil { build.Created = time.Now().UTC().Unix()
t.Rollback()
return err
}
err = t.Bucket(bucketBuild).Put(key, raw) key = []byte(repo + "/" + strconv.Itoa(build.Number))
if err != nil { return insert(t, bucketBuild, key, build)
t.Rollback() })
return err
}
return t.Commit()
} }
// InsertBuildStatus inserts a new build status for the // InsertBuildStatus inserts a new build status for the
@ -118,7 +113,10 @@ func (db *DB) InsertBuild(repo string, build *common.Build) error {
// exists an error is returned. // exists an error is returned.
func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) error { func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) error {
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status.Context) key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status.Context)
return update(db, bucketBuildStatus, key, status)
return db.Update(func(t *bolt.Tx) error {
return update(t, bucketBuildStatus, key, status)
})
} }
// UpdateBuild updates an existing build for the named // UpdateBuild updates an existing build for the named
@ -127,5 +125,8 @@ func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) e
func (db *DB) UpdateBuild(repo string, build *common.Build) error { func (db *DB) UpdateBuild(repo string, build *common.Build) error {
key := []byte(repo + "/" + strconv.Itoa(build.Number)) key := []byte(repo + "/" + strconv.Itoa(build.Number))
build.Updated = time.Now().UTC().Unix() build.Updated = time.Now().UTC().Unix()
return update(db, bucketBuild, key, build)
return db.Update(func(t *bolt.Tx) error {
return update(t, bucketBuild, key, build)
})
} }

View file

@ -4,13 +4,18 @@ import (
"time" "time"
"github.com/drone/drone/common" "github.com/drone/drone/common"
"github.com/boltdb/bolt"
) )
// GetRepo gets the repository by name. // GetRepo gets the repository by name.
func (db *DB) GetRepo(repo string) (*common.Repo, error) { func (db *DB) GetRepo(repo string) (*common.Repo, error) {
repo_ := &common.Repo{} repo_ := &common.Repo{}
key := []byte(repo) key := []byte(repo)
err := get(db, bucketRepo, key, repo_)
err := db.View(func (t *bolt.Tx) error {
return get(t, bucketRepo, key, repo_)
})
return repo_, err return repo_, err
} }
@ -19,7 +24,11 @@ func (db *DB) GetRepo(repo string) (*common.Repo, error) {
func (db *DB) GetRepoParams(repo string) (map[string]string, error) { func (db *DB) GetRepoParams(repo string) (map[string]string, error) {
params := map[string]string{} params := map[string]string{}
key := []byte(repo) key := []byte(repo)
err := get(db, bucketRepoParams, key, &params)
err := db.View(func (t *bolt.Tx) error {
return get(t, bucketRepoParams, key, &params)
})
return params, err return params, err
} }
@ -28,7 +37,11 @@ func (db *DB) GetRepoParams(repo string) (map[string]string, error) {
func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) { func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) {
keypair := &common.Keypair{} keypair := &common.Keypair{}
key := []byte(repo) key := []byte(repo)
err := get(db, bucketRepoKeys, key, keypair)
err := db.View(func (t *bolt.Tx) error {
return get(t, bucketRepoKeys, key, keypair)
})
return keypair, err return keypair, err
} }
@ -37,7 +50,10 @@ func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) {
func (db *DB) UpdateRepo(repo *common.Repo) error { func (db *DB) UpdateRepo(repo *common.Repo) error {
key := []byte(repo.FullName) key := []byte(repo.FullName)
repo.Updated = time.Now().UTC().Unix() repo.Updated = time.Now().UTC().Unix()
return update(db, bucketRepo, key, repo)
return db.Update(func (t *bolt.Tx) error {
return update(t, bucketRepo, key, repo)
})
} }
// InsertRepo inserts a repository in the datastore and // InsertRepo inserts a repository in the datastore and
@ -46,27 +62,38 @@ func (db *DB) InsertRepo(user *common.User, repo *common.Repo) error {
key := []byte(repo.FullName) key := []byte(repo.FullName)
repo.Created = time.Now().UTC().Unix() repo.Created = time.Now().UTC().Unix()
repo.Updated = time.Now().UTC().Unix() repo.Updated = time.Now().UTC().Unix()
// TODO(bradrydzewski) add repo to user index
// TODO(bradrydzewski) add user to repo index return db.Update(func (t *bolt.Tx) error {
return insert(db, bucketRepo, key, repo) // TODO(bradrydzewski) add repo to user index
// TODO(bradrydzewski) add user to repo index
return insert(t, bucketRepo, key, repo)
})
} }
// UpsertRepoParams inserts or updates the private // UpsertRepoParams inserts or updates the private
// environment parameters for the named repository. // environment parameters for the named repository.
func (db *DB) UpsertRepoParams(repo string, params map[string]string) error { func (db *DB) UpsertRepoParams(repo string, params map[string]string) error {
key := []byte(repo) key := []byte(repo)
return update(db, bucketRepoParams, key, params)
return db.Update(func (t *bolt.Tx) error {
return update(t, bucketRepoParams, key, params)
})
} }
// UpsertRepoKeys inserts or updates the private and // UpsertRepoKeys inserts or updates the private and
// public keypair for the named repository. // public keypair for the named repository.
func (db *DB) UpsertRepoKeys(repo string, keypair *common.Keypair) error { func (db *DB) UpsertRepoKeys(repo string, keypair *common.Keypair) error {
key := []byte(repo) key := []byte(repo)
return update(db, bucketRepoKeys, key, keypair)
return db.Update(func (t *bolt.Tx) error {
return update(t, bucketRepoKeys, key, keypair)
})
} }
// DeleteRepo deletes the repository. // DeleteRepo deletes the repository.
func (db *DB) DeleteRepo(repo *common.Repo) error { func (db *DB) DeleteRepo(repo *common.Repo) error {
//TODO(benschumacher) rework this to use BoltDB's txn wrapper
t, err := db.Begin(true) t, err := db.Begin(true)
if err != nil { if err != nil {
return err return err

View file

@ -4,6 +4,7 @@ import (
"strconv" "strconv"
"github.com/drone/drone/common" "github.com/drone/drone/common"
"github.com/boltdb/bolt"
) )
// GetTask gets the task at index N for the named // GetTask gets the task at index N for the named
@ -11,7 +12,9 @@ import (
func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) { func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) {
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task)) key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task))
task_ := &common.Task{} task_ := &common.Task{}
err := get(db, bucketBuildTasks, key, task_) err := db.View(func (t *bolt.Tx) error {
return get(t, bucketBuildTasks, key, task_)
})
return task_, err return task_, err
} }
@ -19,7 +22,14 @@ func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) {
// the named repository and build number. // the named repository and build number.
func (db *DB) GetTaskLogs(repo string, build int, task int) ([]byte, error) { func (db *DB) GetTaskLogs(repo string, build int, task int) ([]byte, error) {
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task)) key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task))
log, err := raw(db, bucketBuildLogs, key)
var log []byte
err := db.View(func (t *bolt.Tx) error {
var err error
log, err = raw(t, bucketBuildLogs, key)
return err
})
return log, err return log, err
} }
@ -62,7 +72,9 @@ func (db *DB) GetTaskList(repo string, build int) ([]*common.Task, error) {
// repository and build number. // repository and build number.
func (db *DB) UpsertTask(repo string, build int, task *common.Task) error { func (db *DB) UpsertTask(repo string, build int, task *common.Task) error {
key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task.Number)) key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task.Number))
return update(db, bucketBuildTasks, key, task) return db.Update(func (t *bolt.Tx) error {
return update(t, bucketBuildTasks, key, task)
})
} }
// UpsertTaskLogs inserts or updates a task logs for the // UpsertTaskLogs inserts or updates a task logs for the

View file

@ -2,13 +2,18 @@ package bolt
import ( import (
"github.com/drone/drone/common" "github.com/drone/drone/common"
"github.com/boltdb/bolt"
) )
// GetToken gets a token by sha value. // GetToken gets a token by sha value.
func (db *DB) GetToken(sha string) (*common.Token, error) { func (db *DB) GetToken(sha string) (*common.Token, error) {
token := &common.Token{} token := &common.Token{}
key := []byte(sha) key := []byte(sha)
err := get(db, bucketTokens, key, token)
err := db.View(func (t *bolt.Tx) error {
return get(t, bucketTokens, key, token)
})
return token, err return token, err
} }
@ -16,12 +21,16 @@ func (db *DB) GetToken(sha string) (*common.Token, error) {
// If the token already exists and error is returned. // If the token already exists and error is returned.
func (db *DB) InsertToken(token *common.Token) error { func (db *DB) InsertToken(token *common.Token) error {
key := []byte(token.Sha) key := []byte(token.Sha)
return insert(db, bucketTokens, key, token) return db.Update(func (t *bolt.Tx) error {
return insert(t, bucketTokens, key, token)
})
// TODO(bradrydzewski) add token to users_token index // TODO(bradrydzewski) add token to users_token index
} }
// DeleteUser deletes the token. // DeleteUser deletes the token.
func (db *DB) DeleteToken(token *common.Token) error { func (db *DB) DeleteToken(token *common.Token) error {
key := []byte(token.Sha) key := []byte(token.Sha)
return delete(db, bucketUser, key) return db.Update(func (t *bolt.Tx) error {
return delete(t, bucketUser, key)
})
} }

View file

@ -4,13 +4,18 @@ import (
"time" "time"
"github.com/drone/drone/common" "github.com/drone/drone/common"
"github.com/boltdb/bolt"
) )
// GetUser gets a user by user login. // GetUser gets a user by user login.
func (db *DB) GetUser(login string) (*common.User, error) { func (db *DB) GetUser(login string) (*common.User, error) {
user := &common.User{} user := &common.User{}
key := []byte(login) key := []byte(login)
err := get(db, bucketUser, key, user)
err := db.View(func (t *bolt.Tx) error {
return get(t, bucketUser, key, user)
})
return user, err return user, err
} }
@ -120,7 +125,10 @@ func (db *DB) GetUserList() ([]*common.User, error) {
func (db *DB) UpdateUser(user *common.User) error { func (db *DB) UpdateUser(user *common.User) error {
key := []byte(user.Login) key := []byte(user.Login)
user.Updated = time.Now().UTC().Unix() user.Updated = time.Now().UTC().Unix()
return update(db, bucketUser, key, user)
return db.Update(func (t *bolt.Tx) error {
return update(t, bucketUser, key, user)
})
} }
// InsertUser inserts a new user into the datastore. If // InsertUser inserts a new user into the datastore. If
@ -129,7 +137,10 @@ func (db *DB) InsertUser(user *common.User) error {
key := []byte(user.Login) key := []byte(user.Login)
user.Created = time.Now().UTC().Unix() user.Created = time.Now().UTC().Unix()
user.Updated = time.Now().UTC().Unix() user.Updated = time.Now().UTC().Unix()
return insert(db, bucketUser, key, user)
return db.Update(func (t *bolt.Tx) error {
return insert(t, bucketUser, key, user)
})
} }
// DeleteUser deletes the user. // DeleteUser deletes the user.
@ -137,5 +148,8 @@ func (db *DB) DeleteUser(user *common.User) error {
key := []byte(user.Login) key := []byte(user.Login)
// TODO(bradrydzewski) delete user subscriptions // TODO(bradrydzewski) delete user subscriptions
// TODO(bradrydzewski) delete user tokens // TODO(bradrydzewski) delete user tokens
return delete(db, bucketUser, key)
return db.Update(func (t *bolt.Tx) error {
return delete(t, bucketUser, key)
})
} }

View file

@ -1,6 +1,9 @@
package bolt package bolt
import "github.com/youtube/vitess/go/bson" import (
"github.com/youtube/vitess/go/bson"
"github.com/boltdb/bolt"
)
func encode(v interface{}) ([]byte, error) { func encode(v interface{}) ([]byte, error) {
return bson.Marshal(v) return bson.Marshal(v)
@ -10,12 +13,7 @@ func decode(raw []byte, v interface{}) error {
return bson.Unmarshal(raw, v) return bson.Unmarshal(raw, v)
} }
func get(db *DB, bucket, key []byte, v interface{}) error { func get(t *bolt.Tx, bucket, key []byte, v interface{}) error {
t, err := db.Begin(false)
if err != nil {
return err
}
defer t.Rollback()
raw := t.Bucket(bucket).Get(key) raw := t.Bucket(bucket).Get(key)
if raw == nil { if raw == nil {
return ErrKeyNotFound return ErrKeyNotFound
@ -23,12 +21,7 @@ func get(db *DB, bucket, key []byte, v interface{}) error {
return bson.Unmarshal(raw, v) return bson.Unmarshal(raw, v)
} }
func raw(db *DB, bucket, key []byte) ([]byte, error) { func raw(t *bolt.Tx, bucket, key []byte) ([]byte, error) {
t, err := db.Begin(false)
if err != nil {
return nil, err
}
defer t.Rollback()
raw := t.Bucket(bucket).Get(key) raw := t.Bucket(bucket).Get(key)
if raw == nil { if raw == nil {
return nil, ErrKeyNotFound return nil, ErrKeyNotFound
@ -36,11 +29,7 @@ func raw(db *DB, bucket, key []byte) ([]byte, error) {
return raw, nil return raw, nil
} }
func update(db *DB, bucket, key []byte, v interface{}) error { func update(t *bolt.Tx, bucket, key []byte, v interface{}) error {
t, err := db.Begin(true)
if err != nil {
return err
}
raw, err := encode(v) raw, err := encode(v)
if err != nil { if err != nil {
t.Rollback() t.Rollback()
@ -48,17 +37,12 @@ func update(db *DB, bucket, key []byte, v interface{}) error {
} }
err = t.Bucket(bucket).Put(key, raw) err = t.Bucket(bucket).Put(key, raw)
if err != nil { if err != nil {
t.Rollback()
return err return err
} }
return t.Commit() return nil
} }
func insert(db *DB, bucket, key []byte, v interface{}) error { func insert(t *bolt.Tx, bucket, key []byte, v interface{}) error {
t, err := db.Begin(true)
if err != nil {
return err
}
raw, err := encode(v) raw, err := encode(v)
if err != nil { if err != nil {
t.Rollback() t.Rollback()
@ -67,26 +51,19 @@ func insert(db *DB, bucket, key []byte, v interface{}) error {
// verify the key does not already exists // verify the key does not already exists
// in the bucket. If exists, fail // in the bucket. If exists, fail
if t.Bucket(bucket).Get(key) != nil { if t.Bucket(bucket).Get(key) != nil {
t.Rollback()
return ErrKeyExists return ErrKeyExists
} }
err = t.Bucket(bucket).Put(key, raw) err = t.Bucket(bucket).Put(key, raw)
if err != nil { if err != nil {
t.Rollback()
return err return err
} }
return t.Commit() return nil
} }
func delete(db *DB, bucket, key []byte) error { func delete(t *bolt.Tx, bucket, key []byte) error {
t, err := db.Begin(true) err := t.Bucket(bucket).Delete(key)
if err != nil { if err != nil {
return err return err
} }
err = t.Bucket(bucket).Delete(key) return nil
if err != nil {
t.Rollback()
return err
}
return t.Commit()
} }