From 4b0b6c461cdeb9fa6ec48932911340fe524bf800 Mon Sep 17 00:00:00 2001 From: Henry Date: Thu, 1 Jan 2026 18:51:11 -0500 Subject: [PATCH 1/3] Add Opus decoder --- rs/hang/src/import/decoder.rs | 14 ++++- rs/hang/src/import/mod.rs | 2 + rs/hang/src/import/opus.rs | 112 ++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 rs/hang/src/import/opus.rs diff --git a/rs/hang/src/import/decoder.rs b/rs/hang/src/import/decoder.rs index 6e5554e72..7bc8eb670 100644 --- a/rs/hang/src/import/decoder.rs +++ b/rs/hang/src/import/decoder.rs @@ -2,7 +2,7 @@ use std::{fmt, str::FromStr}; use bytes::Buf; -use crate::{self as hang, import::Aac, Error}; +use crate::{self as hang, import::Aac, import::Opus, Error}; use super::{Avc3, Fmp4}; @@ -14,6 +14,8 @@ pub enum DecoderFormat { Fmp4, /// Raw AAC frames (not ADTS). Aac, + /// Raw Opus frames (not Ogg). + Opus, } impl FromStr for DecoderFormat { @@ -28,6 +30,7 @@ impl FromStr for DecoderFormat { } "fmp4" | "cmaf" => Ok(DecoderFormat::Fmp4), "aac" => Ok(DecoderFormat::Aac), + "opus" => Ok(DecoderFormat::Opus), _ => Err(Error::UnknownFormat(s.to_string())), } } @@ -39,6 +42,7 @@ impl fmt::Display for DecoderFormat { DecoderFormat::Avc3 => write!(f, "avc3"), DecoderFormat::Fmp4 => write!(f, "fmp4"), DecoderFormat::Aac => write!(f, "aac"), + DecoderFormat::Opus => write!(f, "opus"), } } } @@ -50,6 +54,7 @@ enum DecoderKind { // Boxed because it's a large struct and clippy complains about the size. Fmp4(Box), Aac(Aac), + Opus(Opus), } /// A generic interface for importing a stream of media into a hang broadcast. @@ -67,6 +72,7 @@ impl Decoder { DecoderFormat::Avc3 => Avc3::new(broadcast).into(), DecoderFormat::Fmp4 => Box::new(Fmp4::new(broadcast)).into(), DecoderFormat::Aac => Aac::new(broadcast).into(), + DecoderFormat::Opus => Opus::new(broadcast).into(), }; Self { decoder } @@ -83,6 +89,7 @@ impl Decoder { DecoderKind::Avc3(decoder) => decoder.initialize(buf)?, DecoderKind::Fmp4(decoder) => decoder.decode(buf)?, DecoderKind::Aac(decoder) => decoder.initialize(buf)?, + DecoderKind::Opus(decoder) => decoder.initialize(buf)?, } anyhow::ensure!(!buf.has_remaining(), "buffer was not fully consumed"); @@ -107,8 +114,9 @@ impl Decoder { match &mut self.decoder { DecoderKind::Avc3(decoder) => decoder.decode_stream(buf, None)?, DecoderKind::Fmp4(decoder) => decoder.decode(buf)?, - // TODO Fix or make this more type safe. + // TODO Fix or make these more type safe. DecoderKind::Aac(_) => anyhow::bail!("AAC does not support stream decoding"), + DecoderKind::Opus(_) => anyhow::bail!("Opus does not support stream decoding"), } Ok(()) @@ -133,6 +141,7 @@ impl Decoder { DecoderKind::Avc3(decoder) => decoder.decode_frame(buf, pts)?, DecoderKind::Fmp4(decoder) => decoder.decode(buf)?, DecoderKind::Aac(decoder) => decoder.decode(buf, pts)?, + DecoderKind::Opus(decoder) => decoder.decode(buf, pts)?, } Ok(()) @@ -144,6 +153,7 @@ impl Decoder { DecoderKind::Avc3(decoder) => decoder.is_initialized(), DecoderKind::Fmp4(decoder) => decoder.is_initialized(), DecoderKind::Aac(decoder) => decoder.is_initialized(), + DecoderKind::Opus(decoder) => decoder.is_initialized(), } } } diff --git a/rs/hang/src/import/mod.rs b/rs/hang/src/import/mod.rs index 763e8bc67..abc45c9b6 100644 --- a/rs/hang/src/import/mod.rs +++ b/rs/hang/src/import/mod.rs @@ -3,9 +3,11 @@ mod avc3; mod decoder; mod fmp4; mod hls; +mod opus; pub use aac::*; pub use avc3::*; pub use decoder::*; pub use fmp4::*; pub use hls::*; +pub use opus::*; diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs new file mode 100644 index 000000000..2efa9b290 --- /dev/null +++ b/rs/hang/src/import/opus.rs @@ -0,0 +1,112 @@ +use crate as hang; +use anyhow::Context; +use buf_list::BufList; +use bytes::Buf; +use moq_lite as moq; + +/// Opus decoder, initialized via OpusHead. +pub struct Opus { + broadcast: hang::BroadcastProducer, + track: Option, + zero: Option, +} + +impl Opus { + pub fn new(broadcast: hang::BroadcastProducer) -> Self { + Self { + broadcast, + track: None, + zero: None, + } + } + + pub fn initialize(&mut self, buf: &mut T) -> anyhow::Result<()> { + // Parse OpusHead (https://datatracker.ietf.org/doc/html/rfc7845#section-5.1) + // - Verifies "OpusHead" magic signature + // - Reads channel count + // - Reads sample rate + // - Ignores pre-skip, gain, channel mapping for now + + anyhow::ensure!(buf.chunk().starts_with(b"OpusHead"), "invalid OpusHead signature"); + + buf.advance(8); // Skip "OpusHead" signature + buf.advance(1); // Skip version + let channel_count = buf.get_u8() as u32; + buf.advance(2); // Skip pre-skip (lol) + let sample_rate = buf.get_u32_le(); + + // Skip gain, channel mapping until if/when we support them + if buf.remaining() > 0 { + buf.advance(buf.remaining()); + } + + let track = moq::Track { + name: self.broadcast.track_name("audio"), + priority: 2, + }; + + let config = hang::catalog::AudioConfig { + codec: hang::catalog::AudioCodec::Opus, + sample_rate, + channel_count, + bitrate: None, + description: None, + }; + + tracing::debug!(name = ?track.name, ?config, "starting track"); + + let track = track.produce(); + self.broadcast.insert_track(track.consumer); + + let mut catalog = self.broadcast.catalog.lock(); + let audio = catalog.insert_audio(track.producer.info.name.clone(), config); + audio.priority = 2; + + self.track = Some(track.producer.into()); + + Ok(()) + } + + pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { + let pts = self.pts(pts)?; + let track = self.track.as_mut().context("not initialized")?; + + // Create a BufList at chunk boundaries, potentially avoiding allocations. + let mut payload = BufList::new(); + while !buf.chunk().is_empty() { + payload.push_chunk(buf.copy_to_bytes(buf.chunk().len())); + } + + let frame = hang::Frame { + timestamp: pts, + keyframe: true, + payload, + }; + + track.write(frame)?; + + Ok(()) + } + + pub fn is_initialized(&self) -> bool { + self.track.is_some() + } + + fn pts(&mut self, hint: Option) -> anyhow::Result { + if let Some(pts) = hint { + return Ok(pts); + } + + let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); + Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) + } +} + +impl Drop for Opus { + fn drop(&mut self) { + if let Some(track) = self.track.take() { + tracing::debug!(name = ?track.info.name, "ending track"); + self.broadcast.catalog.lock().remove_audio(&track.info.name); + } + } +} From ecc4cd19679eccb83c600748208c296076685290 Mon Sep 17 00:00:00 2001 From: Henry Date: Thu, 1 Jan 2026 19:26:51 -0500 Subject: [PATCH 2/3] Ensure OpusHead is the proper size --- rs/hang/src/import/opus.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index 2efa9b290..2725a190e 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -27,6 +27,7 @@ impl Opus { // - Reads sample rate // - Ignores pre-skip, gain, channel mapping for now + anyhow::ensure!(buf.remaining() >= 19, "OpusHead must be at least 19 bytes"); anyhow::ensure!(buf.chunk().starts_with(b"OpusHead"), "invalid OpusHead signature"); buf.advance(8); // Skip "OpusHead" signature From 528b80c722dfd9d6e4dd5348c6ee4d7bd0966369 Mon Sep 17 00:00:00 2001 From: Henry Date: Fri, 2 Jan 2026 09:48:20 -0500 Subject: [PATCH 3/3] Handle signature check for fragmented buffers --- rs/hang/src/import/opus.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index 2725a190e..c6f822581 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -28,9 +28,10 @@ impl Opus { // - Ignores pre-skip, gain, channel mapping for now anyhow::ensure!(buf.remaining() >= 19, "OpusHead must be at least 19 bytes"); - anyhow::ensure!(buf.chunk().starts_with(b"OpusHead"), "invalid OpusHead signature"); + const OPUS_HEAD: u64 = u64::from_be_bytes(*b"OpusHead"); + let signature = buf.get_u64(); + anyhow::ensure!(signature == OPUS_HEAD, "invalid OpusHead signature"); - buf.advance(8); // Skip "OpusHead" signature buf.advance(1); // Skip version let channel_count = buf.get_u8() as u32; buf.advance(2); // Skip pre-skip (lol)