Working with Asynchronous Streams in Rust

Asynchronous programming can be a bit challenging to grasp, but it’s incredibly useful for improving the performance of programs that deal with data over time, like network responses or file streams.

What Are Asynchronous Streams?

Asynchronous Nature: In Rust, operations are typically synchronous, meaning they block the thread until they complete. Asynchronous operations, on the other hand, allow other tasks to run while waiting for an operation to finish. This is particularly useful for IO-bound tasks (like network or file operations) where you don’t want to block the execution while waiting for a response from a disk read or a network request.

Streams: A stream in Rust can be thought of as an asynchronous version of an iterator. While an iterator yields a sequence of items synchronously, a stream yields a sequence of items asynchronously. This means that each item in the stream can be processed as it becomes available, without waiting for the entire sequence to be ready.

Under the Hood: Asynchronous streams in Rust are primarily facilitated through the futures and tokio crates. The futures crate provides the foundational traits and functions for asynchronous operations, including streams, while tokio offers a runtime environment where these asynchronous tasks can be executed.

When and Why Use Asynchronous Streams?

  1. Handling Large or Infinite Data: If you’re dealing with a large amount of data that comes in over time (like logs from a server) or an infinite sequence (like stock market ticks), asynchronous streams are ideal. They allow you to process each piece of data as soon as it arrives, rather than waiting for the entire dataset to be available.
  2. Improving Throughput in IO-bound Applications: In scenarios where your application spends a lot of time waiting for IO operations (like network requests or disk reads), asynchronous streams can significantly improve throughput. While one part of the stream is waiting for IO, other parts can continue processing, making the application more efficient.
  3. Real-Time Data Processing: Applications that require real-time processing of data, such as chat applications, real-time analytics, or live data feeds, benefit greatly from asynchronous streams. They can handle incoming data as soon as it’s available without blocking other operations.
  4. Resource Efficiency: Since asynchronous operations don’t block threads, they allow for more efficient use of system resources. This is particularly important in scenarios where resources are limited, such as in embedded systems or serverless environments.
  5. Scalability: Applications that need to scale to handle high levels of traffic, such as web servers, can benefit from asynchronous streams. They allow these applications to handle more requests concurrently, improving scalability.

Implementation example

Let’s build a simple asynchronous web server using Rust with the hyper crate. This server will listen for HTTP requests and respond with a streamed message. The message will be broken into multiple parts and sent asynchronously, showcasing how streams work in Rust.

Step-by-Step Guide

Setting Up Dependencies

First, you need to set up your Rust environment and Cargo.toml. Add the following dependencies:

[dependencies] 
futures = "0.3" 
tokio = { version = "1", features = ["full"] } 
hyper = "0.14"

Here, tokio is our async runtime, futures provides utilities for streams and futures, and hyper is for building HTTP servers and clients.

Writing the Stream Function

Let’s write a function that creates a stream. This stream will asynchronously send a series of strings to the client.

use futures::stream::{self, Stream}; 
 
fn create_message_stream() -> impl Stream<Item = Result<&'static str, std::io::Error>> { 
    stream::iter(vec![ 
        Ok("Hello"), 
        Ok(", "), 
        Ok("world"), 
        Ok("!"), 
        Ok("\n"), 
    ]) 
}

This function returns a stream of Result<&'static str, std::io::Error>. The stream::iter function turns a vector into a stream, where each vector element is yielded in sequence.

Setting Up the Async Web Server

Now, let’s set up the web server using hyper. We'll define a function to handle incoming HTTP requests and use our create_message_stream function to respond.

use hyper::{Body, Response, Server, Request, service::{make_service_fn, service_fn}}; 
use std::convert::Infallible; 
 
async fn handle_request(_req: Request<Body>) -> Result<Response<Body>, Infallible> { 
    let message_stream = create_message_stream(); 
    let response_body = Body::wrap_stream(message_stream); 
    Ok(Response::new(response_body)) 
} 
#[tokio::main] 
async fn main() { 
    let make_svc = make_service_fn(|_conn| async { 
        Ok::<_, Infallible>(service_fn(handle_request)) 
    }); 
    let addr = ([127, 0, 0, 1], 3000).into(); 
    let server = Server::bind(&addr).serve(make_svc); 
    println!("Listening on http://{}", addr); 
    if let Err(e) = server.await { 
        eprintln!("server error: {}", e); 
    } 
}

