2022-05-27 17:06:48 +00:00
|
|
|
package lib
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ParallellExecutor executes callback functions in parallel.
//
// Callbacks are registered with Add and each one runs in its own goroutine.
// Wait collects the outcome and remembers it for later calls.
type ParallellExecutor struct {
	// guard holds one token per callback currently occupying a worker slot;
	// Add sends a token and the worker receives it back when it is finished.
	guard chan struct{}

	// errChan carries the non-nil errors returned by callbacks to Wait.
	errChan chan error

	// wg counts the callbacks that have been added but have not completed.
	wg *sync.WaitGroup

	// mux serializes Wait so that done and err are read and written safely.
	mux *sync.Mutex

	// done is set once Wait has produced a result.
	done bool

	// err is the error returned by Wait(), cached for other Wait() invocations.
	err error
}
|
|
|
|
|
2022-05-30 16:08:36 +00:00
|
|
|
func NewParallellExecutor(maxWorkers int) *ParallellExecutor {
|
2022-05-27 17:06:48 +00:00
|
|
|
return &ParallellExecutor{
|
|
|
|
errChan: make(chan error),
|
|
|
|
mux: new(sync.Mutex),
|
|
|
|
wg: new(sync.WaitGroup),
|
2022-05-30 16:08:36 +00:00
|
|
|
guard: make(chan struct{}, maxWorkers),
|
2022-05-27 17:06:48 +00:00
|
|
|
|
|
|
|
err: nil,
|
|
|
|
done: false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *ParallellExecutor) Add(fn func() error) {
|
|
|
|
e.wg.Add(1)
|
|
|
|
|
2022-05-30 16:08:36 +00:00
|
|
|
e.guard <- struct{}{} // Block until a worker is available
|
|
|
|
|
2022-05-27 17:06:48 +00:00
|
|
|
go func() {
|
|
|
|
defer e.wg.Done()
|
2022-05-30 16:08:36 +00:00
|
|
|
defer func() {
|
|
|
|
<-e.guard
|
|
|
|
}()
|
2022-05-27 17:06:48 +00:00
|
|
|
|
|
|
|
err := fn()
|
|
|
|
if err != nil {
|
|
|
|
e.errChan <- err
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *ParallellExecutor) Wait() error {
|
|
|
|
e.mux.Lock()
|
|
|
|
defer e.mux.Unlock()
|
|
|
|
|
|
|
|
if e.done {
|
|
|
|
return e.err
|
|
|
|
}
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
// Ensure channel is closed
|
|
|
|
go func() {
|
|
|
|
e.wg.Wait()
|
|
|
|
close(e.errChan)
|
|
|
|
}()
|
|
|
|
|
|
|
|
for err = range e.errChan {
|
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
e.done = true
|
|
|
|
e.err = err
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|