2020-02-04 18:38:41 -06:00
package model
import (
2021-04-02 09:01:45 -05:00
"fmt"
2020-08-31 16:50:01 -05:00
"io"
2020-02-04 18:38:41 -06:00
"io/ioutil"
"math"
"os"
"path/filepath"
2021-04-02 09:01:45 -05:00
"regexp"
2020-02-04 18:38:41 -06:00
"sort"
2020-08-31 16:50:01 -05:00
"github.com/pkg/errors"
2020-02-04 18:38:41 -06:00
log "github.com/sirupsen/logrus"
)
// WorkflowPlanner contains methods for creating plans from a set of loaded
// workflows. NewWorkflowPlanner returns the implementation used in this package.
type WorkflowPlanner interface {
	// PlanEvent returns a plan covering the jobs of every workflow that is
	// triggered by eventName.
	PlanEvent(eventName string) *Plan
	// PlanJob returns a plan for the job whose ID is jobName, together with
	// the jobs it depends on.
	PlanJob(jobName string) *Plan
	// GetEvents lists the events that trigger the loaded workflows.
	GetEvents() []string
}
// Plan contains a list of stages to run in series.
//
// As built by createStages/mergeStages in this file, every job's `needs`
// are placed in an earlier stage than the job itself.
type Plan struct {
	Stages []*Stage
}
// Stage contains a list of runs to execute in parallel.
//
// createStages only places a job in a stage once all of its dependencies
// are in previous stages, so runs within one stage do not depend on each
// other.
type Stage struct {
	Runs []*Run
}
// Run represents a job from a workflow that needs to be run.
type Run struct {
	// Workflow is the workflow that defines the job.
	Workflow *Workflow
	// JobID is the key used to look the job up via Workflow.GetJob.
	JobID string
}
2020-02-07 00:17:58 -06:00
func ( r * Run ) String ( ) string {
jobName := r . Job ( ) . Name
if jobName == "" {
jobName = r . JobID
}
2020-02-27 01:29:43 -06:00
return jobName
2020-02-07 00:17:58 -06:00
}
// Job looks up this run's job definition in its workflow by JobID.
// It returns whatever Workflow.GetJob returns, including nil for an
// unknown ID.
func (r *Run) Job() *Job {
	wf := r.Workflow
	return wf.GetJob(r.JobID)
}
2020-05-26 22:29:50 -05:00
// NewWorkflowPlanner will load a specific workflow or all workflows from a directory
func NewWorkflowPlanner ( path string ) ( WorkflowPlanner , error ) {
fi , err := os . Stat ( path )
if err != nil {
return nil , err
}
var files [ ] os . FileInfo
var dirname string
if fi . IsDir ( ) {
log . Debugf ( "Loading workflows from '%s'" , path )
dirname = path
files , err = ioutil . ReadDir ( path )
} else {
log . Debugf ( "Loading workflow '%s'" , path )
dirname , err = filepath . Abs ( filepath . Dir ( path ) )
files = [ ] os . FileInfo { fi }
}
2020-02-04 18:38:41 -06:00
if err != nil {
return nil , err
}
wp := new ( workflowPlanner )
for _ , file := range files {
ext := filepath . Ext ( file . Name ( ) )
if ext == ".yml" || ext == ".yaml" {
f , err := os . Open ( filepath . Join ( dirname , file . Name ( ) ) )
if err != nil {
return nil , err
}
2020-02-17 00:04:13 -06:00
log . Debugf ( "Reading workflow '%s'" , f . Name ( ) )
2020-02-04 18:38:41 -06:00
workflow , err := ReadWorkflow ( f )
if err != nil {
f . Close ( )
2020-08-31 16:50:01 -05:00
if err == io . EOF {
return nil , errors . WithMessagef ( err , "unable to read workflow, %s file is empty" , file . Name ( ) )
}
2020-02-04 18:38:41 -06:00
return nil , err
}
2020-02-07 00:17:58 -06:00
if workflow . Name == "" {
workflow . Name = file . Name ( )
}
2021-04-02 09:01:45 -05:00
jobNameRegex := regexp . MustCompile ( ` ^([[:alpha:]_][[:alnum:]_\-]*)$ ` )
for k := range workflow . Jobs {
if ok := jobNameRegex . MatchString ( k ) ; ! ok {
return nil , fmt . Errorf ( "The workflow is not valid. %s: Job name %s is invalid. Names must start with a letter or '_' and contain only alphanumeric characters, '-', or '_'" , workflow . Name , k )
}
}
2020-02-04 18:38:41 -06:00
wp . workflows = append ( wp . workflows , workflow )
f . Close ( )
}
}
return wp , nil
}
// workflowPlanner is the WorkflowPlanner implementation returned by
// NewWorkflowPlanner.
type workflowPlanner struct {
	// workflows holds every workflow loaded by NewWorkflowPlanner, in the
	// order they were read.
	workflows []*Workflow
}
// PlanEvent builds a new list of runs to execute in parallel for an event name
func ( wp * workflowPlanner ) PlanEvent ( eventName string ) * Plan {
plan := new ( Plan )
2020-10-09 00:28:01 -05:00
if len ( wp . workflows ) == 0 {
log . Debugf ( "no events found for workflow: %s" , eventName )
}
2020-02-04 18:38:41 -06:00
for _ , w := range wp . workflows {
2020-02-10 18:35:00 -06:00
for _ , e := range w . On ( ) {
if e == eventName {
plan . mergeStages ( createStages ( w , w . GetJobIDs ( ) ... ) )
}
2020-02-04 18:38:41 -06:00
}
}
return plan
}
// PlanJob builds a new run to execute in parallel for a job name
func ( wp * workflowPlanner ) PlanJob ( jobName string ) * Plan {
plan := new ( Plan )
2020-10-09 00:28:01 -05:00
if len ( wp . workflows ) == 0 {
log . Debugf ( "no jobs found for workflow: %s" , jobName )
}
2020-02-04 18:38:41 -06:00
for _ , w := range wp . workflows {
plan . mergeStages ( createStages ( w , jobName ) )
}
return plan
}
// GetEvents gets all the events in the workflows file
func ( wp * workflowPlanner ) GetEvents ( ) [ ] string {
events := make ( [ ] string , 0 )
for _ , w := range wp . workflows {
found := false
for _ , e := range events {
2020-02-10 18:35:00 -06:00
for _ , we := range w . On ( ) {
if e == we {
found = true
break
}
}
if found {
2020-02-04 18:38:41 -06:00
break
}
}
if ! found {
2020-02-10 18:35:00 -06:00
events = append ( events , w . On ( ) ... )
2020-02-04 18:38:41 -06:00
}
}
// sort the list based on depth of dependencies
sort . Slice ( events , func ( i , j int ) bool {
return events [ i ] < events [ j ]
} )
return events
}
2020-02-17 12:30:52 -06:00
// MaxRunNameLen returns the length in bytes of the longest run name (as
// rendered by Run.String) across all stages of the plan, or 0 when the plan
// has no runs.
func (p *Plan) MaxRunNameLen() int {
	longest := 0
	for _, s := range p.Stages {
		for _, r := range s.Runs {
			if n := len(r.String()); n > longest {
				longest = n
			}
		}
	}
	return longest
}
2020-02-04 18:38:41 -06:00
// GetJobIDs returns the JobID of every run in the stage, in run order.
// The returned slice is never nil.
func (s *Stage) GetJobIDs() []string {
	ids := make([]string, len(s.Runs))
	for i, run := range s.Runs {
		ids[i] = run.JobID
	}
	return ids
}
// Merge stages with existing stages in plan
func ( p * Plan ) mergeStages ( stages [ ] * Stage ) {
newStages := make ( [ ] * Stage , int ( math . Max ( float64 ( len ( p . Stages ) ) , float64 ( len ( stages ) ) ) ) )
for i := 0 ; i < len ( newStages ) ; i ++ {
newStages [ i ] = new ( Stage )
if i >= len ( p . Stages ) {
2020-02-10 18:53:14 -06:00
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , stages [ i ] . Runs ... )
2020-02-04 18:38:41 -06:00
} else if i >= len ( stages ) {
2020-02-10 18:53:14 -06:00
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , p . Stages [ i ] . Runs ... )
2020-02-04 18:38:41 -06:00
} else {
2020-02-10 18:53:14 -06:00
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , p . Stages [ i ] . Runs ... )
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , stages [ i ] . Runs ... )
2020-02-04 18:38:41 -06:00
}
}
p . Stages = newStages
}
func createStages ( w * Workflow , jobIDs ... string ) [ ] * Stage {
// first, build a list of all the necessary jobs to run, and their dependencies
jobDependencies := make ( map [ string ] [ ] string )
for len ( jobIDs ) > 0 {
newJobIDs := make ( [ ] string , 0 )
for _ , jID := range jobIDs {
// make sure we haven't visited this job yet
if _ , ok := jobDependencies [ jID ] ; ! ok {
if job := w . GetJob ( jID ) ; job != nil {
2020-02-10 18:35:00 -06:00
jobDependencies [ jID ] = job . Needs ( )
newJobIDs = append ( newJobIDs , job . Needs ( ) ... )
2020-02-04 18:38:41 -06:00
}
}
}
jobIDs = newJobIDs
}
// next, build an execution graph
stages := make ( [ ] * Stage , 0 )
for len ( jobDependencies ) > 0 {
stage := new ( Stage )
for jID , jDeps := range jobDependencies {
// make sure all deps are in the graph already
if listInStages ( jDeps , stages ... ) {
stage . Runs = append ( stage . Runs , & Run {
Workflow : w ,
JobID : jID ,
} )
delete ( jobDependencies , jID )
}
}
if len ( stage . Runs ) == 0 {
log . Fatalf ( "Unable to build dependency graph!" )
}
stages = append ( stages , stage )
}
return stages
}
// listInStages reports whether every ID in srcList is the JobID of some run
// in at least one of the given stages. An empty srcList yields true.
func listInStages(srcList []string, stages ...*Stage) bool {
next:
	for _, want := range srcList {
		for _, stage := range stages {
			for _, id := range stage.GetJobIDs() {
				if id == want {
					continue next
				}
			}
		}
		return false
	}
	return true
}