Rust: Working With Any Number of Futures - rFronteddu/general_wiki GitHub Wiki
When we switched from using two futures to three in the previous section, we also had to switch from using join to using join3. It would be annoying to have to call a different function every time we changed the number of futures we wanted to join. Happily, we have a macro form of join to which we can pass an arbitrary number of arguments. It also handles awaiting the futures itself.
trpl::join!(tx1_fut, tx_fut, rx_fut);
However, even this macro form only works when we know the number of futures ahead of time. In real-world Rust, though, pushing futures into a collection and then waiting on some or all the futures in that collection to complete is a common pattern.
To check all the futures in some collection, we will need to iterate over and join on all of them. The trpl::join_all function accepts any type which implements the Iterator trait so it seems like just the ticket.
The code below it will not compile since Future is a trait not a concrete type. he concrete types are the individual data structures generated by the compiler for async blocks. You cannot put two different hand-written structs in a Vec, and the same thing applies to the different structs generated by the compiler.
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
To make this work, we need to use trait objects. Using trait objects lets us treat each of the anonymous futures produced by these types as the same type, since all of them implement the Future trait.
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
The type we had to write here is a little involved, so let’s walk through it:
The innermost type is the future itself. We note explicitly that the output of the future is the unit type () by writing Future<Output = ()>.
- Then we annotate the trait with dyn to mark it as dynamic.
- The entire trait reference is wrapped in a Box.
- Finally, we state explicitly that futures is a Vec containing these items.
That already made a big difference. Now when we run the compiler, we only have the errors mentioning Unpin. The first async block does not implement the Unpin trait, and suggests using pin! or Box::pin to resolve it.
let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];
here is a bit more we can explore here. For one thing, using Pin<Box> comes with a small amount of extra overhead from putting these futures on the heap with Box—and we are only doing that to get the types to line up. We don’t actually need the heap allocation, after all: these futures are local to this particular function.
As noted above, Pin is itself a wrapper type, so we can get the benefit of having a single type in the Vec—the original reason we reached for Box—without doing a heap allocation. We can use Pin directly with each future, using the std::pin::pin macro.
However, we must still be explicit about the type of the pinned reference; otherwise Rust will still not know to interpret these as dynamic trait objects, which is what we need them to be in the Vec. We therefore pin! each future when we define it, and define futures as a Vec containing pinned mutable references to the dynamic Future type:
let tx1_fut = pin!(async move {
// --snip--
});
let rx_fut = pin!(async {
// --snip--
});
let tx_fut = pin!(async move {
// --snip--
});
let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
vec![tx1_fut, rx_fut, tx_fut];
let a = async { 1u32 };
let b = async { "Hello!" };
let c = async { true };
let (a_result, b_result, c_result) = trpl::join!(a, b, c);
println!("{a_result}, {b_result}, {c_result}");
We got this far by ignoring the fact that we might have different Output types. We can use trpl::join! to await them, because it allows you to pass in multiple future types and produces a tuple of those types. We cannot use trpl::join_all, because it requires the futures passed in all to have the same type.
This is a fundamental tradeoff: we can either deal with a dynamic number of futures with join_all, as long as they all have the same type, or we can deal with a set number of futures with the join functions or the join! macro, even if they have different types. This is the same as working with any other types in Rust.
When we “join” futures with the join family of functions and macros, we require all of them to finish before we move on. Sometimes, though, we only need some future from a set to finish before we move on—kind of like racing one future against another. This operation is often named race for exactly that reason.
Under the hood, race is built on a more general function, select, which you will encounter more often in real-world Rust code. A select function can do a lot of things that trpl::race function cannot, but it also has some additional complexity that we can skip over for now.
let slow = async {
println!("'slow' started.");
trpl::sleep(Duration::from_millis(100)).await;
println!("'slow' finished.");
};
let fast = async {
println!("'fast' started.");
trpl::sleep(Duration::from_millis(50)).await;
println!("'fast' finished.");
};
trpl::race(slow, fast).await;
Notice that if you flip the order of the arguments to race, the order of the “started” messages changes, even though the fast future always completes first. That is because the implementation of this particular race function is not fair. It always runs the futures passed as arguments in the order they are passed. Other implementations are fair, and will randomly choose which future to poll first. Regardless of whether the implementation of race we are using is fair, though, one of the futures will run up to the first .await in its body before another task can start.
Recall that at each await point, Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited is not ready. The inverse is also true: Rust only pauses async blocks and hands control back to a runtime at an await point. Everything between await points is synchronous.
That means if you do a bunch of work in an async block without an await point, that future will block any other futures from making progress. You may sometimes hear this referred to as one future starving other futures. In some cases, that may not be a big deal. However, if you are doing some kind of expensive setup or long-running work, or if you have a future which will keep doing some particular task indefinitely, you will need to think about when and where to hand control back to the runtime.
By the same token, if you have long-running blocking operations, async can be a useful tool for providing ways for different parts of the program to relate to each other.
But how would you hand control back to the runtime in those cases?
Let’s simulate a long-running operation. The code below introduces a slow function. It uses std::thread::sleep instead of trpl::sleep so that calling slow will block the current thread for some number of milliseconds. We can use slow to stand in for real-world operations which are both long-running and blocking.
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
we use slow to emulate doing this kind of CPU-bound work in a pair of futures. To begin, each future only hands control back to the runtime after carrying out a bunch of slow operations.
let a = async {
println!("'a' started.");
slow("a", 30);
slow("a", 10);
slow("a", 20);
trpl::sleep(Duration::from_millis(50)).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
slow("b", 10);
slow("b", 15);
slow("b", 350);
trpl::sleep(Duration::from_millis(50)).await;
println!("'b' finished.");
};
trpl::race(a, b).await;
As with our earlier example, race still finishes as soon as a is done. There is no interleaving between the two futures, though. The a future does all of its work until the trpl::sleep call is awaited, then the b future does all of its work until its own trpl::sleep call is awaited, and then the a future completes. To allow both futures to make progress between their slow tasks, we need await points so we can hand control back to the runtime. That means we need something we can await!
IF removed the trpl::sleep at the end of the a future, it would complete without the b future running at all.
We do not really want to sleep here, though: we want to make progress as fast as we can. We just need to hand back control to the runtime. We can do that directly, using the yield_now function. Replace all those sleep calls with yield_now.
This is both clearer about the actual intent and can be significantly faster than using sleep, because timers like the one used by sleep often have limits to how granular they can be. The version of sleep we are using, for example, will always sleep for at least a millisecond, even if we pass it a Duration of one nanosecond. Again, modern computers are fast: they can do a lot in one millisecond!
Here, we skip all the status printing, pass a one-nanosecond Duration to trpl::sleep, and let each future run by itself, with no switching between the futures. Then we run for 1,000 iterations and see how long the future using trpl::sleep takes compared to the future using trpl::yield_now.
let one_ns = Duration::from_nanos(1);
let start = Instant::now();
async {
for _ in 1..1000 {
trpl::sleep(one_ns).await;
}
}
.await;
let time = Instant::now() - start;
println!(
"'sleep' version finished after {} seconds.",
time.as_secs_f32()
);
let start = Instant::now();
async {
for _ in 1..1000 {
trpl::yield_now().await;
}
}
.await;
let time = Instant::now() - start;
println!(
"'yield' version finished after {} seconds.",
time.as_secs_f32()
);
The version with yield_now is way faster!
This means that async can be useful even for compute-bound tasks, depending on what else your program is doing, because it provides a useful tool for structuring the relationships between different parts of the program. This is a form of cooperative multitasking, where each future has both the power to determine when it hands over control via await points. Each future therefore also has the responsibility to avoid blocking for too long. In some Rust-based embedded operating systems, this is the only kind of multitasking!
In real-world code, you will not usually be alternating function calls with await points on every single line, of course. While yielding control like this is relatively inexpensive, it is not free! In many cases, trying to break up a compute-bound task might make it significantly slower, so sometimes it is better for overall performance to let an operation block briefly. You should always measure to see what your code’s actual performance bottlenecks are. The underlying dynamic is an important one to keep in mind if you are seeing a lot of work happening in serial that you expected to happen concurrently, though!
We can also compose futures together to create new patterns. For example, we can build a timeout function with async building blocks we already have. When we are done, the result will be another building block we could use to build up yet further async abstractions.
let slow = async {
trpl::sleep(Duration::from_millis(100)).await;
"I finished!"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
use trpl::Either;
// --snip--
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
match trpl::race(future_to_try, trpl::sleep(max_time)).await {
Either::Left(output) => Ok(output),
Either::Right(_) => Err(max_time),
}
In practice, you will usually work directly with async and .await, and secondarily with functions and macros like join, join_all, race, and so on. You will only need to reach for pin now and again to use them with those APIs.