backport 0.5 queue packages

This commit is contained in:
Brad Rydzewski 2016-04-12 17:27:24 -07:00
parent 7129cbf184
commit 9b93e6bae7
47 changed files with 2768 additions and 0 deletions

View file

@ -21,4 +21,9 @@ ENV DATABASE_CONFIG=/var/lib/drone/drone.sqlite
ADD drone_static /drone_static
# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf,
# but Go and CGO rely on /etc/nsswitch.conf to check the order of DNS resolving.
# To fix this we just create /etc/nsswitch.conf and add the following line:
#RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
ENTRYPOINT ["/drone_static"]

40
bus/bus.go Normal file
View file

@ -0,0 +1,40 @@
package bus
//go:generate mockery -name Bus -output mock -case=underscore
import "golang.org/x/net/context"
// Bus represents an event bus implementation that
// allows a publisher to broadcast Event notifications
// to a list of subscribers.
type Bus interface {
// Publish broadcasts an event to all subscribers.
Publish(*Event)
// Subscribe adds the channel to the list of
// subscribers. Each subscriber in the list will
// receive broadcast events.
Subscribe(chan *Event)
// Unsubscribe removes the channel from the list
// of subscribers.
Unsubscribe(chan *Event)
}
// Publish broadcasts an event to all subscribers.
func Publish(c context.Context, event *Event) {
FromContext(c).Publish(event)
}
// Subscribe adds the channel to the list of
// subscribers. Each subscriber in the list will
// receive broadcast events.
func Subscribe(c context.Context, eventc chan *Event) {
FromContext(c).Subscribe(eventc)
}
// Unsubscribe removes the channel from the
// list of subscribers.
func Unsubscribe(c context.Context, eventc chan *Event) {
FromContext(c).Unsubscribe(eventc)
}

46
bus/bus_impl.go Normal file
View file

@ -0,0 +1,46 @@
package bus
import (
"sync"
)
type eventbus struct {
sync.Mutex
subs map[chan *Event]bool
}
// New creates a simple event bus that manages a list of
// subscribers to which events are published.
func New() Bus {
return newEventbus()
}
func newEventbus() *eventbus {
return &eventbus{
subs: make(map[chan *Event]bool),
}
}
func (b *eventbus) Subscribe(c chan *Event) {
b.Lock()
b.subs[c] = true
b.Unlock()
}
func (b *eventbus) Unsubscribe(c chan *Event) {
b.Lock()
delete(b.subs, c)
b.Unlock()
}
func (b *eventbus) Publish(event *Event) {
b.Lock()
defer b.Unlock()
for s := range b.subs {
go func(c chan *Event) {
defer recover()
c <- event
}(s)
}
}

73
bus/bus_impl_test.go Normal file
View file

@ -0,0 +1,73 @@
package bus
import (
"sync"
"testing"
"github.com/drone/drone/model"
. "github.com/franela/goblin"
"github.com/gin-gonic/gin"
)
func TestBus(t *testing.T) {
g := Goblin(t)
g.Describe("Event bus", func() {
g.It("Should unsubscribe", func() {
c := new(gin.Context)
b := newEventbus()
ToContext(c, b)
c1 := make(chan *Event)
c2 := make(chan *Event)
Subscribe(c, c1)
Subscribe(c, c2)
g.Assert(len(b.subs)).Equal(2)
})
g.It("Should subscribe", func() {
c := new(gin.Context)
b := newEventbus()
ToContext(c, b)
c1 := make(chan *Event)
c2 := make(chan *Event)
Subscribe(c, c1)
Subscribe(c, c2)
g.Assert(len(b.subs)).Equal(2)
Unsubscribe(c, c1)
Unsubscribe(c, c2)
g.Assert(len(b.subs)).Equal(0)
})
g.It("Should publish", func() {
c := new(gin.Context)
b := New()
ToContext(c, b)
e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{})
e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{})
c1 := make(chan *Event)
Subscribe(c, c1)
var wg sync.WaitGroup
wg.Add(1)
var r1, r2 *Event
go func() {
r1 = <-c1
r2 = <-c1
wg.Done()
}()
Publish(c, e1)
Publish(c, e2)
wg.Wait()
})
})
}

21
bus/context.go Normal file
View file

@ -0,0 +1,21 @@
package bus
import "golang.org/x/net/context"
const key = "bus"
// Setter defines a context that enables setting values.
type Setter interface {
Set(string, interface{})
}
// FromContext returns the Bus associated with this context.
func FromContext(c context.Context) Bus {
return c.Value(key).(Bus)
}
// ToContext adds the Bus to this context if it supports
// the Setter interface.
func ToContext(c Setter, b Bus) {
c.Set(key, b)
}

32
bus/types.go Normal file
View file

@ -0,0 +1,32 @@
package bus
import "github.com/drone/drone/model"
// EventType defines the possible types of build events.
type EventType string
const (
Enqueued EventType = "enqueued"
Started EventType = "started"
Finished EventType = "finished"
Cancelled EventType = "cancelled"
)
// Event represents a build event.
type Event struct {
Type EventType `json:"type"`
Repo model.Repo `json:"repo"`
Build model.Build `json:"build"`
Job model.Job `json:"job"`
}
// NewEvent creates a new Event for the build, using copies of
// the build data to avoid possible mutation or race conditions.
func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event {
return &Event{
Type: t,
Repo: *r,
Build: *b,
Job: *j,
}
}

23
queue/context.go Normal file
View file

@ -0,0 +1,23 @@
package queue
import (
"golang.org/x/net/context"
)
const key = "queue"
// Setter defines a context that enables setting values.
type Setter interface {
Set(string, interface{})
}
// FromContext returns the Queue associated with this context.
func FromContext(c context.Context) Queue {
return c.Value(key).(Queue)
}
// ToContext adds the Queue to this context if it supports
// the Setter interface.
func ToContext(c Setter, q Queue) {
c.Set(key, q)
}

67
queue/queue.go Normal file
View file

@ -0,0 +1,67 @@
package queue
//go:generate mockery -name Queue -output mock -case=underscore
import (
"errors"
"golang.org/x/net/context"
)
// ErrNotFound indicates the requested work item does not
// exist in the queue.
var ErrNotFound = errors.New("queue item not found")
type Queue interface {
// Publish inserts work at the tail of this queue, waiting for
// space to become available if the queue is full.
Publish(*Work) error
// Remove removes the specified work item from this queue,
// if it is present.
Remove(*Work) error
// PullClose retrieves and removes the head of this queue,
// waiting if necessary until work becomes available.
Pull() *Work
// PullClose retrieves and removes the head of this queue,
// waiting if necessary until work becomes available. The
// CloseNotifier should be provided to clone the channel
// if the subscribing client terminates its connection.
PullClose(CloseNotifier) *Work
}
// Publish inserts work at the tail of this queue, waiting for
// space to become available if the queue is full.
func Publish(c context.Context, w *Work) error {
return FromContext(c).Publish(w)
}
// Remove removes the specified work item from this queue,
// if it is present.
func Remove(c context.Context, w *Work) error {
return FromContext(c).Remove(w)
}
// PullClose retrieves and removes the head of this queue,
// waiting if necessary until work becomes available.
func Pull(c context.Context) *Work {
return FromContext(c).Pull()
}
// PullClose retrieves and removes the head of this queue,
// waiting if necessary until work becomes available. The
// CloseNotifier should be provided to clone the channel
// if the subscribing client terminates its connection.
func PullClose(c context.Context, cn CloseNotifier) *Work {
return FromContext(c).PullClose(cn)
}
// CloseNotifier defines a datastructure that is capable of notifying
// a subscriber when its connection is closed.
type CloseNotifier interface {
// CloseNotify returns a channel that receives a single value
// when the client connection has gone away.
CloseNotify() <-chan bool
}

