diff --git a/src/ReactiveUI/Internal/ExpressionChainSink.cs b/src/ReactiveUI/Internal/ExpressionChainSink.cs index 305f4eec69..0de4e70c82 100644 --- a/src/ReactiveUI/Internal/ExpressionChainSink.cs +++ b/src/ReactiveUI/Internal/ExpressionChainSink.cs @@ -81,6 +81,16 @@ private sealed class Sink : IDisposable /// Whether holds a value yet. private bool _hasLast; + /// + /// When , the next emission that equals is suppressed + /// regardless of . Set immediately after the kicker emits during + /// : any PropertyChanged event that races the + /// subscribe-then-read window has been queued behind and will re-emit the + /// same value once we release the lock. The suppression collapses that single racing duplicate + /// without altering the user-facing distinct semantics. + /// + private bool _suppressNextIfSameAsLast; + /// Latched once this chain subscription has been disposed. private bool _disposed; @@ -139,47 +149,85 @@ public void Dispose() /// The value the link produced (the parent for the next level). private void SetNextParent(int level, object? value) => _levels[level + 1].SetParent(value); - /// Handles a leaf raw emission: applies skip-initial, the non-null-parent filter, the cast and the distinct gate. + /// Handles a leaf raw emission: applies skip-initial, the non-null-parent filter, the cast, the + /// kicker dedup and the distinct gate. /// Whether the leaf's parent was null (a raw emission that the non-null filter drops). /// The leaf value when the parent is present. - private void Emit(bool parentMissing, object? value) + /// Whether this emission is the kicker push that + /// performs immediately after attaching the link subscription. + private void Emit(bool parentMissing, object? value, bool fromKicker = false) { if (_skipNext) { _skipNext = false; + _suppressNextIfSameAsLast = false; return; } if (parentMissing) { + _suppressNextIfSameAsLast = false; return; } - TValue typed; - if (value is null) + if (!TryConvert(value, out var typed)) { - typed = default!; - } - else if (value is TValue cast) - { - typed = cast; - } - else - { - _downstream.OnError(new InvalidCastException($"Unable to cast from {value.GetType()} to {typeof(TValue)}.")); return; } - if (_isDistinct && _hasLast && EqualityComparer.Default.Equals(typed, _last)) + if (ShouldSuppress(typed, fromKicker)) { return; } _last = typed; _hasLast = true; + _suppressNextIfSameAsLast = fromKicker; _downstream.OnNext(new ObservedChange(_source!, _expression, typed)); } + /// Coerces the raw observed value into , forwarding a cast error if it + /// is neither nor assignable. + /// The raw observed value. + /// The coerced value when conversion succeeds; the default otherwise. + /// when the value was converted; when an error has + /// been forwarded to the downstream observer. + private bool TryConvert(object? value, out TValue typed) + { + if (value is null) + { + typed = default!; + return true; + } + + if (value is TValue cast) + { + typed = cast; + return true; + } + + _downstream.OnError(new InvalidCastException($"Unable to cast from {value.GetType()} to {typeof(TValue)}.")); + typed = default!; + return false; + } + + /// Decides whether the coerced value should be suppressed: collapses a single racing duplicate + /// produced by the subscribe-then-read window and then applies the user-facing distinct gate. + /// The coerced leaf value. + /// Whether this emission is the kicker push (which is never suppressed). + /// when the emission must be dropped. + private bool ShouldSuppress(TValue typed, bool fromKicker) + { + if (!fromKicker && _suppressNextIfSameAsLast && _hasLast && EqualityComparer.Default.Equals(typed, _last)) + { + _suppressNextIfSameAsLast = false; + return true; + } + + _suppressNextIfSameAsLast = false; + return _isDistinct && _hasLast && EqualityComparer.Default.Equals(typed, _last); + } + /// A single chain link's watcher: re-subscribes on parent change and reads the link's value. /// The owning chain sink. /// This watcher's position in the chain. @@ -218,10 +266,14 @@ public void SetParent(object? parent) var link = sink._links[index]; - // Kicker: propagate the current value immediately, then subscribe for updates. - Push(ReadValue(parent)); + // Subscribe-then-read: attach the link subscription before reading the kicker value so + // any PropertyChanged that fires between the two steps is queued behind sink._gate + // (which our caller holds) rather than silently lost. The Push that follows is marked + // as the kicker so the leaf can collapse the single racing duplicate that the queued + // notification will deliver once we release the lock. _subscription.Disposable = sink._notify(parent, link, sink._beforeChange, sink._suppressWarnings) .Subscribe(new Observer(this)); + Push(ReadValue(parent), fromKicker: true); } /// @@ -274,11 +326,13 @@ public void OnNotification(IObservedChange change) /// Forwards this link's value to the next level, or emits it at the leaf. /// The value this link produced. - private void Push(object? value) + /// Whether this push originates from the kicker performed at the end of + /// (only meaningful at the leaf). + private void Push(object? value, bool fromKicker = false) { if (isLeaf) { - sink.Emit(parentMissing: false, value); + sink.Emit(parentMissing: false, value, fromKicker); } else { diff --git a/src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs b/src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs new file mode 100644 index 0000000000..b614194145 --- /dev/null +++ b/src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs @@ -0,0 +1,173 @@ +// Copyright (c) 2009-2026 .NET Foundation and Contributors. All rights reserved. +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.ComponentModel; +using System.Globalization; +using ReactiveUI.Tests.ReactiveObjects.Mocks; + +namespace ReactiveUI.Tests.WhenAny; + +/// +/// Tests targeting the "missed update" race between WhenAnyValue's initial value read and the +/// handler attachment. +/// +/// +/// In ExpressionChainSink.Sink.Level.SetParent, the subscribing thread (1) reads the current +/// property value via the cached getter and emits it downstream, then (2) subscribes to +/// PropertyChanged. Any mutation that fires PropertyChanged between steps (1) and (2) +/// runs against an empty subscriber list and is silently lost, leaving downstream stuck on the +/// pre-mutation value until the next mutation. +/// +public class WhenAnyValueSubscribeRaceTests +{ + /// + /// Deterministically reproduces the missed-update race using a synchronous wedge: the subscriber's + /// initial OnNext mutates the source property before WhenAnyValue has had a chance to + /// attach its PropertyChanged handler. With the bug present, the new value is never observed. + /// + /// A representing the asynchronous operation. + [Test] + public async Task WhenAnyValue_MutationBetweenInitialEmitAndHandlerAttach_IsLost() + { + var fixture = new TestFixture { IsOnlyOneWord = "initial" }; + var values = new List(); + + // Mutate during the initial emission. We are still inside ExpressionChainSink's SetParent, + // between the value read (already done) and the PropertyChanged handler attachment (not + // yet done). The PropertyChanged event raised by this setter has no subscriber and is lost. + using var subscription = fixture.WhenAnyValue(x => x.IsOnlyOneWord).Subscribe(value => + { + values.Add(value); + if (values.Count != 1) + { + return; + } + + fixture.IsOnlyOneWord = "raced"; + }); + + // Sanity check: the property actually holds the racing value. + await Assert.That(fixture.IsOnlyOneWord).IsEqualTo("raced"); + + // The subscriber must eventually see the racing value: either because the initial read + // captured it, or because the PropertyChanged handler picked it up. With the bug, the + // handler was attached after the event fired, so neither path delivered the update. + await Assert.That(values).Contains("raced"); + } + + /// + /// Same race, but using a hand-rolled source rather than + /// , to confirm the bug is in the chain sink (subscriber-side) and + /// not in any specifics. + /// + /// A representing the asynchronous operation. + [Test] + public async Task WhenAnyValue_MutationBetweenInitialEmitAndHandlerAttach_IsLost_PlainInpc() + { + var notifier = new PlainInpc { Value = 1 }; + var values = new List(); + + const int RacedValue = 2; + using var subscription = notifier.WhenAnyValue(x => x.Value).Subscribe(value => + { + values.Add(value); + if (values.Count != 1) + { + return; + } + + notifier.Value = RacedValue; + }); + + await Assert.That(notifier.Value).IsEqualTo(RacedValue); + await Assert.That(values).Contains(RacedValue); + } + + /// + /// Multi-threaded stress test that proves the race condition itself, not just the underlying + /// ordering defect. One mutator thread writes the property a fixed number of times in a tight + /// loop while the main thread subscribes via WhenAnyValue. In every iteration, after both + /// threads have finished, the subscriber's last observed value must equal the property's final + /// value. Any divergence means a real PropertyChanged raised on the mutator thread fired + /// during the main thread's read-then-subscribe window and was dropped on the floor. + /// + /// + /// Without the fix in ExpressionChainSink.Level.SetParent, this test fails reliably (on + /// the order of 5-10% of iterations drop the final mutation). With the fix, the handler is + /// attached before the kicker read, the mutator's PropertyChanged invocation runs the + /// handler on the mutator's thread, the handler blocks on sink._gate until the main + /// thread releases it, and then re-emits the post-mutation value. Every iteration converges. + /// + /// A representing the asynchronous operation. + [Test] + public async Task WhenAnyValue_ConcurrentMutationDuringSubscribe_NeverLosesFinalValue_Stress() + { + const int iterations = 2_000; + const int mutationsPerIteration = 32; + var divergences = new List(); + + for (var i = 0; i < iterations; i++) + { + var fixture = new TestFixture { IsOnlyOneWord = "v0" }; + using var mutatorReady = new ManualResetEventSlim(false); + using var mutatorDone = new ManualResetEventSlim(false); + + var mutator = new Thread(() => + { + mutatorReady.Set(); + for (var j = 1; j <= mutationsPerIteration; j++) + { + fixture.IsOnlyOneWord = "v" + j.ToString(CultureInfo.InvariantCulture); + } + + mutatorDone.Set(); + }) + { IsBackground = true }; + mutator.Start(); + mutatorReady.Wait(); + + string? latest = null; + using (fixture.WhenAnyValue(x => x.IsOnlyOneWord).Subscribe(v => latest = v)) + { + mutatorDone.Wait(); + mutator.Join(); + } + + var finalProperty = fixture.IsOnlyOneWord; + if (!string.Equals(latest, finalProperty, StringComparison.Ordinal)) + { + divergences.Add($"iter {i}: latest='{latest}' property='{finalProperty}'"); + } + } + + await Assert.That(divergences).IsEmpty(); + } + + /// + /// A minimal hand-rolled source used to isolate the chain + /// sink's behaviour from . + /// + private sealed class PlainInpc : INotifyPropertyChanged + { + private int _value; + + public event PropertyChangedEventHandler? PropertyChanged; + + public int Value + { + get => _value; + set + { + if (_value == value) + { + return; + } + + _value = value; + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value))); + } + } + } +}