Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package manager
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/ringbuf"
"golang.org/x/sys/unix"
)

type RingBufferOptions struct {
Expand All @@ -27,6 +29,14 @@ type RingBufferOptions struct {

// TelemetryEnabled turns on telemetry about the usage of the ring buffer
TelemetryEnabled bool

// SchedPolicy - If set, the reader goroutine's OS thread will be pinned
// (LockOSThread) and given this scheduling policy via sched_setattr(2).
// Supported values: unix.SCHED_FIFO, unix.SCHED_RR. Zero means no change.
SchedPolicy int

// SchedPriority - RT scheduling priority (1-99). Required when SchedPolicy is set.
SchedPriority int
}

type RingBuffer struct {
Expand Down Expand Up @@ -74,6 +84,15 @@ func (rb *RingBuffer) init(manager *Manager) error {
return fmt.Errorf("no DataHandler/RecordHandler set for %s", rb.Name)
}

if rb.SchedPolicy != 0 {
if rb.SchedPolicy != unix.SCHED_FIFO && rb.SchedPolicy != unix.SCHED_RR {
return fmt.Errorf("unsupported SchedPolicy %d for %s: must be SCHED_FIFO or SCHED_RR", rb.SchedPolicy, rb.Name)
}
if rb.SchedPriority < 1 || rb.SchedPriority > 99 {
return fmt.Errorf("SchedPriority must be between 1 and 99 for %s, got %d", rb.Name, rb.SchedPriority)
}
}

if rb.TelemetryEnabled {
rb.usageTelemetry = &atomic.Uint64{}
}
Expand Down Expand Up @@ -106,6 +125,24 @@ func (rb *RingBuffer) Start() error {
rb.wgReader.Add(1)

go func() {
if rb.SchedPolicy != 0 {
runtime.LockOSThread()
// No UnlockOSThread — the goroutine runs for the ring buffer's
// lifetime and the thread is destroyed when it exits.
attr, err := unix.SchedGetAttr(0, 0)
if err == nil {
attr.Policy = uint32(rb.SchedPolicy)
attr.Priority = uint32(rb.SchedPriority)
err = unix.SchedSetAttr(0, attr, 0)
}
if err != nil {
if rb.ErrChan != nil {
rb.ErrChan <- fmt.Errorf("failed to set RT scheduling policy on ring buffer reader: %w", err)
}
// Continue with normal scheduling (graceful degradation)
}
}

var record *ringbuf.Record
var err error

Expand Down
Loading