85
queue/queue_impl.go Normal file
View file

@ -0,0 +1,85 @@
package queue
import "sync"
type queue struct {
sync.Mutex
items map[*Work]struct{}
itemc chan *Work
}
func New() Queue {
return newQueue()
}
func newQueue() *queue {
return &queue{
items: make(map[*Work]struct{}),
itemc: make(chan *Work, 999),
}
}
func (q *queue) Publish(work *Work) error {
q.Lock()
q.items[work] = struct{}{}
q.Unlock()
q.itemc <- work
return nil
}
func (q *queue) Remove(work *Work) error {
q.Lock()
defer q.Unlock()
_, ok := q.items[work]
if !ok {
return ErrNotFound
}
var items []*Work
// loop through and drain all items
// from the
drain:
for {
select {
case item := <-q.itemc:
items = append(items, item)
default:
break drain
}
}
// re-add all items to the queue except
// the item we're trying to remove
for _, item := range items {
if item == work {
delete(q.items, work)
continue
}
q.itemc <- item
}
return nil
}
func (q *queue) Pull() *Work {
work := <-q.itemc
q.Lock()
delete(q.items, work)
q.Unlock()
return work
}
func (q *queue) PullClose(cn CloseNotifier) *Work {
for {
select {
case <-cn.CloseNotify():
return nil
case work := <-q.itemc:
q.Lock()
delete(q.items, work)
q.Unlock()
return work
}
}
}

93
queue/queue_impl_test.go Normal file
View file

@ -0,0 +1,93 @@
package queue
import (
"sync"
"testing"
. "github.com/franela/goblin"
"github.com/gin-gonic/gin"
)
func TestBuild(t *testing.T) {
g := Goblin(t)
g.Describe("Queue", func() {
g.It("Should publish item", func() {
c := new(gin.Context)
q := newQueue()
ToContext(c, q)
w1 := &Work{}
w2 := &Work{}
Publish(c, w1)
Publish(c, w2)
g.Assert(len(q.items)).Equal(2)
g.Assert(len(q.itemc)).Equal(2)
})
g.It("Should remove item", func() {
c := new(gin.Context)
q := newQueue()
ToContext(c, q)
w1 := &Work{}
w2 := &Work{}
w3 := &Work{}
Publish(c, w1)
Publish(c, w2)
Publish(c, w3)
Remove(c, w2)
g.Assert(len(q.items)).Equal(2)
g.Assert(len(q.itemc)).Equal(2)
g.Assert(Pull(c)).Equal(w1)
g.Assert(Pull(c)).Equal(w3)
g.Assert(Remove(c, w2)).Equal(ErrNotFound)
})
g.It("Should pull item", func() {
c := new(gin.Context)
q := New()
ToContext(c, q)
cn := new(closeNotifier)
cn.closec = make(chan bool, 1)
w1 := &Work{}
w2 := &Work{}
Publish(c, w1)
g.Assert(Pull(c)).Equal(w1)
Publish(c, w2)
g.Assert(PullClose(c, cn)).Equal(w2)
})
g.It("Should cancel pulling item", func() {
c := new(gin.Context)
q := New()
ToContext(c, q)
cn := new(closeNotifier)
cn.closec = make(chan bool, 1)
var wg sync.WaitGroup
go func() {
wg.Add(1)
g.Assert(PullClose(c, cn) == nil).IsTrue()
wg.Done()
}()
go func() {
cn.closec <- true
}()
wg.Wait()
})
})
}
type closeNotifier struct {
closec chan bool
}
func (c *closeNotifier) CloseNotify() <-chan bool {
return c.closec
}

18
queue/types.go Normal file
View file

@ -0,0 +1,18 @@
package queue
import "github.com/drone/drone/model"
// Work represents an item for work to be
// processed by a worker.
type Work struct {
Yaml string `json:"config"`
YamlEnc string `json:"secret"`
Repo *model.Repo `json:"repo"`
Build *model.Build `json:"build"`
BuildLast *model.Build `json:"build_last"`
Job *model.Job `json:"job"`
Netrc *model.Netrc `json:"netrc"`
Keys *model.Key `json:"keys"`
System *model.System `json:"system"`
User *model.User `json:"user"`
}

21
stream/context.go Normal file
View file

@ -0,0 +1,21 @@
package stream
import "golang.org/x/net/context"
const key = "stream"
// Setter defines a context that enables setting values.
type Setter interface {
Set(string, interface{})
}
// FromContext returns the Mux associated with this context.
func FromContext(c context.Context) Mux {
return c.Value(key).(Mux)
}
// ToContext adds the Mux to this context if it supports
// the Setter interface.
func ToContext(c Setter, m Mux) {
c.Set(key, m)
}

73
stream/stream.go Normal file
View file

@ -0,0 +1,73 @@
package stream
//go:generate mockery -name Mux -output mock -case=underscore
import (
"bufio"
"io"
"strconv"
"golang.org/x/net/context"
)
// Mux defines a stream multiplexer
type Mux interface {
// Create creates and returns a new stream identified by
// the specified key.
Create(key string) (io.ReadCloser, io.WriteCloser, error)
// Open returns the existing stream by key. If the stream
// does not exist an error is returned.
Open(key string) (io.ReadCloser, io.WriteCloser, error)
// Remove deletes the stream by key.
Remove(key string) error
// Exists return true if the stream exists.
Exists(key string) bool
}
// Create creates and returns a new stream identified
// by the specified key.
func Create(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) {
return FromContext(c).Create(key)
}
// Open returns the existing stream by key. If the stream does
// not exist an error is returned.
func Open(c context.Context, key string) (io.ReadCloser, io.WriteCloser, error) {
return FromContext(c).Open(key)
}
// Exists return true if the stream exists.
func Exists(c context.Context, key string) bool {
return FromContext(c).Exists(key)
}
// Remove deletes the stream by key.
func Remove(c context.Context, key string) error {
return FromContext(c).Remove(key)
}
// ToKey is a helper function that converts a unique identifier
// of type int64 into a string.
func ToKey(i int64) string {
return strconv.FormatInt(i, 10)
}
// Copy copies the stream from the source to the destination in
// valid JSON format. This converts the logs, which are per-line
// JSON objects, to a JSON array.
func Copy(dest io.Writer, src io.Reader) error {
io.WriteString(dest, "[")
scanner := bufio.NewScanner(src)
for scanner.Scan() {
io.WriteString(dest, scanner.Text())
io.WriteString(dest, ",\n")
}
io.WriteString(dest, "{}]")
return nil
}

95
stream/stream_impl.go Normal file
View file

