fix: send outputs
This commit is contained in:
parent
6c16e9ad20
commit
170b3d8c7a
1 changed files with 30 additions and 30 deletions
|
@ -32,10 +32,9 @@ type Reporter struct {
|
|||
logRows []*runnerv1.LogRow
|
||||
logReplacer *strings.Replacer
|
||||
|
||||
state *runnerv1.TaskState
|
||||
outputs map[string]string
|
||||
sentOutputs map[string]bool
|
||||
mutex sync.RWMutex
|
||||
state *runnerv1.TaskState
|
||||
stateMu sync.RWMutex
|
||||
outputs sync.Map
|
||||
}
|
||||
|
||||
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
|
||||
|
@ -55,14 +54,12 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
|
|||
state: &runnerv1.TaskState{
|
||||
Id: task.Id,
|
||||
},
|
||||
outputs: map[string]string{},
|
||||
sentOutputs: map[string]bool{},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) ResetSteps(l int) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
for i := 0; i < l; i++ {
|
||||
r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
|
||||
Id: int64(i),
|
||||
|
@ -75,8 +72,8 @@ func (r *Reporter) Levels() []log.Level {
|
|||
}
|
||||
|
||||
func (r *Reporter) Fire(entry *log.Entry) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
||||
log.WithFields(entry.Data).Trace(entry.Message)
|
||||
|
||||
|
@ -160,8 +157,8 @@ func (r *Reporter) RunDaemon() {
|
|||
}
|
||||
|
||||
func (r *Reporter) Logf(format string, a ...interface{}) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
||||
if !r.duringSteps() {
|
||||
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||
|
@ -172,19 +169,21 @@ func (r *Reporter) Logf(format string, a ...interface{}) {
|
|||
}
|
||||
|
||||
func (r *Reporter) SetOutputs(outputs map[string]string) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
||||
for k, v := range outputs {
|
||||
r.outputs[k] = v
|
||||
r.sentOutputs[k] = false
|
||||
if _, ok := r.outputs.Load(k); ok {
|
||||
continue
|
||||
}
|
||||
r.outputs.Store(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) Close(lastWords string) error {
|
||||
r.closed = true
|
||||
|
||||
r.mutex.Lock()
|
||||
r.stateMu.Lock()
|
||||
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
if lastWords == "" {
|
||||
lastWords = "Early termination"
|
||||
|
@ -206,7 +205,7 @@ func (r *Reporter) Close(lastWords string) error {
|
|||
Content: lastWords,
|
||||
})
|
||||
}
|
||||
r.mutex.Unlock()
|
||||
r.stateMu.Unlock()
|
||||
|
||||
return retry.Do(func() error {
|
||||
if err := r.ReportLog(true); err != nil {
|
||||
|
@ -220,9 +219,9 @@ func (r *Reporter) ReportLog(noMore bool) error {
|
|||
r.clientM.Lock()
|
||||
defer r.clientM.Unlock()
|
||||
|
||||
r.mutex.RLock()
|
||||
r.stateMu.RLock()
|
||||
rows := r.logRows
|
||||
r.mutex.RUnlock()
|
||||
r.stateMu.RUnlock()
|
||||
|
||||
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
|
||||
TaskId: r.state.Id,
|
||||
|
@ -239,10 +238,10 @@ func (r *Reporter) ReportLog(noMore bool) error {
|
|||
return fmt.Errorf("submitted logs are lost")
|
||||
}
|
||||
|
||||
r.mutex.Lock()
|
||||
r.stateMu.Lock()
|
||||
r.logRows = r.logRows[ack-r.logOffset:]
|
||||
r.logOffset = ack
|
||||
r.mutex.Unlock()
|
||||
r.stateMu.Unlock()
|
||||
|
||||
if noMore && ack < r.logOffset+len(rows) {
|
||||
return fmt.Errorf("not all logs are submitted")
|
||||
|
@ -255,16 +254,17 @@ func (r *Reporter) ReportState() error {
|
|||
r.clientM.Lock()
|
||||
defer r.clientM.Unlock()
|
||||
|
||||
r.mutex.RLock()
|
||||
r.stateMu.RLock()
|
||||
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
||||
r.mutex.RUnlock()
|
||||
r.stateMu.RUnlock()
|
||||
|
||||
outputs := make(map[string]string, len(r.outputs))
|
||||
for k, v := range r.outputs {
|
||||
if !r.sentOutputs[k] {
|
||||
outputs[k] = v
|
||||
outputs := make(map[string]string)
|
||||
r.outputs.Range(func(k, v interface{}) bool {
|
||||
if val, ok := v.(string); ok {
|
||||
outputs[k.(string)] = val
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
||||
State: state,
|
||||
|
@ -275,7 +275,7 @@ func (r *Reporter) ReportState() error {
|
|||
}
|
||||
|
||||
for _, k := range resp.Msg.SentOutputs {
|
||||
r.sentOutputs[k] = true
|
||||
r.outputs.Store(k, struct{}{})
|
||||
}
|
||||
|
||||
if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue