Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 72 additions & 18 deletions src/ReactiveUI/Internal/ExpressionChainSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ private sealed class Sink : IDisposable
/// <summary>Whether <see cref="_last"/> holds a value yet.</summary>
private bool _hasLast;

/// <summary>
/// When <see langword="true"/>, the next emission that equals <see cref="_last"/> is suppressed
/// regardless of <see cref="_isDistinct"/>. Set immediately after the kicker emits during
/// <see cref="Level.SetParent"/>: any <c>PropertyChanged</c> event that races the
/// subscribe-then-read window has been queued behind <see cref="_gate"/> and will re-emit the
/// same value once we release the lock. The suppression collapses that single racing duplicate
/// without altering the user-facing distinct semantics.
/// </summary>
private bool _suppressNextIfSameAsLast;

/// <summary>Latched once this chain subscription has been disposed.</summary>
private bool _disposed;

Expand Down Expand Up @@ -139,47 +149,85 @@ public void Dispose()
/// <param name="value">The value the link produced (the parent for the next level).</param>
private void SetNextParent(int level, object? value) => _levels[level + 1].SetParent(value);

/// <summary>Handles a leaf raw emission: applies skip-initial, the non-null-parent filter, the cast and the distinct gate.</summary>
/// <summary>Handles a leaf raw emission: applies skip-initial, the non-null-parent filter, the cast, the
/// kicker dedup and the distinct gate.</summary>
/// <param name="parentMissing">Whether the leaf's parent was null (a raw emission that the non-null filter drops).</param>
/// <param name="value">The leaf value when the parent is present.</param>
private void Emit(bool parentMissing, object? value)
/// <param name="fromKicker">Whether this emission is the kicker push that <see cref="Level.SetParent"/>
/// performs immediately after attaching the link subscription.</param>
private void Emit(bool parentMissing, object? value, bool fromKicker = false)
{
if (_skipNext)
{
_skipNext = false;
_suppressNextIfSameAsLast = false;
return;
}

if (parentMissing)
{
_suppressNextIfSameAsLast = false;
return;
}

TValue typed;
if (value is null)
if (!TryConvert(value, out var typed))
{
typed = default!;
}
else if (value is TValue cast)
{
typed = cast;
}
else
{
_downstream.OnError(new InvalidCastException($"Unable to cast from {value.GetType()} to {typeof(TValue)}."));
return;
}

if (_isDistinct && _hasLast && EqualityComparer<TValue>.Default.Equals(typed, _last))
if (ShouldSuppress(typed, fromKicker))
{
return;
}

_last = typed;
_hasLast = true;
_suppressNextIfSameAsLast = fromKicker;
_downstream.OnNext(new ObservedChange<TSender, TValue>(_source!, _expression, typed));
}

/// <summary>Coerces the raw observed value into <typeparamref name="TValue"/>, forwarding a cast error if it
/// is neither <see langword="null"/> nor assignable.</summary>
/// <param name="value">The raw observed value.</param>
/// <param name="typed">The coerced value when conversion succeeds; the default otherwise.</param>
/// <returns><see langword="true"/> when the value was converted; <see langword="false"/> when an error has
/// been forwarded to the downstream observer.</returns>
private bool TryConvert(object? value, out TValue typed)
{
if (value is null)
{
typed = default!;
return true;
}

if (value is TValue cast)
{
typed = cast;
return true;
}

_downstream.OnError(new InvalidCastException($"Unable to cast from {value.GetType()} to {typeof(TValue)}."));
typed = default!;
return false;
}

/// <summary>Decides whether the coerced value should be suppressed: collapses a single racing duplicate
/// produced by the subscribe-then-read window and then applies the user-facing distinct gate.</summary>
/// <param name="typed">The coerced leaf value.</param>
/// <param name="fromKicker">Whether this emission is the kicker push (which is never suppressed).</param>
/// <returns><see langword="true"/> when the emission must be dropped.</returns>
private bool ShouldSuppress(TValue typed, bool fromKicker)
{
if (!fromKicker && _suppressNextIfSameAsLast && _hasLast && EqualityComparer<TValue>.Default.Equals(typed, _last))
{
_suppressNextIfSameAsLast = false;
return true;
}

_suppressNextIfSameAsLast = false;
return _isDistinct && _hasLast && EqualityComparer<TValue>.Default.Equals(typed, _last);
}

