feat: send outputs

This commit is contained in:
Jason Song 2023-04-19 18:52:17 +08:00
parent 71ba3cd791
commit 6c16e9ad20
No known key found for this signature in database
GPG key ID: 8402EEEE4511A8B5
2 changed files with 46 additions and 17 deletions
internal
app/run
pkg/report

View file

@ -203,5 +203,7 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
// add logger recorders
ctx = common.WithLoggerHook(ctx, reporter)
return executor(ctx)
execErr := executor(ctx)
reporter.SetOutputs(job.Outputs)
return execErr
}

View file

@ -31,8 +31,11 @@ type Reporter struct {
logOffset int
logRows []*runnerv1.LogRow
logReplacer *strings.Replacer
state *runnerv1.TaskState
stateM sync.RWMutex
outputs map[string]string
sentOutputs map[string]bool
mutex sync.RWMutex
}
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
@ -52,12 +55,14 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
state: &runnerv1.TaskState{
Id: task.Id,
},
outputs: map[string]string{},
sentOutputs: map[string]bool{},
}
}
func (r *Reporter) ResetSteps(l int) {
r.stateM.Lock()
defer r.stateM.Unlock()
r.mutex.Lock()
defer r.mutex.Unlock()
for i := 0; i < l; i++ {
r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
Id: int64(i),
@ -70,8 +75,8 @@ func (r *Reporter) Levels() []log.Level {
}
func (r *Reporter) Fire(entry *log.Entry) error {
r.stateM.Lock()
defer r.stateM.Unlock()
r.mutex.Lock()
defer r.mutex.Unlock()
log.WithFields(entry.Data).Trace(entry.Message)
@ -155,8 +160,8 @@ func (r *Reporter) RunDaemon() {
}
func (r *Reporter) Logf(format string, a ...interface{}) {
r.stateM.Lock()
defer r.stateM.Unlock()
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.duringSteps() {
r.logRows = append(r.logRows, &runnerv1.LogRow{
@ -166,10 +171,20 @@ func (r *Reporter) Logf(format string, a ...interface{}) {
}
}
func (r *Reporter) SetOutputs(outputs map[string]string) {
r.mutex.Lock()
defer r.mutex.Unlock()
for k, v := range outputs {
r.outputs[k] = v
r.sentOutputs[k] = false
}
}
func (r *Reporter) Close(lastWords string) error {
r.closed = true
r.stateM.Lock()
r.mutex.Lock()
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
if lastWords == "" {
lastWords = "Early termination"
@ -191,7 +206,7 @@ func (r *Reporter) Close(lastWords string) error {
Content: lastWords,
})
}
r.stateM.Unlock()
r.mutex.Unlock()
return retry.Do(func() error {
if err := r.ReportLog(true); err != nil {
@ -205,9 +220,9 @@ func (r *Reporter) ReportLog(noMore bool) error {
r.clientM.Lock()
defer r.clientM.Unlock()
r.stateM.RLock()
r.mutex.RLock()
rows := r.logRows
r.stateM.RUnlock()
r.mutex.RUnlock()
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
TaskId: r.state.Id,
@ -224,10 +239,10 @@ func (r *Reporter) ReportLog(noMore bool) error {
return fmt.Errorf("submitted logs are lost")
}
r.stateM.Lock()
r.mutex.Lock()
r.logRows = r.logRows[ack-r.logOffset:]
r.logOffset = ack
r.stateM.Unlock()
r.mutex.Unlock()
if noMore && ack < r.logOffset+len(rows) {
return fmt.Errorf("not all logs are submitted")
@ -240,17 +255,29 @@ func (r *Reporter) ReportState() error {
r.clientM.Lock()
defer r.clientM.Unlock()
r.stateM.RLock()
r.mutex.RLock()
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateM.RUnlock()
r.mutex.RUnlock()
outputs := make(map[string]string, len(r.outputs))
for k, v := range r.outputs {
if !r.sentOutputs[k] {
outputs[k] = v
}
}
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
State: state,
State: state,
Outputs: outputs,
}))
if err != nil {
return err
}
for _, k := range resp.Msg.SentOutputs {
r.sentOutputs[k] = true
}
if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
r.cancel()
}