
Conversation

@schgoo schgoo commented Dec 10, 2025

Adds uniflight, a crate for coalescing duplicate async tasks into a single execution. When multiple tasks request the same work (identified by a key), only the leader performs the work while followers wait and receive a clone of the result.

  • Handles cancellation and panic gracefully (followers become the new leader)
  • Includes comprehensive module documentation
  • Includes example demonstrating cache thundering herd prevention
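The leader/follower flow described above can be sketched synchronously with only std (illustrative names, not the crate's API; unlike the real crate, this simplified version caches the result under the key instead of removing it once all followers have been served):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Simplified, synchronous sketch: the first caller for a key becomes the
// leader and runs the work; concurrent callers for the same key block and
// receive a clone of the leader's result.
struct SingleFlight<T> {
    state: Mutex<HashMap<String, Option<T>>>, // None = in flight, Some = done
    done: Condvar,
}

impl<T: Clone> SingleFlight<T> {
    fn new() -> Self {
        Self { state: Mutex::new(HashMap::new()), done: Condvar::new() }
    }

    fn work(&self, key: &str, func: impl FnOnce() -> T) -> T {
        let mut state = self.state.lock().unwrap();
        if state.contains_key(key) {
            // Follower: block until the leader publishes a result.
            loop {
                if let Some(Some(v)) = state.get(key) {
                    return v.clone();
                }
                state = self.done.wait(state).unwrap();
            }
        }
        // Leader: mark the key in flight, run the work without holding the lock.
        state.insert(key.to_string(), None);
        drop(state);
        let value = func();
        self.state.lock().unwrap().insert(key.to_string(), Some(value.clone()));
        self.done.notify_all();
        value
    }
}

fn main() {
    let sf = Arc::new(SingleFlight::new());
    let sf2 = Arc::clone(&sf);
    let t = thread::spawn(move || sf2.work("user:123", || String::from("fetched")));
    let a = sf.work("user:123", || String::from("fetched"));
    let b = t.join().unwrap();
    assert_eq!(a, b); // whichever caller led, both observe the same value
}
```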

schgoo (Author) commented Dec 10, 2025

@microsoft-github-policy-service agree company="Microsoft"

@schgoo schgoo changed the title Add uniflight (name pending) crate for duplicate request coalescing build: Add uniflight (name pending) crate for duplicate request coalescing Dec 10, 2025
codecov bot commented Dec 10, 2025

Codecov Report

❌ Patch coverage is 98.60140% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 99.9%. Comparing base (9dbba9d) to head (1aa66c4).

Files with missing lines Patch % Lines
crates/uniflight/src/lib.rs 98.6% 2 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##             main    #118     +/-   ##
========================================
- Coverage   100.0%   99.9%   -0.1%     
========================================
  Files          84      85      +1     
  Lines        7450    7593    +143     
========================================
+ Hits         7450    7591    +141     
- Misses          0       2      +2     


@geeknoid (Member)
I like the name :-)

@geeknoid geeknoid (Member) left a comment

The logo.png file isn't rendering for me. Is this a valid png file? I tried even going to your repo & branch, and GitHub won't show the image to me.

@schgoo
Copy link
Author

schgoo commented Dec 11, 2025

@geeknoid I hadn't uploaded the logo yet. I just added it.

handles.push(handle);

// Stagger the requests slightly to see the deduplication in action
tokio::time::sleep(Duration::from_millis(10)).await;
Member

Maybe you could use the tick crate for delays once it's available?

#106

Author

I'm happy to make that change once it gets checked in


impl<K, T> UniFlight<K, T>
where
K: Hash + Eq + Clone,
Member

Why does the key need to implement Clone?

In general, you want to minimize the type bounds on methods as much as you can, since it makes composition better on your consumers. As such, I'd move this type constraint to the work function instead, so as to remove the constraint from the new function.

Author

I've been wondering about that, because it makes for a strange user experience with IntelliSense: you go looking for the work function, and it doesn't appear because your bounds aren't satisfied. I don't have a strong opinion on it, though.

@schgoo schgoo (Author) Dec 11, 2025

Especially given that UniFlight is completely useless if work can't be called, does it really make sense to separate the bounds here? That said, the reason it needs Clone is that the HashMap takes ownership of the key, but the key still needs to be passed into the BroadcastOnceWaiter, so the waiter can remove it from the HashMap when the task completes.

Member

It's really not great that the key is passed by value and is internally cloned. That means the caller will clone the key before calling the function, and then internally it will get cloned again. If the parameter is passed by reference, at least the caller won't need to clone before calling you.

Regarding the bounds, you're right that you can't create a struct where the key is not hashable, so what's the benefit of new not having bounds? Why put the bounds on the work function instead? It simply composes better at a higher level. It can avoid your users needing to explicitly specify types when interacting with your stuff. So if you look around at the ecosystem, you can see that well-designed APIs tend to minimize the type bounds on methods in general. You can have different impl blocks with different constraints and then bunch several methods with the same constraints in those impl blocks. Or you can apply the constraints directly on the API.

Basically, if you don't need the constraint to support your implementation of the method, you shouldn't require that constraint on your callers.

You know, the same argument could be made for applying the constraints on the struct definition itself. You could declare that UniFlight's K parameter must always be hashable. But since you don't need that to define your struct, you don't apply the constraint there, only on the methods.
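The impl-block pattern described here can be sketched with a toy type (Group is a hypothetical stand-in, not UniFlight's actual layout): new compiles with no bounds at all, while Hash, Eq, and Clone appear only on the block that defines work:

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Toy stand-in: the struct and its constructor carry no trait bounds at all.
struct Group<K, T> {
    mapping: HashMap<K, T>,
}

impl<K, T> Group<K, T> {
    fn new() -> Self {
        Self { mapping: HashMap::new() }
    }
}

// The bounds live only on the impl block whose methods actually need them.
impl<K: Hash + Eq, T: Clone> Group<K, T> {
    fn work(&mut self, key: K, func: impl FnOnce() -> T) -> T {
        self.mapping.entry(key).or_insert_with(func).clone()
    }
}

fn main() {
    let mut g = Group::new(); // no bounds required to construct
    let first = g.work("user:123", || String::from("fetched"));
    let second = g.work("user:123", || String::from("refetched"));
    assert_eq!(first, "fetched");
    assert_eq!(second, "fetched"); // the second closure never ran
}
```

Callers that only construct and pass the type around never have to satisfy (or name) the bounds; they surface only at the call site that actually invokes work.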

T: Clone,
{
let owned_mapping = Arc::clone(&self.mapping);
let mut mapping = self.mapping.lock();
@geeknoid geeknoid (Member) Dec 11, 2025

Right now, this is a global lock which will cause contention as the # of cores increases. You can reduce contention by sharding the lock. For example, you could use a modulo of the hash value for the key to pick one of N locks. If you make the modulo equal to the number of processors, then you get more and more locks the more processors there are, such that contention should remain generally flat as the system scales up.
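A minimal sketch of this sharding idea using only std (illustrative names; the real implementation would shard uniflight's internal map rather than a plain HashMap):

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::sync::Mutex;

// Sharded map sketch: hash the key, take the hash modulo the shard count,
// and lock only that shard's Mutex, so contention spreads across N locks.
struct ShardedMap<K, V> {
    hasher: RandomState,
    shards: Vec<Mutex<HashMap<K, V>>>,
}

impl<K: Hash + Eq, V: Clone> ShardedMap<K, V> {
    fn new(shard_count: usize) -> Self {
        Self {
            hasher: RandomState::new(),
            shards: (0..shard_count).map(|_| Mutex::new(HashMap::new())).collect(),
        }
    }

    fn shard(&self, key: &K) -> &Mutex<HashMap<K, V>> {
        // Pick a shard from the key's hash; only this shard's lock is taken.
        let idx = (self.hasher.hash_one(key) as usize) % self.shards.len();
        &self.shards[idx]
    }

    fn insert(&self, key: K, value: V) {
        self.shard(&key).lock().unwrap().insert(key, value);
    }

    fn get(&self, key: &K) -> Option<V> {
        self.shard(key).lock().unwrap().get(key).cloned()
    }
}

fn main() {
    // As suggested, the shard count could track the core count, e.g. via
    // std::thread::available_parallelism(); 8 is just a placeholder here.
    let map = ShardedMap::new(8);
    map.insert("user:123", "alice");
    assert_eq!(map.get(&"user:123"), Some("alice"));
    assert_eq!(map.get(&"user:999"), None);
}
```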

/// Execute and return the value for a given function, making sure that only one
/// operation is in-flight at a given moment. If a duplicate call comes in, that caller will
/// wait until the original call completes and return the same value.
pub fn work<F, Fut>(&self, key: K, func: F) -> impl Future<Output = T>
Member

I think the key should be &K here, you don't want the key to be consumed by this API.

Author

Then we'll definitely need K to be Clone

/// wait until the original call completes and return the same value.
pub fn work<F, Fut>(&self, key: K, func: F) -> impl Future<Output = T>
where
F: FnOnce() -> Fut,
Member

I worry that this logic is susceptible to unexpected side effects. If the closure depends on external state, it could lead to surprising/incorrect behavior. For example:

let thread_id = get_thread_id();
let result = group.work("user:123", || async {
     thread_id.to_string()
 }).await;

This is a racy pattern. Depending on which thread becomes the leader, the result of the closure will be different. I'm making it obvious in this example, but it's a general problem. It's basically inviting concurrency issues to surface.

To get this pattern working reliably, all state going into the closure needs to be evaluated as part of deciding "is this the same work". If you only look at the key and ignore all the other state, you can have false sharing. So I think we can't allow any state to enter the closure "on the side", everything needs to be explicitly passed to the closure and evaluated for "sameness".

Author

Isn't that user error though? Is it the responsibility of this crate to prevent them from causing themselves this issue?

Member

Yes it is a user error. However, we don't want to expose a programming model which puts people in the pit of failure by default.

Rust is great in that it eliminates race conditions by construction. The pattern that this code exposes encourages the creation of races. So how can we tighten things up to either completely prevent issues, or at least make them unlikely to hit?

Not allowing state to be captured and forcing all state to be basically built into the key is one way to achieve this protection. I think this would translate into saying that this code doesn't take a closure, but rather a raw function pointer. So if you want to pass something into this code, it will need to be in the key. And if it's in the key, then it'll need to participate in the Hash and Eq trait implementations. And this will then ensure that coalescing only happens when it really should.

Maybe there are other approaches too...

Collaborator

Not allowing state to be captured and forcing all state to be basically built into the key is one way to achieve this protection.

I don't think that's possible in the strictest sense, as you can still (accidentally) bypass that with statics and thread locals, like

let result = group.work("user:123", || async { // 'pure' fn-ptr
     logger::last_message()
 }).await;

might set a different value depending on which thread executed that.

Member

Yeah, I realize it's not air-tight, but I feel it is substantially reducing the risk of accidental mismatch. Capturing state in a closure is the most likely source of these problems.

This is very similar to classic makefiles and dependencies. If you don't capture all the dependencies in a makefile properly, then your incremental builds will mostly work. Until one day, they don't work and you have to do a "make clean" and start from scratch.

@schgoo schgoo (Author) Dec 17, 2025

So, I've been thinking about this a while. I think I agree that important context should be in the key, and then we can pass that into work like

pub fn work<Fut>(&self, key: K, f: fn(K) -> Fut) -> impl Future<Output = T>
where
    Fut: Future<Output = T>,
{ ... }

The problem is that if they're using this, they probably need some downstream client that needs to do I/O (for instance, a logger, as @ralfbiedert pointed out). In that case, we would need to capture that client in the key. Is it reasonable to require that their client/logger/etc. implements Hash+Eq? That seems a bit strange to me.

Maybe we could capture some additional context when Uniflight is initialized, and then pass that into the function pointer, as well, like:

  impl<K, T, C> UniFlight<K, T, C> {
      pub fn with_context<C2>(self, ctx: C2) -> UniFlight<K, T, C2> { ... }

      pub fn work<Fut>(
          &self,
          key: K,
          f: fn(&C, K) -> Fut,  // Receives context AND key
      ) -> impl Future<Output = T>
      where
          Fut: Future<Output = T>,
      { ... }
  }

Would we need to box the future to maintain the reference to C?

Edit:
I did some prototyping, and yes, we would need to box the future because a function pointer needs a concrete return type; it can't return impl Future. So that would look something like:

async fn work(&self, key: K, f: fn(&C, K) -> BoxFuture<'_, T>) -> T {
    f(&self.context, key).await
}

Is the cost to allocate a BoxFuture for every leader worth it for this design? Users would also have to pin their futures within the function pointer, which is a bit strange:

fn fetch_user(client: &Client, user_id: UserId) -> BoxFuture<'_, String> {
    Box::pin(async move {
        format!("Fetched user {} from {}", user_id.0, client.base_url)
    })
}

} else {
let (call, shared) = BroadcastOnce::new();
mapping.insert(key.clone(), call);
let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping);
Member

Have the if and else branches return the waiter object, then call waiter.wait() outside the if construct. This avoids a bit of code duplication.

schgoo and others added 2 commits December 11, 2025 16:27
Co-authored-by: Martin Taillefer <geeknoid@users.noreply.github.com>
Co-authored-by: Martin Taillefer <geeknoid@users.noreply.github.com>
// Original: https://github.com/ihciah/singleflight-async
// Licensed under MIT/Apache-2.0

//! Coalesces duplicate async tasks into a single execution.
Collaborator

How about

Deduplicates async tasks into a single execution.

Author

You really hate that word, huh? 😆

Member

And use the same tagline in Cargo.toml's 'description' field, and in the top-level README's list of crates.

#[tokio::main]
async fn main() {
// Create a shared UniFlight instance for cache operations
let cache_group = Arc::new(UniFlight::<String, String>::new());
Collaborator

This example is a bit weird (and / or the UniFlight definition)

I assume UniFlight is meant to be used by multiple tasks, so the type itself should probably contain an inner Arc. Looking inside of that type, there is an Arc already, so why wouldn't that be sufficient to get the cloning / sharing of cache_group going?
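What this suggests could look like the following hedged sketch (Flight and its field are illustrative names, not uniflight's actual internals): the handle derives Clone over an inner Arc, so callers share it by cloning the handle itself rather than wrapping it in another Arc:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative sketch: the shared state sits behind an inner Arc, and the
// handle derives Clone, so callers clone the handle instead of Arc-wrapping it.
#[derive(Clone)]
struct Flight {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl Flight {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(HashMap::new())) }
    }
}

fn main() {
    let group = Flight::new();
    let handle = group.clone(); // cheap: bumps the Arc refcount, not the map
    handle.inner.lock().unwrap().insert("k".into(), "v".into());
    // Both handles observe the same underlying state.
    assert_eq!(
        group.inner.lock().unwrap().get("k").cloned(),
        Some(String::from("v"))
    );
}
```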

Collaborator

On second thought, maybe this is as simple as using thread_aware::Arc and letting your worker / key pick a Strategy.

}

#[cfg(test)]
mod tests {
Collaborator

Can you move most tests out into a tests/ folder, except the ones which strictly need to access implementation details?

/// Represents a class of work and creates a space in which units of work
/// can be executed with duplicate suppression.
#[derive(Debug)]
pub struct UniFlight<K, T> {
Collaborator

I'd love for this to impl ThreadAware and investigate how the resulting item should be emitted / shared.

Naively one could argue the resulting item should be "unique" and shared between threads, but I think in some situations we still want "per thread" semantics to be honored. My feeling is also that this isn't quite as obvious for UniFlight to implement, and it might tell us something about the design of ThreadAware itself.


@schgoo schgoo changed the title build: Add uniflight (name pending) crate for duplicate request coalescing feat: Add uniflight (name pending) crate for duplicate request coalescing Dec 16, 2025
};

use tick::Clock;
use uniflight::UniFlight;
Member

I like the name uniflight for the crate but for the actual type it might be better to have more "down to business" names so reading the code it becomes clear what it is. Maybe uniflight::Coalesced to represent "a coalesced operation"?

@schgoo schgoo (Author) Dec 17, 2025

Unfortunately, @ralfbiedert is very resistant to the word "coalesce", but I agree with the essence of what you're saying here.

use uniflight::UniFlight;

#[tokio::main]
async fn main() {
@sandersaares sandersaares (Member) Dec 17, 2025

This whole thing seems like Once [or OnceLock/OnceCell] with an added layer of keying, and async support. Is this right?

Perhaps this suggests that the design could be decomposed into something like a thread-aware AsyncOnce and a suitably ergonomic thread-aware map?

let cache_groups = ConcurrentMap::<String, AsyncOnce<String>>::new();

let value = cache_groups
    .entry("key123")
    .or_insert(|| AsyncOnce::new((http_client), |(http_client)| load_item(http_client)))
    .await?;

@schgoo schgoo (Author) Dec 17, 2025

Interestingly enough, I was working on an update that removed the per-key async mutex and replaced it with OnceLock and EventListener. I'll explore the idea of this decomposition, but I don't believe we yet have anything like a thread-aware concurrent map, right? I'm also not sure about the value of implementing a thread-aware AsyncOnce. What would be the desired behavior upon relocation?

@schgoo schgoo (Author) Dec 17, 2025

This got me thinking, though, maybe I should just use an existing AsyncOnce? I wasn't aware of this pattern before, but there are some widely used components out there already (for example, async-once-cell)

@sandersaares sandersaares (Member) Dec 18, 2025

I don't believe we yet have anything like a thread-aware concurrent map, right

It has been on the wishlist for a long time but indeed, we do not have it today. Perhaps that just means the opportunity is ripe!

I'm also not sure about the value of implementing a thread-aware AsyncOnce. What would be the desired behavior upon relocation?

The way I was thinking about this was that this closure given to or_insert() only ever gets executed once (approximately - eventual consistency in the collection might conceivably result in temporary copies that get dropped). Therefore there is only one AsyncOnce created and only one http_client that ever gets used per key. The leader may change but the closure and the HTTP client remain the same.

So if the leader polls it, the http_client (and any other state) need to be on the thread of the leader for efficient processing. This suggests to me that when a leader is (re)assigned, a relocation to the leader's thread needs to happen.

I do not have a thorough design mapped out in my head, so take this with a grain of salt.
