act/pkg/common/executor.go
Markus Wolf 18792f9620
feat: run jobs in parallel (#1003)
* feat: run jobs in parallel

This changes fixes and restructures the parallel execution of jobs.
The previous changes limiting the parallel execution did break this
and allowed only one job in parallel.

While we run #CPU jobs in parallel now, the jobs added per job-matrix
add to this. So we might over-commit to the capacity, but at least
it is limited.

* fix: correctly build job pipeline

The job pipeline should just append all required pipeline steps.
The parallelism will be handled by the ParallelExecutor and we
shouldn't handle it during building the pipelines.

Also this adds a test, that the ParallelExecutor does run
a limited amount of parallel goroutines.

* test: correct test implementation

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2022-02-25 18:47:16 +00:00

191 lines
4.2 KiB
Go

package common
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
)
// Warning that implements `error` but safe to ignore
type Warning struct {
Message string
}
// Error the contract for error
func (w Warning) Error() string {
return w.Message
}
// Warningf create a warning
func Warningf(format string, args ...interface{}) Warning {
w := Warning{
Message: fmt.Sprintf(format, args...),
}
return w
}
// Executor define contract for the steps of a workflow
type Executor func(ctx context.Context) error
// Conditional define contract for the conditional predicate
type Conditional func(ctx context.Context) bool
// NewInfoExecutor is an executor that logs messages
func NewInfoExecutor(format string, args ...interface{}) Executor {
return func(ctx context.Context) error {
logger := Logger(ctx)
logger.Infof(format, args...)
return nil
}
}
// NewDebugExecutor is an executor that logs messages
func NewDebugExecutor(format string, args ...interface{}) Executor {
return func(ctx context.Context) error {
logger := Logger(ctx)
logger.Debugf(format, args...)
return nil
}
}
// NewPipelineExecutor creates a new executor from a series of other executors
func NewPipelineExecutor(executors ...Executor) Executor {
if len(executors) == 0 {
return func(ctx context.Context) error {
return nil
}
}
var rtn Executor
for _, executor := range executors {
if rtn == nil {
rtn = executor
} else {
rtn = rtn.Then(executor)
}
}
return rtn
}
// NewConditionalExecutor creates a new executor based on conditions
func NewConditionalExecutor(conditional Conditional, trueExecutor Executor, falseExecutor Executor) Executor {
return func(ctx context.Context) error {
if conditional(ctx) {
if trueExecutor != nil {
return trueExecutor(ctx)
}
} else {
if falseExecutor != nil {
return falseExecutor(ctx)
}
}
return nil
}
}
// NewErrorExecutor creates a new executor that always errors out
func NewErrorExecutor(err error) Executor {
return func(ctx context.Context) error {
return err
}
}
// NewParallelExecutor creates a new executor from a parallel of other executors
func NewParallelExecutor(parallel int, executors ...Executor) Executor {
return func(ctx context.Context) error {
work := make(chan Executor, len(executors))
errs := make(chan error, len(executors))
for i := 0; i < parallel; i++ {
go func(work <-chan Executor, errs chan<- error) {
for executor := range work {
errs <- executor(ctx)
}
}(work, errs)
}
for i := 0; i < len(executors); i++ {
work <- executors[i]
}
close(work)
// Executor waits all executors to cleanup these resources.
var firstErr error
for i := 0; i < len(executors); i++ {
err := <-errs
if firstErr == nil {
firstErr = err
}
}
if err := ctx.Err(); err != nil {
return err
}
return firstErr
}
}
// Then runs another executor if this executor succeeds
func (e Executor) Then(then Executor) Executor {
return func(ctx context.Context) error {
err := e(ctx)
if err != nil {
switch err.(type) {
case Warning:
log.Warning(err.Error())
default:
return err
}
}
if ctx.Err() != nil {
return ctx.Err()
}
return then(ctx)
}
}
// If only runs this executor if conditional is true
func (e Executor) If(conditional Conditional) Executor {
return func(ctx context.Context) error {
if conditional(ctx) {
return e(ctx)
}
return nil
}
}
// IfNot only runs this executor if conditional is true
func (e Executor) IfNot(conditional Conditional) Executor {
return func(ctx context.Context) error {
if !conditional(ctx) {
return e(ctx)
}
return nil
}
}
// IfBool only runs this executor if conditional is true
func (e Executor) IfBool(conditional bool) Executor {
return e.If(func(ctx context.Context) bool {
return conditional
})
}
// Finally adds an executor to run after other executor
func (e Executor) Finally(finally Executor) Executor {
return func(ctx context.Context) error {
err := e(ctx)
err2 := finally(ctx)
if err2 != nil {
return fmt.Errorf("Error occurred running finally: %v (original error: %v)", err2, err)
}
return err
}
}
// Not return an inverted conditional
func (c Conditional) Not() Conditional {
return func(ctx context.Context) bool {
return !c(ctx)
}
}