Unstable: Async NIFs and Tasks #712
Conversation
|
This behaviour of the async NIFs is not quite what I was after. Users could implement something like this right now with just a few more lines of code. My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but I still think that what you have built here has merit:
|
Without knowing all of the previous discussion, this is what I was thinking about as well when reading the description. From an ergonomics perspective, I think async NIFs should (if possible!) feel the same as regular ones from within Elixir, as a NIF might be exposed to users through libraries. As a user of such a NIF, I'd probably not want to know that I need to expect a message some time in the future; I'd just want to block for the long-running work. |
The intention is that the library author of the NIF library should have some public API that exposes it as a "sync" function, so users of the NIF library wouldn't notice any difference. |
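To make that concrete, here is a minimal sketch (not from this PR) of such a wrapper. It assumes a hypothetical MyNif.process_items/1 task NIF and the {ref, result} message shape used in the examples further down this thread.

defmodule MyLib do
  # Blocking public API around the async NIF: start the task, then wait for
  # the final message tagged with the returned reference.
  def process_items(items, timeout \\ 5_000) do
    ref = MyNif.process_items(items)

    receive do
      {^ref, result} -> {:ok, result}
    after
      timeout -> {:error, :timeout}
    end
  end
end

From the caller's point of view this behaves like an ordinary synchronous function.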
Yes, that's a fair point. However, I'm tired of implementing this every time I want this functionality.
How would you handle a NIF which streams results back to the caller? If the call blocks, the caller can't handle intermediate messages as they arrive.
I need to hear more about what you have in mind here. As I see it, with an async NIF the way it's implemented right now, you don't need dirty scheduling because the work is immediately spawned onto the async runtime.
I think this is doable. I was thinking about doing this but I held off to reduce scope.
sure, maybe
Say more? Can you expand on an example of how you'd use what you are thinking here? Are you talking about file system and networking APIs?
agreed 💯 |
|
@filmor @evnu I've updated the functionality based on feedback.

Channel API for Async Tasks

Status: Experimental (requires the rustler_unstable cfg, see Enabling below)

Overview

The Channel API provides type-safe, bidirectional communication between Elixir and Rust async tasks. It replaces the need for manual message handling with a clean, ergonomic interface.

Enabling

Add the cfg flag to your cargo configuration (e.g. .cargo/config.toml):

[build]
rustflags = ["--cfg", "rustler_unstable"]

Basic Examples

Example 1: One-Way Communication with Progress Updates

Send progress updates back to Elixir while processing work:

use rustler::runtime::Channel;
use std::time::Duration;
#[rustler::task]
async fn process_items(channel: Channel<(), String>, items: Vec<String>) {
    for (i, item) in items.iter().enumerate() {
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Send progress update
        channel.send(format!("Processing {}/{}: {}", i + 1, items.len(), item));
    }

    // Send final result
    channel.finish(format!("Completed {} items", items.len()));
}

Elixir usage:

ref = MyNif.process_items(["task1", "task2", "task3"])
# Receive all messages
receive do
  {^ref, "Completed " <> _ = final} ->
    IO.puts("Done: #{final}")

  {^ref, progress} ->
    IO.puts(progress)
    # Continue receiving...
end
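As a hedged illustration (not part of the PR text), the "continue receiving" step can be wrapped in a small recursive loop that prints progress until the final message arrives; it relies only on the message shapes shown in Example 1 above.

defmodule ProgressCollector do
  # Collect messages for the given task reference until the final
  # "Completed ..." message is received, printing progress along the way.
  def collect(ref) do
    receive do
      {^ref, "Completed " <> _ = final} ->
        IO.puts("Done: #{final}")
        final

      {^ref, progress} ->
        IO.puts(progress)
        collect(ref)
    end
  end
end

So ref = MyNif.process_items(["task1", "task2", "task3"]) followed by ProgressCollector.collect(ref) blocks until the task finishes.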
Example 2: Bidirectional Communication with Commands

Build interactive workers that receive commands and send responses:

use rustler::runtime::Channel;

#[derive(rustler::NifTaggedEnum, Clone)]
enum Command {
    Add { value: i64 },
    Multiply { value: i64 },
    GetCurrent,
    Shutdown,
}

#[derive(rustler::NifTaggedEnum, Clone)]
enum Response {
    Updated { old_value: i64, new_value: i64 },
    Current { value: i64 },
    ShuttingDown { final_value: i64 },
}
#[rustler::task]
async fn stateful_worker(channel: Channel<Command, Response>) {
    let mut current_value = 0i64;

    while let Some(cmd) = channel.next().await {
        let response = match cmd {
            Command::Add { value } => {
                let old = current_value;
                current_value += value;
                Response::Updated { old_value: old, new_value: current_value }
            }
            Command::Multiply { value } => {
                let old = current_value;
                current_value *= value;
                Response::Updated { old_value: old, new_value: current_value }
            }
            Command::GetCurrent => {
                Response::Current { value: current_value }
            }
            Command::Shutdown => {
                // The final response is sent by `finish` below, so just stop the loop.
                break;
            }
        };
        channel.send(response);
    }

    channel.finish(Response::ShuttingDown { final_value: current_value });
}
// Helper NIF for sending commands
#[rustler::nif]
fn worker_send_command(
    env: rustler::Env,
    sender: rustler::runtime::ChannelSender<Command>,
    command: rustler::Term,
) -> rustler::NifResult<rustler::types::Atom> {
    rustler::runtime::channel::send(env, sender, command)
}

Elixir usage:

# Start worker
worker = MyNif.stateful_worker()
# Send commands
MyNif.worker_send_command(worker, {:add, %{value: 10}})
receive do
  {^worker, {:updated, %{new_value: value}}} ->
    IO.puts("New value: #{value}")
end

MyNif.worker_send_command(worker, {:multiply, %{value: 2}})

receive do
  {^worker, {:updated, %{new_value: value}}} ->
    IO.puts("New value: #{value}")
end

MyNif.worker_send_command(worker, :get_current)

receive do
  {^worker, {:current, %{value: value}}} ->
    IO.puts("Current: #{value}")
end

MyNif.worker_send_command(worker, :shutdown)

receive do
  {^worker, {:shutting_down, %{final_value: value}}} ->
    IO.puts("Final value: #{value}")
end

Key Concepts

Channel Types

As in the examples above, the two type parameters are the request and response types: Channel<(), String> for a one-way task that only reports progress, Channel<Command, Response> for a bidirectional worker.
Message Format

All messages delivered to the owning process are tuples of the task reference and a payload, as in {^ref, progress} and {^worker, {:updated, %{...}}} above.
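For the bidirectional case, that message shape lends itself to a small call-style helper. This is only an illustration, not part of the PR: it assumes the worker_send_command NIF from Example 2 and an arbitrary 5-second timeout.

defmodule Worker do
  # Send a command to the worker task and block for the matching reply.
  def call(worker, command, timeout \\ 5_000) do
    MyNif.worker_send_command(worker, command)

    receive do
      {^worker, response} -> {:ok, response}
    after
      timeout -> {:error, :timeout}
    end
  end
end

With this, the command sequence from Example 2 becomes calls like {:ok, {:updated, %{new_value: 10}}} = Worker.call(worker, {:add, %{value: 10}}).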
Channel Methods

// Receive next request (bidirectional only)
channel.next().await -> Option<Request>
// Send response
channel.send(response)
// Send final response and close
channel.finish(response)
// Get cloneable sender for spawned tasks
channel.responder() -> ResponseSender

Helper for Sending from Elixir

#[rustler::nif]
fn send_to_channel(
    env: rustler::Env,
    sender: rustler::runtime::ChannelSender<YourRequestType>,
    message: rustler::Term,
) -> rustler::NifResult<rustler::types::Atom> {
    rustler::runtime::channel::send(env, sender, message)
}

Runtime Configuration

Configure the Tokio runtime in your load function:

fn load(_env: rustler::Env, _load_info: rustler::Term) -> bool {
    rustler::runtime::builder(|builder| {
        builder
            .worker_threads(4)
            .thread_name("my-nif-worker")
            .thread_stack_size(2 * 1024 * 1024);
    })
    .is_ok()
}
rustler::init!("Elixir.MyNif", load = load);

Examples

See working examples in

Limitations
Feedback

This is experimental. Let me know what you think. |
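Not from the PR: one way a library might tie the worker's lifetime to a process, which also foreshadows the life-cycle questions later in this thread. MyNif.stateful_worker/0 and worker_send_command/2 are the NIFs from Example 2; the shutdown-on-terminate behaviour and the timeout are assumptions.

defmodule WorkerOwner do
  use GenServer

  # Owns the channel resource for its whole lifetime.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def command(pid, cmd), do: GenServer.call(pid, {:command, cmd})

  @impl true
  def init(:ok), do: {:ok, MyNif.stateful_worker()}

  @impl true
  def handle_call({:command, cmd}, _from, worker) do
    MyNif.worker_send_command(worker, cmd)

    receive do
      {^worker, response} -> {:reply, response, worker}
    after
      5_000 -> {:reply, {:error, :timeout}, worker}
    end
  end

  @impl true
  def terminate(_reason, worker) do
    # Ask the task to stop when the owning process shuts down.
    MyNif.worker_send_command(worker, :shutdown)
  end
end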
|
@filmor @evnu ok...I got yielding async NIF support working.

Cooperative Yielding NIFs

What is it?

A new way to write long-running NIFs that cooperate with the BEAM scheduler by calling yield_now().await between chunks of work.

Usage

use rustler::runtime::yield_now;
#[rustler::nif]
async fn process_large_dataset(items: i64) -> i64 {
    let mut sum = 0;
    for i in 0..items {
        sum += i;
        if i % 100 == 0 {
            yield_now().await; // Yield to scheduler
        }
    }
    sum
}

# Appears synchronous - blocks until complete
result = MyNif.process_large_dataset(10_000)

Key Differences
How it works
|
|
I'll try to review this in detail this week. Good idea to side-step the issues with

On reasonably recent versions of OTP (in particular, all that we support), you could also monitor the created process through the resource object. |
|
@scrogson good work! :) Before diving into the code, I have a more general question regarding process life-cycles when using the task API. When using a task, what is the expected way to handle a crash in the "owner" process on the Elixir side to avoid leaking an async thread? Example:

test "async_with_progress sends intermediate messages using Caller" do
  spawn(fn ->
    ref = RustlerTest.async_with_progress(300)
    exit(:die!)
  end)

  :timer.sleep(10000)
end

My assumption was that I should be able to use

Follow-up question: When waiting in a bidirectional channel and the owning process dies, how would this need to be handled by the task? |
@filmor yes, agreed. I think this would be quite nice.
@filmor yes, I was thinking about this as well.
@evnu thanks...although the AI overlords were a big help 😉
@evnu great question, I should probably write a test and go from there. But since the channel is a resource, I imagine that from the Elixir side, if you try to use it again by calling a NIF with it, you will get some sort of error when sending the message to a channel with no receiver.
Another great question. I'll need to make sure that we have some tests which show exactly how this works. But I assume that since it's a resource, when the Elixir side drops the resource and there are no more channel senders...the NIF side will receive a None from channel.next().

Examples

I've got a branch of my |
|
Hey, this is cool work! One thing that needs to be confirmed: I recall talking to some people from the OTP team back in the day, asking whether it is safe to hold terms across enif_schedule_nif, and the answer at the time was that it is not. The consequence is that any terms that need to be held across to a subsequent call to enif_schedule_nif have to be passed through argv. Since terms are held within the future, there is no way to accomplish this with the current API design. This may very well have changed since then, but if it hasn't, then I don't think this API design is sound. |
Thanks @hansihe !
Yeah, we should be good! The design specifically avoids holding terms across schedule boundaries by using resources for state and creating fresh terms for passing through argv. |
How does it deal with terms held inside the future? |
rustler/rustler_codegen/src/nif.rs Lines 509 to 515 in bdcb343
The macro explicitly forbids Env, Term, and reference arguments for async NIFs:

#[rustler::nif]
async fn my_nif(data: String, count: i64) -> String {
// ...
}

The codegen decodes the arguments into owned Rust values, so no Term or Env is held inside the future. There might be other types we need to exclude here as well.

Excellent questions...keep it coming. |
|
Well, one can generate a process-independent environment (enif_alloc_env, OwnedEnv in Rustler) and copy terms into that to keep them alive. |
|
Great! Yeah, if Term and Env are not exposed then this should not be a problem.
I think
Yep, I'll find some time ASAP to review this properly |
|
Maybe a hot take, but I think anything involving external async runtimes should be left out, since there isn't actually any integration with e.g. tokio being provided (most notably with wakers...). I think it gives a false sense of what is actually happening to someone writing code with rustler. (In fact, I'd say building efficient code that interfaces with async Rust probably should not use rescheduled NIFs at all, but should use messages with e.g. enif_send instead...) |
Async NIF Support with Tokio Runtime
Adds support for async Rust functions in NIFs, allowing long-running operations without blocking the BEAM scheduler. Async NIFs spawn tasks onto a Tokio runtime and send results via message passing.
Key Features
1. Async Functions
The #[rustler::nif] macro detects async fn and generates wrapper code that:

- Returns :ok immediately (non-blocking)
- Sends the result via enif_send when complete
- Rejects arguments that borrow from the caller (Env, Term, or references)

2. CallerPid for Intermediate Messages

Optional CallerPid first parameter for sending progress updates. Doesn't count toward NIF arity.

3. Configurable Runtime

Application developers configure the runtime via standard Elixir config, and NIF authors decode that config in the load callback.

Implementation Details

Code Generation

- Detects async functions via sig.asyncness.is_some()
- Spawns the future on the runtime from runtime_handle(), sends the result via OwnedEnv

Runtime Management

- Global runtime handle (OnceCell<Arc<Runtime>>)
- RuntimeConfig: Decodable struct with worker_threads, thread_name, thread_stack_size
- configure(RuntimeConfig): Configure from an Elixir term
- configure_runtime(|builder|): Programmatic configuration

CallerPid Type

New wrapper type around LocalPid that the macro detects for special handling.

Design Decisions

- Return :ok immediately, send the result via message (non-blocking, BEAM-idiomatic)
- use Rustler config merging (standard Elixir pattern)

Testing

Dependencies

Under the tokio_rt feature: tokio = "1" (rt, rt-multi-thread, sync), once_cell = "1"

Backward Compatibility

Fully backward-compatible, gated behind the tokio_rt feature flag.

Usage Example
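The usage snippet itself didn't survive the copy above, so here is a hedged, Elixir-side sketch of how the pieces could fit together. The :my_app config key, the MyNif module, and the fetch_data NIF are placeholders, and the {ref, result} message shape follows the examples earlier in this thread.

# config/config.exs (hypothetical key; option names mirror RuntimeConfig)
import Config

config :my_app, MyNif,
  worker_threads: 4,
  thread_name: "my-nif-worker",
  thread_stack_size: 2 * 1024 * 1024

Calling an async NIF and waiting for its result:

ref = MyNif.fetch_data("some-key")

receive do
  {^ref, result} -> IO.inspect(result, label: "async result")
after
  5_000 -> IO.puts("timed out")
end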