diff --git a/go.mod b/go.mod index 82a778d02..ebd89df0c 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/foxxorcat/weiyun-sdk-go v0.1.4 github.com/gin-contrib/cors v1.7.6 github.com/gin-gonic/gin v1.10.1 + github.com/go-co-op/gocron/v2 v2.19.0 github.com/go-resty/resty/v2 v2.16.5 github.com/go-webauthn/webauthn v0.13.4 github.com/golang-jwt/jwt/v4 v4.5.2 @@ -64,7 +65,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/afero v1.14.0 github.com/spf13/cobra v1.9.1 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 github.com/tchap/go-patricia/v2 v2.3.3 github.com/u2takey/ffmpeg-go v0.5.0 @@ -111,6 +112,7 @@ require ( github.com/jcmturner/goidentity/v6 v6.0.1 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/jonboulle/clockwork v0.5.0 // indirect github.com/lanrat/extsort v1.0.2 // indirect github.com/mikelolasagasti/xz v1.0.1 // indirect github.com/minio/minlz v1.0.0 // indirect @@ -118,6 +120,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/relvacode/iso8601 v1.6.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect go.uber.org/mock v0.5.0 // indirect golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect diff --git a/go.sum b/go.sum index 029e5d514..db02aab0c 100644 --- a/go.sum +++ b/go.sum @@ -302,6 +302,8 @@ github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/go-co-op/gocron/v2 v2.19.0 h1:OKf2y6LXPs/BgBI2fl8PxUpNAI1DA9Mg+hSeGOS38OU= +github.com/go-co-op/gocron/v2 v2.19.0/go.mod h1:5lEiCKk1oVJV39Zg7/YG10OnaVrDAV5GGR6O0663k6U= github.com/go-darwin/apfs v0.0.0-20211011131704-f84b94dbf348 h1:JnrjqG5iR07/8k7NqrLNilRsl3s1EPRQEGvbPyOce68= github.com/go-darwin/apfs v0.0.0-20211011131704-f84b94dbf348/go.mod h1:Czxo/d1g948LtrALAZdL04TL/HnkopquAjxYUuI02bo= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -398,8 +400,6 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499 h1:4ovnBdiGDFi8putQGxhipuuhXItAgh4/YnzufPYkZkQ= -github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499/go.mod h1:8x1h4rm3s8xMcTyJrq848sQ6BJnKzl57mDY4CNshdPM= github.com/halalcloud/golang-sdk-lite v0.0.0-20251105081800-78cbb6786c38 h1:lsK2GVgI2Ox0NkRpQnN09GBOH7jtsjFK5tcIgxXlLr0= github.com/halalcloud/golang-sdk-lite v0.0.0-20251105081800-78cbb6786c38/go.mod h1:8x1h4rm3s8xMcTyJrq848sQ6BJnKzl57mDY4CNshdPM= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -468,6 +468,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I= +github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -636,6 +638,8 @@ github.com/rfjakob/eme v1.1.2/go.mod h1:cVvpasglm/G3ngEfcfT/Wt0GwhkuO32pf/poW6Ny github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= @@ -680,8 +684,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 h1:Sa+sR8aaAMFwxhXWENEnE6ZpqhZ9d7u1RT2722Rw6hc= github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5/go.mod h1:UdZiFUFu6e2WjjtjxivwXWcwc1N/8zgbkBR9QNucUOY= github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543 h1:6Y51mutOvRGRx6KqyMNo//xk8B8o6zW9/RVmy1VamOs= @@ -742,8 +746,8 @@ go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5J go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= -go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go4.org v0.0.0-20230225012048-214862532bf5 h1:nifaUDeh+rPaBCMPMQHZmvJf+QdpLFnuQPwx+LxVmtc= diff --git a/pkg/scheduler/errors.go b/pkg/scheduler/errors.go new file mode 100644 index 000000000..b7a1baf07 --- /dev/null +++ b/pkg/scheduler/errors.go @@ -0,0 +1,8 @@ +package scheduler + +import "errors" + +var ( + ErrJobCronNotDefined = errors.New("job cron not defined") + ErrJobTaskNotDefined = errors.New("job task not defined") +) diff --git a/pkg/scheduler/job.go b/pkg/scheduler/job.go new file mode 100644 index 000000000..d3920542b --- /dev/null +++ b/pkg/scheduler/job.go @@ -0,0 +1,361 @@ +package scheduler + +import ( + "context" + "time" + + "github.com/go-co-op/gocron/v2" + "github.com/google/uuid" +) + +// TaskExecutor is the interface that wraps the Execute method. +type TaskExecutor interface { + // Execute performs the task's action. + Execute(ctx context.Context) error +} + +// simpleTask is a simple implementation of TaskExecutor +type simpleTask struct { + f func(ctx context.Context) error +} + +// Execute performs the task's action. +func (st *simpleTask) Execute(ctx context.Context) error { + return st.f(ctx) +} + +// NewSimpleTask creates a new simpleTask with the provided function. +func NewSimpleTask(f func(ctx context.Context) error) TaskExecutor { + return &simpleTask{ + f: f, + } +} + +var _ TaskExecutor = (*simpleTask)(nil) + +// jobBuilder is used to build job definitions. +type jobBuilder struct { + id uuid.UUID + ctx context.Context + jobName string + cron gocron.JobDefinition + disabled bool + labels JobLabels + taskExecutor TaskExecutor + afterJobRuns []func(jobID uuid.UUID, jobName string) + afterJobRunsWithErrors []func(jobID uuid.UUID, jobName string, runErr error) + afterJobRunsWithPanics []func(jobID uuid.UUID, jobName string, panicData any) + beforeJobRuns []func(jobID uuid.UUID, jobName string) + beforeJobRunsSkipIfBeforeFuncErrors []func(jobID uuid.UUID, jobName string) error +} + +// jobDefine defines the parameters of a job. +type jobDefine struct { + id uuid.UUID + cron gocron.JobDefinition + disabled bool + taskExecutor TaskExecutor + opts []gocron.JobOption +} + +// NewJobBuilder create a jobBuilder +func NewJobBuilder() *jobBuilder { + return &jobBuilder{ + disabled: false, + labels: make(JobLabels), + } +} + +// ID sets the job ID if needed. +func (jb *jobBuilder) ID(id uuid.UUID) *jobBuilder { + jb.id = id + return jb +} + +// Ctx sets the job context. +func (jb *jobBuilder) Ctx(ctx context.Context) *jobBuilder { + jb.ctx = ctx + return jb +} + +// Name sets the job name. +func (jb *jobBuilder) Name(name string) *jobBuilder { + jb.jobName = name + return jb +} + +// _internalCron sets the job cron definition. +// This is an internal method; prefer using the By... methods. +func (jb *jobBuilder) _internalCron(cron gocron.JobDefinition) *jobBuilder { + jb.cron = cron + return jb +} + +// ByCrontab defines a new job using the crontab syntax: `* * * * *`. +// An optional 6th field can be used at the beginning if withSeconds +// is set to true: `* * * * * *`. +// The timezone can be set on the Scheduler using WithLocation, or in the +// crontab in the form `TZ=America/Chicago * * * * *` or +// `CRON_TZ=America/Chicago * * * * *` +func (jb *jobBuilder) ByCrontab(crontab string, withSeconds bool) *jobBuilder { + return jb._internalCron(gocron.CronJob(crontab, withSeconds)) +} + +// ByDuration defines a new job using time.Duration +// for the interval. +func (jb *jobBuilder) ByDuration(d time.Duration) *jobBuilder { + return jb._internalCron(gocron.DurationJob(d)) +} + +// ByDurationRandomJob defines a new job that runs on a random interval +// between the min and max duration values provided. +// +// To achieve a similar behavior as tools that use a splay/jitter technique +// consider the median value as the baseline and the difference between the +// max-median or median-min as the splay/jitter. +// +// For example, if you want a job to run every 5 minutes, but want to add +// up to 1 min of jitter to the interval, you could use +// ByDurationRandomJob(4*time.Minute, 6*time.Minute) +func (jb *jobBuilder) ByDurationRandomJob(min, max time.Duration) *jobBuilder { + return jb._internalCron(gocron.DurationRandomJob(min, max)) +} + +// ByDaily defines a new job that runs daily at the specified time. +func (jb *jobBuilder) ByDaily(interval uint, atTimes AtTimes) *jobBuilder { + return jb._internalCron(gocron.DailyJob(interval, newAtTimes(atTimes))) +} + +// ByWeekly defines a new job that runs weekly at the specified time. +func (jb *jobBuilder) ByWeekly(interval uint, weekdays []time.Weekday, atTimes AtTimes) *jobBuilder { + return jb._internalCron(gocron.WeeklyJob(interval, newWeekdays(weekdays), newAtTimes(atTimes))) +} + +// ByMonthly runs the job on the interval of months, on the specific days of the month +// specified, and at the set times. Days of the month can be 1 to 31 or negative (-1 to -31), which +// count backwards from the end of the month. E.g. -1 is the last day of the month. +// +// If a day of the month is selected that does not exist in all months (e.g. 31st) +// any month that does not have that day will be skipped. +// +// By default, the job will start the next available day, considering the last run to be now, +// and the time and month based on the interval, days and times you input. +// This means, if you select an interval greater than 1, your job by default will run +// X (interval) months from now if there are no daysOfTheMonth left in the current month. +// You can use WithStartAt to tell the scheduler to start the job sooner. +// +// Carefully consider your configuration! +// - For example: an interval of 2 months on the 31st of each month, starting 12/31 +// would skip Feb, April, June, and next run would be in August. +func (jb *jobBuilder) ByMonthly(interval uint, daysOfTheMonth []int, atTimes AtTimes) *jobBuilder { + return jb._internalCron(gocron.MonthlyJob( + interval, + newDaysOfTheMonth(daysOfTheMonth), + newAtTimes(atTimes))) +} + +// ByOneTimeJobStartImmediately tells the scheduler to run the one time job immediately. +func (jb *jobBuilder) ByOneTimeJobStartImmediately() *jobBuilder { + return jb._internalCron(gocron.OneTimeJob(gocron.OneTimeJobStartImmediately())) +} + +// ByOneTimeJobStartDateTime sets the date & time at which the job should run. +// This datetime must be in the future (according to the scheduler clock). +func (jb *jobBuilder) ByOneTimeJobStartDateTime(start time.Time) *jobBuilder { + return jb._internalCron(gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start))) +} + +// ByOneTimeJobStartDateTimes sets the date & times at which the job should run. +// At least one of the date/times must be in the future (according to the scheduler clock). +func (jb *jobBuilder) ByOneTimeJobStartDateTimes(times ...time.Time) *jobBuilder { + return jb._internalCron(gocron.OneTimeJob(gocron.OneTimeJobStartDateTimes(times...))) +} + +// Disabled sets the job disabled status. +func (jb *jobBuilder) Disabled(disabled bool) *jobBuilder { + jb.disabled = disabled + return jb +} + +// Label add or replaces a label key/value pair. +func (jb *jobBuilder) Label(key, value string) *jobBuilder { + jb.labels[key] = value + return jb +} + +// Labels batch adds or replaces multiple label key/value pairs. +func (jb *jobBuilder) Labels(labels JobLabels) *jobBuilder { + if len(labels) == 0 { + return jb + } + for k, v := range labels { + jb.labels[k] = v + } + return jb +} + +// TaskExecutor sets the job taskExecutor. +func (jb *jobBuilder) TaskExecutor(taskExecutor TaskExecutor) *jobBuilder { + jb.taskExecutor = taskExecutor + return jb +} + +// AfterJobRuns sets functions to be called after the job runs. +func (jb *jobBuilder) AfterJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) *jobBuilder { + jb.afterJobRuns = append(jb.afterJobRuns, eventListenerFunc) + return jb +} + +// AfterJobRunsWithError is used to listen for when a job has run and returned an error, and then run the provided function. +func (jb *jobBuilder) AfterJobRunsWithError(eventListenerFunc func(jobID uuid.UUID, jobName string, runErr error)) *jobBuilder { + jb.afterJobRunsWithErrors = append(jb.afterJobRunsWithErrors, eventListenerFunc) + return jb +} + +// AfterJobRunsWithPanic is used to listen for when a job has run and returned panicked recover data, and then run the provided function. +func (jb *jobBuilder) AfterJobRunsWithPanic(eventListenerFunc func(jobID uuid.UUID, jobName string, panicData any)) *jobBuilder { + jb.afterJobRunsWithPanics = append(jb.afterJobRunsWithPanics, eventListenerFunc) + return jb +} + +// BeforeJobRuns sets functions to be called before the job runs. +func (jb *jobBuilder) BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) *jobBuilder { + jb.beforeJobRuns = append(jb.beforeJobRuns, eventListenerFunc) + return jb +} + +// BeforeJobRunsSkipIfBeforeFuncErrors sets functions to be called before the job runs. +// If any of these functions return an error, the job run will be skipped. +func (jb *jobBuilder) BeforeJobRunsSkipIfBeforeFuncErrors(eventListenerFunc func(jobID uuid.UUID, jobName string) error) *jobBuilder { + jb.beforeJobRunsSkipIfBeforeFuncErrors = append(jb.beforeJobRunsSkipIfBeforeFuncErrors, eventListenerFunc) + return jb +} + +func (jb *jobBuilder) Build() (*jobDefine, error) { + if jb.cron == nil { + return nil, ErrJobCronNotDefined + } + if jb.taskExecutor == nil { + return nil, ErrJobTaskNotDefined + } + return &jobDefine{ + id: jb._internalGetOrCreateID(), + opts: jb._internalGetOptions(), + taskExecutor: jb.taskExecutor, + cron: jb.cron, + disabled: jb.disabled, + }, nil +} + +func (jb *jobBuilder) _internalGetOrCreateID() uuid.UUID { + if jb.id == uuid.Nil { + jb.id = uuid.New() + } + return jb.id +} + +func (jb *jobBuilder) _internalGetOptions() []gocron.JobOption { + tags := jobLabels2Tags(jb.labels) + opts := []gocron.JobOption{} + if jb.id != uuid.Nil { + opts = append(opts, gocron.WithIdentifier(jb.id)) + } + if jb.ctx != nil { + opts = append(opts, gocron.WithContext(jb.ctx)) + } + if jb.jobName != "" { + opts = append(opts, gocron.WithName(jb.jobName)) + } + if len(tags) > 0 { + opts = append(opts, gocron.WithTags(tags...)) + } + listens := []gocron.EventListener{} + if len(jb.afterJobRuns) > 0 { + listens = append(listens, gocron.AfterJobRuns( + func(jobID uuid.UUID, jobName string) { + for _, e := range jb.afterJobRuns { + e(jobID, jobName) + } + })) + } + if len(jb.afterJobRunsWithErrors) > 0 { + listens = append(listens, gocron.AfterJobRunsWithError( + func(jobID uuid.UUID, jobName string, runErr error) { + for _, e := range jb.afterJobRunsWithErrors { + e(jobID, jobName, runErr) + } + }), + ) + } + if len(jb.afterJobRunsWithPanics) > 0 { + listens = append(listens, gocron.AfterJobRunsWithPanic( + func(jobID uuid.UUID, jobName string, panicData any) { + for _, e := range jb.afterJobRunsWithPanics { + e(jobID, jobName, panicData) + } + }), + ) + } + if len(jb.beforeJobRuns) > 0 { + listens = append(listens, gocron.BeforeJobRuns( + func(jobID uuid.UUID, jobName string) { + for _, e := range jb.beforeJobRuns { + e(jobID, jobName) + } + })) + } + if len(jb.beforeJobRunsSkipIfBeforeFuncErrors) > 0 { + listens = append(listens, gocron.BeforeJobRunsSkipIfBeforeFuncErrors( + func(jobID uuid.UUID, jobName string) error { + for _, e := range jb.beforeJobRunsSkipIfBeforeFuncErrors { + if err := e(jobID, jobName); err != nil { + return err + } + } + return nil + }), + ) + } + if len(listens) > 0 { + opts = append(opts, gocron.WithEventListeners(listens...)) + } + return opts +} + +func newAtTimes(atTimes []AtTime) gocron.AtTimes { + if len(atTimes) == 0 { + return nil + } + if len(atTimes) == 1 { + at := gocron.NewAtTime(atTimes[0].hours, atTimes[0].minutes, atTimes[0].seconds) + return gocron.NewAtTimes(at) + } + var gocronAtTimes []gocron.AtTime + for _, at := range atTimes { + gocronAtTimes = append(gocronAtTimes, gocron.NewAtTime(at.hours, at.minutes, at.seconds)) + } + return gocron.NewAtTimes( + gocronAtTimes[0], + gocronAtTimes[1:]..., + ) +} + +func newWeekdays(weekdays []time.Weekday) gocron.Weekdays { + if len(weekdays) == 0 { + return nil + } + if len(weekdays) == 1 { + return gocron.NewWeekdays(weekdays[0]) + } + return gocron.NewWeekdays(weekdays[0], weekdays[1:]...) +} + +func newDaysOfTheMonth(days []int) gocron.DaysOfTheMonth { + if len(days) == 0 { + return nil + } + if len(days) == 1 { + return gocron.NewDaysOfTheMonth(days[0]) + } + return gocron.NewDaysOfTheMonth(days[0], days[1:]...) +} diff --git a/pkg/scheduler/meta.go b/pkg/scheduler/meta.go new file mode 100644 index 000000000..99d01d907 --- /dev/null +++ b/pkg/scheduler/meta.go @@ -0,0 +1,97 @@ +package scheduler + +import ( + "maps" + "time" + + "github.com/OpenListTeam/OpenList/v4/pkg/generic_sync" + "github.com/go-co-op/gocron/v2" + "github.com/google/uuid" +) + +// JobLabels is the type for job labels. +type JobLabels = map[string]string + +func newSafeMap[K comparable, V any]() *generic_sync.MapOf[K, V] { + return new(generic_sync.MapOf[K, V]) +} + +// OpJob represents an operational job with its metadata. +type OpJob struct { + id uuid.UUID + name string + labels JobLabels + disabled bool + _rawJob gocron.Job +} + +// ID returns the UUID of the job. +func (o *OpJob) ID() uuid.UUID { + return o.id +} + +// Name returns the name of the job. +func (o *OpJob) Name() string { + return o.name +} + +// Labels returns the labels of the job. +func (o *OpJob) Labels() JobLabels { + return o.labels +} + +// Label retrieves the value of a specific label by its key. +func (o *OpJob) Label(key string) (string, bool) { + value, exists := o.labels[key] + return value, exists +} + +// Disabled indicates whether the job is disabled. +func (o *OpJob) Disabled() bool { + return o.disabled +} + +// LastRun returns the last run time of the job. +func (o *OpJob) LastRun() (time.Time, error) { + return o._rawJob.LastRun() +} + +// NextRun returns the next run time of the job. +func (o *OpJob) NextRun() (time.Time, error) { + return o._rawJob.NextRun() +} + +// GetNextRuns returns the next n run times of the job. +func (o *OpJob) GetNextRuns(n int) ([]time.Time, error) { + return o._rawJob.NextRuns(n) +} + +// newOpJob creates a new OpJob instance from a gocron.Job and its disabled status. +func newOpJob(job gocron.Job, disabled bool) *OpJob { + labels := tags2JobLabels(job.Tags()) + labelsCopy := make(JobLabels) + maps.Copy(labelsCopy, labels) + return &OpJob{ + id: job.ID(), + name: job.Name(), + labels: labelsCopy, + disabled: disabled, + _rawJob: job, + } +} + +type AtTime struct { + hours, minutes, seconds uint +} + +// NewAtTime constructs a new AtTime instance. +func NewAtTime(hours, minutes, seconds uint) AtTime { + return AtTime{ + hours: hours, + minutes: minutes, + seconds: seconds, + } +} + +// AtTimes defines a list of AtTime +type AtTimes []AtTime diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 000000000..e45e72597 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,313 @@ +// Package scheduler provides a job scheduling system using gocron. +package scheduler + +import ( + "context" + "errors" + + "github.com/OpenListTeam/OpenList/v4/pkg/generic_sync" + "github.com/go-co-op/gocron/v2" + "github.com/google/uuid" +) + +// jobsMapType is a thread-safe map for storing jobs. +type jobsMapType = *generic_sync.MapOf[uuid.UUID, gocron.Job] + +// jobDisabledMapType is a thread-safe map for storing boolean values. +type jobDisabledMapType = *generic_sync.MapOf[uuid.UUID, any] + +// OpScheduler is the main scheduler struct that manages jobs. +type OpScheduler struct { + // Name is an optional human-readable identifier for this scheduler instance. + // Callers can use it for logging, metrics, or debugging when working with + // multiple OpScheduler instances. + Name string + scheduler gocron.Scheduler + jobsMap jobsMapType + jobDisabledMap jobDisabledMapType +} + +// NewOpScheduler creates a new OpScheduler instance. +func NewOpScheduler(name string, opts ...gocron.SchedulerOption) (*OpScheduler, error) { + scheduler, err := gocron.NewScheduler(opts...) + if err != nil { + return nil, err + } + return &OpScheduler{ + scheduler: scheduler, + Name: name, + jobDisabledMap: newSafeMap[uuid.UUID, any](), + jobsMap: newSafeMap[uuid.UUID, gocron.Job](), + }, nil +} + +// RunNow runs a job immediately by its UUID if the job is enabled. +func (o *OpScheduler) RunNow(jobUUID uuid.UUID) error { + job, exists := o._internalGetCronJob(jobUUID) + if !exists { + return errors.New("job not found: " + jobUUID.String()) + } + if o.jobIsDisabled(jobUUID) { + // job is disabled, do not run + return nil + } + return job.RunNow() +} + +// jobIsDisabled checks if a job is disabled. +func (o *OpScheduler) jobIsDisabled(jobUUID uuid.UUID) bool { + return o.jobDisabledMap.Has(jobUUID) +} + +// buildJobTask builds a gocron.Task with the provided parameters. +func (o *OpScheduler) buildJobTask( + jd *jobDefine, +) (gocron.Task, error) { + // check runner as function and NumIn is match params length + _task := gocron.NewTask( + func(ctx context.Context) error { + if o.jobIsDisabled(jd.id) { + return nil + } + return jd.taskExecutor.Execute(ctx) + }, + ) + return _task, nil +} + +// NewJobByBuilder creates and schedules a new job by builder +func (o *OpScheduler) NewJob(jBuilder *jobBuilder) (*OpJob, error) { + jd, err := jBuilder.Build() + if err != nil { + return nil, err + } + task, err := o.buildJobTask(jd) + if err != nil { + return nil, err + } + if jd.disabled { + o.jobDisabledMap.Store(jd.id, struct{}{}) + } + job, err := o.scheduler.NewJob( + jd.cron, + task, + jd.opts..., + ) + if err != nil { + // remove the disabled status if job creation failed + if jd.disabled { + o.jobDisabledMap.Delete(jd.id) + } + return nil, err + } + o.jobsMap.Store(jd.id, job) + return newOpJob(job, jd.disabled), nil +} + +// UpdateJob updates an existing job by its UUID using a job builder. +func (o *OpScheduler) UpdateJob( + jobUUID uuid.UUID, + jb *jobBuilder, +) error { + // Stop and remove the existing job + if exists := o.Exists(jobUUID); !exists { + return errors.New("job not found: " + jobUUID.String()) + } + // update the ID of jobBuilder to ensure consistency + jb.ID(jobUUID) + jd, err := jb.Build() + if err != nil { + return err + } + task, err := o.buildJobTask(jd) + if err != nil { + return err + } + // Update disabled status + rawDisabled := o.jobIsDisabled(jobUUID) + if rawDisabled != jd.disabled { + if jd.disabled { + o.jobDisabledMap.Store(jobUUID, struct{}{}) + } else { + o.jobDisabledMap.Delete(jobUUID) + } + } + job, err := o.scheduler.Update( + jobUUID, jd.cron, task, + jd.opts..., + ) + if err != nil { + // rollback disabled status if update failed + if jd.disabled != rawDisabled { + if rawDisabled { + o.jobDisabledMap.Store(jobUUID, struct{}{}) + } else { + o.jobDisabledMap.Delete(jobUUID) + } + } + return err + } + // Save job + o.jobsMap.Store(jobUUID, job) + return nil +} + +// Exists checks whether a job with the given UUID is registered in the scheduler. +func (o *OpScheduler) Exists(uuid uuid.UUID) bool { + _, exists := o._internalGetCronJob(uuid) + return exists +} + +// _internalGetCronJob retrieves a gocron.Job by its UUID. +func (o *OpScheduler) _internalGetCronJob(jobUUID uuid.UUID) (gocron.Job, bool) { + return o.jobsMap.Load(jobUUID) +} + +// GetJob retrieves a job by its UUID. +func (o *OpScheduler) GetJob(jobUUID uuid.UUID) (*OpJob, bool) { + job, exists := o._internalGetCronJob(jobUUID) + if !exists { + return nil, false + } + return newOpJob(job, o.jobIsDisabled(jobUUID)), true +} + +// GetJobsByLabels retrieves jobs that have all of the provided labels. +func (o *OpScheduler) GetJobsByLabels(labels JobLabels) []*OpJob { + var result []*OpJob + o.filterLabels(labels, func(j gocron.Job, jobLabels JobLabels) { + result = append(result, newOpJob(j, o.jobIsDisabled(j.ID()))) + }) + return result +} + +// EnableJob enables a job by its UUID. +func (o *OpScheduler) EnableJob(jobUUID uuid.UUID) error { + if !o.Exists(jobUUID) { + return errors.New("job not found: " + jobUUID.String()) + } + o.jobDisabledMap.Delete(jobUUID) + return nil +} + +// DisableJob disables a job by its UUID. +func (o *OpScheduler) DisableJob(jobUUID uuid.UUID) error { + if !o.Exists(jobUUID) { + return errors.New("job not found: " + jobUUID.String()) + } + o.jobDisabledMap.Store(jobUUID, struct{}{}) + return nil +} + +// RemoveJobs removes jobs by their UUIDs. +func (o *OpScheduler) RemoveJobs(waitForRemoveJobUUIDs ...uuid.UUID) error { + if len(waitForRemoveJobUUIDs) == 0 { + return nil + } + var errs []error + // try to remove jobs one by one + for _, jobID := range waitForRemoveJobUUIDs { + err := o.scheduler.RemoveJob(jobID) + if err != nil { + errs = append(errs, err) + continue + } + // Remove from jobsMap + o.jobsMap.Delete(jobID) + // Remove from disabled map + o.jobDisabledMap.Delete(jobID) + } + if len(errs) > 0 { + existsJobIDs := make(map[uuid.UUID]bool) + for _, item := range o.scheduler.Jobs() { + existsJobIDs[item.ID()] = true + } + // if job removal failed, check if job not exists in scheduler, but still in internal maps + for _, jobID := range waitForRemoveJobUUIDs { + if _, exists := existsJobIDs[jobID]; exists { + continue + } + // if job removal failed, but job not exists in scheduler, remove from internal maps + o.jobsMap.Delete(jobID) + o.jobDisabledMap.Delete(jobID) + } + return errors.Join(errs...) + } + return nil +} + +// filterLabels filters jobs in the jobsMap based on the provided labels and applies the action function to matching jobs. +func (o *OpScheduler) filterLabels( + labels JobLabels, + action func(gocron.Job, JobLabels), +) { + var loopFunc = func(_ uuid.UUID, job gocron.Job) bool { + jobLabels := tags2JobLabels(job.Tags()) + matches := true + for k, v := range labels { + if jobVal, exists := jobLabels[k]; !exists || jobVal != v { + matches = false + break + } + } + if matches { + action(job, jobLabels) + } + return true + } + o.jobsMap.Range(loopFunc) +} + +// RemoveJobByLabels removes all jobs that have all of the provided labels. +func (o *OpScheduler) RemoveJobByLabels(labels JobLabels) error { + if len(labels) == 0 { + return nil + } + needRemovedJobsUUID := make([]uuid.UUID, 0) + o.filterLabels( + labels, + func(j gocron.Job, jobLabels JobLabels) { + needRemovedJobsUUID = append(needRemovedJobsUUID, j.ID()) + }, + ) + if len(needRemovedJobsUUID) > 0 { + return o.RemoveJobs(needRemovedJobsUUID...) + } + return nil +} + +// Start starts the scheduler. +func (o *OpScheduler) Start() { + o.scheduler.Start() +} + +// Close is an alias for Shutdown. +func (o *OpScheduler) Close() error { + return o.Shutdown() +} + +// Shutdown stops the scheduler. +func (o *OpScheduler) Shutdown() error { + return o.scheduler.Shutdown() +} + +// StopAllJobs stops all jobs in the scheduler. +func (o *OpScheduler) StopAllJobs() error { + return o.scheduler.StopJobs() +} + +// RemoveAllJobs removes all jobs from the scheduler. +func (o *OpScheduler) RemoveAllJobs() error { + var errs []error + // First, stop all running jobs. + if err := o.scheduler.StopJobs(); err != nil { + errs = append(errs, err) + } + // Only clear the internal maps if the scheduler successfully removed all jobs. + if len(errs) == 0 { + o.jobDisabledMap.Clear() + o.jobsMap.Clear() + return nil + } + return errors.Join(errs...) +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go new file mode 100644 index 000000000..e823d799f --- /dev/null +++ b/pkg/scheduler/scheduler_test.go @@ -0,0 +1,881 @@ +package scheduler + +import ( + "context" + "errors" + "reflect" + "testing" + "time" + + "github.com/go-co-op/gocron/v2" + "github.com/google/uuid" +) + +const ( + fastInterval = 50 * time.Millisecond + shortWait = 300 * time.Millisecond + defaultTimeout = 10 * time.Second +) + +var doNothingRunner = func(ctx context.Context) error { return nil } + +type paramTask struct { + Runner func(ctx context.Context, params ...any) error + Params []any +} + +func (pt *paramTask) Execute(ctx context.Context) error { + return pt.Runner(ctx, pt.Params...) +} + +// jobRunner defines the expected function signature for job runners. +// +// Implementations must be functions that accept a context.Context as the first +// parameter, followed by zero or more additional parameters, and return an error. +// +// A canonical example is: +// +// func(ctx context.Context, args ...any) error +// +// While jobRunner is typed as any for flexibility, callers are expected to +// adhere to this function shape. +type jobRunner any + +type anyParamTask struct { + Runner jobRunner + Params []any +} + +func (apt *anyParamTask) Execute(ctx context.Context) error { + f := reflect.ValueOf(apt.Runner) + if f.IsZero() { + return errors.New("without runner define") + } + // check if f is a function + if f.Kind() != reflect.Func { + return errors.New("runner is not a function") + } + if len(apt.Params)+1 != f.Type().NumIn() { + return errors.New("params count not match runner func") + } + // check that the function returns exactly 1 value + if f.Type().NumOut() != 1 { + return errors.New("runner func return values more than 1") + } + // validate the return type + outType := f.Type().Out(0) + if !outType.Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return errors.New("runner func return value is not error type") + } + in := make([]reflect.Value, len(apt.Params)+1) + in[0] = reflect.ValueOf(ctx) + for k, param := range apt.Params { + in[k+1] = reflect.ValueOf(param) + } + returnValues := f.Call(in) + if returnValues[0].IsNil() { + return nil + } + if err, ok := returnValues[0].Interface().(error); ok { + return err + } + return errors.New("runner func return value is not error type") +} + +// TestGoCron sanity-checks direct gocron usage with immediate execution. +func TestGoCron(t *testing.T) { + s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Shutdown() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + arg0 := 0 + arg1 := "arg1" + executeCalled := make(chan bool, 1) + job, err := s.NewJob( + gocron.DurationJob(fastInterval), + gocron.NewTask( + func(ctx context.Context, arg0 int, arg1 string) error { + t.Logf("task is running with args: %d, %s", arg0, arg1) + executeCalled <- true + return nil + }, + arg0, arg1, + ), + gocron.WithContext(ctx), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + t.Logf("job ID: %d", job.ID()) + err = job.RunNow() + if err != nil { + t.Fatalf("failed to run job now: %v", err) + } + select { + case <-executeCalled: + t.Log("job executed successfully") + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + t.Fatalf("job did not execute within the expected time") + } else if ctx.Err() != nil { + t.Fatalf("context error: %v", ctx.Err()) + } + } +} + +// TestSchedulerNormal verifies a normal job runs with provided params and labels. +func TestSchedulerNormal(t *testing.T) { + t.Log("start test") + t.Logf("Localtime: %v", time.Local) + s, err := NewOpScheduler("test-scheduler", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + labels := JobLabels{ + "env": "test", + "team": "devops", + } + arg0 := 0 + arg1 := "arg1" + // store task status + executed := make(chan bool, 1) + runner := func(ctx context.Context, _arg0 int, _arg1 string) error { + t.Log("task is running") + if _arg0 != arg0 { + t.Fatalf("expected _arg0 to be %d, got %v", arg0, _arg0) + } + if _arg1 != arg1 { + t.Fatalf("expected _arg1 to be %q, got %v", arg1, _arg1) + } + executed <- true + t.Log("task done") + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + t.Log("registry Job") + afterCreated, err := s.NewJob( + NewJobBuilder(). + Ctx(ctx). + ByDuration(fastInterval). + Name("test-job"). + Labels(labels). + TaskExecutor(&anyParamTask{ + Runner: runner, + Params: []any{arg0, arg1}, + }), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + t.Log("check the job exists") + job, exists := s.GetJob(afterCreated.ID()) + if !exists { + t.Fatalf("job not found after creation") + } + t.Log("check the job name") + if job.Name() != "test-job" { + t.Fatalf("expected job name to be %q, got %q", "test-job", job.Name()) + } + t.Log("check the labels") + jobLabels := job.Labels() + if len(jobLabels) != len(labels) { + t.Fatalf("expected %d labels, got %d", len(labels), len(jobLabels)) + } + for k, v := range labels { + if jobLabels[k] != v { + t.Fatalf("expected label %q to be %q, got %q", k, v, jobLabels[k]) + } + } + t.Log("wait for job execution") + select { + case <-executed: + t.Log("job executed successfully") + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + t.Fatalf("job did not execute within the expected time") + } else if ctx.Err() != nil { + t.Fatalf("context error: %v", ctx.Err()) + } + } +} + +// TestDisabledJob ensures a job created disabled does not execute. +func TestDisabledJob(t *testing.T) { + t.Log("start test for disabled job") + s, err := NewOpScheduler("test-scheduler-disabled", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + labels := JobLabels{ + "env": "test", + "team": "devops", + } + chanCount := make(chan int, 1) + runner := func(ctx context.Context) error { + t.Fatalf("disabled job should not run") + chanCount <- 1 + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + t.Log("register disabled job") + afterCreated, err := s.NewJob( + NewJobBuilder().Ctx(ctx). + Name("test-job"). + ByDuration(fastInterval). + Disabled(true). + Labels(labels). + TaskExecutor(NewSimpleTask(runner)), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + if job, ok := s.GetJob(afterCreated.ID()); !ok { + t.Fatalf("expected disabled job to exist after creation") + } else if !job.Disabled() { + t.Fatalf("expected job %s to be disabled", job.ID()) + } + // runNow + t.Log("attempt to run disabled job immediately") + err = s.RunNow(afterCreated.ID()) + if err != nil { + t.Fatalf("failed to run disabled job now: %v", err) + } + // check the channel to see if the job ran + select { + case count := <-chanCount: + t.Fatalf("disabled job ran unexpectedly, count: %d", count) + case <-time.After(shortWait): + t.Log("disabled job did not run as expected") + } + t.Log("test complete for disabled job") +} + +// TestEnableJob ensures enabling a disabled job allows execution. +func TestEnableJob(t *testing.T) { + t.Log("start test for enable job") + s, err := NewOpScheduler("test-scheduler-disabled", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + labels := JobLabels{ + "env": "test", + "team": "devops", + } + chanCount := make(chan int, 1) + runner := func(ctx context.Context) error { + t.Log("job has run") + chanCount <- 1 + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + t.Log("register disabled job") + afterCreated, err := s.NewJob( + NewJobBuilder().Ctx(ctx). + Name("test-job"). + ByDuration(fastInterval). + Disabled(true). + Labels(labels). + TaskExecutor(NewSimpleTask(runner)), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + // enabled + err = s.EnableJob(afterCreated.ID()) + if err != nil { + t.Fatalf("enable job fail %v", err) + } + if job, ok := s.GetJob(afterCreated.ID()); !ok { + t.Fatalf("expected job to exist after enable") + } else if job.Disabled() { + t.Fatalf("job %s should be enabled after EnableJob", job.ID()) + } + // check the channel to see if the job ran + select { + case count := <-chanCount: + t.Logf("success run, count: %d", count) + case <-time.After(defaultTimeout): + t.Fatalf("enabled job did not run as expected") + } + t.Log("test complete for enable job") +} + +// TestRemoveJob ensures removing a job deletes it and prevents execution. +func TestRemoveJob(t *testing.T) { + t.Log("start test remove job") + s, err := NewOpScheduler("test-scheduler-disabled", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + labels := JobLabels{ + "env": "test", + "team": "devops", + } + chanCount := make(chan int, 1) + runner := func(ctx context.Context) error { + chanCount <- 1 + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + t.Log("register disabled job") + afterCreated, err := s.NewJob( + NewJobBuilder(). + Ctx(ctx). + Name("test-job"). + ByDuration(time.Hour). + Labels(labels). + TaskExecutor(NewSimpleTask(runner)), + ) + // avoid blocking if the channel already has a value + select { + case chanCount <- 0: + default: + } + err = s.RemoveJobs(afterCreated.ID()) + if err != nil { + t.Fatalf("remove job %s err: %v", afterCreated.ID(), err) + } + j, exists := s.GetJob(afterCreated.ID()) + if exists || j != nil { + t.Fatalf("job %s exists after removed", afterCreated.ID()) + } + // check the channel to see if the job ran + select { + case count := <-chanCount: + if count > 0 { + t.Fatalf("removed job ran unexpectedly, count: %d", count) + } + case <-time.After(defaultTimeout): + t.Log("removed job did not run as expected") + } + t.Log("test complete for removed job") + +} + +// TestDisableJobMethod ensures DisableJob marks an existing job disabled and prevents RunNow(false) from executing it. +func TestDisableJobMethod(t *testing.T) { + s, err := NewOpScheduler("test-disable-job", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + labels := JobLabels{"env": "test"} + executed := make(chan bool, 1) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + job, err := s.NewJob( + NewJobBuilder(). + Ctx(ctx). + Name("test-job"). + ByDuration(time.Hour). + Labels(labels). + TaskExecutor(NewSimpleTask(func(ctx context.Context) error { + executed <- true + return nil + })), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + if err := s.DisableJob(job.ID()); err != nil { + t.Fatalf("disable job failed: %v", err) + } + updated, ok := s.GetJob(job.ID()) + if !ok || !updated.Disabled() { + t.Fatalf("expected job disabled") + } + if err := s.RunNow(job.ID()); err != nil { + t.Fatalf("run now failed for %s: %v", job.ID(), err) + } + select { + case <-executed: + t.Fatalf("disabled job should not run") + case <-time.After(shortWait): + } +} + +// TestUpdateJobLabelsAndEnable ensures UpdateJob toggles disabled->enabled and updates labels. +func TestUpdateJobLabelsAndEnable(t *testing.T) { + s, err := NewOpScheduler("test-update-toggle", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + initialLabels := JobLabels{"env": "old", "team": "ops"} + job, err := s.NewJob( + NewJobBuilder(). + Ctx(ctx). + Name("update-job"). + ByDuration(time.Hour). + Labels(initialLabels). + Disabled(true). + TaskExecutor( + NewSimpleTask(doNothingRunner), + ), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + updatedLabels := JobLabels{"env": "new", "team": "dev"} + executed := make(chan bool, 1) + if err := s.UpdateJob( + job.ID(), + NewJobBuilder(). + Ctx(ctx). + Name("update-job-new"). + ByDuration(fastInterval). + Labels(updatedLabels). + TaskExecutor(NewSimpleTask( + func(ctx context.Context) error { + executed <- true + return nil + }), + ), + ); err != nil { + t.Fatalf("update failed: %v", err) + } + updated, ok := s.GetJob(job.ID()) + if !ok { + t.Fatalf("job not found after update") + } + if updated.Disabled() { + t.Fatalf("job should be enabled after update") + } + if updated.Name() != "update-job-new" { + t.Fatalf("unexpected name after update: %s", updated.Name()) + } + labels := updated.Labels() + if labels["env"] != "new" || labels["team"] != "dev" { + t.Fatalf("labels not updated: %+v", labels) + } + select { + case <-executed: + case <-time.After(defaultTimeout): + t.Fatalf("updated job did not run") + } +} + +// TestRemoveJobsLeavesOthers removes one job while keeping another running. +func TestRemoveJobsLeavesOthers(t *testing.T) { + s, err := NewOpScheduler("test-remove-jobs", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + // channels to track which jobs ran + keepRan := make(chan bool, 1) + removeRan := make(chan bool, 1) + jobRemove, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("remove-me").ByDuration(fastInterval).Label("env", "remove"). + TaskExecutor(NewSimpleTask( + func(ctx context.Context) error { + removeRan <- true + return nil + }, + )), + ) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + jobKeep, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("keep-me").ByDuration(fastInterval).Label("env", "keep"). + TaskExecutor(NewSimpleTask( + func(ctx context.Context) error { + keepRan <- true + return nil + }, + )), + ) + if err != nil { + t.Fatalf("failed to create keep job: %v", err) + } + if err := s.RemoveJobs(jobRemove.ID()); err != nil { + t.Fatalf("remove jobs failed for %s: %v", jobRemove.ID(), err) + } + // reset channels + removeRan = make(chan bool, 1) + keepRan = make(chan bool, 1) + // verify removed job does not exist and kept job does + if _, ok := s.GetJob(jobRemove.ID()); ok { + t.Fatalf("removed job still exists: %s", jobRemove.ID()) + } + if keepJob, ok := s.GetJob(jobKeep.ID()); !ok { + t.Fatalf("kept job missing: %s", jobKeep.ID()) + } else if keepJob.Labels()["env"] != "keep" { + t.Fatalf("kept job label mismatch: got %q want %q", keepJob.Labels()["env"], "keep") + } + select { + case <-removeRan: + t.Fatalf("removed job executed") + case <-time.After(shortWait): + } + select { + case <-keepRan: + case <-time.After(defaultTimeout): + t.Fatalf("kept job did not execute") + } + // ensure keep job still exists + if _, ok := s.GetJob(jobKeep.ID()); !ok { + t.Fatalf("kept job missing: %s", jobKeep.ID()) + } +} + +// TestRemoveJobByLabels removes all jobs matching specific labels while keeping others. +func TestRemoveJobByLabels(t *testing.T) { + s, err := NewOpScheduler("test-remove-by-label", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + labelsDev := JobLabels{"env": "dev"} + labelsProd := JobLabels{"env": "prod"} + + _, err = s.NewJob( + NewJobBuilder().Ctx(ctx).Name("dev-1"). + ByDuration(time.Hour).Labels(labelsDev). + TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create dev-1: %v", err) + } + devTwo, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("dev-2").ByDuration(time.Hour).Labels(labelsDev).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create dev-2: %v", err) + } + prod, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("prod-1").ByDuration(time.Hour).Labels(labelsProd).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create prod: %v", err) + } + if err := s.RemoveJobByLabels(labelsDev); err != nil { + t.Fatalf("remove by labels failed for %v: %v", labelsDev, err) + } + if _, ok := s.GetJob(devTwo.ID()); ok { + t.Fatalf("dev job still exists after removal: %s labels=%v", devTwo.ID(), labelsDev) + } + if _, ok := s.GetJob(prod.ID()); !ok { + t.Fatalf("prod job should remain: %s labels=%v", prod.ID(), labelsProd) + } +} + +// TestGetJobsByLabelsFilters verifies label-based filtering returns matching jobs only. +func TestGetJobsByLabelsFilters(t *testing.T) { + s, err := NewOpScheduler("test-get-by-labels", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + labelsA := JobLabels{"env": "dev", "team": "a"} + labelsB := JobLabels{"env": "dev", "team": "b"} + labelsC := JobLabels{"env": "prod", "team": "a"} + jobA, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("job-a").ByDuration(time.Hour).Labels(labelsA).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create job-a: %v", err) + } + jobB, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("job-b").ByDuration(time.Hour).Labels(labelsB).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create job-b: %v", err) + } + _, err = s.NewJob( + NewJobBuilder().Ctx(ctx).Name("job-c").ByDuration(time.Hour).Labels(labelsC).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create job-c: %v", err) + } + devJobs := s.GetJobsByLabels(JobLabels{"env": "dev"}) + if len(devJobs) != 2 { + t.Fatalf("expected 2 dev jobs for env=dev, got %d", len(devJobs)) + } + var seenA, seenB bool + for _, j := range devJobs { + if j.ID() == jobA.ID() { + seenA = true + } + if j.ID() == jobB.ID() { + seenB = true + } + } + if !seenA || !seenB { + t.Fatalf("missing dev jobs: seenA=%v seenB=%v", seenA, seenB) + } +} + +// TestRemoveAllJobs clears all jobs and verifies none remain runnable. +func TestRemoveAllJobs(t *testing.T) { + s, err := NewOpScheduler("test-remove-all", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + labels := JobLabels{"env": "test"} + job1, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("job-1").ByDuration(time.Hour).Labels(labels).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create job1: %v", err) + } + job2, err := s.NewJob( + NewJobBuilder().Ctx(ctx).Name("job-2").ByDuration(time.Hour).Labels(labels).TaskExecutor(NewSimpleTask(doNothingRunner)), + ) + if err != nil { + t.Fatalf("failed to create job2: %v", err) + } + if err := s.RemoveAllJobs(); err != nil { + t.Fatalf("remove all jobs failed: %v", err) + } + if _, ok := s.GetJob(job1.ID()); ok { + t.Fatalf("job1 still exists after remove all: %s", job1.ID()) + } + if _, ok := s.GetJob(job2.ID()); ok { + t.Fatalf("job2 still exists after remove all: %s", job2.ID()) + } + if got := s.GetJobsByLabels(JobLabels{"env": "test"}); len(got) != 0 { + t.Fatalf("expected no jobs after remove all, got %d", len(got)) + } + if err := s.RunNow(job1.ID()); err == nil { + t.Fatalf("expected error running removed job: %s", job1.ID()) + } +} + +// TestBeforeJobRuns is a placeholder for future tests of pre-run hooks. +func TestBeforeJobRuns(t *testing.T) { + s, err := NewOpScheduler("test-before-job-runs", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + beforeRunChan := make(chan bool, 1) + jb := NewJobBuilder(). + Ctx(context.Background()). + Name("before-job"). + ByDuration(fastInterval). + TaskExecutor(NewSimpleTask(doNothingRunner)). + BeforeJobRuns(func(jobID uuid.UUID, jobName string) { + beforeRunChan <- true + }) + _, err = s.NewJob(jb) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + select { + case <-beforeRunChan: + t.Logf("success") + case <-time.After(defaultTimeout): + t.Fatalf("before job run hook was not called within expected time") + } +} + +// TestBeforeJobRunsSkipIfBeforeFuncErrorsSkip +func TestBeforeJobRunsSkipIfBeforeFuncErrorsSkip(t *testing.T) { + s, err := NewOpScheduler("test-before-job-runs-error", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + beforeRunChan := make(chan bool, 1) + executeChan := make(chan bool, 1) + jb := NewJobBuilder(). + Ctx(context.Background()). + Name("before-job-error"). + ByDuration(fastInterval). + TaskExecutor(NewSimpleTask(func(ctx context.Context) error { + executeChan <- true + return nil + })). + BeforeJobRunsSkipIfBeforeFuncErrors(func(jobID uuid.UUID, jobName string) error { + beforeRunChan <- true + return errors.New("skip execution") + }) + _, err = s.NewJob(jb) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + select { + case <-beforeRunChan: + t.Logf("before run hook called") + case <-time.After(defaultTimeout): + t.Fatalf("before job run hook was not called within expected time") + } + select { + case <-executeChan: + t.Fatalf("job execution should have been skipped due to before hook error") + case <-time.After(shortWait): + t.Logf("job execution correctly skipped") + } +} + +// TestBeforeJobRunsSkipIfBeforeFuncErrorsNotSkip +func TestBeforeJobRunsSkipIfBeforeFuncErrorsNotSkip(t *testing.T) { + s, err := NewOpScheduler("test-before-job-runs-error", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + beforeRunChan := make(chan bool, 1) + executeChan := make(chan bool, 1) + jb := NewJobBuilder(). + Ctx(context.Background()). + Name("before-job-error"). + ByDuration(fastInterval). + TaskExecutor(NewSimpleTask(func(ctx context.Context) error { + executeChan <- true + return nil + })). + BeforeJobRunsSkipIfBeforeFuncErrors(func(jobID uuid.UUID, jobName string) error { + beforeRunChan <- true + return nil + }) + _, err = s.NewJob(jb) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + select { + case <-beforeRunChan: + t.Logf("before run hook called") + case <-time.After(defaultTimeout): + t.Fatalf("before job run hook was not called within expected time") + } + select { + case <-executeChan: + t.Logf("job executed successfully") + case <-time.After(defaultTimeout): + t.Fatalf("job execution did not occur as expected") + } +} + +// TestAfterJobRuns is a placeholder for future tests of post-run hooks. +func TestAfterJobRuns(t *testing.T) { + s, err := NewOpScheduler("test-after-job-runs", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + afterRunChan := make(chan bool, 1) + jb := NewJobBuilder(). + Ctx(context.Background()). + Name("after-job"). + ByDuration(fastInterval). + TaskExecutor(NewSimpleTask(doNothingRunner)). + AfterJobRuns(func(jobID uuid.UUID, jobName string) { + afterRunChan <- true + }) + _, err = s.NewJob(jb) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + select { + case <-afterRunChan: + t.Logf("success") + case <-time.After(defaultTimeout): + t.Fatalf("after job run hook was not called within expected time") + } +} + +// TestAfterJobRunsWithError is a placeholder for future tests of post-run hooks with error. +func TestAfterJobRunsWithError(t *testing.T) { + s, err := NewOpScheduler("test-after-job-runs-error", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + afterRunChan := make(chan bool, 1) + jb := NewJobBuilder(). + Ctx(context.Background()). + Name("after-job-error"). + ByDuration(fastInterval). + TaskExecutor(NewSimpleTask(func(ctx context.Context) error { + return errors.New("intentional error") + })). + AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, runErr error) { + if runErr != nil && runErr.Error() == "intentional error" { + afterRunChan <- true + } + }) + _, err = s.NewJob(jb) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + select { + case <-afterRunChan: + t.Logf("success") + case <-time.After(defaultTimeout): + t.Fatalf("after job run with error hook was not called within expected time") + } +} + +// AfterJobRunsWithPanic is a placeholder for future tests of post-run hooks with panic. +func TestAfterJobRunsWithPanic(t *testing.T) { + s, err := NewOpScheduler("test-after-job-runs-panic", gocron.WithLocation(time.Local)) + if err != nil { + t.Fatalf("failed to create scheduler: %v", err) + } + s.Start() + defer s.Close() + afterRunChan := make(chan bool, 1) + jb := NewJobBuilder(). + Ctx(context.Background()). + Name("after-job-panic"). + ByDuration(fastInterval). + TaskExecutor(NewSimpleTask(func(ctx context.Context) error { + panic("intentional panic") + })). + AfterJobRunsWithPanic(func(jobID uuid.UUID, jobName string, panicErr interface{}) { + if panicErr != nil && panicErr == "intentional panic" { + afterRunChan <- true + } + }) + _, err = s.NewJob(jb) + if err != nil { + t.Fatalf("failed to create job: %v", err) + } + select { + case <-afterRunChan: + t.Logf("success") + case <-time.After(defaultTimeout): + t.Fatalf("after job run with panic hook was not called within expected time") + } +} diff --git a/pkg/scheduler/util.go b/pkg/scheduler/util.go new file mode 100644 index 000000000..40f9dc72c --- /dev/null +++ b/pkg/scheduler/util.go @@ -0,0 +1,77 @@ +package scheduler + +import ( + "strings" +) + +// escapeTagStr escapes backslashes and colons in a string. +func escapeTagStr(s string) string { + s = strings.ReplaceAll(s, "\\", "\\\\") + s = strings.ReplaceAll(s, ":", "\\:") + return s +} + +// unescapeTagStr unescapes backslashes and colons in a string. +func unescapeTagStr(s string) string { + var b strings.Builder + b.Grow(len(s)) + for i := 0; i < len(s); i++ { + if s[i] == '\\' && i+1 < len(s) { + next := s[i+1] + if next == '\\' || next == ':' { + // Valid escaped sequence produced by escape(): unescape it. + b.WriteByte(next) + i++ + continue + } + } + b.WriteByte(s[i]) + } + return b.String() +} + +// jobLabels2Tags converts JobLabels to a slice of tags. +func jobLabels2Tags(labels JobLabels) []string { + tags := make([]string, 0, len(labels)) + if len(labels) == 0 { + return tags + } + for k, v := range labels { + tags = append(tags, escapeTagStr(k)+":"+escapeTagStr(v)) + } + return tags +} + +// tags2JobLabels converts a slice of tags to JobLabels. +func tags2JobLabels(tags []string) JobLabels { + labels := make(JobLabels) + if len(tags) == 0 { + return labels + } + for _, tag := range tags { + keyPart, valPart, ok := splitEscapedTag(tag) + if !ok { + continue + } + labels[unescapeTagStr(keyPart)] = unescapeTagStr(valPart) + } + return labels +} + +// splitEscapedTag splits the first unescaped colon to separate key and value. +// It expects the input to be produced by escapeTagStr. +func splitEscapedTag(tag string) (string, string, bool) { + for i := 0; i < len(tag); i++ { + if tag[i] == '\\' { + i++ // Skip the escaped character + if i >= len(tag) { + break + } + continue + } + if tag[i] == ':' { + return tag[:i], tag[i+1:], true + } + } + return "", "", false +} diff --git a/pkg/scheduler/util_test.go b/pkg/scheduler/util_test.go new file mode 100644 index 000000000..8455552dc --- /dev/null +++ b/pkg/scheduler/util_test.go @@ -0,0 +1,74 @@ +package scheduler + +import "testing" + +func TestEscapeUnescapeRoundTrip(t *testing.T) { + cases := []string{ + "", + "plain", + "has:colon", + "has\\backslash", + "both:colon\\and\\backslash", + "trailing\\", + "nested\\:colon", + "multiple::colons\\\\and\\mixed", + } + + for _, tc := range cases { + escaped := escapeTagStr(tc) + got := unescapeTagStr(escaped) + if got != tc { + t.Fatalf("round trip failed for %q: got %q", tc, got) + } + } +} + +func TestEscapeTagStrOutput(t *testing.T) { + input := "k:e\\y" + expected := "k\\:e\\\\y" + if got := escapeTagStr(input); got != expected { + t.Fatalf("escapeTagStr(%q)=%q, want %q", input, got, expected) + } +} + +func TestSplitEscapedTag(t *testing.T) { + build := func(k, v string) string { + return escapeTagStr(k) + ":" + escapeTagStr(v) + } + + cases := []struct { + name string + tag string + key string + val string + ok bool + }{ + {name: "simple", tag: build("k", "v"), key: "k", val: "v", ok: true}, + {name: "escaped colon in key", tag: build("k:e:y", "val"), key: "k:e:y", val: "val", ok: true}, + {name: "escaped colon in val", tag: build("key", "v:a:l"), key: "key", val: "v:a:l", ok: true}, + {name: "backslash in both", tag: build("k\\ey", "v\\al"), key: "k\\ey", val: "v\\al", ok: true}, + {name: "no colon", tag: "nocolon", ok: false}, + {name: "only escaped colon", tag: "key\\:part", ok: false}, + } + + for _, tc := range cases { + keyPart, valPart, ok := splitEscapedTag(tc.tag) + if ok != tc.ok { + t.Fatalf("%s: ok=%v, want %v (tag=%q)", tc.name, ok, tc.ok, tc.tag) + } + if !ok { + continue + } + if keyPart != escapeTagStr(tc.key) { + t.Fatalf("%s: key=%q, want %q", tc.name, keyPart, escapeTagStr(tc.key)) + } + if valPart != escapeTagStr(tc.val) { + t.Fatalf("%s: val=%q, want %q", tc.name, valPart, escapeTagStr(tc.val)) + } + + // Ensure unescape recovers originals. + if unescapeTagStr(keyPart) != tc.key || unescapeTagStr(valPart) != tc.val { + t.Fatalf("%s: unescape mismatch key=%q val=%q", tc.name, unescapeTagStr(keyPart), unescapeTagStr(valPart)) + } + } +}