moved 0.5 out of feature flag, removed deprecated 0.4 code and features

This commit is contained in:
Brad Rydzewski 2016-04-28 14:10:32 -07:00
parent 73f9c44d7f
commit 4d4003a9a1
27 changed files with 120 additions and 1747 deletions

View file

@ -35,7 +35,7 @@ publish:
password: $$DOCKER_PASS
email: $$DOCKER_EMAIL
repo: drone/drone
tag: [ "latest", "0.4.2" ]
tag: [ "0.5.0" ]
when:
repo: drone/drone
branch: master

View file

@ -5,14 +5,11 @@ import (
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/drone/drone/bus"
"github.com/drone/drone/engine"
"github.com/drone/drone/queue"
"github.com/drone/drone/remote"
"github.com/drone/drone/shared/httputil"
@ -33,11 +30,8 @@ func init() {
if droneYml == "" {
droneYml = ".drone.yml"
}
droneSec = fmt.Sprintf("%s.sec", strings.TrimSuffix(droneYml, filepath.Ext(droneYml)))
if os.Getenv("CANARY") == "true" {
droneSec = fmt.Sprintf("%s.sig", droneYml)
}
}
func GetBuilds(c *gin.Context) {
repo := session.Repo(c)
@ -135,7 +129,6 @@ func GetBuildLogs(c *gin.Context) {
}
func DeleteBuild(c *gin.Context) {
engine_ := engine.FromContext(c)
repo := session.Repo(c)
// parse the build number and job sequence number from
@ -155,17 +148,8 @@ func DeleteBuild(c *gin.Context) {
return
}
if os.Getenv("CANARY") == "true" {
bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job))
return
}
node, err := store.GetNode(c, job.NodeID)
if err != nil {
c.AbortWithError(404, err)
return
}
engine_.Cancel(build.ID, job.ID, node)
c.String(204, "")
}
func PostBuild(c *gin.Context) {
@ -218,7 +202,6 @@ func PostBuild(c *gin.Context) {
log.Debugf("cannot find build secrets for %s. %s", repo.FullName, err)
}
key, _ := store.GetKey(c, repo)
netrc, err := remote_.Netrc(user, repo)
if err != nil {
log.Errorf("failure to generate netrc for %s. %s", repo.FullName, err)
@ -296,14 +279,6 @@ func PostBuild(c *gin.Context) {
log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
}
// IMPORTANT. PLEASE READ
//
// The below code uses a feature flag to switch between the current
// build engine and the exerimental 0.5 build engine. This can be
// enabled using with the environment variable CANARY=true
if os.Getenv("CANARY") == "true" {
var signed bool
var verified bool
@ -342,26 +317,4 @@ func PostBuild(c *gin.Context) {
System: &model.System{Link: httputil.GetURL(c.Request)},
})
}
return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW
}
engine_ := engine.FromContext(c)
go engine_.Schedule(c.Copy(), &engine.Task{
User: user,
Repo: repo,
Build: build,
BuildPrev: last,
Jobs: jobs,
Keys: key,
Netrc: netrc,
Config: string(raw),
Secret: string(sec),
System: &model.System{
Link: httputil.GetURL(c.Request),
Plugins: strings.Split(os.Getenv("PLUGIN_FILTER"), " "),
Globals: strings.Split(os.Getenv("PLUGIN_PARAMS"), " "),
Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "),
},
})
}

View file

@ -1,80 +0,0 @@
package api
import (
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/drone/drone/engine"
"github.com/drone/drone/model"
"github.com/drone/drone/store"
)
func GetNodes(c *gin.Context) {
nodes, err := store.GetNodeList(c)
if err != nil {
c.String(400, err.Error())
} else {
c.JSON(200, nodes)
}
}
func GetNode(c *gin.Context) {
}
func PostNode(c *gin.Context) {
engine := engine.FromContext(c)
in := struct {
Addr string `json:"address"`
Arch string `json:"architecture"`
Cert string `json:"cert"`
Key string `json:"key"`
CA string `json:"ca"`
}{}
err := c.Bind(&in)
if err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
node := &model.Node{}
node.Addr = in.Addr
node.Cert = in.Cert
node.Key = in.Key
node.CA = in.CA
node.Arch = "linux_amd64"
err = engine.Allocate(node)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
err = store.CreateNode(c, node)
if err != nil {
c.AbortWithStatus(http.StatusInternalServerError)
return
}
c.IndentedJSON(http.StatusOK, node)
}
func DeleteNode(c *gin.Context) {
engine := engine.FromContext(c)
id, _ := strconv.Atoi(c.Param("node"))
node, err := store.GetNode(c, int64(id))
if err != nil {
c.AbortWithStatus(http.StatusNotFound)
return
}
err = store.DeleteNode(c, node)
if err != nil {
c.AbortWithStatus(http.StatusInternalServerError)
return
}
engine.Deallocate(node)
}

View file

@ -12,7 +12,7 @@ import (
_ "github.com/joho/godotenv/autoload"
)
func main2() {
func main() {
envflag.Parse()
app := cli.NewApp()

View file

@ -1,62 +0,0 @@
package main
import (
"net/http"
"os"
"time"
"github.com/drone/drone/router"
"github.com/drone/drone/router/middleware"
"github.com/Sirupsen/logrus"
"github.com/gin-gonic/contrib/ginrus"
"github.com/ianschenck/envflag"
_ "github.com/joho/godotenv/autoload"
)
var (
addr = envflag.String("SERVER_ADDR", ":8000", "")
cert = envflag.String("SERVER_CERT", "", "")
key = envflag.String("SERVER_KEY", "", "")
debug = envflag.Bool("DEBUG", false, "")
)
func main() {
if os.Getenv("CANARY") == "true" {
main2()
return
}
envflag.Parse()
// debug level if requested by user
if *debug {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.WarnLevel)
}
// setup the server and start the listener
handler := router.Load(
ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true),
middleware.Version,
middleware.Queue(),
middleware.Stream(),
middleware.Bus(),
middleware.Cache(),
middleware.Store(),
middleware.Remote(),
middleware.Engine(),
)
if *cert != "" {
logrus.Fatal(
http.ListenAndServeTLS(*addr, *cert, *key, handler),
)
} else {
logrus.Fatal(
http.ListenAndServe(*addr, handler),
)
}
}

View file

@ -86,7 +86,6 @@ func start(c *cli.Context) error {
middleware.Cache(),
middleware.Store(),
middleware.Remote(),
middleware.Engine(),
)
if c.String("server-cert") != "" {
@ -109,7 +108,7 @@ var agreement = `
You are attempting to use the unstable channel. This build is experimental and
has known bugs and compatibility issues, and is not intended for general use.
has known bugs and compatibility issues. It is not intended for general use.
Please consider using the latest stable release instead:
@ -119,8 +118,8 @@ If you are attempting to build from source please use the latest stable tag:
v0.4.2
If you are interested in testing this experimental build and assisting with
development you will need to set the following environment variables to proceed:
If you are interested in testing this experimental build AND assisting with
development you may proceed by setting the following environment:
I_UNDERSTAND_I_AM_USING_AN_UNSTABLE_VERSION=true
I_AGREE_TO_FIX_BUGS_AND_NOT_FILE_BUGS=true

View file

@ -1,48 +0,0 @@
package engine
import (
"sync"
)
type eventbus struct {
sync.Mutex
subs map[chan *Event]bool
}
// New creates a new eventbus that manages a list of
// subscribers to which events are published.
func newEventbus() *eventbus {
return &eventbus{
subs: make(map[chan *Event]bool),
}
}
// Subscribe adds the channel to the list of
// subscribers. Each subscriber in the list will
// receive broadcast events.
func (b *eventbus) subscribe(c chan *Event) {
b.Lock()
b.subs[c] = true
b.Unlock()
}
// Unsubscribe removes the channel from the
// list of subscribers.
func (b *eventbus) unsubscribe(c chan *Event) {
b.Lock()
delete(b.subs, c)
b.Unlock()
}
// Send dispatches a message to all subscribers.
func (b *eventbus) send(event *Event) {
b.Lock()
defer b.Unlock()
for s := range b.subs {
go func(c chan *Event) {
defer recover()
c <- event
}(s)
}
}

View file

@ -1,50 +0,0 @@
package engine
import (
"testing"
. "github.com/franela/goblin"
)
func TestBus(t *testing.T) {
g := Goblin(t)
g.Describe("Event bus", func() {
g.It("Should unsubscribe", func() {
c1 := make(chan *Event)
c2 := make(chan *Event)
b := newEventbus()
b.subscribe(c1)
b.subscribe(c2)
g.Assert(len(b.subs)).Equal(2)
})
g.It("Should subscribe", func() {
c1 := make(chan *Event)
c2 := make(chan *Event)
b := newEventbus()
b.subscribe(c1)
b.subscribe(c2)
g.Assert(len(b.subs)).Equal(2)
b.unsubscribe(c1)
b.unsubscribe(c2)
g.Assert(len(b.subs)).Equal(0)
})
g.It("Should send", func() {
em := map[string]bool{"foo": true, "bar": true}
e1 := &Event{Name: "foo"}
e2 := &Event{Name: "bar"}
c := make(chan *Event)
b := newEventbus()
b.subscribe(c)
b.send(e1)
b.send(e2)
r1 := <-c
r2 := <-c
g.Assert(em[r1.Name]).Equal(true)
g.Assert(em[r2.Name]).Equal(true)
})
})
}

View file

@ -1,23 +0,0 @@
package engine
import (
"golang.org/x/net/context"
)
const key = "engine"
// Setter defines a context that enables setting values.
type Setter interface {
Set(string, interface{})
}
// FromContext returns the Engine associated with this context.
func FromContext(c context.Context) Engine {
return c.Value(key).(Engine)
}
// ToContext adds the Engine to this context if it supports
// the Setter interface.
func ToContext(c Setter, engine Engine) {
c.Set(key, engine)
}

View file

@ -1,444 +0,0 @@
package engine
import (
"bytes"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stdcopy"
"github.com/drone/drone/model"
"github.com/drone/drone/shared/docker"
"github.com/drone/drone/store"
"github.com/samalba/dockerclient"
"golang.org/x/net/context"
)
type Engine interface {
Schedule(context.Context, *Task)
Cancel(int64, int64, *model.Node) error
Stream(int64, int64, *model.Node) (io.ReadCloser, error)
Deallocate(*model.Node)
Allocate(*model.Node) error
Subscribe(chan *Event)
Unsubscribe(chan *Event)
}
var (
// options to fetch the stdout and stderr logs
logOpts = &dockerclient.LogOptions{
Stdout: true,
Stderr: true,
}
// options to fetch the stdout and stderr logs
// by tailing the output.
logOptsTail = &dockerclient.LogOptions{
Follow: true,
Stdout: true,
Stderr: true,
}
// error when the system cannot find logs
errLogging = errors.New("Logs not available")
)
type engine struct {
bus *eventbus
updater *updater
pool *pool
envs []string
}
// Load creates a new build engine, loaded with registered nodes from the
// database. The registered nodes are added to the pool of nodes to immediately
// start accepting workloads.
func Load(s store.Store) Engine {
engine := &engine{}
engine.bus = newEventbus()
engine.pool = newPool()
engine.updater = &updater{engine.bus}
// quick fix to propagate HTTP_PROXY variables
// throughout the build environment.
var proxyVars = []string{"HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy", "NO_PROXY", "no_proxy"}
for _, proxyVar := range proxyVars {
proxyVal := os.Getenv(proxyVar)
if len(proxyVal) != 0 {
engine.envs = append(engine.envs, proxyVar+"="+proxyVal)
}
}
nodes, err := s.GetNodeList()
if err != nil {
log.Fatalf("failed to get nodes from database. %s", err)
}
for _, node := range nodes {
engine.pool.allocate(node)
log.Infof("registered docker daemon %s", node.Addr)
}
return engine
}
// Cancel cancels the job running on the specified Node.
func (e *engine) Cancel(build, job int64, node *model.Node) error {
client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA)
if err != nil {
return err
}
id := fmt.Sprintf("drone_build_%d_job_%d", build, job)
return client.StopContainer(id, 30)
}
// Stream streams the job output from the specified Node.
func (e *engine) Stream(build, job int64, node *model.Node) (io.ReadCloser, error) {
client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA)
if err != nil {
log.Errorf("cannot create Docker client for node %s", node.Addr)
return nil, err
}
id := fmt.Sprintf("drone_build_%d_job_%d", build, job)
log.Debugf("streaming container logs %s", id)
return client.ContainerLogs(id, logOptsTail)
}
// Subscribe subscribes the channel to all build events.
func (e *engine) Subscribe(c chan *Event) {
e.bus.subscribe(c)
}
// Unsubscribe unsubscribes the channel from all build events.
func (e *engine) Unsubscribe(c chan *Event) {
e.bus.unsubscribe(c)
}
func (e *engine) Allocate(node *model.Node) error {
// run the full build!
client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA)
if err != nil {
log.Errorf("error creating docker client %s. %s.", node.Addr, err)
return err
}
version, err := client.Version()
if err != nil {
log.Errorf("error connecting to docker daemon %s. %s.", node.Addr, err)
return err
}
log.Infof("registered docker daemon %s running version %s", node.Addr, version.Version)
e.pool.allocate(node)
return nil
}
func (e *engine) Deallocate(n *model.Node) {
nodes := e.pool.list()
for _, node := range nodes {
if node.ID == n.ID {
log.Infof("un-registered docker daemon %s", node.Addr)
e.pool.deallocate(node)
break
}
}
}
func (e *engine) Schedule(c context.Context, req *Task) {
node := <-e.pool.reserve()
// since we are probably running in a go-routine
// make sure we recover from any panics so that
// a bug doesn't crash the whole system.
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Errorf("panic running build: %v\n%s", err, string(buf))
}
e.pool.release(node)
}()
// update the node that was allocated to each job
func(id int64) {
for _, job := range req.Jobs {
job.NodeID = id
store.UpdateJob(c, job)
}
}(node.ID)
// run the full build!
client, err := newDockerClient(node.Addr, node.Cert, node.Key, node.CA)
if err != nil {
log.Errorln("error creating docker client", err)
}
// update the build state if any of the sub-tasks
// had a non-success status
req.Build.Started = time.Now().UTC().Unix()
req.Build.Status = model.StatusRunning
e.updater.SetBuild(c, req)
// run all bulid jobs
for _, job := range req.Jobs {
req.Job = job
e.runJob(c, req, e.updater, client)
}
// update overall status based on each job
req.Build.Status = model.StatusSuccess
for _, job := range req.Jobs {
if job.Status != model.StatusSuccess {
req.Build.Status = job.Status
break
}
}
req.Build.Finished = time.Now().UTC().Unix()
err = e.updater.SetBuild(c, req)
if err != nil {
log.Errorf("error updating build completion status. %s", err)
}
// run notifications
err = e.runJobNotify(req, client)
if err != nil {
log.Errorf("error executing notification step. %s", err)
}
}
func newDockerClient(addr, cert, key, ca string) (dockerclient.Client, error) {
var tlc *tls.Config
// create the Docket client TLS config
if len(cert) != 0 {
pem, err := tls.X509KeyPair([]byte(cert), []byte(key))
if err != nil {
log.Errorf("error loading X509 key pair. %s.", err)
return dockerclient.NewDockerClient(addr, nil)
}
// create the TLS configuration for secure
// docker communications.
tlc = &tls.Config{}
tlc.Certificates = []tls.Certificate{pem}
// use the certificate authority if provided.
// else don't use a certificate authority and set
// skip verify to true
if len(ca) != 0 {
log.Infof("creating docker client %s with CA", addr)
pool := x509.NewCertPool()
pool.AppendCertsFromPEM([]byte(ca))
tlc.RootCAs = pool
} else {
log.Infof("creating docker client %s WITHOUT CA", addr)
tlc.InsecureSkipVerify = true
}
}
// create the Docker client. In this version of Drone (alpha)
// we do not spread builds across clients, but this can and
// (probably) will change in the future.
return dockerclient.NewDockerClient(addr, tlc)
}
func (e *engine) runJob(c context.Context, r *Task, updater *updater, client dockerclient.Client) error {
name := fmt.Sprintf("drone_build_%d_job_%d", r.Build.ID, r.Job.ID)
defer func() {
if r.Job.Status == model.StatusRunning {
r.Job.Status = model.StatusError
r.Job.Finished = time.Now().UTC().Unix()
r.Job.ExitCode = 255
}
if r.Job.Status == model.StatusPending {
r.Job.Status = model.StatusError
r.Job.Started = time.Now().UTC().Unix()
r.Job.Finished = time.Now().UTC().Unix()
r.Job.ExitCode = 255
}
updater.SetJob(c, r)
client.KillContainer(name, "9")
client.RemoveContainer(name, true, true)
}()
// marks the task as running
r.Job.Status = model.StatusRunning
r.Job.Started = time.Now().UTC().Unix()
// encode the build payload to write to stdin
// when launching the build container
in, err := encodeToLegacyFormat(r)
if err != nil {
log.Errorf("failure to marshal work. %s", err)
return err
}
// CREATE AND START BUILD
args := DefaultBuildArgs
if r.Build.Event == model.EventPull {
args = DefaultPullRequestArgs
}
args = append(args, "--")
args = append(args, string(in))
conf := &dockerclient.ContainerConfig{
Image: DefaultAgent,
Entrypoint: DefaultEntrypoint,
Cmd: args,
Env: e.envs,
HostConfig: dockerclient.HostConfig{
Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"},
MemorySwappiness: -1,
},
Volumes: map[string]struct{}{
"/var/run/docker.sock": {},
},
}
log.Infof("preparing container %s", name)
client.PullImage(conf.Image, nil)
_, err = docker.RunDaemon(client, conf, name)
if err != nil {
log.Errorf("error starting build container. %s", err)
return err
}
// UPDATE STATUS
err = updater.SetJob(c, r)
if err != nil {
log.Errorf("error updating job status as running. %s", err)
return err
}
// WAIT FOR OUTPUT
info, builderr := docker.Wait(client, name)
switch {
case info.State.Running:
// A build unblocked before actually being completed.
log.Errorf("incomplete build: %s", name)
r.Job.ExitCode = 1
r.Job.Status = model.StatusError
case info.State.ExitCode == 128:
r.Job.ExitCode = info.State.ExitCode
r.Job.Status = model.StatusKilled
case info.State.ExitCode == 130:
r.Job.ExitCode = info.State.ExitCode
r.Job.Status = model.StatusKilled
case builderr != nil:
r.Job.Status = model.StatusError
case info.State.ExitCode != 0:
r.Job.ExitCode = info.State.ExitCode
r.Job.Status = model.StatusFailure
default:
r.Job.Status = model.StatusSuccess
}
// send the logs to the datastore
var buf bytes.Buffer
rc, err := client.ContainerLogs(name, docker.LogOpts)
if err != nil && builderr != nil {
buf.WriteString("Error launching build")
buf.WriteString(builderr.Error())
} else if err != nil {
buf.WriteString("Error launching build")
buf.WriteString(err.Error())
log.Errorf("error opening connection to logs. %s", err)
return err
} else {
defer rc.Close()
stdcopy.StdCopy(&buf, &buf, io.LimitReader(rc, 5000000))
}
// update the task in the datastore
r.Job.Finished = time.Now().UTC().Unix()
err = updater.SetJob(c, r)
if err != nil {
log.Errorf("error updating job after completion. %s", err)
return err
}
err = updater.SetLogs(c, r, ioutil.NopCloser(&buf))
if err != nil {
log.Errorf("error updating logs. %s", err)
return err
}
log.Debugf("completed job %d with status %s.", r.Job.ID, r.Job.Status)
return nil
}
func (e *engine) runJobNotify(r *Task, client dockerclient.Client) error {
name := fmt.Sprintf("drone_build_%d_notify", r.Build.ID)
defer func() {
client.KillContainer(name, "9")
client.RemoveContainer(name, true, true)
}()
// encode the build payload to write to stdin
// when launching the build container
in, err := encodeToLegacyFormat(r)
if err != nil {
log.Errorf("failure to marshal work. %s", err)
return err
}
args := DefaultNotifyArgs
args = append(args, "--")
args = append(args, string(in))
conf := &dockerclient.ContainerConfig{
Image: DefaultAgent,
Entrypoint: DefaultEntrypoint,
Cmd: args,
Env: e.envs,
HostConfig: dockerclient.HostConfig{
Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"},
MemorySwappiness: -1,
},
Volumes: map[string]struct{}{
"/var/run/docker.sock": {},
},
}
log.Infof("preparing container %s", name)
info, err := docker.Run(client, conf, name)
if err != nil {
log.Errorf("Error starting notification container %s. %s", name, err)
}
// for debugging purposes we print a failed notification executions
// output to the logs. Otherwise we have no way to troubleshoot failed
// notifications. This is temporary code until I've come up with
// a better solution.
if info != nil && info.State.ExitCode != 0 && log.GetLevel() >= log.InfoLevel {
var buf bytes.Buffer
rc, err := client.ContainerLogs(name, docker.LogOpts)
if err == nil {
defer rc.Close()
stdcopy.StdCopy(&buf, &buf, io.LimitReader(rc, 50000))
}
log.Infof("Notification container %s exited with %d", name, info.State.ExitCode)
log.Infoln(buf.String())
}
return err
}

View file

@ -1,86 +0,0 @@
package engine
import (
"sync"
"github.com/drone/drone/model"
)
type pool struct {
sync.Mutex
nodes map[*model.Node]bool
nodec chan *model.Node
}
func newPool() *pool {
return &pool{
nodes: make(map[*model.Node]bool),
nodec: make(chan *model.Node, 999),
}
}
// Allocate allocates a node to the pool to
// be available to accept work.
func (p *pool) allocate(n *model.Node) bool {
if p.isAllocated(n) {
return false
}
p.Lock()
p.nodes[n] = true
p.Unlock()
p.nodec <- n
return true
}
// IsAllocated is a helper function that returns
// true if the node is currently allocated to
// the pool.
func (p *pool) isAllocated(n *model.Node) bool {
p.Lock()
defer p.Unlock()
_, ok := p.nodes[n]
return ok
}
// Deallocate removes the node from the pool of
// available nodes. If the node is currently
// reserved and performing work it will finish,
// but no longer be given new work.
func (p *pool) deallocate(n *model.Node) {
p.Lock()
defer p.Unlock()
delete(p.nodes, n)
}
// List returns a list of all model.Nodes currently
// allocated to the pool.
func (p *pool) list() []*model.Node {
p.Lock()
defer p.Unlock()
var nodes []*model.Node
for n := range p.nodes {
nodes = append(nodes, n)
}
return nodes
}
// Reserve reserves the next available node to
// start doing work. Once work is complete, the
// node should be released back to the pool.
func (p *pool) reserve() <-chan *model.Node {
return p.nodec
}
// Release releases the node back to the pool
// of available nodes.
func (p *pool) release(n *model.Node) bool {
if !p.isAllocated(n) {
return false
}
p.nodec <- n
return true
}

View file

@ -1,89 +0,0 @@
package engine
import (
"testing"
"github.com/drone/drone/model"
"github.com/franela/goblin"
)
func TestPool(t *testing.T) {
g := goblin.Goblin(t)
g.Describe("Pool", func() {
g.It("Should allocate nodes", func() {
n := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
pool.allocate(n)
g.Assert(len(pool.nodes)).Equal(1)
g.Assert(len(pool.nodec)).Equal(1)
g.Assert(pool.nodes[n]).Equal(true)
})
g.It("Should not re-allocate an allocated node", func() {
n := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
g.Assert(pool.allocate(n)).Equal(true)
g.Assert(pool.allocate(n)).Equal(false)
})
g.It("Should reserve a node", func() {
n := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
pool.allocate(n)
g.Assert(<-pool.reserve()).Equal(n)
})
g.It("Should release a node", func() {
n := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
pool.allocate(n)
g.Assert(len(pool.nodec)).Equal(1)
g.Assert(<-pool.reserve()).Equal(n)
g.Assert(len(pool.nodec)).Equal(0)
pool.release(n)
g.Assert(len(pool.nodec)).Equal(1)
g.Assert(<-pool.reserve()).Equal(n)
g.Assert(len(pool.nodec)).Equal(0)
})
g.It("Should not release an unallocated node", func() {
n := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
g.Assert(len(pool.nodes)).Equal(0)
g.Assert(len(pool.nodec)).Equal(0)
pool.release(n)
g.Assert(len(pool.nodes)).Equal(0)
g.Assert(len(pool.nodec)).Equal(0)
pool.release(nil)
g.Assert(len(pool.nodes)).Equal(0)
g.Assert(len(pool.nodec)).Equal(0)
})
g.It("Should list all allocated nodes", func() {
n1 := &model.Node{Addr: "unix:///var/run/docker.sock"}
n2 := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
pool.allocate(n1)
pool.allocate(n2)
g.Assert(len(pool.nodes)).Equal(2)
g.Assert(len(pool.nodec)).Equal(2)
g.Assert(len(pool.list())).Equal(2)
})
g.It("Should remove a node", func() {
n1 := &model.Node{Addr: "unix:///var/run/docker.sock"}
n2 := &model.Node{Addr: "unix:///var/run/docker.sock"}
pool := newPool()
pool.allocate(n1)
pool.allocate(n2)
g.Assert(len(pool.nodes)).Equal(2)
pool.deallocate(n1)
pool.deallocate(n2)
g.Assert(len(pool.nodes)).Equal(0)
g.Assert(len(pool.list())).Equal(0)
})
})
}

View file

@ -1,24 +0,0 @@
package engine
import (
"github.com/drone/drone/model"
)
type Event struct {
Name string
Msg []byte
}
type Task struct {
User *model.User `json:"-"`
Repo *model.Repo `json:"repo"`
Build *model.Build `json:"build"`
BuildPrev *model.Build `json:"build_last"`
Jobs []*model.Job `json:"-"`
Job *model.Job `json:"job"`
Keys *model.Key `json:"keys"`
Netrc *model.Netrc `json:"netrc"`
Config string `json:"config"`
Secret string `json:"secret"`
System *model.System `json:"system"`
}

View file

@ -1,66 +0,0 @@
package engine
import (
"encoding/json"
"fmt"
"io"
"github.com/drone/drone/model"
"github.com/drone/drone/remote"
"github.com/drone/drone/store"
"golang.org/x/net/context"
)
type updater struct {
bus *eventbus
}
func (u *updater) SetBuild(c context.Context, r *Task) error {
err := store.UpdateBuild(c, r.Build)
if err != nil {
return err
}
err = remote.FromContext(c).Status(r.User, r.Repo, r.Build, fmt.Sprintf("%s/%s/%d", r.System.Link, r.Repo.FullName, r.Build.Number))
if err != nil {
// log err
}
msg, err := json.Marshal(&payload{r.Build, r.Jobs})
if err != nil {
return err
}
u.bus.send(&Event{
Name: r.Repo.FullName,
Msg: msg,
})
return nil
}
func (u *updater) SetJob(c context.Context, r *Task) error {
err := store.UpdateJob(c, r.Job)
if err != nil {
return err
}
msg, err := json.Marshal(&payload{r.Build, r.Jobs})
if err != nil {
return err
}
u.bus.send(&Event{
Name: r.Repo.FullName,
Msg: msg,
})
return nil
}
func (u *updater) SetLogs(c context.Context, r *Task, rc io.ReadCloser) error {
return store.WriteLog(c, r.Job, rc)
}
type payload struct {
*model.Build
Jobs []*model.Job `json:"jobs"`
}

View file

@ -1,35 +0,0 @@
package engine
import (
"encoding/json"
)
func encodeToLegacyFormat(t *Task) ([]byte, error) {
// t.System.Plugins = append(t.System.Plugins, "plugins/*")
// s := map[string]interface{}{}
// s["repo"] = t.Repo
// s["config"] = t.Config
// s["secret"] = t.Secret
// s["job"] = t.Job
// s["system"] = t.System
// s["workspace"] = map[string]interface{}{
// "netrc": t.Netrc,
// "keys": t.Keys,
// }
// s["build"] = map[string]interface{}{
// "number": t.Build.Number,
// "status": t.Build.Status,
// "head_commit": map[string]interface{}{
// "sha": t.Build.Commit,
// "ref": t.Build.Ref,
// "branch": t.Build.Branch,
// "message": t.Build.Message,
// "author": map[string]interface{}{
// "login": t.Build.Author,
// "email": t.Build.Email,
// },
// },
// }
return json.Marshal(t)
}

View file

@ -1,115 +0,0 @@
package engine
import (
"fmt"
"io"
"github.com/drone/drone/shared/docker"
"github.com/samalba/dockerclient"
)
var (
// name of the build agent container.
DefaultAgent = "drone/drone-exec:latest"
// default name of the build agent executable
DefaultEntrypoint = []string{"/bin/drone-exec"}
// default argument to invoke build steps
DefaultBuildArgs = []string{"--pull", "--cache", "--clone", "--build", "--deploy"}
// default argument to invoke build steps
DefaultPullRequestArgs = []string{"--pull", "--cache", "--clone", "--build"}
// default arguments to invoke notify steps
DefaultNotifyArgs = []string{"--pull", "--notify"}
)
type worker struct {
client dockerclient.Client
build *dockerclient.ContainerInfo
notify *dockerclient.ContainerInfo
}
func newWorker(client dockerclient.Client) *worker {
return &worker{client: client}
}
// Build executes the clone, build and deploy steps.
func (w *worker) Build(name string, stdin []byte, pr bool) (_ int, err error) {
// the command line arguments passed into the
// build agent container.
args := DefaultBuildArgs
if pr {
args = DefaultPullRequestArgs
}
args = append(args, "--")
args = append(args, string(stdin))
conf := &dockerclient.ContainerConfig{
Image: DefaultAgent,
Entrypoint: DefaultEntrypoint,
Cmd: args,
HostConfig: dockerclient.HostConfig{
Binds: []string{"/var/run/docker.sock:/var/run/docker.sock"},
},
Volumes: map[string]struct{}{
"/var/run/docker.sock": {},
},
}
// TEMPORARY: always try to pull the new image for now
// since we'll be frequently updating the build image
// for the next few weeks
w.client.PullImage(conf.Image, nil)
w.build, err = docker.Run(w.client, conf, name)
if err != nil {
return 1, err
}
if w.build.State.OOMKilled {
return 1, fmt.Errorf("OOMKill received")
}
return w.build.State.ExitCode, err
}
// Notify executes the notification steps.
func (w *worker) Notify(stdin []byte) error {
args := DefaultNotifyArgs
args = append(args, "--")
args = append(args, string(stdin))
conf := &dockerclient.ContainerConfig{
Image: DefaultAgent,
Entrypoint: DefaultEntrypoint,
Cmd: args,
HostConfig: dockerclient.HostConfig{},
}
var err error
w.notify, err = docker.Run(w.client, conf, "")
return err
}
// Logs returns a multi-reader that fetches the logs
// from the build and deploy agents.
func (w *worker) Logs() (io.ReadCloser, error) {
if w.build == nil {
return nil, errLogging
}
return w.client.ContainerLogs(w.build.Id, logOpts)
}
// Remove stops and removes the build, deploy and
// notification agents created for the build task.
func (w *worker) Remove() {
if w.notify != nil {
w.client.KillContainer(w.notify.Id, "9")
w.client.RemoveContainer(w.notify.Id, true, true)
}
if w.build != nil {
w.client.KillContainer(w.build.Id, "9")
w.client.RemoveContainer(w.build.Id, true, true)
}
}

View file

@ -1,36 +0,0 @@
package model
const (
Freebsd_386 uint = iota
Freebsd_amd64
Freebsd_arm
Linux_386
Linux_amd64
Linux_arm
Linux_arm64
Solaris_amd64
Windows_386
Windows_amd64
)
var Archs = map[string]uint{
"freebsd_386": Freebsd_386,
"freebsd_amd64": Freebsd_amd64,
"freebsd_arm": Freebsd_arm,
"linux_386": Linux_386,
"linux_amd64": Linux_amd64,
"linux_arm": Linux_arm,
"linux_arm64": Linux_arm64,
"solaris_amd64": Solaris_amd64,
"windows_386": Windows_386,
"windows_amd64": Windows_amd64,
}
type Node struct {
ID int64 `meddler:"node_id,pk" json:"id"`
Addr string `meddler:"node_addr" json:"address"`
Arch string `meddler:"node_arch" json:"architecture"`
Cert string `meddler:"node_cert" json:"-"`
Key string `meddler:"node_key" json:"-"`
CA string `meddler:"node_ca" json:"-"`
}

View file

@ -1,28 +0,0 @@
package middleware
import (
"sync"
"github.com/drone/drone/engine"
"github.com/drone/drone/store"
"github.com/gin-gonic/gin"
)
// Engine is a middleware function that initializes the Engine and attaches to
// the context of every http.Request.
func Engine() gin.HandlerFunc {
var once sync.Once
var engine_ engine.Engine
return func(c *gin.Context) {
once.Do(func() {
store_ := store.FromContext(c)
engine_ = engine.Load(store_)
})
engine.ToContext(c, engine_)
c.Next()
}
}

View file

@ -2,7 +2,6 @@ package router
import (
"net/http"
"os"
"strings"
"github.com/gin-gonic/gin"
@ -41,8 +40,6 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
{
settings.Use(session.MustUser())
settings.GET("/profile", web.ShowUser)
settings.GET("/people", session.MustAdmin(), web.ShowUsers)
settings.GET("/nodes", session.MustAdmin(), web.ShowNodes)
}
repo := e.Group("/repos/:owner/:name")
{
@ -83,14 +80,6 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
users.DELETE("/:login", api.DeleteUser)
}
nodes := e.Group("/api/nodes")
{
nodes.Use(session.MustAdmin())
nodes.GET("", api.GetNodes)
nodes.POST("", api.PostNode)
nodes.DELETE("/:node", api.DeleteNode)
}
repos := e.Group("/api/repos/:owner/:name")
{
repos.POST("", api.PostRepo)
@ -139,14 +128,9 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
stream.Use(session.SetPerm())
stream.Use(session.MustPull)
if os.Getenv("CANARY") == "true" {
stream.GET("/:owner/:name", web.GetRepoEvents2)
stream.GET("/:owner/:name/:build/:number", web.GetStream2)
} else {
stream.GET("/:owner/:name", web.GetRepoEvents)
stream.GET("/:owner/:name/:build/:number", web.GetStream)
}
}
bots := e.Group("/bots")
{
@ -164,7 +148,6 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
queue := e.Group("/api/queue")
{
if os.Getenv("CANARY") == "true" {
queue.Use(middleware.AgentMust())
queue.POST("/pull", api.Pull)
queue.POST("/pull/:os/:arch", api.Pull)
@ -172,7 +155,6 @@ func Load(middlewares ...gin.HandlerFunc) http.Handler {
queue.POST("/stream/:id", api.Stream)
queue.POST("/status/:id", api.Update)
}
}
gitlab := e.Group("/gitlab/:owner/:name")
{

View file

@ -1,48 +0,0 @@
package datastore
import (
"github.com/drone/drone/model"
"github.com/russross/meddler"
)
func (db *datastore) GetNode(id int64) (*model.Node, error) {
var node = new(model.Node)
var err = meddler.Load(db, nodeTable, node, id)
return node, err
}
func (db *datastore) GetNodeList() ([]*model.Node, error) {
var nodes = []*model.Node{}
var err = meddler.QueryAll(db, &nodes, rebind(nodeListQuery))
return nodes, err
}
func (db *datastore) CreateNode(node *model.Node) error {
return meddler.Insert(db, nodeTable, node)
}
func (db *datastore) UpdateNode(node *model.Node) error {
return meddler.Update(db, nodeTable, node)
}
func (db *datastore) DeleteNode(node *model.Node) error {
var _, err = db.Exec(rebind(nodeDeleteStmt), node.ID)
return err
}
const nodeTable = "nodes"
const nodeListQuery = `
SELECT *
FROM nodes
ORDER BY node_addr
`
const nodeCountQuery = `
SELECT COUNT(*) FROM nodes
`
const nodeDeleteStmt = `
DELETE FROM nodes
WHERE node_id=?
`

View file

@ -1,101 +0,0 @@
package datastore
import (
"testing"
"github.com/drone/drone/model"
"github.com/franela/goblin"
)
func TestNodes(t *testing.T) {
db := openTest()
defer db.Close()
s := From(db)
g := goblin.Goblin(t)
g.Describe("Nodes", func() {
// before each test be sure to purge the package
// table data from the database.
g.BeforeEach(func() {
db.Exec("DELETE FROM nodes")
})
g.It("Should create a node", func() {
node := model.Node{
Addr: "unix:///var/run/docker/docker.sock",
Arch: "linux_amd64",
}
err := s.CreateNode(&node)
g.Assert(err == nil).IsTrue()
g.Assert(node.ID != 0).IsTrue()
})
g.It("Should update a node", func() {
node := model.Node{
Addr: "unix:///var/run/docker/docker.sock",
Arch: "linux_amd64",
}
err := s.CreateNode(&node)
g.Assert(err == nil).IsTrue()
g.Assert(node.ID != 0).IsTrue()
node.Addr = "unix:///var/run/docker.sock"
err1 := s.UpdateNode(&node)
getnode, err2 := s.GetNode(node.ID)
g.Assert(err1 == nil).IsTrue()
g.Assert(err2 == nil).IsTrue()
g.Assert(node.ID).Equal(getnode.ID)
g.Assert(node.Addr).Equal(getnode.Addr)
g.Assert(node.Arch).Equal(getnode.Arch)
})
g.It("Should get a node", func() {
node := model.Node{
Addr: "unix:///var/run/docker/docker.sock",
Arch: "linux_amd64",
}
err := s.CreateNode(&node)
g.Assert(err == nil).IsTrue()
g.Assert(node.ID != 0).IsTrue()
getnode, err := s.GetNode(node.ID)
g.Assert(err == nil).IsTrue()
g.Assert(node.ID).Equal(getnode.ID)
g.Assert(node.Addr).Equal(getnode.Addr)
g.Assert(node.Arch).Equal(getnode.Arch)
})
g.It("Should get a node list", func() {
node1 := model.Node{
Addr: "unix:///var/run/docker/docker.sock",
Arch: "linux_amd64",
}
node2 := model.Node{
Addr: "unix:///var/run/docker.sock",
Arch: "linux_386",
}
s.CreateNode(&node1)
s.CreateNode(&node2)
nodes, err := s.GetNodeList()
g.Assert(err == nil).IsTrue()
g.Assert(len(nodes)).Equal(2)
})
g.It("Should delete a node", func() {
node := model.Node{
Addr: "unix:///var/run/docker/docker.sock",
Arch: "linux_amd64",
}
err1 := s.CreateNode(&node)
err2 := s.DeleteNode(&node)
g.Assert(err1 == nil).IsTrue()
g.Assert(err2 == nil).IsTrue()
_, err := s.GetNode(node.ID)
g.Assert(err == nil).IsFalse()
})
})
}

View file

@ -125,21 +125,6 @@ type Store interface {
// WriteLog writes the job logs to the datastore.
WriteLog(*model.Job, io.Reader) error
// GetNode gets a build node from the datastore.
GetNode(id int64) (*model.Node, error)
// GetNodeList gets a build node list from the datastore.
GetNodeList() ([]*model.Node, error)
// CreateNode add a new build node to the datastore.
CreateNode(*model.Node) error
// UpdateNode updates a build node in the datastore.
UpdateNode(*model.Node) error
// DeleteNode removes a build node from the datastore.
DeleteNode(*model.Node) error
}
// GetUser gets a user by unique ID.
@ -343,23 +328,3 @@ func ReadLog(c context.Context, job *model.Job) (io.ReadCloser, error) {
func WriteLog(c context.Context, job *model.Job, r io.Reader) error {
return FromContext(c).WriteLog(job, r)
}
func GetNode(c context.Context, id int64) (*model.Node, error) {
return FromContext(c).GetNode(id)
}
func GetNodeList(c context.Context) ([]*model.Node, error) {
return FromContext(c).GetNodeList()
}
func CreateNode(c context.Context, node *model.Node) error {
return FromContext(c).CreateNode(node)
}
func UpdateNode(c context.Context, node *model.Node) error {
return FromContext(c).UpdateNode(node)
}
func DeleteNode(c context.Context, node *model.Node) error {
return FromContext(c).DeleteNode(node)
}

View file

@ -34,9 +34,6 @@ html
i.material-icons expand_more
div.dropdown-menu.dropdown-menu-right
a.dropdown-item[href="/settings/profile"] Profile
if User.Admin
a.dropdown-item[href="/settings/people"] People
a.dropdown-item[href="/settings/nodes"] Nodes
a.dropdown-item[href="/logout"] Logout

View file

@ -3,16 +3,13 @@ package web
import (
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"github.com/gin-gonic/gin"
"github.com/square/go-jose"
log "github.com/Sirupsen/logrus"
"github.com/drone/drone/bus"
"github.com/drone/drone/engine"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/drone/drone/remote"
@ -31,11 +28,8 @@ func init() {
if droneYml == "" {
droneYml = ".drone.yml"
}
droneSec = fmt.Sprintf("%s.sec", strings.TrimSuffix(droneYml, filepath.Ext(droneYml)))
if os.Getenv("CANARY") == "true" {
droneSec = fmt.Sprintf("%s.sig", droneYml)
}
}
var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`)
@ -168,8 +162,6 @@ func PostHook(c *gin.Context) {
return
}
key, _ := store.GetKey(c, repo)
// verify the branches can be built vs skipped
branches := yaml.ParseBranch(raw)
if !branches.Matches(build.Branch) && build.Event != model.EventTag && build.Event != model.EventDeploy {
@ -214,14 +206,6 @@ func PostHook(c *gin.Context) {
log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
}
// IMPORTANT. PLEASE READ
//
// The below code uses a feature flag to switch between the current
// build engine and the exerimental 0.5 build engine. This can be
// enabled using with the environment variable CANARY=true
if os.Getenv("CANARY") == "true" {
var signed bool
var verified bool
@ -260,25 +244,5 @@ func PostHook(c *gin.Context) {
System: &model.System{Link: httputil.GetURL(c.Request)},
})
}
return // EXIT NOT TO AVOID THE 0.4 ENGINE CODE BELOW
}
engine_ := engine.FromContext(c)
go engine_.Schedule(c.Copy(), &engine.Task{
User: user,
Repo: repo,
Build: build,
BuildPrev: last,
Jobs: jobs,
Keys: key,
Netrc: netrc,
Config: string(raw),
Secret: string(sec),
System: &model.System{
Link: httputil.GetURL(c.Request),
Plugins: strings.Split(os.Getenv("PLUGIN_FILTER"), " "),
Globals: strings.Split(os.Getenv("PLUGIN_PARAMS"), " "),
Escalates: strings.Split(os.Getenv("ESCALATE_FILTER"), " "),
},
})
}

View file

@ -83,26 +83,6 @@ func ShowUser(c *gin.Context) {
})
}
func ShowUsers(c *gin.Context) {
user := session.User(c)
if !user.Admin {
c.AbortWithStatus(http.StatusForbidden)
return
}
users, _ := store.GetUserList(c)
token, _ := token.New(
token.CsrfToken,
user.Login,
).Sign(user.Hash)
c.HTML(200, "users.html", gin.H{
"User": user,
"Users": users,
"Csrf": token,
})
}
func ShowRepo(c *gin.Context) {
user := session.User(c)
repo := session.Repo(c)
@ -227,10 +207,3 @@ func ShowBuild(c *gin.Context) {
"Csrf": csrf,
})
}
func ShowNodes(c *gin.Context) {
user := session.User(c)
nodes, _ := store.GetNodeList(c)
token, _ := token.New(token.CsrfToken, user.Login).Sign(user.Hash)
c.HTML(http.StatusOK, "nodes.html", gin.H{"User": user, "Nodes": nodes, "Csrf": token})
}

View file

@ -1,15 +1,18 @@
package web
import (
"bufio"
"encoding/json"
"io"
"strconv"
"github.com/gin-gonic/gin"
"github.com/docker/docker/pkg/stdcopy"
"github.com/drone/drone/engine"
"github.com/drone/drone/bus"
"github.com/drone/drone/model"
"github.com/drone/drone/router/middleware/session"
"github.com/drone/drone/store"
"github.com/drone/drone/stream"
log "github.com/Sirupsen/logrus"
@ -19,14 +22,13 @@ import (
// GetRepoEvents will upgrade the connection to a Websocket and will stream
// event updates to the browser.
func GetRepoEvents(c *gin.Context) {
engine_ := engine.FromContext(c)
repo := session.Repo(c)
c.Writer.Header().Set("Content-Type", "text/event-stream")
eventc := make(chan *engine.Event, 1)
engine_.Subscribe(eventc)
eventc := make(chan *bus.Event, 1)
bus.Subscribe(c, eventc)
defer func() {
engine_.Unsubscribe(eventc)
bus.Unsubscribe(c, eventc)
close(eventc)
log.Infof("closed event stream")
}()
@ -38,11 +40,22 @@ func GetRepoEvents(c *gin.Context) {
log.Infof("nil event received")
return false
}
if event.Name == repo.FullName {
log.Debugf("received message %s", event.Name)
// TODO(bradrydzewski) This is a super hacky workaround until we improve
// the actual bus. Having a per-call database event is just plain stupid.
if event.Repo.FullName == repo.FullName {
var payload = struct {
model.Build
Jobs []*model.Job `json:"jobs"`
}{}
payload.Build = event.Build
payload.Jobs, _ = store.GetJobList(c, &event.Build)
data, _ := json.Marshal(&payload)
sse.Encode(w, sse.Event{
Event: "message",
Data: string(event.Msg),
Data: string(data),
})
}
case <-c.Writer.CloseNotify():
@ -54,7 +67,6 @@ func GetRepoEvents(c *gin.Context) {
func GetStream(c *gin.Context) {
engine_ := engine.FromContext(c)
repo := session.Repo(c)
buildn, _ := strconv.Atoi(c.Param("build"))
jobn, _ := strconv.Atoi(c.Param("number"))
@ -73,48 +85,32 @@ func GetStream(c *gin.Context) {
c.AbortWithError(404, err)
return
}
node, err := store.GetNode(c, job.NodeID)
if err != nil {
log.Debugln("stream cannot get node.", err)
c.AbortWithError(404, err)
return
}
rc, err := engine_.Stream(build.ID, job.ID, node)
rc, err := stream.Reader(c, stream.ToKey(job.ID))
if err != nil {
c.AbortWithError(404, err)
return
}
defer func() {
rc.Close()
}()
go func() {
defer func() {
recover()
}()
<-c.Writer.CloseNotify()
rc.Close()
}()
rw := &StreamWriter{c.Writer, 0}
stdcopy.StdCopy(rw, rw, rc)
}
type StreamWriter struct {
writer gin.ResponseWriter
count int
}
func (w *StreamWriter) Write(data []byte) (int, error) {
var err = sse.Encode(w.writer, sse.Event{
Id: strconv.Itoa(w.count),
var line int
var scanner = bufio.NewScanner(rc)
for scanner.Scan() {
line++
var err = sse.Encode(c.Writer, sse.Event{
Id: strconv.Itoa(line),
Event: "message",
Data: string(data),
Data: scanner.Text(),
})
w.writer.Flush()
w.count += len(data)
return len(data), err
if err != nil {
break
}
c.Writer.Flush()
}
log.Debugf("Closed stream %s#%d", repo.FullName, build.Number)
}

View file

@ -1,121 +0,0 @@
package web
import (
"bufio"
"encoding/json"
"io"
"strconv"
"github.com/gin-gonic/gin"
"github.com/drone/drone/bus"
"github.com/drone/drone/model"
"github.com/drone/drone/router/middleware/session"
"github.com/drone/drone/store"
"github.com/drone/drone/stream"
log "github.com/Sirupsen/logrus"
"github.com/manucorporat/sse"
)
// IMPORTANT. PLEASE READ
//
// This file containers experimental streaming features for the 0.5
// release. These can be enabled with the feature flag CANARY=true
// GetRepoEvents will upgrade the connection to a Websocket and will stream
// event updates to the browser.
func GetRepoEvents2(c *gin.Context) {
repo := session.Repo(c)
c.Writer.Header().Set("Content-Type", "text/event-stream")
eventc := make(chan *bus.Event, 1)
bus.Subscribe(c, eventc)
defer func() {
bus.Unsubscribe(c, eventc)
close(eventc)
log.Infof("closed event stream")
}()
c.Stream(func(w io.Writer) bool {
select {
case event := <-eventc:
if event == nil {
log.Infof("nil event received")
return false
}
// TODO(bradrydzewski) This is a super hacky workaround until we improve
// the actual bus. Having a per-call database event is just plain stupid.
if event.Repo.FullName == repo.FullName {
var payload = struct {
model.Build
Jobs []*model.Job `json:"jobs"`
}{}
payload.Build = event.Build
payload.Jobs, _ = store.GetJobList(c, &event.Build)
data, _ := json.Marshal(&payload)
sse.Encode(w, sse.Event{
Event: "message",
Data: string(data),
})
}
case <-c.Writer.CloseNotify():
return false
}
return true
})
}
func GetStream2(c *gin.Context) {
repo := session.Repo(c)
buildn, _ := strconv.Atoi(c.Param("build"))
jobn, _ := strconv.Atoi(c.Param("number"))
c.Writer.Header().Set("Content-Type", "text/event-stream")
build, err := store.GetBuildNumber(c, repo, buildn)
if err != nil {
log.Debugln("stream cannot get build number.", err)
c.AbortWithError(404, err)
return
}
job, err := store.GetJobNumber(c, build, jobn)
if err != nil {
log.Debugln("stream cannot get job number.", err)
c.AbortWithError(404, err)
return
}
rc, err := stream.Reader(c, stream.ToKey(job.ID))
if err != nil {
c.AbortWithError(404, err)
return
}
go func() {
<-c.Writer.CloseNotify()
rc.Close()
}()
var line int
var scanner = bufio.NewScanner(rc)
for scanner.Scan() {
line++
var err = sse.Encode(c.Writer, sse.Event{
Id: strconv.Itoa(line),
Event: "message",
Data: scanner.Text(),
})
if err != nil {
break
}
c.Writer.Flush()
}
log.Debugf("Closed stream %s#%d", repo.FullName, build.Number)
}