gomod2nix/internal/lib/executor.go
= 5d387097aa fix: ensure executor doesn't deadlock when closure errors
When running 'gomod2nix' on in my project, the 'gomod2nix import' was
failing for every import. I have more imports than the default maxJobs.

This caused a deadlock and the program never finished.

This is because in the erroring case, we send to the errChan, which is a
blocking channel. If this blocks then the defers are never called, most
importantly the `defer` which pulls an entry off the semaphore
(e.guard).

This means once the erroring work functions exceeds the numWorkers, we
will block trying to acquire the semaphore when we call .Add with more
work.

We never get to the point where we call .Wait(), which would drain the
errChan becuase we are blocked on the semaphore whilst we are still
generating work.

This change moves the semaphore acquire to within the goroutines
themselves. This alters the behaviour in that we now will start as many
goroutines as we have work items, but the work they do will still be
gated by the semaphore.

This is reasonable behaviour: goroutines are cheap, in general this
package is useful if the work the functions are doing is expensive not
the goroutine creation itself. The work still is guarded by the
semaphore.

There is also a regression test added and in passing, the spelling of
Parallel is corrected.
2024-10-20 20:19:25 +02:00

74 lines
1.1 KiB
Go

package lib
import (
"sync"
)
// ParallelExecutor - Execute callback functions in parallel
type ParallelExecutor struct {
errChan chan error
wg *sync.WaitGroup
mux *sync.Mutex
guard chan struct{}
// Error returned by Wait(), cached for other Wait() invocations
err error
done bool
}
func NewParallelExecutor(maxWorkers int) *ParallelExecutor {
return &ParallelExecutor{
errChan: make(chan error),
mux: new(sync.Mutex),
wg: new(sync.WaitGroup),
guard: make(chan struct{}, maxWorkers),
err: nil,
done: false,
}
}
func (e *ParallelExecutor) Add(fn func() error) {
e.wg.Add(1)
go func() {
e.guard <- struct{}{} // Block until a worker is available
defer e.wg.Done()
defer func() {
<-e.guard
}()
err := fn()
if err != nil {
e.errChan <- err
}
}()
}
func (e *ParallelExecutor) Wait() error {
e.mux.Lock()
defer e.mux.Unlock()
if e.done {
return e.err
}
var err error
// Ensure channel is closed
go func() {
e.wg.Wait()
close(e.errChan)
}()
for err = range e.errChan {
if err != nil {
break
}
}
e.done = true
e.err = err
return err
}