Merge pull request #1635 from bradrydzewski/master

ability for agent to ping central drone server
This commit is contained in:
Brad Rydzewski 2016-05-11 17:53:29 -07:00
commit a53eb9322e
13 changed files with 347 additions and 3 deletions

View file

@ -101,4 +101,7 @@ type Client interface {
// Wait waits for the job to the complete.
Wait(int64) *Wait
// Ping the server
Ping() error
}

View file

@ -23,6 +23,7 @@ const (
pathWait = "%s/api/queue/wait/%d"
pathStream = "%s/api/queue/stream/%d"
pathPush = "%s/api/queue/status/%d"
pathPing = "%s/api/queue/ping"
pathSelf = "%s/api/user"
pathFeed = "%s/api/user/feed"
@ -279,6 +280,13 @@ func (c *client) Push(p *queue.Work) error {
return err
}
// Ping pings the server.
func (c *client) Ping() error {
uri := fmt.Sprintf(pathPing, c.base)
err := c.post(uri, nil, nil)
return err
}
// Stream streams the build logs to the server.
func (c *client) Stream(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathStream, c.base, id)

View file

@ -75,6 +75,12 @@ var AgentCmd = cli.Command{
Usage: "drone server backoff interval",
Value: time.Second * 15,
},
cli.DurationFlag{
EnvVar: "DRONE_PING",
Name: "ping",
Usage: "drone server ping frequency",
Value: time.Minute * 5,
},
cli.BoolFlag{
EnvVar: "DRONE_DEBUG",
Name: "debug",
@ -134,6 +140,15 @@ func start(c *cli.Context) {
logrus.Fatal(err)
}
go func() {
for {
if err := client.Ping(); err != nil {
logrus.Warnf("unable to ping the server. %s", err.Error())
}
time.Sleep(c.Duration("ping"))
}
}()
var wg sync.WaitGroup
for i := 0; i < c.Int("docker-max-procs"); i++ {
wg.Add(1)

10
model/agent.go Normal file
View file

@ -0,0 +1,10 @@
package model
type Agent struct {
ID int64 `json:"id" meddler:"agent_id,pk"`
Address string `json:"address" meddler:"agent_addr"`
Platform string `json:"platform" meddler:"agent_platform"`
Capacity int `json:"capacity" meddler:"agent_capacity"`
Created int64 `json:"created_at" meddler:"agent_created"`
Updated int64 `json:"updated_at" meddler:"agent_updated"`
}

View file

@ -141,9 +141,11 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
builds.GET("", server.GetBuildQueue)
}
// agents := e.Group("/api/agents") {
// builds.Use(session.MustAdmin, server.GetAgents)
// }
agents := e.Group("/api/agents")
{
agents.Use(session.MustAdmin())
agents.GET("", server.GetAgents)
}
queue := e.Group("/api/queue")
{
@ -153,6 +155,7 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
queue.POST("/wait/:id", server.Wait)
queue.POST("/stream/:id", server.Stream)
queue.POST("/status/:id", server.Update)
queue.POST("/ping", server.Ping)
}
// DELETE THESE

15
server/agent.go Normal file
View file

@ -0,0 +1,15 @@
package server
import (
"github.com/drone/drone/store"
"github.com/gin-gonic/gin"
)
func GetAgents(c *gin.Context) {
agents, err := store.GetAgentList(c)
if err != nil {
c.String(500, "Error getting agent list. %s", err)
return
}
c.JSON(200, agents)
}

View file

@ -5,6 +5,7 @@ import (
"io"
"strconv"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/bus"
@ -178,3 +179,23 @@ func Stream(c *gin.Context) {
logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())
}
func Ping(c *gin.Context) {
agent, err := store.GetAgentAddr(c, c.ClientIP())
if err == nil {
agent.Updated = time.Now().Unix()
err = store.UpdateAgent(c, agent)
} else {
err = store.CreateAgent(c, &model.Agent{
Address: c.ClientIP(),
Platform: "linux/amd64",
Capacity: 2,
Created: time.Now().Unix(),
Updated: time.Now().Unix(),
})
}
if err != nil {
logrus.Errorf("Unable to register agent. %s", err.Error())
}
c.String(200, "PONG")
}

56
store/datastore/agents.go Normal file
View file

@ -0,0 +1,56 @@
package datastore
import (
"github.com/drone/drone/model"
"github.com/russross/meddler"
)
func (db *datastore) GetAgent(id int64) (*model.Agent, error) {
var agent = new(model.Agent)
var err = meddler.Load(db, agentTable, agent, id)
return agent, err
}
func (db *datastore) GetAgentAddr(addr string) (*model.Agent, error) {
var agent = new(model.Agent)
var err = meddler.QueryRow(db, agent, rebind(agentAddrQuery), addr)
return agent, err
}
func (db *datastore) GetAgentList() ([]*model.Agent, error) {
var agents = []*model.Agent{}
var err = meddler.QueryAll(db, &agents, rebind(agentListQuery))
return agents, err
}
func (db *datastore) CreateAgent(agent *model.Agent) error {
return meddler.Insert(db, agentTable, agent)
}
func (db *datastore) UpdateAgent(agent *model.Agent) error {
return meddler.Update(db, agentTable, agent)
}
func (db *datastore) DeleteAgent(agent *model.Agent) error {
var _, err = db.Exec(rebind(agentDeleteStmt), agent.ID)
return err
}
const agentTable = "agents"
const agentAddrQuery = `
SELECT *
FROM agents
WHERE agent_addr=?
LIMIT 1
`
const agentListQuery = `
SELECT *
FROM agents
ORDER BY agent_addr ASC
`
const agentDeleteStmt = `
DELETE FROM agents WHERE agent_id = ?
`

View file

@ -0,0 +1,126 @@
package datastore
import (
"testing"
"github.com/drone/drone/model"
"github.com/franela/goblin"
)
func TestAgents(t *testing.T) {
db := openTest()
defer db.Close()
s := From(db)
g := goblin.Goblin(t)
g.Describe("Agents", func() {
// before each test be sure to purge the package
// table data from the database.
g.BeforeEach(func() {
db.Exec("DELETE FROM agents")
})
g.It("Should update", func() {
agent := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
err1 := s.CreateAgent(&agent)
agent.Platform = "windows/amd64"
err2 := s.UpdateAgent(&agent)
getagent, err3 := s.GetAgent(agent.ID)
g.Assert(err1 == nil).IsTrue()
g.Assert(err2 == nil).IsTrue()
g.Assert(err3 == nil).IsTrue()
g.Assert(agent.ID).Equal(getagent.ID)
g.Assert(agent.Platform).Equal(getagent.Platform)
})
g.It("Should create", func() {
agent := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
err := s.CreateAgent(&agent)
g.Assert(err == nil).IsTrue()
g.Assert(agent.ID != 0).IsTrue()
})
g.It("Should get by ID", func() {
agent := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
s.CreateAgent(&agent)
getagent, err := s.GetAgent(agent.ID)
g.Assert(err == nil).IsTrue()
g.Assert(agent.ID).Equal(getagent.ID)
g.Assert(agent.Address).Equal(getagent.Address)
g.Assert(agent.Platform).Equal(getagent.Platform)
})
g.It("Should get by IP address", func() {
agent := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
s.CreateAgent(&agent)
getagent, err := s.GetAgentAddr(agent.Address)
g.Assert(err == nil).IsTrue()
g.Assert(agent.ID).Equal(getagent.ID)
g.Assert(agent.Address).Equal(getagent.Address)
g.Assert(agent.Platform).Equal(getagent.Platform)
})
g.It("Should enforce unique IP address", func() {
agent1 := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
agent2 := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
err1 := s.CreateAgent(&agent1)
err2 := s.CreateAgent(&agent2)
g.Assert(err1 == nil).IsTrue()
g.Assert(err2 == nil).IsFalse()
})
g.It("Should list", func() {
agent1 := model.Agent{
Address: "127.0.0.1",
Platform: "linux/amd64",
}
agent2 := model.Agent{
Address: "localhost",
Platform: "linux/amd64",
}
s.CreateAgent(&agent1)
s.CreateAgent(&agent2)
agents, err := s.GetAgentList()
g.Assert(err == nil).IsTrue()
g.Assert(len(agents)).Equal(2)
g.Assert(agents[0].Address).Equal(agent1.Address)
g.Assert(agents[0].Platform).Equal(agent1.Platform)
})
// g.It("Should delete", func() {
// user := model.User{
// Login: "joe",
// Email: "foo@bar.com",
// Token: "e42080dddf012c718e476da161d21ad5",
// }
// s.CreateUser(&user)
// _, err1 := s.GetUser(user.ID)
// err2 := s.DeleteUser(&user)
// _, err3 := s.GetUser(user.ID)
// g.Assert(err1 == nil).IsTrue()
// g.Assert(err2 == nil).IsTrue()
// g.Assert(err3 == nil).IsFalse()
// })
})
}

View file

@ -0,0 +1,17 @@
-- +migrate Up
CREATE TABLE agents (
agent_id INTEGER PRIMARY KEY AUTO_INCREMENT
,agent_addr VARCHAR(500)
,agent_platform VARCHAR(500)
,agent_capacity INTEGER
,agent_created INTEGER
,agent_updated INTEGER
,UNIQUE(agent_addr)
);
-- +migrate Down
DROP TABLE agents;

View file

@ -0,0 +1,17 @@
-- +migrate Up
CREATE TABLE agents (
agent_id SERIAL PRIMARY KEY
,agent_addr VARCHAR(500)
,agent_platform VARCHAR(500)
,agent_capacity INTEGER
,agent_created INTEGER
,agent_updated INTEGER
,UNIQUE(agent_addr)
);
-- +migrate Down
DROP TABLE agents;

View file

@ -0,0 +1,17 @@
-- +migrate Up
CREATE TABLE agents (
agent_id INTEGER PRIMARY KEY AUTOINCREMENT
,agent_addr TEXT
,agent_platform TEXT
,agent_capacity INTEGER
,agent_created INTEGER
,agent_updated INTEGER
,UNIQUE(agent_addr)
);
-- +migrate Down
DROP TABLE agents;

View file

@ -116,6 +116,18 @@ type Store interface {
// WriteLog writes the job logs to the datastore.
WriteLog(*model.Job, io.Reader) error
GetAgent(int64) (*model.Agent, error)
GetAgentAddr(string) (*model.Agent, error)
GetAgentList() ([]*model.Agent, error)
CreateAgent(*model.Agent) error
UpdateAgent(*model.Agent) error
DeleteAgent(*model.Agent) error
}
// GetUser gets a user by unique ID.
@ -307,3 +319,27 @@ func ReadLog(c context.Context, job *model.Job) (io.ReadCloser, error) {
func WriteLog(c context.Context, job *model.Job, r io.Reader) error {
return FromContext(c).WriteLog(job, r)
}
func GetAgent(c context.Context, id int64) (*model.Agent, error) {
return FromContext(c).GetAgent(id)
}
func GetAgentAddr(c context.Context, addr string) (*model.Agent, error) {
return FromContext(c).GetAgentAddr(addr)
}
func GetAgentList(c context.Context) ([]*model.Agent, error) {
return FromContext(c).GetAgentList()
}
func CreateAgent(c context.Context, agent *model.Agent) error {
return FromContext(c).CreateAgent(agent)
}
func UpdateAgent(c context.Context, agent *model.Agent) error {
return FromContext(c).UpdateAgent(agent)
}
func DeleteAgent(c context.Context, agent *model.Agent) error {
return FromContext(c).DeleteAgent(agent)
}