feat(scheduler): introduce scheduler with retries and callbacks#19
feat(scheduler): introduce scheduler with retries and callbacks#19
Conversation
- Add new pkg/scheduler package: - Core types: Job, Schedule, Request, Callback, RetryPolicy, CallbackPayload - Options: WithHTTPClient, WithLogger, WithConcurrency - Errors: ErrInvalidJob, ErrUnsupportedMethod, ErrRetryableStatus, StatusError - Implement scheduling loop with concurrency control, HTTP execution, retry with backoff, and callback delivery with structured logging - Tests: - Add tests/scheduler_test.go for retry and callback behavior - Update tests/retrier_test.go defaults (max retries, interval, backoff factor) - Adjust tests/registry_test.go to use defaultRegistryLen - Update tests/timer_test.go with defaultPoolSize - Docs/Config: - Add Scheduler example to README - Update .golangci.yaml to exclude test paths - Extend cspell.json dictionary (e.g., sched) No breaking changes expected.
There was a problem hiding this comment.
Pull request overview
This pull request introduces a new scheduler package for executing HTTP requests on recurring intervals with retry logic and callback notifications. The implementation includes core scheduling functionality, concurrency control, and comprehensive type definitions.
Changes:
- Added scheduler package with job scheduling, retry policies, and callback delivery
- Refactored test constants across timer, retrier, and registry test files for consistency
- Updated linter configuration and spell-check dictionary
- Added scheduler usage example to README
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/scheduler/types.go | Defines core types for Schedule, Request, Callback, RetryPolicy, Job, and CallbackPayload |
| pkg/scheduler/scheduler.go | Main scheduler implementation with job execution loop, HTTP client, retry logic, and concurrency control |
| pkg/scheduler/options.go | Configuration options for HTTP client, logger, and concurrency limits |
| pkg/scheduler/errors.go | Error definitions for invalid jobs, unsupported methods, and status errors |
| tests/scheduler_test.go | Test covering scheduler retry and callback behavior |
| tests/timer_test.go | Refactored to use defaultPoolSize constant |
| tests/retrier_test.go | Refactored to use named constants for default values |
| tests/registry_test.go | Refactored to use defaultRegistryLen constant |
| .golangci.yaml | Added exclude rules for test files in revive linter |
| cspell.json | Added "sched" to dictionary |
| README.md | Added scheduler usage example with retry configuration |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func isSupportedMethod(method string) bool { | ||
| switch method { | ||
| case http.MethodGet, http.MethodPost, http.MethodPut: | ||
| return true | ||
| default: | ||
| return false | ||
| } | ||
| } |
There was a problem hiding this comment.
The isSupportedMethod function only allows GET, POST, and PUT methods. Common HTTP methods like PATCH and DELETE are not supported. Consider expanding this list or documenting why these methods are excluded, as DELETE and PATCH are widely used in RESTful APIs.
| func (s *Scheduler) enqueue(ctx context.Context, job Job, scheduledAt time.Time) { | ||
| if s.sem != nil { | ||
| s.sem <- struct{}{} | ||
| } | ||
|
|
||
| s.wg.Go(func() { | ||
| if s.sem != nil { | ||
| defer func() { <-s.sem }() | ||
| } | ||
|
|
||
| s.execute(ctx, job, scheduledAt) | ||
| }) | ||
| } |
There was a problem hiding this comment.
The semaphore is acquired before launching the goroutine. If there's a panic or issue between acquiring the semaphore (line 246) and starting the goroutine, the semaphore slot may not be properly released. Consider moving the semaphore acquisition inside the goroutine or using a select with context cancellation to prevent blocking indefinitely on semaphore acquisition.
| for { | ||
| if ctx.Err() != nil { | ||
| return | ||
| } | ||
|
|
||
| if job.Schedule.MaxRuns > 0 && runs >= job.Schedule.MaxRuns { | ||
| return | ||
| } | ||
|
|
||
| if !job.Schedule.EndAt.IsZero() && time.Now().After(job.Schedule.EndAt) { | ||
| return | ||
| } | ||
|
|
||
| runs++ | ||
| scheduledAt := time.Now() | ||
| s.enqueue(ctx, job, scheduledAt) | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-ticker.C: | ||
| } | ||
| } |
There was a problem hiding this comment.
The job execution loop runs the job immediately upon entering the loop (line 217), then waits for the ticker interval. This means a job with Every set to 1 minute will execute at time 0, then again at 1 minute, rather than waiting 1 minute before the first execution. While this may be intentional, it differs from typical scheduler behavior where the interval represents the delay before the first execution. Consider documenting this behavior or adding a flag to control whether the first execution should be immediate or delayed.
| if job.Request.URL == "" { | ||
| return Job{}, fmt.Errorf("%w: request URL is required", ErrInvalidJob) | ||
| } |
There was a problem hiding this comment.
The Request URL validation only checks if it's empty, but doesn't validate if it's a well-formed URL. Consider adding URL parsing validation using url.Parse to catch malformed URLs early during job scheduling rather than at execution time.
| s.wg.Go(func() { | ||
| if s.sem != nil { | ||
| defer func() { <-s.sem }() | ||
| } | ||
|
|
||
| s.execute(ctx, job, scheduledAt) | ||
| }) |
There was a problem hiding this comment.
The standard library's sync.WaitGroup does not have a Go method. This code will fail to compile. The correct pattern is to call s.wg.Add(1) before launching the goroutine, then use a plain go statement with defer s.wg.Done() inside the goroutine function.
| s.wg.Go(func() { | |
| if s.sem != nil { | |
| defer func() { <-s.sem }() | |
| } | |
| s.execute(ctx, job, scheduledAt) | |
| }) | |
| s.wg.Add(1) | |
| go func() { | |
| defer s.wg.Done() | |
| if s.sem != nil { | |
| defer func() { <-s.sem }() | |
| } | |
| s.execute(ctx, job, scheduledAt) | |
| }() |
| package tests | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "sync/atomic" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/hyp3rd/go-again" | ||
| "github.com/hyp3rd/go-again/pkg/scheduler" | ||
| ) | ||
|
|
||
| const ( | ||
| schedulerTimeout = 200 * time.Millisecond | ||
| schedulerRetries = 5 | ||
| ) | ||
|
|
||
| //nolint:funlen | ||
| func TestSchedulerRetryAndCallback(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| var targetHits int32 | ||
|
|
||
| target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { | ||
| hit := atomic.AddInt32(&targetHits, 1) | ||
| if hit < 3 { | ||
| w.WriteHeader(http.StatusInternalServerError) | ||
|
|
||
| _, err := w.Write([]byte("retry")) | ||
| if err != nil { | ||
| t.Fatalf("failed to write response: %v", err) | ||
| } | ||
|
|
||
| return | ||
| } | ||
|
|
||
| w.WriteHeader(http.StatusOK) | ||
|
|
||
| _, err := w.Write([]byte("ok")) | ||
| if err != nil { | ||
| t.Fatalf("failed to write response: %v", err) | ||
| } | ||
| })) | ||
| defer target.Close() | ||
|
|
||
| callbackCh := make(chan scheduler.CallbackPayload, 1) | ||
|
|
||
| callback := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| defer func() { | ||
| err := r.Body.Close() | ||
| if err != nil { | ||
| t.Logf("failed to close request body: %v", err) | ||
| } | ||
| }() | ||
|
|
||
| var payload scheduler.CallbackPayload | ||
|
|
||
| err := json.NewDecoder(r.Body).Decode(&payload) | ||
| if err != nil { | ||
| t.Fatalf("failed to decode callback payload: %v", err) | ||
| } | ||
|
|
||
| callbackCh <- payload | ||
|
|
||
| w.WriteHeader(http.StatusOK) | ||
| })) | ||
| defer callback.Close() | ||
|
|
||
| retrier, err := again.NewRetrier( | ||
| context.Background(), | ||
| again.WithMaxRetries(schedulerRetries), | ||
| again.WithInterval(1*time.Millisecond), | ||
| again.WithJitter(1*time.Millisecond), | ||
| again.WithTimeout(schedulerTimeout), | ||
| ) | ||
| if err != nil { | ||
| t.Fatalf("failed to create retrier: %v", err) | ||
| } | ||
|
|
||
| sched := scheduler.NewScheduler() | ||
| defer sched.Stop() | ||
|
|
||
| job := scheduler.Job{ | ||
| Schedule: scheduler.Schedule{ | ||
| Every: 10 * time.Millisecond, | ||
| MaxRuns: 1, | ||
| }, | ||
| Request: scheduler.Request{ | ||
| Method: http.MethodGet, | ||
| URL: target.URL, | ||
| }, | ||
| Callback: scheduler.Callback{ | ||
| URL: callback.URL, | ||
| }, | ||
| RetryPolicy: scheduler.RetryPolicy{ | ||
| Retrier: retrier, | ||
| RetryStatusCodes: []int{http.StatusInternalServerError}, | ||
| }, | ||
| } | ||
|
|
||
| jobID, err := sched.Schedule(job) | ||
| if err != nil { | ||
| t.Fatalf("failed to schedule job: %v", err) | ||
| } | ||
|
|
||
| select { | ||
| case payload := <-callbackCh: | ||
| if payload.JobID != jobID { | ||
| t.Fatalf("expected job id %q, got %q", jobID, payload.JobID) | ||
| } | ||
|
|
||
| if !payload.Success { | ||
| t.Fatalf("expected success, got error %q", payload.Error) | ||
| } | ||
|
|
||
| if payload.Attempts != 3 { | ||
| t.Fatalf("expected 3 attempts, got %d", payload.Attempts) | ||
| } | ||
|
|
||
| if payload.StatusCode != http.StatusOK { | ||
| t.Fatalf("expected status %d, got %d", http.StatusOK, payload.StatusCode) | ||
| } | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("timed out waiting for callback") | ||
| } | ||
| } |
There was a problem hiding this comment.
The scheduler test file only includes a single test covering the retry and callback happy path. Consider adding test coverage for edge cases such as: job validation failures, unsupported HTTP methods, concurrent job execution, job cancellation via Remove, scheduler Stop behavior, StartAt/EndAt schedule boundaries, MaxRuns enforcement, and callback failures.
| if !job.Schedule.EndAt.IsZero() && !job.Schedule.StartAt.IsZero() && job.Schedule.EndAt.Before(job.Schedule.StartAt) { | ||
| return Job{}, fmt.Errorf("%w: end time precedes start time", ErrInvalidJob) |
There was a problem hiding this comment.
The validation only checks if EndAt is before StartAt, but doesn't handle the case where EndAt equals StartAt. If EndAt equals StartAt, the job would never run since the condition on line 211 checks time.Now().After(job.Schedule.EndAt). Consider adding validation to reject jobs where EndAt equals StartAt, or document this edge case.
| if !job.Schedule.EndAt.IsZero() && !job.Schedule.StartAt.IsZero() && job.Schedule.EndAt.Before(job.Schedule.StartAt) { | |
| return Job{}, fmt.Errorf("%w: end time precedes start time", ErrInvalidJob) | |
| if !job.Schedule.EndAt.IsZero() && !job.Schedule.StartAt.IsZero() && | |
| (job.Schedule.EndAt.Before(job.Schedule.StartAt) || job.Schedule.EndAt.Equal(job.Schedule.StartAt)) { | |
| return Job{}, fmt.Errorf("%w: end time must be after start time", ErrInvalidJob) |
| func (*Scheduler) buildTemporaryErrors(retrier *again.Retrier, policy RetryPolicy) []error { | ||
| var temp []error | ||
|
|
||
| if len(policy.RetryStatusCodes) > 0 { | ||
| temp = append(temp, ErrRetryableStatus) | ||
| } | ||
|
|
||
| if len(policy.TemporaryErrors) > 0 { | ||
| temp = append(temp, policy.TemporaryErrors...) | ||
| } | ||
|
|
||
| if len(temp) > 0 && retrier.Registry != nil && retrier.Registry.Len() > 0 { | ||
| temp = append(temp, retrier.Registry.ListTemporaryErrors()...) | ||
| } | ||
|
|
||
| return temp |
There was a problem hiding this comment.
The buildTemporaryErrors function only adds registry errors if temp is non-empty (line 401). This means if a job has no RetryStatusCodes or TemporaryErrors configured, the registry defaults won't be used even though LoadDefaults was called during normalization. Consider removing the len(temp) > 0 check or documenting why registry errors should only be included when custom errors are also present.
| // WithLogger sets the scheduler logger. | ||
| func WithLogger(logger *slog.Logger) Option { | ||
| return func(s *Scheduler) { | ||
| s.logger = logger |
There was a problem hiding this comment.
The WithLogger option should check if the logger parameter is nil before assigning it, similar to how WithHTTPClient validates the client parameter. Without this check, users could inadvertently set a nil logger which could cause issues even though logError has a nil check.
| s.logger = logger | |
| if logger != nil { | |
| s.logger = logger | |
| } |
No breaking changes expected.