Clean the repository cache if it is in incomplete state (#595)
This commit is contained in:
parent
465fbba7d1
commit
010e459e95
3 changed files with 49 additions and 11 deletions
|
@ -105,17 +105,17 @@ func NewParallelExecutor(executors ...Executor) Executor {
|
|||
}()
|
||||
}
|
||||
|
||||
// Executor waits all executors to cleanup these resources.
|
||||
var firstErr error
|
||||
for i := 0; i < len(executors); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := <-errChan; err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -89,3 +89,41 @@ func TestNewParallelExecutor(t *testing.T) {
|
|||
|
||||
assert.Nil(err)
|
||||
}
|
||||
|
||||
func TestNewParallelExecutorFailed(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
count := 0
|
||||
errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
|
||||
count++
|
||||
return fmt.Errorf("fake error")
|
||||
})
|
||||
err := NewParallelExecutor(errorWorkflow)(ctx)
|
||||
assert.Equal(1, count)
|
||||
assert.ErrorIs(context.Canceled, err)
|
||||
}
|
||||
|
||||
func TestNewParallelExecutorCanceled(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
errExpected := fmt.Errorf("fake error")
|
||||
|
||||
count := 0
|
||||
successWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
|
||||
count++
|
||||
return errExpected
|
||||
})
|
||||
err := NewParallelExecutor(errorWorkflow, successWorkflow, successWorkflow)(ctx)
|
||||
assert.Equal(3, count)
|
||||
assert.Error(errExpected, err)
|
||||
}
|
||||
|
|
|
@ -190,7 +190,7 @@ type NewGitCloneExecutorInput struct {
|
|||
}
|
||||
|
||||
// CloneIfRequired ...
|
||||
func CloneIfRequired(refName plumbing.ReferenceName, input NewGitCloneExecutorInput, logger log.FieldLogger) (*git.Repository, error) {
|
||||
func CloneIfRequired(ctx context.Context, refName plumbing.ReferenceName, input NewGitCloneExecutorInput, logger log.FieldLogger) (*git.Repository, error) {
|
||||
r, err := git.PlainOpen(input.Dir)
|
||||
if err != nil {
|
||||
var progressWriter io.Writer
|
||||
|
@ -203,7 +203,7 @@ func CloneIfRequired(refName plumbing.ReferenceName, input NewGitCloneExecutorIn
|
|||
progressWriter = os.Stdout
|
||||
}
|
||||
|
||||
r, err = git.PlainClone(input.Dir, false, &git.CloneOptions{
|
||||
r, err = git.PlainCloneContext(ctx, input.Dir, false, &git.CloneOptions{
|
||||
URL: input.URL,
|
||||
Progress: progressWriter,
|
||||
})
|
||||
|
@ -228,7 +228,7 @@ func NewGitCloneExecutor(input NewGitCloneExecutorInput) Executor {
|
|||
defer cloneLock.Unlock()
|
||||
|
||||
refName := plumbing.ReferenceName(fmt.Sprintf("refs/heads/%s", input.Ref))
|
||||
r, err := CloneIfRequired(refName, input, logger)
|
||||
r, err := CloneIfRequired(ctx, refName, input, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue