239 lines
5.9 KiB
Go
239 lines
5.9 KiB
Go
// Copyright 2019 Drone IO, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package syncer
|
|
|
|
import (
|
|
"context"
|
|
"runtime/debug"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/drone/drone/core"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// New returns a new Synchronizer.
|
|
func New(
|
|
repoz core.RepositoryService,
|
|
repos core.RepositoryStore,
|
|
users core.UserStore,
|
|
batch core.Batcher,
|
|
) *Synchronizer {
|
|
return &Synchronizer{
|
|
repoz: repoz,
|
|
repos: repos,
|
|
users: users,
|
|
batch: batch,
|
|
match: noopFilter,
|
|
}
|
|
}
|
|
|
|
// Synchronizer synchronizes user repositories and permissions
|
|
// between a remote source code management system and the local
|
|
// data store.
|
|
type Synchronizer struct {
|
|
repoz core.RepositoryService
|
|
repos core.RepositoryStore
|
|
users core.UserStore
|
|
batch core.Batcher
|
|
match FilterFunc
|
|
}
|
|
|
|
// SetFilter sets the filter function.
|
|
func (s *Synchronizer) SetFilter(fn FilterFunc) {
|
|
s.match = fn
|
|
}
|
|
|
|
// Sync synchronizes the user repository list in 6 easy steps.
|
|
func (s *Synchronizer) Sync(ctx context.Context, user *core.User) (*core.Batch, error) {
|
|
logger := logrus.WithField("login", user.Login)
|
|
logger.Debugln("syncer: begin repository sync")
|
|
|
|
defer func() {
|
|
// taking the paranoid approach to recover from
|
|
// a panic that should absolutely never happen.
|
|
if err := recover(); err != nil {
|
|
logger = logger.WithField("error", err)
|
|
logger.Errorf("syncer: unexpected panic\n%s\n", debug.Stack())
|
|
}
|
|
|
|
// when the synchronization process is complete
|
|
// be sure to update the user sync date.
|
|
user.Syncing = false
|
|
user.Synced = time.Now().Unix()
|
|
s.users.Update(context.Background(), user)
|
|
}()
|
|
|
|
if user.Syncing == false {
|
|
user.Syncing = true
|
|
err := s.users.Update(ctx, user)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("syncer: cannot update user")
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
batch := &core.Batch{}
|
|
remote := map[string]*core.Repository{}
|
|
local := map[string]*core.Repository{}
|
|
|
|
//
|
|
// STEP1: get the list of repositories from the remote
|
|
// source code management system (e.g. GitHub).
|
|
//
|
|
|
|
{
|
|
repos, err := s.repoz.List(ctx, user)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("syncer: cannot get remote repository list")
|
|
return nil, err
|
|
}
|
|
for _, repo := range repos {
|
|
if strings.Count(repo.Slug, "/") > 1 {
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", repo.Namespace).
|
|
WithField("name", repo.Name).
|
|
WithField("uid", repo.UID).
|
|
Traceln("syncer: skipping subrepositories")
|
|
}
|
|
} else if repo.Archived {
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", repo.Namespace).
|
|
WithField("name", repo.Name).
|
|
WithField("uid", repo.UID).
|
|
Traceln("syncer: skipping archived repositories")
|
|
}
|
|
} else if s.match(repo) {
|
|
remote[repo.UID] = repo
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", repo.Namespace).
|
|
WithField("name", repo.Name).
|
|
WithField("uid", repo.UID).
|
|
Traceln("syncer: remote repository matches filter")
|
|
}
|
|
} else {
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", repo.Namespace).
|
|
WithField("name", repo.Name).
|
|
WithField("uid", repo.UID).
|
|
Traceln("syncer: remote repository does not match filter")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//
|
|
// STEP2: get the list of repositories stored in the
|
|
// local database.
|
|
//
|
|
|
|
{
|
|
repos, err := s.repos.List(ctx, user.ID)
|
|
if err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("syncer: cannot get cached repository list")
|
|
return nil, err
|
|
}
|
|
|
|
for _, repo := range repos {
|
|
local[repo.UID] = repo
|
|
}
|
|
}
|
|
|
|
//
|
|
// STEP3 find repos that exist in the remote system,
|
|
// but do not exist locally. Insert.
|
|
//
|
|
|
|
for k, v := range remote {
|
|
_, ok := local[k]
|
|
if ok {
|
|
continue
|
|
}
|
|
v.Synced = time.Now().Unix()
|
|
v.Created = time.Now().Unix()
|
|
v.Updated = time.Now().Unix()
|
|
v.Version = 1
|
|
batch.Insert = append(batch.Insert, v)
|
|
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", v.Namespace).
|
|
WithField("name", v.Name).
|
|
WithField("uid", v.UID).
|
|
Traceln("syncer: remote repository not in database")
|
|
}
|
|
}
|
|
|
|
//
|
|
// STEP4 find repos that exist in the remote system and
|
|
// in the local system, but with incorrect data. Update.
|
|
//
|
|
|
|
for k, v := range local {
|
|
vv, ok := remote[k]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if diff(v, vv) {
|
|
merge(v, vv)
|
|
v.Synced = time.Now().Unix()
|
|
v.Updated = time.Now().Unix()
|
|
batch.Update = append(batch.Update, v)
|
|
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", v.Namespace).
|
|
WithField("name", v.Name).
|
|
WithField("uid", v.UID).
|
|
Traceln("syncer: repository requires update")
|
|
}
|
|
}
|
|
}
|
|
|
|
//
|
|
// STEP5 find repos that exist in the local system,
|
|
// but not in the remote system. Revoke permissions.
|
|
//
|
|
|
|
for k, v := range local {
|
|
_, ok := remote[k]
|
|
if ok {
|
|
continue
|
|
}
|
|
batch.Revoke = append(batch.Revoke, v)
|
|
|
|
if logrus.GetLevel() == logrus.TraceLevel {
|
|
logger.WithField("namespace", v.Namespace).
|
|
WithField("name", v.Name).
|
|
WithField("uid", v.UID).
|
|
Traceln("syncer: repository in database not in remote repository list")
|
|
}
|
|
}
|
|
|
|
//
|
|
// STEP6 update the database.
|
|
//
|
|
|
|
if err := s.batch.Batch(ctx, user, batch); err != nil {
|
|
logger = logger.WithError(err)
|
|
logger.Warnln("syncer: cannot batch update")
|
|
return nil, err
|
|
}
|
|
|
|
logger.Debugln("syncer: finished repository sync")
|
|
return batch, nil
|
|
}
|