diff --git a/ringbuffer.go b/ringbuffer.go index 89713f3..801c3e1 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -3,11 +3,13 @@ package manager import ( "errors" "fmt" + "runtime" "sync" "sync/atomic" "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" + "golang.org/x/sys/unix" ) type RingBufferOptions struct { @@ -27,6 +29,14 @@ type RingBufferOptions struct { // TelemetryEnabled turns on telemetry about the usage of the ring buffer TelemetryEnabled bool + + // SchedPolicy - If set, the reader goroutine's OS thread will be pinned + // (LockOSThread) and given this scheduling policy via sched_setattr(2). + // Supported values: unix.SCHED_FIFO, unix.SCHED_RR. Zero means no change. + SchedPolicy int + + // SchedPriority - RT scheduling priority (1-99). Required when SchedPolicy is set. + SchedPriority int } type RingBuffer struct { @@ -74,6 +84,15 @@ func (rb *RingBuffer) init(manager *Manager) error { return fmt.Errorf("no DataHandler/RecordHandler set for %s", rb.Name) } + if rb.SchedPolicy != 0 { + if rb.SchedPolicy != unix.SCHED_FIFO && rb.SchedPolicy != unix.SCHED_RR { + return fmt.Errorf("unsupported SchedPolicy %d for %s: must be SCHED_FIFO or SCHED_RR", rb.SchedPolicy, rb.Name) + } + if rb.SchedPriority < 1 || rb.SchedPriority > 99 { + return fmt.Errorf("SchedPriority must be between 1 and 99 for %s, got %d", rb.Name, rb.SchedPriority) + } + } + if rb.TelemetryEnabled { rb.usageTelemetry = &atomic.Uint64{} } @@ -106,6 +125,24 @@ func (rb *RingBuffer) Start() error { rb.wgReader.Add(1) go func() { + if rb.SchedPolicy != 0 { + runtime.LockOSThread() + // No UnlockOSThread — the goroutine runs for the ring buffer's + // lifetime and the thread is destroyed when it exits. + attr, err := unix.SchedGetAttr(0, 0) + if err == nil { + attr.Policy = uint32(rb.SchedPolicy) + attr.Priority = uint32(rb.SchedPriority) + err = unix.SchedSetAttr(0, attr, 0) + } + if err != nil { + if rb.ErrChan != nil { + rb.ErrChan <- fmt.Errorf("failed to set RT scheduling policy on ring buffer reader: %w", err) + } + // Continue with normal scheduling (graceful degradation) + } + } + var record *ringbuf.Record var err error