Skip to content

Comments

feat(scheduler): introduce scheduler with retries and callbacks#19

Merged
hyp3rd merged 1 commit intomainfrom
feat/improvements
Jan 21, 2026
Merged

feat(scheduler): introduce scheduler with retries and callbacks#19
hyp3rd merged 1 commit intomainfrom
feat/improvements

Conversation

@hyp3rd
Copy link
Owner

@hyp3rd hyp3rd commented Jan 21, 2026

  • Add new pkg/scheduler package:
    • Core types: Job, Schedule, Request, Callback, RetryPolicy, CallbackPayload
    • Options: WithHTTPClient, WithLogger, WithConcurrency
    • Errors: ErrInvalidJob, ErrUnsupportedMethod, ErrRetryableStatus, StatusError
    • Implement scheduling loop with concurrency control, HTTP execution, retry with backoff, and callback delivery with structured logging
  • Tests:
    • Add tests/scheduler_test.go for retry and callback behavior
    • Update tests/retrier_test.go defaults (max retries, interval, backoff factor)
    • Adjust tests/registry_test.go to use defaultRegistryLen
    • Update tests/timer_test.go with defaultPoolSize
  • Docs/Config:
    • Add Scheduler example to README
    • Update .golangci.yaml to exclude test paths
    • Extend cspell.json dictionary (e.g., sched)

No breaking changes expected.

- Add new pkg/scheduler package:
  - Core types: Job, Schedule, Request, Callback, RetryPolicy, CallbackPayload
  - Options: WithHTTPClient, WithLogger, WithConcurrency
  - Errors: ErrInvalidJob, ErrUnsupportedMethod, ErrRetryableStatus, StatusError
  - Implement scheduling loop with concurrency control, HTTP execution, retry with backoff, and callback delivery with structured logging
- Tests:
  - Add tests/scheduler_test.go for retry and callback behavior
  - Update tests/retrier_test.go defaults (max retries, interval, backoff factor)
  - Adjust tests/registry_test.go to use defaultRegistryLen
  - Update tests/timer_test.go with defaultPoolSize
- Docs/Config:
  - Add Scheduler example to README
  - Update .golangci.yaml to exclude test paths
  - Extend cspell.json dictionary (e.g., sched)

No breaking changes expected.
Copilot AI review requested due to automatic review settings January 21, 2026 13:28
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request introduces a new scheduler package for executing HTTP requests on recurring intervals with retry logic and callback notifications. The implementation includes core scheduling functionality, concurrency control, and comprehensive type definitions.

Changes:

  • Added scheduler package with job scheduling, retry policies, and callback delivery
  • Refactored test constants across timer, retrier, and registry test files for consistency
  • Updated linter configuration and spell-check dictionary
  • Added scheduler usage example to README

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
pkg/scheduler/types.go Defines core types for Schedule, Request, Callback, RetryPolicy, Job, and CallbackPayload
pkg/scheduler/scheduler.go Main scheduler implementation with job execution loop, HTTP client, retry logic, and concurrency control
pkg/scheduler/options.go Configuration options for HTTP client, logger, and concurrency limits
pkg/scheduler/errors.go Error definitions for invalid jobs, unsupported methods, and status errors
tests/scheduler_test.go Test covering scheduler retry and callback behavior
tests/timer_test.go Refactored to use defaultPoolSize constant
tests/retrier_test.go Refactored to use named constants for default values
tests/registry_test.go Refactored to use defaultRegistryLen constant
.golangci.yaml Added exclude rules for test files in revive linter
cspell.json Added "sched" to dictionary
README.md Added scheduler usage example with retry configuration

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +456 to +463
func isSupportedMethod(method string) bool {
switch method {
case http.MethodGet, http.MethodPost, http.MethodPut:
return true
default:
return false
}
}
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isSupportedMethod function only allows GET, POST, and PUT methods. Common HTTP methods like PATCH and DELETE are not supported. Consider expanding this list or documenting why these methods are excluded, as DELETE and PATCH are widely used in RESTful APIs.

