Adventures in Rust: Futures and Tokio

One of my newer hobbies has been learning and toying around with Rust. Recently, as part of this learning process, I started implementing an IP address lookup service as a small side project. While implementing it I ran into a bit of a hurdle: performing reverse DNS resolution asynchronously.

This challenge stemmed primarily from my lack of understanding of both Futures in Rust and the Tokio runtime model. I had many preconceived notions of what a Future is and how one behaves, primarily from my extensive experience with Scala and its Futures. It also didn't help that I dove right in based on examples without reading much of the higher level documentation describing Futures. I also found that the more I understood about Futures, the more the Tokio runtime made sense to me.

My hope is that through exploring this problem, I can aid others in understanding these concepts, which can be difficult to grasp at first. This is especially so because the behavior and function of Futures vary wildly between languages and runtimes.

Background

The goal of my IP address lookup service is to allow users to easily query information about an IP address by issuing a simple HTTP call and receiving a JSON payload in response. This payload will include ASN information, GeoIP information (from MaxMind), and DNS information. To create this HTTP service, I chose the excellent Hyper HTTP library and by extension the Tokio runtime.

Initially, creating the HTTP service using Hyper wasn't too much of a challenge, and I was able to follow this blog post, with minor changes based on recent updates to Hyper, to get the web service up and running. I will freely admit that at this point I didn't understand much of what I was doing to implement and run the service. By the time I got around to creating the actual HTTP service I had already prototyped the ASN and GeoIP portions of the program, so I was all set except for the DNS component.

Finally, after researching numerous DNS libraries, I landed on the domain library which contains a resolver that uses Tokio to asynchronously perform DNS queries.

First Steps

Having decided on the DNS library I wanted to use, I tried it out with a basic snippet. My first goal was to use this library to reverse lookup hostnames given an IP address. Using the basic snippet found in the API documentation for the resolv module, I was able to piece together a simple function to do this:

use std::net::IpAddr;
use tokio_core::reactor::Core;
use domain::resolv::Resolver;
use domain::resolv::lookup::addr::lookup_addr;

fn lookup_hostnames(ip: IpAddr) -> Vec<String> {
    let mut core = Core::new().unwrap();
    let resolv = Resolver::new(&core.handle());

    let addrs = lookup_addr(resolv, ip);
    let names_response = core.run(addrs).unwrap();
    names_response.iter().map(|n| n.to_string()).collect()
}

In this code sample we create a Tokio Core which, after a quick look at the documentation, is essentially an event loop. We use a handle to that Core to create a Resolver. Using the Resolver, we can call lookup_addr to create a Future that contains the result of this lookup. Finally, we use the Core to run the future and produce the lookup result.

There are a couple of things that aren't great about this function. First, every time we call this function, we are creating an event loop for the express purpose of executing a single DNS lookup. Second, when we execute core.run(addrs) we are blocking the current thread of execution until the lookup has completed.

To make matters worse, attempting to call this function directly from my Hyper service resulted in a panic with the message "cannot recursively call into core". I was able to work around this by spawning a new thread to perform this task and waiting on the result:

fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
...
    let reverse_dns_result = thread::spawn(move || {
        dns::reverse_dns_lookup(ip)
    }).join().unwrap();
...
}

Note that the call function in this example is essentially a request handler used by Hyper to handle HTTP requests as they come in.

While this technically worked, I knew that things had gone awry. At this point, we're spawning a separate thread that creates its own event loop to perform a single task, and then blocking on that thread to synchronously wait for the result. Not good, right?

What actually is a Future?

At this point I tried a lot of different things that didn't work. I knew that the result of lookup_addr(resolv, ip) being used in the lookup_hostnames function was a Future, and since I use Futures every day in Scala, this should be simple, right? At the time, all I thought I had to do was to get ahold of that resulting Future and then weave it into the Future I created as part of my Hyper service invocation in its call function. I still held onto the idea that a Future in Rust was something that was already running asynchronously and all I had to do was wait for the result to be materialized.

Several times I was able to find a code path to get the future over to my Hyper service by hook or by crook, yet it never seemed to work out. While I was able to get a reference to the future and use the and_then combinator to process the result and formulate an HTTP response, the future itself never seemed to complete. As a result the service would always hang and I'd never receive a result.

It was at this point that I finally took a step back and decided to read the documentation for futures-rs in an attempt to figure out what exactly I was doing wrong. While reading through the documentation, I found this important bit under "Runtime Characteristics" in the documentation for the poll function:

Futures alone are inert; they must be actively polled to make progress, meaning that each time the current task is woken up, it should actively re-poll pending futures that it still has an interest in.

At this point things started to click for me. Rust's Future implementation is a much lower level representation of a Future than I was previously used to. I won't delve deep into the specifics here, but suffice to say that a Future is essentially a state machine that does nothing until it's actively polled. Additionally as futures are composed using combinators such as map and join, we are actually building a more elaborate state machine, not manipulating the result of something that is already running somewhere.
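To see the "inert until polled" behavior in isolation, here is a minimal sketch. It is an assumption-laden illustration: it uses the standard library's std::future::Future trait rather than the futures-rs 0.1 trait used elsewhere in this post, and TwoStep, NoopWaker, and poll_by_hand are hypothetical names made up for the example. The future is a tiny hand-written state machine that counts how often it is polled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical state machine: pending on the first poll, ready on the second.
struct TwoStep {
    polls: Arc<AtomicUsize>,
}

impl Future for TwoStep {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Count this poll; `seen` is the number of polls before this one.
        let seen = self.polls.fetch_add(1, Ordering::SeqCst);
        if seen == 0 {
            // Ask to be polled again, then report that we are not done yet.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready("done")
        }
    }
}

/// A waker that does nothing; enough for polling by hand in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns (polls before driving, polls after driving, final value).
fn poll_by_hand() -> (usize, usize, &'static str) {
    let polls = Arc::new(AtomicUsize::new(0));
    let mut future = TwoStep { polls: polls.clone() };

    // Creating the future did not run anything: nobody has polled it yet.
    let before = polls.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = Pin::new(&mut future);

    // Only this loop makes the state machine advance.
    let value = loop {
        if let Poll::Ready(v) = pinned.as_mut().poll(&mut cx) {
            break v;
        }
    };

    (before, polls.load(Ordering::SeqCst), value)
}
```

Calling poll_by_hand() returns (0, 2, "done"): zero polls happened just from constructing the future, and it took two explicit polls to reach the result. An executor or event loop is, at its core, whatever runs that loop for you.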

While this epiphany was very helpful in understanding how Futures operate in Rust, it still didn't explain the behavior I was seeing. Since I was building up this elaborate state machine to serve my HTTP requests, the DNS lookup Future I combined in using and_then should eventually complete, as Hyper was apparently polling my Future. However, this wasn't the case. Armed with my newfound knowledge, I next turned my attention to the Tokio reactor Core.

What actually is an Event Loop?

Admittedly, before this point I did know that creating a new reactor Core on every DNS lookup was at least partially the reason that things weren't working out; I just didn't know why. Now that I understood Futures better, I could also see why the reactor Core existed, and it gave me a better appreciation for why Tokio itself needed to exist. Now that we know more about Futures, let's take a look at the documentation for the Tokio reactor Core:

An event loop.

The event loop is the main source of blocking in an application which drives all other I/O events and notifications happening. Each event loop can have multiple handles pointing to it, each of which can then be used to create various I/O objects to interact with the event loop in interesting ways.

It was at this point that it began to become apparent to me what was going wrong in my attempts to simply extract a Future that I could use to materialize the DNS lookup result. First, let's take a quick look back at the two places we were using the reactor Core that was being created on each invocation:

fn lookup_hostnames(ip: IpAddr) -> Vec<String> {
    let mut core = Core::new().unwrap();
    let resolv = Resolver::new(&core.handle());
    ...
    let names_response = core.run(addrs).unwrap();
    ...
}

The first interesting bit we see here is that we are providing a Handle to the reactor Core when creating the Resolver. Looking back at the documentation for reactor Core, a handle is used to "create various I/O objects to interact with the event loop". This indicates that the Resolver is likely using this handle to spawn I/O related tasks on this event loop.

The next interesting bit we see here is that we are invoking core.run to run this Future through to completion and wait on the result. One thing that was not immediately apparent to me can be gleaned from the first line of documentation for the reactor Core's run function:

Runs a future until completion, driving the event loop while we're otherwise waiting for the future to complete.

Not only does the run function run a Future until completion, but it also drives the event loop as a whole while doing so. This is something that makes a lot of sense when you think about it, but did not initially dawn on me.

Putting these two facts together, we can now see that the Resolver is spawning tasks into the reactor Core event loop, and by calling core.run with the future, we are also driving the event loop, which results in those other tasks being run as well. The very thing that I had been missing all along is that I could not simply export the resulting Future and use it elsewhere; I also had to ensure that the Core that was used to spawn these I/O tasks was run as well. Otherwise those tasks would never be completed, and as a result the Future itself never completed either!
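To make that dependency concrete, here is a toy model written only with the standard library. MiniLoop, MiniHandle, and toy_lookup are hypothetical names invented for this sketch; they are not Tokio types, and the real reactor is far more involved. The point is only the shape of the problem: work queued through a handle produces nothing until the loop that owns the queue is actually run.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

type JobQueue = Rc<RefCell<VecDeque<Box<dyn FnOnce()>>>>;

/// Toy stand-in for an event loop: it owns a queue of pending jobs.
struct MiniLoop {
    jobs: JobQueue,
}

/// Toy stand-in for a handle: lets other code queue work on the loop.
#[derive(Clone)]
struct MiniHandle {
    jobs: JobQueue,
}

impl MiniLoop {
    fn new() -> Self {
        MiniLoop { jobs: Rc::new(RefCell::new(VecDeque::new())) }
    }

    fn handle(&self) -> MiniHandle {
        MiniHandle { jobs: self.jobs.clone() }
    }

    /// Runs queued jobs until the queue is empty; returns how many ran.
    fn run_until_idle(&mut self) -> usize {
        let mut ran = 0;
        loop {
            // Release the borrow before running the job so a job may queue more work.
            let job = self.jobs.borrow_mut().pop_front();
            match job {
                Some(job) => {
                    job();
                    ran += 1;
                }
                None => return ran,
            }
        }
    }
}

impl MiniHandle {
    fn spawn<F: FnOnce() + 'static>(&self, job: F) {
        self.jobs.borrow_mut().push_back(Box::new(job));
    }
}

/// Hypothetical lookup: queues the work on the loop and hands back a slot
/// that is filled in only when the loop runs that work.
fn toy_lookup(handle: &MiniHandle, ip: &str) -> Rc<RefCell<Option<String>>> {
    let slot = Rc::new(RefCell::new(None));
    let out = slot.clone();
    let name = format!("host-for-{}", ip);
    handle.spawn(move || {
        *out.borrow_mut() = Some(name);
    });
    slot
}

/// Returns the slot contents before and after the loop is driven.
fn demo_undriven_loop() -> (Option<String>, Option<String>) {
    let mut event_loop = MiniLoop::new();
    let slot = toy_lookup(&event_loop.handle(), "10.0.0.1");

    let before = slot.borrow().clone(); // the loop has not run yet
    event_loop.run_until_idle();
    let after = slot.borrow().clone(); // now the queued work has executed

    (before, after)
}
```

Here demo_undriven_loop() returns (None, Some("host-for-10.0.0.1")). Handing the slot to some other part of the program without ever calling run_until_idle is the toy equivalent of exporting the lookup Future while nobody runs its Core.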

Now that we understand the problem and the underlying concept much better, we're ready to create a solution.

The Solution

Our previous naive solution has two major weaknesses that we need to solve. First, we are creating a new event loop on every single lookup we perform, and second, we are blocking the current thread of execution until that result has been fully realized.

The first thing we need to do is use a single reactor Core that can be used to consistently handle all reverse DNS lookups for us. Based on the documentation for reactor Core, we have two options to drive the event loop. Option one is the run function that takes a Future and runs the event loop until the future completes. Our second option is to use the turn function which will perform a single iteration of the event loop. As per the documentation we could use the turn function inside of an infinite loop to run the event loop indefinitely.

While we can probably create a solution using turn, what would really be ideal here is to build a Future that never ends and continually produces lookup requests that result in reverse DNS lookups on our single event loop. Luckily, this very thing exists! One type available to us in the futures-rs crate is the Stream trait.

I won't go into much detail here, but suffice to say that a Stream is essentially a Future that continually produces values asynchronously. All we need to do is create a never ending stream of lookup requests that can be processed and responded to by our event loop. To this end, the futures::sync module provides us with some nice goodies to aid us in this task.

mpsc

In order to produce a never ending stream of requests that can be sent from any thread or handler in our server, the futures::sync module includes an asynchronous multi-producer, single-consumer channel. To unpack what exactly this is, let's look at the definition of the unbounded function, which creates an unbounded channel:

pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)

The unbounded function returns a tuple containing both a sender and a receiver. The sender is used to publish items into the channel, and can be cloned and shared freely throughout the Rust program and across different threads. The receiver, on the other hand, is a Stream and can be used to asynchronously process the items sent via the sender.
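The multi-producer, single-consumer shape is easier to see with a small example. The sketch below is a stand-in, not the futures::sync API: it uses the standard library's blocking std::sync::mpsc channel (which is also unbounded), so the receiver here is a plain iterator rather than a Stream. The fan_in function is a hypothetical name for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` numbers into one
/// shared channel, then drains the single receiver and returns the items sorted.
fn fan_in(producers: u32, per_producer: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();

    let mut workers = Vec::new();
    for id in 0..producers {
        // Each producer thread gets its own clone of the sender.
        let tx = tx.clone();
        workers.push(thread::spawn(move || {
            for n in 0..per_producer {
                tx.send(id * 100 + n).expect("receiver is still alive");
            }
        }));
    }

    // Drop the original sender so the receiver finishes once every clone is gone.
    drop(tx);

    // The single consumer sees items from all producers, in arrival order.
    let mut received: Vec<u32> = rx.iter().collect();
    for worker in workers {
        worker.join().expect("producer thread panicked");
    }

    received.sort();
    received
}
```

For instance, fan_in(3, 2) returns [0, 1, 100, 101, 200, 201]. The arrival order across threads is not deterministic, which is why the sketch sorts before returning.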

Using this unbounded stream we can now create a never ending stream of requests for our single event loop to process:

let (req_tx, req_rx) = mpsc::unbounded::<ReverseLookupRequest>(); // 1

thread::spawn(move || { // 2
    let mut core = Core::new().unwrap(); // 3
    let core_handle = core.handle();
    let resolv = Resolver::new(&core_handle);
    let resolver_loop = // 4
        req_rx.map_err(|e| println!("error = {:?}", e))
              .for_each(move |request| {
                  let future = handle_reverse_lookup(request, resolv.clone());
                  core_handle.spawn(future);
                  Ok(())
              });

    core.run(resolver_loop).expect("[dns] Failed to start reactor core loop."); // 5
});

This code snippet has a lot to unpack, so let's walk through it.

  1. The first thing we're doing here is creating an unbounded channel that takes and produces items of type ReverseLookupRequest. The req_tx variable is our sender, which we will use to send values to the channel, and the req_rx variable is our receiver, which is the stream that will be used to process the requests.
  2. Next we spawn the thread that is going to run our reactor::Core and handle all of our DNS requests.
  3. Afterwards, we create our reactor Core and our Resolver.
  4. We are calling for_each on our stream of requests to process each one of them asynchronously. To do this we are creating a future using the handle_reverse_lookup function (defined later) that will actually do the work of performing the lookup and responding with the result. Afterwards, we are spawning the resulting future into our event loop so it will be run asynchronously by the event loop.
  5. We're providing the Stream we created to core.run to run as a future that will never end, thus driving the event loop in perpetuity.

We haven't defined ReverseLookupRequest or handle_reverse_lookup yet, but we will soon. Now that we have a stream of incoming lookup requests, we need a way to respond to the requester. Again, the futures::sync module has our back.

oneshot

The oneshot module allows us to create what essentially amounts to a completable promise. Just like the unbounded function, the oneshot module has a channel function that provides us with a sender and receiver. However, the primary distinction here is that the sender can only be used to send a single value, and the receiver is a Future that upon completion only contains a single value. We can use the oneshot module to provide a communication channel back to the sender, allowing us to respond with a lookup result.

Let's go ahead and define our ReverseLookupRequest struct:

struct ReverseLookupRequest {
    ip: IpAddr,
    sender: oneshot::Sender<ReverseLookupResponse>
}

A reverse lookup request contains an IP address to perform the reverse lookup against, as well as a sender that can be used to send the result back to the requester. Now that we've defined what a lookup request looks like, let's define the actual lookup handler function:

fn handle_reverse_lookup(request: ReverseLookupRequest, resolv: Resolver) -> impl Future<Item=(), Error=()> { // 1
    lookup_addr(resolv, request.ip).then(|result| { // 2
        let response = // 3
            match result {
                Ok(addrs) => ReverseLookupResponse { names: addrs.iter().map(|n| n.to_string()).collect() },
                Err(_) => ReverseLookupResponse { names: Vec::new() },
            };
        // A send error only means the requester dropped its receiver, so ignore it.
        let _ = request.sender.send(response); // 4
        Ok(())
    })
}

Let's walk through this function.

  1. This function takes a lookup request and a Resolver, and it returns a future that actually performs the lookup and responds to the requester.
  2. We invoke lookup_addr which returns a Future containing the lookup result. We then chain a function onto the end of the future that creates and sends a lookup response using the then combinator.
  3. Here we are creating a response and handling the error scenario by returning an empty list of hostnames.
  4. Finally we use the oneshot sender to reply to the requester. By calling the sender.send function, we will be completing the corresponding Future that the requester has a reference to.
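The core idea of the last two sections, a request that carries its own reply channel, can be sketched without any async machinery. The version below is an assumption-heavy stand-in: it uses blocking std::sync::mpsc channels in place of futures::sync::mpsc and oneshot, and fake_reverse_lookup is a hypothetical function that replaces the real resolver so the example needs no network access.

```rust
use std::net::IpAddr;
use std::sync::mpsc;
use std::thread;

/// A request that carries the channel its answer should be sent on.
struct LookupRequest {
    ip: IpAddr,
    reply_to: mpsc::Sender<Vec<String>>,
}

/// Hypothetical stand-in for a real reverse lookup; only loopback "resolves".
fn fake_reverse_lookup(ip: IpAddr) -> Result<Vec<String>, String> {
    if ip.is_loopback() {
        Ok(vec!["localhost".to_string()])
    } else {
        Err(format!("no PTR record for {}", ip))
    }
}

/// Performs the lookup and replies; errors become an empty list of names.
fn handle_request(request: LookupRequest) {
    let names = fake_reverse_lookup(request.ip).unwrap_or_else(|_| Vec::new());
    // If the requester has gone away there is nobody to tell, so ignore the error.
    let _ = request.reply_to.send(names);
}

/// Starts the single worker thread and returns the sender used to reach it.
fn start_worker() -> mpsc::Sender<LookupRequest> {
    let (req_tx, req_rx) = mpsc::channel::<LookupRequest>();
    thread::spawn(move || {
        // Runs until every sender has been dropped.
        for request in req_rx {
            handle_request(request);
        }
    });
    req_tx
}

/// Sends one request and blocks until the worker replies.
fn lookup_blocking(worker: &mpsc::Sender<LookupRequest>, ip: IpAddr) -> Vec<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    worker
        .send(LookupRequest { ip, reply_to: reply_tx })
        .expect("worker thread is running");
    reply_rx.recv().expect("worker always replies")
}
```

With a worker from start_worker(), looking up 127.0.0.1 yields ["localhost"], and any other address yields an empty list, mirroring how the handler above maps lookup errors to an empty response. The real solution differs in one important way: the reply arrives as a Future instead of through a blocking recv call.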

Now that we have all the pieces created, let's tie this solution together and give it a nice usable interface.

Tying It All Together

Looking back at the code sample where we create our unbounded channel, we are not yet using the request sender, req_tx. This sender is what we will use to send requests to be processed by our event loop. Let's create a struct to hold this value:

#[derive(Clone)]
pub struct DnsLookupHandle {
    request_sender: mpsc::UnboundedSender<ReverseLookupRequest>,
}

This provides us with a handle to send requests into our event loop to be processed. This struct can be cloned and shared freely across threads. Now that we have the handle, let's add a method in an impl block that lets us easily perform lookups on our event loop and returns a future that we can use elsewhere to retrieve the result:

impl DnsLookupHandle {
    pub fn lookup_hostnames(&self, ip: IpAddr) -> impl Future<Item=Vec<String>, Error=oneshot::Canceled> { // 1
        let (resp_tx, resp_rx) = oneshot::channel::<ReverseLookupResponse>(); // 2
        // If the send fails, the request (and resp_tx inside it) is dropped, so resp_rx resolves to Canceled.
        let _ = self.request_sender.unbounded_send(ReverseLookupRequest { ip, sender: resp_tx }); // 3
        resp_rx.map(|res| res.names) // 4
    }
}

Given that all the hard work is out of the way, this function is rather simple. Let's walk through it.

  1. We're defining the function lookup_hostnames on our DnsLookupHandle struct that will return a future containing the resulting hostnames from the lookup.
  2. Here we define a oneshot channel that we will use to receive the lookup result from our event loop running on another thread.
  3. This is the most important bit of this example. We are creating a ReverseLookupRequest that contains the IP address we are looking up hostnames for and the oneshot sender that will be used to send the result upon lookup completion. We are sending that request to our event loop using request_sender which hooks into the channel that our event loop is processing requests from.
  4. Finally we are converting the response object returned from the event loop into a list of names using the map combinator on the resulting Future.
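Two properties of this handle are worth seeing in a runnable form: it can be cloned into many threads, and a caller gets a cancellation error rather than hanging forever if the loop on the other side is gone. The sketch below illustrates both with blocking std::sync::mpsc channels standing in for the futures channels. ToyLookupHandle, start_toy_worker, dead_handle, and the local Canceled type are all hypothetical names for this example, and the "lookup" just formats a made-up hostname.

```rust
use std::net::IpAddr;
use std::sync::mpsc;
use std::thread;

/// Local stand-in for the error a caller sees when no reply will ever arrive.
#[derive(Debug, PartialEq)]
struct Canceled;

/// Cloneable handle: every clone feeds the same worker.
#[derive(Clone)]
struct ToyLookupHandle {
    requests: mpsc::Sender<(IpAddr, mpsc::Sender<String>)>,
}

impl ToyLookupHandle {
    fn lookup(&self, ip: IpAddr) -> Result<String, Canceled> {
        let (reply_tx, reply_rx) = mpsc::channel();
        // On failure the request (with reply_tx inside) is dropped right here,
        // so the recv below fails immediately instead of blocking forever.
        let _ = self.requests.send((ip, reply_tx));
        reply_rx.recv().map_err(|_| Canceled)
    }
}

/// Starts one worker thread that answers every request with a made-up name.
fn start_toy_worker() -> ToyLookupHandle {
    let (tx, rx) = mpsc::channel::<(IpAddr, mpsc::Sender<String>)>();
    thread::spawn(move || {
        for (ip, reply) in rx {
            let _ = reply.send(format!("host-{}", ip));
        }
    });
    ToyLookupHandle { requests: tx }
}

/// Builds a handle whose receiving side no longer exists.
fn dead_handle() -> ToyLookupHandle {
    let (tx, rx) = mpsc::channel();
    drop(rx);
    ToyLookupHandle { requests: tx }
}

/// Clones the handle into `n` threads and collects each thread's answer in order.
fn lookups_from_many_threads(handle: &ToyLookupHandle, n: u8) -> Vec<String> {
    let callers: Vec<_> = (1..=n)
        .map(|i| {
            let handle = handle.clone();
            thread::spawn(move || handle.lookup(IpAddr::from([10, 0, 0, i])).unwrap())
        })
        .collect();
    callers.into_iter().map(|c| c.join().unwrap()).collect()
}
```

With a live worker, lookups_from_many_threads(&handle, 3) returns ["host-10.0.0.1", "host-10.0.0.2", "host-10.0.0.3"], while dead_handle().lookup(...) returns Err(Canceled). That second case is the blocking analogue of the oneshot::Canceled error type in the lookup_hostnames signature above.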

Finally, we have a thread-safe way to perform all asynchronous lookups on a single event loop. Now that we've created our handle, let's use it to perform a lookup:

let handle = create_lookup_handle();
let ip_addr = "172.217.3.238".parse::<IpAddr>().unwrap();
let result_future = handle.lookup_hostnames(ip_addr);

for hostname in result_future.wait().unwrap() {
    println!(" - {}", hostname);
}

This simple snippet illustrates the creation and use of the DnsLookupHandle. We simply perform a reverse lookup for a single IP address, block on the completion of the future, and then print out each hostname found as a result of the reverse DNS lookup.

The only surprise in this example is the create_lookup_handle function. I did not include it here because it is mostly code from a previous example: it starts the event loop on a separate thread and returns a lookup handle containing the Sender for the unbounded channel that drives the event loop.

And there you have it, a solution that solves all of the weaknesses of our previous attempt. We can share the handle freely throughout our application and perform lookups in a non-blocking manner. Additionally, we only create a single reactor Core that is used to perform all of those lookups.

Conclusion

As a result of moving from the naive solution to the one provided in this post, the lookup server was able to increase the number of requests it could process from around 50 req/sec to around 2000 req/sec.

Solving this problem proved to be a bit of a challenge, mostly due to preconceived notions that I had about how futures worked in Rust prior to using them and an attempt to just "wing" an implementation based on the type signatures instead of actually reading through the documentation. Overall, I am happy I encountered these challenges as they have forced me into a better understanding of futures in Rust and how they operate.

I hope that some found this blog post insightful. I created it less to share the solution I arrived at and more to share the knowledge I gained while searching for a solution. All of these libraries are still improving rapidly, and I'm sure higher level APIs will be created to do a lot of this work more simply.

You can find all of the code from this blog post here. In addition to the solution detailed in this post, I have included an alternate, simplified solution there as well. As I hoped might happen while writing this post, I thought of a much simpler solution that solves the problem without much of the complexity I added by implementing this message passing approach. Feel free to look at that for a better solution; my hope is that after reading through this post, the solution itself will be self-explanatory. Finally, although it's changed a good deal from the time I started writing this post, the project that inspired it can be found here.

Thanks for reading!
