-
Notifications
You must be signed in to change notification settings - Fork 136
Cmaf Native Support #822
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Cmaf Native Support #822
Conversation
Co-authored-by: Emil Santurio <emilsas@gmail.com>
WalkthroughThis pull request refactors container format naming conventions and introduces MediaSource Extensions (MSE) support for audio and video playback. Changes include renaming container labels from "legacy" to "native" and "fmp4" to "cmaf" across TypeScript catalog definitions and codec implementations. New MSE-based audio and video source classes are added with fragment handling, buffer management, and stream integration. A passthrough mode for fMP4 fragments is introduced in the Rust import layer. MIME type utility functions are added for codec validation. Container configuration fields are added to audio and video codec structures, and timestamp encoding is updated to accommodate the new container format. Control flow improvements include explicit guards and block statements throughout frame and decoder loops. 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 18
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
rs/hang/src/catalog/root.rs (1)
292-381: Backward compatibility is not actually implemented despite the documentation claim.The
containerfield in bothAudioConfigandVideoConfiglacks#[serde(default)]. Without this attribute, serde requires the field to be present in JSON during deserialization, even thoughContainerhas aDefaultimplementation. The documentation stating "Defaults to 'native' for backward compatibility" is misleading.To support backward compatibility with older catalogs missing the
containerfield, add#[serde(default)]to thecontainerfield in both structs:#[serde(default)] pub container: Container,Then add a test to verify older JSON without the field deserializes correctly.
justfile (1)
150-255: Exit-code propagation block won’t run oncargo runfailure underset -e.If you want the explicit
EXIT_CODEhandling to work (and always run the post-cargo runlines), wrap the invocation withset +e/set -e(or remove the explicit propagation and rely ontrap cleanup EXIT).Proposed diff (keep explicit EXIT_CODE handling)
# Run hang to ingest from local files - if [ -n "$PASSTHROUGH_FLAG" ]; then - echo ">>> Running with --passthrough flag" - cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" --passthrough - else - echo ">>> Running without --passthrough flag" - cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" - fi - EXIT_CODE=$? + set +e + if [ -n "$PASSTHROUGH_FLAG" ]; then + echo ">>> Running with --passthrough flag" + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" --passthrough + else + echo ">>> Running without --passthrough flag" + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" + fi + EXIT_CODE=$? + set -e # Cleanup after cargo run completes (success or failure) cleanuprs/hang/src/import/hls.rs (1)
124-145: AvoidClient::builder().build().unwrap()and add an HTTP timeout for ingest.The current code uses
.unwrap()inside the closure, which panics on client creation failure instead of propagating the error through theanyhow::Resultreturn type. Additionally, the HTTP client has no timeout configured, which is problematic for network-based streaming ingest operations.Proposed diff
pub fn new(broadcast: BroadcastProducer, cfg: HlsConfig) -> anyhow::Result<Self> { let base_url = cfg.parse_playlist()?; - let client = cfg.client.unwrap_or_else(|| { - Client::builder() - .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"))) - .build() - .unwrap() - }); + let client = match cfg.client { + Some(client) => client, + None => Client::builder() + .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"))) + .timeout(Duration::from_secs(10)) + .build() + .context("failed to build reqwest client")?, + }; let passthrough = cfg.passthrough; Ok(Self {
🤖 Fix all issues with AI agents
In @js/hang/src/catalog/container.ts:
- Line 8: Update the stale inline comment entry for "cmaf" in the container list
so it no longer says "(future)"; locate the "cmaf" item in
js/hang/src/catalog/container.ts and change its annotation to reflect that CMAF
is supported/active (consistent with the implementation in container/codec.ts
and MSE usage).
In @js/hang/src/util/mime.ts:
- Around line 22-24: isMimeTypeSupported currently calls
MediaSource.isTypeSupported directly and will throw in non-browser environments;
guard access by checking that typeof MediaSource !== "undefined" and that
MediaSource.isTypeSupported is a function before calling it, returning false
when the guard fails; update the isMimeTypeSupported function to perform this
safe check so imports in Node/SSR/tests do not trigger ReferenceError.
In @js/hang/src/watch/audio/emitter.ts:
- Around line 86-92: When readyState is below HTMLMediaElement.HAVE_METADATA you
add a loadedmetadata listener via mseAudio.addEventListener("loadedmetadata",
tryPlay, { once: true }) but never remove it if the effect unmounts or re-runs;
update the effect to capture the handler reference (e.g., const onLoaded =
tryPlay), attach with mseAudio.addEventListener("loadedmetadata", onLoaded, {
once: true }), and in the effect cleanup call
mseAudio.removeEventListener("loadedmetadata", onLoaded) (guarding that mseAudio
still exists and its readyState is still < HAVE_METADATA) so the listener is
removed if the component unmounts or the effect is disposed before the event
fires.
In @js/hang/src/watch/audio/source.ts:
- Around line 44-47: The mseAudioElement getter can hold a stale
HTMLAudioElement when the source is not using MSE; update the code paths that
switch or disable MSE (the places around the #mseAudioElement declaration and
the logic blocks referenced at 92-107) to explicitly clear the private signal by
setting #mseAudioElement.value = undefined whenever leaving the MSE path or when
disposing/replacing the MSE element so the UI/control state cannot reference a
stale element; ensure every branch that creates or assigns an MSE element also
handles the opposite branch to clear the signal.
- Around line 194-236: The per-track forwarding currently uses the global
this.#signals.effect which leaks subscriptions across track switches; change the
two this.#signals.effect(...) calls inside #runMSEPath to use the local effect
(the parameter named effect) so subscriptions are scoped to the current run and
teardown with the run's lifecycle: subscribe to mseSource.audioElement and
mseSource.stats using the passed-in effect and ensure you unregister in
effect.cleanup (or the effect's native unsubscribe API) so you set
this.#mseAudioElement and this.#stats only while that run is active and they are
removed when the effect ends.
- Around line 186-236: The fallback from #runMSEPath to #runWebCodecsPath is
invalid for container === "cmaf" because Frame.Consumer will still emit fMP4
fragments that WebCodecs.AudioDecoder cannot decode; update the catch in
#runMSEPath to avoid calling this.#runWebCodecsPath when config.container ===
"cmaf" (either select/signal a non-CMAF rendition/container before falling back
or abort and surface the error), or implement fMP4 demuxing inside
#runWebCodecsPath so it accepts CMAF fragments (handle fragment parsing to raw
codec frames before feeding to AudioDecoder); ensure changes reference
mseSource.runTrack, SourceMSE, #runWebCodecsPath, and the config.container check
so the fallback only happens when fragments are compatible.
In @js/hang/src/watch/video/source-mse.ts:
- Around line 77-81: The created video element (this.#video) is appended to
document.body before later initialization steps like addSourceBuffer, so if
those fail the element leaks; change the flow to ensure cleanup on any
initialization error by wrapping the init sequence (creation, append,
addSourceBuffer and subsequent setup) in a try/catch/finally or promise chain
that removes the element from the DOM and clears this.#video on failure
(document.body.removeChild(this.#video) and this.#video = null), or
alternatively defer appending until after addSourceBuffer succeeds; ensure the
rejection path for addSourceBuffer and related methods removes the element and
nulls the reference to avoid leaks.
In @js/hang/src/watch/video/source.ts:
- Around line 102-104: The optional callback property setAudioSync is being
assigned but not cleared on effect/component cleanup, which can leave stale
closures called after a track switch; update every place where setAudioSync is
assigned (the initial MSE assignment and the other assignment block) to return a
cleanup that sets setAudioSync to undefined (or removes the assignment) and also
clear it immediately before assigning a new callback when switching tracks so no
stale calls remain. Ensure you update both assignment sites that set
setAudioSync to always null out/undefine the previous callback in their teardown
and before reassigning.
- Around line 216-270: The forwarding effects registered on this.#signals inside
#runMSEPath are not torn down and will leak across track switches; capture each
disposer returned by this.#signals.effect when you create them and register a
cleanup via effect.cleanup that calls each disposer (or store them in an array
and call them all), so that when the spawned effect ends (or mseSource is
closed) those signal forwards are removed; apply this to every
this.#signals.effect call associated with mseSource (the
frame/display/bufferStatus/syncStatus/stats forwarders) and ensure
effect.cleanup also closes mseSource if not already handled.
- Around line 207-214: The fallback from MSE to WebCodecs in #runTrack is broken
because when config.container === "cmaf" the code calls #runWebCodecsPath with
CMAF-fragmented MP4 data (moof/mdat) which VideoDecoder.decode() cannot handle;
either stop falling back for CMAF containers or implement demuxing before
calling #runWebCodecsPath. Fix by detecting config.container === "cmaf" in the
MSE error path inside #runTrack (and the analogous audio path), and either (a)
avoid calling #runWebCodecsPath and surface the error, or (b) update the
fallback to run a demux step that extracts raw codec frames from CMAF fragments
(moof/mdat → raw frames) and pass those frames to the VideoDecoder/AudioDecoder
in #runWebCodecsPath. Ensure references to #runMSEPath, #runWebCodecsPath, and
config.container are used so the change targets the correct logic.
In @rs/hang/src/catalog/audio/mod.rs:
- Around line 66-68: Add #[serde(default)] above the pub container: Container
field in the struct (the Container field in rs/hang/src/catalog/audio/mod.rs) so
deserializing legacy JSON that omits container falls back to the default; if
Container does not already implement Default, implement Default for Container
(returning the "native" variant) or provide a serde default function and
reference it (e.g., #[serde(default = "default_container")]) so the documented
backward-compatible default is applied.
In @rs/hang/src/catalog/video/mod.rs:
- Around line 114-117: The VideoConfig struct's container field lacks the serde
default attribute, so deserializing older catalogs that omit container will
fail; add #[serde(default)] to the container field declaration in VideoConfig so
Serde will call Container::default() (which returns Container::Native) when the
field is missing, keeping backward compatibility with existing serialized data.
In @rs/hang/src/import/fmp4.rs:
- Around line 710-714: The warning in transport_fragment is misleading because
moov_bytes being Some at this point means init() was never called, not that the
init segment wasn't sent; update the tracing::warn! message in
transport_fragment (which checks self.moov_bytes.is_some()) to clearly say
"init() was not called; moov_bytes still present" (or similar), and optionally
adjust the log level if you want this to be an error or debug signal rather than
a warning; keep the check on self.moov_bytes and only change the log text/level
to accurately reflect the condition.
- Around line 99-110: The current decode method captures available_bytes using
buf.chunk(), which only takes the first contiguous slice and will miss data for
multi-chunk buffers; update the logic in decode (the available_bytes creation
when self.passthrough_mode is true) to collect all remaining bytes instead of
just buf.chunk()—for example by reading buf.remaining() into a new Bytes/Vec or
by iterating over chunks and concatenating them—so subsequent slice/offset math
uses a complete copy of the remaining bytes rather than a single chunk.
🧹 Nitpick comments (16)
rs/hang/src/model/track.rs (2)
22-29: Newis_init_segment_groupstate looks fine, but be mindful it’s now part of the producer’s cloning semantics.Since
TrackProduceris#[derive(Clone)], this flag becomes part of whatever clone behavior is relied on elsewhere (especially if clones can write). If only one writer clone is intended, consider documenting that invariant nearTrackProducer(or make cloning/write patterns explicit).Also applies to: 33-40
82-95: Init segment detection should be “first keyframe at timestamp 0”, not just “timestamp 0”.As written, any keyframe-created group at
timestamp == 0becomes “init segment group”. If that can happen beyond the true init segment (even briefly), you’ll incorrectly keep a non-init group open.A low-cost guard is to compute
is_first_keyframebefore you mutateself.keyframe, and only setis_init_segment_groupwhen it’s the first keyframe.Proposed tweak (guard init detection)
pub fn write(&mut self, frame: Frame) -> Result<(), Error> { tracing::trace!(?frame, "write frame"); + let is_first_keyframe = self.keyframe.is_none(); + let mut header = BytesMut::new(); frame.timestamp.as_micros().encode(&mut header, lite::Version::Draft02); @@ let mut group = match self.group.take() { Some(group) => group, None if frame.keyframe => { let new_group = self.inner.append_group(); // Log when creating a new group, especially for init segment (timestamp 0) - if frame.timestamp.as_micros() == 0 { + if is_first_keyframe && frame.timestamp.as_micros() == 0 { tracing::info!(timestamp = ?frame.timestamp, "creating new group for init segment (timestamp 0)"); // Mark this as the init segment group so we can keep it open self.is_init_segment_group = true; } else { tracing::info!(timestamp = ?frame.timestamp, "creating new group for keyframe"); } new_group }rs/hang-cli/src/publish.rs (1)
52-54: Clarify the logging message.The message "HLS publish preserving original container format." is logged unconditionally, but the passthrough mode may be disabled. Consider making the message conditional or rephrasing it to reflect the actual state.
💡 Proposed improvement
- tracing::info!( - passthrough = *passthrough, - "HLS publish preserving original container format." - ); + tracing::info!( + passthrough = *passthrough, + "HLS publish configured with passthrough mode." + );Or make it conditional:
- tracing::info!( - passthrough = *passthrough, - "HLS publish preserving original container format." - ); + if *passthrough { + tracing::info!("HLS publish preserving original CMAF container format (passthrough mode)."); + } else { + tracing::info!("HLS publish decomposing CMAF fragments to native frames."); + }rs/hang/src/catalog/container.rs (1)
3-18: Consider makingContainerforward-compatible (#[non_exhaustive]) and simplifying serde renames.Adding future variants will otherwise break downstream exhaustive matches; also
rename_all = "camelCase"already yields"native" | "raw" | "cmaf"so explicit per-variant renames are redundant.Proposed diff
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] pub enum Container { - #[serde(rename = "native")] #[default] Native, - #[serde(rename = "raw")] Raw, - #[serde(rename = "cmaf")] Cmaf, }js/hang/src/util/mime.ts (1)
32-52: Optionally refuse to build MP4 MIME types unlesscontainer === "cmaf". (Prevents accidental MSE attempts for native/raw.)Proposed diff
export function buildVideoMimeType(config: Catalog.VideoConfig): string | undefined { + if (config.container !== "cmaf") return undefined; const mimeType = buildMimeType(config.codec, "video"); if (isMimeTypeSupported(mimeType)) { return mimeType; } return undefined; } export function buildAudioMimeType(config: Catalog.AudioConfig): string | undefined { + if (config.container !== "cmaf") return undefined; const mimeType = buildMimeType(config.codec, "audio"); if (isMimeTypeSupported(mimeType)) { return mimeType; } return undefined; }rs/hang/src/import/hls.rs (3)
27-48:HlsConfig::new()should ideally assert new fields in the unit test (incl.passthrough: false).Also applies to: 41-48
177-209: Init path should also callensure_init_segment(...)(like the steady-state path) to avoid extra playlist fetches insidepush_segment.Proposed diff
async fn consume_segments_limited( &mut self, kind: TrackKind, track: &mut TrackState, playlist: &MediaPlaylist, max_segments: usize, ) -> anyhow::Result<usize> { + self.ensure_init_segment(kind, track, playlist).await?; + // Calculate segments to process let next_seq = track.next_sequence.unwrap_or(0);Also applies to: 322-364
465-503:push_segmentdoing a playlist refetch to initialize is surprising and adds network/file I/O to the “hot path”.Proposed diff (after making `consume_segments_limited` call `ensure_init_segment`)
let mut bytes = self.fetch_bytes(url).await?; - // Ensure the importer is initialized before processing fragments - // Use track.init_ready to avoid borrowing issues - if !track.init_ready { - // Try to ensure init segment is processed - let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; - self.ensure_init_segment(kind, track, &playlist).await?; - } - // Get importer after ensuring init segment let importer = match kind { TrackKind::Video(index) => self.ensure_video_importer_for(index), TrackKind::Audio => self.ensure_audio_importer(), }; - // Final check after ensuring init segment + // Final check if !importer.is_initialized() { return Err(anyhow::anyhow!( "importer not initialized for {:?} after ensure_init_segment - init segment processing failed", kind )); }js/hang/src/watch/audio/source-mse.ts (2)
40-88:initialize()assumes DOM + MSE availability; consider failing fast with a clearer error when unavailable.
251-275:hasMoovAtomMP4 box parsing is fragile (32-bit signed bitwise; no extended-size handling).Proposed diff (unsigned size)
- const size = - (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; + const size = + (((data[offset] << 24) | + (data[offset + 1] << 16) | + (data[offset + 2] << 8) | + data[offset + 3]) >>> + 0);rs/hang/src/import/fmp4.rs (3)
314-319: Logging atinfolevel may be too verbose for production.Verifying moov atom signature at
infolevel on every init segment could be noisy. Consider usingdebuglevel for this verification step.- tracing::info!(atom_type = %atom_type, "verifying moov atom signature in init segment"); + tracing::debug!(atom_type = %atom_type, "verifying moov atom signature in init segment");
200-207: Excessiveinfo-level logging in hot path.Multiple
tracing::info!calls occur per fragment (lines 200-207, 788, 796, 815-817). For a live video stream at 30fps with 2-second GOPs, this would produce 15+ info log lines per second. Consider demoting todebuglevel.Reduce log verbosity in hot path
- tracing::info!( + tracing::debug!( moof_size = moof_bytes.len(), mdat_size = mdat_bytes.len(), total_fragment_size = fragment.len(), "processing CMAF fragment (moof+mdat)" ); self.transport_fragment(fragment, moof)?; - tracing::info!("finished processing CMAF fragment, ready for next fragment"); + tracing::debug!("finished processing CMAF fragment");Similarly for lines 788, 796, 815-817 in
transport_fragment.Also applies to: 788-788, 796-796, 815-817
376-380: Repetitive container assignment pattern could be extracted.The same
if passthrough_mode { Container::Cmaf } else { Container::Native }pattern is repeated 7 times across different codec configurations. Consider extracting to a helper.Extract helper function
fn container_for_mode(passthrough_mode: bool) -> Container { if passthrough_mode { Container::Cmaf } else { Container::Native } }Then use
container: container_for_mode(passthrough_mode)in each config.Also applies to: 396-400, 427-431, 464-468, 508-512, 545-549, 559-563
js/hang/src/watch/video/source-mse.ts (3)
83-83: Empty event handler serves no purpose.The
waitingevent listener has an empty callback. Either add meaningful handling or remove it.- this.#video.addEventListener("waiting", () => {}); + // Remove if no handling needed, or add meaningful logic: + // this.#video.addEventListener("waiting", () => { + // this.bufferStatus.set({ state: "empty" }); + // });
504-515: RedundantendOfStreamcall.
endOfStream()is called in theifblock (line 510) and again in the subsequent block (line 520) whenreadyState === "open". If the first call succeeds, the second block's condition will be false. If the first call fails (caught), the state may have changed. Consider consolidating.Consolidate cleanup
- if (this.#sourceBuffer && this.#mediaSource) { - try { - if (this.#sourceBuffer.updating) { - this.#sourceBuffer.abort(); - } - if (this.#mediaSource.readyState === "open") { - this.#mediaSource.endOfStream(); - } - } catch (error) { - console.error("Error closing SourceBuffer:", error); - } - } - - if (this.#mediaSource) { - try { - if (this.#mediaSource.readyState === "open") { - this.#mediaSource.endOfStream(); - } - URL.revokeObjectURL(this.#video?.src || ""); - } catch (error) { - console.error("Error closing MediaSource:", error); - } - } + try { + if (this.#sourceBuffer?.updating) { + this.#sourceBuffer.abort(); + } + if (this.#mediaSource?.readyState === "open") { + this.#mediaSource.endOfStream(); + } + } catch (error) { + console.error("Error closing MediaSource/SourceBuffer:", error); + } + + if (this.#video?.src) { + URL.revokeObjectURL(this.#video.src); + }
517-526: Potential issue:revokeObjectURLwith empty string.If
this.#video?.srcis empty or undefined, passing an empty string toURL.revokeObjectURL("")is a no-op but could indicate the URL was already revoked or never set properly.- URL.revokeObjectURL(this.#video?.src || ""); + if (this.#video?.src) { + URL.revokeObjectURL(this.#video.src); + }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (27)
js/hang/src/catalog/audio.tsjs/hang/src/catalog/container.tsjs/hang/src/catalog/video.tsjs/hang/src/container/codec.tsjs/hang/src/frame.tsjs/hang/src/util/mime.tsjs/hang/src/watch/audio/emitter.tsjs/hang/src/watch/audio/source-mse.tsjs/hang/src/watch/audio/source.tsjs/hang/src/watch/broadcast.tsjs/hang/src/watch/video/source-mse.tsjs/hang/src/watch/video/source.tsjustfilers/hang-cli/src/publish.rsrs/hang/examples/video.rsrs/hang/src/catalog/audio/mod.rsrs/hang/src/catalog/container.rsrs/hang/src/catalog/mod.rsrs/hang/src/catalog/root.rsrs/hang/src/catalog/video/mod.rsrs/hang/src/import/aac.rsrs/hang/src/import/avc3.rsrs/hang/src/import/fmp4.rsrs/hang/src/import/hev1.rsrs/hang/src/import/hls.rsrs/hang/src/import/opus.rsrs/hang/src/model/track.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (CLAUDE.md)
In Rust crates, tests should be integrated within source files using inline test modules
Files:
rs/hang/src/catalog/mod.rsrs/hang/src/import/hev1.rsrs/hang-cli/src/publish.rsrs/hang/src/import/opus.rsrs/hang/src/catalog/container.rsrs/hang/examples/video.rsrs/hang/src/import/hls.rsrs/hang/src/model/track.rsrs/hang/src/catalog/video/mod.rsrs/hang/src/import/avc3.rsrs/hang/src/catalog/audio/mod.rsrs/hang/src/import/aac.rsrs/hang/src/catalog/root.rsrs/hang/src/import/fmp4.rs
🧬 Code graph analysis (15)
rs/hang/src/import/opus.rs (1)
js/hang/src/catalog/container.ts (1)
Container(12-12)
rs/hang/src/catalog/container.rs (1)
js/hang/src/catalog/container.ts (1)
Container(12-12)
js/hang/src/util/mime.ts (1)
js/hang/src/catalog/video.ts (1)
VideoConfig(101-101)
js/hang/src/catalog/container.ts (1)
js/hang/src/catalog/index.ts (1)
DEFAULT_CONTAINER(5-5)
js/hang/src/watch/audio/source-mse.ts (2)
js/hang/src/watch/video/source-mse.ts (2)
SourceMSE(30-540)fragments(256-270)js/signals/src/index.ts (1)
Getter(13-22)
rs/hang/src/catalog/video/mod.rs (1)
js/hang/src/catalog/container.ts (1)
Container(12-12)
rs/hang/src/import/avc3.rs (1)
js/hang/src/catalog/container.ts (1)
Container(12-12)
js/hang/src/watch/video/source.ts (2)
js/hang/src/catalog/video.ts (1)
VideoConfig(101-101)js/hang/src/watch/video/source-mse.ts (2)
SourceMSE(30-540)stats(537-539)
rs/hang/src/catalog/audio/mod.rs (1)
js/hang/src/catalog/container.ts (1)
Container(12-12)
js/hang/src/frame.ts (2)
rs/hang/src/catalog/root.rs (1)
next(250-270)rs/moq-relay/src/web.rs (1)
next(304-317)
rs/hang/src/import/aac.rs (1)
js/hang/src/catalog/container.ts (1)
Container(12-12)
js/hang/src/watch/video/source-mse.ts (4)
js/hang-ui/src/Components/stats/types.ts (2)
BufferStatus(29-31)SyncStatus(21-24)js/hang/src/watch/video/source.ts (9)
VideoStats(41-45)effect(135-158)effect(160-176)effect(178-205)effect(207-214)effect(216-270)effect(272-413)effect(451-471)effect(473-483)js/hang/src/watch/audio/source-mse.ts (2)
SourceMSE(17-377)fragments(125-139)js/hang/src/publish/priority.ts (1)
PRIORITY(3-11)
js/hang/src/watch/audio/emitter.ts (1)
js/hang/src/watch/audio/source.ts (5)
effect(92-146)effect(148-158)effect(160-192)effect(194-236)effect(238-296)
rs/hang/src/catalog/root.rs (5)
js/hang/src/publish/broadcast.ts (1)
track(120-139)js/hang/src/watch/broadcast.ts (1)
catalog(147-164)js/hang/src/catalog/audio.ts (1)
AudioConfig(60-60)js/hang/src/catalog/container.ts (1)
Container(12-12)js/hang/src/catalog/video.ts (1)
VideoConfig(101-101)
rs/hang/src/import/fmp4.rs (4)
js/hang/src/catalog/container.ts (1)
Container(12-12)rs/hang/src/import/aac.rs (2)
decode(127-146)new(15-21)rs/hang/src/import/opus.rs (1)
decode(73-92)js/hang/src/frame.ts (3)
decode(41-45)decode(240-275)Frame(12-17)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check
🔇 Additional comments (31)
rs/hang/src/model/track.rs (1)
57-69: Fix the assumption: dropping the group closes the watch channel, preventing late subscribers from receiving the init segment.When
drop(group)is called, thewatch::Sender<GroupState>is dropped, which closes the underlying channel. Late subscribers that callconsume()will have an already-closed receiver, and any consumer attempting to read will encounterError::Cancel(line 212-213 ofgroup.rs) instead of gracefully reading existing frames.To keep the init segment group truly open for new subscribers, explicitly call
group.close()to setstate.closed = Some(Ok(())), allowing consumers to gracefully finish reading and getOk(None)rather thanError::Cancel. Dropping without explicit closure defeats the intent.js/hang/src/catalog/video.ts (1)
17-17: LGTM: Documentation update aligns with container refactoring.The comment correctly reflects the new default container value.
js/hang/src/catalog/audio.ts (1)
18-18: LGTM: Documentation update aligns with container refactoring.Consistent with the video.ts changes.
js/hang/src/container/codec.ts (4)
14-21: Case label updated correctly.The "legacy" → "native" rename is consistent with the container enum changes.
37-52: Case label updated; verify VarInt usage for CMAF decode.The "legacy" → "native" rename is consistent. However, the CMAF decode logic uses VarInt, which differs from the U64 encoding. See concerns raised in the encodeTimestamp review.
65-70: Case label updated correctly.The "legacy" → "native" rename and CMAF size (8 bytes) are consistent with VarInt maximum size.
18-21: The CMAF encode/decode asymmetry is intentional and safe.The code comment (lines 46-49) clarifies the design: CMAF is used exclusively for receiving frames via Media Source Extensions (MSE). The server sends VarInt-encoded timestamp headers, which the client correctly decodes using
decodeVarInt. The client never encodes with CMAF—it uses the default "native" container format for publishing (seejs/hang/src/publish/video/encoder.tsandjs/hang/src/publish/audio/encoder.ts). Frame.Consumer decodes CMAF data received from the server, while Frame.encode is never called withcontainer="cmaf".The actual media timestamp is extracted by MSE from the moof atom, so the timestamp header asymmetry poses no practical issue.
Minor clarification needed: The
getTimestampSizecomment for CMAF (line 70) states "VarInt maximum size" but should reflect that encoding produces fixed 8-byte output.js/hang/src/frame.ts (4)
118-120: LGTM: Improved control flow clarity.Converting the single-line early exit to an explicit block improves readability and debugging.
152-154: LGTM: Consistent guard pattern.The explicit block aligns with the pattern established on lines 118-120.
248-250: LGTM: Explicit return improves clarity.Wrapping the conditional return in a block is consistent with the other control flow improvements in this file.
265-267: LGTM: Explicit promise resolution.Converting the concise arrow to a block statement improves consistency with the rest of the file.
js/hang/src/catalog/container.ts (1)
10-10: This review comment is incorrect. Thecontainer.tsfile is newly created, not modified from a previous version containing["legacy", "raw", "fmp4"]. Git history confirms it was added as a new file with the current enum values["native", "raw", "cmaf"].Additionally, no references to
"legacy"or"fmp4"as container enum values exist in the codebase. The code already implements backward compatibility through the.default(DEFAULT_CONTAINER)pattern whereDEFAULT_CONTAINER = "native", ensuring existing catalogs that lack a container field will work without validation failures.Likely an incorrect or invalid review comment.
js/hang/src/watch/audio/emitter.ts (3)
48-53: LGTM!The enabled state is now correctly computed from both
pausedandmutedsignals, and the logic correctly setsenabled = !paused && !mutedfor the source.
61-96: MSE path handling is well-structured.The bifurcation between MSE and WebCodecs paths is clean. The early return at line 95 correctly prevents the WebCodecs path setup when MSE is active. The direct HTMLAudioElement control for volume, mute, and play/pause is appropriate for MSE-based playback.
118-133: LGTM!The guard at line 121 correctly skips gain transitions when no GainNode exists (MSE path), preventing errors when the WebCodecs path isn't active.
rs/hang/src/import/hev1.rs (1)
65-65: LGTM!Setting
container: Nativeis correct for the H.265 inline SPS/PPS import path. This aligns with the container format documentation wherenativeindicates QUIC VarInt + raw frames.rs/hang/src/import/avc3.rs (1)
65-65: LGTM!Setting
container: Nativeis correct for the H.264 inline SPS/PPS import path, consistent with the H.265 handler.rs/hang/src/catalog/mod.rs (1)
10-10: LGTM!The new
containermodule is properly declared and re-exported, following the established pattern for other catalog submodules.Also applies to: 19-19
rs/hang/src/import/aac.rs (1)
110-110: LGTM!Setting
container: Nativeis correct for the AAC audio import path, consistent with the video codec handlers.js/hang/src/watch/broadcast.ts (1)
67-72: No action needed. ThesetAudioSyncmethod is explicitly typed to acceptHTMLAudioElement | undefinedand correctly handles undefined values by storing the value and resetting the sync timer. The code is working as designed.Likely an incorrect or invalid review comment.
rs/hang/examples/video.rs (1)
73-73: LGTM!The container field is correctly initialized with
Container::Native, demonstrating proper usage of the new field in video configuration.rs/hang/src/import/opus.rs (1)
56-56: LGTM!The container field is correctly set to
Container::Native, consistent with the native frame encoding used by the Opus importer.rs/hang/src/catalog/root.rs (1)
209-222: LGTM!The logging additions provide useful visibility into container field values during catalog publishing, which will be helpful for debugging container-aware handling.
rs/hang/src/import/hls.rs (1)
526-544: Passthrough propagation to per-track importers looks good.js/hang/src/watch/video/source.ts (1)
27-33:RequiredDecoderConfigchange and “strip codedWidth/Height to avoid reloads” looks reasonable.Also applies to: 173-176
js/hang/src/watch/audio/source-mse.ts (1)
343-376: Cleanup looks solid (abort SourceBuffer, end MediaSource, remove element).rs/hang/src/import/fmp4.rs (2)
234-341: LGTM: Init segment handling logic is well-structured.The init method correctly:
- Captures passthrough mode before locking catalog
- Creates tracks before sending init segments
- Assembles ftyp+moov correctly
- Stores init segment for keyframe distribution
- Sends to all tracks with appropriate logging
780-818: Each track receives the complete moof+mdat containing all tracks' data.In passthrough mode, the same
fragmentbytes (the complete moof+mdat) are written to every track in the moof via the loop at line 728. This means each track's stream contains the full fragment with data for all tracks, not just per-track data.This appears to be the intended design of
transport_fragment()(line 710), which is documented as transporting "complete CMAF fragments directly without decomposing." However, verify that sending full multi-track moof+mdat to individual track SourceBuffers is the correct MSE usage pattern, particularly whether the consumer SourceBuffer correctly parses and uses only its track's data from the multi-track moof.js/hang/src/watch/video/source-mse.ts (3)
188-237: LGTM: Frame capture implementation is solid.Good use of
requestVideoFrameCallbackwithrequestAnimationFramefallback. Proper VideoFrame timestamp conversion and cleanup of previous frames viaupdatecallback.
399-425: LGTM: Init segment detection is robust.The
isInitSegmentDatafunction correctly parses MP4 box structure and handles both ftyp+moov and moov-only init segments. Proper bounds checking prevents out-of-bounds reads.
256-270: LGTM: Fragment concatenation is efficient.Correctly handles single-fragment optimization and properly calculates total size for multi-fragment concatenation.
| * - "native": Uses QUIC VarInt encoding (1-8 bytes, variable length) | ||
| * - "raw": Uses fixed u64 encoding (8 bytes, big-endian) | ||
| * - "fmp4": Fragmented MP4 container (future) | ||
| * - "cmaf": Fragmented MP4 container (future) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update stale comment about CMAF availability.
The comment indicates CMAF is "(future)", but the implementation in js/hang/src/container/codec.ts (lines 18-21, 45-52) and usage in MSE source files show it's already active.
📝 Suggested fix
-- "cmaf": Fragmented MP4 container (future)
+- "cmaf": Fragmented MP4 container with moof+mdat fragments📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| * - "cmaf": Fragmented MP4 container (future) | |
| * - "cmaf": Fragmented MP4 container with moof+mdat fragments |
🤖 Prompt for AI Agents
In @js/hang/src/catalog/container.ts at line 8, Update the stale inline comment
entry for "cmaf" in the container list so it no longer says "(future)"; locate
the "cmaf" item in js/hang/src/catalog/container.ts and change its annotation to
reflect that CMAF is supported/active (consistent with the implementation in
container/codec.ts and MSE usage).
| export function isMimeTypeSupported(mimeType: string): boolean { | ||
| return MediaSource.isTypeSupported(mimeType); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's locate the file and understand its context
find . -type f -name "mime.ts" | head -20Repository: moq-dev/moq
Length of output: 81
🏁 Script executed:
# Search for imports of isMimeTypeSupported to see where it's used
rg "isMimeTypeSupported" --type ts --type js -B 2 -A 2Repository: moq-dev/moq
Length of output: 1004
🏁 Script executed:
# Check if this module is imported in test files or SSR contexts
rg "from.*mime|import.*mime" --type ts --type js | grep -E "(test|spec|ssr|server)" | head -20Repository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Check the overall structure of the mime.ts file
cat -n js/hang/src/util/mime.tsRepository: moq-dev/moq
Length of output: 2076
🏁 Script executed:
# Search for other uses of MediaSource in the codebase
rg "MediaSource" --type ts --type js -B 1 -A 1Repository: moq-dev/moq
Length of output: 5428
🏁 Script executed:
# Check if mime.ts or any functions from it are imported in test files
find . -type f \( -name "*.test.ts" -o -name "*.spec.ts" -o -name "*.test.js" -o -name "*.spec.js" \) | xargs grep -l "mime" 2>/dev/null || echo "No test files found importing mime utilities"Repository: moq-dev/moq
Length of output: 99
🏁 Script executed:
# Check the project structure to understand if this is browser-only or if it can be run in Node
head -30 package.jsonRepository: moq-dev/moq
Length of output: 442
🏁 Script executed:
# Look for any SSR or Node-related imports or configurations
find . -name "*.config.*" -o -name "tsconfig*.json" | head -10Repository: moq-dev/moq
Length of output: 335
🏁 Script executed:
# Check if there are any existing guards for browser APIs in the codebase
rg "typeof.*!==.*undefined|globalThis\.|typeof.*global" --type ts --type js | head -20Repository: moq-dev/moq
Length of output: 2011
🏁 Script executed:
# Check where buildVideoMimeType and buildAudioMimeType are used
rg "buildVideoMimeType|buildAudioMimeType" --type ts --type js -B 2 -A 2Repository: moq-dev/moq
Length of output: 1585
🏁 Script executed:
# Check if mime.ts is a browser-only utility or if it might be loaded in Node contexts
find . -type f \( -name "package.json" -o -name "tsconfig.json" \) -path "*/js/hang/*" | xargs cat | grep -E "target|module|browser|exports" | head -20Repository: moq-dev/moq
Length of output: 87
🏁 Script executed:
# Check the vite and vitest configs to understand the build target
cat js/hang-demo/vite.config.tsRepository: moq-dev/moq
Length of output: 724
🏁 Script executed:
# Double-check if there's explicit browser/non-browser context configuration
cat js/hang/package.json | head -50Repository: moq-dev/moq
Length of output: 1428
Guard MediaSource access to prevent crashes in non-browser contexts.
The function directly accesses MediaSource.isTypeSupported() without checking if MediaSource exists. Since this utility is publicly exported and could be imported in non-browser environments (Node.js, tests, SSR), it will throw ReferenceError: MediaSource is not defined. The codebase already uses this guard pattern extensively for browser APIs (e.g., WebTransport, AudioContext, CaptureController).
Proposed fix
export function isMimeTypeSupported(mimeType: string): boolean {
- return MediaSource.isTypeSupported(mimeType);
+ return typeof MediaSource !== "undefined" && MediaSource.isTypeSupported(mimeType);
}🤖 Prompt for AI Agents
In @js/hang/src/util/mime.ts around lines 22 - 24, isMimeTypeSupported currently
calls MediaSource.isTypeSupported directly and will throw in non-browser
environments; guard access by checking that typeof MediaSource !== "undefined"
and that MediaSource.isTypeSupported is a function before calling it, returning
false when the guard fails; update the isMimeTypeSupported function to perform
this safe check so imports in Node/SSR/tests do not trigger ReferenceError.
| // Try to play if we have metadata (HAVE_METADATA = 1), browser will start when ready | ||
| if (mseAudio.readyState >= HTMLMediaElement.HAVE_METADATA) { | ||
| tryPlay(); | ||
| } else { | ||
| // Wait for loadedmetadata event if not ready yet | ||
| mseAudio.addEventListener("loadedmetadata", tryPlay, { once: true }); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak: event listener not cleaned up.
When mseAudio.readyState < HAVE_METADATA, a loadedmetadata event listener is added with { once: true }. However, if the effect is cleaned up before the event fires (e.g., the component unmounts or the effect re-runs), the listener remains attached to the element.
Consider cleaning up the event listener in the effect's cleanup phase:
Proposed fix
// Try to play if we have metadata (HAVE_METADATA = 1), browser will start when ready
if (mseAudio.readyState >= HTMLMediaElement.HAVE_METADATA) {
tryPlay();
} else {
// Wait for loadedmetadata event if not ready yet
mseAudio.addEventListener("loadedmetadata", tryPlay, { once: true });
+ effect.cleanup(() => mseAudio.removeEventListener("loadedmetadata", tryPlay));
}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @js/hang/src/watch/audio/emitter.ts around lines 86 - 92, When readyState is
below HTMLMediaElement.HAVE_METADATA you add a loadedmetadata listener via
mseAudio.addEventListener("loadedmetadata", tryPlay, { once: true }) but never
remove it if the effect unmounts or re-runs; update the effect to capture the
handler reference (e.g., const onLoaded = tryPlay), attach with
mseAudio.addEventListener("loadedmetadata", onLoaded, { once: true }), and in
the effect cleanup call mseAudio.removeEventListener("loadedmetadata", onLoaded)
(guarding that mseAudio still exists and its readyState is still <
HAVE_METADATA) so the listener is removed if the component unmounts or the
effect is disposed before the event fires.
| async appendFragment(fragment: Uint8Array): Promise<void> { | ||
| if (!this.#sourceBuffer || !this.#mediaSource) { | ||
| throw new Error("SourceBuffer not initialized"); | ||
| } | ||
|
|
||
| // Don't queue fragments if MediaSource is closed | ||
| if (this.#mediaSource.readyState === "closed") { | ||
| return; | ||
| } | ||
|
|
||
| if (this.#appendQueue.length >= SourceMSE.MAX_QUEUE_SIZE) { | ||
| const discarded = this.#appendQueue.shift(); | ||
| console.warn( | ||
| `[MSE Audio] Queue full (${SourceMSE.MAX_QUEUE_SIZE}), discarding oldest fragment (${discarded?.byteLength ?? 0} bytes)`, | ||
| ); | ||
| } | ||
|
|
||
| const copy = new Uint8Array(fragment); | ||
| this.#appendQueue.push(copy); | ||
|
|
||
| this.#processAppendQueue(); | ||
| } | ||
|
|
||
| #concatenateFragments(fragments: Uint8Array[]): Uint8Array { | ||
| if (fragments.length === 1) { | ||
| return fragments[0]; | ||
| } | ||
|
|
||
| const totalSize = fragments.reduce((sum, frag) => sum + frag.byteLength, 0); | ||
| const result = new Uint8Array(totalSize); | ||
| let offset = 0; | ||
| for (const fragment of fragments) { | ||
| result.set(fragment, offset); | ||
| offset += fragment.byteLength; | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| #processAppendQueue(): void { | ||
| if (!this.#sourceBuffer || this.#sourceBuffer.updating || this.#appendQueue.length === 0) { | ||
| return; | ||
| } | ||
|
|
||
| if (this.#mediaSource?.readyState !== "open") { | ||
| return; | ||
| } | ||
|
|
||
| const fragment = this.#appendQueue.shift(); | ||
| if (!fragment) return; | ||
|
|
||
| try { | ||
| // appendBuffer accepts BufferSource (ArrayBuffer or ArrayBufferView) | ||
| this.#sourceBuffer.appendBuffer(fragment as BufferSource); | ||
|
|
||
| this.#stats.update((current) => ({ | ||
| bytesReceived: (current?.bytesReceived ?? 0) + fragment.byteLength, | ||
| })); | ||
| } catch (error) { | ||
| console.error("[MSE Audio] Error appending fragment:", error); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
wc -l js/hang/src/watch/audio/source-mse.tsRepository: moq-dev/moq
Length of output: 96
🏁 Script executed:
cat -n js/hang/src/watch/audio/source-mse.tsRepository: moq-dev/moq
Length of output: 13085
🏁 Script executed:
rg "updateend|MAX_QUEUE_SIZE|#processAppendQueue" js/hang/src/watch/audio/source-mse.tsRepository: moq-dev/moq
Length of output: 431
🏁 Script executed:
rg "appendFragment" js/hang/src/watch/audio/ -B 2 -A 2Repository: moq-dev/moq
Length of output: 2383
Append queue silently drops fragments when full; implement backpressure to prevent data loss.
The code discards the oldest fragment when the queue reaches MAX_QUEUE_SIZE (10) without pausing upstream processing. While an updateend listener exists to continue draining the queue, appendFragment() returns immediately without waiting, providing no backpressure to callers. When CMAF decoding outpaces SourceBuffer.appendBuffer, fragments are permanently lost rather than being throttled.
Make appendFragment() wait for the queue to drain below capacity before returning, or pause frame consumption at the source when the queue reaches a threshold. Consider sourceBuffer.remove() for long-running playback to avoid unbounded memory growth.
| // For MSE path, expose the HTMLAudioElement for direct control | ||
| #mseAudioElement = new Signal<HTMLAudioElement | undefined>(undefined); | ||
| readonly mseAudioElement = this.#mseAudioElement as Getter<HTMLAudioElement | undefined>; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clear mseAudioElement when not on the MSE path to avoid stale UI/control state.
Proposed diff
#runDecoder(effect: Effect): void {
const enabled = effect.get(this.enabled);
if (!enabled) {
return;
}
+ // Default to “no MSE element” unless we’re actively on the MSE path.
+ effect.set(this.#mseAudioElement, undefined);
const catalog = effect.get(this.catalog);Also applies to: 92-107
🤖 Prompt for AI Agents
In @js/hang/src/watch/audio/source.ts around lines 44 - 47, The mseAudioElement
getter can hold a stale HTMLAudioElement when the source is not using MSE;
update the code paths that switch or disable MSE (the places around the
#mseAudioElement declaration and the logic blocks referenced at 92-107) to
explicitly clear the private signal by setting #mseAudioElement.value =
undefined whenever leaving the MSE path or when disposing/replacing the MSE
element so the UI/control state cannot reference a stale element; ensure every
branch that creates or assigns an MSE element also handles the opposite branch
to clear the signal.
| #runMSEPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { | ||
| console.log("[Video Stream] Subscribing to track", { | ||
| name, | ||
| codec: config.codec, | ||
| container: config.container, | ||
| width: config.codedWidth, | ||
| height: config.codedHeight, | ||
| }); | ||
| // Import MSE source dynamically to avoid loading if not needed | ||
| effect.spawn(async () => { | ||
| const { SourceMSE } = await import("./source-mse.js"); | ||
| const mseSource = new SourceMSE(this.latency); | ||
| effect.cleanup(() => mseSource.close()); | ||
|
|
||
| // Forward signals using effects | ||
| this.#signals.effect((eff) => { | ||
| const frame = eff.get(mseSource.frame); | ||
| eff.set(this.frame, frame); | ||
| }); | ||
|
|
||
| this.#signals.effect((eff) => { | ||
| const display = eff.get(mseSource.display); | ||
| eff.set(this.display, display); | ||
| }); | ||
|
|
||
| this.#signals.effect((eff) => { | ||
| const status = eff.get(mseSource.bufferStatus); | ||
| eff.set(this.bufferStatus, status, { state: "empty" }); | ||
| }); | ||
|
|
||
| this.#signals.effect((eff) => { | ||
| const status = eff.get(mseSource.syncStatus); | ||
| eff.set(this.syncStatus, status, { state: "ready" }); | ||
| }); | ||
|
|
||
| this.#signals.effect((eff) => { | ||
| const stats = eff.get(mseSource.stats); | ||
| eff.set(this.#stats, stats); | ||
| }); | ||
|
|
||
| // Expose method to set audio element for synchronization | ||
| this.setAudioSync = (audioElement: HTMLAudioElement | undefined) => { | ||
| mseSource.setAudioSync(audioElement); | ||
| }; | ||
|
|
||
| // Run MSE track | ||
| try { | ||
| await mseSource.runTrack(effect, broadcast, name, config); | ||
| } catch (error) { | ||
| console.error("MSE path error, falling back to WebCodecs:", error); | ||
| // Fallback to WebCodecs | ||
| this.#runWebCodecsPath(effect, broadcast, name, config); | ||
| } | ||
| }); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MSE path forwarding registered on this.#signals likely leaks across track switches; scope it to the current run.
Proposed diff (scope forwarding)
effect.spawn(async () => {
const { SourceMSE } = await import("./source-mse.js");
const mseSource = new SourceMSE(this.latency);
effect.cleanup(() => mseSource.close());
+ const forward = new Effect();
+ effect.cleanup(() => forward.close());
+
// Forward signals using effects
- this.#signals.effect((eff) => {
+ forward.effect((eff) => {
const frame = eff.get(mseSource.frame);
eff.set(this.frame, frame);
});
- this.#signals.effect((eff) => {
+ forward.effect((eff) => {
const display = eff.get(mseSource.display);
eff.set(this.display, display);
});
- this.#signals.effect((eff) => {
+ forward.effect((eff) => {
const status = eff.get(mseSource.bufferStatus);
eff.set(this.bufferStatus, status, { state: "empty" });
});
- this.#signals.effect((eff) => {
+ forward.effect((eff) => {
const status = eff.get(mseSource.syncStatus);
eff.set(this.syncStatus, status, { state: "ready" });
});
- this.#signals.effect((eff) => {
+ forward.effect((eff) => {
const stats = eff.get(mseSource.stats);
eff.set(this.#stats, stats);
});📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #runMSEPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { | |
| console.log("[Video Stream] Subscribing to track", { | |
| name, | |
| codec: config.codec, | |
| container: config.container, | |
| width: config.codedWidth, | |
| height: config.codedHeight, | |
| }); | |
| // Import MSE source dynamically to avoid loading if not needed | |
| effect.spawn(async () => { | |
| const { SourceMSE } = await import("./source-mse.js"); | |
| const mseSource = new SourceMSE(this.latency); | |
| effect.cleanup(() => mseSource.close()); | |
| // Forward signals using effects | |
| this.#signals.effect((eff) => { | |
| const frame = eff.get(mseSource.frame); | |
| eff.set(this.frame, frame); | |
| }); | |
| this.#signals.effect((eff) => { | |
| const display = eff.get(mseSource.display); | |
| eff.set(this.display, display); | |
| }); | |
| this.#signals.effect((eff) => { | |
| const status = eff.get(mseSource.bufferStatus); | |
| eff.set(this.bufferStatus, status, { state: "empty" }); | |
| }); | |
| this.#signals.effect((eff) => { | |
| const status = eff.get(mseSource.syncStatus); | |
| eff.set(this.syncStatus, status, { state: "ready" }); | |
| }); | |
| this.#signals.effect((eff) => { | |
| const stats = eff.get(mseSource.stats); | |
| eff.set(this.#stats, stats); | |
| }); | |
| // Expose method to set audio element for synchronization | |
| this.setAudioSync = (audioElement: HTMLAudioElement | undefined) => { | |
| mseSource.setAudioSync(audioElement); | |
| }; | |
| // Run MSE track | |
| try { | |
| await mseSource.runTrack(effect, broadcast, name, config); | |
| } catch (error) { | |
| console.error("MSE path error, falling back to WebCodecs:", error); | |
| // Fallback to WebCodecs | |
| this.#runWebCodecsPath(effect, broadcast, name, config); | |
| } | |
| }); | |
| } | |
| #runMSEPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { | |
| console.log("[Video Stream] Subscribing to track", { | |
| name, | |
| codec: config.codec, | |
| container: config.container, | |
| width: config.codedWidth, | |
| height: config.codedHeight, | |
| }); | |
| // Import MSE source dynamically to avoid loading if not needed | |
| effect.spawn(async () => { | |
| const { SourceMSE } = await import("./source-mse.js"); | |
| const mseSource = new SourceMSE(this.latency); | |
| effect.cleanup(() => mseSource.close()); | |
| const forward = new Effect(); | |
| effect.cleanup(() => forward.close()); | |
| // Forward signals using effects | |
| forward.effect((eff) => { | |
| const frame = eff.get(mseSource.frame); | |
| eff.set(this.frame, frame); | |
| }); | |
| forward.effect((eff) => { | |
| const display = eff.get(mseSource.display); | |
| eff.set(this.display, display); | |
| }); | |
| forward.effect((eff) => { | |
| const status = eff.get(mseSource.bufferStatus); | |
| eff.set(this.bufferStatus, status, { state: "empty" }); | |
| }); | |
| forward.effect((eff) => { | |
| const status = eff.get(mseSource.syncStatus); | |
| eff.set(this.syncStatus, status, { state: "ready" }); | |
| }); | |
| forward.effect((eff) => { | |
| const stats = eff.get(mseSource.stats); | |
| eff.set(this.#stats, stats); | |
| }); | |
| // Expose method to set audio element for synchronization | |
| this.setAudioSync = (audioElement: HTMLAudioElement | undefined) => { | |
| mseSource.setAudioSync(audioElement); | |
| }; | |
| // Run MSE track | |
| try { | |
| await mseSource.runTrack(effect, broadcast, name, config); | |
| } catch (error) { | |
| console.error("MSE path error, falling back to WebCodecs:", error); | |
| // Fallback to WebCodecs | |
| this.#runWebCodecsPath(effect, broadcast, name, config); | |
| } | |
| }); | |
| } |
🤖 Prompt for AI Agents
In @js/hang/src/watch/video/source.ts around lines 216 - 270, The forwarding
effects registered on this.#signals inside #runMSEPath are not torn down and
will leak across track switches; capture each disposer returned by
this.#signals.effect when you create them and register a cleanup via
effect.cleanup that calls each disposer (or store them in an array and call them
all), so that when the spawned effect ends (or mseSource is closed) those signal
forwards are removed; apply this to every this.#signals.effect call associated
with mseSource (the frame/display/bufferStatus/syncStatus/stats forwarders) and
ensure effect.cleanup also closes mseSource if not already handled.
| /// Container format for frame encoding. | ||
| /// Defaults to "native" for backward compatibility. | ||
| pub container: Container, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add #[serde(default)] for backward compatibility.
The documentation states this field "Defaults to 'native' for backward compatibility," but the field lacks a #[serde(default)] attribute. Without it, deserializing older JSON catalog data that omits the container field will fail. Add the attribute to ensure seamless deserialization of legacy catalogs.
🔧 Proposed fix
// Some codecs include a description so the decoder can be initialized without extra data.
// If not provided, there may be in-band metadata (marginally higher overhead).
#[serde(default)]
#[serde_as(as = "Option<Hex>")]
pub description: Option<Bytes>,
/// Container format for frame encoding.
/// Defaults to "native" for backward compatibility.
+ #[serde(default)]
pub container: Container,
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Container format for frame encoding. | |
| /// Defaults to "native" for backward compatibility. | |
| pub container: Container, | |
| // Some codecs include a description so the decoder can be initialized without extra data. | |
| // If not provided, there may be in-band metadata (marginally higher overhead). | |
| #[serde(default)] | |
| #[serde_as(as = "Option<Hex>")] | |
| pub description: Option<Bytes>, | |
| /// Container format for frame encoding. | |
| /// Defaults to "native" for backward compatibility. | |
| #[serde(default)] | |
| pub container: Container, | |
| } |
🤖 Prompt for AI Agents
In @rs/hang/src/catalog/audio/mod.rs around lines 66 - 68, Add #[serde(default)]
above the pub container: Container field in the struct (the Container field in
rs/hang/src/catalog/audio/mod.rs) so deserializing legacy JSON that omits
container falls back to the default; if Container does not already implement
Default, implement Default for Container (returning the "native" variant) or
provide a serde default function and reference it (e.g., #[serde(default =
"default_container")]) so the documented backward-compatible default is applied.
|
|
||
| /// Container format for frame encoding. | ||
| /// Defaults to "native" for backward compatibility. | ||
| pub container: Container, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find and examine the Container enum definition
fd -t f "container.rs" rs/hang/src/catalog/ | head -20Repository: moq-dev/moq
Length of output: 87
🏁 Script executed:
#!/bin/bash
# Search for Container enum and its Default implementation
rg -n "enum Container|impl.*Default.*Container" rs/hang/src/catalog/ -A 5Repository: moq-dev/moq
Length of output: 373
🏁 Script executed:
#!/bin/bash
# Check the complete VideoConfig struct definition
rg -n "struct VideoConfig|pub container:" rs/hang/src/catalog/video/mod.rs -B 2 -A 10Repository: moq-dev/moq
Length of output: 809
🏁 Script executed:
#!/bin/bash
# Check the full Container enum definition and its derives
rg -n "enum Container" rs/hang/src/catalog/container.rs -B 5 -A 10Repository: moq-dev/moq
Length of output: 600
🏁 Script executed:
#!/bin/bash
# Verify Container has Default derive
rg -n "#\[derive" rs/hang/src/catalog/container.rs | head -5Repository: moq-dev/moq
Length of output: 134
Add #[serde(default)] to maintain backward compatibility.
The Container enum does implement Default (returning Native), but the container field in VideoConfig lacks the #[serde(default)] annotation. Without it, Serde will not use the default value during deserialization—any existing serialized catalog missing this field will fail to deserialize. Add #[serde(default)] to allow Serde to use Container::default() for missing fields:
/// Container format for frame encoding.
/// Defaults to "native" for backward compatibility.
+ #[serde(default)]
pub container: Container,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Container format for frame encoding. | |
| /// Defaults to "native" for backward compatibility. | |
| pub container: Container, | |
| /// Container format for frame encoding. | |
| /// Defaults to "native" for backward compatibility. | |
| #[serde(default)] | |
| pub container: Container, |
🤖 Prompt for AI Agents
In @rs/hang/src/catalog/video/mod.rs around lines 114 - 117, The VideoConfig
struct's container field lacks the serde default attribute, so deserializing
older catalogs that omit container will fail; add #[serde(default)] to the
container field declaration in VideoConfig so Serde will call
Container::default() (which returns Container::Native) when the field is
missing, keeping backward compatibility with existing serialized data.
| pub fn decode<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> { | ||
| // If passthrough mode, we need to extract raw bytes before parsing. | ||
| let available_bytes = if self.passthrough_mode && buf.has_remaining() { | ||
| let chunk = buf.chunk(); | ||
| Some(Bytes::copy_from_slice(chunk)) | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| let mut cursor = std::io::Cursor::new(buf); | ||
| let mut position = 0; | ||
| let mut bytes_offset = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential issue: Raw bytes capture may not work correctly with multi-chunk buffers.
The available_bytes is captured from buf.chunk() which only returns the first contiguous chunk. If the buffer has multiple chunks (common with network data), this will miss data, causing the slice operations later to fail or capture incorrect data.
Proposed fix: Capture all remaining bytes
- let available_bytes = if self.passthrough_mode && buf.has_remaining() {
- let chunk = buf.chunk();
- Some(Bytes::copy_from_slice(chunk))
+ let available_bytes = if self.passthrough_mode && buf.has_remaining() {
+ // Copy all remaining bytes, not just the first chunk
+ Some(Bytes::copy_from_slice(buf.as_ref()))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub fn decode<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> { | |
| // If passthrough mode, we need to extract raw bytes before parsing. | |
| let available_bytes = if self.passthrough_mode && buf.has_remaining() { | |
| let chunk = buf.chunk(); | |
| Some(Bytes::copy_from_slice(chunk)) | |
| } else { | |
| None | |
| }; | |
| let mut cursor = std::io::Cursor::new(buf); | |
| let mut position = 0; | |
| let mut bytes_offset = 0; | |
| pub fn decode<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> { | |
| // If passthrough mode, we need to extract raw bytes before parsing. | |
| let available_bytes = if self.passthrough_mode && buf.has_remaining() { | |
| // Copy all remaining bytes, not just the first chunk | |
| Some(Bytes::copy_from_slice(buf.as_ref())) | |
| } else { | |
| None | |
| }; | |
| let mut cursor = std::io::Cursor::new(buf); | |
| let mut position = 0; | |
| let mut bytes_offset = 0; |
🤖 Prompt for AI Agents
In @rs/hang/src/import/fmp4.rs around lines 99 - 110, The current decode method
captures available_bytes using buf.chunk(), which only takes the first
contiguous slice and will miss data for multi-chunk buffers; update the logic in
decode (the available_bytes creation when self.passthrough_mode is true) to
collect all remaining bytes instead of just buf.chunk()—for example by reading
buf.remaining() into a new Bytes/Vec or by iterating over chunks and
concatenating them—so subsequent slice/offset math uses a complete copy of the
remaining bytes rather than a single chunk.
| fn transport_fragment(&mut self, fragment: Bytes, moof: Moof) -> anyhow::Result<()> { | ||
| // Verify that init segment was sent before fragments | ||
| if self.moov_bytes.is_some() { | ||
| tracing::warn!("transporting fragment but moov_bytes is still set - init segment may not have been sent"); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Misleading warning: moov_bytes being Some is expected here.
At this point in transport_fragment, moov_bytes should have already been consumed by init(). If it's still Some, it indicates init() was never called, not that the init segment wasn't sent. The warning message is misleading.
Suggested clarification
- if self.moov_bytes.is_some() {
- tracing::warn!("transporting fragment but moov_bytes is still set - init segment may not have been sent");
+ if self.moov_bytes.is_some() {
+ tracing::warn!("transporting fragment but moov_bytes is still set - init() may not have been called");
}🤖 Prompt for AI Agents
In @rs/hang/src/import/fmp4.rs around lines 710 - 714, The warning in
transport_fragment is misleading because moov_bytes being Some at this point
means init() was never called, not that the init segment wasn't sent; update the
tracing::warn! message in transport_fragment (which checks
self.moov_bytes.is_some()) to clearly say "init() was not called; moov_bytes
still present" (or similar), and optionally adjust the log level if you want
this to be an error or debug signal rather than a warning; keep the check on
self.moov_bytes and only change the log text/level to accurately reflect the
condition.
kixelated
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs another attempt.
- The contents of each MoQ frame should be a fMP4 fragment (aka CMAF segment). No timestamp prefix allowed.
- We should let MSE perform A/V sync.
- We shouldn't push init segments over the data tracks. Either a separate track, or reconstructed from the catalog.
| return encodeU64(timestamp); | ||
| case "fmp4": | ||
| throw new Error("fmp4 container not yet implemented"); | ||
| case "cmaf": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be encoding the timestamp here. The entire frame payload should be a fMP4 fragment.
| * @param type - "video" or "audio" | ||
| * @returns MIME type string (e.g., "video/mp4; codecs=\"avc1.42E01E\"") | ||
| */ | ||
| export function buildMimeType(codec: string, type: "video" | "audio"): string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very MP4 specific, but it has a generic name.
|
|
||
| // For MSE path, expose the HTMLAudioElement for direct control | ||
| #mseAudioElement = new Signal<HTMLAudioElement | undefined>(undefined); | ||
| readonly mseAudioElement = this.#mseAudioElement as Getter<HTMLAudioElement | undefined>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we expose this. We should expose an AudioNode instead so it works regardless of the WebCodecs/MSE backend used.
|
|
||
| // Expose method to set audio element for synchronization | ||
| this.setAudioSync = (audioElement: HTMLAudioElement | undefined) => { | ||
| mseSource.setAudioSync(audioElement); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this approach is just wrong. There's no way you're going to get decent synchronization via currentTime
We should have a single MediaSource and add/remove audio/video SourceBuffers.
| } | ||
|
|
||
| const isMoovAtom = hasMoovAtom(frame.data); | ||
| const isInitSegment = isMoovAtom && !initSegmentReceived; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should send init segments as a separate track, or embedded inside the catalog. Sending it at the start of each group is technically valid but goes against the spirit of them.
In fact, the description field consists of a avcC box which can be used to build a valid MP4 init segment I believe. @littlespex would know more
Add passthrough mode for CMAF/HLS import and MSE playback support
This PR adds comprehensive CMAF support with passthrough mode for ingestion and Media Source Extensions (MSE) for browser playback.
Changes
Rust/CLI Side
--passthroughflag tohang publish hlscommandjust pub-hlscommand to support passthrough mode via optional parameterTypeScript/Browser Side
SourceMSEclass for both audio and video trackscmaf, playback automatically uses MSE instead of WebCodecssetAudioSyncmethod, eliminatinganytype usageTechnical Details
Passthrough Mode:
The passthrough flag enables transport of complete CMAF fragments without re-encoding or decomposing them. This is particularly useful when you want to preserve the original CMAF structure.
Container Format:
native: Uses QUIC VarInt encoding (1-8 bytes, variable length), raw frame payloads (default)raw: Uses fixed u64 encoding (8 bytes, big-endian), raw frame payloadscmaf: Fragmented MP4 container - frames contain complete moof+mdat fragmentsMSE Implementation:
MediaSourceAPI for CMAF playbackUsage
Publish HLS with passthrough mode
just pub-hls bbb passthrough
Publish HLS without passthrough (default)
just pub-hls bbb
The browser will automatically use MSE for playback when the container is
cmaf, providing more efficient decoding and lower latency compared to WebCodecs for CMAF content.