harness-drone/store/batch2/batch.go
2019-09-23 16:45:02 -07:00

420 lines
8.6 KiB
Go

// Copyright 2019 Drone IO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package batch2
import (
"context"
"fmt"
"time"
"github.com/drone/drone/core"
"github.com/drone/drone/store/repos"
"github.com/drone/drone/store/shared/db"
)
// New returns a new Batcher.
func New(db *db.DB) core.Batcher {
return &batchUpdater{db}
}
type batchUpdater struct {
db *db.DB
}
func (b *batchUpdater) Batch(ctx context.Context, user *core.User, batch *core.Batch) error {
return b.db.Update(func(execer db.Execer, binder db.Binder) error {
now := time.Now().Unix()
//
// the repository list API does not return permissions, which means we have
// no way of knowing if permissions are current or not. We therefore mark all
// permissions stale in the database, so that each one must be individually
// verified at runtime.
//
stmt := permResetStmt
switch b.db.Driver() {
case db.Postgres:
stmt = permResetStmtPostgres
}
_, err := execer.Exec(stmt, now, user.ID)
if err != nil {
return fmt.Errorf("batch: cannot reset permissions: %s", err)
}
// if the repository exists with the same name,
// but a different unique identifier, attempt to
// delete the previous entry.
var insert []*core.Repository
var update []*core.Repository
for _, repo := range append(batch.Insert, batch.Update...) {
params := repos.ToParams(repo)
stmt, args, err := binder.BindNamed(repoDeleteDeleted, params)
if err != nil {
return err
}
res, err := execer.Exec(stmt, args...)
if err != nil {
return fmt.Errorf("batch: cannot remove duplicate repository: %s: %s: %s", repo.Slug, repo.UID, err)
}
rows, _ := res.RowsAffected()
if rows > 0 {
insert = append(insert, repo)
} else if repo.ID > 0 {
update = append(update, repo)
} else {
insert = append(insert, repo)
}
}
for _, repo := range insert {
//
// insert repository
// TODO: group inserts in batches of N
//
stmt := repoInsertIgnoreStmt
switch b.db.Driver() {
case db.Mysql:
stmt = repoInsertIgnoreStmtMysql
case db.Postgres:
stmt = repoInsertIgnoreStmtPostgres
}
params := repos.ToParams(repo)
stmt, args, err := binder.BindNamed(stmt, params)
if err != nil {
return err
}
_, err = execer.Exec(stmt, args...)
if err != nil {
return fmt.Errorf("batch: cannot insert repository: %s: %s: %s", repo.Slug, repo.UID, err)
}
//
// insert permissions
// TODO: group inserts in batches of N
//
stmt = permInsertIgnoreStmt
switch b.db.Driver() {
case db.Mysql:
stmt = permInsertIgnoreStmtMysql
case db.Postgres:
stmt = permInsertIgnoreStmtPostgres
}
_, err = execer.Exec(stmt,
user.ID,
repo.UID,
now,
now,
)
if err != nil {
return fmt.Errorf("batch: cannot insert permissions: %s: %s: %s", repo.Slug, repo.UID, err)
}
}
//
// update existing repositories
// TODO: group updates in batches of N
//
for _, repo := range update {
params := repos.ToParams(repo)
// // if the repository exists with the same name,
// // but a different unique identifier, attempt to
// // delete the previous entry.
// stmt, args, err := binder.BindNamed(repoDeleteDeleted, params)
// if err != nil {
// return err
// }
// res, err := execer.Exec(stmt, args...)
// if err != nil {
// return fmt.Errorf("batch: cannot remove duplicate repository: %s: %s: %s", repo.Slug, repo.UID, err)
// }
// rows, _ := res.RowsAffected()
// if rows > 0 {
// stmt := repoInsertIgnoreStmt
// switch b.db.Driver() {
// case db.Mysql:
// stmt = repoInsertIgnoreStmtMysql
// case db.Postgres:
// stmt = repoInsertIgnoreStmtPostgres
// }
// params := repos.ToParams(repo)
// stmt, args, err := binder.BindNamed(stmt, params)
// if err != nil {
// return err
// }
// _, err = execer.Exec(stmt, args...)
// if err != nil {
// return fmt.Errorf("batch: cannot insert repository: %s: %s: %s", repo.Slug, repo.UID, err)
// }
// } else {
stmt, args, err := binder.BindNamed(repoUpdateRemoteStmt, params)
if err != nil {
return err
}
_, err = execer.Exec(stmt, args...)
if err != nil {
return fmt.Errorf("batch: cannot update repository: %s: %s: %s", repo.Slug, repo.UID, err)
}
// }
stmt = permInsertIgnoreStmt
switch b.db.Driver() {
case db.Mysql:
stmt = permInsertIgnoreStmtMysql
case db.Postgres:
stmt = permInsertIgnoreStmtPostgres
}
_, err = execer.Exec(stmt,
user.ID,
repo.UID,
now,
now,
)
if err != nil {
return fmt.Errorf("batch: cannot insert permissions: %s: %s: %s", repo.Slug, repo.UID, err)
}
}
//
// revoke permissions
// TODO: group deletes in batches of N
//
for _, repo := range batch.Revoke {
stmt := permRevokeStmt
switch b.db.Driver() {
case db.Postgres:
stmt = permRevokeStmtPostgres
}
_, err = execer.Exec(stmt, user.ID, repo.UID)
if err != nil {
return fmt.Errorf("batch: cannot revoking permissions: %s: %s: %s", repo.Slug, repo.UID, err)
}
}
return nil
})
}
const stmtInsertBase = `
(
repo_uid
,repo_user_id
,repo_namespace
,repo_name
,repo_slug
,repo_scm
,repo_clone_url
,repo_ssh_url
,repo_html_url
,repo_active
,repo_private
,repo_visibility
,repo_branch
,repo_counter
,repo_config
,repo_timeout
,repo_trusted
,repo_protected
,repo_no_forks
,repo_no_pulls
,repo_synced
,repo_created
,repo_updated
,repo_version
,repo_signer
,repo_secret
) VALUES (
:repo_uid
,:repo_user_id
,:repo_namespace
,:repo_name
,:repo_slug
,:repo_scm
,:repo_clone_url
,:repo_ssh_url
,:repo_html_url
,:repo_active
,:repo_private
,:repo_visibility
,:repo_branch
,:repo_counter
,:repo_config
,:repo_timeout
,:repo_trusted
,:repo_protected
,:repo_no_forks
,:repo_no_pulls
,:repo_synced
,:repo_created
,:repo_updated
,:repo_version
,:repo_signer
,:repo_secret
)
`
const repoInsertIgnoreStmt = `
INSERT OR IGNORE INTO repos ` + stmtInsertBase
const repoInsertIgnoreStmtMysql = `
INSERT IGNORE INTO repos ` + stmtInsertBase
const repoInsertIgnoreStmtPostgres = `
INSERT INTO repos ` + stmtInsertBase + ` ON CONFLICT DO NOTHING`
const repoUpdateRemoteStmt = `
UPDATE repos SET
repo_namespace=:repo_namespace
,repo_name=:repo_name
,repo_slug=:repo_slug
,repo_clone_url=:repo_clone_url
,repo_ssh_url=:repo_ssh_url
,repo_html_url=:repo_html_url
,repo_private=:repo_private
,repo_branch=:repo_branch
,repo_updated=:repo_updated
WHERE repo_id=:repo_id
`
const repoUpdateRemoteStmtPostgres = `
UPDATE repos SET
repo_namespace=$1
,repo_name=$2
,repo_slug=$3
,repo_clone_url=$4
,repo_ssh_url=$5
,repo_html_url=$6
,repo_private=$7
,repo_branch=$8
,repo_updated=$9
WHERE repo_id=$10
`
const permInsertIgnoreStmt = `
INSERT OR IGNORE INTO perms (
perm_user_id
,perm_repo_uid
,perm_read
,perm_write
,perm_admin
,perm_synced
,perm_created
,perm_updated
) values (
?
,?
,1
,0
,0
,0
,?
,?
)
`
const permInsertIgnoreStmtMysql = `
INSERT IGNORE INTO perms (
perm_user_id
,perm_repo_uid
,perm_read
,perm_write
,perm_admin
,perm_synced
,perm_created
,perm_updated
) values (
?
,?
,1
,0
,0
,0
,?
,?
)
`
const permInsertIgnoreStmtPostgres = `
INSERT INTO perms (
perm_user_id
,perm_repo_uid
,perm_read
,perm_write
,perm_admin
,perm_synced
,perm_created
,perm_updated
) values (
$1
,$2
,true
,false
,false
,0
,$3
,$4
) ON CONFLICT DO NOTHING
`
// this statement deletes a repository that was
// deleted in version control and then re-created
// with the same name (and thus has a different
// unique identifier)
const repoDeleteDeleted = `
DELETE FROM repos
WHERE repo_slug = :repo_slug
AND repo_uid != :repo_uid
`
// this resets the synced date indicating that
// the system should refresh the permissions next
// time the user attempts to access the resource
const permResetStmt = `
UPDATE perms SET
perm_updated = ?
,perm_synced = 0
WHERE perm_user_id = ?
`
const permResetStmtPostgres = `
UPDATE perms SET
perm_updated = $1
,perm_synced = 0
WHERE perm_user_id = $2
`
const permRevokeStmt = `
DELETE FROM perms
WHERE perm_user_id = ?
AND perm_repo_uid = ?
`
const permRevokeStmtPostgres = `
DELETE FROM perms
WHERE perm_user_id = $1
AND perm_repo_uid = $2
`