Skip to content

Commit 5887a95

Browse files
authored
Merge pull request #5 from boks1971/raja_submit_return_value
Return submit success from Submit.
2 parents fb79dfe + f0e98c4 commit 5887a95

File tree

2 files changed

+37
-15
lines changed

2 files changed

+37
-15
lines changed

pool.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,20 @@ const (
1313
)
1414

1515
type QueuePool interface {
16-
Submit(key string, job func())
16+
Submit(key string, job func()) bool
1717
Drain()
1818
Kill()
1919
}
2020

2121
type QueueWorker interface {
22-
Submit(job func())
22+
Submit(job func()) bool
2323
Drain()
2424
Kill()
2525
}
2626

2727
type QueueWorkerParams struct {
2828
QueueSize int
2929
DropWhenFull bool
30-
OnDropped func()
3130
}
3231

3332
type queuePool struct {
@@ -61,11 +60,11 @@ func (p *queuePool) hash(key string) int {
6160
return int(h.Sum32()) % p.capacity
6261
}
6362

64-
func (p *queuePool) Submit(key string, job func()) {
63+
func (p *queuePool) Submit(key string, job func()) bool {
6564
p.Lock()
6665
if p.kill.IsBroken() {
6766
p.Unlock()
68-
return
67+
return false
6968
}
7069

7170
idx := p.hash(key)
@@ -76,7 +75,7 @@ func (p *queuePool) Submit(key string, job func()) {
7675
}
7776
p.Unlock()
7877

79-
w.Submit(job)
78+
return w.Submit(job)
8079
}
8180

8281
func (p *queuePool) Drain() {
@@ -164,13 +163,12 @@ func (w *worker) run() {
164163
}
165164
}
166165

167-
func (w *worker) Submit(job func()) {
166+
func (w *worker) Submit(job func()) bool {
167+
submitted := true
168168
w.Lock()
169169
if w.active {
170170
if w.DropWhenFull && w.deque.Len() == w.QueueSize {
171-
if w.OnDropped != nil {
172-
w.OnDropped()
173-
}
171+
submitted = false
174172
} else {
175173
w.deque.PushBack(job)
176174
}
@@ -179,6 +177,7 @@ func (w *worker) Submit(job func()) {
179177
w.next <- job
180178
}
181179
w.Unlock()
180+
return submitted
182181
}
183182

184183
func (w *worker) Drain() {

pool_test.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,21 @@ func TestPool(t *testing.T) {
3838
return atomic.LoadUint32(&val2)
3939
}
4040

41-
p.Submit(key1, func() {
41+
submitted := p.Submit(key1, func() {
4242
time.Sleep(time.Millisecond * 500)
4343
add1(1)
4444
})
45+
require.True(t, submitted)
4546

46-
p.Submit(key2, func() {
47+
submitted = p.Submit(key2, func() {
4748
add2(2)
4849
})
50+
require.True(t, submitted)
4951

50-
p.Submit(key1, func() {
52+
submitted = p.Submit(key1, func() {
5153
add1(3)
5254
})
55+
require.True(t, submitted)
5356

5457
time.Sleep(time.Millisecond * 100)
5558
require.Equal(t, val(), get1())
@@ -58,17 +61,37 @@ func TestPool(t *testing.T) {
5861
time.Sleep(time.Millisecond * 500)
5962
require.Equal(t, val(1, 3), get1())
6063

61-
p.Submit(key1, func() {
64+
submitted = p.Submit(key1, func() {
6265
time.Sleep(time.Millisecond * 500)
6366
add1(4)
6467
})
68+
require.True(t, submitted)
6569

66-
p.Submit(key2, func() {
70+
submitted = p.Submit(key2, func() {
6771
time.Sleep(time.Millisecond * 500)
6872
add2(5)
6973
})
74+
require.True(t, submitted)
7075

7176
p.Drain()
7277
require.Equal(t, val(1, 3, 4), get1())
7378
require.Equal(t, val(2, 5), get2())
7479
}
80+
81+
func TestPoolOverflow(t *testing.T) {
82+
queueSize := 2
83+
p := NewQueuePool(2, QueueWorkerParams{QueueSize: queueSize, DropWhenFull: true})
84+
85+
for i := 0; i < queueSize+2; i++ {
86+
submitted := p.Submit("key", func() {
87+
time.Sleep(time.Millisecond * 500)
88+
})
89+
if i < queueSize+1 {
90+
require.True(t, submitted)
91+
} else {
92+
require.False(t, submitted)
93+
}
94+
}
95+
96+
p.Drain()
97+
}

0 commit comments

Comments
 (0)