diff --git a/pkg/common/executor.go b/pkg/common/executor.go index 5e9b28d..fb76d0b 100644 --- a/pkg/common/executor.go +++ b/pkg/common/executor.go @@ -91,27 +91,33 @@ func NewErrorExecutor(err error) Executor { } // NewParallelExecutor creates a new executor from a parallel of other executors -func NewParallelExecutor(executors ...Executor) Executor { +func NewParallelExecutor(parallel int, executors ...Executor) Executor { return func(ctx context.Context) error { - errChan := make(chan error) + work := make(chan Executor, len(executors)) + errs := make(chan error, len(executors)) - for _, executor := range executors { - e := executor - go func() { - err := e.ChannelError(errChan)(ctx) - if err != nil { - log.Fatal(err) + for i := 0; i < parallel; i++ { + go func(work <-chan Executor, errs chan<- error) { + for executor := range work { + errs <- executor(ctx) } - }() + }(work, errs) } + for i := 0; i < len(executors); i++ { + work <- executors[i] + } + close(work) + // Executor waits all executors to cleanup these resources. var firstErr error for i := 0; i < len(executors); i++ { - if err := <-errChan; err != nil && firstErr == nil { + err := <-errs + if firstErr == nil { firstErr = err } } + if err := ctx.Err(); err != nil { return err } @@ -119,14 +125,6 @@ func NewParallelExecutor(executors ...Executor) Executor { } } -// ChannelError sends error to errChan rather than returning error -func (e Executor) ChannelError(errChan chan error) Executor { - return func(ctx context.Context) error { - errChan <- e(ctx) - return nil - } -} - // Then runs another executor if this executor succeeds func (e Executor) Then(then Executor) Executor { return func(ctx context.Context) error { diff --git a/pkg/common/executor_test.go b/pkg/common/executor_test.go index 17df3b7..7f691e4 100644 --- a/pkg/common/executor_test.go +++ b/pkg/common/executor_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -79,14 +80,25 @@ func TestNewParallelExecutor(t *testing.T) { ctx := context.Background() count := 0 + activeCount := 0 + maxCount := 0 emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error { count++ + + activeCount++ + if activeCount > maxCount { + maxCount = activeCount + } + time.Sleep(2 * time.Second) + activeCount-- + return nil }) - err := NewParallelExecutor(emptyWorkflow, emptyWorkflow)(ctx) - assert.Equal(2, count) + err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx) + assert.Equal(3, count, "should run all 3 executors") + assert.Equal(2, maxCount, "should run at most 2 executors in parallel") assert.Nil(err) } @@ -101,7 +113,7 @@ func TestNewParallelExecutorFailed(t *testing.T) { count++ return fmt.Errorf("fake error") }) - err := NewParallelExecutor(errorWorkflow)(ctx) + err := NewParallelExecutor(1, errorWorkflow)(ctx) assert.Equal(1, count) assert.ErrorIs(context.Canceled, err) } @@ -123,7 +135,7 @@ func TestNewParallelExecutorCanceled(t *testing.T) { count++ return errExpected }) - err := NewParallelExecutor(errorWorkflow, successWorkflow, successWorkflow)(ctx) + err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx) assert.Equal(3, count) assert.Error(errExpected, err) } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 7b26922..d7020dd 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -121,8 +121,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { stage := plan.Stages[i] stagePipeline = append(stagePipeline, func(ctx context.Context) error { pipeline := make([]common.Executor, 0) - stageExecutor := make([]common.Executor, 0) for r, run := range stage.Runs { + stageExecutor := make([]common.Executor, 0) job := run.Job() if job.Strategy != nil { strategyRc := runner.newRunContext(run, nil) @@ -140,7 +140,6 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { maxParallel = len(matrixes) } - b := 0 for i, matrix := range matrixes { rc := runner.newRunContext(run, matrix) rc.JobName = rc.Name @@ -167,15 +166,10 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor { return nil })(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets))) }) - b++ - if b == maxParallel { - pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...)) - stageExecutor = make([]common.Executor, 0) - b = 0 - } } + pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...)) } - return common.NewPipelineExecutor(pipeline...)(ctx) + return common.NewParallelExecutor(runtime.NumCPU(), pipeline...)(ctx) }) }