Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drpcmanager/active_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
}

func testStream(t *testing.T, id uint64) *drpcstream.Stream {
return drpcstream.New(context.Background(), id, testMuxWriter(t))
return drpcstream.New(context.Background(), id, testMuxWriter(t), drpcstream.NewBufferPool())
}

func TestActiveStreams_AddAndGet(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Manager struct {
wg sync.WaitGroup // tracks active manageStream goroutines

// streams tracks active streams.
streams *activeStreams
streams *activeStreams
recvPool *drpcstream.BufferPool

pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream
Expand Down Expand Up @@ -130,6 +131,7 @@ func NewWithOptions(tr drpc.Transport, kind ManagerKind, opts Options) *Manager
m.pendingStreams = make(map[uint64]*pendingStream)

m.streams = newActiveStreams()
m.recvPool = drpcstream.NewBufferPool()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since manager uses it, how about we move it to the package where manager belongs? Also, keeping the name as buffer pool would enable us to use it for send buffer if needed in the future.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to move it to drpcwire in an upcoming PR as it's a lower in dependency hierarchy and contains similar fundamental pieces. I don't want to move it to drpcmanager or drpcstream because we have to create interfaces all over the place.


// set the internal stream options
drpcopts.SetStreamTransport(&m.opts.Stream.Internal, m.tr)
Expand Down Expand Up @@ -268,7 +270,7 @@ func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKin
drpcopts.SetStreamStats(&opts.Internal, cb(rpc))
}

stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts)
stream := drpcstream.NewWithOptions(ctx, sid, m.wr, m.recvPool, opts)

