queue polling not correctly handles client disconnect
This commit is contained in:
parent
96b49cf0fc
commit
65105d06c2
3 changed files with 36 additions and 2 deletions
|
@ -82,6 +82,24 @@ func (q *Queue) Pull() *queue.Work {
|
|||
return work
|
||||
}
|
||||
|
||||
// PullClose retrieves and removes the head of this queue,
|
||||
// waiting if necessary until work becomes available. The
|
||||
// CloseNotifier should be provided to clone the channel
|
||||
// if the subscribing client terminates its connection.
|
||||
func (q *Queue) PullClose(cn queue.CloseNotifier) *queue.Work {
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
return nil
|
||||
case work := <-q.itemc:
|
||||
q.Lock()
|
||||
delete(q.items, work)
|
||||
q.Unlock()
|
||||
return work
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// PullAck retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available. Items pull from the
|
||||
// queue that aren't acknowledged will be pushed back to the queue
|
||||
|
|
|
@ -13,6 +13,12 @@ type Queue interface {
|
|||
// if necessary until work becomes available.
|
||||
Pull() *Work
|
||||
|
||||
// PullClose retrieves and removes the head of this queue,
|
||||
// waiting if necessary until work becomes available. The
|
||||
// CloseNotifier should be provided to clone the channel
|
||||
// if the subscribing client terminates its connection.
|
||||
PullClose(CloseNotifier) *Work
|
||||
|
||||
// PullAck retrieves and removes the head of this queue, waiting
|
||||
// if necessary until work becomes available. Items pull from the
|
||||
// queue that aren't acknowledged will be pushed back to the queue
|
||||
|
@ -26,3 +32,9 @@ type Queue interface {
|
|||
// queue, in proper sequence.
|
||||
Items() []*Work
|
||||
}
|
||||
|
||||
type CloseNotifier interface {
|
||||
// CloseNotify returns a channel that receives a single value
|
||||
// when the client connection has gone away.
|
||||
CloseNotify() <-chan bool
|
||||
}
|
||||
|
|
|
@ -19,8 +19,12 @@ import (
|
|||
// GET /queue/pull
|
||||
func PollBuild(c *gin.Context) {
|
||||
queue := ToQueue(c)
|
||||
work := queue.Pull()
|
||||
c.JSON(200, work)
|
||||
work := queue.PullClose(c.Writer)
|
||||
if work == nil {
|
||||
c.AbortWithStatus(500)
|
||||
} else {
|
||||
c.JSON(200, work)
|
||||
}
|
||||
}
|
||||
|
||||
// GET /queue/push/:owner/:repo
|
||||
|
|
Loading…
Reference in a new issue