got websocket events working

This commit is contained in:
Brad Rydzewski 2015-04-24 17:06:46 -07:00
parent da350989d3
commit a1f3b2da4d
10 changed files with 175 additions and 128 deletions

View file

@ -1,89 +0,0 @@
package cluster
import (
"sync"
"github.com/samalba/dockerclient"
)
// TODO (bradrydzewski) ability to cancel work.
// TODO (bradrydzewski) ability to remove a worker.
type Cluster struct {
sync.Mutex
clients map[dockerclient.Client]bool
clientc chan dockerclient.Client
}
func New() *Cluster {
return &Cluster{
clients: make(map[dockerclient.Client]bool),
clientc: make(chan dockerclient.Client, 999),
}
}
// Allocate allocates a client to the pool to
// be available to accept work.
func (c *Cluster) Allocate(cli dockerclient.Client) bool {
if c.IsAllocated(cli) {
return false
}
c.Lock()
c.clients[cli] = true
c.Unlock()
c.clientc <- cli
return true
}
// IsAllocated is a helper function that returns
// true if the client is currently allocated to
// the Pool.
func (c *Cluster) IsAllocated(cli dockerclient.Client) bool {
c.Lock()
defer c.Unlock()
_, ok := c.clients[cli]
return ok
}
// Deallocate removes the worker from the pool of
// available clients. If the client is currently
// reserved and performing work it will finish,
// but no longer be given new work.
func (c *Cluster) Deallocate(cli dockerclient.Client) {
c.Lock()
defer c.Unlock()
delete(c.clients, cli)
}
// List returns a list of all Workers currently
// allocated to the Pool.
func (c *Cluster) List() []dockerclient.Client {
c.Lock()
defer c.Unlock()
var clients []dockerclient.Client
for cli := range c.clients {
clients = append(clients, cli)
}
return clients
}
// Reserve reserves the next available worker to
// start doing work. Once work is complete, the
// worker should be released back to the pool.
func (p *Cluster) Reserve() <-chan dockerclient.Client {
return p.clientc
}
// Release releases the worker back to the pool
// of available workers.
func (c *Cluster) Release(cli dockerclient.Client) bool {
if !c.IsAllocated(cli) {
return false
}
c.clientc <- cli
return true
}

View file