if err := m.streams.Add(sid, stream); err != nil {
return nil, err
Expand Down
42 changes: 42 additions & 0 deletions drpcstream/buffer_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (C) 2026 Cockroach Labs.
// See LICENSE for copying information.

package drpcstream

import "sync"

// BufferPool wraps sync.Pool to provide reusable byte slices for the
// stream receive path. Buffers obtained via Get should be returned via
// Put when no longer needed. Forgetting to Put is safe (GC reclaims)
// but reduces reuse.
type BufferPool struct {
Comment thread
cthumuluru-crdb marked this conversation as resolved.
pool sync.Pool
}

// NewBufferPool returns a new buffer pool.
func NewBufferPool() *BufferPool {
return &BufferPool{
pool: sync.Pool{
New: func() interface{} {
b := make([]byte, 0, 4096)
return &b
},
},
}
}

// Get returns a zero-length byte slice from the pool, retaining its
// backing array for reuse.
func (bp *BufferPool) Get() *[]byte {
p := bp.pool.Get().(*[]byte)
*p = (*p)[:0]
return p
}

// Put returns a buffer to the pool. Nil is safe to pass.
func (bp *BufferPool) Put(b *[]byte) {
if b == nil {
return
}
bp.pool.Put(b)
}
86 changes: 46 additions & 40 deletions drpcstream/ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ const defaultRingBufferCapacity = 256
// assembled packet data. It sits between manageReader (producer, calls
// Enqueue) and the application goroutine (consumer, calls Dequeue/Done).
//
// Slots are pre-allocated and reused: each slot's backing array grows via
// append to fit incoming data, then stays at its high-water mark, avoiding
// per-message allocation in steady state.
// Buffers are obtained from a shared BufferPool. Enqueue copies data into a
// pooled buffer; Dequeue returns that buffer's data and advances the tail
// immediately, and Done releases the buffer back to the pool. Keeping the
// pool behind Dequeue/Done means the consumer does not need to know whether
// the queue is backed by a pool or by fixed buffers.
//
// After Close, Dequeue drains any queued messages before returning the close
// error. This ensures graceful shutdown (KindClose/KindCloseSend) delivers
Expand All @@ -28,43 +30,53 @@ type ringBuffer struct {
mu sync.Mutex
cond sync.Cond

buf [][]byte // ring of byte slices
head int // next write position (producer)
tail int // next read position (consumer)
count int // number of occupied slots

held bool // true between Dequeue and Done
err error // terminal error, set by Close
// pool is shared across all streams on a connection and is owned by the
// Manager, not the ring buffer. Its lifetime outlives this buffer, so a
// consumer may safely return a buffer via Done even after Close.
pool *BufferPool
buf []*[]byte // ring of pooled buffer pointers
head int // next write position (producer)
tail int // next read position (consumer)
count int // number of occupied slots

held *[]byte // buffer from the last Dequeue, released by Done
err error // terminal error, set by Close
}

func (rb *ringBuffer) init() {
func (rb *ringBuffer) init(pool *BufferPool) {
rb.cond.L = &rb.mu
rb.buf = make([][]byte, defaultRingBufferCapacity)
rb.pool = pool
rb.buf = make([]*[]byte, defaultRingBufferCapacity)
}

// Enqueue copies data into the next write slot. If the buffer is full, it
// blocks until a slot is freed or the buffer is closed. If the buffer is
// closed, Enqueue returns silently without enqueuing.
// Enqueue copies data into a pooled buffer and places it in the next write
// slot. If the buffer is full, it blocks until a slot is freed or the buffer
// is closed. If the buffer is closed, Enqueue returns silently.
func (rb *ringBuffer) Enqueue(data []byte) {
b := rb.pool.Get()
*b = append(*b, data...)

rb.mu.Lock()
defer rb.mu.Unlock()

for rb.count == len(rb.buf) && rb.err == nil {
rb.cond.Wait()
}
if rb.err != nil {
rb.pool.Put(b)
return
}

rb.buf[rb.head] = append(rb.buf[rb.head][:0], data...)
rb.buf[rb.head] = b
rb.head = (rb.head + 1) % len(rb.buf)
rb.count++
rb.cond.Broadcast()
}

// Dequeue returns the data from the next read slot. If the buffer is empty,
// it blocks until data is available or the buffer is closed. The returned
// slice is valid until Done is called.
// Dequeue returns the data from the next buffered message and advances the
// tail. The returned slice is valid until Done is called, which releases the
// underlying buffer back to the pool. Done must be called exactly once after
// each successful Dequeue.
func (rb *ringBuffer) Dequeue() ([]byte, error) {
rb.mu.Lock()
defer rb.mu.Unlock()
Expand All @@ -76,37 +88,31 @@ func (rb *ringBuffer) Dequeue() ([]byte, error) {
return nil, rb.err
}

rb.held = true
return rb.buf[rb.tail], nil
}

// Done advances the read pointer, making the slot available for reuse.
// It must be called exactly once after each successful Dequeue.
//
// TODO(shubham): remove this method once a shared buffer pool is introduced.
// With a pool, Dequeue will advance the tail immediately and the caller will
// return the buffer to the pool directly.
func (rb *ringBuffer) Done() {
Comment thread
cthumuluru-crdb marked this conversation as resolved.
rb.mu.Lock()
defer rb.mu.Unlock()

b := rb.buf[rb.tail]
rb.buf[rb.tail] = nil
rb.tail = (rb.tail + 1) % len(rb.buf)
rb.count--
rb.held = false
rb.held = b
rb.cond.Broadcast()

return *b, nil
}

// Done releases the buffer from the most recent Dequeue back to the pool,
// invalidating the slice that Dequeue returned. It must be called exactly
// once after each successful Dequeue. Because the queue is single-consumer,
// Done is only ever called from the same goroutine as Dequeue.
func (rb *ringBuffer) Done() {
rb.pool.Put(rb.held)
rb.held = nil
}

// Close marks the buffer as closed with the given error. All blocked Enqueue
// and Dequeue calls are woken and will return. Close waits for any in-progress
// Dequeue/Done pair to complete before setting the error. Subsequent calls are
// no-ops.
// and Dequeue calls are woken and will return. Subsequent calls are no-ops.
func (rb *ringBuffer) Close(err error) {
rb.mu.Lock()
defer rb.mu.Unlock()

for rb.held {
rb.cond.Wait()
}
if rb.err != nil {
return
}
Expand Down
Loading
Loading