In this code:

  • The handle_request function is an asynchronous function that responds to each HTTP request. It uses create_message_stream to generate a stream of messages.
  • Body::wrap_stream converts our stream into a hyper body, which can be sent as an HTTP response.
  • The main function sets up the server using tokio as the async runtime. We define a service that handles incoming connections using our handle_request function.

Running the Server

To run the server, use cargo run. Once it's running, you can access http://127.0.0.1:3000 from a web browser or use a tool like curl to see the streamed response.

Consuming Streams

Let’s look at another example: a simple function that returns a stream that asynchronously yields the numbers 1 to 5.

use futures::stream::{self, StreamExt}; 
 
fn simple_number_stream() -> impl futures::Stream<Item = i32> {
    stream::iter(1..=5)
}

To consume items from a stream, you use methods like next, for_each, or collect. Here's an example using for_each:

#[tokio::main]
async fn main() {
    let number_stream = simple_number_stream();

    number_stream
        .for_each(|number| {
            println!("Received number: {}", number);
            futures::future::ready(())
        })
        .await;
}

Combining Streams with Other Async Operations

Streams can be combined with other async operations. For instance, you might want to perform an asynchronous operation for each item in the stream:

async fn process_number(number: i32) -> i32 { 
    // Simulate an async operation 
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; 
    number * 2 
} 
 
#[tokio::main] 
async fn main() { 
    let number_stream = simple_number_stream(); 
    number_stream 
        .map(|number| process_number(number)) 
        .buffer_unordered(5) 
        .for_each(|processed_number| { 
            println!("Processed number: {:?}", processed_number); 
            futures::future::ready(()) 
        }) 
        .await; 
}

Merging and Combining Streams in Rust

Merging and combining streams is a powerful feature in Rust’s asynchronous programming model, allowing developers to efficiently process multiple sequences of data concurrently. This capability is particularly useful in scenarios where data from different sources needs to be aggregated, processed, or transformed in a unified way.

Understanding the Concept

In Rust, streams can be merged or combined using various combinators provided by the futures crate. These combinators allow for operations like chaining, zipping, selecting, and merging multiple streams. Each combinator has its specific use case:

  • Chain: Concatenates two streams end-to-end.
  • Zip: Pairs elements from two streams one-by-one.
  • Select: Interleaves elements from two streams, yielding from whichever has an item ready. (The futures crate exposes this as stream::select; the tokio-stream crate calls its equivalent combinator merge.)

A Working Example: Merging Temperature Readings

Suppose you have two sensors providing temperature readings, and you want to process these readings concurrently. We’ll use stream::select from the futures crate (the equivalent of tokio-stream’s merge) to achieve this.

First, add the necessary dependencies to your Cargo.toml:

[dependencies] 
futures = "0.3" 
tokio = { version = "1", features = ["full"] }

Now, let’s implement the stream merging:

use futures::stream::{self, StreamExt}; 
use tokio::time::{self, Duration}; 
 
fn sensor_readings(id: u8) -> impl futures::Stream<Item = String> {
    stream::unfold(0, move |count| async move {
        time::sleep(Duration::from_secs(1)).await;
        let reading = format!("Sensor {}: Temperature {}°C", id, 20 + count % 5);
        Some((reading, count + 1))
    })
}
 
#[tokio::main] 
async fn main() { 
    let sensor1 = sensor_readings(1); 
    let sensor2 = sensor_readings(2); 
    let merged_stream = stream::select(sensor1, sensor2);
    merged_stream 
        .take(10) // Taking only 10 readings for demonstration 
        .for_each(|reading| { 
            println!("{}", reading); 
            futures::future::ready(()) 
        }) 
        .await; 
}

