From 5cb8bd825d35bb19a1a21c3ffe8b34d1ec44447c Mon Sep 17 00:00:00 2001 From: adisbladis Date: Sat, 28 May 2022 01:06:48 +0800 Subject: [PATCH] fetch: Run in parallell --- fetch/fetch.go | 72 +++++++++++++++++++++++++++++------------------ lib/executor.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 27 deletions(-) create mode 100644 lib/executor.go diff --git a/fetch/fetch.go b/fetch/fetch.go index 3163506..d12eb8d 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -9,8 +9,10 @@ import ( "os/exec" "path" "strings" + "sync" log "github.com/sirupsen/logrus" + "github.com/tweag/gomod2nix/lib" "github.com/tweag/gomod2nix/types" "golang.org/x/mod/modfile" ) @@ -89,44 +91,60 @@ func FetchPackages(goModPath string, goSumPath string, goMod2NixPath string, num }).Info("Done downloading dependencies") } + executor := lib.NewParallellExecutor(numWorkers) + var mux sync.Mutex + packages := []*types.Package{} for _, dl := range modDownloads { + dl := dl - goPackagePath, hasReplace := replace[dl.Path] - if !hasReplace { - goPackagePath = dl.Path - } + executor.Add(func() error { + + goPackagePath, hasReplace := replace[dl.Path] + if !hasReplace { + goPackagePath = dl.Path + } + + var storePath string + { + stdout, err := exec.Command( + "nix", "eval", "--impure", "--expr", + fmt.Sprintf("builtins.path { name = \"%s_%s\"; path = \"%s\"; }", path.Base(goPackagePath), dl.Version, dl.Dir), + ).Output() + if err != nil { + return err + } + storePath = string(stdout)[1 : len(stdout)-2] + } - var storePath string - { stdout, err := exec.Command( - "nix", "eval", "--impure", "--expr", - fmt.Sprintf("builtins.path { name = \"%s_%s\"; path = \"%s\"; }", path.Base(goPackagePath), dl.Version, dl.Dir), + "nix-store", "--query", "--hash", storePath, ).Output() if err != nil { - return nil, err + return err } - storePath = string(stdout)[1 : len(stdout)-2] - } + hash := strings.TrimSpace(string(stdout)) - stdout, err := exec.Command( - "nix-store", "--query", "--hash", storePath, - ).Output() - if err != nil { - return nil, err - } - hash := strings.TrimSpace(string(stdout)) + pkg := &types.Package{ + GoPackagePath: goPackagePath, + Version: dl.Version, + Hash: hash, + } + if hasReplace { + pkg.ReplacedPath = dl.Path + } - pkg := &types.Package{ - GoPackagePath: goPackagePath, - Version: dl.Version, - Hash: hash, - } - if hasReplace { - pkg.ReplacedPath = dl.Path - } + mux.Lock() + packages = append(packages, pkg) + mux.Unlock() - packages = append(packages, pkg) + return nil + }) + } + + err = executor.Wait() + if err != nil { + return nil, err } return packages, nil diff --git a/lib/executor.go b/lib/executor.go new file mode 100644 index 0000000..dcee4d5 --- /dev/null +++ b/lib/executor.go @@ -0,0 +1,75 @@ +package lib + +import ( + "sync" +) + +// ParallellExecutor - Execute callback functions in parallell +type ParallellExecutor struct { + errChan chan error + wg *sync.WaitGroup + mux *sync.Mutex + guard chan struct{} + + // Error returned by Wait(), cached for other Wait() invocations + err error + done bool +} + +func NewParallellExecutor(maxWorkers int) *ParallellExecutor { + return &ParallellExecutor{ + errChan: make(chan error), + mux: new(sync.Mutex), + wg: new(sync.WaitGroup), + guard: make(chan struct{}, maxWorkers), + + err: nil, + done: false, + } +} + +func (e *ParallellExecutor) Add(fn func() error) { + e.wg.Add(1) + + e.guard <- struct{}{} // Block + + go func() { + defer e.wg.Done() + defer func() { + <-e.guard + }() + + err := fn() + if err != nil { + e.errChan <- err + } + }() +} + +func (e *ParallellExecutor) Wait() error { + e.mux.Lock() + defer e.mux.Unlock() + + if e.done { + return e.err + } + + var err error + + // Ensure channel is closed + go func() { + e.wg.Wait() + close(e.errChan) + }() + + for err = range e.errChan { + if err != nil { + break + } + } + + e.done = true + e.err = err + + return err +}