/// <summary>A single chain link's watcher: re-subscribes on parent change and reads the link's value.</summary>
/// <param name="sink">The owning chain sink.</param>
/// <param name="index">This watcher's position in the chain.</param>
Expand Down Expand Up @@ -218,10 +266,14 @@ public void SetParent(object? parent)

var link = sink._links[index];

// Kicker: propagate the current value immediately, then subscribe for updates.
Push(ReadValue(parent));
// Subscribe-then-read: attach the link subscription before reading the kicker value so
// any PropertyChanged that fires between the two steps is queued behind sink._gate
// (which our caller holds) rather than silently lost. The Push that follows is marked
// as the kicker so the leaf can collapse the single racing duplicate that the queued
// notification will deliver once we release the lock.
_subscription.Disposable = sink._notify(parent, link, sink._beforeChange, sink._suppressWarnings)
.Subscribe(new Observer(this));
Push(ReadValue(parent), fromKicker: true);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -274,11 +326,13 @@ public void OnNotification(IObservedChange<object?, object?> change)

/// <summary>Forwards this link's value to the next level, or emits it at the leaf.</summary>
/// <param name="value">The value this link produced.</param>
private void Push(object? value)
/// <param name="fromKicker">Whether this push originates from the kicker performed at the end of
/// <see cref="SetParent"/> (only meaningful at the leaf).</param>
private void Push(object? value, bool fromKicker = false)
{
if (isLeaf)
{
sink.Emit(parentMissing: false, value);
sink.Emit(parentMissing: false, value, fromKicker);
}
else
{
Expand Down
173 changes: 173 additions & 0 deletions src/tests/ReactiveUI.Tests/WhenAny/WhenAnyValueSubscribeRaceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) 2009-2026 .NET Foundation and Contributors. All rights reserved.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.ComponentModel;
using System.Globalization;
using ReactiveUI.Tests.ReactiveObjects.Mocks;

namespace ReactiveUI.Tests.WhenAny;

/// <summary>
/// Tests targeting the "missed update" race between <c>WhenAnyValue</c>'s initial value read and the
/// <see cref="INotifyPropertyChanged.PropertyChanged"/> handler attachment.
/// </summary>
/// <remarks>
/// In <c>ExpressionChainSink.Sink.Level.SetParent</c>, the subscribing thread (1) reads the current
/// property value via the cached getter and emits it downstream, then (2) subscribes to
/// <c>PropertyChanged</c>. Any mutation that fires <c>PropertyChanged</c> between steps (1) and (2)
/// runs against an empty subscriber list and is silently lost, leaving downstream stuck on the
/// pre-mutation value until the next mutation.
/// </remarks>
public class WhenAnyValueSubscribeRaceTests
{
/// <summary>
/// Deterministically reproduces the missed-update race using a synchronous wedge: the subscriber's
/// initial <c>OnNext</c> mutates the source property before <c>WhenAnyValue</c> has had a chance to
/// attach its <c>PropertyChanged</c> handler. With the bug present, the new value is never observed.
/// </summary>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
[Test]
public async Task WhenAnyValue_MutationBetweenInitialEmitAndHandlerAttach_IsLost()
{
var fixture = new TestFixture { IsOnlyOneWord = "initial" };
var values = new List<string?>();

// Mutate during the initial emission. We are still inside ExpressionChainSink's SetParent,
// between the value read (already done) and the PropertyChanged handler attachment (not
// yet done). The PropertyChanged event raised by this setter has no subscriber and is lost.
using var subscription = fixture.WhenAnyValue(x => x.IsOnlyOneWord).Subscribe(value =>
{
values.Add(value);
if (values.Count != 1)
{
return;
}

fixture.IsOnlyOneWord = "raced";
});

// Sanity check: the property actually holds the racing value.
await Assert.That(fixture.IsOnlyOneWord).IsEqualTo("raced");

// The subscriber must eventually see the racing value: either because the initial read
// captured it, or because the PropertyChanged handler picked it up. With the bug, the
// handler was attached after the event fired, so neither path delivered the update.
await Assert.That(values).Contains("raced");
}

/// <summary>
/// Same race, but using a hand-rolled <see cref="INotifyPropertyChanged"/> source rather than
/// <see cref="ReactiveObject"/>, to confirm the bug is in the chain sink (subscriber-side) and
/// not in any <see cref="ReactiveObject"/> specifics.
/// </summary>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
[Test]
public async Task WhenAnyValue_MutationBetweenInitialEmitAndHandlerAttach_IsLost_PlainInpc()
{
var notifier = new PlainInpc { Value = 1 };
var values = new List<int>();

const int RacedValue = 2;
using var subscription = notifier.WhenAnyValue(x => x.Value).Subscribe(value =>
{
values.Add(value);
if (values.Count != 1)
{
return;
}

notifier.Value = RacedValue;
});

await Assert.That(notifier.Value).IsEqualTo(RacedValue);
await Assert.That(values).Contains(RacedValue);
}

/// <summary>
/// Multi-threaded stress test that proves the race condition itself, not just the underlying
/// ordering defect. One mutator thread writes the property a fixed number of times in a tight
/// loop while the main thread subscribes via <c>WhenAnyValue</c>. In every iteration, after both
/// threads have finished, the subscriber's last observed value must equal the property's final
/// value. Any divergence means a real <c>PropertyChanged</c> raised on the mutator thread fired
/// during the main thread's read-then-subscribe window and was dropped on the floor.
/// </summary>
/// <remarks>
/// Without the fix in <c>ExpressionChainSink.Level.SetParent</c>, this test fails reliably (on
/// the order of 5-10% of iterations drop the final mutation). With the fix, the handler is
/// attached before the kicker read, the mutator's <c>PropertyChanged</c> invocation runs the
/// handler on the mutator's thread, the handler blocks on <c>sink._gate</c> until the main
/// thread releases it, and then re-emits the post-mutation value. Every iteration converges.
/// </remarks>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
[Test]
public async Task WhenAnyValue_ConcurrentMutationDuringSubscribe_NeverLosesFinalValue_Stress()
{
const int iterations = 2_000;
const int mutationsPerIteration = 32;
var divergences = new List<string>();

for (var i = 0; i < iterations; i++)
{
var fixture = new TestFixture { IsOnlyOneWord = "v0" };
using var mutatorReady = new ManualResetEventSlim(false);
using var mutatorDone = new ManualResetEventSlim(false);

var mutator = new Thread(() =>
{
mutatorReady.Set();
for (var j = 1; j <= mutationsPerIteration; j++)
{
fixture.IsOnlyOneWord = "v" + j.ToString(CultureInfo.InvariantCulture);
}

mutatorDone.Set();
})
{ IsBackground = true };
mutator.Start();
mutatorReady.Wait();

string? latest = null;
using (fixture.WhenAnyValue(x => x.IsOnlyOneWord).Subscribe(v => latest = v))
{
mutatorDone.Wait();
mutator.Join();
}

var finalProperty = fixture.IsOnlyOneWord;
if (!string.Equals(latest, finalProperty, StringComparison.Ordinal))
{
divergences.Add($"iter {i}: latest='{latest}' property='{finalProperty}'");
}
}

await Assert.That(divergences).IsEmpty();
}

/// <summary>
/// A minimal hand-rolled <see cref="INotifyPropertyChanged"/> source used to isolate the chain
/// sink's behaviour from <see cref="ReactiveObject"/>.
/// </summary>
private sealed class PlainInpc : INotifyPropertyChanged
{
private int _value;

public event PropertyChangedEventHandler? PropertyChanged;

public int Value
{
get => _value;
set
{
if (_value == value)
{
return;
}

_value = value;
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
}
}
}
}
Loading