Thread Boundaries in Rust

Hi, fellow Rustaceans! 🦀

Let’s get our hands dirty with thread boundaries in Rust.

Thread boundaries separate the execution contexts of different threads. When data crosses these boundaries — moving from one thread to another — Rust’s type system ensures that this happens safely.

This is crucial because improper data sharing across threads can lead to data races: two or more threads access the same memory concurrently, at least one of them writes, and the accesses are not synchronized. The result is unpredictable behavior (in Rust terms, undefined behavior).

Rust prevents such issues by ensuring that:

  1. Data moved to another thread must be Send; once moved, it is owned by the receiving thread.
  2. Shared access to mutable data is synchronized.

The Send Trait

The Send trait is a marker trait that indicates a type's values can be transferred across thread boundaries. In other words, if a type implements Send, it's safe to move it from one thread to another. This property is crucial for ensuring thread safety in concurrent Rust programs.

Most types in Rust are Send, but there are exceptions: types with non-thread-safe reference counting such as Rc<T>, and some types tied to the thread that created them, such as std::sync::MutexGuard. Raw pointers (*const T and *mut T) are not Send either, so a type that contains one is not automatically Send; it can only become Send through an explicit unsafe impl.

How Rust Uses the Send Trait

Send is an auto trait: the compiler implements it automatically for a type when all of the type’s components are Send, so you rarely write an impl yourself. For example:

  • Primitive types are Send because they have no ownership semantics that could be violated across threads.
  • Types composed entirely of Send types are also Send.
  • Unique ownership types like Box<T> are Send if T is Send because ownership is transferred across threads.

However, shared-ownership types like Rc<T> are not Send, because they use non-atomic reference counting: two threads cloning or dropping handles at the same time could race on the count.
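One way to see which types are Send is to ask the compiler directly. The sketch below uses a hypothetical helper, assert_send, whose only job is to require T: Send at compile time (it is an illustration, not a standard-library function); it returns the type name just so the check is visible when run. Point is likewise a made-up struct built only from Send fields, so the compiler implements Send for it automatically.

```rust
use std::sync::Arc;

// Hypothetical helper: this call compiles only if T implements Send.
fn assert_send<T: Send>() -> &'static str {
    std::any::type_name::<T>()
}

// Made-up struct built only from Send fields, so Send is implemented automatically.
#[allow(dead_code)]
struct Point {
    x: i32,
    label: String,
}

fn main() {
    println!("{}", assert_send::<i32>());
    println!("{}", assert_send::<Box<Vec<i32>>>());
    println!("{}", assert_send::<Point>());
    println!("{}", assert_send::<Arc<String>>());
    // Uncommenting this line is a compile error:
    // `Rc<i32>` cannot be sent between threads safely
    // println!("{}", assert_send::<std::rc::Rc<i32>>());
}
```

The check costs nothing at runtime: if a type stops being Send (for example, because someone adds an Rc field to Point), the build fails at the assert_send call instead of at some distant thread::spawn.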

Using Send in Concurrent Programming

When writing concurrent code in Rust, you often work with APIs that require the Send trait. For example, std::thread::spawn requires its closure (and therefore everything the closure captures) to be Send, and also 'static, because the closure is moved to a new thread that may outlive the current function.

use std::thread;

fn main() {
    let x = 5;
    // `move` puts a copy of x (an i32, which is Send) into the closure.
    let handle = thread::spawn(move || {
        println!("Value of x is: {}", x);
    });

    handle.join().unwrap();
}

In this example, the move keyword transfers x into the closure, which then runs on the new thread. Because x is an i32, which is Send, the compiler accepts the call.

Dealing with Non-Send Types

When you encounter a type that isn’t Send, use a thread-safe alternative or a synchronization primitive instead. For instance, replacing Rc<T> with Arc<T> gives you shared ownership that can cross threads, because Arc<T> uses atomic reference counting. Note that Arc<T> is Send only when T is both Send and Sync, so mutable data inside it still needs a lock, as in Arc<Mutex<T>>.
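A minimal sketch of the Rc-to-Arc swap: several threads share one read-only Vec<i32> through Arc. Cloning the Arc only increments an atomic counter; the vector itself is never copied. The function name sum_in_threads and its parameters are illustrative choices, not part of any API.

```rust
use std::sync::Arc;
use std::thread;

// Each thread receives its own Arc handle to the same vector and sums it.
fn sum_in_threads(data: Vec<i32>, threads: usize) -> (Vec<i32>, usize) {
    let shared = Arc::new(data);
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let local = Arc::clone(&shared); // atomic increment, no deep copy
            thread::spawn(move || local.iter().sum::<i32>())
        })
        .collect();
    let sums: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    // Each thread dropped its clone before finishing, so only `shared` is left.
    (sums, Arc::strong_count(&shared))
}

fn main() {
    let (sums, remaining) = sum_in_threads(vec![1, 2, 3, 4], 3);
    println!("per-thread sums: {:?}, remaining Arc handles: {}", sums, remaining);
}
```

Swapping Arc back to Rc in this sketch reproduces the compile error the section describes, since Rc<Vec<i32>> is not Send.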

Thread Boundaries in Practice

Let’s now move from theory to exercises around thread boundaries and the Send trait to help you deepen your understanding of concurrent programming in Rust.

These exercises will range from basic to more complex scenarios, helping you grasp how to safely manage data across thread boundaries.

Exercise 1: Basic Send Compliance

Objective: Understand which types are Send and practice transferring ownership between threads.

  1. Create a struct MyStruct containing a Vec<i32> and a String.
  2. Implement a function spawn_and_move that takes an instance of MyStruct, spawns a new thread, and moves the instance into the thread. Print the contents of MyStruct inside the thread.
  3. Verify that MyStruct is Send and that your program compiles and runs correctly.
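
use std::thread;

// Built only from Send fields (Vec<i32> and String), so MyStruct is Send automatically.
struct MyStruct {
    numbers: Vec<i32>,
    name: String,
}

fn spawn_and_move(data: MyStruct) -> thread::JoinHandle<()> {
    // `move` transfers ownership of `data` into the new thread.
    thread::spawn(move || {
        println!("numbers = {:?}, name = {}", data.numbers, data.name);
    })
}

fn main() {
    let my_data = MyStruct { numbers: vec![1, 2, 3], name: String::from("example") };
    // Join so main does not exit before the thread prints.
    spawn_and_move(my_data).join().unwrap();
}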

Exercise 2: Non-Send Type Challenge

Objective: Experience firsthand what happens when you try to use a non-Send type across threads and learn how to fix it.

  1. Create a struct SharedData containing an Rc<Vec<i32>>.
  2. Write a function attempt_to_share that tries to spawn a thread and use SharedData inside it.
  3. Observe the compiler error and refactor SharedData to use Arc instead of Rc to fix the issue.
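use std::sync::Arc;
use std::thread;

// First attempt: with `data: Rc<Vec<i32>>`, thread::spawn is rejected with
// "`Rc<Vec<i32>>` cannot be sent between threads safely".
// Refactored: Arc uses atomic reference counting, so SharedData is Send.
struct SharedData {
    data: Arc<Vec<i32>>,
}

fn attempt_to_share(shared: SharedData) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        println!("sum = {}", shared.data.iter().sum::<i32>());
    })
}

fn main() {
    let shared_data = SharedData { data: Arc::new(vec![1, 2, 3]) };
    attempt_to_share(shared_data).join().unwrap();
}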

Exercise 3: Implementing Send Manually

Objective: Deepen your understanding of Send by implementing it manually, with unsafe impl, for a custom type that wraps a raw pointer. The compiler cannot verify such an impl, so you take responsibility for its soundness.

  1. Create a struct SafeWrapper<T> that wraps a *mut T (raw pointer).
  2. Implement Send for SafeWrapper<T> manually, ensuring that it's only Send when T is Send.
  3. Demonstrate using SafeWrapper to send a raw pointer across threads safely.
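use std::thread;

// *mut T is not Send, so SafeWrapper<T> is not Send automatically.
struct SafeWrapper<T> {
    ptr: *mut T,
}

impl<T> SafeWrapper<T> {
    fn new(ptr: *mut T) -> Self { SafeWrapper { ptr } }
    fn into_raw(self) -> *mut T { self.ptr }
}

// SAFETY: the wrapper is the only handle to the pointee, so moving it is sound when T: Send.
unsafe impl<T> Send for SafeWrapper<T> where T: Send {}

fn main() {
    let raw_pointer: *mut i32 = Box::into_raw(Box::new(42));
    let safe_wrapper = SafeWrapper::new(raw_pointer);
    thread::spawn(move || {
        // A method call captures the whole (Send) wrapper, not just its raw field.
        let value = unsafe { Box::from_raw(safe_wrapper.into_raw()) };
        println!("Value: {}", value);
    }).join().unwrap();
}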

Exercise 4: Advanced Message Passing

Objective: Practice advanced message passing techniques between threads, emphasizing thread boundaries.

  1. Create two threads, producer and consumer.
  2. In producer, generate a series of MyStruct instances (from Exercise 1) and send them to consumer via a channel.
  3. In consumer, receive MyStruct instances and perform some computation on the data.
  4. Ensure proper synchronization and handling of the channel to prevent deadlocks.
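use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Same shape as MyStruct from Exercise 1.
struct MyStruct {
    numbers: Vec<i32>,
    name: String,
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..10 {
            let data = MyStruct { numbers: vec![i, i * 2, i * 3], name: format!("item-{}", i) };
            tx.send(data).unwrap();
            // Optional: a short delay to simulate work
            thread::sleep(Duration::from_millis(10));
        }
        // tx is dropped here, which closes the channel and ends the consumer's loop.
    });
    let consumer = thread::spawn(move || {
        for received in rx {
            let sum: i32 = received.numbers.iter().sum();
            println!("{}: sum = {}", received.name, sum);
        }
    });
    producer.join().unwrap();
    consumer.join().unwrap();
}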
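Because the channel is multi-producer (that is what mpsc stands for), the same pattern extends to several producers by cloning the Sender. A minimal sketch, assuming a simplified message type (a plain Vec<i32> instead of MyStruct) and an illustrative helper name, multi_producer_total: the receiver’s iteration ends only after every Sender, including the original one, has been dropped, which is the usual cause of a hang in this pattern.

```rust
use std::sync::mpsc;
use std::thread;

// Spawns `producers` threads that each send three batches, then sums everything received.
fn multi_producer_total(producers: i32) -> i32 {
    let (tx, rx) = mpsc::channel::<Vec<i32>>();
    let mut handles = Vec::new();
    for id in 0..producers {
        let tx = tx.clone(); // each producer owns its own Sender
        handles.push(thread::spawn(move || {
            for round in 0..3 {
                tx.send(vec![id, round]).unwrap();
            }
        }));
    }
    // Drop the original Sender; otherwise `rx.iter()` would wait forever.
    drop(tx);
    let total: i32 = rx.iter().map(|batch| batch.iter().sum::<i32>()).sum();
    for handle in handles {
        handle.join().unwrap();
    }
    total
}

fn main() {
    println!("total = {}", multi_producer_total(4));
}
```

With four producers, each one sends id + 0, id + 1 and id + 2, so the total is 3 * (0 + 1 + 2 + 3) + 3 * 4 = 30.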

Exercise 5: Synchronizing Shared State

Objective: Learn to synchronize access to mutable shared state across threads using Mutex and Arc.

  1. Create a shared counter wrapped in a Mutex and an Arc.
  2. Spawn several threads that each increment the counter a fixed number of times.
  3. Ensure that each thread can safely access and modify the counter, and print the final value of the counter in the main thread.
use std::sync::{Arc, Mutex}; 
use std::thread; 
 
fn main() { 
    let counter = Arc::new(Mutex::new(0)); 
    let mut handles = vec![]; 
 
    for _ in 0..10 { 
        let counter = Arc::clone(&counter); 
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                // The guard unlocks the mutex when it goes out of scope each iteration.
                let mut num = counter.lock().unwrap();
                *num += 1;
            }
        });
        handles.push(handle); 
    } 
 
    for handle in handles { 
        handle.join().unwrap(); 
    } 
 
    println!("Final counter value: {}", *counter.lock().unwrap()); 
 
}
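Once every worker has been joined, their Arc clones are gone, so the main thread can take the counter back out instead of locking it one last time. A sketch using Arc::try_unwrap (which succeeds only when exactly one strong reference remains) followed by Mutex::into_inner; the function name count_and_unwrap and its parameters are illustrative.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Increments a shared counter from several threads, then reclaims sole ownership of it.
fn count_and_unwrap(threads: usize, increments: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Every clone was dropped with its thread, so this is the last Arc.
    let mutex = Arc::try_unwrap(counter).expect("other Arc handles still alive");
    mutex.into_inner().unwrap()
}

fn main() {
    println!("Final counter value: {}", count_and_unwrap(10, 100));
}
```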

Exercise 6: Error Handling Across Threads

Objective: Practice error handling in a multi-threaded context, ensuring that errors from worker threads are properly communicated back to the main thread.

  1. Define a task that can result in an error, such as attempting to open a non-existent file in a thread.
  2. Use a channel to send the result (either the successful outcome or the error) back to the main thread.
  3. In the main thread, handle the result by either processing the success case or reporting the error.
use std::fs::File; 
use std::sync::mpsc; 
use std::thread; 
use std::io::Read;
 
fn main() { 
 
    let (tx, rx) = mpsc::channel(); 
    thread::spawn(move || { 
        let result = File::open("non_existent_file.txt") 
            .and_then(|mut file| { 
                let mut contents = String::new(); 
                file.read_to_string(&mut contents).map(|_| contents) 
            }); 
        tx.send(result).unwrap(); 
    }); 
 
    match rx.recv().unwrap() { 
        Ok(contents) => println!("File contents: {}", contents), 
        Err(e) => println!("Error reading file: {:?}", e), 
    } 
}
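A channel is not the only way to bring an error back across the boundary: a spawned thread’s return value travels back through JoinHandle::join. For a single result, the sketch below simply returns the io::Result from the closure. read_in_thread is a hypothetical helper name, and the file name is just an example path. Note the two layers: join() yields Err only if the thread panicked, while the inner io::Result carries the ordinary I/O error.

```rust
use std::fs;
use std::io;
use std::thread;

// Hypothetical helper: reads a file on a worker thread and hands the io::Result back via join().
fn read_in_thread(path: &str) -> io::Result<String> {
    let path = path.to_owned(); // owned copy so the closure is 'static
    let handle = thread::spawn(move || fs::read_to_string(path));
    // Outer Result: did the thread panic? Inner Result: did the read succeed?
    handle.join().expect("worker thread panicked")
}

fn main() {
    match read_in_thread("non_existent_file.txt") {
        Ok(contents) => println!("File contents: {}", contents),
        Err(e) => println!("Error reading file: {}", e),
    }
}
```

The channel version scales better when a worker produces many results over time; the join version is simpler when each thread produces exactly one.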

Exercise 7: Implementing a Barrier

Objective: Understand the use of synchronization primitives to manage the execution order in concurrent programming by implementing a barrier.

  1. Use Arc, Mutex, and Condvar to build a barrier: a shared counter of arrived threads plus a condition variable that makes threads wait until all of them have reached a certain point.
  2. Spawn multiple threads that perform some work, increment the counter, and then wait at the barrier.
  3. Once all threads have reached the barrier, allow them to proceed and complete their tasks.
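use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let total_threads = 10;
    // Shared state: number of threads that have arrived, plus a condvar to wake waiters.
    let counter = Arc::new((Mutex::new(0), Condvar::new()));
    let mut handles = Vec::new();
    for id in 0..total_threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            println!("Thread {} finished its work", id);
            let (lock, cvar) = &*counter;
            let mut count = lock.lock().unwrap();
            *count += 1;
            if *count == total_threads {
                // The last thread to arrive wakes everyone else.
                cvar.notify_all();
            }
            // wait_while re-checks the condition, so spurious wakeups are handled.
            let guard = cvar.wait_while(count, |c| *c < total_threads).unwrap();
            drop(guard); // release the lock before continuing
            println!("Thread {} passed the barrier", id);
        }));
    }
    // Join so main does not exit before the threads pass the barrier.
    for handle in handles {
        handle.join().unwrap();
    }
}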
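The standard library already ships this primitive as std::sync::Barrier, which is usually preferable to a hand-rolled Mutex/Condvar pair outside of an exercise. A sketch of the same scenario: Barrier::new(n) blocks each wait() call until n threads have arrived, and exactly one of them is reported as the leader. The helper name count_leaders is illustrative.

```rust
use std::sync::{Arc, Barrier};
use std::thread;

// Spawns `n` threads that meet at a Barrier; returns how many were reported as leader.
fn count_leaders(n: usize) -> usize {
    let barrier = Arc::new(Barrier::new(n));
    let handles: Vec<_> = (0..n)
        .map(|id| {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                println!("Thread {} reached the barrier", id);
                // Blocks until all `n` threads have called wait().
                barrier.wait().is_leader()
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .filter(|&is_leader| is_leader)
        .count()
}

fn main() {
    println!("leaders: {}", count_leaders(10));
}
```

The leader flag is handy when exactly one thread should do follow-up work (logging, resetting shared state) once everyone has arrived.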

Exercise 8: Custom Thread Pool Implementation

Objective: Gain deeper insights into concurrent execution by implementing a basic thread pool, understanding task scheduling, and worker thread lifecycle.

  1. Define a ThreadPool struct with a vector of worker threads and a sender for a task channel.
  2. Implement a Worker struct that holds a thread and listens for tasks coming through a channel.
  3. In ThreadPool, implement a method to spawn worker threads and a method to send tasks to workers.
  4. Ensure that your thread pool can gracefully shut down, joining all threads and ensuring all tasks are completed.
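use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

struct ThreadPool {
    workers: Vec<Worker>,
    // Option so that Drop can take the sender and close the channel.
    sender: Option<mpsc::Sender<Job>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The MutexGuard is a temporary, so the lock is released before the job runs.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                // recv() fails once all senders are dropped and the queue is empty.
                Err(_) => break,
            }
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Dropping the sender closes the channel; workers drain queued jobs, then exit.
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    for _ in 0..8 {
        pool.execute(|| {
            println!("Executing task");
        });
    }
    // `pool` is dropped here: the Drop impl closes the channel and joins every worker.
}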

Exercise 9: Atomic Operations for Synchronization

Objective: Explore the use of atomic operations to manage shared state without locking, understanding the performance implications and use cases.

  1. Use an AtomicUsize to implement a shared counter that can be safely incremented by multiple threads without using a mutex.
  2. Spawn multiple threads that each increment the counter a certain number of times.
  3. Ensure the final count reflects the total number of increments from all threads.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Arc gives every thread an owned, 'static handle to the same atomic counter.
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                // fetch_add is a single atomic read-modify-write: no lock needed.
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}
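Borrowing the counter with &counter does not work with plain thread::spawn, because spawned threads must be 'static. Since Rust 1.63, std::thread::scope allows threads that borrow local data, because the scope joins every thread it spawned before returning. A sketch, with scoped_count as an illustrative name; it also uses Ordering::Relaxed, which is enough for a pure counter whose value is only read after the threads have been joined.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Scoped threads may borrow `counter`: the scope joins them all before returning.
fn scoped_count(threads: usize, increments: usize) -> usize {
    let counter = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..increments {
                    // Relaxed suffices: only the final total matters, read after the joins.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("Final counter value: {}", scoped_count(10, 100));
}
```

No Arc is needed here: the borrow checker can prove that counter outlives every scoped thread, and &AtomicUsize is Send because AtomicUsize is Sync.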

Exercise 10: Exploring Thread Local Storage

Objective: Understand thread local storage and its use cases by creating thread-specific data structures that are not shared across threads.

  1. Use the thread_local! macro to create a thread-local Vec<i32>.
  2. Spawn several threads, each manipulating its own thread-local vector (e.g., adding numbers).
  3. Demonstrate that each thread maintains its separate vector and that no data is shared between threads.
use std::thread; 
 
thread_local!(static NUMBERS: std::cell::RefCell<Vec<i32>> = std::cell::RefCell::new(Vec::new())); 
fn main() { 
    let handles: Vec<_> = (0..10).map(|i| { 
        thread::spawn(move || { 
            NUMBERS.with(|numbers| { 
                let mut numbers = numbers.borrow_mut(); 
                numbers.push(i); 
                println!("Thread {:?}: {:?}", thread::current().id(), *numbers); 
            }); 
        }) 
    }).collect(); 
    for handle in handles { 
        handle.join().unwrap(); 
    } 
}
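To check the isolation claim programmatically rather than by reading printed output, each thread can return the length of its own vector through join. A sketch, assuming a hypothetical LOCAL_NUMBERS key and helper name: every worker pushes three values, so each sees exactly three elements, and the calling thread’s copy stays empty because it never pushed anything.

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Each thread lazily gets its own, independent Vec.
    static LOCAL_NUMBERS: RefCell<Vec<i32>> = RefCell::new(Vec::new());
}

// Returns the vector length seen by each worker, plus the calling thread's length afterwards.
fn thread_local_lengths(workers: i32) -> (Vec<usize>, usize) {
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            thread::spawn(move || {
                LOCAL_NUMBERS.with(|numbers| {
                    let mut numbers = numbers.borrow_mut();
                    numbers.extend([i, i + 1, i + 2]);
                    numbers.len()
                })
            })
        })
        .collect();
    let lengths: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let caller_len = LOCAL_NUMBERS.with(|numbers| numbers.borrow().len());
    (lengths, caller_len)
}

fn main() {
    let (lengths, main_len) = thread_local_lengths(4);
    println!("worker lengths: {:?}, main thread length: {}", lengths, main_len);
}
```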

🚀 Explore More by Luis Soares

📚 Learning Hub: Expand your knowledge in various tech domains, including Rust, Software Development, Cloud Computing, Cyber Security, Blockchain, and Linux, through my extensive resource collection:

  • Hands-On Tutorials with GitHub Repos: Gain practical skills across different technologies with step-by-step tutorials, complemented by dedicated GitHub repositories.
  • In-Depth Guides & Articles: Deep dive into core concepts of Rust, Software Development, Cloud Computing, and more, with detailed guides and articles filled with practical examples.
  • E-Books Collection: Enhance your understanding of various tech fields with a series of free e-Books, including titles like “Mastering Rust Ownership” and “Application Security Guide”.
  • Project Showcases: Discover a range of fully functional projects across different domains, such as an API Gateway, Blockchain Network, Cyber Security Tools, Cloud Services, and more.
  • LinkedIn Newsletter: Stay ahead in the fast-evolving tech landscape with regular updates and insights on Rust, Software Development, and emerging technologies by subscribing to my newsletter on LinkedIn.

🔗 Connect with Me:

  • Medium: Read my articles on Medium and give claps if you find them helpful. It motivates me to keep writing and sharing Rust content.
  • Personal Blog: Discover more on my personal blog, a hub for all my Rust-related content.
  • LinkedIn: Join my professional network for more insightful discussions and updates.
  • Twitter: Follow me on Twitter for quick updates and thoughts on Rust programming.

Wanna talk? Leave a comment or drop me a message!

All the best,

Luis Soares
luis.soares@linux.com

Senior Software Engineer | Cloud Engineer | SRE | Tech Lead | Rust | Golang | Java | ML AI & Statistics | Web3 & Blockchain
