yume-runner/runtime/task.go
HesterG 048b2e630f
Fix wrong last step duration when job failed ()
This PR is to fix the wrong last step duration when job failed like shown in the screenshot.
The reason is because when job failed, `Fire` function did not pass in Time, and `r.state.StoppedAt` is by default set to `0001-01-01 08:05:43 +0805 LMT`, which is later on reported to gitea by `UpdateTask`, which calls `UpdateTaskByState` to update the `task.Stopped`, and `task.Stopped` is used in `FullSteps`, resulting in wrong calcaulation of last step duration.

Co-authored-by: nickname <test@123.com>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Reviewed-on: https://gitea.com/gitea/act_runner/pulls/41
Reviewed-by: Jason Song <i@wolfogre.com>
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
Co-authored-by: HesterG <hesterg@noreply.gitea.io>
Co-committed-by: HesterG <hesterg@noreply.gitea.io>
2023-03-08 15:07:09 +01:00

266 lines
7.5 KiB
Go

package runtime
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"github.com/nektos/act/pkg/artifacts"
"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/model"
"github.com/nektos/act/pkg/runner"
log "github.com/sirupsen/logrus"
"codeberg.org/forgejo/runner/client"
)
var globalTaskMap sync.Map
type TaskInput struct {
repoDirectory string
// actor string
// workdir string
// workflowsPath string
// autodetectEvent bool
// eventPath string
// reuseContainers bool
// bindWorkdir bool
// secrets []string
envs map[string]string
// platforms []string
// dryrun bool
forcePull bool
forceRebuild bool
// noOutput bool
// envfile string
// secretfile string
insecureSecrets bool
// defaultBranch string
privileged bool
usernsMode string
containerArchitecture string
containerDaemonSocket string
// noWorkflowRecurse bool
useGitIgnore bool
containerCapAdd []string
containerCapDrop []string
// autoRemove bool
artifactServerPath string
artifactServerPort string
jsonLogger bool
// noSkipCheckout bool
// remoteName string
EnvFile string
containerNetworkMode string
}
type Task struct {
BuildID int64
Input *TaskInput
client client.Client
log *log.Entry
platformPicker func([]string) string
}
// NewTask creates a new task
func NewTask(forgeInstance string, buildID int64, client client.Client, runnerEnvs map[string]string, picker func([]string) string) *Task {
task := &Task{
Input: &TaskInput{
envs: runnerEnvs,
containerNetworkMode: "bridge", // TODO should be configurable
},
BuildID: buildID,
client: client,
log: log.WithField("buildID", buildID),
platformPicker: picker,
}
task.Input.repoDirectory, _ = os.Getwd()
return task
}
// getWorkflowsPath return the workflows directory, it will try .gitea first and then fallback to .github
func getWorkflowsPath(dir string) (string, error) {
p := filepath.Join(dir, ".gitea/workflows")
_, err := os.Stat(p)
if err != nil {
if !os.IsNotExist(err) {
return "", err
}
return filepath.Join(dir, ".github/workflows"), nil
}
return p, nil
}
func getToken(task *runnerv1.Task) string {
token := task.Secrets["GITHUB_TOKEN"]
if task.Secrets["GITEA_TOKEN"] != "" {
token = task.Secrets["GITEA_TOKEN"]
}
if task.Context.Fields["token"].GetStringValue() != "" {
token = task.Context.Fields["token"].GetStringValue()
}
return token
}
func (t *Task) Run(ctx context.Context, task *runnerv1.Task, runnerName string) (lastErr error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
_, exist := globalTaskMap.Load(task.Id)
if exist {
return fmt.Errorf("task %d already exists", task.Id)
}
// set task ve to global map
// when task is done or canceled, it will be removed from the map
globalTaskMap.Store(task.Id, t)
defer globalTaskMap.Delete(task.Id)
lastWords := ""
reporter := NewReporter(ctx, cancel, t.client, task)
defer func() {
// set the job to failed on an error return value
if lastErr != nil {
reporter.Fire(&log.Entry{
Data: log.Fields{
"jobResult": "failure",
},
Time: time.Now(),
})
}
_ = reporter.Close(lastWords)
}()
reporter.RunDaemon()
reporter.Logf("%s received task %v of job %v", runnerName, task.Id, task.Context.Fields["job"].GetStringValue())
workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory)
if err != nil {
lastWords = err.Error()
return err
}
t.log.Debugf("workflows path: %s", workflowsPath)
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
if err != nil {
lastWords = err.Error()
return err
}
var plan *model.Plan
jobIDs := workflow.GetJobIDs()
if len(jobIDs) != 1 {
err := fmt.Errorf("multiple jobs found: %v", jobIDs)
lastWords = err.Error()
return err
}
jobID := jobIDs[0]
plan = model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
job := workflow.GetJob(jobID)
reporter.ResetSteps(len(job.Steps))
log.Infof("plan: %+v", plan.Stages[0].Runs)
token := getToken(task)
dataContext := task.Context.Fields
log.Infof("task %v repo is %v %v %v", task.Id, dataContext["repository"].GetStringValue(),
dataContext["gitea_default_actions_url"].GetStringValue(),
t.client.Address())
preset := &model.GithubContext{
Event: dataContext["event"].GetStructValue().AsMap(),
RunID: dataContext["run_id"].GetStringValue(),
RunNumber: dataContext["run_number"].GetStringValue(),
Actor: dataContext["actor"].GetStringValue(),
Repository: dataContext["repository"].GetStringValue(),
EventName: dataContext["event_name"].GetStringValue(),
Sha: dataContext["sha"].GetStringValue(),
Ref: dataContext["ref"].GetStringValue(),
RefName: dataContext["ref_name"].GetStringValue(),
RefType: dataContext["ref_type"].GetStringValue(),
HeadRef: dataContext["head_ref"].GetStringValue(),
BaseRef: dataContext["base_ref"].GetStringValue(),
Token: token,
RepositoryOwner: dataContext["repository_owner"].GetStringValue(),
RetentionDays: dataContext["retention_days"].GetStringValue(),
}
eventJSON, err := json.Marshal(preset.Event)
if err != nil {
lastWords = err.Error()
return err
}
maxLifetime := 3 * time.Hour
if deadline, ok := ctx.Deadline(); ok {
maxLifetime = time.Until(deadline)
}
input := t.Input
config := &runner.Config{
Workdir: "." + string(filepath.Separator),
BindWorkdir: false,
ReuseContainers: false,
ForcePull: input.forcePull,
ForceRebuild: input.forceRebuild,
LogOutput: true,
JSONLogger: input.jsonLogger,
Env: input.envs,
Secrets: task.Secrets,
InsecureSecrets: input.insecureSecrets,
Privileged: input.privileged,
UsernsMode: input.usernsMode,
ContainerArchitecture: input.containerArchitecture,
ContainerDaemonSocket: input.containerDaemonSocket,
UseGitIgnore: input.useGitIgnore,
GitHubInstance: t.client.Address(),
ContainerCapAdd: input.containerCapAdd,
ContainerCapDrop: input.containerCapDrop,
AutoRemove: true,
ArtifactServerPath: input.artifactServerPath,
ArtifactServerPort: input.artifactServerPort,
NoSkipCheckout: true,
PresetGitHubContext: preset,
EventJSON: string(eventJSON),
ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id),
ContainerMaxLifetime: maxLifetime,
ContainerNetworkMode: input.containerNetworkMode,
DefaultActionInstance: dataContext["gitea_default_actions_url"].GetStringValue(),
PlatformPicker: t.platformPicker,
}
r, err := runner.New(config)
if err != nil {
lastWords = err.Error()
return err
}
artifactCancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
t.log.Debugf("artifacts server started at %s:%s", input.artifactServerPath, input.artifactServerPort)
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
artifactCancel()
return nil
})
t.log.Infof("workflow prepared")
reporter.Logf("workflow prepared")
// add logger recorders
ctx = common.WithLoggerHook(ctx, reporter)
if err := executor(ctx); err != nil {
lastWords = err.Error()
return err
}
return nil
}