@ -114,6 +114,12 @@ func main() {
queue.POST("/push/:owner/:name/:build/:task/logs", server.PushLogs)
}
events := api.Group("/stream")
{
events.GET("/user", server.GetEvents)
//events.GET("/logs/:owner/:name/:build/:number")
}
auth := r.Group("/authorize")
{
auth.Use(server.SetHeaders())

View file

@ -3,9 +3,9 @@ package eventbus
import "github.com/drone/drone/common"
type Event struct {
Build *common.Build
Repo *common.Repo
Task *common.Task
Build *common.Build `json:"build,omitempty"`
Repo *common.Repo `json:"repo,omitempty"`
Task *common.Task `json:"task,omitempty"`
}
type Bus interface {

View file

@ -37,6 +37,7 @@ func PushBuild(c *gin.Context) {
c.Fail(404, err)
return
}
build.Duration = in.Duration
build.Started = in.Started
build.Finished = in.Finished
@ -47,16 +48,33 @@ func PushBuild(c *gin.Context) {
return
}
if repo.Last == nil || build.Number >= repo.Last.Number {
repo.Last = build
store.SetRepo(repo)
}
// <-- FIXME
// for some reason the Repo and Build fail to marshal to JSON.
// It has something to do with memory / pointers. So it goes away
// if I just refetch these items. Needs to be fixed in the future,
// but for now should be ok
repo, err = store.Repo(repo.FullName)
if err != nil {
c.Fail(500, err)
return
}
build, err = store.Build(repo.FullName, in.Number)
if err != nil {
c.Fail(404, err)
return
}
// END FIXME -->
bus.Send(&eventbus.Event{
Build: build,
Repo: repo,
})
if repo.Last != nil && repo.Last.Number > build.Number {
c.Writer.WriteHeader(200)
return
}
repo.Last = build
store.SetRepo(repo)
c.Writer.WriteHeader(200)
}
@ -83,6 +101,7 @@ func PushTask(c *gin.Context) {
bus.Send(&eventbus.Event{
Build: build,
Repo: repo,
Task: in,
})
c.Writer.WriteHeader(200)
}

View file

@ -31,6 +31,7 @@
<script src="/static/scripts/services/users.js"></script>
<script src="/static/scripts/services/logs.js"></script>
<script src="/static/scripts/services/tokens.js"></script>
<script src="/static/scripts/services/feed.js"></script>
<script src="/static/scripts/filters/filter.js"></script>
<script src="/static/scripts/filters/gravatar.js"></script>

View file

@ -4,7 +4,7 @@
* BuildsCtrl responsible for rendering the repo's
* recent build history.
*/
function BuildsCtrl($scope, $routeParams, builds, repos, users) {
function BuildsCtrl($scope, $routeParams, builds, repos, users, feed) {
var owner = $routeParams.owner;
var name = $routeParams.name;
@ -40,12 +40,31 @@
delete $scope.repo.subscription;
});
}
feed.subscribe(function(event) {
if (event.repo.full_name !== fullName) {
return; // ignore
}
// update repository
$scope.repo = event.repo;
$scope.apply();
if (event.build.number !== parseInt(number)) {
return; // ignore
}
// update the build status
$scope.build.state = event.build.state;
$scope.build.started = event.build.started;
$scope.build.finished = event.build.finished;
$scope.build.duration = event.build.duration;
$scope.$apply();
});
}
/**
* BuildCtrl responsible for rendering a build.
*/
function BuildCtrl($scope, $routeParams, logs, builds, repos, users) {
function BuildCtrl($scope, $routeParams, logs, builds, repos, users, feed) {
var step = parseInt($routeParams.step) || 1;
var number = $routeParams.number;
@ -99,6 +118,32 @@
});
};
feed.subscribe(function(event) {
if (event.repo.full_name !== fullName) {
return; // ignore
}
if (event.build.number !== parseInt(number)) {
return; // ignore
}
// update the build status
$scope.build.state = event.build.state;
$scope.build.started = event.build.started;
$scope.build.finished = event.build.finished;
$scope.build.duration = event.build.duration;
$scope.$apply();
if (!event.task || event.task.number !== step) {
return; // ignore
}
// update the task status
$scope.task.state = event.task.state;
$scope.task.started = event.task.started;
$scope.task.finished = event.task.finished;
$scope.task.duration = event.task.duration;
$scope.task.exit_code = event.task.exit_code;
$scope.$apply();
});
// var convert = new Filter({stream:true,newline:false});
// var term = document.getElementById("term")
// var stdout = document.getElementById("stdout").innerText.split("\n")

View file

