feat: allow graceful shutdowns
Add a Shutdown(context.Context) method to the Poller. This will first shutdown all active polling, preventing any new jobs from spawning. It will then wait for either all jobs to finish, or for the context to finish. If the context finishes, it will then force all jobs to end, and then exit.
This commit is contained in:
parent
1735b26e66
commit
b350719527
2 changed files with 54 additions and 9 deletions
|
@ -122,9 +122,12 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
|||
|
||||
poller := poll.New(cfg, cli, runner)
|
||||
|
||||
poller.Poll(ctx)
|
||||
go poller.Poll()
|
||||
|
||||
return nil
|
||||
<-ctx.Done()
|
||||
log.Infof("runner: %s gracefully shutting down", resp.Msg.Runner.Name)
|
||||
|
||||
return poller.Shutdown(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,40 +25,82 @@ type Poller struct {
|
|||
runner *run.Runner
|
||||
cfg *config.Config
|
||||
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||
|
||||
pollingCtx context.Context
|
||||
shutdownPolling context.CancelFunc
|
||||
|
||||
jobsCtx context.Context
|
||||
shutdownJobs context.CancelFunc
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||
pollingCtx, shutdownPolling := context.WithCancel(context.Background())
|
||||
|
||||
jobsCtx, shutdownJobs := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
return &Poller{
|
||||
client: client,
|
||||
runner: runner,
|
||||
cfg: cfg,
|
||||
|
||||
pollingCtx: pollingCtx,
|
||||
shutdownPolling: shutdownPolling,
|
||||
|
||||
jobsCtx: jobsCtx,
|
||||
shutdownJobs: shutdownJobs,
|
||||
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Poll(ctx context.Context) {
|
||||
func (p *Poller) Poll() {
|
||||
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
||||
wg.Add(1)
|
||||
go p.poll(ctx, wg, limiter)
|
||||
go p.poll(wg, limiter)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// signal that we shutdown
|
||||
close(p.done)
|
||||
}
|
||||
|
||||
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
func (p *Poller) Shutdown(ctx context.Context) error {
|
||||
p.shutdownPolling()
|
||||
|
||||
select {
|
||||
// gracefully shutdown
|
||||
case <-p.done:
|
||||
return nil
|
||||
|
||||
// Our timeout for shutting down ran out
|
||||
case <-ctx.Done():
|
||||
// force a shutdown of all running jobs
|
||||
p.shutdownJobs()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
if err := limiter.Wait(p.pollingCtx); err != nil {
|
||||
if p.pollingCtx.Err() != nil {
|
||||
log.WithError(err).Debug("limiter wait failed")
|
||||
}
|
||||
return
|
||||
}
|
||||
task, ok := p.fetchTask(ctx)
|
||||
task, ok := p.fetchTask(p.pollingCtx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
p.runTaskWithRecover(ctx, task)
|
||||
|
||||
p.runTaskWithRecover(p.jobsCtx, task)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue