From 6030610c04cc75219d6047ce51e57b45ea3c1fb9 Mon Sep 17 00:00:00 2001
From: fuxiaohei <fuxiaohei@vip.qq.com>
Date: Sat, 3 Sep 2022 15:57:53 +0800
Subject: [PATCH] feat: task can report step and final result

---
 client/client.go   |  20 ++++++
 cmd/root.go        |   8 ++-
 go.sum             |   8 ---
 poller/poller.go   |   7 ++
 runtime/runtime.go |   7 +-
 runtime/task.go    | 173 +++++++++++++++++++++++++++++++++++++++------
 runtime/taskmap.go |  32 +++++++++
 7 files changed, 219 insertions(+), 36 deletions(-)
 create mode 100644 runtime/taskmap.go

diff --git a/client/client.go b/client/client.go
index 55538bc..6716c9e 100644
--- a/client/client.go
+++ b/client/client.go
@@ -31,3 +31,23 @@ type Client interface {
 	// UpdateStep updates the build step.
 	UpdateStep(ctx context.Context, args *runnerv1.UpdateStepRequest) error
 }
+
+type contextKey string
+
+const clientContextKey = contextKey("gitea.rpc.client")
+
+// FromContext returns the client from the context.
+func FromContext(ctx context.Context) Client {
+	val := ctx.Value(clientContextKey)
+	if val != nil {
+		if c, ok := val.(Client); ok {
+			return c
+		}
+	}
+	return nil
+}
+
+// WithClient returns a new context with the given client.
+func WithClient(ctx context.Context, c Client) context.Context {
+	return context.WithValue(ctx, clientContextKey, c)
+}
diff --git a/cmd/root.go b/cmd/root.go
index 7569b7f..d56c5d8 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -3,6 +3,7 @@ package cmd
 import (
 	"context"
 	"os"
+	"strconv"
 
 	"gitea.com/gitea/act_runner/engine"
 	"gitea.com/gitea/act_runner/runtime"
@@ -30,7 +31,7 @@ func initLogging(cfg Config) {
 }
 
 func Execute(ctx context.Context) {
-	task := runtime.NewTask()
+	task := runtime.NewTask(0)
 
 	// ./act_runner
 	rootCmd := &cobra.Command{
@@ -74,7 +75,8 @@ func runRoot(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, a
 			log.WithError(err).Fatalln("failed to connect docker daemon engine")
 		}
 
-		task.JobID = jobID
-		return task.Run(ctx)
+		task.BuildID, _ = strconv.ParseInt(jobID, 10, 64)
+		task.Run(ctx)
+		return nil
 	}
 }
diff --git a/go.sum b/go.sum
index ee1d418..7f7ec6f 100644
--- a/go.sum
+++ b/go.sum
@@ -25,14 +25,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY=
 gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY=
-gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41 h1:FIGF6szYd3lBIwvbeedfU5Lc7uG1Xpzi7bkktS6Vdvg=
-gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
-gitea.com/gitea/proto-go v0.0.0-20220828011358-d0a015a5b095 h1:Ng3GDJLYpsG3lYdaqDzeZFkRm5ShA2V+LWJSHRD0IQ0=
-gitea.com/gitea/proto-go v0.0.0-20220828011358-d0a015a5b095/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
-gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57 h1:eVM6m3h5KpmJM2+LEqroENFaMs2kAo8QNIPyMgho9jg=
-gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
-gitea.com/gitea/proto-go v0.0.0-20220901061207-b88901a1b9bc h1:kTVjwKxXma2yAdgXz8T1tiJihtWFK8jGLqArX2NownM=
-gitea.com/gitea/proto-go v0.0.0-20220901061207-b88901a1b9bc/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
 gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134 h1:5ofH0FGEkIj/P9a6oFDgkdmGSWow1yD1uubiftMA2Kw=
 gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y=
 github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
diff --git a/poller/poller.go b/poller/poller.go
index 17f2a7a..fede31e 100644
--- a/poller/poller.go
+++ b/poller/poller.go
@@ -93,5 +93,12 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
 		return nil
 	}
 
+	// FIXME: for testing task
+	// stage.Id = 111
+	// stage.BuildId = 1222
+
+	// set client to context, so that the stage can use it to
+	ctx = client.WithClient(ctx, p.Client)
+
 	return p.Dispatch(ctx, stage)
 }
diff --git a/runtime/runtime.go b/runtime/runtime.go
index b8d51e8..1f463af 100644
--- a/runtime/runtime.go
+++ b/runtime/runtime.go
@@ -29,10 +29,7 @@ func (s *Runner) Run(ctx context.Context, stage *runnerv1.Stage) error {
 		WithField("runner.BuildID", stage.BuildId)
 
 	l.Info("start running pipeline")
-	// TODO: docker runner with stage data
-	// task.Run is blocking, so we need to use goroutine to run it in background
-	// return task metadata and status to the server
-	task := NewTask()
 
-	return task.Run(ctx)
+	// TODO: use ctx to transfer usage information
+	return startTask(stage.BuildId, ctx)
 }
diff --git a/runtime/task.go b/runtime/task.go
index c3f0292..5b4ef74 100644
--- a/runtime/task.go
+++ b/runtime/task.go
@@ -5,12 +5,18 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync"
+	"time"
 
+	"gitea.com/gitea/act_runner/client"
 	"github.com/nektos/act/pkg/artifacts"
 	"github.com/nektos/act/pkg/common"
 	"github.com/nektos/act/pkg/model"
 	"github.com/nektos/act/pkg/runner"
+	"github.com/sirupsen/logrus"
 	log "github.com/sirupsen/logrus"
+
+	runnerv1 "gitea.com/gitea/proto-go/runner/v1"
 )
 
 type TaskInput struct {
@@ -54,6 +60,7 @@ type TaskInput struct {
 
 type taskLogHook struct {
 	entries []*log.Entry
+	lock    sync.Mutex
 }
 
 func (h *taskLogHook) Levels() []log.Level {
@@ -62,27 +69,69 @@ func (h *taskLogHook) Levels() []log.Level {
 
 func (h *taskLogHook) Fire(entry *log.Entry) error {
 	if flag, ok := entry.Data["raw_output"]; ok {
+		h.lock.Lock()
 		if flagVal, ok := flag.(bool); flagVal && ok {
 			log.Infof("task log: %s", entry.Message)
 			h.entries = append(h.entries, entry)
 		}
+		h.lock.Unlock()
 	}
 	return nil
 }
 
-type Task struct {
-	JobID   string
-	Input   *TaskInput
-	LogHook *taskLogHook
+func (h *taskLogHook) swapLogs() []*log.Entry {
+	if len(h.entries) == 0 {
+		return nil
+	}
+	h.lock.Lock()
+	entries := h.entries
+	h.entries = nil
+	h.lock.Unlock()
+	return entries
 }
 
-func NewTask() *Task {
+type TaskState int
+
+const (
+	// TaskStateUnknown is the default state
+	TaskStateUnknown TaskState = iota
+	// TaskStatePending is the pending state
+	// pending means task is received, parsing actions and preparing to run
+	TaskStatePending
+	// TaskStateRunning is the state when the task is running
+	// running means task is running
+	TaskStateRunning
+	// TaskStateSuccess is the state when the task is successful
+	// success means task is successful without any error
+	TaskStateSuccess
+	// TaskStateFailure is the state when the task is failed
+	// failure means task is failed with error
+	TaskStateFailure
+)
+
+type Task struct {
+	BuildID int64
+	Input   *TaskInput
+
+	logHook *taskLogHook
+	state   TaskState
+	client  client.Client
+	log     *logrus.Entry
+}
+
+// newTask creates a new task
+func NewTask(buildID int64) *Task {
 	task := &Task{
 		Input: &TaskInput{
 			reuseContainers: true,
 			ForgeInstance:   "gitea",
 		},
-		LogHook: &taskLogHook{},
+		BuildID: buildID,
+
+		state:   TaskStatePending,
+		client:  nil,
+		log:     logrus.WithField("buildID", buildID),
+		logHook: &taskLogHook{},
 	}
 	task.Input.repoDirectory, _ = os.Getwd()
 	return task
@@ -109,14 +158,86 @@ func demoPlatforms() map[string]string {
 	}
 }
 
-func (t *Task) Run(ctx context.Context) error {
+// reportFailure reports the failure of the task
+func (t *Task) reportFailure(ctx context.Context, err error) {
+	t.state = TaskStateFailure
+	finishTask(t.BuildID)
+
+	t.log.Errorf("task failed: %v", err)
+
+	if t.client == nil {
+		// TODO: fill the step request
+		stepRequest := &runnerv1.UpdateStepRequest{}
+		t.client.UpdateStep(ctx, stepRequest)
+		return
+	}
+
+}
+
+func (t *Task) startReporting(interval int64, ctx context.Context) {
+	for {
+		time.Sleep(time.Duration(interval) * time.Second)
+		if t.state == TaskStateSuccess || t.state == TaskStateFailure {
+			t.log.Debugf("task reporting stopped")
+			break
+		}
+		t.reportStep(ctx)
+	}
+}
+
+// reportStep reports the step of the task
+func (t *Task) reportStep(ctx context.Context) {
+	if t.client == nil {
+		return
+	}
+	logValues := t.logHook.swapLogs()
+	if len(logValues) == 0 {
+		t.log.Debugf("no log to report")
+		return
+	}
+	t.log.Infof("reporting %d logs", len(logValues))
+
+	// TODO: fill the step request
+	stepRequest := &runnerv1.UpdateStepRequest{}
+	t.client.UpdateStep(ctx, stepRequest)
+}
+
+// reportSuccess reports the success of the task
+func (t *Task) reportSuccess(ctx context.Context) {
+	t.state = TaskStateSuccess
+	finishTask(t.BuildID)
+
+	t.log.Infof("task success")
+
+	if t.client == nil {
+		return
+	}
+
+	// TODO: fill the step request
+	stepRequest := &runnerv1.UpdateStepRequest{}
+	t.client.UpdateStep(ctx, stepRequest)
+}
+
+func (t *Task) Run(ctx context.Context) {
+	// get client for context, use for reporting
+	t.client = client.FromContext(ctx)
+	if t.client == nil {
+		t.log.Warnf("no client found in context")
+	} else {
+		t.log.Infof("client found in context")
+	}
+
 	workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory)
 	if err != nil {
-		return err
+		t.reportFailure(ctx, err)
+		return
 	}
+	t.log.Debugf("workflows path: %s", workflowsPath)
+
 	planner, err := model.NewWorkflowPlanner(workflowsPath, false)
 	if err != nil {
-		return err
+		t.reportFailure(ctx, err)
+		return
 	}
 
 	var eventName string
@@ -124,7 +245,7 @@ func (t *Task) Run(ctx context.Context) error {
 	if len(events) > 0 {
 		// set default event type to first event
 		// this way user dont have to specify the event.
-		log.Debugf("Using detected workflow event: %s", events[0])
+		t.log.Debugf("Using detected workflow event: %s", events[0])
 		eventName = events[0]
 	} else {
 		if plan := planner.PlanEvent("push"); plan != nil {
@@ -134,18 +255,22 @@ func (t *Task) Run(ctx context.Context) error {
 
 	// build the plan for this run
 	var plan *model.Plan
-	jobID := t.JobID
+	jobID := ""
+	if t.BuildID > 0 {
+		jobID = fmt.Sprintf("%d", t.BuildID)
+	}
 	if jobID != "" {
-		log.Debugf("Planning job: %s", jobID)
+		t.log.Infof("Planning job: %s", jobID)
 		plan = planner.PlanJob(jobID)
 	} else {
-		log.Debugf("Planning event: %s", eventName)
+		t.log.Infof("Planning event: %s", eventName)
 		plan = planner.PlanEvent(eventName)
 	}
 
 	curDir, err := os.Getwd()
 	if err != nil {
-		return err
+		t.reportFailure(ctx, err)
+		return
 	}
 
 	// run the plan
@@ -182,21 +307,29 @@ func (t *Task) Run(ctx context.Context) error {
 	}
 	r, err := runner.New(config)
 	if err != nil {
-		return fmt.Errorf("new config failed: %v", err)
+		t.reportFailure(ctx, err)
+		return
 	}
 
 	cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
+	t.log.Debugf("artifacts server started at %s:%s", input.artifactServerPath, input.artifactServerPort)
 
 	executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
 		cancel()
 		return nil
 	})
 
-	ctx = common.WithLoggerHook(ctx, t.LogHook)
+	t.log.Infof("workflow prepared")
+
+	// add logger recorders
+	ctx = common.WithLoggerHook(ctx, t.logHook)
+
+	go t.startReporting(1, ctx)
+
 	if err := executor(ctx); err != nil {
-		log.Warnf("workflow execution failed:%v, logs: %d", err, len(t.LogHook.entries))
-		return err
+		t.reportFailure(ctx, err)
+		return
 	}
-	log.Infof("workflow completed, logs: %d", len(t.LogHook.entries))
-	return nil
+
+	t.reportSuccess(ctx)
 }
diff --git a/runtime/taskmap.go b/runtime/taskmap.go
new file mode 100644
index 0000000..37369a1
--- /dev/null
+++ b/runtime/taskmap.go
@@ -0,0 +1,32 @@
+package runtime
+
+import (
+	"context"
+	"fmt"
+	"sync"
+)
+
+var globalTaskMap sync.Map
+
+// startTask adds the task to global map
+func startTask(buildID int64, ctx context.Context) error {
+	_, exist := globalTaskMap.Load(buildID)
+	if exist {
+		return fmt.Errorf("task %d already exists", buildID)
+	}
+
+	task := NewTask(buildID)
+
+	// set task ve to global map
+	// when task is done or canceled, it will be removed from the map
+	globalTaskMap.Store(buildID, task)
+
+	go task.Run(ctx)
+
+	return nil
+}
+
+// finishTask removes the task from global map
+func finishTask(buildID int64) {
+	globalTaskMap.Delete(buildID)
+}