act/pkg/common/executor_test.go
Markus Wolf 18792f9620
feat: run jobs in parallel (#1003)
* feat: run jobs in parallel

This change fixes and restructures the parallel execution of jobs.
The previous changes that limited parallel execution broke it
and allowed only one job to run at a time.

While we run #CPU jobs in parallel now, the jobs added per job-matrix
add to this. So we might over-commit to the capacity, but at least
it is limited.

* fix: correctly build job pipeline

The job pipeline should just append all required pipeline steps.
The parallelism will be handled by the ParallelExecutor and we
shouldn't handle it during building the pipelines.

This also adds a test verifying that the ParallelExecutor runs
only a limited number of goroutines in parallel.

* test: correct test implementation

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2022-02-25 18:47:16 +00:00

141 lines
2.9 KiB
Go

package common
import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)
// TestNewWorkflow checks three basic executor shapes: a pipeline built from
// no steps, an executor built from an error, and a pipeline of two
// succeeding steps.
func TestNewWorkflow(t *testing.T) {
	assert := assert.New(t)
	ctx := context.Background()

	// A pipeline without any steps is expected to return nil.
	noSteps := NewPipelineExecutor()
	assert.Nil(noSteps(ctx))

	// An executor built from an error is expected to return a non-nil result.
	failing := NewErrorExecutor(fmt.Errorf("test error"))
	assert.NotNil(failing(ctx))

	// Both steps of a two-step pipeline must run; each run bumps the shared
	// counter once.
	calls := 0
	step := func(ctx context.Context) error {
		calls++
		return nil
	}
	twoSteps := NewPipelineExecutor(step, step)
	assert.Nil(twoSteps(ctx))
	assert.Equal(2, calls)
}
// TestNewConditionalExecutor checks that NewConditionalExecutor runs exactly
// one of its two branch executors, chosen by the result of the condition.
func TestNewConditionalExecutor(t *testing.T) {
	assert := assert.New(t)
	ctx := context.Background()

	// The counters are shared across both rounds, so the expectations below
	// are cumulative.
	var thenRuns, elseRuns int
	onTrue := func(ctx context.Context) error {
		thenRuns++
		return nil
	}
	onFalse := func(ctx context.Context) error {
		elseRuns++
		return nil
	}

	rounds := []struct {
		condition bool
		wantThen  int
		wantElse  int
	}{
		{condition: false, wantThen: 0, wantElse: 1},
		{condition: true, wantThen: 1, wantElse: 1},
	}
	for _, round := range rounds {
		outcome := round.condition
		err := NewConditionalExecutor(func(ctx context.Context) bool {
			return outcome
		}, onTrue, onFalse)(ctx)

		assert.Nil(err)
		assert.Equal(round.wantThen, thenRuns)
		assert.Equal(round.wantElse, elseRuns)
	}
}
// TestNewParallelExecutor checks that NewParallelExecutor, given a limit of 2
// and three executors, runs all three while never having more than two of
// them active at the same time.
func TestNewParallelExecutor(t *testing.T) {
	assert := assert.New(t)
	ctx := context.Background()

	// The executor body is expected to run concurrently (that is what this
	// test measures), so the shared counters must be guarded; unsynchronized
	// increments are a data race and could also skew maxCount.
	var (
		mu          sync.Mutex
		count       int
		activeCount int
		maxCount    int
	)
	emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
		mu.Lock()
		count++
		activeCount++
		if activeCount > maxCount {
			maxCount = activeCount
		}
		mu.Unlock()

		// Stay "active" for a while so that concurrently running executors
		// overlap and are reflected in maxCount.
		time.Sleep(2 * time.Second)

		mu.Lock()
		activeCount--
		mu.Unlock()
		return nil
	})

	err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)

	mu.Lock()
	defer mu.Unlock()
	assert.Equal(3, count, "should run all 3 executors")
	assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
	assert.Nil(err)
}
// TestNewParallelExecutorFailed runs a single failing executor through
// NewParallelExecutor with an already-canceled context. It expects the
// executor to have been invoked once and the returned error to match
// context.Canceled.
func TestNewParallelExecutorFailed(t *testing.T) {
	assert := assert.New(t)

	// Cancel up front so the parallel executor sees a canceled context.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	count := 0
	errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
		count++
		return fmt.Errorf("fake error")
	})

	err := NewParallelExecutor(1, errorWorkflow)(ctx)
	assert.Equal(1, count)
	// ErrorIs takes the error under test first and the target second; with
	// the arguments reversed a wrapped cancellation error would not match.
	assert.ErrorIs(err, context.Canceled)
}
// TestNewParallelExecutorCanceled runs one failing and two succeeding
// executors through NewParallelExecutor with an already-canceled context. It
// expects all three executors to have been invoked and a non-nil error to be
// returned.
func TestNewParallelExecutorCanceled(t *testing.T) {
	assert := assert.New(t)

	// Cancel up front so the parallel executor sees a canceled context.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	errExpected := fmt.Errorf("fake error")

	// Up to three executors may run at once, so the shared counter is
	// guarded against concurrent increments.
	var (
		mu    sync.Mutex
		count int
	)
	successWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
		mu.Lock()
		count++
		mu.Unlock()
		return nil
	})
	errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
		mu.Lock()
		count++
		mu.Unlock()
		return errExpected
	})

	err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx)

	mu.Lock()
	defer mu.Unlock()
	assert.Equal(3, count)
	// Assert on the returned error itself. Passing errExpected as the first
	// argument would only check that the locally built error is non-nil,
	// which is always true.
	// NOTE(review): this could be tightened to ErrorIs once it is confirmed
	// whether the context error or the executor error takes precedence.
	assert.Error(err)
}