From b4639b08c61d748825c84877a3625d7fdf42ecd8 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sat, 13 Jun 2026 16:30:08 -0700 Subject: [PATCH] fix: WhenAnyValue subscribes to PropertyChanged before reading the initial value ExpressionChainSink.Level.SetParent read the property and emitted the kicker value before subscribing to PropertyChanged. Any mutation that fired PropertyChanged between the two was raised against an empty handler list and silently lost, leaving downstream observers pinned to the pre-write value until the next change. Subscribe first, then read. A racing handler emit that arrives queued behind the chain sink's gate then delivers the same post-mutation value the kicker just emitted, so add a one-shot "suppress next emission if equal to the kicker" guard that fires independently of the user-facing isDistinct flag. Repro tests cover deterministic single-thread and multi-threaded stress paths. Fixes #4380 --- .../Internal/ExpressionChainSink.cs | 90 +++++++-- .../WhenAny/WhenAnyValueSubscribeRaceTests.cs | 173 ++++++++++++++++++ 2 files changed, 245 insertions(+), 18 deletions(-) create mode 100644 src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs diff --git a/src/ReactiveUI/Internal/ExpressionChainSink.cs b/src/ReactiveUI/Internal/ExpressionChainSink.cs index 305f4eec69..0de4e70c82 100644 --- a/src/ReactiveUI/Internal/ExpressionChainSink.cs +++ b/src/ReactiveUI/Internal/ExpressionChainSink.cs @@ -81,6 +81,16 @@ private sealed class Sink : IDisposable /// Whether holds a value yet. private bool _hasLast; + /// + /// When , the next emission that equals is suppressed + /// regardless of . Set immediately after the kicker emits during + /// : any PropertyChanged event that races the + /// subscribe-then-read window has been queued behind and will re-emit the + /// same value once we release the lock. The suppression collapses that single racing duplicate + /// without altering the user-facing distinct semantics. + /// + private bool _suppressNextIfSameAsLast; + /// Latched once this chain subscription has been disposed. private bool _disposed; @@ -139,47 +149,85 @@ public void Dispose() /// The value the link produced (the parent for the next level). private void SetNextParent(int level, object? value) => _levels[level + 1].SetParent(value); - /// Handles a leaf raw emission: applies skip-initial, the non-null-parent filter, the cast and the distinct gate. + /// Handles a leaf raw emission: applies skip-initial, the non-null-parent filter, the cast, the + /// kicker dedup and the distinct gate. /// Whether the leaf's parent was null (a raw emission that the non-null filter drops). /// The leaf value when the parent is present. - private void Emit(bool parentMissing, object? value) + /// Whether this emission is the kicker push that + /// performs immediately after attaching the link subscription. + private void Emit(bool parentMissing, object? value, bool fromKicker = false) { if (_skipNext) { _skipNext = false; + _suppressNextIfSameAsLast = false; return; } if (parentMissing) { + _suppressNextIfSameAsLast = false; return; } - TValue typed; - if (value is null) + if (!TryConvert(value, out var typed)) { - typed = default!; - } - else if (value is TValue cast) - { - typed = cast; - } - else - { - _downstream.OnError(new InvalidCastException($"Unable to cast from {value.GetType()} to {typeof(TValue)}.")); return; } - if (_isDistinct && _hasLast && EqualityComparer.Default.Equals(typed, _last)) + if (ShouldSuppress(typed, fromKicker)) { return; } _last = typed; _hasLast = true; + _suppressNextIfSameAsLast = fromKicker; _downstream.OnNext(new ObservedChange(_source!, _expression, typed)); } + /// Coerces the raw observed value into , forwarding a cast error if it + /// is neither nor assignable. + /// The raw observed value. + /// The coerced value when conversion succeeds; the default otherwise. + /// when the value was converted; when an error has + /// been forwarded to the downstream observer. + private bool TryConvert(object? value, out TValue typed) + { + if (value is null) + { + typed = default!; + return true; + } + + if (value is TValue cast) + { + typed = cast; + return true; + } + + _downstream.OnError(new InvalidCastException($"Unable to cast from {value.GetType()} to {typeof(TValue)}.")); + typed = default!; + return false; + } + + /// Decides whether the coerced value should be suppressed: collapses a single racing duplicate + /// produced by the subscribe-then-read window and then applies the user-facing distinct gate. + /// The coerced leaf value. + /// Whether this emission is the kicker push (which is never suppressed). + /// when the emission must be dropped. + private bool ShouldSuppress(TValue typed, bool fromKicker) + { + if (!fromKicker && _suppressNextIfSameAsLast && _hasLast && EqualityComparer.Default.Equals(typed, _last)) + { + _suppressNextIfSameAsLast = false; + return true; + } + + _suppressNextIfSameAsLast = false; + return _isDistinct && _hasLast && EqualityComparer.Default.Equals(typed, _last); + } + /// A single chain link's watcher: re-subscribes on parent change and reads the link's value. /// The owning chain sink. /// This watcher's position in the chain. @@ -218,10 +266,14 @@ public void SetParent(object? parent) var link = sink._links[index]; - // Kicker: propagate the current value immediately, then subscribe for updates. - Push(ReadValue(parent)); + // Subscribe-then-read: attach the link subscription before reading the kicker value so + // any PropertyChanged that fires between the two steps is queued behind sink._gate + // (which our caller holds) rather than silently lost. The Push that follows is marked + // as the kicker so the leaf can collapse the single racing duplicate that the queued + // notification will deliver once we release the lock. _subscription.Disposable = sink._notify(parent, link, sink._beforeChange, sink._suppressWarnings) .Subscribe(new Observer(this)); + Push(ReadValue(parent), fromKicker: true); } /// @@ -274,11 +326,13 @@ public void OnNotification(IObservedChange change) /// Forwards this link's value to the next level, or emits it at the leaf. /// The value this link produced. - private void Push(object? value) + /// Whether this push originates from the kicker performed at the end of + /// (only meaningful at the leaf). + private void Push(object? value, bool fromKicker = false) { if (isLeaf) { - sink.Emit(parentMissing: false, value); + sink.Emit(parentMissing: false, value, fromKicker); } else { diff --git a/src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs b/src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs new file mode 100644 index 0000000000..b614194145 --- /dev/null +++ b/src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs @@ -0,0 +1,173 @@ +// Copyright (c) 2009-2026 .NET Foundation and Contributors. All rights reserved. +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.ComponentModel; +using System.Globalization; +using ReactiveUI.Tests.ReactiveObjects.Mocks; + +namespace ReactiveUI.Tests.WhenAny; + +/// +/// Tests targeting the "missed update" race between WhenAnyValue's initial value read and the +/// handler attachment. +/// +/// +/// In ExpressionChainSink.Sink.Level.SetParent, the subscribing thread (1) reads the current +/// property value via the cached getter and emits it downstream, then (2) subscribes to +/// PropertyChanged. Any mutation that fires PropertyChanged between steps (1) and (2) +/// runs against an empty subscriber list and is silently lost, leaving downstream stuck on the +/// pre-mutation value until the next mutation. +/// +public class WhenAnyValueSubscribeRaceTests +{ + /// + /// Deterministically reproduces the missed-update race using a synchronous wedge: the subscriber's + /// initial OnNext mutates the source property before WhenAnyValue has had a chance to + /// attach its PropertyChanged handler. With the bug present, the new value is never observed. + /// + /// A representing the asynchronous operation. + [Test] + public async Task WhenAnyValue_MutationBetweenInitialEmitAndHandlerAttach_IsLost() + { + var fixture = new TestFixture { IsOnlyOneWord = "initial" }; + var values = new List(); + + // Mutate during the initial emission. We are still inside ExpressionChainSink's SetParent, + // between the value read (already done) and the PropertyChanged handler attachment (not + // yet done). The PropertyChanged event raised by this setter has no subscriber and is lost. + using var subscription = fixture.WhenAnyValue(x => x.IsOnlyOneWord).Subscribe(value => + { + values.Add(value); + if (values.Count != 1) + { + return; + } + + fixture.IsOnlyOneWord = "raced"; + }); + + // Sanity check: the property actually holds the racing value. + await Assert.That(fixture.IsOnlyOneWord).IsEqualTo("raced"); + + // The subscriber must eventually see the racing value: either because the initial read + // captured it, or because the PropertyChanged handler picked it up. With the bug, the + // handler was attached after the event fired, so neither path delivered the update. + await Assert.That(values).Contains("raced"); + } + + /// + /// Same race, but using a hand-rolled source rather than + /// , to confirm the bug is in the chain sink (subscriber-side) and + /// not in any specifics. + /// + /// A representing the asynchronous operation. + [Test] + public async Task WhenAnyValue_MutationBetweenInitialEmitAndHandlerAttach_IsLost_PlainInpc() + { + var notifier = new PlainInpc { Value = 1 }; + var values = new List(); + + const int RacedValue = 2; + using var subscription = notifier.WhenAnyValue(x => x.Value).Subscribe(value => + { + values.Add(value); + if (values.Count != 1) + { + return; + } + + notifier.Value = RacedValue; + }); + + await Assert.That(notifier.Value).IsEqualTo(RacedValue); + await Assert.That(values).Contains(RacedValue); + } + + /// + /// Multi-threaded stress test that proves the race condition itself, not just the underlying + /// ordering defect. One mutator thread writes the property a fixed number of times in a tight + /// loop while the main thread subscribes via WhenAnyValue. In every iteration, after both + /// threads have finished, the subscriber's last observed value must equal the property's final + /// value. Any divergence means a real PropertyChanged raised on the mutator thread fired + /// during the main thread's read-then-subscribe window and was dropped on the floor. + /// + /// + /// Without the fix in ExpressionChainSink.Level.SetParent, this test fails reliably (on + /// the order of 5-10% of iterations drop the final mutation). With the fix, the handler is + /// attached before the kicker read, the mutator's PropertyChanged invocation runs the + /// handler on the mutator's thread, the handler blocks on sink._gate until the main + /// thread releases it, and then re-emits the post-mutation value. Every iteration converges. + /// + /// A representing the asynchronous operation. + [Test] + public async Task WhenAnyValue_ConcurrentMutationDuringSubscribe_NeverLosesFinalValue_Stress() + { + const int iterations = 2_000; + const int mutationsPerIteration = 32; + var divergences = new List(); + + for (var i = 0; i < iterations; i++) + { + var fixture = new TestFixture { IsOnlyOneWord = "v0" }; + using var mutatorReady = new ManualResetEventSlim(false); + using var mutatorDone = new ManualResetEventSlim(false); + + var mutator = new Thread(() => + { + mutatorReady.Set(); + for (var j = 1; j <= mutationsPerIteration; j++) + { + fixture.IsOnlyOneWord = "v" + j.ToString(CultureInfo.InvariantCulture); + } + + mutatorDone.Set(); + }) + { IsBackground = true }; + mutator.Start(); + mutatorReady.Wait(); + + string? latest = null; + using (fixture.WhenAnyValue(x => x.IsOnlyOneWord).Subscribe(v => latest = v)) + { + mutatorDone.Wait(); + mutator.Join(); + } + + var finalProperty = fixture.IsOnlyOneWord; + if (!string.Equals(latest, finalProperty, StringComparison.Ordinal)) + { + divergences.Add($"iter {i}: latest='{latest}' property='{finalProperty}'"); + } + } + + await Assert.That(divergences).IsEmpty(); + } + + /// + /// A minimal hand-rolled source used to isolate the chain + /// sink's behaviour from . + /// + private sealed class PlainInpc : INotifyPropertyChanged + { + private int _value; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Value + { + get => _value; + set + { + if (_value == value) + { + return; + } + + _value = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value))); + } + } + } +}