@ -0,0 +1,95 @@
package stream
import (
"io"
"sync"
"github.com/djherbis/fscache"
)
var noexp fscache.Reaper
// New creates a new Mux using an in-memory filesystem.
func New() Mux {
fs := fscache.NewMemFs()
c, err := fscache.NewCache(fs, noexp)
if err != nil {
panic(err)
}
return &mux{c}
}
// New creates a new Mux using a persistent filesystem.
func NewFileSystem(path string) Mux {
fs, err := fscache.NewFs(path, 0777)
if err != nil {
panic(err)
}
c, err := fscache.NewCache(fs, noexp)
if err != nil {
panic(err)
}
return &mux{c}
}
// mux wraps the default fscache.Cache to match the
// defined interface and to wrap the ReadCloser and
// WriteCloser to avoid panics when we over-aggressively
// close streams.
type mux struct {
cache fscache.Cache
}
func (m *mux) Create(key string) (io.ReadCloser, io.WriteCloser, error) {
rc, wc, err := m.cache.Get(key)
if rc != nil {
rc = &closeOnceReader{ReadCloser: rc}
}
if wc != nil {
wc = &closeOnceWriter{WriteCloser: wc}
}
return rc, wc, err
}
func (m *mux) Open(key string) (io.ReadCloser, io.WriteCloser, error) {
return m.Create(key)
}
func (m *mux) Exists(key string) bool {
return m.cache.Exists(key)
}
func (m *mux) Remove(key string) error {
return m.cache.Remove(key)
}
// closeOnceReader is a helper function that ensures
// the reader is only closed once. This is because
// attempting to close the fscache reader more than
// once results in a panic.
type closeOnceReader struct {
io.ReadCloser
once sync.Once
}
func (c *closeOnceReader) Close() error {
c.once.Do(func() {
c.ReadCloser.Close()
})
return nil
}
// closeOnceWriter is a helper function that ensures
// the writer is only closed once. This is because
// attempting to close the fscache writer more than
// once results in a panic.
type closeOnceWriter struct {
io.WriteCloser
once sync.Once
}
func (c *closeOnceWriter) Close() error {
c.once.Do(func() {
c.WriteCloser.Close()
})
return nil
}

View file

@ -0,0 +1 @@
package stream

22
vendor/github.com/djherbis/fscache/LICENSE generated vendored Normal file
View file

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 Dustin H
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

93
vendor/github.com/djherbis/fscache/README.md generated vendored Normal file
View file

