harness-drone/scheduler/kube/kubernetes.go

260 lines
7 KiB
Go
Raw Normal View History

2019-02-19 23:56:41 +00:00
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Drone Non-Commercial License
// that can be found in the LICENSE file.

// +build !oss

package kube
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/dchest/uniuri"
	"github.com/drone/drone/core"
	"github.com/drone/drone/scheduler/internal"
	"github.com/hashicorp/go-multierror"
	"github.com/sirupsen/logrus"
	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)
// Config is the configuration for the Kubernetes scheduler.
//
// Most fields are forwarded to the scheduled job's container as DRONE_*
// environment variables; string fields that are empty are not forwarded.
type Config struct {
// Namespace is the namespace in which jobs are created and listed. When
// empty, the scheduler falls back to metav1.NamespaceDefault.
Namespace string
// ServiceAccount is set as the service account name of the job's pod.
ServiceAccount string
// ConfigURL and ConfigPath are passed, in that order, to
// clientcmd.BuildConfigFromFlags to build the Kubernetes client config.
ConfigURL string
ConfigPath string
// TTL is used as the job's TTLSecondsAfterFinished value.
TTL int
// Image is passed through internal.DefaultImage to pick the container image.
Image string
// ImagePullPolicy selects the container pull policy. The recognized values
// are "IfNotPresent", "Never" and "Always"; any other value leaves the
// pull policy unset on the container.
ImagePullPolicy string
// ImagePrivileged is joined with commas into DRONE_RUNNER_PRIVILEGED_IMAGES.
ImagePrivileged []string
// NOTE(review): DockerHost and DockerHostWin are not read by the scheduler
// code in this file — presumably consumed elsewhere; confirm.
DockerHost string
DockerHostWin string
// LimitMemory and LimitCompute are forwarded as DRONE_LIMIT_MEM and
// DRONE_LIMIT_CPU respectively.
LimitMemory int
LimitCompute int
// NOTE(review): RequestMemory and RequestCompute are not read by the
// scheduler code in this file — confirm whether they are still needed.
RequestMemory int
RequestCompute int
// CallbackHost, CallbackProto and CallbackSecret are forwarded as
// DRONE_RPC_HOST, DRONE_RPC_PROTO and DRONE_RPC_SECRET.
CallbackHost string
CallbackProto string
CallbackSecret string
// SecretToken, SecretEndpoint and SecretInsecure are forwarded as
// DRONE_SECRET_SECRET, DRONE_SECRET_ENDPOINT and DRONE_SECRET_SKIP_VERIFY.
SecretToken string
SecretEndpoint string
SecretInsecure bool
// RegistryToken, RegistryEndpoint and RegistryInsecure are forwarded as
// DRONE_REGISTRY_SECRET, DRONE_REGISTRY_ENDPOINT and
// DRONE_REGISTRY_SKIP_VERIFY.
RegistryToken string
RegistryEndpoint string
RegistryInsecure bool
// LogDebug, LogTrace, LogPretty and LogText are forwarded as
// DRONE_LOGS_DEBUG, DRONE_LOGS_TRACE, DRONE_LOGS_PRETTY and DRONE_LOGS_TEXT.
// LogTrace is additionally forwarded as DRONE_RPC_DEBUG.
LogDebug bool
LogTrace bool
LogPretty bool
LogText bool
}
// kubeScheduler is a scheduler that runs each stage as a Kubernetes job.
type kubeScheduler struct {
// client is the Kubernetes clientset used to create, list and delete jobs.
client *kubernetes.Clientset
// config is the scheduler configuration supplied to FromConfig.
config Config
}
// FromConfig returns a new Kubernetes scheduler.
//
// The client configuration is built from conf.ConfigURL and conf.ConfigPath
// and used to create the Kubernetes clientset. An error from either step is
// returned unchanged, together with a nil scheduler.
func FromConfig(conf Config) (core.Scheduler, error) {
	restConfig, err := clientcmd.BuildConfigFromFlags(conf.ConfigURL, conf.ConfigPath)
	if err != nil {
		return nil, err
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, err
	}

	scheduler := &kubeScheduler{
		config: conf,
		client: clientset,
	}
	return scheduler, nil
}
// Compile-time assertion that *kubeScheduler implements core.Scheduler.
var _ core.Scheduler = (*kubeScheduler)(nil)
// Schedule schedules the stage for execution by creating a Kubernetes job
// in the scheduler namespace.
//
// The job runs a single "drone-controller" container whose environment is
// derived from the scheduler configuration and the stage ID. The job name is
// prefixed with "drone-job-<stage id>-", which is the prefix Cancel matches
// on. Stage labels are used as the pod's node selector, and a non-amd64
// architecture adds a "beta.kubernetes.io/arch" selector entry.
//
// ctx is currently unused: the client calls made here do not accept a
// context. The error returned by the job creation call, if any, is logged
// and returned to the caller.
func (s *kubeScheduler) Schedule(ctx context.Context, stage *core.Stage) error {
	env := toEnvironment(
		map[string]string{
			"DRONE_RUNNER_PRIVILEGED_IMAGES": strings.Join(s.config.ImagePrivileged, ","),
			"DRONE_LIMIT_MEM":                fmt.Sprint(s.config.LimitMemory),
			"DRONE_LIMIT_CPU":                fmt.Sprint(s.config.LimitCompute),
			"DRONE_STAGE_ID":                 fmt.Sprint(stage.ID),
			"DRONE_LOGS_DEBUG":               fmt.Sprint(s.config.LogDebug),
			"DRONE_LOGS_TRACE":               fmt.Sprint(s.config.LogTrace),
			"DRONE_LOGS_PRETTY":              fmt.Sprint(s.config.LogPretty),
			"DRONE_LOGS_TEXT":                fmt.Sprint(s.config.LogText),
			"DRONE_RPC_PROTO":                s.config.CallbackProto,
			"DRONE_RPC_HOST":                 s.config.CallbackHost,
			"DRONE_RPC_SECRET":               s.config.CallbackSecret,
			"DRONE_RPC_DEBUG":                fmt.Sprint(s.config.LogTrace),
			"DRONE_REGISTRY_ENDPOINT":        s.config.RegistryEndpoint,
			"DRONE_REGISTRY_SECRET":          s.config.RegistryToken,
			"DRONE_REGISTRY_SKIP_VERIFY":     fmt.Sprint(s.config.RegistryInsecure),
			"DRONE_SECRET_ENDPOINT":          s.config.SecretEndpoint,
			"DRONE_SECRET_SECRET":            s.config.SecretToken,
			"DRONE_SECRET_SKIP_VERIFY":       fmt.Sprint(s.config.SecretInsecure),
		},
	)

	// Both variables are resolved from the pod's spec.nodeName field.
	env = append(env,
		v1.EnvVar{
			Name: "KUBERNETES_NODE",
			ValueFrom: &v1.EnvVarSource{
				FieldRef: &v1.ObjectFieldSelector{
					FieldPath: "spec.nodeName",
				},
			},
		},
		v1.EnvVar{
			Name: "DRONE_RUNNER_NAME",
			ValueFrom: &v1.EnvVarSource{
				FieldRef: &v1.ObjectFieldSelector{
					FieldPath: "spec.nodeName",
				},
			},
		},
	)

	// An unrecognized policy leaves pull as the zero value, i.e. the pull
	// policy is left unset on the container.
	var pull v1.PullPolicy
	switch s.config.ImagePullPolicy {
	case "IfNotPresent":
		pull = v1.PullIfNotPresent
	case "Never":
		pull = v1.PullNever
	case "Always":
		pull = v1.PullAlways
	}

	suffix := strings.ToLower(uniuri.NewLen(12))
	name := fmt.Sprintf("drone-job-%d-%s", stage.ID, suffix)

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: name,
			Namespace:    s.namespace(),
			Annotations: map[string]string{
				"io.drone":                "true",
				"io.core.stage.created":   time.Unix(stage.Created, 0).String(),
				"io.core.stage.scheduled": time.Now().String(),
				"io.core.stage.id":        fmt.Sprint(stage.ID),
				"io.core.stage.number":    fmt.Sprint(stage.Number),
				"io.core.stage.os":        fmt.Sprint(stage.OS),
				"io.core.stage.arch":      fmt.Sprint(stage.Arch),
				"io.core.build.id":        fmt.Sprint(stage.BuildID),
				"io.core.repo.id":         fmt.Sprint(stage.RepoID),
			},
		},
		Spec: batchv1.JobSpec{
			TTLSecondsAfterFinished: int32ptr(int32(s.config.TTL)),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					ServiceAccountName: s.config.ServiceAccount,
					RestartPolicy:      v1.RestartPolicyNever,
					Containers: []v1.Container{{
						Name:            "drone-controller",
						Image:           internal.DefaultImage(s.config.Image),
						ImagePullPolicy: pull,
						Env:             env,
					}},
				},
			},
		},
	}

	// Copy the stage labels instead of aliasing them: the arch entry added
	// below must not be written back into the caller's stage.Labels map.
	if len(stage.Labels) > 0 {
		selector := make(map[string]string, len(stage.Labels)+1)
		for key, value := range stage.Labels {
			selector[key] = value
		}
		job.Spec.Template.Spec.NodeSelector = selector
	}

	// An empty architecture is treated like the amd64 default; adding an
	// empty-valued arch selector would only make the pod unschedulable.
	if arch := stage.Arch; arch != "" && arch != "amd64" {
		if job.Spec.Template.Spec.NodeSelector == nil {
			job.Spec.Template.Spec.NodeSelector = map[string]string{}
		}
		job.Spec.Template.Spec.NodeSelector["beta.kubernetes.io/arch"] = arch
	}

	log := logrus.WithFields(logrus.Fields{
		"stage-id":     stage.ID,
		"stage-number": stage.Number,
		"stage-name":   stage.Name,
		"repo-id":      stage.RepoID,
		"build-id":     stage.BuildID,
	})

	log.Debugf("kubernetes: creating job")
	_, err := s.client.BatchV1().Jobs(s.namespace()).Create(job)
	if err != nil {
		// Log through the stage-scoped entry so the failure carries the
		// stage, build and repository identifiers.
		log.WithError(err).Errorln("kubernetes: cannot create job")
		return err
	}
	log.Debugf("kubernetes: successfully created job")
	return nil
}
// Cancel cancels a scheduled or running stage.
//
// It lists every job in the scheduler namespace and deletes those whose
// name starts with "drone-job-<id>-". A failure to list the jobs is returned
// immediately; failures to delete individual jobs are accumulated and
// returned together once all matching jobs have been attempted. The result
// is nil when no deletion failed. ctx is currently unused.
//
// NOTE(review): the delete options carry no propagation policy, so the API
// server default applies — confirm the job's pods are cleaned up as intended.
func (s *kubeScheduler) Cancel(ctx context.Context, id int64) error {
	list, err := s.client.BatchV1().Jobs(s.namespace()).List(metav1.ListOptions{})
	if err != nil {
		return err
	}

	var (
		errs  error
		match = fmt.Sprintf("drone-job-%d-", id)
	)
	for i := range list.Items {
		item := &list.Items[i]
		if strings.HasPrefix(item.Name, match) {
			opts := &metav1.DeleteOptions{
				// GracePeriodSeconds
			}
			if derr := s.client.BatchV1().Jobs(item.Namespace).Delete(item.Name, opts); derr != nil {
				errs = multierror.Append(errs, derr)
			}
		}
	}
	return errs
}
// Cancelled is not implemented by the Kubernetes scheduler. It always
// returns false together with a "not implemented" error.
func (s *kubeScheduler) Cancelled(_ context.Context, _ int64) (bool, error) {
	err := errors.New("not implemented")
	return false, err
}
// Request is not implemented by the Kubernetes scheduler. It always returns
// a nil stage together with a "not implemented" error.
func (s *kubeScheduler) Request(_ context.Context, _ core.Filter) (*core.Stage, error) {
	err := errors.New("not implemented")
	return nil, err
}
// Stats is not implemented by the Kubernetes scheduler. It always returns a
// nil value together with a "not implemented" error.
func (s *kubeScheduler) Stats(_ context.Context) (interface{}, error) {
	err := errors.New("not implemented")
	return nil, err
}
// namespace returns the configured namespace, or metav1.NamespaceDefault
// when the configuration leaves it empty.
func (s *kubeScheduler) namespace() string {
	if ns := s.config.Namespace; ns != "" {
		return ns
	}
	return metav1.NamespaceDefault
}
// int32ptr returns a pointer to a fresh int32 holding the value x.
func int32ptr(x int32) *int32 {
	p := new(int32)
	*p = x
	return p
}
// toEnvironment converts a map of variable names to values into a slice of
// Kubernetes environment variables.
//
// Entries whose value is the empty string are omitted. The result is ordered
// by variable name: map iteration order is random, so without sorting the
// generated pod spec would differ from one call to the next for identical
// input. The result is nil when no entry has a non-empty value.
func toEnvironment(from map[string]string) []v1.EnvVar {
	names := make([]string, 0, len(from))
	for name, value := range from {
		if value != "" {
			names = append(names, name)
		}
	}
	sort.Strings(names)

	var to []v1.EnvVar
	for _, name := range names {
		to = append(to, v1.EnvVar{
			Name:  name,
			Value: from[name],
		})
	}
	return to
}