
Tokio

What is Tokio

  • Tokio allows developers to write asynchronous programs in the Rust programming language.
  • More specifically, Tokio is an event-driven, non-blocking I/O platform for writing asynchronous applications with Rust. At a high level, it provides a few major components:
    • A multithreaded, work-stealing based task scheduler.
    • A reactor backed by the operating system's event queue (epoll, kqueue, IOCP, etc...)
    • Asynchronous TCP and UDP sockets
  • Features:
    • Fast: Tokio is built on Rust, which is itself very fast. Tokio's design is also geared towards enabling applications to be as fast as possible.
    • Zero-cost abstractions: Tokio is built around futures, which aren't a new idea but the way Tokio uses them is unique. Tokio's futures compile down to a state machine. There is no added overhead from synchronization, allocation, or other costs common with future implementations.
    • Concurrency: Tokio provides a multi-threaded, work-stealing, scheduler.
    • Non-blocking I/O: When hitting the network, Tokio will use the most efficient system available to the operating system. On Linux this means epoll, BSD platforms provide kqueue, and Windows has I/O completion ports.
    • Reliable: While Tokio cannot prevent all bugs, it is designed to minimize them. It does this by providing APIs that are hard to misuse.
    • Ownership and type system: Rust’s ownership model and type system enables implementing system level applications without the fear of memory unsafety.
    • Backpressure: In push based systems, when a producer produces data faster than the consumer can process, data will start backing up. Pending data is stored in memory. Unless the producer stops producing, the system will eventually run out of memory and crash. The ability for a consumer to inform the producer to slow down is backpressure. Because Tokio uses a poll based model, the problem mostly just goes away. Producers are lazy by default. They will not produce any data unless the consumer asks them to. This is built into Tokio’s foundation.
    • Cancellation: Because of Tokio’s poll based model, computations do no work unless they are polled. Thanks to Rust’s ownership model, the computation is able to implement drop handles to detect the future being dropped.
    • Lightweight: Tokio scales well without adding overhead to the application, allowing it to thrive in resource constrained environments.
    • No GC
    • Modular: Tokio leverages mio for the system event queue and futures for defining tasks. Tokio implements async syntax to improve readability of futures. Many libraries are implemented using Tokio, including hyper and actix.

Getting started

Hello World

  • First, dependencies are added to Cargo.toml
[dependencies]
tokio = "0.1"
  • Second, the source
use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // parse the address
    let addr = "0.0.0.0:8090".parse().unwrap();
    let client = TcpStream::connect(&addr)
        .and_then(|stream| {
            println!("created client stream");
            io::write_all(stream, "hello world\n").then(|result| {
                println!("wrote to stream; success={:?}", result.is_ok());
                Ok(())
            })
        })
        .map_err(|err| {
            // all tasks must have an Error type of (). This forces error
            // handling and helps avoid silencing failures.

            eprintln!("connection error = {:?}", err);
        });
    println!("About to create the stream and write to it...");
    tokio::run(client);
    println!("Stream has been created and written to.");
    Ok(())
}

Anatomy of the source

  • TcpStream::connect returns a Future which is a value representing some computation that will complete at some point in the future. This means that TcpStream::connect does not wait for the stream to be created before it returns.
  • The and_then method yields the stream once it has been created.
  • Each combinator function takes ownership of the necessary state, as well as the callback to perform, and returns a new Future with the additional step sequenced.
  • The returned futures are lazy, i.e., no work is performed when calling the combinator. Instead, once all the asynchronous steps are sequenced, the final Future (representing the entire task) is ‘spawned’ (i.e., run).
  • map_err converts whatever error we may have gotten to (), ensuring that we acknowledge errors.
  • io::write_all takes ownership of the stream, returning a Future that completes once the entire message has been written to the stream.
  • then is used to sequence a step that gets run once the write has completed. The result is a Result that contains the original stream (compare to and_then, which passes the stream without the Result wrapper); a sketch contrasting the two follows this list.
  • Executors are responsible for scheduling asynchronous tasks, driving them to completion. tokio::run is the default executor which starts the runtime, blocking the current thread until all spawned tasks have completed and all resources have been dropped.
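  • As a minimal illustration of the then vs and_then distinction above (the values and error message are made up), the following sketch uses plain futures 0.1 combinators and drives each chain with Future::wait on the current thread:
extern crate futures;

use futures::{future, Future};

fn main() {
    // `and_then` only runs its closure on success and receives the bare value.
    let ok_chain = future::ok::<u32, ()>(1).and_then(|n| {
        println!("and_then saw {}", n);
        future::ok::<u32, ()>(n + 1)
    });

    // `then` always runs and receives the full `Result`, even on error.
    let err_chain = future::err::<u32, &str>("boom").then(|res| {
        println!("then saw {:?}", res);
        future::ok::<(), ()>(())
    });

    // `wait` blocks the current thread until the future completes (futures 0.1).
    let _ = ok_chain.wait();
    let _ = err_chain.wait();
}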

Futures

  • This runtime model is very different from async libraries found in other languages. While, at a high level, APIs can look similar, the way code gets executed differs.

Synchronous Model

  • A brief look at the synchronous (aka blocking) model that the Rust standard library uses.
let mut buf = [0; 1023];
let n = socket.read(&mut buf).unwrap();
  • When read is called, either the socket has pending data or it does not.
    • If there is pending data, the call to read will return immediately and buf will be filled with that data.
    • However, if there is not, the read function will block the current thread until data is received.
  • In order to perform reads on many different sockets concurrently, a thread per socket is required. Using a thread per socket does not scale well to large numbers of sockets (known as the C10k problem).
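  • For illustration only (the address and echo behaviour are made up), the thread-per-socket model with blocking std APIs looks roughly like this; every blocked read parks an entire OS thread:
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    for stream in listener.incoming() {
        let mut stream = stream?;

        // One OS thread per connection: each blocking `read` parks its thread.
        thread::spawn(move || {
            let mut buf = [0; 1024];
            loop {
                match stream.read(&mut buf) {
                    Ok(0) | Err(_) => return,                         // closed or failed
                    Ok(n) => { let _ = stream.write_all(&buf[..n]); } // echo back
                }
            }
        });
    }

    Ok(())
}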

Non-blocking sockets

  • The way to avoid blocking a thread on an operation like read is to use non-blocking sockets, which allow performing operations like read without blocking the thread.
  • When the socket has no pending data in its receive buffer, the read function returns immediately, indicating that the socket was “not ready” to perform the read operation.
  • When using a Tokio TcpStream, a call to read always returns immediately, even if there is no pending data to read; in that case the read fails with an error of kind ErrorKind::WouldBlock.
  • If there is no pending data, the caller is responsible for calling read again at a later time. The trick is to know when that “later time” is.
  • Another way to think about a non-blocking read is as ‘polling’ the socket for data to read.
  • Futures are an abstraction around this polling model.
    • A Future represents a value that will be available at “some point in the future”.
    • We can poll the future and ask if the value is ready or not.
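  • As an illustration of this polling model, here is what it looks like by hand with a non-blocking std socket (the address is made up; a real reactor such as mio/epoll would wait for readiness instead of spinning):
use std::io::{ErrorKind, Read};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let mut socket = TcpStream::connect("127.0.0.1:8080")?;
    socket.set_nonblocking(true)?;

    let mut buf = [0; 1024];
    loop {
        match socket.read(&mut buf) {
            // Data was pending: the call completed immediately.
            Ok(n) => {
                println!("read {} bytes", n);
                break;
            }
            // "Not ready": the caller must try again at some later time.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }

    Ok(())
}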

A closer look at futures

  • A future is a value that represents the completion of an asynchronous computation.
  • Usually, the future completes due to an event that happens elsewhere in the system.
    • A database query, when the query finishes, the future is completed, and its value is the result of the query.
    • An RPC invocation to a server. When the server replies, the future is completed, and its value is the server’s response.
    • A timeout. When time is up, the future is completed, the value is an empty tuple () (also referred to as “unit” or “the unit type”).
    • A long-running CPU-intensive task, running on a thread pool. When the task finishes, the future is completed, and its value is the return value of the task.
    • Reading bytes from a socket. When the bytes are ready, the future is completed – and depending on the buffering strategy, the bytes might be returned directly, or written as a side-effect into some existing buffer.
  • The entire point of the future abstraction is to allow asynchronous functions, i.e., functions that cannot immediately return a value, to be able to return something.
  • For instance, an asynchronous HTTP client could have the following code
// a get function
pub fn get(&self, uri: &str) -> ResponseFuture { ... }

// the user of the library calls `get` like this
// Now, the response_future isn’t the actual response.
// It is a future that will complete once the response is received.
let response_future = client.get("https://www.example.com");

// however, since the caller has a concrete value (the future)
// they can start to use it.
let response_is_ok = response_future
    .map(|response| {
        response.status().is_ok()
    });

track_response_success(response_is_ok);
  • None of those actions taken with the future perform any immediate work. Instead, they define the work to be done when the response future completes and the actual response is available.
  • Some combinator functions of futures:
    • and_then: chains two futures together
    • then: allows chaining a future onto a previous one even if the previous one errored
    • map: simply maps a future's value from one type to another
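  • A small sketch chaining all three combinators (the values and error type are made up; Future::wait drives the chain on the current thread just for the example):
extern crate futures;

use futures::{future, Future};

fn main() {
    let task = future::ok::<u32, &'static str>(20)
        // `map`: change the value's type/shape on success.
        .map(|n| n * 2)
        // `and_then`: chain another future that runs only on success.
        .and_then(|n| future::ok::<String, &'static str>(format!("n = {}", n)))
        // `then`: runs whether the previous step succeeded or failed.
        .then(|res| {
            println!("finished with {:?}", res);
            future::ok::<(), ()>(())
        });

    // Drive the chain to completion on the current thread (futures 0.1 helper).
    let _ = task.wait();
}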

Poll based Futures

  • Futures are poll based, which means that instead of a Future being responsible for pushing data somewhere once it is complete, it relies on being asked whether it is complete or not.
    • Most future libraries for other programming languages use a push based model, where callbacks are supplied to the future and the computation invokes the callback with the result as soon as it is available.
    • Using a poll based model offers many advantages, including being a zero cost abstraction.

The Future trait

  • The Future trait is as follows:
trait Future {
    /// The type of the value returned when the future completes.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the future
    /// has completed or not
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}
  • Item is the type of the value that the Future will yield when it completes.
  • Error is the type of Error that the Future may yield if there's an error.
  • The poll method is what the runtime will call in order to see if the future is complete or not.
    • Async is an enum with values Ready(Item) or NotReady
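  • For reference, Async and the Poll alias are defined in the futures 0.1 crate as:
// From the futures 0.1 crate.
pub enum Async<T> {
    /// The computation finished and produced a value.
    Ready(T),
    /// The value is not ready yet; poll again later.
    NotReady,
}

/// `poll` returns this alias: `Result` over `Async`.
pub type Poll<T, E> = Result<Async<T>, E>;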

Streams

  • Streams are the iterator equivalent of futures.
  • Instead of yielding a single value at some point in the future, streams yield a sequence of values, each at some point in the future.
    • In other words, streams keep yielding values over time.
    • Streams can be used to represent a wide range of things, for instance:
      • UI Events caused by the user interacting with a GUI in different ways. When an event happens the stream yields a different message to your app over time.
      • Push Notifications from a server. Sometimes a request/response model is not what you need. A client can establish a notification stream with a server to be able to receive messages from the server without specifically being requested.
      • Incoming socket connections. As different clients connect to a server, the connections stream will yield socket connections (a sketch of this case follows the trait definition below).
  • Streams are very similar to futures in their implementation:
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the 
    /// stream has another value it can yield
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
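  • For instance, the incoming-socket-connections case, sketched with tokio 0.1 (the address and logging are illustrative):
extern crate tokio;

use tokio::net::TcpListener;
use tokio::prelude::*;

fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    // `incoming()` is a Stream that yields one TcpStream per accepted client.
    let server = listener
        .incoming()
        .for_each(|socket| {
            println!("accepted connection from {:?}", socket.peer_addr());
            Ok(())
        })
        .map_err(|e| eprintln!("accept error = {:?}", e));

    tokio::run(server);
}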

Runtime

Tokio runtime

  • In order for a Future to make progress, something has to call poll, which is the job of the runtime. The runtime is responsible for repeatedly calling poll on a Future until its value is returned.
  • There are many different ways to do this and thus many types of runtime configurations.
    • CurrentThread runtime configuration will block the current thread and loop through all spawned Futures, calling poll on them.
    • The ThreadPool configuration schedules Futures across a thread pool, and is the default configuration used by the Tokio runtime.
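  • As a sketch (assuming tokio 0.1's runtime and runtime::current_thread modules), both configurations can also be created explicitly instead of relying on the tokio::run default:
extern crate tokio;
extern crate futures;

use futures::future::lazy;

fn main() {
    // Default flavour: multi-threaded, work-stealing ThreadPool runtime.
    let mut threadpool_rt = tokio::runtime::Runtime::new().unwrap();
    threadpool_rt
        .block_on(lazy(|| {
            println!("running on the thread pool runtime");
            Ok::<(), ()>(())
        }))
        .unwrap();

    // CurrentThread flavour: everything is polled on the calling thread.
    let mut current_thread_rt = tokio::runtime::current_thread::Runtime::new().unwrap();
    current_thread_rt
        .block_on(lazy(|| {
            println!("running on the current-thread runtime");
            Ok::<(), ()>(())
        }))
        .unwrap();
}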

Spawning Tasks

  • One of the unique aspects of Tokio is that futures can be spawned on the runtime from within other futures or streams.
    • The futures used in this way are usually referred to as tasks
    • Tasks are similar to Go’s goroutines and Erlang’s processes, but asynchronous.
    • In other words, tasks are asynchronous green threads.
  • Given that tasks run an asynchronous bit of logic, they are represented by the Future trait. The task’s future implementation completes with a () value once the task is done processing.
  • Tasks are passed to the runtime, which handles scheduling them.
  • The runtime is usually scheduling many tasks across a single or small set of threads.
  • Tasks must not perform computation-heavy logic, or they will prevent other tasks from executing.
  • Tasks are implemented by either building up a future using the various combinator functions available in the futures and tokio crates, or by implementing the Future trait directly.
// Create some kind of future that we want our runtime to execute
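// (`my_outer_stream` is assumed to be a Stream defined elsewhere.)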
let program = my_outer_stream.for_each(|my_outer_value| {
  println!("Got value {:?} from the stream", my_outer_value);

  let my_inner_future = future::ok(1);

  let task = my_inner_future.and_then(|my_inner_value| {
    eprintln!("Got a value {:?} from second future", my_inner_value);
    Ok(())
  });

  tokio::spawn(task);
  Ok(())
});

tokio::run(program);
  • Spawning tasks can happen within other futures or streams, allowing multiple things to happen concurrently.

Working with futures

  • Futures are the building block used to manage asynchronous logic. They are the underlying asynchronous abstraction used by Tokio.
  • Applications built with Tokio are structured in terms of futures. Tokio takes these futures and drives them to completion.

Implementing futures

A future for hello world

// `Poll` is a type alias for `Result<Async<T>, E>`
use futures::{Future, Async, Poll};

struct HelloWorld;

impl Future for HelloWorld {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        Ok(Async::Ready("hello world".to_string()))
    }
}
  • The Item and Error associated types define the types returned by the future once it completes. By convention, infallible futures set Error to ()
  • Futures use a poll based model. The consumer of a future repeatedly calls the poll function.
  • When a future's poll function is called, the implementation will synchronously do as much work as possible until it is logically blocked on some asynchronous event that has not occurred yet.

Running the future

#[macro_use]
extern crate futures;
extern crate tokio;

use futures::{Future, Async, Poll};
use std::fmt;

struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = try_ready!(self.0.poll());
        // ------ equal to the follow
        // let value = match self.0.poll() {
        //     Ok(Async::Ready(value)) => value,
        //     Ok(Async::NotReady) => return Ok(Async::NotReady),
        //     Err(err) => return Err(err),
        // };

        println!("{}", value);
        Ok(Async::Ready(()))
    }
}

fn main() {
    let future = Display(HelloWorld);
    tokio::run(future);
}
  • The tokio::run function accepts futures where both Item and Error are set to (). This is because Tokio only executes the futures; it does not do anything with their values. The user of Tokio is required to fully process all values in the future.
  • Display takes a future that yields items that can be displayed. When it is polled, it first tries to poll the inner future. If the inner future is not ready, then Display cannot complete; in this case, Display also returns NotReady.
  • poll implementations must never return NotReady unless they received NotReady by calling an inner future.

Getting asynchronous

  • Futures are all about managing asynchronicity.
    • Implementing a future that completes asynchronously requires correctly handling receiving Async::NotReady from the inner future.
extern crate tokio;
#[macro_use]
extern crate futures;

use tokio::net::{TcpStream, tcp::ConnectFuture};
use futures::{Future, Async, Poll};

struct GetPeerAddr {
    connect: ConnectFuture,
}

impl Future for GetPeerAddr {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.connect.poll() {
            Ok(Async::Ready(socket)) => {
                println!("peer address = {}", socket.peer_addr().unwrap());
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                println!("failed to connect: {}", e);
                Ok(Async::Ready(()))
            }
        }
    }
}

fn main() {
    let addr = "127.0.0.1:8090".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let get_peer_addr = GetPeerAddr {
        connect: connect_future,
    };

    tokio::run(get_peer_addr);
}
  • GetPeerAddr contains ConnectFuture, a future that completes once a TCP stream has been established. This future is returned by TcpStream::connect.
  • When GetPeerAddr is passed to tokio::run, Tokio will repeatedly call poll until Ready is returned.
  • When implementing Future, Async::NotReady must not be returned unless Async::NotReady was obtained when calling poll on an inner future.

Combinators

  • Often times, Future implementations follow similar patterns. To help reduce boilerplate, the futures crate provides a number of utilities, called “combinators”, that abstract these patterns. Many of these combinators exist as functions on the Future trait.

  • Considering the code in Running the future, it is equivalent to the following code using the map combinator

extern crate tokio;
extern crate futures;

use futures::Future;

fn main() {
    let future = HelloWorld.map(|value| {
        println!("{}", value);
    });

    tokio::run(future);
}
  • Here's how map is implemented:
pub struct Map<A, F> where A: Future {
    future: A,
    f: Option<F>,
}

impl<U, A, F> Future for Map<A, F>
    where A: Future,
          F: FnOnce(A::Item) -> U,
{
    type Item = U;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<U, A::Error> {
        let value = try_ready!(self.future.poll());
        let f = self.f.take().expect("cannot poll Map twice");

        Ok(Async::Ready(f(value)))
    }
}

Essential combinators

  • This guide provides a very quick overview
  • Concrete futures (Any value can be turned into an immediately complete future.)
    • ok, analogous to Result::Ok, converts the provided value into an immediately ready future that yields back the value.
    • err, analogous to Result::Err, converts the provided error into an immediately ready future that fails with that error.
    • result lifts a result to an immediately complete future.
    • In addition, lazy allows constructing a future given a closure. The closure is not immediately invoked, instead it is invoked the first time the future is polled.
  • IntoFuture: A crucial API to know about is the IntoFuture trait, which is a trait for values that can be converted into futures.
  • Adapters (Like Iterator, the Future trait includes a broad range of “adapter” methods. These methods all consume the future, returning a new future providing the requested behavior.):
    • Change the type of a future (map, map_err)
    • Run another future after one has completed (then, and_then, or_else)
    • Figure out which of two futures resolves first (select)
    • Wait for two futures to both complete (join)
    • Convert to a trait object (Box::new)
    • Convert unwinding into errors (catch_unwind)
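  • A small sketch of join combined with map, as an example of these adapters (the operands are arbitrary):
extern crate tokio;
extern crate futures;

use futures::{future, Future};

fn main() {
    let a = future::ok::<u32, ()>(1);
    let b = future::ok::<u32, ()>(2);

    // `join` waits for both futures and yields a tuple of their values.
    let sum = a.join(b).map(|(x, y)| {
        println!("1 + 2 = {}", x + y);
    });

    tokio::run(sum);
}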

When to use combinators

  • Using combinators can reduce a lot of boilerplate, but they are not always a good fit. Due to the limitations described below, implementing Future manually is still going to be common.

  • Functional style:

    • Closures passed to combinators must be 'static. This means it is not possible to pass references into the closure. Ownership of all state must be moved into the closure.
      • The reason for this is that lifetimes are based on the stack.
      • With asynchronous code, the ability to rely on the stack is lost.
      • Because of this, code written using combinators ends up being very functional in style.
  • Returning futures:

    • Because combinators often use closures as part of their type signature, it is not possible to name the future type.
    • This, in turn, means that the future type cannot be used as part of a function’s signature. When passing a future as a function argument, generics can be used in almost all cases.
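  • Two common workarounds, sketched with a hypothetical add_10 helper: either return impl Future, or erase the concrete type behind a boxed trait object:
extern crate futures;

use futures::{future, Future};

// The combinator's concrete type cannot be written out, but `impl Trait`
// lets the signature stay unnamed.
fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
where
    F: Future<Item = i32>,
{
    f.map(|i| i + 10)
}

// Alternatively, erase the type behind a boxed trait object.
fn add_10_boxed<F>(f: F) -> Box<dyn Future<Item = i32, Error = F::Error> + Send>
where
    F: Future<Item = i32> + Send + 'static,
    F::Error: Send + 'static,
{
    Box::new(f.map(|i| i + 10))
}

fn main() {
    let a = add_10(future::ok::<i32, ()>(1));
    let b = add_10_boxed(future::ok::<i32, ()>(2));

    println!("{:?} {:?}", a.wait(), b.wait());
}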

Streams

  • Streams are similar to futures. Instead of yielding a single value, they asynchronously yield one or more values.

The Stream trait

trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// The type representing errors that occurred while processing 
    /// the computation.
    type Error;

    /// The function that will be repeatedly called to see if the 
    /// stream has another value it can yield
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
  • The Item associated type is the type yielded by the stream.
  • The poll function is very similar to Future’s poll function. The only difference is that, this time, Option<Self::Item> is returned.
  • Stream implementations have the poll function called many times. When the next value is ready, Ok(Async::Ready(Some(value))) is returned. When the stream is not ready to yield a value, Ok(Async::NotReady) is returned.
  • When the stream is exhausted and will yield no further values, Ok(Async::Ready(None)) is returned.
  • Just like with futures, streams must not return Async::NotReady unless Async::NotReady was obtained from an inner stream or future.

An example of Fibonacci

extern crate futures;

use futures::{Stream, Poll, Async};

pub struct Fibonacci {
    curr: u64,
    next: u64,
}

impl Fibonacci {
    fn new() -> Fibonacci {
        Fibonacci {
            curr: 1,
            next: 1,
        }
    }
}

impl Stream for Fibonacci {
    type Item = u64;

    // The stream will never yield an error
    type Error = ();

    fn poll(&mut self) -> Poll<Option<u64>, ()> {
        let curr = self.curr;
        let next = curr + self.next;

        self.curr = self.next;
        self.next = next;

        Ok(Async::Ready(Some(curr)))
    }
}

#[macro_use]
extern crate futures;

use futures::{Future, Stream, Poll, Async};
use std::fmt;

pub struct Display10<T> {
    stream: T,
    curr: usize,
}

impl<T> Display10<T> {
    fn new(stream: T) -> Display10<T> {
        Display10 {
            stream,
            curr: 0,
        }
    }
}

impl<T> Future for Display10<T>
where
    T: Stream,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), Self::Error> {
        while self.curr < 10 {
            let value = match try_ready!(self.stream.poll()) {
                Some(value) => value,
                // There were less than 10 values to display, terminate the
                // future.
                None => break,
            };

            println!("value #{} = {}", self.curr, value);
            self.curr += 1;
        }

        Ok(Async::Ready(()))
    }
}

extern crate tokio;

fn main() {
    let fib = Fibonacci::new();
    let display = Display10::new(fib);

    tokio::run(display);
}

Stream: Combinators

  • unfold:
extern crate futures;

use futures::{stream, Stream};

fn fibonacci() -> impl Stream<Item = u64, Error = ()> {
    stream::unfold((1, 1), |(curr, next)| {
        let new_next = curr + next;

        Some(Ok((curr, (next, new_next))))
    })
}
  • take and for_each:
extern crate tokio;
extern crate futures;

use futures::Stream;

tokio::run(
    fibonacci().take(10)
        .for_each(|num| {
            println!("{}", num);
            Ok(())
        })
);

Stream: Essential combinators

  • Concrete streams (The stream module contains functions for converting values and iterators into streams.):

    • once converts the provided value into an immediately ready stream that yields a single item: the provided value.
    • iter_ok and iter_result both take IntoIterator values and convert them into an immediately ready stream that yields the iterator's values.
    • empty returns a stream that immediately yields None.
  • Adapters -- Like Iterator, the Stream trait includes a broad range of “adapter” methods. These methods all consume the stream, returning a new stream providing the requested behavior.

    • Change the type of a stream (map, map_err, and_then).
    • Handle stream errors (or_else).
    • Filter stream values (take, take_while, skip, skip_while, filter, filter_map).
    • Asynchronously iterate the values (for_each, fold).
    • Combine multiple streams together (zip, chain, select).
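  • A short sketch chaining a few of these adapters over an immediately ready stream (the numbers are arbitrary):
extern crate tokio;
extern crate futures;

use futures::{stream, Stream};

fn main() {
    // Build a ready stream from an iterator, then chain a few adapters.
    let work = stream::iter_ok::<_, ()>(0..10)
        .filter(|n| n % 2 == 0) // keep even numbers
        .map(|n| n * n)         // square them
        .for_each(|n| {
            println!("{}", n);
            Ok(())
        });

    tokio::run(work);
}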

Spawning

extern crate tokio;
extern crate futures;

use futures::future::lazy;

fn main() {
    tokio::run(lazy(|| {
        for i in 0..4 {
            tokio::spawn(lazy(move || {
                println!("Hello from task {}", i);
                Ok(())
            }));
        }

        Ok(())
    }));
}
  • The tokio::run function will block until the future passed to run and all other spawned tasks have terminated. In this case, tokio::run blocks until all four tasks output to STDOUT and terminate.

When to spawn tasks

  • As with all things software related, the answer is that it depends. Generally, the answer is: spawn a new task whenever you can. The more available tasks, the greater the ability to run them in parallel. However, keep in mind that if multiple tasks require communication, this will involve channel overhead.
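  • As a sketch of what that coordination looks like, a spawned producer task can talk to a consumer over a futures::sync::mpsc channel roughly as follows (the buffer size and messages are illustrative):
extern crate tokio;
extern crate futures;

use futures::future::lazy;
use futures::sync::mpsc;
use futures::{stream, Future, Sink, Stream};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = mpsc::channel(1_024);

        // Producer task: push a few messages into the channel.
        tokio::spawn(
            stream::iter_ok(0..4)
                .fold(tx, |tx, i| {
                    tx.send(format!("message {}", i))
                        .map_err(|e| eprintln!("send error = {:?}", e))
                })
                .map(|_| ()), // dropping tx closes the channel
        );

        // Consumer: the Receiver end is itself a Stream.
        rx.for_each(|msg| {
            println!("got `{}`", msg);
            Ok(())
        })
    }));
}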

When not to spawn tasks

  • If the amount of coordination via message passing and synchronization primitives outweighs the parallelism benefits from spawning tasks, then maintaining a single task is preferred.
  • For example, it is generally better to maintain reading from and writing to a single TCP socket on a single task instead of splitting up reading and writing between two tasks.