Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/_internal_test_exports/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ pub fn receive_register(data: &[u8]) -> Option<()> {
rr.update(seq.into(), arrival, rtp_time, clock_rate);
}
1 => {
rr.nack_report();
let now = start + Duration::from_micros(rng.u64(u64::MAX / 100)?);
let rtt = Duration::from_millis(rng.u64(200)? + 10);
rr.nack_report(now, rtt);
}
2 => {
rr.reception_report();
Expand Down
63 changes: 11 additions & 52 deletions src/bwe/delay/control.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};

use super::super::macros::{log_bitrate_estimate, log_delay_variation};
Expand All @@ -7,14 +6,11 @@ use super::arrival_group::ArrivalGroupAccumulator;
use super::rate_control::RateControl;
use super::trendline::TrendlineEstimator;
use crate::rtp_::Bitrate;
use crate::util::{already_happened, MovingAverage};
use crate::util::already_happened;

const MAX_RTT_HISTORY_WINDOW: usize = 32;
const UPDATE_INTERVAL: Duration = Duration::from_millis(25);
/// The maximum time we keep updating our estimate without receiving a TWCC report.
const MAX_TWCC_GAP: Duration = Duration::from_millis(500);
/// RFC 6298: Exponentially Weighted Moving Average smoothing factor for RTT (alpha = 1/8)
const RTT_SMOOTHING_FACTOR: f64 = 0.125;

