act/pkg/model/planner.go

222 lines
4.9 KiB
Go
Raw Normal View History

2020-02-04 18:38:41 -06:00
package model
import (
"fmt"
2020-02-04 18:38:41 -06:00
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
log "github.com/sirupsen/logrus"
)
// WorkflowPlanner contains methods for creating plans from a set of workflows.
type WorkflowPlanner interface {
	// PlanEvent returns the plan to execute for the given event name.
	PlanEvent(eventName string) *Plan
	// PlanJob returns the plan to execute for the given job.
	PlanJob(jobName string) *Plan
	// GetEvents returns the event names found in the workflows.
	GetEvents() []string
}
// Plan contains a list of stages to run in series.
type Plan struct {
	// Stages holds the stages of the plan, first stage first.
	Stages []*Stage
}
// Stage contains a list of runs to execute in parallel.
type Stage struct {
	// Runs holds the runs that belong to this stage.
	Runs []*Run
}
// Run represents a job from a workflow that needs to be run.
type Run struct {
	// Workflow is the workflow the job is looked up in.
	Workflow *Workflow
	// JobID is the ID passed to Workflow.GetJob to find the job.
	JobID string
}
// String renders the run as "<workflow name>/<job name>", using the job ID in
// place of the job name when the job has no name.
func (r *Run) String() string {
	if name := r.Job().Name; name != "" {
		return fmt.Sprintf("%s/%s", r.Workflow.Name, name)
	}
	return fmt.Sprintf("%s/%s", r.Workflow.Name, r.JobID)
}
// Job looks up this run's job in its workflow by JobID.
func (r *Run) Job() *Job {
	workflow, id := r.Workflow, r.JobID
	return workflow.GetJob(id)
}
2020-02-04 18:38:41 -06:00
// NewWorkflowPlanner will load all workflows from a directory
func NewWorkflowPlanner(dirname string) (WorkflowPlanner, error) {
log.Debugf("Loading workflows from '%s'", dirname)
files, err := ioutil.ReadDir(dirname)
if err != nil {
return nil, err
}
wp := new(workflowPlanner)
for _, file := range files {
ext := filepath.Ext(file.Name())
if ext == ".yml" || ext == ".yaml" {
f, err := os.Open(filepath.Join(dirname, file.Name()))
if err != nil {
return nil, err
}
workflow, err := ReadWorkflow(f)
if err != nil {
f.Close()
return nil, err
}
if workflow.Name == "" {
workflow.Name = file.Name()
}
2020-02-04 18:38:41 -06:00
wp.workflows = append(wp.workflows, workflow)
f.Close()
}
}
return wp, nil
}
// workflowPlanner is the WorkflowPlanner returned by NewWorkflowPlanner; it
// plans over the workflows loaded from a directory.
type workflowPlanner struct {
	workflows []*Workflow
}
// PlanEvent builds a new list of runs to execute in parallel for an event name
func (wp *workflowPlanner) PlanEvent(eventName string) *Plan {
plan := new(Plan)
for _, w := range wp.workflows {
for _, e := range w.On() {
if e == eventName {
plan.mergeStages(createStages(w, w.GetJobIDs()...))
}
2020-02-04 18:38:41 -06:00
}
}
return plan
}
// PlanJob assembles the plan for a single job: every loaded workflow
// contributes the stages created for jobName, merged into one plan.
func (wp *workflowPlanner) PlanJob(jobName string) *Plan {
	result := &Plan{}
	for i := range wp.workflows {
		stages := createStages(wp.workflows[i], jobName)
		result.mergeStages(stages)
	}
	return result
}
// GetEvents gets all the events in the workflows file
func (wp *workflowPlanner) GetEvents() []string {
events := make([]string, 0)
for _, w := range wp.workflows {
found := false
for _, e := range events {
for _, we := range w.On() {
if e == we {
found = true
break
}
}
if found {
2020-02-04 18:38:41 -06:00
break
}
}
if !found {
events = append(events, w.On()...)
2020-02-04 18:38:41 -06:00
}
}
// sort the list based on depth of dependencies
sort.Slice(events, func(i, j int) bool {
return events[i] < events[j]
})
return events
}
// GetJobIDs lists the job ID of every run in the stage, in run order.
func (s *Stage) GetJobIDs() []string {
	ids := make([]string, len(s.Runs))
	for i, run := range s.Runs {
		ids[i] = run.JobID
	}
	return ids
}
// Merge stages with existing stages in plan
func (p *Plan) mergeStages(stages []*Stage) {
newStages := make([]*Stage, int(math.Max(float64(len(p.Stages)), float64(len(stages)))))
for i := 0; i < len(newStages); i++ {
newStages[i] = new(Stage)
if i >= len(p.Stages) {
newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...)
2020-02-04 18:38:41 -06:00
} else if i >= len(stages) {
newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...)
2020-02-04 18:38:41 -06:00
} else {
newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...)
newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...)
2020-02-04 18:38:41 -06:00
}
}
p.Stages = newStages
}
func createStages(w *Workflow, jobIDs ...string) []*Stage {
// first, build a list of all the necessary jobs to run, and their dependencies
jobDependencies := make(map[string][]string)
for len(jobIDs) > 0 {
newJobIDs := make([]string, 0)
for _, jID := range jobIDs {
// make sure we haven't visited this job yet
if _, ok := jobDependencies[jID]; !ok {
if job := w.GetJob(jID); job != nil {
jobDependencies[jID] = job.Needs()
newJobIDs = append(newJobIDs, job.Needs()...)
2020-02-04 18:38:41 -06:00
}
}
}
jobIDs = newJobIDs
}
// next, build an execution graph
stages := make([]*Stage, 0)
for len(jobDependencies) > 0 {
stage := new(Stage)
for jID, jDeps := range jobDependencies {
// make sure all deps are in the graph already
if listInStages(jDeps, stages...) {
stage.Runs = append(stage.Runs, &Run{
Workflow: w,
JobID: jID,
})
delete(jobDependencies, jID)
}
}
if len(stage.Runs) == 0 {
log.Fatalf("Unable to build dependency graph!")
}
stages = append(stages, stage)
}
return stages
}
// listInStages reports whether every entry of srcList is the job ID of a run
// in at least one of the given stages. An empty srcList yields true.
func listInStages(srcList []string, stages ...*Stage) bool {
	for _, wanted := range srcList {
		matches := 0
		for _, stage := range stages {
			for _, id := range stage.GetJobIDs() {
				if id == wanted {
					matches++
				}
			}
		}
		if matches == 0 {
			return false
		}
	}
	return true
}