Dependencies: We include futures for stream combinators and tokio for asynchronous runtime support.

Sensor Simulation: The sensor_readings function simulates asynchronous sensor readings. It uses stream::unfold to generate an infinite stream of temperature readings, producing a new value each second (the reading cycles through a small range).

Merging Streams: The stream::select combinator is used to interleave readings from both sensors. The combined stream yields items from either of the two streams as soon as they are available.

Stream Processing: In the main function, we take only the first 10 readings from the merged stream for demonstration purposes. The for_each combinator is used to handle each item asynchronously.

Output: The program prints temperature readings from both sensors. Due to the nature of select, the readings are interleaved, showing how data from two different asynchronous sources can be processed in a unified manner.


Error Handling in Async Streams

Handling errors in asynchronous streams is crucial. Rust provides several ways to deal with errors that might occur during stream processing. For instance, you can use map_err to handle and transform errors or try_next for streams where you want to stop processing on the first error.

use futures::{stream, StreamExt, TryStreamExt}; 
 
async fn error_handling_stream() { 
    let number_stream = stream::iter(vec![Ok(1), Err("error"), Ok(2)]); 
    number_stream 
        .map_err(|e| println!("Error occurred: {}", e)) 
        .for_each(|number| { 
            if let Ok(number) = number { 
                println!("Received number: {}", number); 
            } 
            futures::future::ready(()) 
        }) 
        .await; 
}

Practical Applications

Asynchronous streams are particularly useful in web servers, data processing pipelines, and any application that requires handling multiple asynchronous events or IO operations. In web servers, for example, they enable efficient handling of incoming requests and streaming responses, which is vital for high-performance web applications.

Testing Asynchronous Streams

Testing is a critical part of developing with asynchronous streams. Rust’s ecosystem provides several tools and libraries for writing tests for async code. You can use the tokio::test attribute for testing async functions and leverage futures::executor::block_on for executing futures in tests.

Testing Example: Asynchronous Number Stream

Suppose you have an asynchronous stream function that generates a sequence of numbers. We’ll write a test for this function to verify its behavior.

First, make sure you have the tokio test dependency in your Cargo.toml:

[dependencies] 
futures = "0.3" 
tokio = { version = "1", features = ["full", "test-util"] }

Now, let’s define the asynchronous stream function and its test.

The Asynchronous Stream Function

use futures::stream::{self, StreamExt}; 
 
fn number_stream() -> impl futures::Stream<Item = i32> {
    stream::iter(1..=5)
}

This function returns a stream that asynchronously yields numbers from 1 to 5.

Writing the Test

#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::StreamExt;

    #[tokio::test]
    async fn test_number_stream() {
        let mut stream = number_stream();
        let mut numbers = Vec::new();
        while let Some(number) = stream.next().await {
            numbers.push(number);
        }
        assert_eq!(numbers, vec![1, 2, 3, 4, 5]);
    }
}

To run the test, use the cargo test command:

cargo test

This command will execute the test, and you should see output indicating whether the test passed or failed.

Wrapping Up!

The big takeaway here is how async programming in Rust, especially with streams, is like a superpower for your code. It lets you handle multiple things at once without getting tangled up, which is pretty awesome for any program that needs to deal with a lot of data or tasks simultaneously.

We dived into how streams work in Rust, looked at merging them (kind of like mixing different ingredients to bake a super cake), and even touched on how to test them (because nobody likes a buggy cake, right?).

It’s clear that Rust’s async streams are not just a fancy feature — they’re super practical. Whether you’re building a web server that can handle tons of requests or working on some complex data processing, these streams come in handy.

And yes, testing might not be the most exciting part, but it’s like making sure your cake doesn’t fall flat. The tools Rust provides, like the tokio test framework, make this a lot less painful.

Lastly, remember that learning all this stuff is a bit of a journey. Don’t worry if it doesn’t all click right away. There’s a great community out there and loads of resources to help you out. It’s like having a bunch of friends who are also into baking — always there to share recipes and tips.


Wanna talk? Leave a comment, and drop me a message!

All the best,

Luis Soares
