3ac2b726f2
* Log incoming jobs. Log the full contents of the job protobuf to make debugging jobs easier * Ensure that the parallel executor always uses at least one thread. The caller may mis-calculate the number of CPUs as zero, in which case ensure that at least one thread is spawned. * Use runtime.NumCPU for CPU counts. For hosts without docker, GetHostInfo() returns a blank struct which has zero CPUs and causes downstream trouble. --------- Co-authored-by: Paul Armstrong <psa@users.noreply.gitea.com> Co-authored-by: Jason Song <i@wolfogre.com>
196 lines
4.4 KiB
Go
196 lines
4.4 KiB
Go
package common
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Warning is an error value for conditions that are worth reporting but that
// callers may safely ignore. It satisfies the error interface through Error.
type Warning struct {
	// Message is the text returned by Error.
	Message string
}

// Error returns the warning's Message, which makes Warning usable wherever an
// error is expected.
func (w Warning) Error() string {
	return w.Message
}
|
|
|
|
// Warningf create a warning
|
|
func Warningf(format string, args ...interface{}) Warning {
|
|
w := Warning{
|
|
Message: fmt.Sprintf(format, args...),
|
|
}
|
|
return w
|
|
}
|
|
|
|
// Executor is the contract for one step of a workflow: a function that runs
// under the supplied context and reports failure through its error result.
type Executor func(context.Context) error
|
|
|
|
// Conditional is the contract for a predicate: a function that inspects the
// supplied context and answers true or false.
type Conditional func(context.Context) bool
|
|
|
|
// NewInfoExecutor is an executor that logs messages
|
|
func NewInfoExecutor(format string, args ...interface{}) Executor {
|
|
return func(ctx context.Context) error {
|
|
logger := Logger(ctx)
|
|
logger.Infof(format, args...)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NewDebugExecutor is an executor that logs messages
|
|
func NewDebugExecutor(format string, args ...interface{}) Executor {
|
|
return func(ctx context.Context) error {
|
|
logger := Logger(ctx)
|
|
logger.Debugf(format, args...)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NewPipelineExecutor creates a new executor from a series of other executors
|
|
func NewPipelineExecutor(executors ...Executor) Executor {
|
|
if len(executors) == 0 {
|
|
return func(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
}
|
|
var rtn Executor
|
|
for _, executor := range executors {
|
|
if rtn == nil {
|
|
rtn = executor
|
|
} else {
|
|
rtn = rtn.Then(executor)
|
|
}
|
|
}
|
|
return rtn
|
|
}
|
|
|
|
// NewConditionalExecutor creates a new executor based on conditions
|
|
func NewConditionalExecutor(conditional Conditional, trueExecutor Executor, falseExecutor Executor) Executor {
|
|
return func(ctx context.Context) error {
|
|
if conditional(ctx) {
|
|
if trueExecutor != nil {
|
|
return trueExecutor(ctx)
|
|
}
|
|
} else {
|
|
if falseExecutor != nil {
|
|
return falseExecutor(ctx)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NewErrorExecutor creates a new executor that always errors out
|
|
func NewErrorExecutor(err error) Executor {
|
|
return func(ctx context.Context) error {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// NewParallelExecutor creates a new executor from a parallel of other executors
|
|
func NewParallelExecutor(parallel int, executors ...Executor) Executor {
|
|
return func(ctx context.Context) error {
|
|
work := make(chan Executor, len(executors))
|
|
errs := make(chan error, len(executors))
|
|
|
|
if 1 > parallel {
|
|
log.Infof("Parallel tasks (%d) below minimum, setting to 1", parallel)
|
|
parallel = 1
|
|
}
|
|
|
|
for i := 0; i < parallel; i++ {
|
|
go func(work <-chan Executor, errs chan<- error) {
|
|
for executor := range work {
|
|
errs <- executor(ctx)
|
|
}
|
|
}(work, errs)
|
|
}
|
|
|
|
for i := 0; i < len(executors); i++ {
|
|
work <- executors[i]
|
|
}
|
|
close(work)
|
|
|
|
// Executor waits all executors to cleanup these resources.
|
|
var firstErr error
|
|
for i := 0; i < len(executors); i++ {
|
|
err := <-errs
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
return firstErr
|
|
}
|
|
}
|
|
|
|
// Then runs another executor if this executor succeeds
|
|
func (e Executor) Then(then Executor) Executor {
|
|
return func(ctx context.Context) error {
|
|
err := e(ctx)
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case Warning:
|
|
Logger(ctx).Warning(err.Error())
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
return then(ctx)
|
|
}
|
|
}
|
|
|
|
// If only runs this executor if conditional is true
|
|
func (e Executor) If(conditional Conditional) Executor {
|
|
return func(ctx context.Context) error {
|
|
if conditional(ctx) {
|
|
return e(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// IfNot only runs this executor if conditional is true
|
|
func (e Executor) IfNot(conditional Conditional) Executor {
|
|
return func(ctx context.Context) error {
|
|
if !conditional(ctx) {
|
|
return e(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// IfBool only runs this executor if conditional is true
|
|
func (e Executor) IfBool(conditional bool) Executor {
|
|
return e.If(func(ctx context.Context) bool {
|
|
return conditional
|
|
})
|
|
}
|
|
|
|
// Finally adds an executor to run after other executor
|
|
func (e Executor) Finally(finally Executor) Executor {
|
|
return func(ctx context.Context) error {
|
|
err := e(ctx)
|
|
err2 := finally(ctx)
|
|
if err2 != nil {
|
|
return fmt.Errorf("Error occurred running finally: %v (original error: %v)", err2, err)
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Not return an inverted conditional
|
|
func (c Conditional) Not() Conditional {
|
|
return func(ctx context.Context) bool {
|
|
return !c(ctx)
|
|
}
|
|
}
|