GET and SET

We will never find the command that Nedry used. He's covered his tracks far too well.

Jurassic Park (1993)

The goal of this chapter is to implement the two commands GET and SET, arguably the core of a key/value store like Redis. We will go through the basic implementation of the storage in memory, and we won't implement persistence yet (writing data on the file system). However, in the next chapter we will implement key expiry, both passive and active.

The features we will add in this chapter are going to increase the complexity of the system, and it will become increasingly clear that we need to change our approach. In chapter 5 we will therefore refactor the whole system into a set of independent asynchronous tasks. This change is essential to enable us to tackle replication and transactions in an elegant way.

Step 3.1 - Create the storage manager

In order to properly manage storage[1], it's important to have a data type that allows us to manipulate data in a simple way. We will therefore introduce a struct Storage that captures the stored data and the low-level commands that the system exposes.

Let's start, as usual, by creating a new file with a nicely defined result type.

src/storage_result.rs
use std::fmt;

#[derive(Debug)]
pub enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::IncorrectRequest => {
                write!(f, "The client sent an incorrect request!")
            }
            StorageError::CommandNotAvailable(c) => {
                write!(f, "The requested command {} is not available!", c)
            }
        }
    }
}

pub type StorageResult<T> = Result<T, StorageError>;
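As a minimal sketch of how the new result type can be used, the following standalone snippet repeats the definitions above and adds two helpers. The functions check_command and describe are hypothetical and exist only for illustration; they are not part of the project.

```rust
use std::fmt;

#[derive(Debug)]
pub enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::IncorrectRequest => {
                write!(f, "The client sent an incorrect request!")
            }
            StorageError::CommandNotAvailable(c) => {
                write!(f, "The requested command {} is not available!", c)
            }
        }
    }
}

pub type StorageResult<T> = Result<T, StorageError>;

// Hypothetical helper: accept only the command name `ping`
// (in any case) and reject everything else.
fn check_command(name: &str) -> StorageResult<&str> {
    if name.eq_ignore_ascii_case("ping") {
        Ok(name)
    } else {
        Err(StorageError::CommandNotAvailable(name.to_string()))
    }
}

// Hypothetical helper: turn the result into a printable message.
// The error is formatted through its `Display` implementation.
fn describe(name: &str) -> String {
    match check_command(name) {
        Ok(n) => format!("accepted {}", n),
        Err(e) => format!("Error: {}", e),
    }
}
```

Because StorageError implements Display, callers can print it with `{}` or call to_string on it, which is what the connection handler does when it logs an error.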

At this point, we can replace ServerError::CommandError with StorageError variants. This will keep things coherent in terms of result types. ServerError itself becomes unused for the time being: we will drop it at the end of Step 3.2 and bring it back later, when the server needs to report other types of errors.

The type Storage is at the moment just a wrapper around a std::collections::HashMap [docs] that contains instances of the enum StorageValue. This allows us to decouple the storage from the nature of the data itself.

src/storage.rs
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum StorageValue {
    String(String),
}

pub struct Storage {
    store: HashMap<String, StorageValue>,
}

impl Storage {
    pub fn new() -> Self {
        let store: HashMap<String, StorageValue> = HashMap::new();

        Self { store }
    }
}
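To illustrate what decoupling the storage from the nature of the data can look like, here is a small sketch in which the enum has a second variant. The variant Integer and the function type_of are assumptions made for this example only; the chapter's code has just the variant String.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum StorageValue {
    String(String),
    // Hypothetical variant, not part of the chapter's code.
    Integer(i64),
}

pub struct Storage {
    store: HashMap<String, StorageValue>,
}

impl Storage {
    pub fn new() -> Self {
        Self {
            store: HashMap::new(),
        }
    }
}

// Hypothetical helper: name the type of the value stored under `key`.
// The map itself never changes when a new variant is added, only
// the code that matches on `StorageValue` does.
fn type_of(storage: &Storage, key: &str) -> &'static str {
    match storage.store.get(key) {
        Some(StorageValue::String(_)) => "string",
        Some(StorageValue::Integer(_)) => "integer",
        None => "none",
    }
}
```

The compiler forces every match on StorageValue to cover all variants, so adding a new kind of value later points out each place that needs to handle it.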

As usual, we need to add the new modules to main.rs.

src/main.rs
mod resp;
mod resp_result;
mod server;
mod storage;
mod storage_result;

We will move the two commands PING and ECHO into the storage, and define GET and SET there as well. In later chapters we will move the code of those commands again, but for the time being the storage looks like a good place to host them.

src/storage.rs
use crate::resp::RESP;
use crate::storage_result::{StorageError, StorageResult};
use std::collections::HashMap;

...

impl Storage {
    pub fn new() -> Self {
        let store: HashMap<String, StorageValue> = HashMap::new();

        Self { store }
    }

    // Process an incoming command with its parameters.
    pub fn process_command(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
        match command[0].to_lowercase().as_str() {
            "ping" => self.command_ping(&command),
            "echo" => self.command_echo(&command),
            _ => Err(StorageError::CommandNotAvailable(command[0].clone())),
        }
    }

    // The command `PING` responds with a simple string
    // that contains the value `PONG`.
    fn command_ping(&self, _command: &Vec<String>) -> StorageResult<RESP> {
        Ok(RESP::SimpleString("PONG".to_string()))
    }

    // The command `ECHO` responds with a bulk string
    // that contains the same value passed to `ECHO`.
    fn command_echo(&self, command: &Vec<String>) -> StorageResult<RESP> {
        Ok(RESP::BulkString(command[1].clone()))
    }
}
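The listing above indexes the vector directly (command[0] and command[1]), which panics if a client sends an empty array or an ECHO without an argument. A possible defensive variant is sketched below. The function process_command_checked is hypothetical, the types are reduced stand-ins for the project's own, and mapping both cases to IncorrectRequest is an assumption.

```rust
// Reduced stand-in for the project's RESP type.
#[derive(Debug, PartialEq)]
enum RESP {
    SimpleString(String),
    BulkString(String),
}

#[derive(Debug, PartialEq)]
enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
}

type StorageResult<T> = Result<T, StorageError>;

// Hypothetical variant of `process_command` that never
// indexes the vector out of bounds.
fn process_command_checked(command: &[String]) -> StorageResult<RESP> {
    // `first` returns None for an empty vector instead of panicking.
    let name = command.first().ok_or(StorageError::IncorrectRequest)?;

    match name.to_lowercase().as_str() {
        "ping" => Ok(RESP::SimpleString("PONG".to_string())),
        // `get(1)` returns None when ECHO has no argument.
        "echo" => command
            .get(1)
            .map(|arg| RESP::BulkString(arg.clone()))
            .ok_or(StorageError::IncorrectRequest),
        _ => Err(StorageError::CommandNotAvailable(name.clone())),
    }
}
```

With this shape a malformed request produces an error value that the caller can report, rather than taking down the task that handles the connection.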

There are four tests for this module. Two of them, test_command_ping and test_command_echo, have been moved here from src/server.rs.

src/storage.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    // Test that a new storage can be created
    // and that it is empty.
    fn test_create_new() {
        let storage: Storage = Storage::new();

        assert_eq!(storage.store.len(), 0);
    }

    #[test]
    // Test that the storage provides the function
    // command_ping, and that its output is correct.
    // Check the command in lowercase format.
    fn test_command_ping() {
        let command = vec![String::from("ping")];
        let storage: Storage = Storage::new();

        let output = storage.command_ping(&command).unwrap();

        assert_eq!(output, RESP::SimpleString(String::from("PONG")));
    }

    #[test]
    // Test that the name of the command is matched
    // regardless of its case: an uppercase PING sent
    // through process_command is answered with PONG.
    fn test_command_ping_uppercase() {
        let command = vec![String::from("PING")];
        let mut storage: Storage = Storage::new();

        let output = storage.process_command(&command).unwrap();

        assert_eq!(output, RESP::SimpleString(String::from("PONG")));
    }

    #[test]
    // Test that the storage provides the function
    // command_echo and that its output is correct.
    fn test_command_echo() {
        let command = vec![String::from("echo"), String::from("42")];
        let storage: Storage = Storage::new();

        let output = storage.command_echo(&command).unwrap();

        assert_eq!(output, RESP::BulkString(String::from("42")));
    }
}

Step 3.2 - Use the storage manager

We now need to actually instantiate and use the storage manager. This introduces a new problem, that of sharing values across asynchronous tasks. It is a problem that has many ramifications, and in this chapter we will see only some of them.

In Rust, a value that is passed to a function by value is moved: the function becomes its owner and the caller can no longer use it. Practically speaking, this means that we cannot create a single value of type Storage and then pass it to each connection that we create. Once we pass it to the first connection, main no longer owns it.

For example:

src/main.rs
use crate::resp::{bytes_to_resp, RESP};
use crate::server::process_request;
use crate::storage::Storage;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};

...

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").await?;

    let mut storage = Storage::new();

    loop {
        // Process each incoming connection.
        match listener.accept().await {
            // The connection is valid, handle it.
            Ok((stream, _)) => {
                tokio::spawn(handle_connection(stream, storage));
            }
            Err(e) => {
                println!("Error: {}", e);
                continue;
            }
        }
    }
}

async fn handle_connection(mut stream: TcpStream, mut storage: Storage) {
    ...

This code is an attempt to create the storage and then pass it to the connection handlers. The compiler however will not accept it:

error[E0382]: use of moved value: `storage`
   |
   |     let mut storage = Storage::new();
   |         ----------- move occurs because `storage` has type `storage::Storage`, which does not implement the `Copy` trait
...
   |     loop {
   |     ---- inside of this loop
...
   |                 tokio::spawn(handle_connection(stream, storage));
   |                                                        ^^^^^^^ value moved here, in previous iteration of loop
   |

This behaviour comes directly from the language rules: there cannot be more than one owner.
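The same rule can be observed without Tokio. The sketch below uses a hypothetical, reduced Storage and three illustrative functions (consume, inspect, demo) to show the difference between moving, borrowing, and cloning a value.

```rust
// Reduced stand-in for the project's Storage.
#[derive(Debug, Clone, PartialEq)]
struct Storage {
    entries: Vec<String>,
}

// Takes ownership: the caller cannot use the value afterwards.
fn consume(storage: Storage) -> usize {
    storage.entries.len()
}

// Borrows: the caller keeps ownership.
fn inspect(storage: &Storage) -> usize {
    storage.entries.len()
}

fn demo() -> (usize, usize, usize) {
    let storage = Storage {
        entries: vec![String::from("akey")],
    };

    // Borrowing leaves `storage` usable.
    let borrowed = inspect(&storage);
    // Cloning moves an independent copy, not the original.
    let cloned = consume(storage.clone());
    // This call moves the original value.
    let moved = consume(storage);
    // A further `consume(storage)` here would fail to compile
    // with error[E0382]: use of moved value: `storage`.

    (borrowed, cloned, moved)
}
```

Cloning compiles, but it would give every connection its own private copy of the data, which is not what a shared key/value store needs.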

We might consider passing a reference instead. A mutable reference, which is what we need in order to store values, cannot work, because only one mutable reference to a value can be alive at a time, while every connection would need its own. For example:

src/main.rs
use crate::resp::{bytes_to_resp, RESP};
use crate::server::process_request;
use crate::storage::Storage;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};

...

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").await?;

    let mut storage = Storage::new();

    loop {
        // Process each incoming connection.
        match listener.accept().await {
            // The connection is valid, handle it.
            Ok((stream, _)) => {
                tokio::spawn(handle_connection(stream, &mut storage));
            }
            Err(e) => {
                println!("Error: {}", e);
                continue;
            }
        }
    }
}

async fn handle_connection(mut stream: TcpStream, storage: &mut Storage) {
    ...

The compiler is indeed still not happy with this solution:

error[E0499]: cannot borrow `storage` as mutable more than once at a time
   |
   |                 tokio::spawn(handle_connection(stream, &mut storage));
   |                              --------------------------^^^^^^^^^^^^-
   |                              |                         |
   |                              |                         `storage` was mutably borrowed
   |                              |                         here in the previous iteration
   |                              |                         of the loop
   |                              argument requires that `storage` is borrowed
   |                              for `'static`

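Any number of shared references can coexist, so multiple concurrent reads are perfectly acceptable, but a mutable reference, the only kind that allows writing, must be the only reference alive. The error also mentions 'static: a spawned task may outlive the function that created it, so tokio::spawn does not let it borrow a local variable of main at all.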
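The borrowing rules can be tried out with the standard library alone. This sketch uses std::thread::scope, whose threads are guaranteed to finish before the function returns and may therefore borrow local data, unlike tasks created with tokio::spawn. The functions read_concurrently and write_exclusively are hypothetical names chosen for the example.

```rust
use std::collections::HashMap;
use std::thread;

// Any number of threads may read through shared references at once.
fn read_concurrently(store: &HashMap<String, String>) -> Vec<usize> {
    thread::scope(|scope| {
        let first = scope.spawn(|| store.len());
        let second = scope.spawn(|| store.len());
        vec![first.join().unwrap(), second.join().unwrap()]
    })
}

// Writing requires the only mutable reference alive.
fn write_exclusively(store: &mut HashMap<String, String>) -> usize {
    let writer = &mut *store;
    writer.insert(String::from("bkey"), String::from("bvalue"));
    // Creating a second `&mut *store` while `writer` is still
    // in use would be rejected with error[E0499].
    store.len()
}
```

Scoped threads sidestep the 'static requirement, but they do not remove the single-writer rule, which is why a lock is still needed as soon as more than one task has to write.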

We actually have two problems to solve here:

  1. Grant multiple tasks access to the same resource, allowing them to both read and write.
  2. Make sure that the resource is always valid, forbidding one of the tasks from dropping it.

The first problem can be solved using a classic locking mechanism. A well-suited structure for such a job is std::sync::Mutex [docs], which lets multiple tasks work on the same resource by acquiring and releasing a lock that grants exclusive access.

A way to address the second problem is to use a reference counter. Such a structure keeps count of how many references exist for a resource, and drops it only when the number of references is zero. An implementation of this concept can be found in std::sync::Arc [docs], which increments reference counts atomically. This last requirement is important since asynchronous tasks ultimately run on threads, so it is important for operations to be thread-safe. Incrementing a value like a reference count in a non-atomic way is the classic example of an unsafe concurrent operation.
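The reference count can be observed directly with Arc::strong_count. The following is a small sketch; the function reference_counts is a hypothetical name used only here.

```rust
use std::sync::Arc;

fn reference_counts() -> (usize, usize, usize) {
    let resource = Arc::new(String::from("storage"));
    // Only `resource` points to the value.
    let initial = Arc::strong_count(&resource);

    let during = {
        // Cloning an Arc creates a new pointer to the same
        // value and increments the count atomically.
        let _second = Arc::clone(&resource);
        Arc::strong_count(&resource)
    };

    // `_second` was dropped at the end of the inner block,
    // so the count went back down.
    let after = Arc::strong_count(&resource);

    (initial, during, after)
}
```

The value itself is dropped only when the last Arc pointing to it goes away, which is what keeps the storage valid for as long as any task still holds a clone.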

Reference counting and race conditions

Reference counting is one of the most important techniques used to manage memory allocation, and you might want to learn a bit about its pros and cons and the relationship between it and garbage collection algorithms.

If you plan to write concurrent code, you need to be familiar with the concept of race condition, which is one of many issues that poorly written multithreaded code can run into.

Practically speaking, this means that we can use Arc<Mutex<T>> to share access to a single value of type T. Variables of type Arc can be cloned to increment the reference count and whoever has a reference can try to lock the resource using Mutex::lock [docs], thereby gaining exclusive access.

src/main.rs
use crate::resp::{bytes_to_resp, RESP};
use crate::server::process_request;
use crate::storage::Storage;
use std::sync::{Arc, Mutex};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};

mod resp;
mod resp_result;
mod server;
mod storage;
mod storage_result;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").await?;

    // Create a storage and protect it against concurrency issues.
    let storage = Arc::new(Mutex::new(Storage::new()));

    loop {
        // Process each incoming connection.
        match listener.accept().await {
            // The connection is valid, handle it.
            Ok((stream, _)) => {
                // Spawn a task to take care of this connection.
                tokio::spawn(handle_connection(stream, storage.clone()));
            }
            Err(e) => {
                println!("Error: {}", e);
                continue;
            }
        }
    }
}

// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream, storage: Arc<Mutex<Storage>>) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    loop {
        // Read from the stream into the buffer.
        match stream.read(&mut buffer).await {
            // If the stream returned some data,
            // process the request.
            Ok(size) if size != 0 => {
                // Initialise the index to start at the
                // beginning of the buffer.
                let mut index: usize = 0;

                // Process the bytes in the buffer according to
                // the content and extract the request. Update the index.
                let request = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
                    Ok(v) => v,
                    Err(e) => {
                        eprintln!("Error: {}", e);
                        return;
                    }
                };

                // Process the request.
                let response = match process_request(request, storage.clone()) {
                    Ok(v) => v,
                    Err(e) => {
                        eprintln!("Error parsing command: {}", e);
                        return;
                    }
                };

...

As you can see, we are just passing an additional parameter to handle_connection and to process_request. The Arc is cloned in the main loop, and this increments the reference count that keeps the resource alive until every task is completed.
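The same sharing pattern can be reproduced with plain threads, which makes it easy to experiment with outside the server. In this sketch a HashMap stands in for Storage, and the function share_between_threads is a hypothetical name; each worker receives its own clone of the Arc and locks the mutex before writing.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn share_between_threads(workers: usize) -> usize {
    // A HashMap stands in for the project's Storage.
    let storage: Arc<Mutex<HashMap<String, usize>>> =
        Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            // Each worker owns a clone of the Arc, not of the map.
            let storage = Arc::clone(&storage);
            thread::spawn(move || {
                // The guard grants exclusive access and releases
                // the lock when it goes out of scope.
                let mut guard = storage.lock().unwrap();
                guard.insert(format!("key{}", id), id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // All workers wrote into the same map.
    let count = storage.lock().unwrap().len();
    count
}
```

Every worker inserts a different key, so the number of entries at the end equals the number of workers, regardless of the order in which the threads acquired the lock.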

In the function process_request we need to move from ServerError::CommandError to the newly defined StorageError::IncorrectRequest. At the end of the function we also lock and use the storage.

src/server.rs
use crate::storage::Storage;
use crate::storage_result::{StorageError, StorageResult};
use crate::RESP;
use std::fmt;
use std::sync::{Arc, Mutex};

...

// Process an incoming request and return a result.
pub fn process_request(request: RESP, storage: Arc<Mutex<Storage>>) -> StorageResult<RESP> {
    // Check if the request is expressed using
    // a RESP array and extract the elements.
    let elements = match request {
        RESP::Array(v) => v,
        _ => {
            return Err(StorageError::IncorrectRequest);
        }
    };

    // The vector that contains all the commands we need to process.
    let mut command = Vec::new();

    // Check that each element of the array is a
    // bulk string, extract the content, and add it
    // to the vector.
    for elem in elements.iter() {
        match elem {
            RESP::BulkString(v) => command.push(v.clone()),
            _ => {
                return Err(StorageError::IncorrectRequest);
            }
        }
    }

    // Acquire a lock on the storage.
    let mut guard = storage.lock().unwrap();

    // Process the command contained in the request.
    let response = guard.process_command(&command);

    // Return the response.
    response
}

Please note that the vector command now needs to contain clones of the bulk strings. Iterating over elements yields references, so pushing v directly would build a Vec<&String>, while Storage::process_command expects a &Vec<String>.
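One detail of Mutex::lock worth knowing is that it returns a Result: if a thread panics while holding the lock, the mutex becomes poisoned and later calls return an error, so the unwrap in process_request would panic as well. The sketch below shows one possible way to recover the data with PoisonError::into_inner. The function lock_after_panic is a hypothetical name, and whether recovering is appropriate depends on whether the data can have been left in an inconsistent state.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn lock_after_panic() -> Vec<String> {
    let storage = Arc::new(Mutex::new(vec![String::from("akey")]));

    // Simulate a task that fails while holding the lock.
    let poisoner = Arc::clone(&storage);
    let result = thread::spawn(move || {
        let _guard = poisoner.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
    assert!(result.is_err());

    // The mutex is now poisoned: `lock()` returns an error that
    // still carries the guard, which `into_inner` extracts.
    let guard = storage
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    (*guard).clone()
}
```

Running the function prints the panic message of the failing thread on standard error, which is expected.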

We also need to change the tests accordingly.

src/server.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    // Test that the function process_request
    // processes a PING request and that it
    // responds with a PONG.
    fn test_process_request_ping() {
        let request = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
        let storage = Arc::new(Mutex::new(Storage::new()));

        let output = process_request(request, storage).unwrap();

        assert_eq!(output, RESP::SimpleString(String::from("PONG")));
    }

    #[test]
    // Test that the function process_request
    // returns the correct error when it is
    // given a request that doesn't contain
    // a RESP array.
    fn test_process_request_not_array() {
        let request = RESP::BulkString(String::from("PING"));
        let storage = Arc::new(Mutex::new(Storage::new()));
 
        let error = process_request(request, storage).unwrap_err();

        assert_eq!(error, StorageError::IncorrectRequest);
    }

    #[test]
    // Test that the function process_request
    // returns the correct error when it is
    // given a request that contains a RESP array
    // but the content of the array is not
    // a bulk string.
    fn test_process_request_not_bulkstrings() {
        let request = RESP::Array(vec![RESP::SimpleString(String::from("PING"))]);
        let storage = Arc::new(Mutex::new(Storage::new()));
 
        let error = process_request(request, storage).unwrap_err();

        assert_eq!(error, StorageError::IncorrectRequest);
    }

    #[test]
    // Test that the function process_request
    // processes an ECHO request and that it
    // responds with a copy of the input.
    fn test_process_request_echo() {
        let request = RESP::Array(vec![
            RESP::BulkString(String::from("ECHO")),
            RESP::BulkString(String::from("42")),
        ]);
        let storage = Arc::new(Mutex::new(Storage::new()));
 
        let output = process_request(request, storage).unwrap();

        assert_eq!(output, RESP::BulkString(String::from("42")));
    }
}

To compile these tests we need to derive PartialEq for StorageError, as assert_eq! compares the returned error with the expected one.

src/storage_result.rs
#[derive(Debug, PartialEq)]
pub enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
}

Last, since ServerResult and ServerError are no longer used, we can delete their definitions from src/server.rs, together with the import of std::fmt that only the Display implementation needed. We will reintroduce them later, but since they are useless at the moment, it's better to remove them completely and bring them back only when they are actually needed.

src/server.rs
use crate::storage::Storage;
use crate::storage_result::{StorageError, StorageResult};
use crate::RESP;
use std::sync::{Arc, Mutex};

// The enum `ServerError`, its implementation of `fmt::Display`,
// and the type alias `ServerResult` have been removed.

CodeCrafters

Stage 5: Implement the ECHO command

Once again the code we wrote was in preparation for a future change, so the previous behaviour should be unchanged. This version of the code still passes Stage 5 of the CodeCrafters challenge.

Step 3.3 - Implement GET and SET

The basic implementation of GET and SET is extremely simple. After all we are building a wrapper around a dictionary, so we can directly add two methods get and set to the struct Storage.

src/storage.rs
impl Storage {

    // Implement the `set` operation for the storage.
    fn set(&mut self, key: String, value: String) -> StorageResult<String> {
        self.store.insert(key, StorageValue::String(value));

        Ok(String::from("OK"))
    }

    // Implement the `get` operation for the storage.
    fn get(&self, key: String) -> StorageResult<Option<String>> {
        match self.store.get(&key) {
            Some(StorageValue::String(v)) => Ok(Some(v.clone())),
            None => Ok(None),
        }
    }

Here, we are simply using the methods HashMap::insert [docs] and HashMap::get [docs]. The tests for the functions we wrote are

src/storage.rs
mod tests {

    #[test]
    // Test that the function set works as expected.
    // When a key and value pair is stored the
    // output is OK, the storage contains
    // an element, and the value can be retrieved.
    fn test_set_value() {
        let mut storage: Storage = Storage::new();
        let avalue = StorageValue::String(String::from("avalue"));

        let output = storage
            .set(String::from("akey"), String::from("avalue"))
            .unwrap();

        assert_eq!(output, String::from("OK"));
        assert_eq!(storage.store.len(), 1);
        match storage.store.get(&String::from("akey")) {
            Some(value) => assert_eq!(value, &avalue),
            None => panic!(),
        }
    }

    #[test]
    // Test that the function get works as expected.
    // When a key value is retrieved, the output
    // is the value, and the key is not deleted
    // from the storage.
    fn test_get_value() {
        let mut storage: Storage = Storage::new();
        storage.store.insert(
            String::from("akey"),
            StorageValue::String(String::from("avalue")),
        );

        let result = storage.get(String::from("akey")).unwrap();

        assert_eq!(storage.store.len(), 1);
        assert_eq!(result, Some(String::from("avalue")));
    }

    #[test]
    // Test that the function get works as expected.
    // When a key doesn't exist the output is None, and
    // the storage is left unchanged.
    fn test_get_value_key_does_not_exist() {
        let storage: Storage = Storage::new();

        let result = storage.get(String::from("akey")).unwrap();

        assert_eq!(storage.store.len(), 0);
        assert_eq!(result, None);
    }
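The behaviour of HashMap::insert and HashMap::get that the two methods rely on can be checked in isolation. This is a small sketch using a plain HashMap<String, String>; the function overwrite_demo is a hypothetical name.

```rust
use std::collections::HashMap;

fn overwrite_demo() -> (Option<String>, Option<String>, Option<String>) {
    let mut store: HashMap<String, String> = HashMap::new();

    // The key is new, so `insert` returns None.
    let first = store.insert(String::from("akey"), String::from("old"));

    // The key exists: the value is replaced and the
    // previous one is returned.
    let second = store.insert(String::from("akey"), String::from("new"));

    // `get` returns Option<&V>; `cloned` turns it into Option<V>.
    let current = store.get("akey").cloned();

    (first, second, current)
}
```

Since insert replaces the value of an existing key, calling set twice with the same key leaves a single entry holding the most recent value.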

We must wrap these methods in order to expose them as commands. Wrappers are primarily useful for handling errors. For example, it's in the wrappers that we need to consider syntax errors in the commands passed by users.

src/storage.rs
impl Storage {

    // The command `SET` stores the given key and value
    // pair and responds with `OK`.
    fn command_set(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
        // Check the command length. The command requires
        // exactly 2 parameters (key and value), so the
        // vector must contain 3 elements.
        if command.len() != 3 {
            return Err(StorageError::CommandSyntaxError(command.join(" ")));
        }

        // Use the function set to store the key and value pair.
        let _ = self.set(command[1].clone(), command[2].clone());

        Ok(RESP::SimpleString(String::from("OK")))
    }

    // The command `GET` retrieves the value of the given key
    // and responds with a bulk string that contains it.
    fn command_get(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
        // Check the command length. The command requires
        // exactly 1 parameter (the key), so the vector
        // must contain 2 elements.
        if command.len() != 2 {
            return Err(StorageError::CommandSyntaxError(command.join(" ")));
        }

        // Use the function get to retrieve the value of the given key.
        let output = self.get(command[1].clone());

        match output {
            // If the key corresponds to a value, return
            // it as a bulk string.
            Ok(Some(v)) => Ok(RESP::BulkString(v)),
            // If the key is not in the storage, return
            // a RESP null value.
            Ok(None) => Ok(RESP::Null),
            Err(_) => Err(StorageError::CommandInternalError(command.join(" "))),
        }
    }

The tests for the wrappers are

src/storage.rs
mod tests {

    #[test]
    // Test that the storage provides the function
    // command_set and that its output is correct.
    fn test_process_command_set() {
        let mut storage: Storage = Storage::new();
        let command = vec![
            String::from("set"),
            String::from("key"),
            String::from("value"),
        ];

        let output = storage.process_command(&command).unwrap();

        assert_eq!(output, RESP::SimpleString(String::from("OK")));
        assert_eq!(storage.store.len(), 1);
    }

    #[test]
    // Test that the storage provides the function
    // command_get and that its output is correct.
    fn test_process_command_get() {
        let mut storage: Storage = Storage::new();
        storage.store.insert(
            String::from("akey"),
            StorageValue::String(String::from("avalue")),
        );
        let command = vec![String::from("get"), String::from("akey")];

        let output = storage.process_command(&command).unwrap();

        assert_eq!(output, RESP::BulkString(String::from("avalue")));
        assert_eq!(storage.store.len(), 1);
    }
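The tests above cover the successful paths only. The error paths can be exercised in the same way, as in the following standalone sketch. The types are compact stand-ins for the project's own (the map holds plain strings), and the helper cmd is hypothetical.

```rust
use std::collections::HashMap;

// Compact stand-ins for the project's types.
#[derive(Debug, PartialEq)]
enum RESP {
    SimpleString(String),
    BulkString(String),
    Null,
}

#[derive(Debug, PartialEq)]
enum StorageError {
    CommandSyntaxError(String),
}

struct Storage {
    store: HashMap<String, String>,
}

impl Storage {
    fn new() -> Self {
        Self {
            store: HashMap::new(),
        }
    }

    // SET needs the command name plus key and value.
    fn command_set(&mut self, command: &[String]) -> Result<RESP, StorageError> {
        if command.len() != 3 {
            return Err(StorageError::CommandSyntaxError(command.join(" ")));
        }
        self.store.insert(command[1].clone(), command[2].clone());
        Ok(RESP::SimpleString(String::from("OK")))
    }

    // GET needs the command name plus the key.
    fn command_get(&self, command: &[String]) -> Result<RESP, StorageError> {
        if command.len() != 2 {
            return Err(StorageError::CommandSyntaxError(command.join(" ")));
        }
        match self.store.get(&command[1]) {
            Some(v) => Ok(RESP::BulkString(v.clone())),
            None => Ok(RESP::Null),
        }
    }
}

// Hypothetical helper: build a command from string literals.
fn cmd(parts: &[&str]) -> Vec<String> {
    parts.iter().map(|p| p.to_string()).collect()
}
```

A SET with a missing value and a GET without a key both produce CommandSyntaxError carrying the offending command, while a GET for a key that was never set is not an error and produces RESP::Null.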

The code introduces two new error variants, CommandSyntaxError and CommandInternalError.

src/storage_result.rs
#[derive(Debug, PartialEq)]
pub enum StorageError {
    IncorrectRequest,
    CommandNotAvailable(String),
    CommandSyntaxError(String),
    CommandInternalError(String),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::IncorrectRequest => {
                write!(f, "The client sent an incorrect request!")
            }
            StorageError::CommandNotAvailable(c) => {
                write!(f, "The requested command {} is not available!", c)
            }
            StorageError::CommandSyntaxError(string) => {
                write!(f, "Syntax error while processing {}!", string)
            }
            StorageError::CommandInternalError(string) => {
                write!(f, "Internal error while processing {}!", string)
            }
        }
    }
}

The tests use process_command, so we need to add the two new commands to that function.

src/storage.rs
impl Storage {

    // Process an incoming command with its parameters.
    pub fn process_command(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
        match command[0].to_lowercase().as_str() {
            "ping" => self.command_ping(&command),
            "echo" => self.command_echo(&command),
            "get" => self.command_get(&command),
            "set" => self.command_set(&command),
            _ => Err(StorageError::CommandNotAvailable(command[0].clone())),
        }
    }
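To give an idea of what a client receives for the three kinds of response used by these commands, here is a sketch of an encoder that follows the RESP2 wire format. The enum is a reduced stand-in and the function encode is hypothetical; the serialisation implemented in src/resp.rs may differ, in particular for the null value.

```rust
// Reduced stand-in for the project's RESP type.
#[derive(Debug, PartialEq)]
enum RESP {
    SimpleString(String),
    BulkString(String),
    Null,
}

// Hypothetical encoder following the RESP2 wire format.
fn encode(value: &RESP) -> String {
    match value {
        // Simple strings start with `+` and end with CRLF.
        RESP::SimpleString(s) => format!("+{}\r\n", s),
        // Bulk strings are prefixed with their length in bytes.
        RESP::BulkString(s) => format!("${}\r\n{}\r\n", s.len(), s),
        // In RESP2 a null is a bulk string of length -1.
        RESP::Null => String::from("$-1\r\n"),
    }
}
```

Under these assumptions a successful SET is answered with `+OK\r\n`, a GET for an existing key with a length-prefixed bulk string, and a GET for a missing key with the null bulk string.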

CodeCrafters

Stage 6: Implement the SET & GET commands

It's time to update the testing suite and to check that this version of the code passes Stage 6 of the CodeCrafters challenge.