@ -0,0 +1,93 @@
fscache
==========
[![GoDoc](https://godoc.org/github.com/djherbis/fscache?status.svg)](https://godoc.org/github.com/djherbis/fscache)
[![Release](https://img.shields.io/github/release/djherbis/fscache.svg)](https://github.com/djherbis/fscache/releases/latest)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE.txt)
[![Build Status](https://travis-ci.org/djherbis/fscache.svg?branch=master)](https://travis-ci.org/djherbis/fscache)
[![Coverage Status](https://coveralls.io/repos/djherbis/fscache/badge.svg?branch=master)](https://coveralls.io/r/djherbis/fscache?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/djherbis/fscache)](https://goreportcard.com/report/github.com/djherbis/fscache)
Usage
------------
Streaming File Cache for #golang
fscache allows multiple readers to read from a cache while its being written to. [blog post](https://djherbis.github.io/post/fscache/)
Using the Cache directly:
```go
package main
import (
"io"
"log"
"os"
"time"
"gopkg.in/djherbis/fscache.v0"
)
func main() {
// create the cache, keys expire after 1 hour.
c, err := fscache.New("./cache", 0755, time.Hour)
if err != nil {
log.Fatal(err.Error())
}
// wipe the cache when done
defer c.Clean()
// Get() and it's streams can be called concurrently but just for example:
for i := 0; i < 3; i++ {
r, w, err := c.Get("stream")
if err != nil {
log.Fatal(err.Error())
}
if w != nil { // a new stream, write to it.
go func(){
w.Write([]byte("hello world\n"))
w.Close()
}()
}
// the stream has started, read from it
io.Copy(os.Stdout, r)
r.Close()
}
}
```
A Caching Middle-ware:
```go
package main
import(
"net/http"
"time"
"gopkg.in/djherbis/fscache.v0"
)
func main(){
c, err := fscache.New("./cache", 0700, 0)
if err != nil {
log.Fatal(err.Error())
}
handler := func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%v: %s", time.Now(), "hello world")
}
http.ListenAndServe(":8080", fscache.Handler(c, http.HandlerFunc(handler)))
}
```
Installation
------------
```sh
go get gopkg.in/djherbis/fscache.v0
```

85
vendor/github.com/djherbis/fscache/distrib.go generated vendored Normal file
View file

@ -0,0 +1,85 @@
package fscache
import (
"bytes"
"crypto/sha1"
"encoding/binary"
"io"
)
// Distributor provides a way to partition keys into Caches.
type Distributor interface {
// GetCache will always return the same Cache for the same key.
GetCache(key string) Cache
// Clean should wipe all the caches this Distributor manages
Clean() error
}
// stdDistribution distributes the keyspace evenly.
func stdDistribution(key string, n uint64) uint64 {
h := sha1.New()
io.WriteString(h, key)
buf := bytes.NewBuffer(h.Sum(nil)[:8])
i, _ := binary.ReadUvarint(buf)
return i % n
}
// NewDistributor returns a Distributor which evenly distributes the keyspace
// into the passed caches.
func NewDistributor(caches ...Cache) Distributor {
if len(caches) == 0 {
return nil
}
return &distrib{
distribution: stdDistribution,
caches: caches,
size: uint64(len(caches)),
}
}
type distrib struct {
distribution func(key string, n uint64) uint64
caches []Cache
size uint64
}
func (d *distrib) GetCache(key string) Cache {
return d.caches[d.distribution(key, d.size)]
}
// BUG(djherbis): Return an error if cleaning fails
func (d *distrib) Clean() error {
for _, c := range d.caches {
c.Clean()
}
return nil
}
// NewPartition returns a Cache which uses the Caches defined by the passed Distributor.
func NewPartition(d Distributor) Cache {
return &partition{
distributor: d,
}
}
type partition struct {
distributor Distributor
}
func (p *partition) Get(key string) (ReadAtCloser, io.WriteCloser, error) {
return p.distributor.GetCache(key).Get(key)
}
func (p *partition) Remove(key string) error {
return p.distributor.GetCache(key).Remove(key)
}
func (p *partition) Exists(key string) bool {
return p.distributor.GetCache(key).Exists(key)
}
func (p *partition) Clean() error {
return p.distributor.Clean()
}

199
vendor/github.com/djherbis/fscache/fs.go generated vendored Normal file
View file

@ -0,0 +1,199 @@
package fscache
import (
"bytes"
"crypto/md5"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
"gopkg.in/djherbis/atime.v1"
"gopkg.in/djherbis/stream.v1"
)
// FileSystem is used as the source for a Cache.
type FileSystem interface {
// Stream FileSystem
stream.FileSystem
// Reload should look through the FileSystem and call the suplied fn
// with the key/filename pairs that are found.
Reload(func(key, name string)) error
// RemoveAll should empty the FileSystem of all files.
RemoveAll() error
// AccessTimes takes a File.Name() and returns the last time the file was read,
// and the last time it was written to.
// It will be used to check expiry of a file, and must be concurrent safe
// with modifications to the FileSystem (writes, reads etc.)
AccessTimes(name string) (rt, wt time.Time, err error)
}
type stdFs struct {
root string
}
// NewFs returns a FileSystem rooted at directory dir.
// Dir is created with perms if it doesn't exist.
func NewFs(dir string, mode os.FileMode) (FileSystem, error) {
return &stdFs{root: dir}, os.MkdirAll(dir, mode)
}
func (fs *stdFs) Reload(add func(key, name string)) error {
files, err := ioutil.ReadDir(fs.root)
if err != nil {
return err
}
addfiles := make(map[string]struct {
os.FileInfo
key string
})
for _, f := range files {
if strings.HasSuffix(f.Name(), ".key") {
continue
}
key, err := fs.getKey(f.Name())
if err != nil {
return err
}
fi, ok := addfiles[key]
if !ok || fi.ModTime().Before(f.ModTime()) {
if ok {
fs.Remove(fi.Name())
}
addfiles[key] = struct {
os.FileInfo
key string
}{
FileInfo: f,
key: key,
}
} else {
fs.Remove(f.Name())
}
}
for _, f := range addfiles {
path, err := filepath.Abs(filepath.Join(fs.root, f.Name()))
if err != nil {
return err
}
add(f.key, path)
}
return nil
}
func (fs *stdFs) Create(name string) (stream.File, error) {
name, err := fs.makeName(name)
if err != nil {
return nil, err
}
return fs.create(name)
}
func (fs *stdFs) create(name string) (stream.File, error) {
return os.OpenFile(filepath.Join(fs.root, name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
}
func (fs *stdFs) Open(name string) (stream.File, error) {
return os.Open(name)
}
func (fs *stdFs) Remove(name string) error {
os.Remove(fmt.Sprintf("%s.key", name))
return os.Remove(name)
}
func (fs *stdFs) RemoveAll() error {
return os.RemoveAll(fs.root)
}
func (fs *stdFs) AccessTimes(name string) (rt, wt time.Time, err error) {
fi, err := os.Stat(name)
if err != nil {
return rt, wt, err
}
return atime.Get(fi), fi.ModTime(), nil
}
const (
saltSize = 8
maxShort = 20
shortPrefix = "s"
longPrefix = "l"
)
func salt() string {
buf := bytes.NewBufferString("")
enc := base64.NewEncoder(base64.URLEncoding, buf)
io.CopyN(enc, rand.Reader, saltSize)
return buf.String()
}
func tob64(s string) string {
buf := bytes.NewBufferString("")
enc := base64.NewEncoder(base64.URLEncoding, buf)
enc.Write([]byte(s))
enc.Close()
return buf.String()
}
func fromb64(s string) string {
buf := bytes.NewBufferString(s)
dec := base64.NewDecoder(base64.URLEncoding, buf)
out := bytes.NewBufferString("")
io.Copy(out, dec)
return out.String()
}
func (fs *stdFs) makeName(key string) (string, error) {
b64key := tob64(key)
// short name
if len(b64key) < maxShort {
return fmt.Sprintf("%s%s%s", shortPrefix, salt(), b64key), nil
}
// long name
hash := md5.Sum([]byte(key))
name := fmt.Sprintf("%s%s%x", longPrefix, salt(), hash[:])
f, err := fs.create(fmt.Sprintf("%s.key", name))
if err != nil {
return "", err
}
_, err = f.Write([]byte(key))
f.Close()
return name, err
}
func (fs *stdFs) getKey(name string) (string, error) {
// short name
if strings.HasPrefix(name, shortPrefix) {
return fromb64(strings.TrimPrefix(name, shortPrefix)[saltSize:]), nil
}
// long name
f, err := fs.Open(filepath.Join(fs.root, fmt.Sprintf("%s.key", name)))
if err != nil {
return "", err
}
defer f.Close()
key, err := ioutil.ReadAll(f)
if err != nil {
return "", err
}
return string(key), nil
}

303
vendor/github.com/djherbis/fscache/fscache.go generated vendored Normal file
View file

@ -0,0 +1,303 @@
package fscache
import (
"io"
"os"
"sync"
"sync/atomic"
"time"
"gopkg.in/djherbis/stream.v1"
)
// Cache works like a concurrent-safe map for streams.
type Cache interface {
// Get manages access to the streams in the cache.
// If the key does not exist, w != nil and you can start writing to the stream.
// If the key does exist, w == nil.
// r will always be non-nil as long as err == nil and you must close r when you're done reading.
// Get can be called concurrently, and writing and reading is concurrent safe.
Get(key string) (ReadAtCloser, io.WriteCloser, error)
// Remove deletes the stream from the cache, blocking until the underlying
// file can be deleted (all active streams finish with it).
// It is safe to call Remove concurrently with Get.
Remove(key string) error
// Exists checks if a key is in the cache.
// It is safe to call Exists concurrently with Get.
Exists(key string) bool
// Clean will empty the cache and delete the cache folder.
// Clean is not safe to call while streams are being read/written.
Clean() error
}
type cache struct {
mu sync.RWMutex
files map[string]fileStream
grim Reaper
fs FileSystem
}
// ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. It supports both so that Range
// Requests are possible.
type ReadAtCloser interface {
io.ReadCloser
io.ReaderAt
}
type fileStream interface {
next() (ReadAtCloser, error)
inUse() bool
io.WriteCloser
Remove() error
Name() string
}
// New creates a new Cache using NewFs(dir, perms).
// expiry is the duration after which an un-accessed key will be removed from
// the cache, a zero value expiro means never expire.
func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) {
fs, err := NewFs(dir, perms)
if err != nil {
return nil, err
}
var grim Reaper
if expiry > 0 {
grim = &reaper{
expiry: expiry,
period: expiry,
}
}
return NewCache(fs, grim)
}
// NewCache creates a new Cache based on FileSystem fs.
// fs.Files() are loaded using the name they were created with as a key.
// Reaper is used to determine when files expire, nil means never expire.
func NewCache(fs FileSystem, grim Reaper) (Cache, error) {
c := &cache{
files: make(map[string]fileStream),
grim: grim,
fs: fs,
}
err := c.load()
if err != nil {
return nil, err
}
if grim != nil {
c.haunter()
}
return c, nil
}
func (c *cache) haunter() {
c.haunt()
time.AfterFunc(c.grim.Next(), c.haunter)
}
func (c *cache) haunt() {
c.mu.Lock()
defer c.mu.Unlock()
for key, f := range c.files {
if f.inUse() {
continue
}
lastRead, lastWrite, err := c.fs.AccessTimes(f.Name())
if err != nil {
continue
}
if c.grim.Reap(key, lastRead, lastWrite) {
delete(c.files, key)
c.fs.Remove(f.Name())
}
}
return
}
func (c *cache) load() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.fs.Reload(func(key, name string) {
c.files[key] = c.oldFile(name)
})
}
func (c *cache) Exists(key string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
_, ok := c.files[key]
return ok
}
func (c *cache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
c.mu.RLock()
f, ok := c.files[key]
if ok {
r, err = f.next()
c.mu.RUnlock()
return r, nil, err
}
c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
f, ok = c.files[key]
if ok {
r, err = f.next()
return r, nil, err
}
f, err = c.newFile(key)
if err != nil {
return nil, nil, err
}
r, err = f.next()
if err != nil {
f.Close()
c.fs.Remove(f.Name())
return nil, nil, err
}
c.files[key] = f
return r, f, err
}
func (c *cache) Remove(key string) error {
c.mu.Lock()
f, ok := c.files[key]
delete(c.files, key)
c.mu.Unlock()
if ok {
return f.Remove()
}
return nil
}
func (c *cache) Clean() error {
c.mu.Lock()
defer c.mu.Unlock()
c.files = make(map[string]fileStream)
return c.fs.RemoveAll()
}
type cachedFile struct {
stream *stream.Stream
handleCounter
}
func (c *cache) newFile(name string) (fileStream, error) {
s, err := stream.NewStream(name, c.fs)
if err != nil {
return nil, err
}
cf := &cachedFile{
stream: s,
}
cf.inc()
return cf, nil
}
func (c *cache) oldFile(name string) fileStream {
return &reloadedFile{
fs: c.fs,
name: name,
}
}
type reloadedFile struct {
fs FileSystem
name string
handleCounter
io.WriteCloser // nop Write & Close methods. will never be called.
}
func (f *reloadedFile) Name() string { return f.name }
func (f *reloadedFile) Remove() error {
f.waitUntilFree()
return f.fs.Remove(f.name)
}
func (f *reloadedFile) next() (r ReadAtCloser, err error) {
r, err = f.fs.Open(f.name)
if err == nil {
f.inc()
}
return &cacheReader{r: r, cnt: &f.handleCounter}, err
}
func (f *cachedFile) Name() string { return f.stream.Name() }
func (f *cachedFile) Remove() error { return f.stream.Remove() }
func (f *cachedFile) next() (r ReadAtCloser, err error) {
reader, err := f.stream.NextReader()
if err != nil {
return nil, err
}
f.inc()
return &cacheReader{
r: reader,
cnt: &f.handleCounter,
}, nil
}
func (f *cachedFile) Write(p []byte) (int, error) {
return f.stream.Write(p)
}
func (f *cachedFile) Close() error {
defer f.dec()
return f.stream.Close()
}
type cacheReader struct {
r ReadAtCloser
cnt *handleCounter
}
func (r *cacheReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.r.ReadAt(p, off)
}
func (r *cacheReader) Read(p []byte) (n int, err error) {
return r.r.Read(p)
}
func (r *cacheReader) Close() error {
defer r.cnt.dec()
return r.r.Close()
}
type handleCounter struct {
cnt int64
grp sync.WaitGroup
}
func (h *handleCounter) inc() {
h.grp.Add(1)
atomic.AddInt64(&h.cnt, 1)
}
func (h *handleCounter) dec() {
atomic.AddInt64(&h.cnt, -1)
h.grp.Done()
}
func (h *handleCounter) inUse() bool {
return atomic.LoadInt64(&h.cnt) > 0
}
func (h *handleCounter) waitUntilFree() {
h.grp.Wait()
}

41
vendor/github.com/djherbis/fscache/handler.go generated vendored Normal file
View file

@ -0,0 +1,41 @@
package fscache
import (
"io"
"net/http"
)
// Handler is a caching middle-ware for http Handlers.
// It responds to http requests via the passed http.Handler, and caches the response
// using the passed cache. The cache key for the request is the req.URL.String().
// Note: It does not cache http headers. It is more efficient to set them yourself.
func Handler(c Cache, h http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
url := req.URL.String()
r, w, err := c.Get(url)
if err != nil {
h.ServeHTTP(rw, req)
return
}
defer r.Close()
if w != nil {
go func() {
defer w.Close()
h.ServeHTTP(&respWrapper{
ResponseWriter: rw,
Writer: w,
}, req)
}()
}
io.Copy(rw, r)
})
}
type respWrapper struct {
http.ResponseWriter
io.Writer
}
func (r *respWrapper) Write(p []byte) (int, error) {
return r.Writer.Write(p)
}

129
vendor/github.com/djherbis/fscache/layers.go generated vendored Normal file
View file

@ -0,0 +1,129 @@
package fscache
import (
"errors"
"io"
"sync"
)
type layeredCache struct {
layers []Cache
}
// NewLayered returns a Cache which stores its data in all the passed
// caches, when a key is requested it is loaded into all the caches above the first hit.
func NewLayered(caches ...Cache) Cache {
return &layeredCache{layers: caches}
}
func (l *layeredCache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
var last ReadAtCloser
var writers []io.WriteCloser
for i, layer := range l.layers {
r, w, err = layer.Get(key)
if err != nil {
if len(writers) > 0 {
last.Close()
multiWC(writers...).Close()
}
return nil, nil, err
}
// hit
if w == nil {
if len(writers) > 0 {
go func(r io.ReadCloser) {
wc := multiWC(writers...)
defer r.Close()
defer wc.Close()
io.Copy(wc, r)
}(r)
return last, nil, nil
}
return r, nil, nil
}
// miss
writers = append(writers, w)
if i == len(l.layers)-1 {
if last != nil {
last.Close()
}
return r, multiWC(writers...), nil
}
if last != nil {
last.Close()
}
last = r
}
return nil, nil, errors.New("no caches")
}
func (l *layeredCache) Remove(key string) error {
var grp sync.WaitGroup
// walk upwards so that lower layers don't
// restore upper layers on Get()
for i := len(l.layers) - 1; i >= 0; i-- {
grp.Add(1)
go func(layer Cache) {
defer grp.Done()
layer.Remove(key)
}(l.layers[i])
}
grp.Wait()
return nil
}
func (l *layeredCache) Exists(key string) bool {
for _, layer := range l.layers {
if layer.Exists(key) {
return true
}
}
return false
}
func (l *layeredCache) Clean() (err error) {
for _, layer := range l.layers {
er := layer.Clean()
if er != nil {
err = er
}
}
return nil
}
func multiWC(wc ...io.WriteCloser) io.WriteCloser {
if len(wc) == 0 {
return nil
}
return &multiWriteCloser{
writers: wc,
}
}
type multiWriteCloser struct {
writers []io.WriteCloser
}
func (t *multiWriteCloser) Write(p []byte) (n int, err error) {
for _, w := range t.writers {
n, err = w.Write(p)
if err != nil {
return
}
}
return len(p), nil
}
func (t *multiWriteCloser) Close() error {
for _, w := range t.writers {
w.Close()
}
return nil
}

133
vendor/github.com/djherbis/fscache/memfs.go generated vendored Normal file
View file

@ -0,0 +1,133 @@
package fscache
import (
"bytes"
"errors"
"io"
"sync"
"time"
"gopkg.in/djherbis/stream.v1"
)
type memFS struct {
mu sync.RWMutex
files map[string]*memFile
}
// NewMemFs creates an in-memory FileSystem.
// It does not support persistence (Reload is a nop).
func NewMemFs() FileSystem {
return &memFS{
files: make(map[string]*memFile),
}
}
func (fs *memFS) Reload(add func(key, name string)) error {
return nil
}
func (fs *memFS) AccessTimes(name string) (rt, wt time.Time, err error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
f, ok := fs.files[name]
if ok {
return f.rt, f.wt, nil
}
return rt, wt, errors.New("file has not been read")
}
func (fs *memFS) Create(key string) (stream.File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if _, ok := fs.files[key]; ok {
return nil, errors.New("file exists")
}
file := &memFile{
name: key,
r: bytes.NewBuffer(nil),
wt: time.Now(),
}
file.memReader.memFile = file
fs.files[key] = file
return file, nil
}
func (fs *memFS) Open(name string) (stream.File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if f, ok := fs.files[name]; ok {
f.rt = time.Now()
return &memReader{memFile: f}, nil
}
return nil, errors.New("file does not exist")
}
func (fs *memFS) Remove(key string) error {
fs.mu.Lock()
defer fs.mu.Unlock()
delete(fs.files, key)
return nil
}
func (fs *memFS) RemoveAll() error {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.files = make(map[string]*memFile)
return nil
}
type memFile struct {
mu sync.RWMutex
name string
r *bytes.Buffer
memReader
rt, wt time.Time
}
func (f *memFile) Name() string {
return f.name
}
func (f *memFile) Write(p []byte) (int, error) {
if len(p) > 0 {
f.mu.Lock()
defer f.mu.Unlock()
return f.r.Write(p)
}
return len(p), nil
}
func (f *memFile) Bytes() []byte {
f.mu.RLock()
defer f.mu.RUnlock()
return f.r.Bytes()
}
func (f *memFile) Close() error {
return nil
}
type memReader struct {
*memFile
n int
}
func (r *memReader) ReadAt(p []byte, off int64) (n int, err error) {
data := r.Bytes()
if int64(len(data)) < off {
return 0, io.EOF
}
n, err = bytes.NewReader(data[off:]).ReadAt(p, 0)
return n, err
}
func (r *memReader) Read(p []byte) (n int, err error) {
n, err = bytes.NewReader(r.Bytes()[r.n:]).Read(p)
r.n += n
return n, err
}
func (r *memReader) Close() error {
return nil
}

37
vendor/github.com/djherbis/fscache/reaper.go generated vendored Normal file
View file

@ -0,0 +1,37 @@
package fscache
import "time"
// Reaper is used to control when streams expire from the cache.
// It is called once right after loading, and then it is run
// again after every Next() period of time.
type Reaper interface {
// Returns the amount of time to wait before the next scheduled Reaping.
Next() time.Duration
// Given a key and the last r/w times of a file, return true
// to remove the file from the cache, false to keep it.
Reap(key string, lastRead, lastWrite time.Time) bool
}
// NewReaper returns a simple reaper which runs every "period"
// and reaps files which are older than "expiry".
func NewReaper(expiry, period time.Duration) Reaper {
return &reaper{
expiry: expiry,
period: period,
}
}
type reaper struct {
period time.Duration
expiry time.Duration
}
func (g *reaper) Next() time.Duration {
return g.period
}
func (g *reaper) Reap(key string, lastRead, lastWrite time.Time) bool {
return lastRead.Before(time.Now().Add(-g.expiry))
}

206
vendor/github.com/djherbis/fscache/server.go generated vendored Normal file
View file

@ -0,0 +1,206 @@
package fscache
import (
"bytes"
"errors"
"fmt"
"io"
"net"
)
// ListenAndServe hosts a Cache for access via NewRemote
func ListenAndServe(c Cache, addr string) error {
return (&server{c: c}).ListenAndServe(addr)
}
// NewRemote returns a Cache run via ListenAndServe
func NewRemote(raddr string) Cache {
return &remote{raddr: raddr}
}
type server struct {
c Cache
}
func (s *server) ListenAndServe(addr string) error {
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}
for {
c, err := l.Accept()
if err != nil {
return err
}
go s.Serve(c)
}
}
const (
actionGet = iota
actionRemove = iota
actionExists = iota
actionClean = iota
)
func getKey(r io.Reader) string {
dec := newDecoder(r)
buf := bytes.NewBufferString("")
io.Copy(buf, dec)
return buf.String()
}
func sendKey(w io.Writer, key string) {
enc := newEncoder(w)
enc.Write([]byte(key))
enc.Close()
}
func (s *server) Serve(c net.Conn) {
var action int
fmt.Fscanf(c, "%d\n", &action)
switch action {
case actionGet:
s.get(c, getKey(c))
case actionRemove:
s.c.Remove(getKey(c))
case actionExists:
s.exists(c, getKey(c))
case actionClean:
s.c.Clean()
}
}
func (s *server) exists(c net.Conn, key string) {
if s.c.Exists(key) {
fmt.Fprintf(c, "%d\n", 1)
} else {
fmt.Fprintf(c, "%d\n", 0)
}
}
func (s *server) get(c net.Conn, key string) {
r, w, err := s.c.Get(key)
if err != nil {
return // handle this better
}
defer r.Close()
if w != nil {
go func() {
fmt.Fprintf(c, "%d\n", 1)
io.Copy(w, newDecoder(c))
w.Close()
}()
} else {
fmt.Fprintf(c, "%d\n", 0)
}
enc := newEncoder(c)
io.Copy(enc, r)
enc.Close()
}
type remote struct {
raddr string
}
func (rmt *remote) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
c, err := net.Dial("tcp", rmt.raddr)
if err != nil {
return nil, nil, err
}
fmt.Fprintf(c, "%d\n", actionGet)
sendKey(c, key)
var i int
fmt.Fscanf(c, "%d\n", &i)
var ch chan struct{}
switch i {
case 0:
ch = make(chan struct{}) // close net.Conn on reader close
case 1:
ch = make(chan struct{}, 1) // two closes before net.Conn close
w = &safeCloser{
c: c,
ch: ch,
w: newEncoder(c),
}
default:
return nil, nil, errors.New("bad bad bad")
}
r = &safeCloser{
c: c,
ch: ch,
r: newDecoder(c),
}
return r, w, nil
}
type safeCloser struct {
c net.Conn
ch chan<- struct{}
r ReadAtCloser
w io.WriteCloser
}
func (s *safeCloser) ReadAt(p []byte, off int64) (int, error) {
return s.r.ReadAt(p, off)
}
func (s *safeCloser) Read(p []byte) (int, error) { return s.r.Read(p) }
func (s *safeCloser) Write(p []byte) (int, error) { return s.w.Write(p) }
// Close only closes the underlying connection when ch is full.
func (s *safeCloser) Close() (err error) {
if s.r != nil {
err = s.r.Close()
} else if s.w != nil {
err = s.w.Close()
}
select {
case s.ch <- struct{}{}:
return err
default:
return s.c.Close()
}
}
func (rmt *remote) Exists(key string) bool {
c, err := net.Dial("tcp", rmt.raddr)
if err != nil {
return false
}
fmt.Fprintf(c, "%d\n", actionExists)
sendKey(c, key)
var i int
fmt.Fscanf(c, "%d\n", &i)
return i == 1
}
func (rmt *remote) Remove(key string) error {
c, err := net.Dial("tcp", rmt.raddr)
if err != nil {
return err
}
fmt.Fprintf(c, "%d\n", actionRemove)
sendKey(c, key)
return nil
}
func (rmt *remote) Clean() error {
c, err := net.Dial("tcp", rmt.raddr)
if err != nil {
return err
}
fmt.Fprintf(c, "%d\n", actionClean)
return nil
}

72
vendor/github.com/djherbis/fscache/stream.go generated vendored Normal file
View file

@ -0,0 +1,72 @@
package fscache
import (
"encoding/json"
"errors"
"io"
)
type decoder interface {
Decode(interface{}) error
}
type encoder interface {
Encode(interface{}) error
}
type pktReader struct {
dec decoder
}
type pktWriter struct {
enc encoder
}
type packet struct {
Err int
Data []byte
}
const eof = 1
func (t *pktReader) ReadAt(p []byte, off int64) (n int, err error) {
// TODO not implemented
return 0, errors.New("not implemented")
}
func (t *pktReader) Read(p []byte) (int, error) {
var pkt packet
err := t.dec.Decode(&pkt)
if err != nil {
return 0, err
}
if pkt.Err == eof {
return 0, io.EOF
}
return copy(p, pkt.Data), nil
}
func (t *pktReader) Close() error {
return nil
}
func (t *pktWriter) Write(p []byte) (int, error) {
pkt := packet{Data: p}
err := t.enc.Encode(pkt)
if err != nil {
return 0, err
}
return len(p), nil
}
func (t *pktWriter) Close() error {
return t.enc.Encode(packet{Err: eof})
}
func newEncoder(w io.Writer) io.WriteCloser {
return &pktWriter{enc: json.NewEncoder(w)}
}
func newDecoder(r io.Reader) ReadAtCloser {
return &pktReader{dec: json.NewDecoder(r)}
}

22
vendor/gopkg.in/djherbis/atime.v1/LICENSE generated vendored Normal file
View file

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 Dustin H
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

39
vendor/gopkg.in/djherbis/atime.v1/README.md generated vendored Normal file
View file

@ -0,0 +1,39 @@
atime
==========
[![GoDoc](https://godoc.org/github.com/djherbis/atime?status.svg)](https://godoc.org/github.com/djherbis/atime)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE.txt)
[![Build Status](https://travis-ci.org/djherbis/atime.svg?branch=master)](https://travis-ci.org/djherbis/atime)
[![Coverage Status](https://coveralls.io/repos/djherbis/atime/badge.svg?branch=master)](https://coveralls.io/r/djherbis/atime?branch=master)
Usage
------------
File Access Times for #golang
Looking for ctime or btime? Checkout https://github.com/djherbis/times
Go has a hidden atime function for most platforms, this repo makes it accessible.
```go
package main
import (
"log"
"github.com/djherbis/atime"
)
func main() {
at, err := atime.Stat("myfile")
if err != nil {
log.Fatal(err.Error())
}
log.Println(at)
}
```
Installation
------------
```sh
go get github.com/djherbis/atime
```

21
vendor/gopkg.in/djherbis/atime.v1/atime_darwin.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_darwin.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atimespec)
}

21
vendor/gopkg.in/djherbis/atime.v1/atime_dragonfly.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_dragonfly.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim)
}

21
vendor/gopkg.in/djherbis/atime.v1/atime_freebsd.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_freebsd.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atimespec)
}

21
vendor/gopkg.in/djherbis/atime.v1/atime_linux.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_linux.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim)
}

22
vendor/gopkg.in/djherbis/atime.v1/atime_nacl.go generated vendored Normal file
View file

@ -0,0 +1,22 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_nacl.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(sec, nsec int64) time.Time {
return time.Unix(sec, nsec)
}
func atime(fi os.FileInfo) time.Time {
st := fi.Sys().(*syscall.Stat_t)
return timespecToTime(st.Atime, st.AtimeNsec)
}

21
vendor/gopkg.in/djherbis/atime.v1/atime_netbsd.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_netbsd.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atimespec)
}

21
vendor/gopkg.in/djherbis/atime.v1/atime_openbsd.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_openbsd.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim)
}

