228 lines
5.5 KiB
Go
228 lines
5.5 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/cncd/logging"
|
|
"github.com/cncd/pipeline/pipeline/rpc"
|
|
"github.com/cncd/pubsub"
|
|
"github.com/cncd/queue"
|
|
"github.com/gin-gonic/gin"
|
|
|
|
"github.com/drone/drone/model"
|
|
"github.com/drone/drone/remote"
|
|
"github.com/drone/drone/store"
|
|
)
|
|
|
|
// This file is a complete disaster because I'm trying to wedge in some
|
|
// experimental code. Please pardon our appearance during renovations.
|
|
|
|
var config = struct {
|
|
pubsub pubsub.Publisher
|
|
queue queue.Queue
|
|
logger logging.Log
|
|
secret string
|
|
host string
|
|
}{
|
|
pubsub.New(),
|
|
queue.New(),
|
|
logging.New(),
|
|
os.Getenv("DRONE_SECRET"),
|
|
os.Getenv("DRONE_HOST"),
|
|
}
|
|
|
|
func init() {
|
|
config.pubsub.Create(context.Background(), "topic/events")
|
|
}
|
|
|
|
// func SetupRPC() gin.HandlerFunc {
|
|
// return func(c *gin.Context) {
|
|
// c.Next()
|
|
// }
|
|
// }
|
|
|
|
func RPCHandler(c *gin.Context) {
|
|
|
|
if secret := c.Request.Header.Get("Authorization"); secret != "Bearer "+config.secret {
|
|
log.Printf("Unable to connect agent. Invalid authorization token %q does not match %q", secret, config.secret)
|
|
c.String(401, "Unable to connect agent. Invalid authorization token")
|
|
return
|
|
}
|
|
peer := RPC{
|
|
remote: remote.FromContext(c),
|
|
store: store.FromContext(c),
|
|
queue: config.queue,
|
|
pubsub: config.pubsub,
|
|
logger: config.logger,
|
|
host: config.host,
|
|
}
|
|
rpc.NewServer(&peer).ServeHTTP(c.Writer, c.Request)
|
|
}
|
|
|
|
type RPC struct {
|
|
remote remote.Remote
|
|
queue queue.Queue
|
|
pubsub pubsub.Publisher
|
|
logger logging.Log
|
|
store store.Store
|
|
host string
|
|
}
|
|
|
|
// Next implements the rpc.Next function
|
|
func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) {
|
|
fn := func(task *queue.Task) bool {
|
|
for k, v := range filter.Labels {
|
|
if task.Labels[k] != v {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
task, err := s.queue.Poll(c, fn)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if task == nil {
|
|
return nil, nil
|
|
}
|
|
pipeline := new(rpc.Pipeline)
|
|
err = json.Unmarshal(task.Data, pipeline)
|
|
return pipeline, err
|
|
}
|
|
|
|
// Wait implements the rpc.Wait function
|
|
func (s *RPC) Wait(c context.Context, id string) error {
|
|
return s.queue.Wait(c, id)
|
|
}
|
|
|
|
// Extend implements the rpc.Extend function
|
|
func (s *RPC) Extend(c context.Context, id string) error {
|
|
return s.queue.Extend(c, id)
|
|
}
|
|
|
|
// Update implements the rpc.Update function
|
|
func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
|
|
jobID, err := strconv.ParseInt(id, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
job, err := s.store.GetJob(jobID)
|
|
if err != nil {
|
|
log.Printf("error: cannot find job with id %d: %s", jobID, err)
|
|
return err
|
|
}
|
|
|
|
build, err := s.store.GetBuild(job.BuildID)
|
|
if err != nil {
|
|
log.Printf("error: cannot find build with id %d: %s", job.BuildID, err)
|
|
return err
|
|
}
|
|
|
|
repo, err := s.store.GetRepo(build.RepoID)
|
|
if err != nil {
|
|
log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err)
|
|
return err
|
|
}
|
|
|
|
if build.Status != model.StatusRunning {
|
|
|
|
}
|
|
|
|
job.Started = state.Started
|
|
job.Finished = state.Finished
|
|
job.ExitCode = state.ExitCode
|
|
job.Status = model.StatusRunning
|
|
job.Error = state.Error
|
|
|
|
if build.Status == model.StatusPending {
|
|
build.Started = job.Started
|
|
build.Status = model.StatusRunning
|
|
s.store.UpdateBuild(build)
|
|
}
|
|
|
|
log.Printf("pipeline: update %s: exited=%v, exit_code=%d", id, state.Exited, state.ExitCode)
|
|
|
|
if state.Exited {
|
|
|
|
job.Status = model.StatusSuccess
|
|
if job.ExitCode != 0 || job.Error != "" {
|
|
job.Status = model.StatusFailure
|
|
}
|
|
|
|
// save the logs
|
|
var buf bytes.Buffer
|
|
if serr := s.logger.Snapshot(context.Background(), id, &buf); serr != nil {
|
|
log.Printf("error: snapshotting logs: %s", serr)
|
|
}
|
|
if werr := s.store.WriteLog(job, &buf); werr != nil {
|
|
log.Printf("error: persisting logs: %s", werr)
|
|
}
|
|
|
|
// close the logger
|
|
s.logger.Close(c, id)
|
|
s.queue.Done(c, id)
|
|
}
|
|
|
|
// hackity hack
|
|
cc := context.WithValue(c, "store", s.store)
|
|
ok, uerr := store.UpdateBuildJob(cc, build, job)
|
|
if uerr != nil {
|
|
log.Printf("error: updating job: %s", uerr)
|
|
}
|
|
if ok {
|
|
// get the user because we transfer the user form the server to agent
|
|
// and back we lose the token which does not get serialized to json.
|
|
user, uerr := s.store.GetUser(repo.UserID)
|
|
if uerr != nil {
|
|
logrus.Errorf("Unable to find user. %s", err)
|
|
} else {
|
|
s.remote.Status(user, repo, build,
|
|
fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number))
|
|
}
|
|
}
|
|
|
|
message := pubsub.Message{}
|
|
message.Data, _ = json.Marshal(model.Event{
|
|
Type: func() model.EventType {
|
|
// HACK we don't even really care about the event type.
|
|
// so we should just simplify how events are triggered.
|
|
// WTF was this being used for?????????????????????????
|
|
if job.Status == model.StatusRunning {
|
|
return model.Started
|
|
}
|
|
return model.Finished
|
|
}(),
|
|
Repo: *repo,
|
|
Build: *build,
|
|
Job: *job,
|
|
})
|
|
message.Labels = map[string]string{
|
|
"repo": repo.FullName,
|
|
"private": strconv.FormatBool(repo.IsPrivate),
|
|
}
|
|
s.pubsub.Publish(c, "topic/events", message)
|
|
log.Println("finish rpc.update")
|
|
return nil
|
|
}
|
|
|
|
// Upload implements the rpc.Upload function
|
|
func (s *RPC) Upload(c context.Context, id, mime string, file io.Reader) error { return nil }
|
|
|
|
// Done implements the rpc.Done function
|
|
func (s *RPC) Done(c context.Context, id string) error { return nil }
|
|
|
|
// Log implements the rpc.Log function
|
|
func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
|
|
entry := new(logging.Entry)
|
|
entry.Data, _ = json.Marshal(line)
|
|
s.logger.Write(c, id, entry)
|
|
return nil
|
|
}
|