Skip to content

Commit a61bad0

Browse files
authored
Merge pull request #8 from simodima/implicit-queue
Removes the queue parameter from any Producer / Consumer function signature
2 parents d8e0628 + 9af1c95 commit a61bad0

File tree

13 files changed

+241
-159
lines changed

13 files changed

+241
-159
lines changed

consumer.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package squeue
22

33
import (
4-
"context"
54
"encoding/json"
65

76
"github.com/simodima/squeue/driver"
@@ -10,29 +9,34 @@ import (
109
//go:generate mockgen -source=driver/driver.go -package=squeue_test -destination=driver_test.go
1110

1211
// NewConsumer creates a new consumer for the given T type of messages
13-
func NewConsumer[T json.Unmarshaler](d driver.Driver) Consumer[T] {
14-
return Consumer[T]{d}
12+
func NewConsumer[T json.Unmarshaler](d driver.Driver, queue string) Consumer[T] {
13+
return Consumer[T]{
14+
driver: d,
15+
queue: queue,
16+
}
1517
}
1618

1719
type Consumer[T json.Unmarshaler] struct {
18-
driver driver.Driver
20+
driver driver.Driver
21+
queue string
22+
controller *driver.ConsumerController
1923
}
2024

2125
// Consume retrieves messages from the given queue.
2226
// Any provided options will be sent to the underlying driver.
2327
// The messages are indefinetely consumed from the queue and
2428
// sent to the chan Message[T].
25-
// To stop consuming messages is sufficient to cancel the context.Context
26-
func (p *Consumer[T]) Consume(ctx context.Context, queue string, opts ...func(message any)) (chan Message[T], error) {
27-
messages, err := p.driver.Consume(ctx, queue, opts...)
29+
func (p *Consumer[T]) Consume(opts ...func(message any)) (chan Message[T], error) {
30+
ctrl, err := p.driver.Consume(p.queue, opts...)
2831
if err != nil {
2932
return nil, wrapErr(err, ErrDriver, nil)
3033
}
34+
p.controller = ctrl
3135

3236
outMsg := make(chan Message[T])
3337

3438
go func() {
35-
for message := range messages {
39+
for message := range ctrl.Data() {
3640
if message.Error != nil {
3741
outMsg <- Message[T]{
3842
Error: wrapErr(message.Error, ErrDriver, nil),
@@ -61,8 +65,14 @@ func (p *Consumer[T]) Consume(ctx context.Context, queue string, opts ...func(me
6165
return outMsg, nil
6266
}
6367

68+
func (p *Consumer[T]) Stop() {
69+
if p.controller != nil {
70+
p.controller.Stop()
71+
}
72+
}
73+
6474
// Ack explicitly acknowldge the message handling.
6575
// It can be implemented as No Operation for some drivers.
66-
func (p *Consumer[T]) Ack(queue string, m Message[T]) error {
67-
return p.driver.Ack(queue, m.ID)
76+
func (p *Consumer[T]) Ack(m Message[T]) error {
77+
return p.driver.Ack(p.queue, m.ID)
6878
}

consumer_test.go

Lines changed: 91 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package squeue_test
22

33
import (
4-
"context"
54
"errors"
65
"testing"
76

@@ -31,41 +30,45 @@ func (suite *ConsumerTestSuite) TearDownTest() {
3130
}
3231

3332
func (suite *ConsumerTestSuite) TestNewConsumer() {
34-
squeue.NewConsumer[*TestMessage](suite.driver)
33+
squeue.NewConsumer[*TestMessage](suite.driver, "test-queue")
3534
}
3635

3736
func (suite *ConsumerTestSuite) TestConsumeMessages_DriverError() {
38-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
39-
ctx := context.Background()
4037
queue := "test-queue"
38+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
4139

4240
suite.driver.
4341
EXPECT().
44-
Consume(ctx, queue).
42+
Consume(queue).
4543
Return(nil, errors.New("consume error"))
4644

47-
messages, err := consumer.Consume(ctx, queue)
45+
messages, err := consumer.Consume()
4846
suite.Nil(messages)
4947
suite.Error(err)
5048
}
5149

5250
func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageWithError() {
53-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
54-
ctx := context.Background()
5551
queue := "test-queue"
56-
57-
dMessages := make(chan driver.Message)
58-
go func() {
59-
dMessages <- driver.Message{Error: errors.New("error in message")}
60-
close(dMessages)
61-
}()
52+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
6253

6354
suite.driver.
6455
EXPECT().
65-
Consume(ctx, queue).
66-
Return(dMessages, nil)
56+
Consume(queue).
57+
DoAndReturn(func(queue string, _ ...func()) (*driver.ConsumerController, error) {
58+
ctrl := driver.NewConsumerController()
59+
// Simulate catching the .Stop()
60+
go func() { <-ctrl.Done() }()
61+
62+
// Simulate sending one message and Stopping the driver
63+
go func() {
64+
ctrl.Send(driver.Message{Error: errors.New("error in message")})
65+
ctrl.Stop()
66+
}()
67+
68+
return ctrl, nil
69+
})
6770

68-
messages, err := consumer.Consume(ctx, queue)
71+
messages, err := consumer.Consume()
6972

7073
suite.NotNil(messages)
7174
suite.Nil(err)
@@ -80,26 +83,31 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageWithError() {
8083
}
8184

8285
func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageUnmarshallError() {
83-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
84-
ctx := context.Background()
8586
queue := "test-queue"
86-
87-
dMessages := make(chan driver.Message)
88-
go func() {
89-
dMessages <- driver.Message{
90-
Body: []byte("invalid json"),
91-
ID: "1111",
92-
Error: nil,
93-
}
94-
close(dMessages)
95-
}()
87+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
9688

9789
suite.driver.
9890
EXPECT().
99-
Consume(ctx, queue).
100-
Return(dMessages, nil)
101-
102-
messages, err := consumer.Consume(ctx, queue)
91+
Consume(queue).
92+
DoAndReturn(func(queue string, _ ...func()) (*driver.ConsumerController, error) {
93+
ctrl := driver.NewConsumerController()
94+
// Simulate catching the .Stop()
95+
go func() { <-ctrl.Done() }()
96+
97+
// Simulate sending one message and Stopping the driver
98+
go func() {
99+
ctrl.Send(driver.Message{
100+
Body: []byte("invalid json"),
101+
ID: "1111",
102+
Error: nil,
103+
})
104+
ctrl.Stop()
105+
}()
106+
107+
return ctrl, nil
108+
})
109+
110+
messages, err := consumer.Consume()
103111

104112
suite.NotNil(messages)
105113
suite.Nil(err)
@@ -117,37 +125,41 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_OneMessageUnmarshallError()
117125
}
118126

119127
func (suite *ConsumerTestSuite) TestConsumeMessages_RealWorldScenarioWithErrors() {
120-
consumer := squeue.NewConsumer[*TestMessage](suite.driver)
121-
ctx := context.Background()
122128
queue := "test-queue"
123-
124-
dMessages := make(chan driver.Message)
125-
go func() {
126-
dMessages <- driver.Message{
127-
Body: []byte(`{"name":"test message"}`),
128-
ID: "1111",
129-
Error: nil,
130-
}
131-
132-
dMessages <- driver.Message{
133-
Error: errors.New("wire error"),
134-
}
135-
136-
dMessages <- driver.Message{
137-
Body: []byte(`{"name":"test another message"}`),
138-
ID: "1111",
139-
Error: nil,
140-
}
141-
142-
close(dMessages)
143-
}()
129+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
144130

145131
suite.driver.
146132
EXPECT().
147-
Consume(ctx, queue).
148-
Return(dMessages, nil)
149-
150-
messages, err := consumer.Consume(ctx, queue)
133+
Consume(queue).
134+
DoAndReturn(func(queue string, _ ...func()) (*driver.ConsumerController, error) {
135+
ctrl := driver.NewConsumerController()
136+
// Simulate catching the .Stop()
137+
go func() { <-ctrl.Done() }()
138+
139+
// Simulate sending three messages and Stopping the driver
140+
go func() {
141+
ctrl.Send(driver.Message{
142+
Body: []byte(`{"name":"test message"}`),
143+
ID: "1111",
144+
Error: nil,
145+
})
146+
147+
ctrl.Send(driver.Message{
148+
Error: errors.New("wire error"),
149+
})
150+
151+
ctrl.Send(driver.Message{
152+
Body: []byte(`{"name":"test another message"}`),
153+
ID: "1111",
154+
Error: nil,
155+
})
156+
ctrl.Stop()
157+
}()
158+
159+
return ctrl, nil
160+
})
161+
162+
messages, err := consumer.Consume()
151163

152164
suite.NotNil(messages)
153165
suite.Nil(err)
@@ -173,6 +185,25 @@ func (suite *ConsumerTestSuite) TestConsumeMessages_RealWorldScenarioWithErrors(
173185
suite.Contains(m.Content.Name, "test another message")
174186
}
175187

188+
func (suite *ConsumerTestSuite) TestAckMessage_WithError() {
189+
queue := "test-queue"
190+
consumer := squeue.NewConsumer[*TestMessage](suite.driver, queue)
191+
192+
msg := squeue.Message[*TestMessage]{
193+
Content: &TestMessage{},
194+
ID: "123",
195+
}
196+
197+
suite.driver.
198+
EXPECT().
199+
Ack(queue, "123").
200+
Return(errors.New("ack error"))
201+
202+
err := consumer.Ack(msg)
203+
204+
suite.Error(err)
205+
}
206+
176207
func TestConsumerTestSuite(t *testing.T) {
177208
suite.Run(t, new(ConsumerTestSuite))
178209
}

driver/driver.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,36 @@
11
package driver
22

3-
import "context"
3+
func NewConsumerController() *ConsumerController {
4+
return &ConsumerController{
5+
data: make(chan Message),
6+
done: make(chan struct{}),
7+
}
8+
}
9+
10+
type ConsumerController struct {
11+
data chan Message
12+
done chan struct{}
13+
}
14+
15+
func (c *ConsumerController) Send(m Message) {
16+
c.data <- m
17+
}
18+
19+
func (c *ConsumerController) Data() <-chan Message {
20+
return c.data
21+
}
22+
23+
func (c *ConsumerController) Done() chan struct{} {
24+
return c.done
25+
}
26+
27+
func (c *ConsumerController) Stop() {
28+
c.done <- struct{}{}
29+
close(c.data)
30+
}
431

532
type Driver interface {
633
Enqueue(queue string, data []byte, opts ...func(message any)) error
7-
Consume(ctx context.Context, queue string, opts ...func(message any)) (chan Message, error)
34+
Consume(queue string, opts ...func(message any)) (*ConsumerController, error)
835
Ack(queue string, messageID string) error
936
}

driver/memdriver.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package driver
22

33
import (
4-
"context"
54
"fmt"
65
"math/rand"
76
"sync"
@@ -44,23 +43,23 @@ func (d *MemoryDriver) Enqueue(queue string, evt []byte, opts ...func(message an
4443
return nil
4544
}
4645

47-
func (d *MemoryDriver) Consume(ctx context.Context, queue string, opts ...func(message any)) (chan Message, error) {
48-
results := make(chan Message)
46+
func (d *MemoryDriver) Consume(queue string, opts ...func(message any)) (*ConsumerController, error) {
47+
ctrl := NewConsumerController()
48+
4949
go func() {
5050
for {
5151
select {
52-
case <-ctx.Done():
53-
close(results)
52+
case <-ctrl.Done():
5453
return
5554
case <-time.After(d.tick):
5655
if evt := d.pop(queue); evt != nil {
57-
results <- *evt
56+
ctrl.Send(*evt)
5857
}
5958
}
6059
}
6160
}()
6261

63-
return results, nil
62+
return ctrl, nil
6463
}
6564

6665
func (d *MemoryDriver) pop(queue string) *Message {

driver/memdriver_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package driver_test
22

33
import (
4-
"context"
54
"testing"
65
"time"
76

@@ -43,20 +42,18 @@ func (suite *MemoryTestSuite) TestEnqueueDequeueSuccess() {
4342
err = d.Enqueue("test", third)
4443
suite.Nil(err)
4544

46-
ctx, cancel := context.WithCancel(context.Background())
47-
defer cancel()
48-
49-
messages, err := d.Consume(ctx, "test")
45+
ctrl, err := d.Consume("test")
5046
suite.Nil(err)
5147

52-
m1 := <-messages
53-
m2 := <-messages
54-
m3 := <-messages
48+
m1 := <-ctrl.Data()
49+
m2 := <-ctrl.Data()
50+
m3 := <-ctrl.Data()
51+
52+
ctrl.Stop()
5553

5654
suite.Equal(first, m1.Body)
5755
suite.Equal(second, m2.Body)
5856
suite.Equal(third, m3.Body)
59-
6057
}
6158

6259
func TestSQSTestSuite(t *testing.T) {

0 commit comments

Comments
 (0)