Skip to content

Commit 6534509

Browse files
committed
feat: add asynq libary
1 parent d107004 commit 6534509

File tree

11 files changed

+1096
-0
lines changed

11 files changed

+1096
-0
lines changed

pkg/gocron/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ Scheduled task library encapsulated on [cron v3](github.com/robfig/cron).
66

77
### Example of use
88

9+
#### Local Scheduled Tasks
10+
11+
Local scheduled tasks are suitable for single-machine environments, typically used to perform periodic or delayed background jobs, such as data cleanup, log archiving, and local cache refreshing. Example usage:
12+
913
```go
1014
package main
1115

@@ -59,3 +63,11 @@ func main() {
5963
fmt.Println("running task list:", gocron.GetRunningTasks())
6064
}
6165
```
66+
67+
<br>
68+
69+
#### Distributed Scheduled Tasks
70+
71+
Distributed scheduled tasks are designed for cluster environments, ensuring coordinated task execution across multiple nodes to avoid duplicate scheduling while improving reliability and scalability. Example usage:
72+
73+
[https://github.com/go-dev-frame/sponge/tree/main/pkg/sasynq#periodic-tasks](https://github.com/go-dev-frame/sponge/tree/main/pkg/sasynq#periodic-tasks)

pkg/sasynq/README.md

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
## sasynq
2+
3+
`sasynq` is a wrapper around the excellent [asynq](https://github.com/hibiken/asynq) library. It provides a simpler and more user-friendly SDK while remaining fully compatible with native asynq usage patterns. Its main features include:
4+
5+
- Support for Redis Cluster and Sentinel for high availability and horizontal scalability.
6+
- Distributed task queues with support for priority queues, delayed queues, unique tasks (to prevent duplicate execution), and periodic task scheduling.
7+
- Built-in mechanisms for task retries (with customizable retry counts), timeouts, and deadlines.
8+
- Flexible scheduling for immediate, delayed, or specific-time execution.
9+
- Unified logging using zap.
10+
11+
`sasynq` streamlines asynchronous and distributed task processing in Go, helping you write clean and maintainable background job code quickly and safely.
12+
13+
<br>
14+
15+
## Example of use
16+
17+
### Queues
18+
19+
#### Defining Task Payloads and Handlers
20+
21+
Here’s how to define task payloads and handlers in `sasynq`:
22+
23+
```go
24+
// example/common/task.go
25+
package common
26+
27+
import (
28+
"context"
29+
"encoding/json"
30+
"fmt"
31+
32+
"github.com/hibiken/asynq"
33+
"github.com/go-dev-frame/sponge/pkg/sasynq"
34+
)
35+
36+
// ----------------------------- Definition Method 1 ----------------------------------
37+
38+
const TypeEmailSend = "email:send"
39+
40+
type EmailPayload struct {
41+
UserID int `json:"user_id"`
42+
Message string `json:"message"`
43+
}
44+
45+
func HandleEmailTask(ctx context.Context, p *EmailPayload) error {
46+
fmt.Printf("[Email] Task for UserID %d completed successfully\n", p.UserID)
47+
return nil
48+
}
49+
50+
// ----------------------------- Definition Method 2 ----------------------------------
51+
52+
const TypeSMSSend = "sms:send"
53+
54+
type SMSPayload struct {
55+
UserID int `json:"user_id"`
56+
Message string `json:"message"`
57+
}
58+
59+
func (p *SMSPayload) ProcessTask(ctx context.Context, t *asynq.Task) error {
60+
fmt.Printf("[SMS] Task for UserID %d completed successfully\n", p.UserID)
61+
return nil
62+
}
63+
64+
// ----------------------------- Definition Method 3 ----------------------------------
65+
66+
const TypeMsgNotification = "msg:notification"
67+
68+
type MsgNotificationPayload struct {
69+
UserID int `json:"user_id"`
70+
Message string `json:"message"`
71+
}
72+
73+
func HandleMsgNotificationTask(ctx context.Context, t *asynq.Task) error {
74+
var p MsgNotificationPayload
75+
if err := json.Unmarshal(t.Payload(), &p); err != nil {
76+
return fmt.Errorf("failed to unmarshal payload: %w", err)
77+
}
78+
fmt.Printf("[MSG] Task for UserID %d completed successfully\n", p.UserID)
79+
return nil
80+
}
81+
````
82+
83+
<br>
84+
85+
#### Producer Example
86+
87+
A producer enqueues tasks with various options like priority, delays, deadlines, and unique IDs.
88+
89+
```go
90+
// example/producer/main.go
91+
package main
92+
93+
import (
94+
"fmt"
95+
"time"
96+
97+
"github.com/go-dev-frame/sponge/pkg/sasynq"
98+
"example/common"
99+
)
100+
101+
func runProducer(client *sasynq.Client) error {
102+
// Immediate enqueue with critical priority
103+
userPayload1 := &common.EmailPayload{
104+
UserID: 101,
105+
Message: "This is a message that is immediately queued, with critical priority",
106+
}
107+
_, info, err := client.EnqueueNow(common.TypeEmailSend, userPayload1,
108+
sasynq.WithQueue("critical"),
109+
sasynq.WithRetry(5),
110+
)
111+
if err != nil {
112+
return err
113+
}
114+
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)
115+
116+
// Enqueue after a 5-second delay
117+
userPayload2 := &common.SMSPayload{
118+
UserID: 202,
119+
Message: "This is a message added to the queue after a 5-second delay, with default priority",
120+
}
121+
_, info, err = client.EnqueueIn(5*time.Second, common.TypeSMSSend, userPayload2,
122+
sasynq.WithQueue("default"),
123+
sasynq.WithRetry(3),
124+
)
125+
if err != nil {
126+
return err
127+
}
128+
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeSMSSend, info.ID, info.Queue)
129+
130+
// Enqueue to run at a specific time
131+
userPayload3 := &common.MsgNotificationPayload{
132+
UserID: 303,
133+
Message: "This is a message scheduled to run at a specific time, with low priority",
134+
}
135+
_, info, err = client.EnqueueAt(time.Now().Add(10*time.Second), common.TypeMsgNotification, userPayload3,
136+
sasynq.WithQueue("low"),
137+
sasynq.WithRetry(1),
138+
)
139+
if err != nil {
140+
return err
141+
}
142+
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeMsgNotification, info.ID, info.Queue)
143+
144+
// Example of using NewTask directly
145+
userPayload4 := &common.EmailPayload{
146+
UserID: 404,
147+
Message: "This is a test message, with low priority, a 15-second deadline, and a unique ID",
148+
}
149+
task, err := sasynq.NewTask(common.TypeEmailSend, userPayload4)
150+
if err != nil {
151+
return err
152+
}
153+
info, err = client.Enqueue(task,
154+
sasynq.WithQueue("low"),
155+
sasynq.WithRetry(1),
156+
sasynq.WithDeadline(time.Now().Add(15*time.Second)),
157+
sasynq.WithUniqueID("unique-id-xxxx-xxxx"),
158+
)
159+
if err != nil {
160+
return err
161+
}
162+
fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", common.TypeEmailSend, info.ID, info.Queue)
163+
164+
return nil
165+
}
166+
167+
func main() {
168+
cfg := sasynq.RedisConfig{
169+
Addr: "localhost:6379",
170+
}
171+
client := sasynq.NewClient(cfg)
172+
173+
err := runProducer(client)
174+
if err != nil {
175+
panic(err)
176+
}
177+
defer client.Close()
178+
179+
fmt.Println("All tasks enqueued.")
180+
}
181+
```
182+
183+
<br>
184+
185+
#### Consumer Example
186+
187+
A consumer server can register handlers in three different ways:
188+
189+
```go
190+
package main
191+
192+
import (
193+
"github.com/go-dev-frame/sponge/pkg/sasynq"
194+
"example/common"
195+
)
196+
197+
func runConsumer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
198+
serverCfg := sasynq.DefaultServerConfig() // Uses critical, default, and low queues by default
199+
srv := sasynq.NewServer(redisCfg, serverCfg)
200+
201+
// Attach logging middleware
202+
srv.Use(sasynq.LoggingMiddleware())
203+
204+
// Register task handlers (three methods available):
205+
sasynq.RegisterTaskHandler(srv.Mux(), common.TypeEmailSend, sasynq.HandleFunc(common.HandleEmailTask)) // Method 1 (recommended)
206+
srv.Register(common.TypeSMSSend, &common.SMSPayload{}) // Method 2: register struct as payload
207+
srv.RegisterFunc(common.TypeMsgNotification, common.HandleMsgNotificationTask) // Method 3: register function directly
208+
209+
srv.Run()
210+
211+
return srv, nil
212+
}
213+
214+
func main() {
215+
cfg := sasynq.RedisConfig{
216+
Addr: "localhost:6379",
217+
}
218+
srv, err := runConsumer(cfg)
219+
if err != nil {
220+
panic(err)
221+
}
222+
srv.WaitShutdown()
223+
}
224+
```
225+
226+
<br>
227+
228+
### Periodic Tasks
229+
230+
`sasynq` makes scheduling recurring tasks very simple.
231+
232+
```go
233+
package main
234+
235+
import (
236+
"context"
237+
"fmt"
238+
239+
"github.com/go-dev-frame/sponge/pkg/sasynq"
240+
)
241+
242+
const TypeScheduledGet = "scheduled:get"
243+
244+
type ScheduledGetPayload struct {
245+
URL string `json:"url"`
246+
}
247+
248+
func handleScheduledGetTask(ctx context.Context, p *ScheduledGetPayload) error {
249+
fmt.Printf("[Get] Task for URL %s completed successfully\n", p.URL)
250+
return nil
251+
}
252+
253+
// -----------------------------------------------------------------------
254+
255+
func registerSchedulerTasks(scheduler *sasynq.Scheduler) error {
256+
payload1 := &ScheduledGetPayload{URL: "https://google.com"}
257+
entryID1, err := scheduler.RegisterTask("@every 2s", TypeScheduledGet, payload1)
258+
if err != nil {
259+
return err
260+
}
261+
fmt.Printf("Registered periodic task with entry ID: %s\n", entryID1)
262+
263+
payload2 := &ScheduledGetPayload{URL: "https://bing.com"}
264+
entryID2, err := scheduler.RegisterTask("@every 3s", TypeScheduledGet, payload2)
265+
if err != nil {
266+
return err
267+
}
268+
fmt.Printf("Registered periodic task with entry ID: %s\n", entryID2)
269+
270+
scheduler.Run()
271+
272+
return nil
273+
}
274+
275+
func runServer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
276+
serverCfg := sasynq.DefaultServerConfig()
277+
srv := sasynq.NewServer(redisCfg, serverCfg)
278+
srv.Use(sasynq.LoggingMiddleware())
279+
280+
// Register handler for scheduled tasks
281+
sasynq.RegisterTaskHandler(srv.Mux(), TypeScheduledGet, sasynq.HandleFunc(handleScheduledGetTask))
282+
283+
srv.Run()
284+
285+
return srv, nil
286+
}
287+
288+
func main() {
289+
cfg := sasynq.RedisConfig{
290+
Addr: "localhost:6379",
291+
}
292+
293+
scheduler := sasynq.NewScheduler(cfg)
294+
err := registerSchedulerTasks(scheduler)
295+
if err != nil {
296+
panic(err)
297+
}
298+
299+
srv, err := runServer(cfg)
300+
if err != nil {
301+
panic(err)
302+
}
303+
srv.Shutdown()
304+
}
305+
```

pkg/sasynq/client.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package sasynq
2+
3+
import (
4+
"time"
5+
6+
"github.com/hibiken/asynq"
7+
)
8+
9+
// Client is a wrapper around asynq.Client providing more convenient APIs.
10+
type Client struct {
11+
*asynq.Client
12+
}
13+
14+
// NewClient creates a new producer client.
15+
func NewClient(cfg RedisConfig) *Client {
16+
return &Client{
17+
Client: asynq.NewClient(cfg.GetAsynqRedisConnOpt()),
18+
}
19+
}
20+
21+
// Enqueue enqueues the given task to a queue.
22+
func (c *Client) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) {
23+
return c.Client.Enqueue(task, opts...)
24+
}
25+
26+
// EnqueueNow enqueues a task for immediate processing, parameter payload should be supported json.Marshal
27+
func (c *Client) EnqueueNow(typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error) {
28+
task, err := NewTask(typeName, payload)
29+
if err != nil {
30+
return nil, nil, err
31+
}
32+
33+
info, err := c.Client.Enqueue(task, opts...)
34+
return task, info, err
35+
}
36+
37+
// EnqueueIn enqueues a task to be processed after a specified delay.
38+
func (c *Client) EnqueueIn(delay time.Duration, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error) {
39+
task, err := NewTask(typeName, payload)
40+
if err != nil {
41+
return nil, nil, err
42+
}
43+
44+
opts = append(opts, asynq.ProcessIn(delay))
45+
info, err := c.Client.Enqueue(task, opts...)
46+
return task, info, err
47+
}
48+
49+
// EnqueueAt enqueues a task to be processed at a specific time.
50+
func (c *Client) EnqueueAt(t time.Time, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error) {
51+
task, err := NewTask(typeName, payload)
52+
if err != nil {
53+
return nil, nil, err
54+
}
55+
56+
opts = append(opts, asynq.ProcessAt(t))
57+
info, err := c.Client.Enqueue(task, opts...)
58+
return task, info, err
59+
}

0 commit comments

Comments
 (0)