16
vendor/gopkg.in/djherbis/atime.v1/atime_plan9.go generated vendored Normal file
View file

@ -0,0 +1,16 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_plan9.go
package atime
import (
"os"
"time"
)
func atime(fi os.FileInfo) time.Time {
return time.Unix(int64(fi.Sys().(*syscall.Dir).Atime), 0)
}

21
vendor/gopkg.in/djherbis/atime.v1/atime_solaris.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_solaris.go
package atime
import (
"os"
"syscall"
"time"
)
func timespecToTime(ts syscall.Timespec) time.Time {
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
}
func atime(fi os.FileInfo) time.Time {
return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim)
}

17
vendor/gopkg.in/djherbis/atime.v1/atime_windows.go generated vendored Normal file
View file

@ -0,0 +1,17 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// http://golang.org/src/os/stat_windows.go
package atime
import (
"os"
"syscall"
"time"
)
func atime(fi os.FileInfo) time.Time {
return time.Unix(0, fi.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds())
}

21
vendor/gopkg.in/djherbis/atime.v1/stat.go generated vendored Normal file
View file

@ -0,0 +1,21 @@
// Package atime provides a platform-independent way to get atimes for files.
package atime
import (
"os"
"time"
)
// Get returns the Last Access Time for the given FileInfo
func Get(fi os.FileInfo) time.Time {
return atime(fi)
}
// Stat returns the Last Access Time for the given filename
func Stat(name string) (time.Time, error) {
fi, err := os.Stat(name)
if err != nil {
return time.Time{}, err
}
return atime(fi), nil
}

22
vendor/gopkg.in/djherbis/stream.v1/LICENSE generated vendored Normal file
View file

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 Dustin H
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

80
vendor/gopkg.in/djherbis/stream.v1/README.md generated vendored Normal file
View file

@ -0,0 +1,80 @@
stream
==========
[![GoDoc](https://godoc.org/github.com/djherbis/stream?status.svg)](https://godoc.org/github.com/djherbis/stream)
[![Release](https://img.shields.io/github/release/djherbis/stream.svg)](https://github.com/djherbis/stream/releases/latest)
[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg)](LICENSE.txt)
[![Build Status](https://travis-ci.org/djherbis/stream.svg?branch=master)](https://travis-ci.org/djherbis/stream)
[![Coverage Status](https://coveralls.io/repos/djherbis/stream/badge.svg?branch=master)](https://coveralls.io/r/djherbis/stream?branch=master)
Usage
------------
Write and Read concurrently, and independently.
To explain further, if you need to write to multiple places you can use io.MultiWriter,
if you need multiple Readers on something you can use io.TeeReader. If you want concurrency you can use io.Pipe().
However all of these methods "tie" each Read/Write together, your readers can't read from different places in the stream, each write must be distributed to all readers in sequence.
This package provides a way for multiple Readers to read off the same Writer, without waiting for the others. This is done by writing to a "File" interface which buffers the input so it can be read at any time from many independent readers. Readers can even be created while writing or after the stream is closed. They will all see a consistent view of the stream and will block until the section of the stream they request is written, all while being unaffected by the actions of the other readers.
The use case for this stems from my other project djherbis/fscache. I needed a byte caching mechanism which allowed many independent clients to have access to the data while it was being written, rather than re-generating the byte stream for each of them or waiting for a complete copy of the stream which could be stored and then re-used.
```go
import(
"io"
"log"
"os"
"time"
"github.com/djherbis/stream"
)
func main(){
w, err := stream.New("mystream")
if err != nil {
log.Fatal(err)
}
go func(){
io.WriteString(w, "Hello World!")
<-time.After(time.Second)
io.WriteString(w, "Streaming updates...")
w.Close()
}()
waitForReader := make(chan struct{})
go func(){
// Read from the stream
r, err := w.NextReader()
if err != nil {
log.Fatal(err)
}
io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates...
r.Close()
close(waitForReader)
}()
// Full copy of the stream!
r, err := w.NextReader()
if err != nil {
log.Fatal(err)
}
io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates...
// r supports io.ReaderAt too.
p := make([]byte, 4)
r.ReadAt(p, 1) // Read "ello" into p
r.Close()
<-waitForReader // don't leave main before go-routine finishes
}
```
Installation
------------
```sh
go get github.com/djherbis/stream
```

39
vendor/gopkg.in/djherbis/stream.v1/fs.go generated vendored Normal file
View file

@ -0,0 +1,39 @@
package stream
import (
"io"
"os"
)
// File is a backing data-source for a Stream.
type File interface {
Name() string // The name used to Create/Open the File
io.Reader // Reader must continue reading after EOF on subsequent calls after more Writes.
io.ReaderAt // Similarly to Reader
io.Writer // Concurrent reading/writing must be supported.
io.Closer // Close should do any cleanup when done with the File.
}
// FileSystem is used to manage Files
type FileSystem interface {
Create(name string) (File, error) // Create must return a new File for Writing
Open(name string) (File, error) // Open must return an existing File for Reading
Remove(name string) error // Remove deletes an existing File
}
// StdFileSystem is backed by the os package.
var StdFileSystem FileSystem = stdFS{}
type stdFS struct{}
func (fs stdFS) Create(name string) (File, error) {
return os.Create(name)
}
func (fs stdFS) Open(name string) (File, error) {
return os.Open(name)
}
func (fs stdFS) Remove(name string) error {
return os.Remove(name)
}

107
vendor/gopkg.in/djherbis/stream.v1/memfs.go generated vendored Normal file
View file

@ -0,0 +1,107 @@
package stream
import (
"bytes"
"errors"
"io"
"sync"
)
// ErrNotFoundInMem is returned when an in-memory FileSystem cannot find a file.
var ErrNotFoundInMem = errors.New("not found")
type memfs struct {
mu sync.RWMutex
files map[string]*memFile
}
// NewMemFS returns a New in-memory FileSystem
func NewMemFS() FileSystem {
return &memfs{
files: make(map[string]*memFile),
}
}
func (fs *memfs) Create(key string) (File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
file := &memFile{
name: key,
r: bytes.NewBuffer(nil),
}
file.memReader.memFile = file
fs.files[key] = file
return file, nil
}
func (fs *memfs) Open(key string) (File, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
if f, ok := fs.files[key]; ok {
return &memReader{memFile: f}, nil
}
return nil, ErrNotFoundInMem
}
func (fs *memfs) Remove(key string) error {
fs.mu.Lock()
defer fs.mu.Unlock()
delete(fs.files, key)
return nil
}
type memFile struct {
mu sync.RWMutex
name string
r *bytes.Buffer
memReader
}
func (f *memFile) Name() string {
return f.name
}
func (f *memFile) Write(p []byte) (int, error) {
if len(p) > 0 {
f.mu.Lock()
defer f.mu.Unlock()
return f.r.Write(p)
}
return len(p), nil
}
func (f *memFile) Bytes() []byte {
f.mu.RLock()
defer f.mu.RUnlock()
return f.r.Bytes()
}
func (f *memFile) Close() error {
return nil
}
type memReader struct {
*memFile
n int
}
func (r *memReader) ReadAt(p []byte, off int64) (n int, err error) {
data := r.Bytes()
if int64(len(data)) < off {
return 0, io.EOF
}
n, err = bytes.NewReader(data[off:]).ReadAt(p, 0)
return n, err
}
func (r *memReader) Read(p []byte) (n int, err error) {
n, err = bytes.NewReader(r.Bytes()[r.n:]).Read(p)
r.n += n
return n, err
}
func (r *memReader) Close() error {
return nil
}

82
vendor/gopkg.in/djherbis/stream.v1/reader.go generated vendored Normal file
View file

@ -0,0 +1,82 @@
package stream
import "io"
// Reader is a concurrent-safe Stream Reader.
type Reader struct {
s *Stream
file File
}
// Name returns the name of the underlying File in the FileSystem.
func (r *Reader) Name() string { return r.file.Name() }
// ReadAt lets you Read from specific offsets in the Stream.
// ReadAt blocks while waiting for the requested section of the Stream to be written,
// unless the Stream is closed in which case it will always return immediately.
func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
r.s.b.RLock()
defer r.s.b.RUnlock()
var m int
for {
m, err = r.file.ReadAt(p[n:], off+int64(n))
n += m
if r.s.b.IsOpen() {
switch {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
r.s.b.Wait()
case err != nil:
return n, err
}
} else {
return n, err
}
}
}
// Read reads from the Stream. If the end of an open Stream is reached, Read
// blocks until more data is written or the Stream is Closed.
func (r *Reader) Read(p []byte) (n int, err error) {
r.s.b.RLock()
defer r.s.b.RUnlock()
var m int
for {
m, err = r.file.Read(p[n:])
n += m
if r.s.b.IsOpen() {
switch {
case n != 0 && err == nil:
return n, err
case err == io.EOF:
r.s.b.Wait()
case err != nil:
return n, err
}
} else {
return n, err
}
}
}
// Close closes this Reader on the Stream. This must be called when done with the
// Reader or else the Stream cannot be Removed.
func (r *Reader) Close() error {
defer r.s.dec()
return r.file.Close()
}

92
vendor/gopkg.in/djherbis/stream.v1/stream.go generated vendored Normal file
View file

@ -0,0 +1,92 @@
// Package stream provides a way to read and write to a synchronous buffered pipe, with multiple reader support.
package stream
import (
"errors"
"sync"
)
// ErrRemoving is returned when requesting a Reader on a Stream which is being Removed.
var ErrRemoving = errors.New("cannot open a new reader while removing file")
// Stream is used to concurrently Write and Read from a File.
type Stream struct {
grp sync.WaitGroup
b *broadcaster
file File
fs FileSystem
removing chan struct{}
}
// New creates a new Stream from the StdFileSystem with Name "name".
func New(name string) (*Stream, error) {
return NewStream(name, StdFileSystem)
}
// NewStream creates a new Stream with Name "name" in FileSystem fs.
func NewStream(name string, fs FileSystem) (*Stream, error) {
f, err := fs.Create(name)
sf := &Stream{
file: f,
fs: fs,
b: newBroadcaster(),
removing: make(chan struct{}),
}
sf.inc()
return sf, err
}
// Name returns the name of the underlying File in the FileSystem.
func (s *Stream) Name() string { return s.file.Name() }
// Write writes p to the Stream. It's concurrent safe to be called with Stream's other methods.
func (s *Stream) Write(p []byte) (int, error) {
defer s.b.Broadcast()
s.b.Lock()
defer s.b.Unlock()
return s.file.Write(p)
}
// Close will close the active stream. This will cause Readers to return EOF once they have
// read the entire stream.
func (s *Stream) Close() error {
defer s.dec()
defer s.b.Close()
s.b.Lock()
defer s.b.Unlock()
return s.file.Close()
}
// Remove will block until the Stream and all its Readers have been Closed,
// at which point it will delete the underlying file. NextReader() will return
// ErrRemoving if called after Remove.
func (s *Stream) Remove() error {
close(s.removing)
s.grp.Wait()
return s.fs.Remove(s.file.Name())
}
// NextReader will return a concurrent-safe Reader for this stream. Each Reader will
// see a complete and independent view of the stream, and can Read will the stream
// is written to.
func (s *Stream) NextReader() (*Reader, error) {
s.inc()
select {
case <-s.removing:
s.dec()
return nil, ErrRemoving
default:
}
file, err := s.fs.Open(s.file.Name())
if err != nil {
s.dec()
return nil, err
}
return &Reader{file: file, s: s}, nil
}
func (s *Stream) inc() { s.grp.Add(1) }
func (s *Stream) dec() { s.grp.Done() }

34
vendor/gopkg.in/djherbis/stream.v1/sync.go generated vendored Normal file
View file

@ -0,0 +1,34 @@
package stream
import (
"sync"
"sync/atomic"
)
type broadcaster struct {
sync.RWMutex
closed uint32
*sync.Cond
}
func newBroadcaster() *broadcaster {
var b broadcaster
b.Cond = sync.NewCond(b.RWMutex.RLocker())
return &b
}
func (b *broadcaster) Wait() {
if b.IsOpen() {
b.Cond.Wait()
}
}
func (b *broadcaster) IsOpen() bool {
return atomic.LoadUint32(&b.closed) == 0
}
func (b *broadcaster) Close() error {
atomic.StoreUint32(&b.closed, 1)
b.Cond.Broadcast()
return nil
}

15
vendor/vendor.json vendored
View file

@ -23,6 +23,11 @@
"revision": "c1da56349675b292d3200463e2c88b9aa5e02391",
"revisionTime": "2015-09-04T14:24:56-07:00"
},
{
"path": "github.com/djherbis/fscache",
"revision": "ffc728270b01f3906c396bbe796232b87750f24e",
"revisionTime": "2016-03-05T10:30:05-08:00"
},
{
"path": "github.com/docker/docker/pkg/stdcopy",
"revision": "9356c76d9f6e285e71f04df33ef7870455a42775",
@ -216,6 +221,16 @@
"revision": "8a57ed94ffd43444c0879fe75701732a38afc985",
"revisionTime": "2015-12-29T21:02:54-07:00"
},
{
"path": "gopkg.in/djherbis/atime.v1",
"revision": "8e47e0e01d08df8b9f840d74299c8ab70a024a30",
"revisionTime": "2015-08-29T00:19:25-07:00"
},
{
"path": "gopkg.in/djherbis/stream.v1",
"revision": "26a761059928627ca84837000dfb33447c66a146",
"revisionTime": "2016-02-03T22:24:40-08:00"
},
{
"path": "gopkg.in/go-playground/validator.v8",
"revision": "014792cf3e266caff1e916876be12282b33059e0",