feat: Add uniflight (name pending) crate for duplicate request coalescing #118
Conversation
@microsoft-github-policy-service agree company="Microsoft"
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff            @@
##              main    #118     +/-  ##
========================================
- Coverage    100.0%   99.9%   -0.1%
========================================
  Files           84      85      +1
  Lines         7450    7593    +143
========================================
+ Hits          7450    7591    +141
- Misses           0       2      +2
```

☔ View full report in Codecov by Sentry.
I like the name :-)
geeknoid left a comment
The logo.png file isn't rendering for me. Is this a valid png file? I tried even going to your repo & branch, and GitHub won't show the image to me.
@geeknoid I hadn't uploaded the logo yet. I just added it.
```rust
handles.push(handle);

// Stagger the requests slightly to see the deduplication in action
tokio::time::sleep(Duration::from_millis(10)).await;
```
Maybe you could use the tick crate for delays once it's available?
I'm happy to make that change once it gets checked in.
```rust
impl<K, T> UniFlight<K, T>
where
    K: Hash + Eq + Clone,
```
Why does the key need to implement Clone?
In general, you want to minimize the type bounds on methods as much as you can, since it makes composition better for your consumers. As such, I'd move this type constraint to the work function instead, so as to remove the constraint from the new function.
I've been wondering about that, because it's kind of a strange user experience with IntelliSense to try to find the work function and have it not appear because your bounds are incorrect. I don't have a strong opinion on it, though.
Especially given the fact that UniFlight is completely useless if work can't be called, does it really make sense to separate the bounds here? That said, the reason it needs Clone is that the HashMap takes ownership of the key, but it still needs to be passed into the BroadcastOnceWaiter, so the waiter can remove it from the HashMap when the task completes.
It's really not great that the key is passed by value and is internally cloned. That means the caller will clone the key before calling the function, and then internally it will get cloned again. If the parameter is passed by reference, at least the caller won't need to clone before calling you.
Regarding the bounds, you're right that you can't create a struct where the key is not hashable, so what's the benefit of new not having bounds? Why put the bounds on the work function instead? It simply composes better at a higher level. It can avoid your users needing to explicitly specify types when interacting with your stuff. So if you look around at the ecosystem, you can see that well-designed APIs tend to minimize the type bounds on methods in general. You can have different impl blocks with different constraints and then bunch several methods with the same constraints in those impl blocks. Or you can apply the constraints directly on the API.
Basically, if you don't need the constraint to support your implementation of the method, you shouldn't require that constraint of your callers.
You know, the same argument can be made for applying the constraints on the struct definition itself. You could declare that UniFlight's K parameter always be hashable. But since you don't need that to define your struct, you don't apply the constraint there, only on the methods.
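A minimal sketch of the impl-block split being suggested; the struct body and method details here are placeholders, not the PR's actual implementation:

```rust
use std::hash::Hash;
use std::marker::PhantomData;

// Illustrative stand-in for the PR's type; fields elided.
pub struct UniFlight<K, T>(PhantomData<(K, T)>);

// `new` lives in an unbounded impl block, so constructing the type
// never forces callers to spell out constraints...
impl<K, T> UniFlight<K, T> {
    pub fn new() -> Self {
        Self(PhantomData)
    }
}

// ...while the Hash/Eq/Clone bounds sit only on the block whose
// methods actually need them.
impl<K, T> UniFlight<K, T>
where
    K: Hash + Eq + Clone,
    T: Clone,
{
    pub fn work(&self) { /* dedup logic would live here */ }
}
```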
```rust
    T: Clone,
{
    let owned_mapping = Arc::clone(&self.mapping);
    let mut mapping = self.mapping.lock();
```
Right now, this is a global lock, which will cause contention as the number of cores increases. You can reduce contention by sharding the lock. For example, you could use a modulo of the hash value for the key to pick one of N locks. If you make N equal to the number of processors, then you get more and more locks the more processors there are, such that contention should remain generally flat as the system scales up.
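A hedged sketch of that sharding scheme; the names (ShardedMap, shard) are hypothetical, not part of this PR:

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::sync::Mutex;

// Hypothetical sketch: N independent locks, with the key's hash picking
// the shard, so unrelated keys rarely contend on the same mutex.
struct ShardedMap<K, V> {
    shards: Vec<Mutex<HashMap<K, V>>>,
    hasher: RandomState,
}

impl<K: Hash + Eq, V> ShardedMap<K, V> {
    fn new() -> Self {
        // One shard per logical CPU keeps contention roughly flat as the
        // core count grows (8 is an arbitrary fallback).
        let n = std::thread::available_parallelism().map_or(8, |p| p.get());
        Self {
            shards: (0..n).map(|_| Mutex::new(HashMap::new())).collect(),
            hasher: RandomState::new(),
        }
    }

    fn shard(&self, key: &K) -> &Mutex<HashMap<K, V>> {
        // Modulo of the key's hash selects one of the N locks.
        let idx = (self.hasher.hash_one(key) as usize) % self.shards.len();
        &self.shards[idx]
    }
}
```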
```rust
/// Execute and return the value for a given function, making sure that only one
/// operation is in-flight at a given moment. If a duplicate call comes in, that caller will
/// wait until the original call completes and return the same value.
pub fn work<F, Fut>(&self, key: K, func: F) -> impl Future<Output = T>
```
I think the key should be &K here; you don't want the key to be consumed by this API.
Then we'll definitely need K to be Clone.
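A tiny runnable illustration of the trade-off, with a plain HashMap standing in for the crate's internal mapping:

```rust
use std::collections::HashMap;

// Taking &K means only the callee clones (once, on insert); taking K by
// value would force callers to clone up front as well.
fn insert_by_ref(map: &mut HashMap<String, u32>, key: &String, value: u32) {
    map.insert(key.clone(), value); // the single, internal clone
}

fn main() {
    let mut map = HashMap::new();
    let key = "user:123".to_string();
    insert_by_ref(&mut map, &key, 1);
    // The caller still owns `key`; no caller-side clone was needed.
    assert!(map.contains_key(&key));
}
```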
```rust
/// wait until the original call completes and return the same value.
pub fn work<F, Fut>(&self, key: K, func: F) -> impl Future<Output = T>
where
    F: FnOnce() -> Fut,
```
I worry that this logic is susceptible to unexpected side effects. If the closure depends on external state, it could lead to surprising/incorrect behavior. For example:

```rust
let thread_id = get_thread_id();
let result = group.work("user:123", || async {
    thread_id.to_string()
}).await;
```

This is a racy pattern. Depending on which thread becomes the leader, the result of the closure will be different. I'm making this obvious in this example, but it's a general problem. It's basically inviting concurrency issues to surface.
To get this pattern working reliably, all state going into the closure needs to be evaluated as part of deciding "is this the same work". If you only look at the key and ignore all the other state, you can have false sharing. So I think we can't allow any state to enter the closure "on the side", everything needs to be explicitly passed to the closure and evaluated for "sameness".
Isn't that user error though? Is it the responsibility of this crate to prevent them from causing themselves this issue?
Yes, it is a user error. However, we don't want to expose a programming model which puts people in the pit of failure by default.
Rust is great in that it eliminates race conditions by construction. The pattern that this code exposes encourages the creation of races. So how can we tighten things up to either completely prevent issues, or at least make it unlikely to hit them?
Not allowing state to be captured, and forcing all state to be basically built into the key, is one way to achieve this protection. I think this would translate into saying that this code doesn't take a closure, but a raw function pointer. So if you want to pass something into this code, it will need to be in the key. And if it's in the key, then it'll need to participate in the Hash and Eq trait implementations. And this will then ensure that coalescing only happens when it really should.
Maybe there are other approaches too...
> Not allowing state to be captured and forcing all state to be basically built into the key is one way to achieve this protection.

I don't think that's possible in the strictest sense, as you can still (accidentally) bypass that with statics and thread locals, like

```rust
let result = group.work("user:123", || async { // 'pure' fn-ptr
    logger::last_message()
}).await;
```

might set a different value depending on which thread executed that.
Yeah, I realize it's not air-tight, but I feel it is substantially reducing the risk of accidental mismatch. Capturing state in a closure is the most likely source of these problems.
This is very similar to classic makefiles and dependencies. If you don't capture all the dependencies in a makefile properly, then your incremental builds will mostly work. Until one day, they don't work and you have to do a "make clean" and start from scratch.
So, I've been thinking about this a while. I think I agree that important context should be in the key, and then we can pass that into work like:

```rust
pub fn work<Fut>(&self, key: K, f: fn(K) -> Fut) -> impl Future<Output = T>
where
    Fut: Future<Output = T>,
{ ... }
```

The problem is that if they're using this, they probably need some downstream client that needs to do I/O (for instance, a logger, as @ralfbiedert pointed out). In that case, we would need to capture that client in the key. Is it reasonable to require that their client/logger/etc. implements Hash + Eq? That seems a bit strange to me.

Maybe we could capture some additional context when UniFlight is initialized, and then pass that into the function pointer as well, like:

```rust
impl<K, T, C> UniFlight<K, T, C> {
    pub fn with_context<C2>(self, ctx: C2) -> UniFlight<K, T, C2> { ... }

    pub fn work<Fut>(
        &self,
        key: K,
        f: fn(&C, K) -> Fut, // Receives context AND key
    ) -> impl Future<Output = T>
    where
        Fut: Future<Output = T>,
    { ... }
}
```

Would we need to box the future to maintain the reference to C?

edit: I did some prototyping, and yes, we would need to box the future, because a function pointer needs a concrete return type; it can't return impl Future. So that would look something like:

```rust
async fn work(&self, key: K, f: fn(&C, K) -> BoxFuture<'_, T>) -> T {
    f(&self.context, key).await
}
```

Is the cost of allocating a BoxFuture for every leader worth it for this design? Users would also have to pin their futures within the function pointer, which is a bit strange:

```rust
fn fetch_user(client: &Client, user_id: UserId) -> BoxFuture<'_, String> {
    Box::pin(async move {
        format!("Fetched user {} from {}", user_id.0, client.base_url)
    })
}
```
crates/uniflight/src/lib.rs (outdated)
```rust
} else {
    let (call, shared) = BroadcastOnce::new();
    mapping.insert(key.clone(), call);
    let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping);
```
Have the if and else branches return the waiter object, then call waiter.wait() outside the if construct. This avoids a bit of code duplication.
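Roughly, the suggested shape might look like this; note that `call.waiter()` on the hit path is an assumption, since the PR only shows the else branch:

```rust
// Sketch only: both branches yield a waiter, and `wait` is called once.
let waiter = if let Some(call) = mapping.get(&key) {
    call.waiter()
} else {
    let (call, shared) = BroadcastOnce::new();
    mapping.insert(key.clone(), call);
    BroadcastOnce::waiter(shared, func, key, owned_mapping)
};
drop(mapping); // release the map lock before awaiting
waiter.wait().await
```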
Co-authored-by: Martin Taillefer <geeknoid@users.noreply.github.com>
```rust
// Original: https://github.com/ihciah/singleflight-async
// Licensed under MIT/Apache-2.0

//! Coalesces duplicate async tasks into a single execution.
```
How about
Deduplicates async tasks into a single execution.
You really hate that word, huh? 😆
And use the same tagline in Cargo.toml's 'description' field, and in the top-level README's list of crates.
```rust
#[tokio::main]
async fn main() {
    // Create a shared UniFlight instance for cache operations
    let cache_group = Arc::new(UniFlight::<String, String>::new());
```
This example is a bit weird (and/or the UniFlight definition is).
I assume UniFlight is meant to be used by multiple tasks, so the type itself should probably contain an inner Arc. Looking inside that type, there is an Arc already, so why wouldn't that be sufficient to get the cloning/sharing of cache_group going?
On second thought, maybe this is as simple as using thread_aware::Arc and letting your worker/key pick a Strategy.
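For reference, a minimal sketch of the inner-Arc shape first suggested above; field and type names are hypothetical:

```rust
use std::sync::Arc;

// Hypothetical: the shared state lives behind an inner Arc, so the type
// itself is cheaply Clone and callers don't need Arc<UniFlight<..>>.
#[derive(Clone)]
pub struct UniFlight<K, T> {
    inner: Arc<Inner<K, T>>,
}

struct Inner<K, T> {
    // e.g. the mutex-guarded map of in-flight calls would live here
    _mapping: std::marker::PhantomData<(K, T)>,
}
```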
crates/uniflight/src/lib.rs (outdated)
```rust
}

#[cfg(test)]
mod tests {
```
Can you move most tests out into a tests/ folder, except the ones which strictly need to access implementation details?
```rust
/// Represents a class of work and creates a space in which units of work
/// can be executed with duplicate suppression.
#[derive(Debug)]
pub struct UniFlight<K, T> {
```
I'd love for this to impl ThreadAware, and to investigate how the resulting item should be emitted/shared.
Naively one could argue the resulting item should be "unique" and shared between threads, but I think in some situations we still want "per thread" semantics to be honored. My feeling is also that this isn't quite as obvious for UniFlight to implement, and it might tell us something about the design of ThreadAware itself.
```rust
};

use tick::Clock;
use uniflight::UniFlight;
```
I like the name uniflight for the crate, but for the actual type it might be better to have more "down to business" names so that, reading the code, it becomes clear what it is. Maybe uniflight::Coalesced to represent "a coalesced operation"?
Unfortunately, @ralfbiedert is very resistant to the word "coalesce", but I agree with the essence of what you're saying here.
```rust
use uniflight::UniFlight;

#[tokio::main]
async fn main() {
```
This whole thing seems like Once [or OnceLock/OnceCell] with an added layer of keying, and async support. Is this right?
Perhaps this suggests that the design could be decomposed into something like a thread-aware AsyncOnce and a suitably ergonomic thread-aware map?

```rust
let cache_groups = ConcurrentMap::<String, AsyncOnce<String>>::new();
let value = cache_groups
    .entry("key123")
    .or_insert(|| AsyncOnce::new((http_client), |(http_client)| load_item(http_client)))
    .await?;
```
Interestingly enough, I was working on an update that removed the per-key async mutex and replaced it with OnceLock and EventListener. I'll explore the idea of this decomposition, but I don't believe we yet have anything like a thread-aware concurrent map, right? I'm also not sure about the value of implementing a thread-aware AsyncOnce. What would be the desired behavior upon relocation?
This got me thinking, though: maybe I should just use an existing AsyncOnce? I wasn't aware of this pattern before, but there are some widely used components out there already (for example, async-once-cell).
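For instance, a minimal sketch with that crate, assuming its get_or_init(future) API; expensive_fetch is a placeholder:

```rust
use async_once_cell::OnceCell;

async fn expensive_fetch() -> String {
    "value".to_string()
}

// First caller drives the init future; concurrent callers wait for and
// share the same result.
static VALUE: OnceCell<String> = OnceCell::new();

async fn load() -> &'static String {
    VALUE.get_or_init(expensive_fetch()).await
}
```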
> I don't believe we yet have anything like a thread-aware concurrent map, right

It has been on the wishlist for a long time but indeed, we do not have it today. Perhaps that just means the opportunity is ripe!

> I'm also not sure about the value of implementing a thread-aware AsyncOnce. What would be the desired behavior upon relocation?
The way I was thinking about this was that this closure given to or_insert() only ever gets executed once (approximately - eventual consistency in the collection might conceivably result in temporary copies that get dropped). Therefore there is only one AsyncOnce created and only one http_client that ever gets used per key. The leader may change but the closure and the HTTP client remain the same.
So if the leader polls it, the http_client (and any other state) need to be on the thread of the leader for efficient processing. This suggests to me that when a leader is (re)assigned, a relocation to the leader's thread needs to happen.
I do not have a thorough design mapped out in my head, so take this with a grain of salt.
Adds uniflight, a crate for coalescing duplicate async tasks into a single execution. When multiple tasks request the same work (identified by a key), only the leader performs the work while followers wait and receive a clone of the result.
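A hedged usage sketch based on the signatures discussed in this review (work(key, closure) returning a future; the final API may differ):

```rust
use std::sync::Arc;
use std::time::Duration;
use uniflight::UniFlight;

#[tokio::main]
async fn main() {
    let group = Arc::new(UniFlight::<String, String>::new());

    let mut handles = Vec::new();
    for _ in 0..3 {
        let group = Arc::clone(&group);
        handles.push(tokio::spawn(async move {
            // All three tasks use the same key, so only the leader runs
            // the closure; the followers receive a clone of its result.
            group
                .work("user:123".to_string(), || async {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    "user data".to_string()
                })
                .await
        }));
    }
    for h in handles {
        println!("{}", h.await.unwrap());
    }
}
```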