Copilot uses AI. Check for mistakes.
Comment on lines +244 to +256
func (s *Scheduler) enqueue(ctx context.Context, job Job, scheduledAt time.Time) {
if s.sem != nil {
s.sem <- struct{}{}
}

s.wg.Go(func() {
if s.sem != nil {
defer func() { <-s.sem }()
}

s.execute(ctx, job, scheduledAt)
})
}
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semaphore is acquired before launching the goroutine. If there's a panic or issue between acquiring the semaphore (line 246) and starting the goroutine, the semaphore slot may not be properly released. Consider moving the semaphore acquisition inside the goroutine or using a select with context cancellation to prevent blocking indefinitely on semaphore acquisition.

Copilot uses AI. Check for mistakes.
Comment on lines +202 to +224
for {
if ctx.Err() != nil {
return
}

if job.Schedule.MaxRuns > 0 && runs >= job.Schedule.MaxRuns {
return
}

if !job.Schedule.EndAt.IsZero() && time.Now().After(job.Schedule.EndAt) {
return
}

runs++
scheduledAt := time.Now()
s.enqueue(ctx, job, scheduledAt)

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job execution loop runs the job immediately upon entering the loop (line 217), then waits for the ticker interval. This means a job with Every set to 1 minute will execute at time 0, then again at 1 minute, rather than waiting 1 minute before the first execution. While this may be intentional, it differs from typical scheduler behavior where the interval represents the delay before the first execution. Consider documenting this behavior or adding a flag to control whether the first execution should be immediate or delayed.

Copilot uses AI. Check for mistakes.
Comment on lines +147 to +149
if job.Request.URL == "" {
return Job{}, fmt.Errorf("%w: request URL is required", ErrInvalidJob)
}
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Request URL validation only checks if it's empty, but doesn't validate if it's a well-formed URL. Consider adding URL parsing validation using url.Parse to catch malformed URLs early during job scheduling rather than at execution time.

Copilot uses AI. Check for mistakes.
Comment on lines +249 to +255
s.wg.Go(func() {
if s.sem != nil {
defer func() { <-s.sem }()
}

s.execute(ctx, job, scheduledAt)
})
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The standard library's sync.WaitGroup does not have a Go method. This code will fail to compile. The correct pattern is to call s.wg.Add(1) before launching the goroutine, then use a plain go statement with defer s.wg.Done() inside the goroutine function.

Suggested change
s.wg.Go(func() {
if s.sem != nil {
defer func() { <-s.sem }()
}
s.execute(ctx, job, scheduledAt)
})
s.wg.Add(1)
go func() {
defer s.wg.Done()
if s.sem != nil {
defer func() { <-s.sem }()
}
s.execute(ctx, job, scheduledAt)
}()

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +129
package tests

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/hyp3rd/go-again"
"github.com/hyp3rd/go-again/pkg/scheduler"
)

const (
schedulerTimeout = 200 * time.Millisecond
schedulerRetries = 5
)

//nolint:funlen
func TestSchedulerRetryAndCallback(t *testing.T) {
t.Parallel()

var targetHits int32

target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
hit := atomic.AddInt32(&targetHits, 1)
if hit < 3 {
w.WriteHeader(http.StatusInternalServerError)

_, err := w.Write([]byte("retry"))
if err != nil {
t.Fatalf("failed to write response: %v", err)
}

return
}

w.WriteHeader(http.StatusOK)

_, err := w.Write([]byte("ok"))
if err != nil {
t.Fatalf("failed to write response: %v", err)
}
}))
defer target.Close()

callbackCh := make(chan scheduler.CallbackPayload, 1)

callback := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
err := r.Body.Close()
if err != nil {
t.Logf("failed to close request body: %v", err)
}
}()

var payload scheduler.CallbackPayload

err := json.NewDecoder(r.Body).Decode(&payload)
if err != nil {
t.Fatalf("failed to decode callback payload: %v", err)
}

callbackCh <- payload

w.WriteHeader(http.StatusOK)
}))
defer callback.Close()

retrier, err := again.NewRetrier(
context.Background(),
again.WithMaxRetries(schedulerRetries),
again.WithInterval(1*time.Millisecond),
again.WithJitter(1*time.Millisecond),
again.WithTimeout(schedulerTimeout),
)
if err != nil {
t.Fatalf("failed to create retrier: %v", err)
}

sched := scheduler.NewScheduler()
defer sched.Stop()