@ -3,14 +3,14 @@
/**
* ReposCtrl responsible for rendering the user's
* repository home screen.
*/
function ReposCtrl($scope, $routeParams, repos, users) {
*/
function ReposCtrl($scope, $routeParams, repos, users, feed) {
// Gets the currently authenticated user
// Gets the currently authenticated user
users.getCached().then(function(payload){
$scope.user = payload.data;
});
// Gets a list of repos to display in the
// dropdown.
repos.list().then(function(payload){
@ -18,12 +18,25 @@
}).catch(function(err){
$scope.error = err;
});
feed.subscribe(function(event) {
if (!$scope.repos) {
return;
}
for (var i=0;i<$scope.repos.length;i++) {
if ($scope.repos[i].full_name === event.repo.full_name) {
$scope.repos[i]=event.repo;
$scope.$apply();
break;
}
};
});
}
/**
* RepoAddCtrl responsible for activaing a new
* repository.
*/
*/
function RepoAddCtrl($scope, $location, repos, users) {
$scope.add = function(slug) {
repos.post(slug).then(function(payload) {
@ -36,17 +49,17 @@
/**
* RepoEditCtrl responsible for editing a repository.
*/
*/
function RepoEditCtrl($scope, $location, $routeParams, repos, users) {
var owner = $routeParams.owner;
var name = $routeParams.name;
var fullName = owner+'/'+name;
// Gets the currently authenticated user
// Gets the currently authenticated user
users.getCached().then(function(payload){
$scope.user = payload.data;
});
// Gets a repository
repos.get(fullName).then(function(payload){
$scope.repo = payload.data;
@ -89,4 +102,4 @@
.controller('ReposCtrl', ReposCtrl)
.controller('RepoAddCtrl', RepoAddCtrl)
.controller('RepoEditCtrl', RepoEditCtrl);
})();
})();

View file

@ -12,7 +12,7 @@
/**
* Bootstraps the application and retrieves the
* token from the
* token from the
*/
function Authorize() {
// First, parse the query string
@ -117,24 +117,23 @@
});
}
// /**
// *
// */
// function RouteChange($rootScope, stdout, projs) {
// $rootScope.$on('$routeChangeStart', function (event, next) {
// projs.unsubscribe();
// stdout.unsubscribe();
// });
// //$rootScope.$on('$routeChangeSuccess', function (event, current, previous) {
// // document.title = current.$$route.title + ' · drone.io';
// //});
// }
function RouteChange($rootScope, feed) {
$rootScope.$on('$routeChangeStart', function (event, next) {
feed.unsubscribe();
});
$rootScope.$on('$routeChangeSuccess', function (event, current, previous) {
if (current.$$route.title) {
document.title = current.$$route.title + ' · drone';
}
});
}
angular
.module('drone')
.config(Authorize)
.config(Config);
// .run(RouteChange);
.config(Config)
.run(RouteChange);
})();
})();

View file

@ -0,0 +1,31 @@
'use strict';
(function () {
function FeedService($http, $window) {
var token = localStorage.getItem('access_token');
var proto = ($window.location.protocol == 'https:' ? 'wss' : 'ws');
var route = [proto, "://", $window.location.host, '/api/stream/user?access_token=', token].join('');
var wsCallback = undefined;
var ws = new WebSocket(route);
ws.onmessage = function(event) {
var data = angular.fromJson(event.data);
if (wsCallback != undefined) {
wsCallback(data);
}
};
this.subscribe = function(callback) {
wsCallback = callback;
};
this.unsubscribe = function() {
wsCallback = undefined;
};
}
angular
.module('drone')
.service('feed', FeedService);
})();

View file

@ -3,8 +3,10 @@ package server
import (
"time"
"github.com/drone/drone/common"
"github.com/drone/drone/eventbus"
log "github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
@ -32,6 +34,16 @@ func GetEvents(c *gin.Context) {
user := ToUser(c)
remote := ToRemote(c)
// TODO (bradrydzewski) revisit this approach at some point.
//
// instead of constantly checking for remote permissions, we will
// cache them for the lifecycle of this websocket. The pro here is
// that we are making way less external calls (good). The con is that
// if a ton of developers conntect to websockets for long periods of
// time with heavy build traffic (not super likely, but possible) this
// caching strategy could take up a lot of memory.
perms_ := map[string]*common.Perm{}
// upgrade the websocket
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
@ -56,15 +68,24 @@ func GetEvents(c *gin.Context) {
if event == nil {
return // why would this ever happen?
}
perms := perms(remote, user, event.Repo)
if perms != nil && perms.Pull {
ws.WriteJSON(event)
perm, ok := perms_[event.Repo.FullName]
if !ok {
perm = perms(remote, user, event.Repo)
perms_[event.Repo.FullName] = perm
}
if perm != nil && perm.Pull {
err := ws.WriteJSON(event)
if err != nil {
log.Errorln(err, event)
}
}
case <-ticker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
ws.Close()
log.Debugf("closed websocket")
return
}
}
@ -72,6 +93,7 @@ func GetEvents(c *gin.Context) {
}()
readWebsocket(ws)
log.Debugf("closed websocket")
}
// readWebsocket will block while reading the websocket data