/// Delay controller for googcc inspired BWE.
///
Expand All @@ -28,10 +24,8 @@ pub struct DelayController {
/// Last estimate produced, unlike [`next_estimate`] this will always have a value after the
/// first estimate.
last_estimate: Option<Bitrate>,
/// Smoothed RTT using EWMA (RFC 6298, alpha = 1/8).
smoothed_rtt: MovingAverage,
/// History of the max RTT derived for each TWCC report (kept for fallback).
max_rtt_history: VecDeque<Duration>,
/// Last known smoothed RTT from TWCC register.
last_smoothed_rtt: Option<Duration>,

/// The next time we should poll.
next_timeout: Instant,
Expand All @@ -46,8 +40,7 @@ impl DelayController {
trendline_estimator: TrendlineEstimator::new(20),
rate_control: RateControl::new(initial_bitrate, Bitrate::kbps(40), Bitrate::gbps(10)),
last_estimate: Some(initial_bitrate),
smoothed_rtt: MovingAverage::new(RTT_SMOOTHING_FACTOR),
max_rtt_history: VecDeque::default(),
last_smoothed_rtt: None,
next_timeout: already_happened(),
last_twcc_report: already_happened(),
}
Expand All @@ -59,12 +52,10 @@ impl DelayController {
acked: &[AckedPacket],
acked_bitrate: Option<Bitrate>,
probe_bitrate: Option<Bitrate>,
smoothed_rtt: Option<Duration>,
now: Instant,
) -> Option<Bitrate> {
let mut max_rtt = None;

for acked_packet in acked {
max_rtt = max_rtt.max(Some(acked_packet.rtt()));
if let Some(delay_variation) = self
.arrival_group_accumulator
.accumulate_packet(acked_packet)
Expand All @@ -87,17 +78,16 @@ impl DelayController {
}
}

if let Some(rtt) = max_rtt {
self.update_rtt(rtt);
}
// Store the smoothed RTT for use in handle_timeout
self.last_smoothed_rtt = smoothed_rtt;

let new_hypothesis = self.trendline_estimator.hypothesis();

self.update_estimate(
new_hypothesis,
acked_bitrate,
probe_bitrate,
self.get_smoothed_rtt(),
smoothed_rtt,
now,
);
self.last_twcc_report = now;
Expand All @@ -115,7 +105,7 @@ impl DelayController {
// no longer be considered valid. We need another TWCC report before we can update
// estimates.
let next_timeout_in = self
.get_smoothed_rtt()
.last_smoothed_rtt
.unwrap_or(MAX_TWCC_GAP)
.min(UPDATE_INTERVAL);

Expand All @@ -129,7 +119,7 @@ impl DelayController {
self.trendline_estimator.hypothesis(),
acked_bitrate,
None,
self.get_smoothed_rtt(),
self.last_smoothed_rtt,
now,
);
}
Expand All @@ -147,37 +137,6 @@ impl DelayController {
self.trendline_estimator.hypothesis() == BandwidthUsage::Overuse
}

/// Update smoothed RTT using EWMA (RFC 6298, alpha = 1/8).
fn update_rtt(&mut self, rtt: Duration) {
// Keep history as fallback in case smoothed RTT is not yet available
while self.max_rtt_history.len() >= MAX_RTT_HISTORY_WINDOW {
self.max_rtt_history.pop_front();
}
self.max_rtt_history.push_back(rtt);

// Update smoothed RTT using EWMA: smoothed = (7/8) * smoothed + (1/8) * sample
self.smoothed_rtt.update(rtt.as_secs_f64());
}

/// Get the current smoothed RTT, with fallback to mean of history if not yet available.
fn get_smoothed_rtt(&self) -> Option<Duration> {
// Try smoothed RTT first (EWMA)
if let Some(avg_secs) = self.smoothed_rtt.get() {
return Some(Duration::from_secs_f64(avg_secs));
}

// Fallback to mean of history during initialization
if self.max_rtt_history.is_empty() {
return None;
}

let sum = self
.max_rtt_history
.iter()
.fold(Duration::ZERO, |acc, rtt| acc + *rtt);
Some(sum / self.max_rtt_history.len() as u32)
}

fn update_estimate(
&mut self,
hypothesis: BandwidthUsage,
Expand Down Expand Up @@ -218,7 +177,7 @@ impl DelayController {
fn trendline_hypothesis_valid(&self, now: Instant) -> bool {
now.duration_since(self.last_twcc_report)
<= self
.get_smoothed_rtt()
.last_smoothed_rtt
.map(|rtt| rtt * 2)
.unwrap_or(MAX_TWCC_GAP)
.min(UPDATE_INTERVAL * 2)
Expand Down
22 changes: 17 additions & 5 deletions src/bwe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ impl Bwe {
pub fn update<'t>(
&mut self,
records: impl Iterator<Item = &'t crate::rtp_::TwccSendRecord>,
smoothed_rtt: Option<Duration>,
now: Instant,
) {
self.bwe.update(records, now);
self.bwe.update(records, smoothed_rtt, now);
}

pub fn poll_estimate(&mut self) -> Option<Bitrate> {
Expand Down Expand Up @@ -178,7 +179,12 @@ impl SendSideBandwidthEstimator {
}

/// Record a packet from a TWCC report.
pub fn update<'t>(&mut self, records: impl Iterator<Item = &'t TwccSendRecord>, now: Instant) {
pub fn update<'t>(
&mut self,
records: impl Iterator<Item = &'t TwccSendRecord>,
smoothed_rtt: Option<Duration>,
now: Instant,
) {
let _ = self.started_at.get_or_insert(now);

let send_records: Vec<_> = records.collect();
Expand Down Expand Up @@ -226,9 +232,13 @@ impl SendSideBandwidthEstimator {
let is_probe_result = probe_result.is_some();

// Update delay controller with the latest probe result
let maybe_estimate =
self.delay_controller
.update(&acked_packets, acked_bitrate, probe_result, now);
let maybe_estimate = self.delay_controller.update(
&acked_packets,
acked_bitrate,
probe_result,
smoothed_rtt,
now,
);

let Some(delay_estimate) = maybe_estimate else {
return;
Expand Down Expand Up @@ -420,10 +430,12 @@ pub struct AckedPacket {
remote_recv_time: Instant,
/// The local time when received confirmation that the other side received the seq i.e. when we
/// received the TWCC report for this packet.
#[allow(dead_code)]
local_recv_time: Instant,
}

impl AckedPacket {
#[allow(dead_code)]
fn rtt(&self) -> Duration {
self.local_recv_time - self.local_send_time
}
Expand Down
65 changes: 57 additions & 8 deletions src/rtp/rtcp/twcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::{extend_u16, FeedbackMessageType, RtcpHeader, RtcpPacket};
use super::{RtcpType, Ssrc, TransportType};

use crate::rtp_::{TwccClusterId, TwccSeq};
use crate::util::MovingAverage;

/// Transport Wide Congestion Control.
///
Expand Down Expand Up @@ -997,7 +998,9 @@ impl<'a> TryFrom<&'a [u8]> for PacketChunk {
}
}

#[derive(Debug)]
/// RFC 6298: Exponentially Weighted Moving Average smoothing factor for RTT (alpha = 1/8)
const RTT_SMOOTHING_FACTOR: f64 = 0.125;

pub struct TwccSendRegister {
/// How many send records to keep.
keep: usize,
Expand All @@ -1014,6 +1017,19 @@ pub struct TwccSendRegister {

/// Last registered Twcc number.
last_registered: TwccSeq,

/// Smoothed RTT using EWMA (RFC 6298).
smoothed_rtt: MovingAverage,
}

/// Result of applying a TWCC report to the send register.
///
/// Contains an iterator over newly acknowledged records and the current smoothed RTT.
pub struct TwccRegisterUpdate<I> {
/// Iterator over the [`TwccSendRecord`]s that were acknowledged in this report.
pub records: I,
/// Current smoothed RTT (EWMA of measured RTTs).
pub smoothed_rtt: Option<Duration>,
}

impl<'a> IntoIterator for &'a TwccSendRegister {
Expand Down Expand Up @@ -1162,6 +1178,7 @@ impl TwccSendRegister {
time_zero: None,
apply_report_counter: 0,
last_registered: 0.into(),
smoothed_rtt: MovingAverage::new(RTT_SMOOTHING_FACTOR),
}
}

Expand All @@ -1183,13 +1200,14 @@ impl TwccSendRegister {

/// Apply a TWCC RTCP report.
///
/// Returns iterator over [`TwccSendRecord`]s included in the given [`Twcc`]
/// except for ones that was already acked and returned before.
/// Returns a [`TwccRegisterUpdate`] containing an iterator over [`TwccSendRecord`]s
/// included in the given [`Twcc`] (except for ones already acked), and the
/// current smoothed RTT.
pub fn apply_report(
&mut self,
twcc: Twcc,
now: Instant,
) -> Option<impl Iterator<Item = &TwccSendRecord>> {
) -> Option<TwccRegisterUpdate<impl Iterator<Item = &TwccSendRecord>>> {
if self.time_zero.is_none() {
self.time_zero = Some(now);
}
Expand Down Expand Up @@ -1290,8 +1308,27 @@ impl TwccSendRegister {

let range = first_seq_no..=last_seq_no;

Some(
TwccSendRecordsIter {
// Update smoothed RTT from newly processed records
let max_rtt = self
.queue
.iter()
.skip(first_index)
.take_while(|r| r.seq() <= last_seq_no)
.filter(|r| {
r.recv_report
.map(|rr| rr.apply_report_counter == apply_report_counter)
.unwrap_or(false)
})
.filter_map(|r| r.rtt())
.max();
if let Some(rtt) = max_rtt {
self.smoothed_rtt.update(rtt.as_secs_f64());
}

let smoothed_rtt = self.smoothed_rtt();

Some(TwccRegisterUpdate {
records: TwccSendRecordsIter {
range: range.clone(),
index: first_index,
current: first_seq_no,
Expand All @@ -1305,7 +1342,8 @@ impl TwccSendRegister {
.map(|r| r.apply_report_counter == apply_report_counter)
.unwrap_or_default()
}),
)
smoothed_rtt,
})
}

/// Calculate the egress loss for given time window.
Expand Down Expand Up @@ -1348,6 +1386,14 @@ impl TwccSendRegister {
pub fn rtt(&self) -> Option<Duration> {
self.queue.iter().rev().find_map(|s| s.rtt())
}

/// Get the smoothed RTT using EWMA (RFC 6298).
///
/// This provides a more stable RTT estimate compared to `rtt()` which
/// returns the raw RTT from the most recent packet.
pub fn smoothed_rtt(&self) -> Option<Duration> {
self.smoothed_rtt.get().map(Duration::from_secs_f64)
}
}

#[derive()]
Expand Down Expand Up @@ -1943,7 +1989,7 @@ mod test {
},
now,
);
let iter = iter.unwrap();
let TwccRegisterUpdate { records: iter, .. } = iter.unwrap();

for record in iter {
assert!(
Expand Down Expand Up @@ -2099,6 +2145,7 @@ mod test {
)
.expect("apply_report to return Some(_)");

let TwccRegisterUpdate { records: iter, .. } = iter;
assert_eq!(
iter.map(|r| *r.seq()).collect::<Vec<_>>(),
vec![0, 1, 2, 3, 4, 5, 6, 7]
Expand Down Expand Up @@ -2209,6 +2256,7 @@ mod test {
now + Duration::from_millis(8),
)
.unwrap()
.records
.filter_map(|sr| sr.remote_recv_time().map(|_| sr.seq().as_u16()))
.collect::<Vec<_>>();

Expand All @@ -2231,6 +2279,7 @@ mod test {
now + Duration::from_millis(14),
)
.unwrap()
.records
.filter_map(|sr| sr.remote_recv_time().map(|_| sr.seq().as_u16()))
.collect::<Vec<_>>();

Expand Down
Loading
Loading