job := scheduler.Job{
Schedule: scheduler.Schedule{
Every: 10 * time.Millisecond,
MaxRuns: 1,
},
Request: scheduler.Request{
Method: http.MethodGet,
URL: target.URL,
},
Callback: scheduler.Callback{
URL: callback.URL,
},
RetryPolicy: scheduler.RetryPolicy{
Retrier: retrier,
RetryStatusCodes: []int{http.StatusInternalServerError},
},
}

jobID, err := sched.Schedule(job)
if err != nil {
t.Fatalf("failed to schedule job: %v", err)
}

select {
case payload := <-callbackCh:
if payload.JobID != jobID {
t.Fatalf("expected job id %q, got %q", jobID, payload.JobID)
}

if !payload.Success {
t.Fatalf("expected success, got error %q", payload.Error)
}

if payload.Attempts != 3 {
t.Fatalf("expected 3 attempts, got %d", payload.Attempts)
}

if payload.StatusCode != http.StatusOK {
t.Fatalf("expected status %d, got %d", http.StatusOK, payload.StatusCode)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for callback")
}
}
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduler test file only includes a single test covering the retry and callback happy path. Consider adding test coverage for edge cases such as: job validation failures, unsupported HTTP methods, concurrent job execution, job cancellation via Remove, scheduler Stop behavior, StartAt/EndAt schedule boundaries, MaxRuns enforcement, and callback failures.

Copilot uses AI. Check for mistakes.
Comment on lines +143 to +144
if !job.Schedule.EndAt.IsZero() && !job.Schedule.StartAt.IsZero() && job.Schedule.EndAt.Before(job.Schedule.StartAt) {
return Job{}, fmt.Errorf("%w: end time precedes start time", ErrInvalidJob)
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation only checks if EndAt is before StartAt, but doesn't handle the case where EndAt equals StartAt. If EndAt equals StartAt, the job would never run since the condition on line 211 checks time.Now().After(job.Schedule.EndAt). Consider adding validation to reject jobs where EndAt equals StartAt, or document this edge case.

Suggested change
if !job.Schedule.EndAt.IsZero() && !job.Schedule.StartAt.IsZero() && job.Schedule.EndAt.Before(job.Schedule.StartAt) {
return Job{}, fmt.Errorf("%w: end time precedes start time", ErrInvalidJob)
if !job.Schedule.EndAt.IsZero() && !job.Schedule.StartAt.IsZero() &&
(job.Schedule.EndAt.Before(job.Schedule.StartAt) || job.Schedule.EndAt.Equal(job.Schedule.StartAt)) {
return Job{}, fmt.Errorf("%w: end time must be after start time", ErrInvalidJob)

Copilot uses AI. Check for mistakes.
Comment on lines +390 to +405
func (*Scheduler) buildTemporaryErrors(retrier *again.Retrier, policy RetryPolicy) []error {
var temp []error

if len(policy.RetryStatusCodes) > 0 {
temp = append(temp, ErrRetryableStatus)
}

if len(policy.TemporaryErrors) > 0 {
temp = append(temp, policy.TemporaryErrors...)
}

if len(temp) > 0 && retrier.Registry != nil && retrier.Registry.Len() > 0 {
temp = append(temp, retrier.Registry.ListTemporaryErrors()...)
}

return temp
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buildTemporaryErrors function only adds registry errors if temp is non-empty (line 401). This means if a job has no RetryStatusCodes or TemporaryErrors configured, the registry defaults won't be used even though LoadDefaults was called during normalization. Consider removing the len(temp) > 0 check or documenting why registry errors should only be included when custom errors are also present.

Copilot uses AI. Check for mistakes.
// WithLogger sets the scheduler logger.
func WithLogger(logger *slog.Logger) Option {
return func(s *Scheduler) {
s.logger = logger
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WithLogger option should check if the logger parameter is nil before assigning it, similar to how WithHTTPClient validates the client parameter. Without this check, users could inadvertently set a nil logger which could cause issues even though logError has a nil check.

Suggested change
s.logger = logger
if logger != nil {
s.logger = logger
}

Copilot uses AI. Check for mistakes.
@hyp3rd hyp3rd merged commit 18c1e7f into main Jan 21, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant