Building a P2P Database in Rust

Hey there! If you’ve landed here, you’re probably interested in understanding how distributed systems work, particularly in Rust. Today, we’re going to roll up our sleeves and get our hands dirty building a Peer-to-Peer (P2P) key-value database.

Here’s a rundown of what we’ll be covering:

  1. UDP Handshake: We’ll start by setting up a method for nodes to discover each other on the network using UDP.
  2. TCP Communication: UDP is fast, but it doesn’t guarantee message delivery. So, for more reliable communication, we’ll implement TCP connections between nodes.
  3. Concurrency with Tokio: We’ll use the Tokio runtime to allow our nodes to handle multiple tasks at once. This is especially important for network operations.
  4. Key-Value Store: At its core, our database will store data in key-value pairs. It’s a straightforward approach that we can always expand later.
  5. Command Line Interface (CLI): We’ll also build a simple CLI tool. This way, users can interact with our database directly from the terminal to set or get values.

Ready to dive in? Let’s start building!

Step 1: Setup and Dependencies

Start by setting up a new Rust project and adding necessary dependencies:

[dependencies] 
serde = { version = "1", features = ["derive"] } 
serde_json = "1" 
tokio = { version = "1", features = ["full"] } 
mac_address = "1"

Step 2: Define Communication Messages

Use Rust enums to define various messages that nodes can send to each other.

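use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

#[derive(Debug, Serialize, Deserialize)]
enum Message {
    // Discovery (UDP)
    Handshake { node_name: String, tcp_addr: SocketAddr },
    Greeting,
    // Liveness (TCP)
    Heartbeat,
    HeartbeatResponse,
    // Key-value operations (TCP)
    SetValue { key: String, value: String },
    GetValue { key: String },
    ValueResponse { value: Option<String> },
    // Replication between nodes (TCP)
    Sync { key: String, value: String },
}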

These messages cover:

  • Handshaking for new nodes.
  • Exchanging greetings.
  • Sending and responding to heartbeats.
  • Setting, getting, and synchronizing key-value pairs.

Step 3: Design the Key-Value Store

Our database is a simple key-value store. Several tasks read and write it concurrently, so we wrap the HashMap in Tokio’s async RwLock, which allows many readers at once but only one writer.

struct KeyValueStore { 
    store: RwLock<HashMap<String, String>>, 
} 
 
impl KeyValueStore { 
    fn new() -> Self { 
        KeyValueStore { 
            store: RwLock::new(HashMap::new()), 
        } 
    } 
 
    async fn set(&self, key: String, value: String) { 
        let mut store = self.store.write().await; 
        store.insert(key, value); 
    } 
 
    async fn get(&self, key: &str) -> Option<String> { 
        let store = self.store.read().await; 
        store.get(key).cloned() 
    } 
}

The set and get async methods allow us to interact with the database.
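To see the locking pattern on its own, here is a minimal blocking sketch of the same store shape using only the standard library. It swaps Tokio’s async RwLock for std::sync::RwLock and uses threads instead of tasks; SyncKeyValueStore and concurrent_writes are hypothetical names for illustration and are not part of the project.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// Blocking counterpart of `KeyValueStore` (hypothetical, for illustration only).
struct SyncKeyValueStore {
    store: RwLock<HashMap<String, String>>,
}

impl SyncKeyValueStore {
    fn new() -> Self {
        SyncKeyValueStore { store: RwLock::new(HashMap::new()) }
    }

    fn set(&self, key: String, value: String) {
        // Only one writer may hold the lock at a time.
        let mut store = self.store.write().expect("lock poisoned");
        store.insert(key, value);
    }

    fn get(&self, key: &str) -> Option<String> {
        // Any number of readers may hold the lock simultaneously.
        let store = self.store.read().expect("lock poisoned");
        store.get(key).cloned()
    }
}

/// Spawns `writers` threads that each insert one distinct key,
/// waits for all of them, then returns the shared store.
fn concurrent_writes(writers: usize) -> Arc<SyncKeyValueStore> {
    let kv = Arc::new(SyncKeyValueStore::new());
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            let kv = Arc::clone(&kv);
            thread::spawn(move || kv.set(format!("key{}", i), format!("value{}", i)))
        })
        .collect();
    for handle in handles {
        handle.join().expect("writer thread panicked");
    }
    kv
}
```

Wrapping the store in Arc is what lets every thread (or, in the async version, every task) share a single map, while the RwLock serializes the writes.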

Step 4: Tracking Nodes with NodeInfo

To keep track of other nodes in the network, use the NodeInfo struct:

struct NodeInfo { 
    last_seen: std::time::Instant, 
    tcp_addr: SocketAddr, 
}

It stores the last time we interacted with a node and its TCP address for direct communication.
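The code in this article records last_seen but never reads it. One possible use, sketched below with the standard library only, is to drop peers that have been silent for too long. The prune_stale_nodes function and the timeout value are assumptions made for this example, not part of the original implementation.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

struct NodeInfo {
    last_seen: Instant,
    tcp_addr: SocketAddr,
}

/// Removes peers not seen within `timeout` of `now` and returns their names, sorted.
/// Hypothetical helper: the article's code does not expire peers.
fn prune_stale_nodes(
    nodes: &mut HashMap<String, NodeInfo>,
    now: Instant,
    timeout: Duration,
) -> Vec<String> {
    // Collect the names first so the map is not mutated while it is being iterated.
    let mut removed: Vec<String> = nodes
        .iter()
        .filter(|(_, info)| now.saturating_duration_since(info.last_seen) > timeout)
        .map(|(name, _)| name.clone())
        .collect();
    for name in &removed {
        nodes.remove(name);
    }
    removed.sort();
    removed
}
```

A node could call this periodically with Instant::now(), provided it also refreshes last_seen whenever a handshake or heartbeat arrives from that peer.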

Step 5: Network Initialization

For the initial handshake and node discovery, we use UDP. Each node announces its presence with its MAC address as a unique identifier:

fn get_mac_address() -> Result<String, MacAddressError> { 
 
    let mac = mac_address::get_mac_address()?; 
 
    match mac { 
        Some(address) => Ok(address.to_string()), 
        None => Err(MacAddressError::InternalError), 
    } 
}

In the main function, initialize the UDP socket and set it to broadcast mode:

let socket = UdpSocket::bind(&local_addr).await?; 
socket.set_broadcast(true)?;

Step 6: Announcing Node’s Presence

After obtaining its MAC address, each node broadcasts a Handshake message:

tokio::spawn(async move {
    match get_mac_address() {
        Ok(node_name) => {
            let tcp_addr = format!("{}:{}", "0.0.0.0", TCP_PORT).parse().unwrap();
            let msg = Message::Handshake {
                node_name: node_name.clone(),
                tcp_addr,
            };
            let serialized_msg = serde_json::to_string(&msg).unwrap();

            loop {
                println!("Sending UDP broadcast...");
                socket_for_broadcast.send_to(serialized_msg.as_bytes(), BROADCAST_ADDR).await.unwrap();
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        },
        Err(e) => {
            eprintln!("Error fetching MAC address: {:?}", e);
        }
    }
});

Other nodes store this information upon receipt, ensuring that they are aware of each other.
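One caveat: the handshake advertises 0.0.0.0 as its TCP address. That is a bind address, not one a remote peer can dial. A receiver could work around this by combining the IP the UDP datagram actually came from with the advertised port. The sketch below shows that idea; resolve_peer_addr is a hypothetical helper, not part of the original code.

```rust
use std::net::SocketAddr;

/// Returns the address a peer should dial for TCP.
/// If the handshake advertised an unspecified IP (0.0.0.0 or ::), fall back to
/// the IP the UDP datagram came from and keep the advertised port.
/// Hypothetical helper for illustration.
fn resolve_peer_addr(udp_sender: SocketAddr, advertised: SocketAddr) -> SocketAddr {
    if advertised.ip().is_unspecified() {
        SocketAddr::new(udp_sender.ip(), advertised.port())
    } else {
        advertised
    }
}
```

In the UDP receive loop, the addr returned by recv_from would play the role of udp_sender, and the tcp_addr field from the handshake would be advertised.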


Step 7: Listening for TCP Connections

Nodes communicate directly using TCP. Initialize a TCP listener to handle incoming connections:

let listener = TcpListener::bind(("0.0.0.0", TCP_PORT)).await.unwrap();

For each connection, we spawn a new task to process the messages:

tokio::spawn(handle_tcp_stream(stream, nodes_clone.clone(), kv_store.clone()));

Step 8: Handling TCP Messages

Depending on the received message type:

  1. Heartbeat: Acknowledge with a HeartbeatResponse.
  2. SetValue: Store the key-value and synchronize with other nodes.
  3. GetValue: Retrieve the value from the store and respond.
  4. Sync: Update the store with the provided key-value pair.

This is managed within:

async fn handle_tcp_stream(mut stream: TcpStream, nodes: Arc<RwLock<HashMap<String, NodeInfo>>>, kv_store: Arc<KeyValueStore> ) { 
 
    let mut buf = vec![0u8; 1024]; 
    let len = stream.read(&mut buf).await.unwrap(); 
    let received_msg: Message = serde_json::from_slice(&buf[..len]).unwrap(); 
 
    match received_msg { 
        Message::Heartbeat => { 
            println!("Received Heartbeat"); 
            let response = Message::HeartbeatResponse; 
            let serialized_response = serde_json::to_string(&response).unwrap(); 
            stream.write_all(serialized_response.as_bytes()).await.unwrap(); 
        }, 
 
        Message::SetValue { key, value } => { 
            println!("Received SetValue"); 
            kv_store.set(key.clone(), value.clone()).await; 
 
            // Broadcast sync to all nodes 
            let nodes_guard = nodes.read().await; 
            for (_, node_info) in nodes_guard.iter() { 
                let mut stream = match TcpStream::connect(node_info.tcp_addr).await { 
                    Ok(stream) => stream, 
                    Err(_) => continue, 
                }; 
                let sync_msg = Message::Sync { key: key.clone(), value: value.clone() }; 
                let serialized_msg = serde_json::to_string(&sync_msg).unwrap(); 
                let _ = stream.write_all(serialized_msg.as_bytes()).await; 
            } 
 
            let response = Message::ValueResponse { value: Some("Value set successfully.".to_string()) }; 
            let serialized_response = serde_json::to_string(&response).unwrap(); 
            stream.write_all(serialized_response.as_bytes()).await.unwrap(); 
        }, 
 
        Message::GetValue { key } => { 
            println!("Received GetValue"); 
            let value = kv_store.get(&key).await; 
            let response = Message::ValueResponse { value }; 
            let serialized_response = serde_json::to_string(&response).unwrap(); 
            stream.write_all(serialized_response.as_bytes()).await.unwrap(); 
        }, 
 
        Message::Sync { key, value } => { 
            println!("Received Sync"); 
            kv_store.set(key, value).await; 
        }, 
        _ => {} 
    } 
}
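The handler above does a single read into a 1024-byte buffer. TCP is a byte stream, so that read can return only part of a message, and a message longer than the buffer is cut off. A common remedy is to prefix each message with its length. The sketch below uses the blocking std::io traits to stay dependency-free; write_frame, read_frame, and MAX_FRAME_LEN are names chosen for this example, and the same idea carries over to Tokio’s async read and write methods.

```rust
use std::io::{self, Read, Write};

/// Upper bound on accepted frame size (an arbitrary value chosen for this sketch).
const MAX_FRAME_LEN: u32 = 1024 * 1024;

/// Writes `payload` prefixed with its length as a 4-byte big-endian integer.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    let len = payload.len() as u32;
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(payload)
}

/// Reads one length-prefixed frame, blocking until the whole payload has arrived.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut len_bytes)?;
    let len = u32::from_be_bytes(len_bytes);
    // Reject oversized frames before allocating a buffer for them.
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len as usize];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

With framing in place, the bytes returned by read_frame would be passed to serde_json::from_slice, and several messages could share one connection.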

Step 9: Continuous Broadcasting

Inside the task spawned in Step 6, each node re-broadcasts its handshake every five seconds, so nodes that join later can still discover it:

loop { 
    socket_for_broadcast.send_to(serialized_msg.as_bytes(), BROADCAST_ADDR).await.unwrap(); 
    tokio::time::sleep(std::time::Duration::from_secs(5)).await; 
}

Step 10: Listening for UDP Messages

Finally, each node needs to continuously listen for UDP messages:

let mut buf = vec![0u8; 1024]; 
    loop { 
        let (len, addr) = socket.recv_from(&mut buf).await?; 
        println!("Received data on UDP from {}", addr); 
        let received_msg: Message = serde_json::from_slice(&buf[..len])?; 
 
        let local_node_name = get_mac_address()?; 
 
        if let Message::Handshake { node_name, tcp_addr } = received_msg { 
            // Ignore packets from ourselves 
            if node_name == local_node_name { 
                continue; 
            } 
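            println!("Received handshake from: {}", node_name);
            // `insert` returns the previous entry, so `None` means this peer is new.
            let is_new_node = {
                let mut nodes_guard = nodes.write().await;
                let previous = nodes_guard.insert(node_name.clone(), NodeInfo { last_seen: std::time::Instant::now(), tcp_addr });
                previous.is_none()
            };

            let greeting = Message::Greeting;
            let serialized_greeting = serde_json::to_string(&greeting).unwrap();
            socket.send_to(serialized_greeting.as_bytes(), &addr).await?;

            // Handshakes repeat every 5 seconds, so start only one heartbeat task per peer.
            if !is_new_node {
                continue;
            }

            // Start heartbeat for this node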
            tokio::spawn(async move { 
                loop { 
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await; 
                    println!("Sending heartbeat to {}", tcp_addr); 
                    let mut stream = TcpStream::connect(tcp_addr).await.unwrap(); 
                    let heartbeat_msg = Message::Heartbeat; 
                    let serialized_msg = serde_json::to_string(&heartbeat_msg).unwrap(); 
                    stream.write_all(serialized_msg.as_bytes()).await.unwrap(); 
                } 
            }); 
        } 
    }

Upon receiving a handshake, nodes send back a greeting and initiate a heartbeat mechanism.

Here is the final implementation:

use serde::{Deserialize, Serialize}; 
use tokio::{net::{TcpListener, TcpStream, UdpSocket}, sync::RwLock}; 
use std::collections::HashMap; 
use std::net::SocketAddr; 
use std::sync::Arc; 
use mac_address::MacAddressError; 
use tokio::io::{AsyncReadExt, AsyncWriteExt}; 
 
const BROADCAST_ADDR: &str = "255.255.255.255:8888"; 
const TCP_PORT: u16 = 9000; 
 
#[derive(Debug, Serialize, Deserialize)] 
enum Message { 
    Handshake { node_name: String, tcp_addr: SocketAddr }, 
    Greeting, 
    Heartbeat, 
    HeartbeatResponse, 
    SetValue { key: String, value: String }, 
    GetValue { key: String }, 
    ValueResponse { value: Option<String> }, 
    Sync { key: String, value: String }, 
} 
 
// Create a new struct for the key-value store 
struct KeyValueStore { 
    store: RwLock<HashMap<String, String>>, 
} 
 
impl KeyValueStore { 
    fn new() -> Self { 
        KeyValueStore { 
            store: RwLock::new(HashMap::new()), 
        } 
    } 
 
    async fn set(&self, key: String, value: String) { 
        let mut store = self.store.write().await; 
        store.insert(key, value); 
    } 
 
    async fn get(&self, key: &str) -> Option<String> { 
        let store = self.store.read().await; 
        store.get(key).cloned() 
    } 
} 
 
struct NodeInfo { 
    last_seen: std::time::Instant, 
    tcp_addr: SocketAddr, 
} 
 
fn get_mac_address() -> Result<String, MacAddressError> { 
    let mac = mac_address::get_mac_address()?; 
    match mac { 
        Some(address) => Ok(address.to_string()), 
        None => Err(MacAddressError::InternalError), 
    } 
} 
 
#[tokio::main] 
async fn main() -> Result<(), Box<dyn std::error::Error>> { 
    let local_addr: SocketAddr = "0.0.0.0:8888".parse()?; 
    let socket = UdpSocket::bind(&local_addr).await?; 
    socket.set_broadcast(true)?; 
 
    // Initialize the key-value store 
    let kv_store = Arc::new(KeyValueStore::new()); 
 
    let nodes = Arc::new(RwLock::new(HashMap::<String, NodeInfo>::new())); 
 
    // Use Arc to share the socket among tasks. 
    let socket = Arc::new(socket); 
    let socket_for_broadcast = socket.clone(); 
 
    tokio::spawn(async move {
        match get_mac_address() {
            Ok(node_name) => {
                let tcp_addr = format!("{}:{}", "0.0.0.0", TCP_PORT).parse().unwrap();
                let msg = Message::Handshake {
                    node_name: node_name.clone(),
                    tcp_addr,
                };
                let serialized_msg = serde_json::to_string(&msg).unwrap();

                loop {
                    println!("Sending UDP broadcast...");
                    socket_for_broadcast.send_to(serialized_msg.as_bytes(), BROADCAST_ADDR).await.unwrap();
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            },
            Err(e) => {
                eprintln!("Error fetching MAC address: {:?}", e);
            }
        }
    });
 
    let nodes_clone = nodes.clone(); 
 
    tokio::spawn(async move { 
        let listener = TcpListener::bind(("0.0.0.0", TCP_PORT)).await.unwrap(); 
        println!("TCP listener started."); 
        while let Ok((stream, _)) = listener.accept().await { 
            println!("Accepted new TCP connection."); 
            tokio::spawn(handle_tcp_stream(stream, nodes_clone.clone(), kv_store.clone())); 
        } 
    }); 
 
    let mut buf = vec![0u8; 1024]; 
    loop { 
        let (len, addr) = socket.recv_from(&mut buf).await?; 
        println!("Received data on UDP from {}", addr); 
        let received_msg: Message = serde_json::from_slice(&buf[..len])?; 
 
        let local_node_name = get_mac_address()?; 
 
        if let Message::Handshake { node_name, tcp_addr } = received_msg { 
            // Ignore packets from ourselves 
            if node_name == local_node_name { 
                continue; 
            } 
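            println!("Received handshake from: {}", node_name);
            // `insert` returns the previous entry, so `None` means this peer is new.
            let is_new_node = {
                let mut nodes_guard = nodes.write().await;
                let previous = nodes_guard.insert(node_name.clone(), NodeInfo { last_seen: std::time::Instant::now(), tcp_addr });
                previous.is_none()
            };

            let greeting = Message::Greeting;
            let serialized_greeting = serde_json::to_string(&greeting).unwrap();
            socket.send_to(serialized_greeting.as_bytes(), &addr).await?;

            // Handshakes repeat every 5 seconds, so start only one heartbeat task per peer.
            if !is_new_node {
                continue;
            }

            // Start heartbeat for this node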
            tokio::spawn(async move { 
                loop { 
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await; 
                    println!("Sending heartbeat to {}", tcp_addr); 
                    let mut stream = TcpStream::connect(tcp_addr).await.unwrap(); 
                    let heartbeat_msg = Message::Heartbeat; 
                    let serialized_msg = serde_json::to_string(&heartbeat_msg).unwrap(); 
                    stream.write_all(serialized_msg.as_bytes()).await.unwrap(); 
                } 
            }); 
        } 
    } 
} 
 
async fn handle_tcp_stream(mut stream: TcpStream, nodes: Arc<RwLock<HashMap<String, NodeInfo>>>, kv_store: Arc<KeyValueStore> ) { 
    let mut buf = vec![0u8; 1024]; 
    let len = stream.read(&mut buf).await.unwrap(); 
    let received_msg: Message = serde_json::from_slice(&buf[..len]).unwrap(); 
 
    match received_msg { 
        Message::Heartbeat => { 
            println!("Received Heartbeat"); 
            let response = Message::HeartbeatResponse; 
            let serialized_response = serde_json::to_string(&response).unwrap(); 
            stream.write_all(serialized_response.as_bytes()).await.unwrap(); 
        }, 
        Message::SetValue { key, value } => { 
            println!("Received SetValue"); 
            kv_store.set(key.clone(), value.clone()).await; 
 
            // Broadcast sync to all nodes 
            let nodes_guard = nodes.read().await; 
            for (_, node_info) in nodes_guard.iter() { 
                let mut stream = match TcpStream::connect(node_info.tcp_addr).await { 
                    Ok(stream) => stream, 
                    Err(_) => continue, 
                }; 
                let sync_msg = Message::Sync { key: key.clone(), value: value.clone() }; 
                let serialized_msg = serde_json::to_string(&sync_msg).unwrap(); 
                let _ = stream.write_all(serialized_msg.as_bytes()).await; 
            } 
 
            let response = Message::ValueResponse { value: Some("Value set successfully.".to_string()) }; 
            let serialized_response = serde_json::to_string(&response).unwrap(); 
            stream.write_all(serialized_response.as_bytes()).await.unwrap(); 
        }, 
        Message::GetValue { key } => { 
            println!("Received GetValue"); 
            let value = kv_store.get(&key).await; 
            let response = Message::ValueResponse { value }; 
            let serialized_response = serde_json::to_string(&response).unwrap(); 
            stream.write_all(serialized_response.as_bytes()).await.unwrap(); 
        }, 
        Message::Sync { key, value } => { 
            println!("Received Sync"); 
            kv_store.set(key, value).await; 
        }, 
        _ => {} 
    } 
}

Testing the P2P Nodes

Building a P2P system is only half the battle. Ensuring it behaves as expected through comprehensive testing is crucial. Let’s walk through how to test our P2P nodes.

1. Node Discovery with UDP Handshake

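Start each node on a different machine (or container) on the same local network. The code binds fixed ports (UDP 8888 and TCP 9000) and uses the MAC address as the node name, so two instances on one host would clash on the ports and ignore each other’s handshakes. As the nodes start, you should see UDP broadcasts from each one trying to discover peers. When a node receives a handshake message, it should log the discovery of the new node and store its details.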

2. TCP Communication

Once nodes have discovered each other, they open TCP connections for more reliable communication. In the terminal, you should see logs such as Accepted new TCP connection. Failures are handled unevenly in this code: a failed heartbeat connection panics its task through unwrap and prints the panic message, while a failed Sync connection is skipped silently.

3. Key-Value Store Interactions

Use the CLI to set a key-value pair on one node. The receiving node should log Received SetValue and store the pair, then send a Sync message to every node it knows about. Those nodes should log Received Sync and update their local stores. If you then use the CLI to retrieve the key from a different node, it should return the value.

4. Node Heartbeats

With the nodes running continuously, every few seconds, you should notice nodes sending and receiving heartbeat messages. This activity can be identified by logs such as Sending heartbeat to [Node Address] and Received Heartbeat.

Implementing the Command Line Interface (CLI)

A robust and user-friendly CLI is crucial for directly interacting with our P2P database. Let’s build one to set, get, and observe data in our network with ease. We’ll utilize Rust’s powerful ecosystem and libraries to streamline the process.

1. Setting Up

To begin with, we’ll use the structopt crate, which derives a command-line argument parser (built on clap) from a struct or enum definition. Add it to your Cargo.toml:

[dependencies]
structopt = "0.3"

2. Defining Commands

Let’s start by defining the basic commands we want:

  • set: To store a key-value pair in the database.
  • get: To retrieve the value of a given key.

In the CLI’s main.rs, let’s lay out our commands:

use std::net::SocketAddr; 
use serde::{Deserialize, Serialize}; 
use structopt::StructOpt; 
use tokio::io::{AsyncReadExt, AsyncWriteExt}; 
use tokio::net::TcpStream; 
 
#[derive(Debug, Serialize, Deserialize)] 
enum Message { 
    // ... existing variants 
    SetValue { key: String, value: String }, 
    GetValue { key: String }, 
    ValueResponse { value: Option<String> }, 
} 
 
 
#[derive(StructOpt, Debug)] 
#[structopt(name = "node-cli", about = "CLI for interacting with the P2P network.")] 
enum Cli { 
    /// Set a key-value pair on a specific node 
    Set { 
        #[structopt(short, long)] 
        node: String, 
        key: String, 
        value: String, 
    }, 
    /// Get value by key from a specific node 
    Get { 
        #[structopt(short, long)] 
        node: String, 
        key: String, 
    }, 
} 
 
 
#[tokio::main] 
async fn main() -> Result<(), Box<dyn std::error::Error>> { 
    let opt = Cli::from_args(); 
    handle_cli_command(opt).await 
}

3. Handling Commands

Once we’ve set up our command structure, it’s time to handle the user’s input and link it with our P2P database functions.

async fn handle_cli_command(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { 
    match cli { 
        Cli::Set { node, key, value } => { 
            let addr: SocketAddr = node.parse()?; 
            let mut stream = TcpStream::connect(addr).await?; 
            let msg = Message::SetValue { key, value }; 
            let serialized_msg = serde_json::to_string(&msg).unwrap(); 
            stream.write_all(serialized_msg.as_bytes()).await?; 
 
            let mut buf = vec![0u8; 1024]; 
            let len = stream.read(&mut buf).await?; 
            let response: Message = serde_json::from_slice(&buf[..len])?; 
            match response { 
                Message::ValueResponse { value: Some(resp) } => println!("{}", resp), 
                _ => println!("Unexpected response from node."), 
            } 
        } 
        Cli::Get { node, key } => { 
            let addr: SocketAddr = node.parse()?; 
            let mut stream = TcpStream::connect(addr).await?; 
            let msg = Message::GetValue { key }; 
            let serialized_msg = serde_json::to_string(&msg).unwrap(); 
            stream.write_all(serialized_msg.as_bytes()).await?; 
 
            let mut buf = vec![0u8; 1024]; 
            let len = stream.read(&mut buf).await?; 
            let response: Message = serde_json::from_slice(&buf[..len])?; 
            match response { 
                Message::ValueResponse { value: Some(resp) } => println!("{}", resp), 
                Message::ValueResponse { value: None } => println!("Key not found."), 
                _ => println!("Unexpected response from node."), 
            } 
        } 
    } 
 
    Ok(()) 
}
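The handler parses --node with node.parse::<SocketAddr>(), which accepts only literal IP:port values and rejects host names such as localhost:9000. If you want to accept host names too, the standard library’s ToSocketAddrs trait can resolve them. The resolve_node helper below is a hypothetical addition sketched for illustration, not part of the original CLI.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Resolves a `host:port` string to the first matching socket address.
/// Literal IPs are parsed directly; host names go through the system resolver.
/// Hypothetical helper for illustration.
fn resolve_node(node: &str) -> io::Result<SocketAddr> {
    node.to_socket_addrs()?
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no address found for node"))
}
```

The returned SocketAddr can be passed to TcpStream::connect in place of the parsed addr. Note that to_socket_addrs blocks while it resolves a host name, which is usually acceptable in a short-lived CLI.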

4. Testing the CLI

With a node running, you can use the following commands to interact with the database:

Set Value: Store a key-value pair in the database.

./rustdbcli set --node <node-address> <key> <value>

Get Value: Retrieve the value associated with a given key.

./rustdbcli get --node <node-address> <key>

Remember to replace <node-address>, <key>, and <value> with the node’s TCP address and your desired key and value.

Example:

./rustdbcli set --node 0.0.0.0:9000 "testKey" "testValue"

And there you have it! You’ve built a functional CLI that interacts seamlessly with your P2P database. With a bit more polish and additional features, you’ll be well on your way to having a production-ready tool.

You can find the complete implementation over at my GitHub repository: https://github.com/luishsr/rustp2p/

Your feedback, suggestions, or contributions are always welcome.


All the best,

Luis Soares
