ea6566b059
* fixed: graceful shutdown * updated test for cron scheduler shutdown
196 lines
4.4 KiB
Go
196 lines
4.4 KiB
Go
// Copyright 2019 Drone.IO Inc. All rights reserved.
|
|
// Use of this source code is governed by the Drone Non-Commercial License
|
|
// that can be found in the LICENSE file.
|
|
|
|
// +build !oss
|
|
|
|
package cron
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/drone/drone/core"
|
|
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/robfig/cron"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// New returns a new Cron scheduler.
|
|
func New(
|
|
commits core.CommitService,
|
|
cron core.CronStore,
|
|
repos core.RepositoryStore,
|
|
users core.UserStore,
|
|
trigger core.Triggerer,
|
|
) *Scheduler {
|
|
return &Scheduler{
|
|
commits: commits,
|
|
cron: cron,
|
|
repos: repos,
|
|
users: users,
|
|
trigger: trigger,
|
|
}
|
|
}
|
|
|
|
// Scheduler defines a cron scheduler.
|
|
type Scheduler struct {
|
|
commits core.CommitService
|
|
cron core.CronStore
|
|
repos core.RepositoryStore
|
|
users core.UserStore
|
|
trigger core.Triggerer
|
|
}
|
|
|
|
// Start starts the cron scheduler.
|
|
func (s *Scheduler) Start(ctx context.Context, dur time.Duration) error {
|
|
ticker := time.NewTicker(dur)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
s.run(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Scheduler) run(ctx context.Context) error {
|
|
var result error
|
|
|
|
logrus.Debugln("cron: begin process pending jobs")
|
|
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
logger := logrus.WithField("error", err)
|
|
logger.Errorln("cron: unexpected panic")
|
|
}
|
|
}()
|
|
|
|
now := time.Now()
|
|
jobs, err := s.cron.Ready(ctx, now.Unix())
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger.Error("cron: cannot list pending jobs")
|
|
return err
|
|
}
|
|
|
|
logrus.Debugf("cron: found %d pending jobs", len(jobs))
|
|
|
|
for _, job := range jobs {
|
|
// jobs can be manually disabled in the user interface,
|
|
// and should be skipped.
|
|
if job.Disabled {
|
|
continue
|
|
}
|
|
|
|
sched, err := cron.Parse(job.Expr)
|
|
if err != nil {
|
|
result = multierror.Append(result, err)
|
|
// this should never happen since we parse and verify
|
|
// the cron expression when the cron entry is created.
|
|
continue
|
|
}
|
|
|
|
// calculate the next execution date.
|
|
job.Prev = job.Next
|
|
job.Next = sched.Next(now).Unix()
|
|
|
|
logger := logrus.WithFields(
|
|
logrus.Fields{
|
|
"repo": job.RepoID,
|
|
"cron": job.ID,
|
|
},
|
|
)
|
|
|
|
err = s.cron.Update(ctx, job)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger.Warnln("cron: cannot re-schedule job")
|
|
result = multierror.Append(result, err)
|
|
continue
|
|
}
|
|
|
|
repo, err := s.repos.Find(ctx, job.RepoID)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger.Warnln("cron: cannot find repository")
|
|
result = multierror.Append(result, err)
|
|
continue
|
|
}
|
|
|
|
user, err := s.users.Find(ctx, repo.UserID)
|
|
if err != nil {
|
|
logger := logrus.WithError(err)
|
|
logger.Warnln("cron: cannot find repository owner")
|
|
result = multierror.Append(result, err)
|
|
continue
|
|
}
|
|
|
|
if repo.Active == false {
|
|
logger.Traceln("cron: skip inactive repository")
|
|
continue
|
|
}
|
|
|
|
// TODO(bradrydzewski) we may actually need to query the branch
|
|
// first to get the sha, and then query the commit. This works fine
|
|
// with github and gitlab, but may not work with other providers.
|
|
|
|
commit, err := s.commits.FindRef(ctx, user, repo.Slug, job.Branch)
|
|
if err != nil {
|
|
logger.WithFields(
|
|
logrus.Fields{
|
|
"error": err,
|
|
"repo": repo.Slug,
|
|
"branch": repo.Branch,
|
|
}).Warnln("cron: cannot find commit")
|
|
result = multierror.Append(result, err)
|
|
continue
|
|
}
|
|
|
|
hook := &core.Hook{
|
|
Trigger: core.TriggerCron,
|
|
Event: core.EventCron,
|
|
Link: commit.Link,
|
|
Timestamp: commit.Author.Date,
|
|
Message: commit.Message,
|
|
After: commit.Sha,
|
|
Ref: fmt.Sprintf("refs/heads/%s", job.Branch),
|
|
Target: job.Branch,
|
|
Author: commit.Author.Login,
|
|
AuthorName: commit.Author.Name,
|
|
AuthorEmail: commit.Author.Email,
|
|
AuthorAvatar: commit.Author.Avatar,
|
|
Cron: job.Name,
|
|
Sender: commit.Author.Login,
|
|
}
|
|
|
|
logger.WithFields(
|
|
logrus.Fields{
|
|
"cron": job.Name,
|
|
"repo": repo.Slug,
|
|
"branch": repo.Branch,
|
|
"sha": commit.Sha,
|
|
}).Warnln("cron: trigger build")
|
|
|
|
_, err = s.trigger.Trigger(ctx, repo, hook)
|
|
if err != nil {
|
|
logger.WithFields(
|
|
logrus.Fields{
|
|
"error": err,
|
|
"repo": repo.Slug,
|
|
"branch": repo.Branch,
|
|
"sha": commit.Sha,
|
|
}).Warnln("cron: cannot trigger build")
|
|
result = multierror.Append(result, err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
logrus.Debugf("cron: finished processing jobs")
|
|
return result
|
|
}
|