Add poll-free single-slot encode loop#158
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #158 +/- ##
==========================================
+ Coverage 45.52% 46.03% +0.50%
==========================================
Files 18 18
Lines 1777 1803 +26
==========================================
+ Hits 809 830 +21
- Misses 901 905 +4
- Partials 67 68 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
93ca29f to
d3fc877
Compare
Add a single-slot, newest-wins FrameBuffer and drive each track's encode goroutine off the buffer's enqueue signal instead of a 5ms poll, so the freshest frame is always encoded and stale frames are dropped rather than queued. - FrameBuffer keeps only the latest frame; enqueue overwrites and signals FrameReady, with a Closed channel so a blocked reader wakes on close. - runEncodeLoop blocks on FrameReady (no poll); error handling extracted into handleEncodeErr to keep complexity within the cyclop limit. - Remove the unreachable non-FrameBuffer poll fallback (videoSource is always a *FrameBuffer) and silence the gosec G118 false positive on the per-track cancel func (stored on the track, called at teardown). - Restore vnet/vnet to .gitignore. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
d3fc877 to
035c1cc
Compare
| // handleEncodeErr processes an error from encodeAndSendTrack. It returns true | ||
| // when the encode loop should exit. For ErrNoFrameAvailable the source had no | ||
| // frame this iteration; encodeAndSendTrack has already released the vpx encoder | ||
| // mutex (held only across Read), so we wait OUTSIDE it — recreateEncoder and | ||
| // DynamicQPControl bitrate updates can acquire encoderMu.Lock while we idle. We | ||
| // block on the enqueue signal instead of polling so a freshly pushed frame is | ||
| // picked up immediately, eliminating poll-induced buffer-wait latency. | ||
| func (s *RTCSender) handleEncodeErr( | ||
| ctx context.Context, err error, trackID string, | ||
| frameReady, bufClosed <-chan struct{}, | ||
| ) bool { | ||
| if ctx.Err() != nil || errors.Is(err, ErrBufferClosed) { | ||
| return true | ||
| } | ||
| if errors.Is(err, ErrNoFrameAvailable) { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return true | ||
| case <-bufClosed: | ||
| return true | ||
| case <-frameReady: | ||
| return false | ||
| } | ||
| } | ||
| s.log.Errorf("Error processing track %s: %v", trackID, err) | ||
|
|
||
| return false | ||
| } |
There was a problem hiding this comment.
Can this consume the only FrameReady signal even when the queued frame was not read (ErrNoFrameAvailable), causing the sender to stall until another frame arrives? I don't see a test to confirm if this doesn't happen, and i think this can happen.
//cc @lkang-nuro @hanguyen-nuro
There was a problem hiding this comment.
Hi @JoTurk , I will look into it, and add tests and change accordingly. But according to CC, it does not seem to be an issue (I will investigate it a bit more), for now just post it here to see if it makes sense to you.
● Let me walk through it concretely. First, the two facts about the code that JoTurk's concern hinges on:
Fact 1 — the consumer reads the frame and the signal on separate paths.
- The frame lives in frameChan (cap 1). It's popped only by FrameBuffer.Read() (frame_buffer.go:183).
- The wakeup lives in notifyChan (cap 1), exposed as FrameReady(). It's consumed only in handleEncodeErr
(rtc_sender.go:821).
These two channels are not drained together. When a frame is read successfully on the fast path (main loop,
rtc_sender.go:787), the consumer takes the frame out of frameChan but leaves the matching signal sitting in
notifyChan.
Fact 2 — the producer pushes the frame then signals (frame_buffer.go:249-250):
case f.frameChan <- fm:
f.signal() // notifyChan <- {} , coalesced if one already pending
What JoTurk is saying
His comment is on rtc_sender.go:828 (the handleEncodeErr body). He's pointing at line 821:
case <-frameReady: // consumes one signal from notifyChan
return false
His worry: this case fires — consuming the one signal in notifyChan — on an iteration where Read() returned
ErrNoFrameAvailable (i.e. no frame was actually taken). If that's the only signal, and a frame is/was sitting
in the buffer, the loop has now "spent" its wakeup without consuming the frame, so the next Read() finds
nothing and the loop blocks forever → stall until another frame arrives.
Example — walk the interleaving he fears
Two goroutines: P = producer (SendFrame), C = consumer (encode loop). Start state: frameChan [], notifyChan
[].
t1 P: SendFrame(A)
frame_buffer.go:249 frameChan <- A → frameChan [A]
frame_buffer.go:250 signal() → notifyChan [sig] (state: A queued, 1 sig)
t2 C: iteration N, encodeAndSendTrack → Read() (rtc_sender.go:787 → 847)
frame_buffer.go:183 fm := <-frameChan → reads A, frameChan []
── note: notifyChan is NOT touched ── (state: frameChan [], notifyChan [sig])
encodes A, hasFrame=true, loops
t3 C: iteration N+1 → Read()
frame_buffer.go:188 default → ErrNoFrameAvailable
rtc_sender.go:815 handleEncodeErr, err == ErrNoFrameAvailable
rtc_sender.go:821 case <-frameReady: ← FIRES, consumes the [sig] ★
returns false (state: frameChan [], notifyChan [])
★ This is exactly the thing JoTurk describes: at t3 the loop consumed the only FrameReady signal on an
iteration where no frame was read. His observation is literally correct — it does happen.
Now the question is whether it stalls. Continue:
t4 C: iteration N+2 → Read()
frame_buffer.go:188 default → ErrNoFrameAvailable again
rtc_sender.go:821 case <-frameReady: ← BLOCKS (notifyChan empty) ✓ correctly idle
t5 P: SendFrame(B)
frame_buffer.go:249 frameChan <- B → frameChan [B]
frame_buffer.go:250 signal() → notifyChan [sig]
t6 C: wakes at rtc_sender.go:821, returns false, loops → Read() reads B ✓
No stall. The signal consumed at t3 was a stale leftover from frame A — a frame the consumer had already read
at t2. Spending it just cost one extra spin (t3→t4). At t4 the loop is correctly blocked because there
genuinely is no frame, and the next real frame B re-arms notifyChan and wakes it.
Why it can never actually stall
For a real stall you'd need this end state: C blocked on notifyChan (empty) while frameChan holds an unread
frame. That state is unreachable, because of the safety invariant:
▎ Every push to frameChan posts a signal (or coalesces onto a pending one), the frame stays in frameChan
▎ until Read() pops it, and the consumer always calls Read() again after consuming a signal (return false →
▎ loop → encodeAndSendTrack).
Summary
Replace the per-track 5ms encode poll with an event-driven, newest-wins
pipeline. Each track now encodes off the buffer's enqueue signal, so the
freshest captured frame is always the one encoded and stale frames are
dropped rather than queued — reducing onboard latency under load.
Changes
FrameBuffer: keeps only the latest frame. Enqueueoverwrites the slot and signals
FrameReady; aClosedchannel lets ablocked reader wake immediately on close.
runEncodeLoop: blocks onFrameReadyinstead of a 5mstime.Sleeppoll, so a freshly pushed frame is picked up right away.Error handling is factored into
handleEncodeErrto keep cyclomaticcomplexity within the
cycloplimit.FrameBufferpoll fallback (
videoSourceis always a*FrameBuffer), silenced thegosecG118 false positive on the per-track cancel func (it is storedon the track and called at teardown), and restored
vnet/vnetto.gitignore.Testing
go build ./...golangci-lint run ./sender/...— 0 issuesgo test ./sender/...— passing🤖 Generated with Claude Code