Thread Boundaries in Rust
Hi, fellow Rustaceans! 🦀
Let’s get our hands dirty with thread boundaries in Rust.
Thread boundaries separate the execution contexts of different threads. When data crosses these boundaries — moving from one thread to another — Rust’s type system ensures that this happens safely.
This is crucial because improper data sharing across threads can lead to data races, where two or more threads access the same data concurrently, at least one of them writes to it, and the accesses are not synchronized, causing undefined behavior.
Rust prevents such issues by ensuring that:
- Any data sent across threads is owned by the receiving thread.
- Shared access to mutable data is synchronized.
The Send Trait
The Send trait is a marker trait that indicates a type's values can be transferred across thread boundaries. In other words, if a type implements Send, it's safe to move it from one thread to another. This property is crucial for ensuring thread safety in concurrent Rust programs.
Most types in Rust are Send, but there are exceptions, especially some types that wrap system resources and those that use non-thread-safe reference counting (e.g., Rc<T>). Raw pointers (*const T and *mut T) are also not Send, because the compiler cannot verify how the memory they point to is shared, so a type that contains one is not Send unless you opt in with an unsafe impl.
How Rust Uses the Send Trait
Rust’s compiler automatically implements Send for types that satisfy its requirements, which primarily involve ownership and thread safety. For example:
- Primitive types are Send because they are plain values with no shared state that another thread could observe.
- Types composed entirely of Send types are also Send.
- Unique ownership types like Box<T> are Send if T is Send, because ownership of the whole value is transferred to the receiving thread.
However, a non-thread-safe shared ownership type like Rc<T> is not Send because it uses non-atomic reference counting: if handles to the same allocation were cloned or dropped from multiple threads at once, the updates to the count would be a data race.
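The rules above can be checked at compile time. The following is a minimal sketch, assuming a hypothetical helper `assert_send` and an illustrative struct `Config` (neither is part of the standard library): the helper has a `T: Send` bound, so each call compiles only if the type is Send.

```rust
use std::rc::Rc;
use std::sync::Arc;

// Hypothetical helper: a call compiles only when T implements Send.
fn assert_send<T: Send>() {}

// Illustrative struct: every field is Send, so the compiler makes Config Send.
#[allow(dead_code)]
struct Config {
    name: String,
    values: Vec<i32>,
}

fn main() {
    assert_send::<i32>(); // primitive type
    assert_send::<Box<String>>(); // unique ownership of a Send type
    assert_send::<Config>(); // composed entirely of Send types
    assert_send::<Arc<Vec<i32>>>(); // atomic reference counting

    // Uncommenting the next line should fail to compile, because Rc<T> is not Send:
    // assert_send::<Rc<Vec<i32>>>();

    // Rc is still fine as long as it stays on one thread.
    let local = Rc::new(vec![1, 2, 3]);
    println!("local sum = {}", local.iter().sum::<i32>());
}
```

Because the check happens at compile time, a successful build is itself the result; nothing needs to run for the guarantee to hold.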
Using Send in Concurrent Programming
When writing concurrent code in Rust, you often work with APIs that require the Send trait. For example, when spawning a new thread using std::thread::spawn, the closure and its environment must be Send because they are moved to the new thread.
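use std::thread;

fn main() {
    let x = 5;
    // `move` transfers the captured value into the new thread.
    let handle = thread::spawn(move || {
        println!("Value of x is: {}", x);
    });
    // Wait for the spawned thread to finish before exiting.
    handle.join().unwrap();
}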
In this example, x is moved into the closure (copied, in fact, since i32 is Copy), which is executed by a new thread. Rust checks that x is Send, which is what makes this operation safe.
Dealing with Non-Send Types
When you encounter a type that isn’t Send, you must use synchronization primitives or thread-safe wrappers to safely share or send it across threads. For instance, replacing Rc<T> with Arc<T> gives you a handle that is Send, because Arc<T> uses atomic reference counting. Note that Arc<T> is only Send when T is both Send and Sync, so mutable contents still need a wrapper such as Mutex<T>.
Thread Boundaries in Practice
Let’s now move from theory to exercises around thread boundaries and the Send trait to help you deepen your understanding of concurrent programming in Rust.
These exercises will range from basic to more complex scenarios, helping you grasp how to safely manage data across thread boundaries.
Exercise 1: Basic Send Compliance
Objective: Understand which types are Send and practice transferring ownership between threads.
- Create a struct MyStruct containing a Vec<i32> and a String.
- Implement a function spawn_and_move that takes an instance of MyStruct, spawns a new thread, and moves the instance into the thread. Print the contents of MyStruct inside the thread.
- Verify that MyStruct is Send and that your program compiles and runs correctly.
use std::thread;

// Define MyStruct here

fn spawn_and_move(data: MyStruct) {
    let handle = thread::spawn(move || {
        // Print contents of MyStruct
    });
    // Wait for the thread so the program does not exit before it prints.
    handle.join().unwrap();
}

fn main() {
    let my_data = MyStruct { /* Initialize fields */ };
    spawn_and_move(my_data);
}
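One possible way to fill in the skeleton is sketched below. The field names `numbers` and `label` are assumptions, and returning the JoinHandle (with the sum of the numbers as the thread's result) is a design choice made here so the caller can observe what the thread did; the exercise itself does not require it.

```rust
use std::thread;

// Assumed field names; the exercise only asks for a Vec<i32> and a String.
struct MyStruct {
    numbers: Vec<i32>,
    label: String,
}

// Moves `data` into a new thread and returns the handle so the caller can join it.
fn spawn_and_move(data: MyStruct) -> thread::JoinHandle<i32> {
    thread::spawn(move || {
        // `data` is owned by this thread now; the caller can no longer use it.
        println!("{}: {:?}", data.label, data.numbers);
        data.numbers.iter().sum::<i32>()
    })
}

fn main() {
    let my_data = MyStruct {
        numbers: vec![1, 2, 3],
        label: String::from("demo"),
    };
    let sum = spawn_and_move(my_data).join().unwrap();
    println!("sum computed in the thread: {}", sum);
}
```

MyStruct is Send without any extra code because both of its fields are Send.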
Exercise 2: Non-Send Type Challenge
Objective: Experience firsthand what happens when you try to use a non-Send type across threads and learn how to fix it.
- Create a struct SharedData containing an Rc<Vec<i32>>.
- Write a function attempt_to_share that tries to spawn a thread and use SharedData inside it.
- Observe the compiler error and refactor SharedData to use Arc instead of Rc to fix the issue.
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

// Define SharedData with Rc first, then refactor to Arc

fn attempt_to_share(data: SharedData) {
    let handle = thread::spawn(move || {
        // Attempt to access SharedData
    });
    // Wait for the thread so its work completes before main returns.
    handle.join().unwrap();
}

fn main() {
    let shared_data = SharedData { /* Initialize fields */ };
    attempt_to_share(shared_data);
}
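After the refactor, the Arc version might look like the sketch below. The field name `values` and the returned JoinHandle are assumptions made for illustration. With Rc in place of Arc, the `thread::spawn` call is expected to be rejected because Rc<Vec<i32>> cannot be sent between threads safely.

```rust
use std::sync::Arc;
use std::thread;

// Assumed field name; Arc replaces the Rc from the first attempt.
struct SharedData {
    values: Arc<Vec<i32>>,
}

// Moves `data` into a thread and returns the handle so the caller can read the result.
fn attempt_to_share(data: SharedData) -> thread::JoinHandle<i32> {
    thread::spawn(move || data.values.iter().sum::<i32>())
}

fn main() {
    let values = Arc::new(vec![1, 2, 3, 4]);
    // Cloning the Arc only bumps the atomic reference count; the Vec is not copied.
    let shared_data = SharedData { values: Arc::clone(&values) };
    let sum = attempt_to_share(shared_data).join().unwrap();
    println!("sum computed in the thread: {}", sum);
    // The main thread still holds its own handle to the same data.
    println!("main still sees {:?}", values);
}
```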
Exercise 3: Implementing Send Manually
Objective: Deepen your understanding of Send by manually implementing it for a custom type that wraps a non-thread-safe type.
- Create a struct SafeWrapper<T> that wraps a *mut T (raw pointer).
- Implement Send for SafeWrapper<T> manually, ensuring that it's only Send when T is Send.
- Demonstrate using SafeWrapper to send a raw pointer across threads safely.
use std::marker::PhantomData;
use std::thread;

// Define SafeWrapper here

// SAFETY: only sound if SafeWrapper is the sole owner of the pointed-to value.
unsafe impl<T> Send for SafeWrapper<T> where T: Send {}

fn main() {
    let raw_pointer = /* Allocate memory and get a raw pointer */;
    let safe_wrapper = SafeWrapper::new(raw_pointer);
    let handle = thread::spawn(move || {
        // Use SafeWrapper
    });
    handle.join().unwrap();
}
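A minimal sketch of one sound design follows. It departs from the skeleton in one respect, which is an assumption of this sketch: `new` takes a value and allocates it itself, so the wrapper is guaranteed to be the unique owner of the pointer. The `get` method is likewise a hypothetical accessor. Unique ownership is what justifies the unsafe impl, since moving the wrapper is then equivalent to moving a Box<T>.

```rust
use std::thread;

// Hypothetical wrapper that uniquely owns a heap allocation through a raw pointer.
struct SafeWrapper<T> {
    ptr: *mut T,
}

impl<T> SafeWrapper<T> {
    fn new(value: T) -> Self {
        // Box::into_raw hands ownership of the allocation to the raw pointer.
        SafeWrapper { ptr: Box::into_raw(Box::new(value)) }
    }

    fn get(&self) -> &T {
        // SAFETY: ptr came from Box::into_raw and is freed only in Drop.
        unsafe { &*self.ptr }
    }
}

impl<T> Drop for SafeWrapper<T> {
    fn drop(&mut self) {
        // SAFETY: the allocation is reclaimed exactly once, by its only owner.
        unsafe { drop(Box::from_raw(self.ptr)) };
    }
}

// SAFETY: the wrapper is the sole owner of the pointee, so sending it to another
// thread is as safe as sending a Box<T>, which requires T: Send.
unsafe impl<T: Send> Send for SafeWrapper<T> {}

fn main() {
    let wrapper = SafeWrapper::new(41);
    // Without the unsafe impl above, this spawn would not compile.
    let handle = thread::spawn(move || *wrapper.get() + 1);
    println!("result = {}", handle.join().unwrap());
}
```

If the wrapper accepted arbitrary raw pointers instead, the caller would have to guarantee that no other thread uses the same pointer, and the constructor should probably be marked unsafe.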
Exercise 4: Advanced Message Passing
Objective: Practice advanced message passing techniques between threads, emphasizing thread boundaries.
- Create two threads, producer and consumer.
- In producer, generate a series of MyStruct instances (from Exercise 1) and send them to consumer via a channel.
- In consumer, receive MyStruct instances and perform some computation on the data.
- Ensure proper synchronization and handling of the channel to prevent deadlocks.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for _ in 0..10 {
            let data = MyStruct { /* Initialize fields */ };
            tx.send(data).unwrap();
            // Optional: introduce a delay
        }
    });

    let consumer = thread::spawn(move || {
        for received in rx {
            // Process received MyStruct
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}
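A filled-in version of the pipeline might look like the sketch below. The function name `run_pipeline`, the field names, and the choice of summing the numbers as the "computation" are all assumptions made for illustration. The point to notice is how the consumer loop ends: the sender is moved into the producer, so it is dropped when the producer finishes, which closes the channel.

```rust
use std::sync::mpsc;
use std::thread;

// Assumed field names, matching the shape asked for in Exercise 1.
struct MyStruct {
    numbers: Vec<i32>,
    label: String,
}

// Hypothetical helper: sends `items` messages and returns the sum the consumer computed.
fn run_pipeline(items: i32) -> i32 {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for i in 0..items {
            let data = MyStruct {
                numbers: vec![i, i + 1],
                label: format!("item-{}", i),
            };
            // Ownership of `data` moves through the channel to the consumer.
            tx.send(data).unwrap();
        }
        // `tx` is dropped here, closing the channel so the consumer loop can end.
    });

    let consumer = thread::spawn(move || {
        let mut total = 0;
        // Iterating over the receiver blocks until a message arrives or the channel closes.
        for received in rx {
            println!("{} -> {:?}", received.label, received.numbers);
            total += received.numbers.iter().sum::<i32>();
        }
        total
    });

    producer.join().unwrap();
    consumer.join().unwrap()
}

fn main() {
    println!("total = {}", run_pipeline(3));
}
```

If a clone of the sender were kept alive in the main thread, the consumer loop would never finish and the final join would block, which is the deadlock the exercise warns about.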
Exercise 5: Synchronizing Shared State
Objective: Learn to synchronize access to mutable shared state across threads using Mutex and Arc.
- Create a shared counter wrapped in a Mutex and an Arc.
- Spawn several threads that each increment the counter a fixed number of times.
- Ensure that each thread can safely access and modify the counter, and print the final value of the counter in the main thread.
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // The guard unlocks the mutex when it goes out of scope.
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final counter value: {}", *counter.lock().unwrap());
}
Exercise 6: Error Handling Across Threads
Objective: Practice error handling in a multi-threaded context, ensuring that errors from worker threads are properly communicated back to the main thread.
- Define a task that can result in an error, such as attempting to open a non-existent file in a thread.
- Use a channel to send the result (either the successful outcome or the error) back to the main thread.
- In the main thread, handle the result by either processing the success case or reporting the error.
use std::fs::File;
use std::io::Read;
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // The whole io::Result is sent, so the error crosses the thread boundary too.
        let result = File::open("non_existent_file.txt")
            .and_then(|mut file| {
                let mut contents = String::new();
                file.read_to_string(&mut contents).map(|_| contents)
            });
        tx.send(result).unwrap();
    });

    match rx.recv().unwrap() {
        Ok(contents) => println!("File contents: {}", contents),
        Err(e) => println!("Error reading file: {:?}", e),
    }
}
Exercise 7: Implementing a Barrier
Objective: Understand the use of synchronization primitives to manage the execution order in concurrent programming by implementing a barrier.
- Use Arc and Mutex to create a shared counter and a condition variable to implement a barrier that waits for all threads to reach a certain point before proceeding.
- Spawn multiple threads that perform some work, increment the counter, and then wait at the barrier.
- Once all threads have reached the barrier, allow them to proceed and complete their tasks.
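use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new((Mutex::new(0), Condvar::new()));
    let total_threads = 10;
    let mut handles = Vec::new();

    for _ in 0..total_threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let (lock, cvar) = &*counter;
            let mut count = lock.lock().unwrap();
            *count += 1;
            if *count == total_threads {
                // The last thread to arrive wakes all the waiting threads.
                cvar.notify_all();
            }
            // Re-check the condition in a loop to guard against spurious wakeups.
            while *count < total_threads {
                count = cvar.wait(count).unwrap();
            }
            // Release the lock so threads proceed in parallel after the barrier.
            drop(count);
            // Thread proceeds after barrier
        }));
    }

    // Join the workers so main does not exit while they are still waiting.
    for handle in handles {
        handle.join().unwrap();
    }
}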
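For comparison, the standard library ships a ready-made std::sync::Barrier. The sketch below is not part of the exercise; the function name `run_with_barrier` and the `arrived` counter are assumptions, added only to make the barrier's effect observable. Each thread records its arrival, waits, and then reads how many threads had arrived.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

// Hypothetical helper: returns, for each thread, the arrival count it observed
// right after passing the barrier.
fn run_with_barrier(n: usize) -> Vec<usize> {
    let barrier = Arc::new(Barrier::new(n));
    let arrived = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let barrier = Arc::clone(&barrier);
            let arrived = Arc::clone(&arrived);
            thread::spawn(move || {
                // Work before the barrier: record that this thread has arrived.
                arrived.fetch_add(1, Ordering::SeqCst);
                // Blocks until all n threads have called wait.
                barrier.wait();
                // After the barrier, every thread has already incremented the counter.
                arrived.load(Ordering::SeqCst)
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let seen = run_with_barrier(4);
    println!("counts observed after the barrier: {:?}", seen);
}
```

Every thread should observe the full count, because none of them can pass `wait` until all of them have incremented the counter.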
Exercise 8: Custom Thread Pool Implementation
Objective: Gain deeper insights into concurrent execution by implementing a basic thread pool, understanding task scheduling, and worker thread lifecycle.
- Define a ThreadPool struct with a vector of worker threads and a sender for a task channel.
- Implement a Worker struct that holds a thread and listens for tasks coming through a channel.
- In ThreadPool, implement a method to spawn worker threads and a method to send tasks to workers.
- Ensure that your thread pool can gracefully shut down, joining all threads and ensuring all tasks are completed.
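use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

struct ThreadPool {
    workers: Vec<Worker>,
    // Wrapped in Option so Drop can take the sender and close the channel.
    sender: Option<mpsc::Sender<Job>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock guard is a temporary dropped at the end of this statement,
            // so other workers can receive while this job runs.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                // recv fails once the sender is dropped and the queue is empty.
                Err(_) => break,
            }
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0, "a thread pool needs at least one worker");
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Closing the channel lets each worker's loop end after the queue drains.
        drop(self.sender.take());
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
                println!("Worker {} shut down.", worker.id);
            }
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    for _ in 0..8 {
        pool.execute(|| {
            println!("Executing task");
        });
    }
    // Dropping the pool here runs Drop, which finishes queued tasks and joins the workers.
}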
Exercise 9: Atomic Operations for Synchronization
Objective: Explore the use of atomic operations to manage shared state without locking, understanding the performance implications and use cases.
- Use an AtomicUsize to implement a shared counter that can be safely incremented by multiple threads without using a mutex.
- Spawn multiple threads that each increment the counter a certain number of times.
- Ensure the final count reflects the total number of increments from all threads.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // thread::spawn needs a 'static closure, so a reference to a local counter
    // would not compile; Arc gives each thread its own owned handle.
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}
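A closure passed to thread::spawn must be 'static, so it cannot borrow a counter that lives on the main thread's stack. Scoped threads (std::thread::scope, stable since Rust 1.63) lift that restriction because the scope joins every thread before it returns. The sketch below is an alternative, not the exercise's required approach; the function name `count_with_scope` is an assumption.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Hypothetical helper: increments a stack-allocated atomic from several scoped threads.
fn count_with_scope(threads: usize, per_thread: usize) -> usize {
    let counter = AtomicUsize::new(0);

    thread::scope(|s| {
        for _ in 0..threads {
            // The closure borrows `counter` and `per_thread`; no Arc is needed
            // because the scope guarantees the threads finish before it returns.
            s.spawn(|| {
                for _ in 0..per_thread {
                    // Relaxed is enough for a plain counter; joining the scope
                    // makes all increments visible to the load below.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });

    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("Final counter value: {}", count_with_scope(10, 100));
}
```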
Exercise 10: Exploring Thread Local Storage
Objective: Understand thread local storage and its use cases by creating thread-specific data structures that are not shared across threads.
- Use the thread_local! macro to create a thread-local Vec<i32>.
. - Spawn several threads, each manipulating its own thread-local vector (e.g., adding numbers).
- Demonstrate that each thread maintains its separate vector and that no data is shared between threads.
use std::cell::RefCell;
use std::thread;

// Each thread gets its own lazily initialized copy of NUMBERS.
thread_local!(static NUMBERS: RefCell<Vec<i32>> = RefCell::new(Vec::new()));

fn main() {
    let handles: Vec<_> = (0..10).map(|i| {
        thread::spawn(move || {
            NUMBERS.with(|numbers| {
                let mut numbers = numbers.borrow_mut();
                numbers.push(i);
                println!("Thread {:?}: {:?}", thread::current().id(), *numbers);
            });
        })
    }).collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
All the best,
Luis Soares
luis.soares@linux.com
Senior Software Engineer | Cloud Engineer | SRE | Tech Lead | Rust | Golang | Java | ML AI & Statistics | Web3 & Blockchain