Rust Streams - rFronteddu/general_wiki GitHub Wiki
The async recv method produces a sequence of items over time. This is an instance of a much more general pattern, often called a stream. We have seen a sequence of items before, when we looked at the Iterator trait, but there are two differences between iterators and the async channel receiver. The first difference is the element of time: iterators are synchronous, while the channel receiver is asynchronous. The second difference is the API. When working directly with an Iterator, we call its synchronous next method. With a trpl::Receiver, we call an asynchronous recv method instead, but these APIs otherwise feel very similar. That similarity is not a coincidence. A stream is like an asynchronous form of iteration. Whereas the trpl::Receiver specifically waits to receive messages, a general-purpose stream API needs to be broader: it just provides the next item, as Iterator does, but asynchronously. In fact, this is roughly how it works in Rust, so we can create a stream from any iterator. As with an iterator, we work with a stream by calling its next method and then awaiting the output.
To make this work we need the right trait in scope to be able to use the next method. Given our discussion so far, you might reasonably expect that to be Stream, but the trait we need here is actually StreamExt. The Ext there is for “extension”: this is a common pattern in the Rust community for extending one trait with another.
You might be wondering why StreamExt instead of Stream, and for that matter whether there is a Stream type at all. Briefly, the answer is that throughout the Rust ecosystem, the Stream trait defines a low-level interface which effectively combines the Iterator and Future traits. The StreamExt trait supplies a higher-level set of APIs on top of Stream, including the next method and also many other utility methods like those from Iterator. We will return to the Stream and StreamExt traits in a bit more detail at the end of the chapter. For now, this is enough to let us keep moving.
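To make the split between the two traits concrete, here is a minimal sketch that uses only the standard library. The Stream, StreamExt, Next, FromIter, and block_on items below are simplified stand-ins written for this illustration, loosely modeled on the shape described above; they are not the definitions used by trpl or any other crate. The busy-polling block_on is only adequate here because the iterator-backed stream is always ready.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Low-level interface (illustrative): yields items like `Iterator::next`,
/// but is poll-based like `Future::poll`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Extension trait (illustrative): adds an awaitable `next` to every `Stream`.
trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Sized + Unpin,
    {
        Next { stream: self }
    }
}

// Blanket impl: anything that implements `Stream` gets `StreamExt` for free.
impl<S: Stream> StreamExt for S {}

/// Future returned by `StreamExt::next`.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

/// A stream backed by an iterator: every item is ready immediately.
struct FromIter<I> {
    iter: I,
}

impl<I: Iterator + Unpin> Stream for FromIter<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.iter.next())
    }
}

/// Waker that does nothing; enough for a busy-polling toy executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (illustrative): polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Drives the stream with `next().await`, the way the chapter's loops do.
fn collect_doubled() -> Vec<u32> {
    block_on(async {
        let mut stream = FromIter { iter: (1..=5u32).map(|n| n * 2) };
        let mut seen = Vec::new();
        while let Some(value) = stream.next().await {
            seen.push(value);
        }
        seen
    })
}
```

Calling collect_doubled() returns the doubled values in order. The point of the design is that implementors only write poll_next, while callers only ever see next.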
use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);
        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);
        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}
Of course, this is not very interesting. We could do that with normal iterators and without any async at all. So let’s look at some of the other things we can do which are unique to streams.
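For comparison, the same pipeline can be written with plain iterators and no async at all. This is a small sketch; the function name filtered_doubles is made up for the illustration.

```rust
/// Synchronous equivalent of the stream example: double 1 through 100,
/// then keep the results divisible by 3 or by 5.
fn filtered_doubles() -> Vec<u32> {
    (1..101u32)
        .map(|n| n * 2)
        .filter(|value| value % 3 == 0 || value % 5 == 0)
        .collect()
}
```

The only structural differences from the stream version are the missing await and the fact that the whole result is computed in one go.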
Lots of things are naturally represented as streams: items becoming available in a queue, or working with more data than can fit in a computer’s memory by only pulling chunks of it from the file system at a time, or data arriving over the network over time. Because streams are polled the same way futures are, we can use them with any other kind of future, too, and we can combine them in interesting ways. For example, we can batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.
Let’s start by building a little stream of messages, similar to what we might see from a WebSocket or other real-time communication protocols.
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();
        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();
    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }
    ReceiverStream::new(rx)
}
We could do this with the regular Receiver API, or even the regular Iterator API, though. Let’s add something that requires streams, like adding a timeout which applies to every item in the stream, and a delay on the items we emit.
Below, we start by adding a timeout to the stream with the timeout method, which comes from the StreamExt trait. Then we update the body of the while let loop, because the stream now returns a Result. The Ok variant indicates a message arrived in time; the Err variant indicates that the timeout elapsed before any message arrived. We match on that result and either print the message when we receive it successfully, or print a notice about the timeout. Finally, notice that we pin the messages after applying the timeout to them, because the timeout helper produces a stream which needs to be pinned to be polled.
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));
        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();
    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;
            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });
    ReceiverStream::new(rx)
}
In get_messages, we use the enumerate iterator method with the messages array so that we can get the index of each item we are sending along with the item itself. Then we apply a 100 millisecond delay to even-index items and a 300 millisecond delay to odd-index items, to simulate the different delays we might see from a stream of messages in the real world. Because our timeout is for 200 milliseconds, this should affect half of the messages.
To sleep between messages in the get_messages function without blocking, we need to use async. However, we cannot make get_messages itself into an async function, because then we would return a Future<Output = Stream<Item = String>> instead of just a Stream<Item = String>. The caller would have to await get_messages itself to get access to the stream. But remember: everything in a given future happens linearly; concurrency happens between futures. Awaiting get_messages would require it to send all the messages, including sleeping between sending each message, before returning the receiver stream. As a result, the timeout would end up useless. There would be no delays in the stream itself: the delays would all happen before the stream was even available.
Instead, we leave get_messages as a regular function which returns a stream, and spawn a task to handle the async sleep calls.
Note: calling spawn_task like this works because we already set up our runtime. Calling this particular implementation of spawn_task without first setting up a runtime will cause a panic. Other implementations choose different tradeoffs: they might spawn a new runtime and so avoid the panic but end up with a bit of extra overhead, or simply not provide a standalone way to spawn tasks without reference to a runtime. You should make sure you know what tradeoff your runtime has chosen and write your code accordingly!
The timeout does not prevent the messages from arriving in the end—we still get all of the original messages. This is because our channel is unbounded: it can hold as many messages as we can fit in memory. If the message does not arrive before the timeout, our stream handler will account for that, but when it polls the stream again, the message may now have arrived.
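The same effect can be observed without an async runtime. The sketch below is a thread-based analogy, not the trpl API: it uses the standard library's unbounded std::sync::mpsc channel and recv_timeout, with shorter delays so it finishes quickly. The names spawn_messages and drain_with_timeout are made up for the illustration.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Producer on its own thread: alternates short and long delays,
/// mirroring the even/odd delays above at a tenth of the scale.
fn spawn_messages() -> Receiver<String> {
    let (tx, rx) = mpsc::channel(); // unbounded, like the channel in the text
    thread::spawn(move || {
        for (index, message) in ["a", "b", "c", "d"].into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 10 } else { 30 };
            thread::sleep(Duration::from_millis(time_to_sleep));
            if tx.send(format!("Message: '{message}'")).is_err() {
                break; // receiver is gone, stop producing
            }
        }
    });
    rx
}

/// Consumer: a timeout is only reported, never treated as the end of the data.
/// Returns the messages received and the number of timeouts observed.
fn drain_with_timeout(rx: Receiver<String>, limit: Duration) -> (Vec<String>, usize) {
    let mut received = Vec::new();
    let mut timeouts = 0;
    loop {
        match rx.recv_timeout(limit) {
            Ok(message) => received.push(message),
            Err(RecvTimeoutError::Timeout) => timeouts += 1,
            Err(RecvTimeoutError::Disconnected) => break, // sender dropped, channel drained
        }
    }
    (received, timeouts)
}
```

With a 20 millisecond limit, the slow messages typically trigger a timeout first, but every message still shows up on a later receive, because the channel buffers it until the consumer asks again.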
You can get different behavior if needed by using other kinds of channels, or other kinds of streams more generally.
First, let’s create another stream, which will emit an item every millisecond if we let it run directly. For simplicity, we can use the sleep function to send a message on a delay, and combine it with the same approach of creating a stream from a channel we used in get_messages. The difference is that this time, we are going to send back the count of intervals which has elapsed, so the return type will be impl Stream<Item = u32>, and we can call the function get_intervals.
Below, we start by defining a count in the task. (We could define it outside the task, too, but it is clearer to limit the scope of any given variable.) Then we create an infinite loop. Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel. Since this is all wrapped in the task created by spawn_task, all of it will get cleaned up along with the runtime, including the infinite loop.
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();
    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });
    ReceiverStream::new(rx)
}
This kind of infinite loop, which only ends when the whole runtime gets torn down, is fairly common in async Rust: many programs need to keep running indefinitely. With async, this does not block anything else, as long as there is at least one await point in each iteration through the loop.
Back in our main function’s async block, we start by calling get_intervals. Then we merge the messages and intervals streams with the merge method. Finally, we loop over that combined stream instead of over messages.
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
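At this point, neither messages nor intervals needs to be pinned or mutable, because both will be combined into the single merged stream. However, this call to merge does not compile! (Neither does the next call in the while let loop, but we will come back to that after fixing this.) The two streams have different types. The messages stream has the type Timeout<impl Stream<Item = String>>, where Timeout is the type which implements Stream for a timeout call. Meanwhile, the intervals stream has the type impl Stream<Item = u32>. To merge these two streams, we need to transform one of them to match the other. Here we rework intervals, since messages already has the shape we want: map turns each count into a String, and a timeout call wraps the result in Timeout to match messages. We do not actually want intervals to time out, so its duration of ten seconds is simply longer than any other delay in the program. Finally, we make the merged stream mutable and pin it, so that the while let loop can call next on it.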
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
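The requirement that both sides share one item type is not specific to streams. As a rough synchronous analogy, the sketch below combines two iterators only after mapping the numeric one to String. The interleave helper is hypothetical and alternates strictly between its inputs, whereas a real stream merge yields whichever side is ready first.

```rust
/// Hypothetical helper: alternates items from two sources with the same item type.
fn interleave<T>(
    a: impl IntoIterator<Item = T>,
    b: impl IntoIterator<Item = T>,
) -> Vec<T> {
    // `fuse` makes it safe to keep calling `next` after a source is exhausted.
    let mut a = a.into_iter().fuse();
    let mut b = b.into_iter().fuse();
    let mut out = Vec::new();
    loop {
        match (a.next(), b.next()) {
            (None, None) => break,
            (x, y) => {
                out.extend(x); // `Option` is iterable: pushes zero or one item
                out.extend(y);
            }
        }
    }
    out
}

/// Both inputs must yield `String`, so the counts are mapped first,
/// just as `intervals` is mapped before the merge above.
fn merged_labels() -> Vec<String> {
    let messages = ["a", "b"].into_iter().map(|m| format!("Message: '{m}'"));
    let intervals = (1u32..=3).map(|count| format!("Interval: {count}"));
    interleave(messages, intervals)
}
```

Removing the map call on the counts makes the call to interleave fail to type-check, which is the same kind of error the original merge call runs into.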
First, we use the throttle method on the intervals stream, so that it does not overwhelm the messages stream. Throttling is a way of limiting the rate at which a function will be called—or, in this case, how often the stream will be polled. Once every hundred milliseconds should do, since that is in the same ballpark as how often our messages arrive.
To limit the number of items we will accept from a stream, we can use the take method. We apply it to the merged stream, because we want to limit the final output, not just one stream or the other.
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .throttle(Duration::from_millis(100))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
Now when we run the program, it stops after pulling twenty items from the stream, and the intervals do not overwhelm the messages. We also do not get Interval: 100 or Interval: 200 and so on, but instead simply get Interval: 1, Interval: 2, and so on, even though we have a source stream which can produce an event every millisecond. That is because the throttle call produces a new stream, wrapping the original stream, so that the original stream only gets polled at the throttle rate, not its own “native” rate. We do not have a bunch of unhandled interval messages we are simply choosing to ignore. Instead, we never produce those interval messages in the first place! This is the inherent “laziness” of Rust’s futures at work again, allowing us to choose our performance characteristics.
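The pull-based behavior described here has a direct synchronous counterpart. The sketch below is an iterator analogy rather than the stream API: an unbounded source increments a counter only when a consumer asks for an item, so limiting the consumer with take also limits how much the source ever produces. The function name pull_three is made up for the illustration.

```rust
use std::cell::Cell;

/// Pulls three items from an unbounded lazy source and reports
/// how many items the source actually produced.
fn pull_three() -> (Vec<u32>, u32) {
    let produced = Cell::new(0u32);
    // The closure only runs when the consumer calls `next`.
    let source = std::iter::from_fn(|| {
        produced.set(produced.get() + 1);
        Some(produced.get())
    });
    let taken: Vec<u32> = source.take(3).collect();
    (taken, produced.get())
}
```

The source could run forever, yet it produces exactly as many items as were requested, numbered consecutively. A throttled stream behaves the same way, except that the wrapper also decides when it is willing to ask.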
There is one last thing we need to handle: errors! With both of these channel-based streams, the send calls could fail when the other side of the channel closes—and that is just a matter of how the runtime executes the futures which make up the stream. Up till now we have ignored this by calling unwrap, but in a well-behaved app, we should explicitly handle the error, at minimum by ending the loop so we do not try to send any more messages!
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();
    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;
            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });
    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();
    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
    });
    ReceiverStream::new(rx)
}
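To see the error path fire deterministically, the following sketch reproduces the same if let Err pattern with the standard library's std::sync::mpsc channel instead of trpl::channel. The function send_until_closed and its parameters are hypothetical; it drops the receiver partway through to simulate the consumer going away.

```rust
use std::sync::mpsc;

/// Tries to send `total` counts, dropping the receiver once `close_after`
/// of them have been sent. Returns how many sends succeeded.
fn send_until_closed(total: u32, close_after: u32) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let mut rx = Some(rx);
    let mut delivered = 0;
    for count in 1..=total {
        if count > close_after {
            drop(rx.take()); // simulate the receiving side closing
        }
        if let Err(send_error) = tx.send(count) {
            eprintln!("Could not send interval {count}: {send_error}");
            break; // stop producing once nobody is listening
        }
        delivered += 1;
    }
    delivered
}
```

Calling send_until_closed(10, 3) performs three successful sends, logs one error for the fourth, and then stops instead of panicking as unwrap would.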