fetch: Run in parallell

This commit is contained in:
adisbladis 2022-05-28 01:06:48 +08:00
parent f10ef7325c
commit 5cb8bd825d
2 changed files with 120 additions and 27 deletions

View file

@ -9,8 +9,10 @@ import (
"os/exec" "os/exec"
"path" "path"
"strings" "strings"
"sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tweag/gomod2nix/lib"
"github.com/tweag/gomod2nix/types" "github.com/tweag/gomod2nix/types"
"golang.org/x/mod/modfile" "golang.org/x/mod/modfile"
) )
@ -89,8 +91,14 @@ func FetchPackages(goModPath string, goSumPath string, goMod2NixPath string, num
}).Info("Done downloading dependencies") }).Info("Done downloading dependencies")
} }
executor := lib.NewParallellExecutor(numWorkers)
var mux sync.Mutex
packages := []*types.Package{} packages := []*types.Package{}
for _, dl := range modDownloads { for _, dl := range modDownloads {
dl := dl
executor.Add(func() error {
goPackagePath, hasReplace := replace[dl.Path] goPackagePath, hasReplace := replace[dl.Path]
if !hasReplace { if !hasReplace {
@ -104,7 +112,7 @@ func FetchPackages(goModPath string, goSumPath string, goMod2NixPath string, num
fmt.Sprintf("builtins.path { name = \"%s_%s\"; path = \"%s\"; }", path.Base(goPackagePath), dl.Version, dl.Dir), fmt.Sprintf("builtins.path { name = \"%s_%s\"; path = \"%s\"; }", path.Base(goPackagePath), dl.Version, dl.Dir),
).Output() ).Output()
if err != nil { if err != nil {
return nil, err return err
} }
storePath = string(stdout)[1 : len(stdout)-2] storePath = string(stdout)[1 : len(stdout)-2]
} }
@ -113,7 +121,7 @@ func FetchPackages(goModPath string, goSumPath string, goMod2NixPath string, num
"nix-store", "--query", "--hash", storePath, "nix-store", "--query", "--hash", storePath,
).Output() ).Output()
if err != nil { if err != nil {
return nil, err return err
} }
hash := strings.TrimSpace(string(stdout)) hash := strings.TrimSpace(string(stdout))
@ -126,7 +134,17 @@ func FetchPackages(goModPath string, goSumPath string, goMod2NixPath string, num
pkg.ReplacedPath = dl.Path pkg.ReplacedPath = dl.Path
} }
mux.Lock()
packages = append(packages, pkg) packages = append(packages, pkg)
mux.Unlock()
return nil
})
}
err = executor.Wait()
if err != nil {
return nil, err
} }
return packages, nil return packages, nil

75
lib/executor.go Normal file
View file

@ -0,0 +1,75 @@
package lib
import (
"sync"
)
// ParallellExecutor - Execute callback functions in parallell
type ParallellExecutor struct {
errChan chan error
wg *sync.WaitGroup
mux *sync.Mutex
guard chan struct{}
// Error returned by Wait(), cached for other Wait() invocations
err error
done bool
}
func NewParallellExecutor(maxWorkers int) *ParallellExecutor {
return &ParallellExecutor{
errChan: make(chan error),
mux: new(sync.Mutex),
wg: new(sync.WaitGroup),
guard: make(chan struct{}, maxWorkers),
err: nil,
done: false,
}
}
func (e *ParallellExecutor) Add(fn func() error) {
e.wg.Add(1)
e.guard <- struct{}{} // Block
go func() {
defer e.wg.Done()
defer func() {
<-e.guard
}()
err := fn()
if err != nil {
e.errChan <- err
}
}()
}
func (e *ParallellExecutor) Wait() error {
e.mux.Lock()
defer e.mux.Unlock()
if e.done {
return e.err
}
var err error
// Ensure channel is closed
go func() {
e.wg.Wait()
close(e.errChan)
}()
for err = range e.errChan {
if err != nil {
break
}
}
e.done = true
e.err = err
return err
}