Rust Concurrency in Practice

Concurrency in Rust builds on the language's ownership, borrowing, and type-system rules, which guarantee memory safety without a garbage collector and let the compiler reject data races at compile time.

Rust achieves thread safety by ensuring that shared data is either immutable or only accessible from one thread at a time.
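
As a small illustration of the "shared and immutable" half of that rule, here is a minimal sketch using scoped threads from the standard library. The function name parallel_sum and its chunk_size parameter are made up for this example. Several threads read the same slice at once, which the compiler accepts because none of them mutates it.

```rust
use std::thread;

/// Sums a slice by handing each chunk to its own thread.
/// `parallel_sum` and `chunk_size` are hypothetical names for this sketch.
fn parallel_sum(data: &[i32], chunk_size: usize) -> i32 {
    assert!(chunk_size > 0, "chunks() panics on a chunk size of 0");
    // thread::scope guarantees that every spawned thread finishes before it
    // returns, so the threads may borrow `data` instead of owning a copy.
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<i32>()))
            .collect();
        // Join every thread and add up the partial sums.
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .sum::<i32>()
    })
}
```

Calling parallel_sum(&[1, 2, 3, 4, 5, 6, 7], 3) spawns three threads, one per chunk. If one of the closures tried to write to data, the program would not compile.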

Key Concepts:

  • Threads: Rust uses threads to run multiple parts of your code simultaneously. The std::thread module allows you to create new threads.
  • Message Passing: Rust encourages the “Do not communicate by sharing memory; instead, share memory by communicating” philosophy popularized by Go. Channels in Rust, provided by the std::sync::mpsc module, facilitate this.
  • Shared State Concurrency: Rust allows for shared-state concurrency using Mutex and Arc from the std::sync module to safely share and mutate data across threads.
  • Sync and Send Traits: These marker traits ensure that only safe types are shared between threads. Send allows a type to be transferred across thread boundaries, while Sync allows a type to be safely shared through references.
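
To make the Send and Sync bullet concrete, the sketch below uses two empty generic functions as compile-time checks. The helpers assert_send and assert_sync and the function push_from_other_thread are hypothetical names, not part of the standard library. A call such as assert_send::<T>() compiles only if T implements the marker trait.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helpers: each call compiles only if `T` has the marker trait.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Moves a clone of an `Arc<Mutex<Vec<i32>>>` into another thread, pushes a
/// value there, and returns the final length of the vector.
fn push_from_other_thread() -> usize {
    assert_send::<Arc<Mutex<Vec<i32>>>>();
    assert_sync::<Arc<Mutex<Vec<i32>>>>();
    // assert_send::<std::rc::Rc<i32>>(); // would not compile: Rc is not Send

    let shared = Arc::new(Mutex::new(vec![1, 2]));
    let for_thread = Arc::clone(&shared);
    // thread::spawn requires its closure (and everything it captures) to be Send.
    thread::spawn(move || {
        for_thread.lock().unwrap().push(3);
    })
    .join()
    .unwrap();

    let len = shared.lock().unwrap().len();
    len
}
```

Uncommenting the Rc line turns the mistake into a compiler error rather than a runtime bug, which is the point of the two traits.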

Let’s now put it into practice with exercises to help you grasp or refresh these key concepts! 🦀

Exercises

Exercise 1: Spawning Threads

Create a simple program that spawns multiple threads and prints “Hello from thread!” from each thread.

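use std::thread;
use std::time::Duration;

fn main() {
    // Keep each JoinHandle so the main thread can wait for every spawned thread.
    let mut handles = Vec::new();
    for i in 0..5 {
        handles.push(thread::spawn(move || {
            println!("Hello from thread {}", i);
            thread::sleep(Duration::from_millis(100));
        }));
    }
    // join() blocks until the thread has finished; a fixed sleep would only guess at that.
    for handle in handles {
        handle.join().unwrap();
    }
}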
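
Threads can also hand a result back to whoever joins them. The following sketch (squares_in_threads is a made-up name) returns a value from each closure and collects the values through the JoinHandle that thread::spawn returns.

```rust
use std::thread;

/// Spawns `n` threads; thread `i` computes `i * i` and returns it through its
/// JoinHandle. `squares_in_threads` is a hypothetical name for this sketch.
fn squares_in_threads(n: u64) -> Vec<u64> {
    // Spawn everything first so the threads can run concurrently.
    let handles: Vec<thread::JoinHandle<u64>> = (0..n)
        .map(|i| thread::spawn(move || i * i))
        .collect();
    // join() yields the closure's return value, or an Err if the thread panicked.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```

Because the handles are joined in the order they were created, the results come back in index order even though the threads may finish in any order.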

Exercise 2: Message Passing

Implement a program that spawns a thread to send a message over a channel, while the main thread receives and prints that message.

use std::sync::mpsc; 
use std::thread; 
 
fn main() { 
    let (tx, rx) = mpsc::channel(); 
    thread::spawn(move || { 
        let msg = String::from("Hello from the sender!"); 
        tx.send(msg).unwrap(); 
    }); 
    let received = rx.recv().unwrap(); 
    println!("Received: {}", received); 
}
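
The "mp" in mpsc stands for multiple producer, so the sending end can be cloned. As a possible extension of the exercise, this sketch (gather_messages is a hypothetical name) starts several sender threads and collects everything they send.

```rust
use std::sync::mpsc;
use std::thread;

/// Starts `producers` threads that each send one message, then collects all
/// messages. `gather_messages` is a hypothetical name for this sketch.
fn gather_messages(producers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    for id in 0..producers {
        // Every producer gets its own clone of the sending end.
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(format!("hello from producer {}", id)).unwrap();
        });
    }
    // Drop the original sender; once all clones are gone too, the channel
    // closes and the iteration below ends instead of waiting forever.
    drop(tx);
    let mut messages: Vec<String> = rx.iter().collect();
    messages.sort(); // arrival order is not deterministic
    messages
}
```

Forgetting the drop(tx) line is a common source of hangs: the receiver keeps waiting because one sender is still alive.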

Exercise 3: Shared State with Mutex

Create a program that uses a Mutex to safely increment a counter from multiple threads.

use std::sync::{Arc, Mutex}; 
use std::thread; 
 
fn main() { 
    let counter = Arc::new(Mutex::new(0)); 
    let mut handles = vec![]; 
    for _ in 0..10 { 
        let counter = Arc::clone(&counter); 
        let handle = thread::spawn(move || { 
            let mut num = counter.lock().unwrap(); 
            *num += 1; 
        }); 
        handles.push(handle); 
    } 
    for handle in handles { 
        handle.join().unwrap(); 
    } 
    println!("Result: {}", *counter.lock().unwrap()); 
}
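
For a plain counter, an atomic integer is a common alternative to a Mutex. This is a different technique from the one used in the exercise, shown here only for comparison; count_with_atomic and its parameters are made-up names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Increments a shared counter from several threads without a lock.
/// `count_with_atomic` is a hypothetical name for this sketch.
fn count_with_atomic(threads: usize, increments_per_thread: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    // fetch_add is a single atomic read-modify-write operation.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // All threads have been joined, so every increment is visible here.
    counter.load(Ordering::Relaxed)
}
```

A Mutex remains the better fit when the protected data is more than a single number or when several fields must change together.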

Exercise 4: Building a Concurrent Web Server

This is a more advanced exercise. Implement a basic concurrent web server using TcpListener and threads. Accept connections, read the request, and respond with a fixed message.

use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::thread;

fn handle_connection(mut stream: TcpStream) {
    // Read (part of) the request; this exercise ignores its contents.
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();
    let response = "HTTP/1.1 200 OK\r\n\r\nHello from Rust server!";
    // write_all keeps writing until the whole response is sent; write may stop early.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        // One thread per connection; the closure takes ownership of the stream.
        thread::spawn(move || {
            handle_connection(stream);
        });
    }
}
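
The fixed response above carries no Content-Length header, so a client can only tell that the body is complete when the connection closes. One possible refinement, sketched here with a hypothetical helper named build_response, is to build the response from a status line and a body and state the body length explicitly.

```rust
/// Builds a minimal HTTP/1.1 response with an explicit Content-Length.
/// `build_response` is a hypothetical helper, not part of the exercise.
fn build_response(status_line: &str, body: &str) -> String {
    format!(
        "{}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        status_line,
        body.len(), // len() counts bytes, which is what Content-Length expects
        body
    )
}
```

Inside handle_connection, the result could be sent with stream.write_all(build_response("HTTP/1.1 200 OK", "Hello from Rust server!").as_bytes()).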

Advanced Exercise: Implementing a Thread Pool

An effective way to manage resources in a concurrent program is by using a thread pool. A thread pool creates a fixed number of threads at program start-up and reuses these threads to execute tasks. This exercise involves implementing a simple thread pool to handle tasks in a concurrent web server.

Step 1: Define the ThreadPool Structure

Start by defining the structure of your thread pool and the initial function to create a new thread pool with a specified number of threads.

struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            // Create and store workers (Worker is defined in Step 2)
            workers.push(Worker::new(id));
        }
        ThreadPool { workers }
    }
}

Step 2: Implement the Worker

Each worker will be responsible for executing tasks. A worker holds a thread and listens for tasks sent through a channel.

use std::thread;

struct Worker {
    id: usize,
    // Option lets the pool take the handle out later in order to join it.
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {
            // Placeholder for the task execution logic
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Step 3: Sending Tasks to Threads

Modify the ThreadPool to hold a sending end of a channel. Workers will listen on the receiving end for tasks to execute. Tasks can be represented as closures.

use std::sync::mpsc; 
use std::sync::Arc; 
use std::sync::Mutex; 
 
type Job = Box<dyn FnOnce() + Send + 'static>; 
struct ThreadPool { 
    workers: Vec<Worker>, 
    sender: mpsc::Sender<Job>, 
} 
// Modify the ThreadPool::new function to include a channel

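Each worker’s thread should retrieve tasks from the channel and execute them. Because an mpsc channel has a single receiving end, the workers have to share it, which is why Arc and Mutex are imported above: the receiver is wrapped as Arc<Mutex<mpsc::Receiver<Job>>> and every worker gets a clone of the Arc.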
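
A minimal sketch of such a worker loop follows. The free function spawn_worker is a hypothetical stand-in for Worker::new, and returning the number of completed jobs is an addition made only so the behaviour is easy to observe.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Spawns a thread that runs jobs from the shared receiver until the channel
/// closes, then returns how many jobs it ran. `spawn_worker` is a hypothetical
/// stand-in for `Worker::new`.
fn spawn_worker(
    id: usize,
    receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut completed = 0;
        loop {
            // The lock guard is a temporary that is dropped at the end of this
            // statement, so the mutex is not held while the job runs.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                    completed += 1;
                }
                // recv() fails once every Sender has been dropped.
                Err(_) => break,
            }
        }
        completed
    })
}
```

Writing the loop as while let Ok(job) = receiver.lock().unwrap().recv() would keep the lock for the whole loop body, so only one worker could run a job at a time.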

Step 4: Execute Tasks in the ThreadPool

Implement a method on ThreadPool to send tasks to the worker threads for execution.

impl ThreadPool {
    // ...

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

Step 5: Graceful Shutdown

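Implement logic for a graceful shutdown of the thread pool. Ensure that all workers finish their current tasks before the program exits. Joining a worker only returns once its thread has left its receive loop, so the sending end of the channel has to be closed before the joins; otherwise join() blocks forever.

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // The sender must be dropped before this loop so that recv() in the
        // workers returns an error and their loops end. One way to allow that
        // is to store it as Option<mpsc::Sender<Job>> and call take() here.
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}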
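
Putting Steps 1 to 5 together, one way the finished pool could look is sketched below. Storing the sender as an Option is an assumption of this sketch and differs from the struct shown in Step 3; it is there so that drop can close the channel before joining the workers.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock is released at the end of this statement.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => job(),
                Err(_) => break, // channel closed: shut down
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
    // Assumption of this sketch: Option, so that drop() can take and close it.
    sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        ThreadPool { workers, sender: Some(sender) }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job: Job = Box::new(f);
        self.sender
            .as_ref()
            .expect("sender is only removed in drop")
            .send(job)
            .unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Close the channel first; queued jobs are still delivered, after
        // which recv() returns Err and each worker loop ends.
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
```

With this version, letting a pool go out of scope waits for every job that was already submitted, since the workers drain the queue before they see the closed channel.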

Integrating the ThreadPool into the Web Server

Replace the simple thread spawn logic in your web server with the ThreadPool. This will limit the number of concurrent threads and reuse them for incoming requests, which is more efficient than spawning a new thread per request.

fn main() { 
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); 
    let pool = ThreadPool::new(4); // Adjust the number of threads in the pool as needed 
 
    for stream in listener.incoming() { 
        let stream = stream.unwrap(); 
        pool.execute(|| { 
            handle_connection(stream); 
        }); 
    } 
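    // incoming() never yields None, so this line runs only if the loop above
    // is changed to stop, for example with listener.incoming().take(n).
    println!("Shutting down.");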
}

Going Further: Async/Await in Rust

As you become more comfortable with threads, message passing, and shared state concurrency, it’s important to explore Rust’s asynchronous programming capabilities. The async/await syntax in Rust provides a powerful model for writing concurrent code that is both efficient and easy to read.

Exercise: Asynchronous File Reading

In this exercise, you’ll write a program that reads the contents of a file asynchronously. This will demonstrate how to perform I/O-bound tasks without blocking the execution of your program.

First, include the necessary crates in your Cargo.toml:

[dependencies] 
tokio = { version = "1", features = ["full"] } 
futures = "0.3"

Next, write the async function to read a file:

use tokio::fs::File; 
use tokio::io::AsyncReadExt; 
 
async fn read_file_async(path: &str) -> Result<String, std::io::Error> { 
    let mut file = File::open(path).await?; 
    let mut contents = String::new(); 
    file.read_to_string(&mut contents).await?; 
    Ok(contents) 
}

To execute the async function, use the Tokio runtime:

#[tokio::main] 
async fn main() { 
    let path = "data.txt"; 
    match read_file_async(path).await { 
        Ok(contents) => println!("File contents: {}", contents), 
        Err(e) => println!("Error reading the file: {:?}", e), 
    } 
}

Exercise: Concurrent Web Requests

In this exercise, you’ll make multiple web requests concurrently and wait for all of them to complete. This is a common scenario in web servers and clients that need to aggregate data from multiple sources.

First, add the reqwest, tokio, and futures crates to your Cargo.toml (futures provides the join_all function used below):

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"

Then, write a function to perform a web request asynchronously:

use reqwest::Error; 
 
async fn fetch_url(url: &str) -> Result<String, Error> { 
    let response = reqwest::get(url).await?; 
    let body = response.text().await?; 
    Ok(body) 
}

Execute multiple requests concurrently and wait for all to complete:

#[tokio::main]
async fn main() {
    let urls = ["https://example.com", "https://api.example.com", "https://blog.example.com"];

    // join_all polls all futures concurrently and resolves to a Vec with one
    // entry per future, in the same order as the input. It does not return a
    // Result itself, so each entry is checked individually.
    let responses = futures::future::join_all(urls.iter().map(|&url| fetch_url(url))).await;
    for response in responses {
        match response {
            Ok(content) => println!("Fetched content: {}", content),
            Err(e) => println!("Failed to fetch: {:?}", e),
        }
    }
}

All the best,

Luis Soares
luis.soares@linux.com

Senior Software Engineer | Cloud Engineer | SRE | Tech Lead | Rust | Golang | Java | ML AI & Statistics | Web3 & Blockchain
