271 lines
6.2 KiB
Go
271 lines
6.2 KiB
Go
package agent
|
|
|
|
import (
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/drone/drone/model"
|
|
"github.com/drone/mq/logger"
|
|
"github.com/drone/mq/stomp"
|
|
"github.com/tidwall/redlog"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/codegangsta/cli"
|
|
"github.com/samalba/dockerclient"
|
|
)
|
|
|
|
// AgentCmd is the exported command for starting the drone agent.
|
|
var AgentCmd = cli.Command{
|
|
Name: "agent",
|
|
Usage: "starts the drone agent",
|
|
Action: start,
|
|
Flags: []cli.Flag{
|
|
cli.StringFlag{
|
|
EnvVar: "DOCKER_HOST",
|
|
Name: "docker-host",
|
|
Usage: "docker daemon address",
|
|
Value: "unix:///var/run/docker.sock",
|
|
},
|
|
cli.BoolFlag{
|
|
EnvVar: "DOCKER_TLS_VERIFY",
|
|
Name: "docker-tls-verify",
|
|
Usage: "docker daemon supports tlsverify",
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DOCKER_CERT_PATH",
|
|
Name: "docker-cert-path",
|
|
Usage: "docker certificate directory",
|
|
Value: "",
|
|
},
|
|
cli.IntFlag{
|
|
EnvVar: "DOCKER_MAX_PROCS",
|
|
Name: "docker-max-procs",
|
|
Usage: "limit number of running docker processes",
|
|
Value: 2,
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DOCKER_OS",
|
|
Name: "docker-os",
|
|
Usage: "docker operating system",
|
|
Value: "linux",
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DOCKER_ARCH",
|
|
Name: "docker-arch",
|
|
Usage: "docker architecture system",
|
|
Value: "amd64",
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DRONE_SERVER",
|
|
Name: "drone-server",
|
|
Usage: "drone server address",
|
|
Value: "ws://localhost:8000/ws/broker",
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DRONE_TOKEN",
|
|
Name: "drone-token",
|
|
Usage: "drone authorization token",
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DRONE_SECRET,DRONE_AGENT_SECRET",
|
|
Name: "drone-secret",
|
|
Usage: "drone agent secret",
|
|
},
|
|
cli.DurationFlag{
|
|
EnvVar: "DRONE_BACKOFF",
|
|
Name: "backoff",
|
|
Usage: "drone server backoff interval",
|
|
Value: time.Second * 15,
|
|
},
|
|
cli.DurationFlag{
|
|
EnvVar: "DRONE_PING",
|
|
Name: "ping",
|
|
Usage: "drone server ping frequency",
|
|
Value: time.Minute * 5,
|
|
},
|
|
cli.BoolFlag{
|
|
EnvVar: "DRONE_DEBUG",
|
|
Name: "debug",
|
|
Usage: "start the agent in debug mode",
|
|
},
|
|
cli.DurationFlag{
|
|
EnvVar: "DRONE_TIMEOUT",
|
|
Name: "timeout",
|
|
Usage: "drone timeout due to log inactivity",
|
|
Value: time.Minute * 15,
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DRONE_FILTER",
|
|
Name: "filter",
|
|
Usage: "filter jobs processed by this agent",
|
|
},
|
|
cli.IntFlag{
|
|
EnvVar: "DRONE_MAX_LOGS",
|
|
Name: "max-log-size",
|
|
Usage: "drone maximum log size in megabytes",
|
|
Value: 5,
|
|
},
|
|
cli.StringSliceFlag{
|
|
EnvVar: "DRONE_PLUGIN_PRIVILEGED",
|
|
Name: "privileged",
|
|
Usage: "plugins that require privileged mode",
|
|
Value: &cli.StringSlice{
|
|
"plugins/docker",
|
|
"plugins/docker:*",
|
|
"plugins/gcr",
|
|
"plugins/gcr:*",
|
|
"plugins/ecr",
|
|
"plugins/ecr:*",
|
|
},
|
|
},
|
|
cli.StringFlag{
|
|
EnvVar: "DRONE_PLUGIN_NAMESPACE",
|
|
Name: "namespace",
|
|
Value: "plugins",
|
|
Usage: "default plugin image namespace",
|
|
},
|
|
cli.BoolTFlag{
|
|
EnvVar: "DRONE_PLUGIN_PULL",
|
|
Name: "pull",
|
|
Usage: "always pull latest plugin images",
|
|
},
|
|
cli.StringSliceFlag{
|
|
EnvVar: "DRONE_YAML_EXTENSION",
|
|
Name: "extension",
|
|
Usage: "custom plugin extension endpoint",
|
|
},
|
|
},
|
|
}
|
|
|
|
func start(c *cli.Context) {
|
|
|
|
log := redlog.New(os.Stderr)
|
|
log.SetLevel(0)
|
|
logger.SetLogger(log)
|
|
|
|
// debug level if requested by user
|
|
if c.Bool("debug") {
|
|
logrus.SetLevel(logrus.DebugLevel)
|
|
|
|
log.SetLevel(1)
|
|
} else {
|
|
logrus.SetLevel(logrus.WarnLevel)
|
|
}
|
|
|
|
var accessToken string
|
|
if c.String("drone-secret") != "" {
|
|
// secretToken := c.String("drone-secret")
|
|
accessToken = c.String("drone-secret")
|
|
// accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken)
|
|
} else {
|
|
accessToken = c.String("drone-token")
|
|
}
|
|
|
|
logger.Noticef("connecting to server %s", c.String("drone-server"))
|
|
|
|
server := strings.TrimRight(c.String("drone-server"), "/")
|
|
|
|
tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
|
|
if err == nil {
|
|
tls.InsecureSkipVerify = c.Bool("docker-tls-verify")
|
|
}
|
|
docker, err := dockerclient.NewDockerClient(c.String("docker-host"), tls)
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
|
|
var client *stomp.Client
|
|
|
|
handler := func(m *stomp.Message) {
|
|
running.Add(1)
|
|
defer func() {
|
|
running.Done()
|
|
client.Ack(m.Ack)
|
|
}()
|
|
|
|
r := pipeline{
|
|
drone: client,
|
|
docker: docker,
|
|
config: config{
|
|
platform: c.String("docker-os") + "/" + c.String("docker-arch"),
|
|
timeout: c.Duration("timeout"),
|
|
namespace: c.String("namespace"),
|
|
privileged: c.StringSlice("privileged"),
|
|
pull: c.BoolT("pull"),
|
|
logs: int64(c.Int("max-log-size")) * 1000000,
|
|
extension: c.StringSlice("extension"),
|
|
},
|
|
}
|
|
|
|
work := new(model.Work)
|
|
m.Unmarshal(work)
|
|
r.run(work)
|
|
}
|
|
|
|
handleSignals()
|
|
|
|
backoff := c.Duration("backoff")
|
|
|
|
for {
|
|
// dial the drone server to establish a TCP connection.
|
|
client, err = stomp.Dial(server)
|
|
if err != nil {
|
|
logger.Warningf("connection failed, retry in %v. %s", backoff, err)
|
|
<-time.After(backoff)
|
|
continue
|
|
}
|
|
opts := []stomp.MessageOption{
|
|
stomp.WithCredentials("x-token", accessToken),
|
|
}
|
|
|
|
// initialize the stomp session and authenticate.
|
|
if err = client.Connect(opts...); err != nil {
|
|
logger.Warningf("session failed, retry in %v. %s", backoff, err)
|
|
<-time.After(backoff)
|
|
continue
|
|
}
|
|
|
|
opts = []stomp.MessageOption{
|
|
stomp.WithAck("client"),
|
|
stomp.WithPrefetch(
|
|
c.Int("docker-max-procs"),
|
|
),
|
|
}
|
|
if filter := c.String("filter"); filter != "" {
|
|
opts = append(opts, stomp.WithSelector(filter))
|
|
}
|
|
|
|
// subscribe to the pending build queue.
|
|
client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) {
|
|
go handler(m) // HACK until we a channel based Subscribe implementation
|
|
}), opts...)
|
|
|
|
logger.Noticef("connection established, ready to process builds.")
|
|
<-client.Done()
|
|
|
|
logger.Warningf("connection interrupted, attempting to reconnect.")
|
|
}
|
|
}
|
|
|
|
// tracks running builds
|
|
var running sync.WaitGroup
|
|
|
|
func handleSignals() {
|
|
// Graceful shut-down on SIGINT/SIGTERM
|
|
c := make(chan os.Signal, 1)
|
|
signal.Notify(c, os.Interrupt)
|
|
signal.Notify(c, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
<-c
|
|
logger.Warningf("SIGTERM received.")
|
|
logger.Warningf("wait for running builds to finish.")
|
|
running.Wait()
|
|
logger.Warningf("done.")
|
|
os.Exit(0)
|
|
}()
|
|
}
|