Transactions

I was scamming those guys for months. Every transaction, I kept a little.

Lethal Weapon 2 (1989)

In this chapter we start implementing another extension of the "Build your own Redis" challenge: transactions. A transaction is a mechanism that allows a client to group multiple commands and execute them as a single, uninterrupted sequence. The idea is simple: instead of sending commands one at a time and having them interleaved with commands from other clients, you wrap a batch of operations in a block that the server will execute without switching to another client midway.

Redis transactions revolve around three commands: MULTI, EXEC, and DISCARD. A client sends MULTI to signal that it wants to start a transaction. From that point on, every command the client sends is not executed immediately but queued on the server. When the client sends EXEC, the server runs all queued commands in order and returns an array with the results. If the client changes its mind, it can send DISCARD to throw away the queue and exit the transaction without running anything.
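The flow of the three commands can be sketched as a small state machine for a single client. This is only an illustration under stated assumptions, not the code we will write in this chapter: `Reply`, `Session`, and `fake_run` are hypothetical names, and the "queued" acknowledgement is modelled as a plain enum variant.

```rust
/// Hypothetical reply type for this sketch (not the project's `RESP`).
#[derive(Debug, PartialEq)]
enum Reply {
    Ok,
    Queued,
    Value(String),
    Results(Vec<String>),
    Error(String),
}

/// State of one client: `None` outside a transaction,
/// `Some(queue)` between MULTI and EXEC or DISCARD.
#[derive(Default)]
struct Session {
    queue: Option<Vec<String>>,
}

/// Stand-in for real command execution (an assumption of this sketch).
fn fake_run(command: &str) -> String {
    format!("ran: {}", command)
}

impl Session {
    /// Handle one command line, either running it or queueing it.
    fn handle(&mut self, line: &str, run: impl Fn(&str) -> String) -> Reply {
        let name = line.to_ascii_lowercase();
        match name.as_str() {
            "multi" => {
                if self.queue.is_some() {
                    return Reply::Error("ERR MULTI calls can not be nested".to_string());
                }
                // Start collecting commands.
                self.queue = Some(Vec::new());
                Reply::Ok
            }
            // Run everything that was queued, in order, and leave the transaction.
            "exec" => match self.queue.take() {
                Some(queued) => {
                    Reply::Results(queued.iter().map(|c| run(c.as_str())).collect())
                }
                None => Reply::Error("ERR EXEC without MULTI".to_string()),
            },
            // Drop the queue without running anything.
            "discard" => match self.queue.take() {
                Some(_) => Reply::Ok,
                None => Reply::Error("ERR DISCARD without MULTI".to_string()),
            },
            // Any other command is queued inside a transaction, run otherwise.
            _ => match self.queue.as_mut() {
                Some(queue) => {
                    queue.push(line.to_string());
                    Reply::Queued
                }
                None => Reply::Value(run(line)),
            },
        }
    }
}
```

Storing the queue in an `Option` means that "being in a transaction" and "having a queue" cannot get out of sync, which is one possible design among many.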

It is worth noting that Redis transactions are not traditional database transactions. They guarantee isolation — no other client can slip commands in between yours — and they are atomic in the sense that all commands are run in one go. However, if one command in the queue fails at runtime, the remaining commands still execute. There is no rollback. This is a deliberate design choice: Redis commands can only fail because of programming errors (wrong types, wrong number of arguments), and the Redis authors decided that aborting an entire batch for a bug that should be caught during development was not worth the complexity and performance cost of a rollback mechanism.
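To make the absence of rollback concrete, here is a minimal model of a queue being executed against an in-memory map. The `Cmd` enum and the `exec` function are hypothetical and exist only for this illustration. Every queued command produces its own result, and a failure in the middle neither stops the batch nor undoes earlier writes.

```rust
use std::collections::HashMap;

/// Hypothetical commands for this sketch.
enum Cmd {
    Set(&'static str, &'static str),
    Incr(&'static str),
}

/// Run every queued command in order, collecting one result per command.
/// A failing command does not stop the batch and nothing is rolled back.
fn exec(store: &mut HashMap<String, String>, queue: &[Cmd]) -> Vec<Result<String, String>> {
    queue
        .iter()
        .map(|cmd| match cmd {
            Cmd::Set(key, value) => {
                store.insert(key.to_string(), value.to_string());
                Ok("OK".to_string())
            }
            Cmd::Incr(key) => {
                // A missing key counts as "0".
                let current = store
                    .get(*key)
                    .cloned()
                    .unwrap_or_else(|| "0".to_string());
                // A non-numeric value fails this command only.
                let n: i64 = current
                    .parse()
                    .map_err(|_| "ERR value is not an integer or out of range".to_string())?;
                let next = n + 1;
                store.insert(key.to_string(), next.to_string());
                Ok(next.to_string())
            }
        })
        .collect()
}
```

If the queue contains SET a 1, SET b text, INCR b, INCR a, the third command fails, yet both SET commands keep their effect and the final INCR still runs.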

Before we get to the transaction commands, the CodeCrafters challenge asks us to implement INCR, a command that increments the integer value of a key. This is a natural precursor to transactions because it gives us a second write operation besides SET, which makes testing transactional behaviour much more interesting. After that, we will add client tracking to the server — a piece of infrastructure that transactions require, since the server needs to know which client is in a transaction and maintain a separate command queue for each one.

CodeCrafters

On the CodeCrafters website, you can activate the extension by clicking on the "Extensions" button at the end of the challenge and turning on "Transactions".

Manual tests

If you are running the CodeCrafters tests manually with make, you need to open the Makefile and start a new suite. Clone test_txn_with_redis into test_txn_with_redis_prog and follow the same process to add tests as you work through the chapter.

Step 9.1 - The INCR command - Stage 1/3

The first step to implement INCR is rather straightforward. We already have a tidy way to add new commands: a new module with the actual logic and a new branch in server::process_request. The logic behind INCR is for now pretty basic, as the challenge wants us to check that the key exists and has a numerical value.

Let's start with an extremely basic implementation of INCR. This code always replies with the integer 42.

src/commands/incr.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::ServerValue;

pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {
    let result = ServerValue::RESP(RESP::Integer(42));

    request.data(result).await;
}

We can add the file to the modules list.

src/commands/mod.rs
pub mod echo;
pub mod get;
pub mod incr;
pub mod info;
pub mod ping;
pub mod psync;
pub mod replconf;
pub mod set;
pub mod wait;

Last, we add the command to server::process_request.

src/server.rs
use crate::commands::{echo, get, incr, info, ping, psync, replconf, set, wait};

pub async fn process_request(request: Request, server: &mut Server) {

    // Process the request using the requested command.
    match command_name.as_str() {
        "echo" => {
            echo::command(server, &request, &command).await;
        }
        "get" => {
            get::command(server, &request, &command).await;
        }
        "incr" => {
            incr::command(server, &request, &command).await;
        }
        "info" => {
            info::command(server, &request, &command).await;
        }

        ...

The command is now available, and you can try to run the CodeCrafters tests. The server will reply with 42, so the tests will fail. To add the correct logic we can start with tests.

For a minimal implementation that passes the CodeCrafters test, we need the function command to do the following:

  • Return the incremented value when the key exists and contains a numeric value.
  • Store the incremented value.
  • Return the correct error when the storage is not initialised.
  • Return the correct error when the syntax is incorrect.

The CodeCrafters test for this stage checks only the first condition, but we are going to cover all four. We will add checks for non-existing and non-numeric keys later.

Let's begin handling the failure cases. Generally, it's convenient to structure software so that it rules out everything that can go wrong and then solves the happy path. The first test checks that the storage is initialised.

src/commands/incr.rs
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server_result::ServerMessage;
    use tokio::sync::mpsc;

    #[tokio::test]
    // Test that the function command returns the
    // correct error when the storage is not initialised.
    async fn test_storage_not_initialised() {
        let mut server = Server::new("localhost".to_string(), 6379);

        let cmd = vec![String::from("incr"), String::from("key")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::StorageNotInitialised)
        );
    }
}

To pass this test the body of the function becomes the following.

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {
    // Extract the storage from the server.
    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => {
            request.error(ServerError::StorageNotInitialised).await;
            return;
        }
    };
}

Next, let's handle the case of incorrect syntax.

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {
    // Extract the storage from the server.
    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => {
            request.error(ServerError::StorageNotInitialised).await;
            return;
        }
    };

    // Check that the command received a single argument.
    if command.len() != 2 {
        request
            .error(ServerError::CommandSyntaxError(command.join(" ")))
            .await;
        return;
    }
}


mod tests {

    use super::*;
    use crate::server_result::ServerMessage;
    use crate::storage::Storage;
    use tokio::sync::mpsc;

...

    #[tokio::test]
    // Test that the function command returns the
    // correct error when the syntax is wrong.
    async fn test_wrong_syntax() {
        let storage = Storage::new();

        let mut server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let cmd = vec![String::from("incr")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::CommandSyntaxError("incr".to_string()))
        );
    }

Last, we need to check the core of the function. If the key exists in the storage, the value should be incremented and the new value must be both returned and stored back under the given key.

src/commands/incr.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::{ServerError, ServerValue};
use crate::set::SetArgs;

pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {
    // Extract the storage from the server.
    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => {
            request.error(ServerError::StorageNotInitialised).await;
            return;
        }
    };

    // Check that the command received a single argument.
    if command.len() != 2 {
        request
            .error(ServerError::CommandSyntaxError(command.join(" ")))
            .await;
        return;
    }

    // Extract the key from the command line.
    let key = command[1].clone();

    // Find the value of the key.
    let output = storage.get(key.clone());

    // Check the value of the key.
    let result = match output {
        // The value exists and it's not None.
        // Parse it into an i64.
        Ok(Some(v)) => match v.parse::<i64>() {
            // The value was successfully parsed.
            Ok(n) => {
                // Increment the value.
                let value = n + 1;

                // Try to store the incremented value.
                match storage.set(key.clone(), value.to_string(), SetArgs::new()) {
                    // Incremented value successfully stored,
                    // send it back to the client.
                    Ok(_) => Ok(ServerValue::RESP(RESP::Integer(value))),
                    // An error occurred.
                    Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
                }
            }
            // The value is not an integer.
            Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
        },
        // The value doesn't exist or
        // another error occurred.
        _ => Err(ServerError::CommandInternalError(command.join(" "))),
    };

    request.result(result).await;
}


mod tests {

    #[tokio::test]
    // Test that the function command processes
    // an `INCR` request when the key exists and
    // contains a numeric value.
    async fn test_command_key_exists() {
        let mut storage = Storage::new();
        storage
            .set("key".to_string(), "42".to_string(), SetArgs::new())
            .unwrap();

        let mut server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let cmd = vec![String::from("incr"), String::from("key")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::Integer(43)))
        );

        assert_eq!(
            server
                .storage
                .as_mut()
                .unwrap()
                .get("key".to_string())
                .unwrap(),
            Some("43".to_string())
        );
    }

As you may have noticed, the last change added a lot of code. The reason is that the function handles errors that are not actually covered by tests. We could have written the function as:

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {

    // Extract the key from the command line.
    let key = command[1].clone();

    let v = storage.get(key.clone()).unwrap().unwrap(); // (1)

    let value = v.parse::<i64>().unwrap() + 1; // (2)

    let _ = storage.set(key, value.to_string(), SetArgs::new()); // (3)

    request.data(ServerValue::RESP(RESP::Integer(value))).await;
}

The code is neither elegant nor simple to write. There are several things to point out:

  • storage.get (1) returns StorageResult<Option<String>>. Since we need the inner String we chain two unwrap calls. Each one is clearly a potential failure that we are ignoring.
  • v.parse::<i64> (2) returns Result<i64, _>. We need the integer, so we call unwrap once again.
  • storage.set (3) returns a Result. We don't need the Ok value, but we need to explicitly capture the result, ignoring it.

It is definitely possible to write code like this, but the fact that we need to add unwrap calls and capture outputs nudges us towards handling things properly.

The full version of the function, however, handles some error cases that we can expose through tests.

src/commands/incr.rs
mod tests {

    #[tokio::test]
    // Test that the function command returns the
    // correct error when the key does not exist.
    async fn test_command_key_does_not_exist() {
        let storage = Storage::new();

        let mut server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let cmd = vec![String::from("incr"), String::from("key")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::CommandInternalError("incr key".to_string()))
        );
    }

    #[tokio::test]
    // Test that the function command returns the
    // correct error when the key contains a
    // non-numeric value.
    async fn test_command_key_is_not_numeric() {
        let mut storage = Storage::new();
        storage
            .set("key".to_string(), "answer".to_string(), SetArgs::new())
            .unwrap();

        let mut server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let cmd = vec![String::from("incr"), String::from("key")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::CommandInternalError("incr key".to_string()))
        );
    }

Not all conditions can be tested. Storage::get and Storage::set cannot be forced to return an error unless we mock them, and we already mentioned that mocks in Rust are not that simple, so we are not going to cover those conditions with tests. However, every match in the function is exhaustive, so the fact that the code compiles tells us that each possible case is handled by some branch.
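The guarantee given by the compiler can be seen in isolation with a small function that classifies the output of a read. `StorageError` and `describe` are hypothetical names used only here. If we remove any of the arms, the compiler rejects the match as non-exhaustive.

```rust
/// Hypothetical stand-in for the storage error type.
#[derive(Debug)]
struct StorageError;

/// Describe every shape the result of a read can take.
/// Each combination of Result and Option has its own arm.
fn describe(output: Result<Option<String>, StorageError>) -> &'static str {
    match output {
        // The key exists: its value may or may not be an integer.
        Ok(Some(value)) => match value.parse::<i64>() {
            Ok(_) => "numeric value",
            Err(_) => "non-numeric value",
        },
        // The read worked but the key is not there.
        Ok(None) => "missing key",
        // The read itself failed.
        Err(_) => "storage error",
    }
}
```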

CodeCrafters

Transactions Stage 1: The INCR command (1/3)

The code we wrote in this section passes Transactions - Stage 1 of the CodeCrafters challenge.

Step 9.2 - The INCR command - Stage 2/3

In the next stage, the challenge asks us to implement the peculiar behaviour of INCR when the key does not exist. In that case the key should be created and set to the value 1.

This requires changing the test test_command_key_does_not_exist, which currently checks that the function returns an error.

src/commands/incr.rs
mod tests {

    #[tokio::test]
    // Test that the function command processes
    // an `INCR` request when the key does not exist
    // and creates it with value 1.
    async fn test_command_key_does_not_exist() {
        let storage = Storage::new();

        let mut server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let cmd = vec![String::from("incr"), String::from("key")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::Integer(1)))
        );
    }

To pass this test, we can modify the branches of match output.

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {

    // Check the value of the key.
    let result = match output {
        // The value exists and it's not None.
        // Parse it into an i64.
        Ok(Some(v)) => match v.parse::<i64>() {
            // The value was successfully parsed.
            Ok(n) => {
                // Increment the value.
                let value = n + 1;

                // Try to store the incremented value.
                match storage.set(key.clone(), value.to_string(), SetArgs::new()) {
                    // Incremented value successfully stored,
                    // send it back to the client.
                    Ok(_) => Ok(ServerValue::RESP(RESP::Integer(value))),
                    // An error occurred.
                    Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
                }
            }
            // The value is not an integer.
            Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
        },
        // The value doesn't exist.
        Ok(None) => {
            // Set the value to 1.
            let value = 1;

            // Try to store the incremented value.
            match storage.set(key.clone(), value.to_string(), SetArgs::new()) {
                // Incremented value successfully stored,
                // send it back to the client.
                Ok(_) => Ok(ServerValue::RESP(RESP::Integer(value))),
                // An error occurred.
                Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
            }
        }
        // An error occurred while reading the key.
        Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
    };

This code passes the unit test and the CodeCrafters stage tests. However, as you can see the degree of duplication between Ok(Some(v)) and Ok(None) is pretty high.

We can reduce the duplication by observing that INCR effectively treats a non-existing key as if it held the value String::from("0"): the value is then incremented to 1, stored, and returned like any other. This allows us to merge the branches Ok(Some(v)) and Ok(None).

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {

    // Check the value of the key.
    let result = match output {
        // The key exists but the value might be None.
        // If it is None, we act as if the key
        // was correctly read and has value "0",
        // which will then be incremented to 1,
        // stored, and sent back to the user.
        Ok(storage_value) => {
            // Convert None into a "0".
            let storage_value = match storage_value {
                Some(v) => v,
                None => String::from("0"),
            };

            // Parse the storage value into an i64.
            match storage_value.parse::<i64>() {
                // The value was successfully parsed.
                Ok(n) => {
                    // Increment the value.
                    let value = n + 1;

                    // Try to store the incremented value.
                    match storage.set(key.clone(), value.to_string(), SetArgs::new()) {
                        // Incremented value successfully stored,
                        // send it back to the client.
                        Ok(_) => Ok(ServerValue::RESP(RESP::Integer(value))),
                        // An error occurred.
                        Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
                    }
                }
                // The value is not an integer.
                Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
            }
        }
        // An error occurred while reading the key.
        Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
    };

The conversion between None and String::from("0") can be further simplified using unwrap_or_else and a closure.
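The two forms are equivalent, as the following pair of hypothetical helper functions shows. The closure passed to `unwrap_or_else` runs only when the value is None, so the default String is built only when needed.

```rust
/// Explicit match: turn a missing value into "0".
fn with_match(value: Option<String>) -> String {
    match value {
        Some(v) => v,
        None => String::from("0"),
    }
}

/// Same behaviour with `Option::unwrap_or_else`.
/// The closure is called only for `None`.
fn with_unwrap_or_else(value: Option<String>) -> String {
    value.unwrap_or_else(|| String::from("0"))
}
```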

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {

    // Check the value of the key.
    let result = match output {
        // The key exists but the value might be None.
        // If it is None, we act as if the key
        // was correctly read and has value "0",
        // which will then be incremented to 1,
        // stored, and sent back to the user.
        Ok(storage_value) => {
            // Convert None into a "0".
            let storage_value = storage_value.unwrap_or_else(|| String::from("0"));

Closures, the operator ? and map_err could also be used to avoid calling Err(...) multiple times. However, the resulting code would be more difficult to read (at least for Rust beginners), so we won't do it here.
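For the curious reader, this is roughly what that alternative might look like. The sketch is not part of the project: `Storage` and `IncrError` are simplified, hypothetical stand-ins for the real types, and the function returns a Result instead of sending a reply to the client.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the chapter's `ServerError`.
#[derive(Debug, PartialEq)]
enum IncrError {
    Internal(String),
}

/// Hypothetical, simplified storage with fallible get and set.
#[derive(Default)]
struct Storage {
    data: HashMap<String, String>,
}

impl Storage {
    fn get(&self, key: &str) -> Result<Option<String>, String> {
        Ok(self.data.get(key).cloned())
    }

    fn set(&mut self, key: &str, value: String) -> Result<(), String> {
        self.data.insert(key.to_string(), value);
        Ok(())
    }
}

/// The increment logic written with `?` and `map_err`: each fallible
/// step converts its error and returns early instead of nesting a match.
fn incr(storage: &mut Storage, key: &str) -> Result<i64, IncrError> {
    // Read the key, treating a missing key as "0".
    let current = storage
        .get(key)
        .map_err(|_| IncrError::Internal(format!("incr {}", key)))?
        .unwrap_or_else(|| String::from("0"));

    // Parse the value into an i64.
    let n = current
        .parse::<i64>()
        .map_err(|_| IncrError::Internal(format!("incr {}", key)))?;

    // Store the incremented value and return it.
    let value = n + 1;
    storage
        .set(key, value.to_string())
        .map_err(|_| IncrError::Internal(format!("incr {}", key)))?;

    Ok(value)
}
```

The function is flatter than the nested version, at the price of three almost identical map_err calls and of control flow hidden behind the operator ?.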

The final version of the code for this step is the following.

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {
    // Extract the storage from the server.
    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => {
            request.error(ServerError::StorageNotInitialised).await;
            return;
        }
    };

    // Check that the command received a single argument.
    if command.len() != 2 {
        request
            .error(ServerError::CommandSyntaxError(command.join(" ")))
            .await;
        return;
    }

    // Extract the key from the command line.
    let key = command[1].clone();

    // Find the value of the key.
    let output = storage.get(key.clone());

    // Check the value of the key.
    let result = match output {
        // The key exists but the value might be None.
        // If it is None, we act as if the key
        // was correctly read and has value "0",
        // which will then be incremented to 1,
        // stored, and sent back to the user.
        Ok(storage_value) => {
            // Convert None into a "0".
            let storage_value = storage_value.unwrap_or_else(|| String::from("0"));

            // Parse the storage value into an i64.
            match storage_value.parse::<i64>() {
                // The value was successfully parsed.
                Ok(n) => {
                    // Increment the value.
                    let value = n + 1;

                    // Try to store the incremented value.
                    match storage.set(key.clone(), value.to_string(), SetArgs::new()) {
                        // Incremented value successfully stored,
                        // send it back to the client.
                        Ok(_) => Ok(ServerValue::RESP(RESP::Integer(value))),
                        // An error occurred.
                        Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
                    }
                }
                // The value is not an integer.
                Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
            }
        }
        // An error occurred while reading the key.
        Err(_) => Err(ServerError::CommandInternalError(command.join(" "))),
    };

    request.result(result).await;
}

CodeCrafters

Transactions Stage 2: The INCR command (2/3)

The code we wrote in this section passes Transactions - Stage 2 of the CodeCrafters challenge.

Step 9.3 - The INCR command - Stage 3/3

In the third stage of the Transactions extension we need to implement the behaviour of INCR when the value of the given key is not an integer. The command returns a specific string in the form of a RESP simple error.

The change is almost trivial, but we need to first implement the new RESP type.

src/resp.rs
#[derive(Debug, PartialEq, Clone)]
pub enum RESP {
    Array(Vec<RESP>),
    BulkString(String),
    Integer(i64),
    Null,
    RDBPrefix(usize),
    SimpleError(String),
    SimpleString(String),
}

impl fmt::Display for RESP {

            Self::BulkString(data) => format!("${}\r\n{}\r\n", data.len(), data),
            Self::Integer(data) => format!(":{}\r\n", data),
            Self::Null => String::from("$-1\r\n"),
            Self::RDBPrefix(data) => format!("${}\r\n", data.to_string()),
            Self::SimpleError(data) => format!("-{}\r\n", data),
            Self::SimpleString(data) => format!("+{}\r\n", data),
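
From the client's point of view, the type of a reply is decided by the first byte of the frame, which is why a simple error and a simple string differ only in their prefix. The following sketch shows how a client might classify the frames produced by the formatting code above. `Kind` and `inspect` are hypothetical names, and the payload extraction is meaningful only for the single-line types.

```rust
/// Reply types recognised by this sketch.
#[derive(Debug, PartialEq)]
enum Kind {
    SimpleString,
    SimpleError,
    Integer,
    Other,
}

/// Classify a frame by its first byte and return the payload
/// without the type prefix and the trailing CRLF.
fn inspect(frame: &str) -> (Kind, &str) {
    let kind = match frame.as_bytes().first().copied() {
        Some(b'+') => Kind::SimpleString,
        Some(b'-') => Kind::SimpleError,
        Some(b':') => Kind::Integer,
        _ => Kind::Other,
    };
    // Skip the prefix byte; an empty frame yields an empty payload.
    let payload = frame.get(1..).unwrap_or("").trim_end_matches("\r\n");
    (kind, payload)
}
```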

Now that we have a way to send back simple errors we can change the implementation of INCR and the related test.

src/commands/incr.rs
pub async fn command(server: &mut Server, request: &Request, command: &Vec<String>) {

        // The key exists but the value might be None.
        // If it is None, we act as if the key
        // was correctly read and has value "0",
        // which will then be incremented to 1,
        // stored, and sent back to the user.
        Ok(storage_value) => {
            // Convert None into a "0".
            let storage_value = storage_value.unwrap_or_else(|| String::from("0"));

            // Parse the storage value into an i64.
            match storage_value.parse::<i64>() {
                // The value was successfully parsed.
                Ok(n) => {...}
                // The value is not an integer.
                Err(_) => Ok(ServerValue::RESP(RESP::SimpleError(String::from(
                    "ERR value is not an integer or out of range",
                )))),
            }
        }


mod tests {

    async fn test_command_key_is_not_numeric() {

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleError(String::from(
                "ERR value is not an integer or out of range"
            ))))
        );
    }
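
The words "out of range" in the error message hint at one more edge case that the challenge does not test. Parsing already rejects strings that do not fit in an i64, but the expression n + 1 can still overflow when the stored value is the largest i64: in a debug build the addition panics, and in a release build it wraps around by default. A possible way to guard against this, shown here as a sketch with the hypothetical names `IncrOutcome` and `increment`, is `checked_add`.

```rust
/// Hypothetical outcome type for this sketch.
#[derive(Debug, PartialEq)]
enum IncrOutcome {
    Value(i64),
    NotAnInteger,
    Overflow,
}

/// Parse the stored string and increment it without risking an overflow.
fn increment(stored: &str) -> IncrOutcome {
    match stored.parse::<i64>() {
        // `checked_add` returns None instead of overflowing.
        Ok(n) => match n.checked_add(1) {
            Some(value) => IncrOutcome::Value(value),
            None => IncrOutcome::Overflow,
        },
        // Not a number, or a number that does not fit in an i64.
        Err(_) => IncrOutcome::NotAnInteger,
    }
}
```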

CodeCrafters

Transactions Stage 3: The INCR command (3/3)

The code we wrote in this section passes Transactions - Stage 3 of the CodeCrafters challenge.

Step 9.4 - The MULTI command

The following stage introduces the command MULTI. For this stage, the challenge requires the command to just return a simple string "OK". The implementation is straightforward.

src/commands/multi.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::ServerValue;

pub async fn command(_server: &Server, request: &Request, _command: &Vec<String>) {
    request
        .data(ServerValue::RESP(RESP::SimpleString("OK".to_string())))
        .await;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::server_result::ServerMessage;
    use tokio::sync::mpsc;

    #[tokio::test]
    // Test that the function command processes
    // a `MULTI` request and responds with OK.
    async fn test_command() {
        let cmd = vec![String::from("multi")];
        let server = Server::new("localhost".to_string(), 6379);
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            master_connection: false,
        };

        command(&server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("OK"))))
        );
    }
}

The new module must be added to the modules list to become accessible.

src/commands/mod.rs
pub mod echo;
pub mod get;
pub mod incr;
pub mod info;
pub mod multi;
pub mod ping;
pub mod psync;
pub mod replconf;
pub mod set;
pub mod wait;

As usual, the command then needs to be exposed in server::process_request.

src/server.rs
use crate::commands::{echo, get, incr, info, multi, ping, psync, replconf, set, wait};
use crate::connection::{
    stream_read_data_length, stream_read_line, stream_send_receive_resp, ConnectionMessage,
};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::{bytes_to_resp, resp_extract_length, resp_remove_type};
use crate::server_result::{ServerError, ServerMessage, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{io::AsyncWriteExt, net::TcpStream};

pub async fn process_request(request: Request, server: &mut Server) {

    match command_name.as_str() {
    
        "info" => {
            info::command(server, &request, &command).await;
        }
        "ping" => {
            ping::command(server, &request, &command).await;
        }
        "multi" => {
            multi::command(server, &request, &command).await;
        }
        "psync" => {
            psync::command(server, &request, &command).await;
        }

CodeCrafters

Transactions Stage 4: The MULTI command

The code we wrote in this section passes Transactions - Stage 4 of the CodeCrafters challenge.

Step 9.5 - The EXEC command

Now that we have an initial implementation of MULTI we can create an initial implementation of EXEC. Remember that the idea behind transactions is that a client starts with MULTI, then sends a certain number of commands, and finally ends with EXEC.

Here lies the problem in the current code: each request processed by the server is anonymous. Currently, we process requests without any knowledge of which client (that is, which connection) sent them. Since in this step we need to collect multiple commands from the same client, the very first feature we need to implement is a way to connect requests and clients.

The CodeCrafters challenge is split into very small steps. While we will implement them one by one, I think it's worth having a look at the bigger picture first and understand what we need to implement for the whole MULTI/EXEC machinery to actually work. Roughly speaking, we need to do the following:

  • Add a structure to uniquely represent a client.
  • Register a new client when a new connection is established.
  • Identify future requests as coming from this client.
  • Add a flag to signal that the client is currently in a transaction.
  • Collect commands for clients that are in a transaction.
  • Execute collected commands for a given client.

Keep these requirements in mind while we go through the next three stages.

The challenge wants us to implement a trivial version of EXEC that always fails. We have done this before, so the set of changes is basically the same. The command sends a RESP::SimpleError with a fixed string.

src/commands/exec.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::ServerValue;

pub async fn command(_server: &Server, request: &Request, _command: &Vec<String>) {
    request
        .data(ServerValue::RESP(RESP::SimpleError(
            "ERR EXEC without MULTI".to_string(),
        )))
        .await;
}

The file is added to the modules list.

src/commands/mod.rs
pub mod echo;
pub mod exec;
pub mod get;
pub mod incr;
pub mod info;
pub mod multi;
pub mod ping;
pub mod psync;
pub mod replconf;
pub mod set;
pub mod wait;

And the command is added to process_request.

src/server.rs
use crate::commands::{echo, exec, get, incr, info, multi, ping, psync, replconf, set, wait};
use crate::connection::{
    stream_read_data_length, stream_read_line, stream_send_receive_resp, ConnectionMessage,
};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::{bytes_to_resp, resp_extract_length, resp_remove_type};
use crate::server_result::{ServerError, ServerMessage, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{io::AsyncWriteExt, net::TcpStream};

pub async fn process_request(request: Request, server: &mut Server) {

    match command_name.as_str() {
    
        "echo" => {
            echo::command(server, &request, &command).await;
        }
        "exec" => {
            exec::command(server, &request, &command).await;
        }
        "get" => {
            get::command(server, &request, &command).await;
        }

Given that the implementation of EXEC is a simple placeholder, we can skip the creation of unit tests for now.

CodeCrafters

Transactions Stage 5: The EXEC command

The code we wrote in this section passes Transactions - Stage 5 of the CodeCrafters challenge.

Step 9.6 - Empty transaction

In this stage, things become much more interesting. Now, the function that implements EXEC needs to know if a client is inside a MULTI or not, so we need to implement the following requirements:

  • Add a structure to uniquely represent a client.
  • Register a new client when a new connection is established.
  • Identify future requests as coming from this client.
  • Add a flag to signal that the client is currently in a transaction.

Implementing those requirements will result in a noticeable amount of changes to the code, so we will handle them one by one with the goal of keeping the system stable.

Step 9.6.1 - Represent clients

For the time being, a client in our system needs to contain only two values: a unique ID and a flag for MULTI.

src/client.rs
pub struct Client {
    pub client_id: u64,
    pub multi: bool,
}

impl Client {
    pub fn new(client_id: u64) -> Self {
        Self {
            client_id: client_id,
            multi: false,
        }
    }
}

The new module must be added to main.rs to be activated.

src/main.rs
mod commands;

mod client;
mod connection;
mod replication;
mod request;
mod resp;
mod resp_result;
mod server;
mod server_result;
mod set;
mod storage;
mod storage_result;

Step 9.6.2 - Handle new clients (server-side)

Registering clients poses a small problem. Clients should be recorded in the server, but the responsibility of processing connections belongs to the connection handler. When the latter establishes a new connection, it has to send a message to the server and receive from it the unique identifier that will be attached to all subsequent requests.

Let's start with the server, which has, in theory, a simple job. When the connection handler signals that a new client connected, a new ID should be generated and sent back. In practice, this seemingly simple operation is a classic concurrency pitfall that can easily produce bugs that are hard to find. The server is potentially serving multiple connections at the same time, so the creation of the ID must happen in an atomic way.

Atomicity and race conditions

As we already mentioned in chapter 3, a race condition is a bug that arises when the behaviour of a program depends on the relative timing of parallel executions accessing shared state. When multiple processes or threads access the same resource without coordination, the outcome is non-deterministic. The textbook example is two threads that read a counter, both see the same value, and both increment it, so the counter ends up off by one. See the Wikipedia entry on race conditions for a general overview.

An atomic operation is one that either completes entirely or does not happen at all, so no other thread can observe it half-done or slip in between a read and a write. An atomic increment rules out the bug above because "read, add one, store" is treated as a single indivisible step.

Rust exposes atomic types in the std::sync::atomic module [docs]. Operations on atomics take an Ordering parameter that controls memory ordering: for a simple counter like ours Ordering::Relaxed is sufficient, but stronger orderings (Acquire, Release, SeqCst) become relevant when an atomic synchronises other memory accesses. The Rustonomicon chapter on atomics explains the model.
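
As a minimal sketch of the idea (it is not part of the server code), the following standalone example hands out IDs from one shared AtomicU64 across several threads. The helper names allocate_ids and all_unique are hypothetical and exist only for this illustration.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical helper: spawn `threads` workers, each drawing
// `per_thread` IDs from one shared counter that starts at 1.
fn allocate_ids(threads: usize, per_thread: usize) -> Vec<u64> {
    let counter = Arc::new(AtomicU64::new(1));

    let handles: Vec<thread::JoinHandle<Vec<u64>>> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                (0..per_thread)
                    // fetch_add returns the value held *before* the
                    // increment, so the first ID handed out is 1.
                    .map(|_| counter.fetch_add(1, Ordering::Relaxed))
                    .collect::<Vec<u64>>()
            })
        })
        .collect();

    // Gather the IDs produced by every worker.
    handles
        .into_iter()
        .flat_map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

// Hypothetical helper: true when no ID appears twice.
fn all_unique(ids: &[u64]) -> bool {
    let unique: HashSet<&u64> = ids.iter().collect();
    unique.len() == ids.len()
}
```

Because each fetch_add is a single indivisible step, no two workers can ever receive the same value, regardless of how the threads are scheduled. The order in which IDs reach each thread is not deterministic, but the set of IDs is.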

For a deeper, language-agnostic introduction to lock-free programming and memory models, Jeff Preshing's article on lock-free programming is widely recommended as a starting point.

Fortunately, Rust provides atomic operations in std::sync::atomic. In particular, here we are interested in AtomicU64 [docs]. The server will be initialised with an atomic counter (starting at 1) and every time the connection handler connects with a new client the counter will be incremented.

Let's first give the connection handler a way to signal that a new client connected.

src/connection.rs
#[derive(Debug)]
pub enum ConnectionMessage {
    NewClient(mpsc::Sender<ServerMessage>),
    Request(Request),
}

The new variant of ConnectionMessage contains an mpsc::Sender that allows the server to respond with the client ID. So far, the server has no way to communicate with the connection handler, as there was no need to establish that channel. Now that we know which type of message the server will receive we can implement the logic to create the client ID.

src/server.rs
use crate::commands::{echo, get, incr, info, multi, ping, psync, replconf, set, wait};
use crate::connection::{
    stream_read_data_length, stream_read_line, stream_send_receive_resp, ConnectionMessage,
};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::{bytes_to_resp, resp_extract_length, resp_remove_type};
use crate::server_result::{ServerError, ServerMessage, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{io::AsyncWriteExt, net::TcpStream};

...

pub struct Server {
    pub info: ServerInfo,
    pub storage: Option<Storage>,
    pub replication: ReplicationConfig,
    pub replica_senders: Vec<mpsc::Sender<ServerMessage>>,
    pub wait_handler_sender: Option<mpsc::Sender<ServerMessage>>,
    pub next_client_id: AtomicU64,
}


impl Server {

    pub fn new(host: String, port: u16) -> Self {
        Self {
            info: ServerInfo {
                host: host,
                port: port,
            },
            storage: None,
            replication: ReplicationConfig::new_master(),
            replica_senders: Vec::new(),
            wait_handler_sender: None,
            next_client_id: AtomicU64::new(1),
        }
    }


// Listen for incoming messages and process the contained requests.
pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {
    let mut interval_timer = tokio::time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            // Keep listening for messages on the incoming channel.
            Some(message) = crx.recv() => {
                match message {
                    // A new client has connected. Assign an ID
                    // and store the client in the server.
                    ConnectionMessage::NewClient(sender) => {
                        // Get the current ID and atomically
                        // increase it.
                        let client_id = server.next_client_id.fetch_add(1, Ordering::Relaxed);

                        // Send the client ID to the Connection Handler.
                        // CODE HERE
                    }

                    // If the message contains a request, extract it.
                    ConnectionMessage::Request(request) => {
                        // Process the request.
                        process_request(request, &mut server).await;
                    }
                }
            }

            _ = interval_timer.tick() => {
                server.expire_keys();
            }
        }
    }
}

As you might have noticed, the code in run_server is not complete. The server should send the ID back to the connection handler. This requires a new variant of ServerMessage, and Rust would then force us to handle that variant in handle_connection, so for now we keep a placeholder.

Step 9.6.3 - Handle new clients (connection-side)

In this step, we need to complete the client registration in the connection handler and add the client ID to requests coming from the new client.

The server must have a way to send the client ID back to the connection handler, so let's first create a variant of ServerMessage for that purpose.

src/server_result.rs
#[derive(Debug, PartialEq)]
pub enum ServerMessage {
    ClientInit(u64),
    Data(ServerValue),
    Error(ServerError),
}

The changes to the function handle_connection deserve some attention. The message ClientInit should be received only at the very beginning of the process, in response to NewClient. This lets the connection handler learn the client ID as early as possible, so that it can label every request received from the client.

src/connection.rs
pub async fn handle_connection(

    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    // Create the MPSC channel to communicate with the connection.
    let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

    // Tell the server that a new client has connected.
    if let Err(e) = server_sender
        .send(ConnectionMessage::NewClient(connection_sender.clone()))
        .await
    {
        eprintln!("Error sending request: {}", e);
        return;
    }

    // This is the unique client ID that the
    // connection handler receives from the server.
    let client_id: u64 = match connection_receiver.recv().await {
        Some(ServerMessage::ClientInit(id)) => id,
        _ => {
            eprintln!("Error initialising client");
            return;
        }
    };

...

            // A response arrived from the server.
            Some(response) = connection_receiver.recv() => {
                let _ = match response {
                    // We shouldn't receive this
                    // message again from the server.
                    ServerMessage::ClientInit(_) => {
                        eprintln!("ClientInit received twice");
                        return;
                    },
                    ServerMessage::Data(ServerValue::RESP(v)) => stream.write_all(v.to_string().as_bytes()).await,
                    ServerMessage::Data(ServerValue::None) => Ok(()),
                    ServerMessage::Data(ServerValue::Binary(data)) => stream.write_all(&data).await,
                    ServerMessage::Error(e) => {
                        eprintln!("Error: {}", ConnectionError::ServerError(e));
                        return;
                    }
                };
            }

With this in place, we can complete the server implementation.

src/server.rs
// Listen for incoming messages and process the contained requests.
pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {
    let mut interval_timer = tokio::time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            // Keep listening for messages on the incoming channel.
            Some(message) = crx.recv() => {
                match message {
                    // A new client has connected. Assign an ID
                    // and store the client in the server.
                    ConnectionMessage::NewClient(sender) => {
                        // Get the current ID and atomically
                        // increase it.
                        let client_id = server.next_client_id.fetch_add(1, Ordering::Relaxed);

                        // Send the client ID to the Connection Handler.
                        if let Err(_) = sender.send(ServerMessage::ClientInit(client_id)).await {
                            eprintln!("Error initialising client");
                            return;
                        }
                    }

                    // If the message contains a request, extract it.
                    ConnectionMessage::Request(request) => {
                        // Process the request.
                        process_request(request, &mut server).await;
                    }
                }
            }

            _ = interval_timer.tick() => {
                server.expire_keys();
            }
        }
    }
}
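
The registration handshake built in the last two steps can also be exercised in isolation. The sketch below is an approximation under loud assumptions: it replaces Tokio tasks and tokio::sync::mpsc with standard-library threads and std::sync::mpsc, it trims ServerMessage and ConnectionMessage down to the single variants involved, and the function register_clients is hypothetical. It only illustrates the message flow: the connection sends its own reply channel, and the server answers on it with the ID.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::thread;

// Simplified stand-ins for the chapter's message types.
#[derive(Debug, PartialEq)]
enum ServerMessage {
    ClientInit(u64),
}

enum ConnectionMessage {
    // Carries the channel the server should reply on.
    NewClient(mpsc::Sender<ServerMessage>),
}

// Hypothetical helper: run a tiny server loop on its own thread and
// register `connections` clients, returning the ID each one received.
fn register_clients(connections: usize) -> Vec<u64> {
    let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>();

    // The "server": owns the counter and answers each NewClient.
    let server = thread::spawn(move || {
        let next_client_id = AtomicU64::new(1);
        for message in server_receiver {
            match message {
                ConnectionMessage::NewClient(reply_to) => {
                    let client_id = next_client_id.fetch_add(1, Ordering::Relaxed);
                    // A failed send only means that connection is gone.
                    let _ = reply_to.send(ServerMessage::ClientInit(client_id));
                }
            }
        }
    });

    // The "connection handlers": each one opens its own reply channel,
    // announces itself, and blocks until the ID arrives.
    let mut ids = Vec::new();
    for _ in 0..connections {
        let (connection_sender, connection_receiver) = mpsc::channel::<ServerMessage>();
        if server_sender
            .send(ConnectionMessage::NewClient(connection_sender))
            .is_err()
        {
            break;
        }
        match connection_receiver.recv() {
            Ok(ServerMessage::ClientInit(id)) => ids.push(id),
            Err(_) => break,
        }
    }

    // Dropping the last sender ends the server loop.
    drop(server_sender);
    server.join().expect("server thread panicked");
    ids
}
```

Note that the connection side does nothing else until ClientInit arrives, which mirrors the way handle_connection waits for the ID before it enters its main loop.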

Step 9.6.4 - Connect clients and requests

Each request sent by a client should be marked with its client ID, so that the server can eventually collect the requests issued during a MULTI. We need to add the client ID to the type Request.

src/request.rs
#[derive(Debug)]
pub struct Request {
    pub value: RESP,
    pub sender: mpsc::Sender<ServerMessage>,
    pub binary: Vec<u8>,
    pub client_id: u64,
    pub master_connection: bool,
}

The connection handler will add the ID when the request is created and sent to the server.

src/connection.rs
pub async fn handle_connection(

                            // Process the bytes in the buffer according to
                            // the content and extract the request. Update the index.
                            let resp = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
                                Ok(v) => v,
                                Err(e) => {
                                    eprintln!("Error: {}", e);
                                    return;
                                }
                            };

                            // Create the request.
                            let request = Request {
                                value: resp,
                                sender: connection_sender.clone(),
                                binary: buffer[start..index].to_vec(),
                                client_id: client_id,
                                master_connection,
                            };

Unfortunately, the change to Request invalidates approximately 40 tests. Use cargo test to check which tests need to be fixed. All of them need the same change shown below.

src/server.rs
mod tests {

    async fn test_process_request_ping() {
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        let storage = Storage::new();

        let mut server: Server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        process_request(request, &mut server).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("PONG"))))
        );
    }

Step 9.6.5 - Register new clients

What we have done in the previous steps is enough to identify a client with a unique ID, but the server doesn't have any way to connect IDs and actual clients. The simplest way to do that is to store them in a HashMap inside the server itself.

src/server.rs
use crate::client::Client;
use crate::commands::{echo, exec, get, incr, info, multi, ping, psync, replconf, set, wait};
use crate::connection::{
    stream_read_data_length, stream_read_line, stream_send_receive_resp, ConnectionMessage,
};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::{bytes_to_resp, resp_extract_length, resp_remove_type};
use crate::server_result::{ServerError, ServerMessage, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{io::AsyncWriteExt, net::TcpStream};

...

pub struct Server {
    pub info: ServerInfo,
    pub storage: Option<Storage>,
    pub replication: ReplicationConfig,
    pub replica_senders: Vec<mpsc::Sender<ServerMessage>>,
    pub wait_handler_sender: Option<mpsc::Sender<ServerMessage>>,
    pub next_client_id: AtomicU64,
    pub clients: HashMap<u64, Client>,
}


impl Server {

    pub fn new(host: String, port: u16) -> Self {
        Self {
            info: ServerInfo {
                host: host,
                port: port,
            },
            storage: None,
            replication: ReplicationConfig::new_master(),
            replica_senders: Vec::new(),
            wait_handler_sender: None,
            next_client_id: AtomicU64::new(1),
            clients: HashMap::new(),
        }
    }

Now, when the server generates the client ID it can also initialise the Client structure and store it.

src/server.rs
// Listen for incoming messages and process the contained requests.
pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {
    let mut interval_timer = tokio::time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            // Keep listening for messages on the incoming channel.
            Some(message) = crx.recv() => {
                match message {
                    // A new client has connected. Assign an ID
                    // and store the client in the server.
                    ConnectionMessage::NewClient(sender) => {
                        // Get the current ID and atomically
                        // increase it.
                        let client_id = server.next_client_id.fetch_add(1, Ordering::Relaxed);

                        // Create the client.
                        let client = Client::new(client_id);

                        // Add the client to the global register.
                        server.clients.insert(client_id, client);

                        // Send the client ID to the Connection Handler.
                        if let Err(_) = sender.send(ServerMessage::ClientInit(client_id)).await {
                            eprintln!("Error initialising client");
                            return;
                        }
                    }

                    // If the message contains a request, extract it.
                    ConnectionMessage::Request(request) => {
                        // Process the request.
                        process_request(request, &mut server).await;
                    }
                }
            }

            _ = interval_timer.tick() => {
                server.expire_keys();
            }
        }
    }
}
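
To see the registry logic without the surrounding async machinery, here is a small self-contained sketch. The type ClientRegistry is hypothetical (in the chapter the two fields live directly in Server), and Client is redeclared locally so that the example compiles on its own.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

// Local copy of the chapter's Client, to keep the sketch standalone.
struct Client {
    client_id: u64,
    multi: bool,
}

// Hypothetical wrapper around the two fields added to Server.
struct ClientRegistry {
    next_client_id: AtomicU64,
    clients: HashMap<u64, Client>,
}

impl ClientRegistry {
    fn new() -> Self {
        Self {
            next_client_id: AtomicU64::new(1),
            clients: HashMap::new(),
        }
    }

    // Allocate a fresh ID, store the new client under it,
    // and return the ID so it can be sent to the connection.
    fn register(&mut self) -> u64 {
        let client_id = self.next_client_id.fetch_add(1, Ordering::Relaxed);
        self.clients.insert(
            client_id,
            Client {
                client_id,
                multi: false,
            },
        );
        client_id
    }
}
```

Every registered client starts outside a transaction, and looking up an ID that was never issued simply yields None, which is the case the commands will have to handle.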

Step 9.6.6 - Wire up the exec handler

It's finally time to wire everything together and make MULTI and EXEC interact, at least at a basic level. When MULTI is invoked, the flag on the client must be turned on, so that EXEC can know whether the client is inside a transaction and act accordingly.

In the implementation of MULTI we need to retrieve the client and set the flag multi.

src/commands/multi.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::{ServerError, ServerValue};

pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
    // Find the client corresponding to
    // the ID contained in the request.
    let client = match server.clients.get_mut(&request.client_id) {
        Some(client) => client,
        None => {
            request
                .error(ServerError::CommandInternalError(String::from(
                    "Cannot find client ID",
                )))
                .await;
            return;
        }
    };

    // Toggle MULTI on the client.
    client.multi = true;

    request
        .data(ServerValue::RESP(RESP::SimpleString("OK".to_string())))
        .await;
}

The additional logic requires changes in the existing test. We can also add a new one.

src/commands/multi.rs
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Client;
    use crate::server_result::ServerMessage;
    use tokio::sync::mpsc;

    #[tokio::test]
    // Test that the function command processes
    // a `MULTI` request and responds with OK.
    async fn test_command() {
        let cmd = vec![String::from("multi")];
        let mut server = Server::new("localhost".to_string(), 6379);
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let client = Client::new(1);
        server.clients.insert(1, client);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("OK"))))
        );
    }

    #[tokio::test]
    // Test that the function command returns an error
    // when the client ID is not found.
    async fn test_command_without_client() {
        let cmd = vec![String::from("multi")];
        let mut server = Server::new("localhost".to_string(), 6379);
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::CommandInternalError(String::from(
                "Cannot find client ID"
            ))),
        );
    }
}

The changes to EXEC are more substantial, mostly because the function was basically a placeholder.

src/commands/exec.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::{ServerError, ServerValue};

pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
    // Find the client corresponding to
    // the ID contained in the request.
    let client = match server.clients.get_mut(&request.client_id) {
        Some(client) => client,
        None => {
            request
                .error(ServerError::CommandInternalError(String::from(
                    "Cannot find client ID",
                )))
                .await;
            return;
        }
    };

    // Check if the client is inside a transaction.
    if !client.multi {
        // Client multi is off, so EXEC cannot be executed.
        request
            .data(ServerValue::RESP(RESP::SimpleError(String::from(
                "ERR EXEC without MULTI",
            ))))
            .await;
        return;
    };

    // Client multi is on, so we need to execute the
    // transaction. After this, multi should be off.
    client.multi = false;

    request
        .data(ServerValue::RESP(RESP::Array(Vec::new())))
        .await;
}

The function had no tests, but now we can add several.

src/commands/exec.rs
#[cfg(test)]
mod tests {
    use crate::client::Client;
    use crate::server_result::ServerMessage;

    use super::*;
    use tokio::sync::mpsc;

    #[tokio::test]
    // Test that the function command returns an error
    // when the client ID is not found.
    async fn test_command_without_client() {
        let cmd = vec![String::from("exec")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::CommandInternalError(String::from(
                "Cannot find client ID"
            ))),
        );
    }

    #[tokio::test]
    // Test that the function command returns an error
    // when EXEC is called without MULTI.
    async fn test_command_without_multi() {
        let cmd = vec![String::from("exec")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let client = Client::new(1);
        server.clients.insert(1, client);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleError(String::from(
                "ERR EXEC without MULTI"
            ))))
        );
    }

    #[tokio::test]
    // Test that the function command returns an
    // empty array when EXEC is called after MULTI.
    async fn test_command_with_multi() {
        let cmd = vec![String::from("exec")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let mut client = Client::new(1);
        client.multi = true;
        server.clients.insert(1, client);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::Array(Vec::new())))
        );
    }
}
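
One property that the tests above do not assert directly is that EXEC clears the flag, so a second EXEC in a row must fail. The sketch below isolates the decision logic of the command in a plain synchronous function to make that easy to check. The names ExecOutcome and exec_outcome are hypothetical, and Client is reduced to the only field involved.

```rust
use std::collections::HashMap;

// Reduced version of the chapter's Client: only the MULTI flag.
struct Client {
    multi: bool,
}

// Hypothetical summary of the three branches of exec::command.
#[derive(Debug, PartialEq)]
enum ExecOutcome {
    // No client is registered under the given ID.
    UnknownClient,
    // The client exists but never sent MULTI.
    WithoutMulti,
    // The client was in a transaction, which is now closed.
    EmptyTransaction,
}

// Same decisions as exec::command, without channels or async.
fn exec_outcome(clients: &mut HashMap<u64, Client>, client_id: u64) -> ExecOutcome {
    let Some(client) = clients.get_mut(&client_id) else {
        return ExecOutcome::UnknownClient;
    };

    if !client.multi {
        return ExecOutcome::WithoutMulti;
    }

    // Leaving the transaction: the next EXEC must be rejected.
    client.multi = false;
    ExecOutcome::EmptyTransaction
}
```

Calling exec_outcome twice for a client that is inside a MULTI returns EmptyTransaction the first time and WithoutMulti the second time.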

As a final detail, we can silence the compiler warning about Client.client_id as we did in chapter 7 for Server.host.

src/client.rs
pub struct Client {
    #[expect(dead_code)]
    pub client_id: u64,
    pub multi: bool,
}

CodeCrafters

Transactions Stage 6: Empty transaction

The code we wrote in this section passes Transactions - Stage 6 of the CodeCrafters challenge.

Step 9.7 - Queueing commands

Now that MULTI and EXEC actually interact with the client, it's time to implement the core feature they are supposed to provide. If the client is inside a MULTI, commands must be queued and not executed directly. As you can imagine, queueing commands is straightforward: when the client is in that state we just store them instead of executing them, a job that can be performed entirely in the server.

Later, we will face the challenge of executing the stored commands, and this will be a bit more complicated.

For now, let's consider the type Client. It has no field that can store a set of commands, so let's add that first.

src/client.rs
use crate::request::Request;

pub struct Client {
    pub client_id: u64,
    pub requests: Vec<Request>,
    pub multi: bool,
}

impl Client {
    pub fn new(client_id: u64) -> Self {
        Self {
            client_id: client_id,
            requests: Vec::new(),
            multi: false,
        }
    }
}

When this is done, we can change process_request to either store commands or execute them, according to the state of the client.

src/server.rs
pub async fn process_request(request: Request, server: &mut Server) {

    // Check that each element of the array is a
    // bulk string, extract the content, and add it
    // to the vector.
    for elem in elements.iter() {
        match elem {
            RESP::BulkString(v) => command.push(v.clone()),
            _ => {
                request.error(ServerError::IncorrectData).await;
                return;
            }
        }
    }

    // Look up the client associated with this request.
    let client = match server.clients.get_mut(&request.client_id) {
        Some(client) => client,
        None => {
            request
                .error(ServerError::CommandInternalError(String::from(
                    "Cannot find client ID",
                )))
                .await;
            return;
        }
    };

    // Extract the command name to route the request.
    let command_name = command[0].to_lowercase();

    // If the client is in multi mode and the command
    // is not EXEC, queue the request and reply QUEUED.
    if client.multi && command_name != "exec" {
        request
            .data(ServerValue::RESP(RESP::SimpleString(String::from(
                "QUEUED",
            ))))
            .await;
        client.requests.push(request);
        return;
    }

    // Process the request using the requested command.
    match command_name.as_str() {...}
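
The routing decision above can be checked on its own with a simplified model. In this sketch, which is an illustration and not the server code, a queued request is reduced to its vector of strings, and the names Routing and route are hypothetical.

```rust
// Reduced version of the chapter's Client: queued requests are
// represented by their command vectors instead of full Requests.
struct Client {
    multi: bool,
    requests: Vec<Vec<String>>,
}

// Hypothetical result of the routing decision.
#[derive(Debug, PartialEq)]
enum Routing {
    // The command was stored and the client should get QUEUED.
    Queued,
    // The command should be executed immediately.
    Dispatch(Vec<String>),
}

// Same rule as process_request: inside a MULTI, everything
// except EXEC is stored instead of being executed.
fn route(client: &mut Client, command: Vec<String>) -> Routing {
    let command_name = command
        .first()
        .map(|name| name.to_lowercase())
        .unwrap_or_default();

    if client.multi && command_name != "exec" {
        client.requests.push(command);
        return Routing::Queued;
    }

    Routing::Dispatch(command)
}
```

With this rule, a SET sent outside a transaction is dispatched, the same SET sent after MULTI is queued, and EXEC always goes through. The check only exempts EXEC, so any other command that should run immediately during a transaction (DISCARD, which we have not implemented yet, is the obvious candidate) would presumably need to be added to the condition.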

We also update two tests.

src/server.rs
mod tests {

    async fn test_process_request_ping() {

        let mut server: Server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let client = Client::new(1);
        server.clients.insert(1, client);

        process_request(request, &mut server).await;


    async fn test_process_request_echo() {
    
        let mut server: Server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let client = Client::new(1);
        server.clients.insert(1, client);

        process_request(request, &mut server).await;

CodeCrafters

Transactions Stage 7: Queueing commands

The code we wrote in this section passes Transactions - Stage 7 of the CodeCrafters challenge.

Step 9.8 - Executing a transaction

Finally, it's time to implement the execution of the transaction, that is, the missing part of the command EXEC.

Let's recap what happens so far during a transaction:

  1. The client connects to the server, so the server creates an ID and associates it with a Client struct that contains the multi flag and a vector to hold the queued requests.
  2. The client issues a MULTI, which turns on the flag associated with that client inside the server.
  3. The client sends commands, which are stored in the client's vector instead of being executed.
  4. The client issues an EXEC, which at the moment just turns off the flag.

What should happen is that the command EXEC executes all stored commands. How can we make a command execute other commands?

If you think about it, the architecture of the system already contains the solution. The entity that executes commands is the server, and the server is reached through requests sent through its communication channel. So, it's simply a matter of sending all the requests we stored to the server again.

If this sounds like the server sending requests to itself, that's precisely the idea. However, while the server is busy executing EXEC it cannot process any other command (see the aside below for a detailed explanation), so the stored requests have to be sent by a separate entity.

The command-in-command deadlock

Every request from every connection is consumed by the select! in run_server. When a request arrives, the loop calls process_request().await, which eventually calls *::command().await for whatever command we are running. Because the loop awaits that call, it does not iterate again until the call returns.

Imagine what would happen if exec::command was used to send the stored requests:

  1. The request is sent to the server. The message sits in a channel waiting to be read.
  2. The code in exec::command waits for a response.
  3. The only task that can deliver that response is the server loop, but that loop is currently parked in process_request().await, which is parked in exec::command().await, which is parked waiting for a response.

Nobody reads the message. Deadlock.
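
The starvation described above can be reproduced in miniature. The following is only a sketch: it uses the standard library channels instead of Tokio so that it has no dependencies, the Request type and the function name are hypothetical, and a timeout stands in for what would otherwise be an endless wait.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

// Hypothetical message type: a request carries the sender
// on which the server is expected to reply.
struct Request {
    reply_to: mpsc::Sender<String>,
}

// Simulates a command that runs inside the server loop, sends a
// nested request to the server queue and then waits for the
// answer without giving control back to the loop.
// Returns true when the wait times out while the nested request
// is still sitting unread in the queue.
fn nested_request_starves() -> bool {
    let (server_tx, server_rx) = mpsc::channel::<Request>();
    let (reply_tx, reply_rx) = mpsc::channel::<String>();

    // The nested request is queued correctly...
    server_tx.send(Request { reply_to: reply_tx }).unwrap();

    // ...but the only reader of server_rx is the loop we are
    // currently blocking, so no reply can ever be produced.
    let outcome = reply_rx.recv_timeout(Duration::from_millis(50));

    // The request is still in the queue, untouched.
    let still_queued = match server_rx.try_recv() {
        Ok(request) => {
            drop(request.reply_to);
            true
        }
        Err(_) => false,
    };

    matches!(outcome, Err(RecvTimeoutError::Timeout)) && still_queued
}
```

Calling nested_request_starves() returns true: the wait gives up after 50 milliseconds and the request is found exactly where it was left. Without the timeout, the wait would never end.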

This entity is simply a new task that is given everything it needs to go through the stored requests: it sends each one to the server, waits for the response, collects the responses, and finally sends them back to the client.

We need several components to make this work:

  • A way for the server to send messages to itself.
  • A channel to send results back to the client.
  • An asynchronous function that processes all stored requests, sending requests to the server and responses to the client.
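
Before building each component, it may help to see how the three of them fit together. The sketch below is not the code of this chapter: it replaces Tokio tasks with standard library threads and channels to stay dependency-free, and the Message enum and the functions run_transaction and replay are hypothetical names used only for illustration.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical, simplified messages understood by the server.
enum Message {
    // A command to run, plus the sender to reply on.
    Run(String, Sender<String>),
    // Replay the queued commands and send all replies to the client.
    Exec(Vec<String>, Sender<Vec<String>>),
    // Stop the server loop.
    Shutdown,
}

// The separate entity: sends each queued command to the server,
// waits for its reply, and finally sends all replies to the client.
fn replay(commands: Vec<String>, to_server: Sender<Message>, client: Sender<Vec<String>>) {
    let (handler_tx, handler_rx) = mpsc::channel::<String>();
    let mut results = Vec::new();
    for command in commands {
        // The reply goes to this handler, not to the client.
        if to_server.send(Message::Run(command, handler_tx.clone())).is_err() {
            return;
        }
        if let Ok(reply) = handler_rx.recv() {
            results.push(reply);
        }
    }
    let _ = client.send(results);
}

// Starts a server loop, runs one transaction through it
// and returns what the client receives.
fn run_transaction(queued: Vec<String>) -> Vec<String> {
    let (server_tx, server_rx) = mpsc::channel::<Message>();
    // The server keeps a sender to its own channel.
    let self_sender = server_tx.clone();

    let server = thread::spawn(move || {
        for message in server_rx {
            match message {
                Message::Run(command, reply_to) => {
                    let _ = reply_to.send(format!("ran {}", command));
                }
                Message::Exec(commands, client) => {
                    // Do not replay here: hand the work to another
                    // thread so that the loop can keep iterating.
                    let to_server = self_sender.clone();
                    thread::spawn(move || replay(commands, to_server, client));
                }
                Message::Shutdown => break,
            }
        }
    });

    let (client_tx, client_rx) = mpsc::channel::<Vec<String>>();
    server_tx.send(Message::Exec(queued, client_tx)).unwrap();
    let results = client_rx.recv().unwrap();
    server_tx.send(Message::Shutdown).unwrap();
    server.join().unwrap();
    results
}
```

Passing the two commands "SET a 1" and "INCR a" to run_transaction yields a single vector with the two replies "ran SET a 1" and "ran INCR a", in the same order in which the commands were queued.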

Step 9.8.1 - The server self sender

When we create the server in main we also create a channel that allows the connection handler to send messages to the server. The sender of that very channel is what we want to store in the server, so that at any time the server can send a message to itself.

We first add a new field to the struct Server.

src/server.rs
pub struct Server {
    pub info: ServerInfo,
    pub storage: Option<Storage>,
    pub replication: ReplicationConfig,
    pub replica_senders: Vec<mpsc::Sender<ServerMessage>>,
    pub wait_handler_sender: Option<mpsc::Sender<ServerMessage>>,
    pub self_sender: Option<mpsc::Sender<ConnectionMessage>>,
    pub next_client_id: AtomicU64,
    pub clients: HashMap<u64, Client>,
}

impl Server {

    pub fn new(host: String, port: u16) -> Self {
        Self {
            info: ServerInfo {
                host: host,
                port: port,
            },
            storage: None,
            replication: ReplicationConfig::new_master(),
            replica_senders: Vec::new(),
            wait_handler_sender: None,
            self_sender: None,
            next_client_id: AtomicU64::new(1),
            clients: HashMap::new(),
        }
    }

Then, we copy the server_sender that we generate in main.

src/main.rs
async fn main() -> std::io::Result<()> {

    // Create the server, attach the storage
    // and configure replication.
    let mut server = Server::new(args.host.clone(), args.port);
    server.set_storage(storage);
    server.set_replication(replication_config);

    // Create the MPSC channel to communicate with the server.
    let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);

    // Store the server sender in the server itself.
    // This allows the server to spawn functions
    // that send requests back to itself (e.g. EXEC).
    server.self_sender = Some(server_sender.clone());

    // If this server is a replica,
    // connect to the master and listen.
    if let Some(master_config) = server.replication.master.clone() {
        run_master_listener(
            master_config.host.clone(),
            master_config.port,
            &server.info,
            server_sender.clone(),
        )
        .await;
    }
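
The field is an Option because the server is created before the channel exists, so any code that wants to use the sender has to deal with the case in which it was never set. A minimal sketch of that pattern follows, assuming a reduced Server with a single field and using a standard library channel of strings instead of the Tokio channel of ConnectionMessage. The names sender_for_task and send_to_self are hypothetical.

```rust
use std::sync::mpsc::{self, Sender};

// Hypothetical, reduced server: only the field discussed here.
struct Server {
    self_sender: Option<Sender<String>>,
}

impl Server {
    fn new() -> Self {
        Server { self_sender: None }
    }

    // Returns a clone of the self sender, or an error message
    // if the server was never wired to its own channel.
    fn sender_for_task(&self) -> Result<Sender<String>, String> {
        match &self.self_sender {
            Some(sender) => Ok(sender.clone()),
            None => Err(String::from("Server self sender not initialised")),
        }
    }
}

// Builds a server, wires it to its own channel and sends a
// message through the stored sender. Returns what arrives on
// the receiving end of the server channel.
fn send_to_self(text: &str) -> Option<String> {
    let mut server = Server::new();
    let (server_sender, server_receiver) = mpsc::channel::<String>();
    server.self_sender = Some(server_sender.clone());

    let sender = server.sender_for_task().ok()?;
    sender.send(text.to_string()).ok()?;
    server_receiver.try_recv().ok()
}
```

A message sent through the stored clone arrives on the same receiver that the connection handlers feed, which is exactly what lets the server queue work for itself.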

Step 9.8.2 - A channel to communicate with the client

The task that executes the commands will collect the output of each one and send the collected outputs back to the client. Therefore, it needs a channel to communicate with the client.

In the current implementation, the sender that allows us to send messages to the client is stored in each request, so in theory we could reuse that. However, while this works now, there is no guarantee it will keep working in the future. The channel in the request is the channel used to reply to that specific request, and the fact that it corresponds to the client channel is purely a matter of the current setup.

This means that we need to store in the client the sender used to send it messages.

src/client.rs
use crate::request::Request;
use crate::server_result::ServerMessage;

use tokio::sync::mpsc;

pub struct Client {
    pub client_id: u64,
    pub sender: mpsc::Sender<ServerMessage>,
    pub requests: Vec<Request>,
    pub multi: bool,
}

impl Client {
    pub fn new(client_id: u64, sender: mpsc::Sender<ServerMessage>) -> Self {
        Self {
            client_id: client_id,
            sender: sender,
            requests: Vec::new(),
            multi: false,
        }
    }
}

The channel is available when we initialise the Client struct in the server.

src/server.rs
pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {

                    // A new client has connected. Assign an ID
                    // and store the client in the server.
                    ConnectionMessage::NewClient(sender) => {
                        // Get the current ID and atomically
                        // increase it.
                        let client_id = server.next_client_id.fetch_add(1, Ordering::Relaxed);

                        // Create the client.
                        let client = Client::new(client_id, sender.clone());

                        // Add the client to the global register.
                        server.clients.insert(client_id, client);

                        // Send the client ID to the Connection Handler.
                        if let Err(_) = sender.send(ServerMessage::ClientInit(client_id)).await {
                            eprintln!("Error initialising client");
                            return;
                        }
                    }

The change in Client::new causes the failure of several tests that we need to fix.

src/commands/exec.rs
mod tests {

    async fn test_command_without_multi() {
    
        let cmd = vec![String::from("exec")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let client = Client::new(1, connection_sender.clone());
        server.clients.insert(1, client);


    async fn test_command_with_multi() {
    
        let cmd = vec![String::from("exec")];

        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let mut client = Client::new(1, connection_sender.clone());
        client.multi = true;
        server.clients.insert(1, client);

src/commands/multi.rs
mod tests {

    async fn test_command() {
    
        let cmd = vec![String::from("multi")];
        let mut server = Server::new("localhost".to_string(), 6379);
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let client = Client::new(1, connection_sender.clone());
        server.clients.insert(1, client);

src/server.rs
mod tests {

    #[tokio::test]
    // Test that the function process_request
    // processes a PING request and that it
    // responds with a PONG.
    async fn test_process_request_ping() {
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let storage = Storage::new();

        let mut server: Server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let client = Client::new(1, connection_sender.clone());
        server.clients.insert(1, client);

        let request = Request {
            value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        process_request(request, &mut server).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("PONG"))))
        );
    }

    #[tokio::test]
    // Test that the function process_request
    // processes an ECHO request and that it
    // responds with a copy of the input.
    async fn test_process_request_echo() {
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let storage = Storage::new();

        let mut server: Server = Server::new("localhost".to_string(), 6379);
        server.set_storage(storage);

        let client = Client::new(1, connection_sender.clone());
        server.clients.insert(1, client);

        let request = Request {
            value: RESP::Array(vec![
                RESP::BulkString(String::from("ECHO")),
                RESP::BulkString(String::from("42")),
            ]),
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        process_request(request, &mut server).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::BulkString(String::from("42"))))
        );
    }

Step 9.8.3 - A function to process stored requests

As we discussed above, the EXEC command will eventually spawn an asynchronous function that goes through all stored requests, sends them to the server, collects the outputs, and sends them to the client.

The function will therefore have the following signature.

src/server.rs
// Execute queued requests from a transaction and
// send the collected results back to the client
// as a single RESP array.
pub async fn exec_handler(
    requests: Vec<Request>,
    server_sender: mpsc::Sender<ConnectionMessage>,
    client_sender: mpsc::Sender<ServerMessage>,
) {

...

}

The server_sender will be used to send the stored requests to the server, and the client_sender to send the outputs to the client.

The first thing the function must do is to create a vector to host the outputs.

src/server.rs
// Execute queued requests from a transaction and
// send the collected results back to the client
// as a single RESP array.
pub async fn exec_handler(
    requests: Vec<Request>,
    server_sender: mpsc::Sender<ConnectionMessage>,
    client_sender: mpsc::Sender<ServerMessage>,
) {

    // Create a vector to host the results
    // of the stored requests.
    let mut results: Vec<RESP> = Vec::new();

}

Now, there is a small problem to solve. If the server receives a request, it will respond using the sender stored in the request itself, which at the moment is the client sender. However, EXEC is supposed to collect all outputs and to send them back in a single array, not to reply to each single command.

The solution, however, is rather simple. Since all communication takes place over MPSC channels, we just need to create a channel for the exec handler and replace the sender stored inside the request.

src/server.rs
// Execute queued requests from a transaction and
// send the collected results back to the client
// as a single RESP array.
pub async fn exec_handler(
    requests: Vec<Request>,
    server_sender: mpsc::Sender<ConnectionMessage>,
    client_sender: mpsc::Sender<ServerMessage>,
) {
    // Create a vector to host the results
    // of the stored requests.
    let mut results: Vec<RESP> = Vec::new();

    // Create a channel for this handler.
    let (handler_sender, mut handler_receiver) = mpsc::channel::<ServerMessage>(32);

    for mut request in requests.into_iter() {
        // Replace the request sender with the
        // handler sender. This way the server
        // will send the response back here instead
        // of sending it to the connection, and
        // through that to the client.
        // This allows us to collect responses and
        // to send them back into a RESP array.
        request.sender = handler_sender.clone();

        // Send the request to the server.
        if let Err(e) = server_sender
            .send(ConnectionMessage::Request(request))
            .await
        {
            eprintln!("Error sending request: {}", e);
            return;
        }

        match handler_receiver.recv().await {
            // We need to capture any response
            // that contains RESP, as we need
            // to collect all messages
            // and send them back to the client.
            Some(ServerMessage::Data(ServerValue::RESP(response))) => results.push(response),
            _ => {}
        }
    }
}

The last thing the exec handler has to do is to collect all responses in a RESP array and send it to the client.

src/server.rs
// Execute queued requests from a transaction and
// send the collected results back to the client
// as a single RESP array.
pub async fn exec_handler(
    requests: Vec<Request>,
    server_sender: mpsc::Sender<ConnectionMessage>,
    client_sender: mpsc::Sender<ServerMessage>,
) {
    // Create a vector to host the results
    // of the stored requests.
    let mut results: Vec<RESP> = Vec::new();

    // Create a channel for this handler.
    let (handler_sender, mut handler_receiver) = mpsc::channel::<ServerMessage>(32);

    for mut request in requests.into_iter() {...}

    // Create a RESP array out of the results vector.
    let result = ServerValue::RESP(RESP::Array(results));

    // Now send the RESP array to the connection
    // and the client.
    let _ = client_sender.send(ServerMessage::Data(result)).await;
}

The function exec_handler contains a fair amount of logic and should be tested. We can write three new tests.

src/server.rs
mod tests {

    #[tokio::test]
    // Test that exec_handler sends an empty RESP
    // array to the client when given no requests.
    async fn test_exec_handler_no_requests() {
        let (server_sender, _server_receiver) = mpsc::channel::<ConnectionMessage>(32);
        let (client_sender, mut client_receiver) = mpsc::channel::<ServerMessage>(32);

        exec_handler(Vec::new(), server_sender, client_sender).await;

        assert_eq!(
            client_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::Array(Vec::new())))
        );
    }

    #[tokio::test]
    // Test that exec_handler forwards each queued
    // request to the server and collects the
    // responses, in order, into a single RESP array
    // sent to the client.
    async fn test_exec_handler_collects_responses() {
        let (server_sender, mut server_receiver) = mpsc::channel::<ConnectionMessage>(32);
        let (client_sender, mut client_receiver) = mpsc::channel::<ServerMessage>(32);

        // Spawn a fake server that replies to each
        // incoming request with an integer equal
        // to the first argument of the request.
        tokio::spawn(async move {
            while let Some(ConnectionMessage::Request(request)) = server_receiver.recv().await {
                let n: i64 = match &request.value {
                    RESP::Array(v) => match &v[0] {
                        RESP::BulkString(s) => s.parse().unwrap(),
                        _ => 0,
                    },
                    _ => 0,
                };
                let _ = request
                    .sender
                    .send(ServerMessage::Data(ServerValue::RESP(RESP::Integer(n))))
                    .await;
            }
        });

        let (placeholder_sender, _placeholder_receiver) = mpsc::channel::<ServerMessage>(32);

        let requests = vec![
            Request {
                value: RESP::Array(vec![RESP::BulkString(String::from("1"))]),
                sender: placeholder_sender.clone(),
                binary: Vec::new(),
                client_id: 1,
                master_connection: false,
            },
            Request {
                value: RESP::Array(vec![RESP::BulkString(String::from("2"))]),
                sender: placeholder_sender.clone(),
                binary: Vec::new(),
                client_id: 1,
                master_connection: false,
            },
        ];

        exec_handler(requests, server_sender, client_sender).await;

        assert_eq!(
            client_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::Array(vec![
                RESP::Integer(1),
                RESP::Integer(2),
            ])))
        );
    }

    #[tokio::test]
    // Test that exec_handler silently drops
    // responses that are not wrapped in
    // ServerValue::RESP when collecting the results.
    async fn test_exec_handler_drops_non_resp_responses() {
        let (server_sender, mut server_receiver) = mpsc::channel::<ConnectionMessage>(32);
        let (client_sender, mut client_receiver) = mpsc::channel::<ServerMessage>(32);

        // Spawn a fake server that alternates between
        // a RESP integer response and a non-RESP
        // ServerValue::None response.
        tokio::spawn(async move {
            let mut toggle = false;
            while let Some(ConnectionMessage::Request(request)) = server_receiver.recv().await {
                let message = if toggle {
                    ServerMessage::Data(ServerValue::None)
                } else {
                    ServerMessage::Data(ServerValue::RESP(RESP::Integer(42)))
                };
                toggle = !toggle;
                let _ = request.sender.send(message).await;
            }
        });

        let (placeholder_sender, _placeholder_receiver) = mpsc::channel::<ServerMessage>(32);

        let requests = vec![
            Request {
                value: RESP::Array(vec![RESP::BulkString(String::from("a"))]),
                sender: placeholder_sender.clone(),
                binary: Vec::new(),
                client_id: 1,
                master_connection: false,
            },
            Request {
                value: RESP::Array(vec![RESP::BulkString(String::from("b"))]),
                sender: placeholder_sender.clone(),
                binary: Vec::new(),
                client_id: 1,
                master_connection: false,
            },
        ];

        exec_handler(requests, server_sender, client_sender).await;

        assert_eq!(
            client_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::Array(vec![RESP::Integer(42)])))
        );
    }

Step 9.8.4 - Spawn the exec handler

The very last step of this stage is to wire everything up together, spawning exec_handler in the EXEC function.

src/commands/exec.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::{exec_handler, Server};
use crate::server_result::{ServerError, ServerValue};

pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {

    // Client multi is on, so we need to execute the
    // transaction. After this, multi should be off.
    client.multi = false;

    // If there are no requests it's pointless to
    // spawn the exec handler.
    if client.requests.is_empty() {
        request
            .data(ServerValue::RESP(RESP::Array(Vec::new())))
            .await;
        return;
    }

    // To execute the transaction the server
    // needs to have a self sender. This should be
    // initialised but as it's optional we need to check.
    let server_sender = match &server.self_sender {
        Some(sender) => sender.clone(),
        None => {
            request
                .error(ServerError::CommandInternalError(String::from(
                    "Server self sender not initialised",
                )))
                .await;
            return;
        }
    };

    // Spawn the Exec Handler.
    tokio::spawn(exec_handler(
        client.requests.clone(),
        server_sender,
        client.sender.clone(),
    ));

There are a couple of checks in the function before the task is spawned. The first one is checking that there are actual requests to process, and the second one that the server self sender is initialised.

To call client.requests.clone() the struct Request must implement the trait Clone, which we can derive.

src/request.rs
#[derive(Debug, Clone)]
pub struct Request {
    pub value: RESP,
    pub sender: mpsc::Sender<ServerMessage>,
    pub binary: Vec<u8>,
    pub client_id: u64,
    pub master_connection: bool,
}
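
Cloning the vector leaves the original requests in the client. If nothing else empties that queue (the code that might do so, such as the MULTI command, is outside this section), a later transaction from the same client would replay them. One possible alternative, shown here only as a sketch on a hypothetical reduced Client that stores plain strings, is to move the queue out with std::mem::take, which hands over the requests and leaves an empty vector behind.

```rust
// Hypothetical, reduced client: requests are plain strings here.
struct Client {
    multi: bool,
    requests: Vec<String>,
}

impl Client {
    // Leaves the transaction and hands over the queued requests,
    // leaving an empty queue behind for the next transaction.
    fn take_queue(&mut self) -> Vec<String> {
        self.multi = false;
        std::mem::take(&mut self.requests)
    }
}

// Queues two requests, takes them, and reports what was taken,
// how many requests are left, and the value of the multi flag.
fn demo_take_queue() -> (Vec<String>, usize, bool) {
    let mut client = Client {
        multi: true,
        requests: vec![String::from("SET a 1"), String::from("INCR a")],
    };
    let taken = client.take_queue();
    (taken, client.requests.len(), client.multi)
}
```

With this approach the handler owns the requests outright and the client starts its next transaction with an empty queue. The derive of Clone is still useful in tests that need to keep a copy of a request.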

CodeCrafters

Transactions Stage 8: Executing a transaction

The code we wrote in this section passes Transactions - Stage 8 of the CodeCrafters challenge.

Step 9.9 - The DISCARD command

The command DISCARD has the same structure as EXEC, but instead of executing the queued requests it discards them. We add the command as always.

src/commands/discard.rs
use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::{ServerError, ServerValue};

pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
    // Find the client corresponding to
    // the ID contained in the request.
    let client = match server.clients.get_mut(&request.client_id) {
        Some(client) => client,
        None => {
            request
                .error(ServerError::CommandInternalError(String::from(
                    "Cannot find client ID",
                )))
                .await;
            return;
        }
    };

    // Check if the client is inside a transaction.
    if !client.multi {
        // Client multi is off, so DISCARD cannot be executed.
        request
            .data(ServerValue::RESP(RESP::SimpleError(String::from(
                "ERR DISCARD without MULTI",
            ))))
            .await;
        return;
    };

    // Client multi is on, so we need to remove all
    // queued requests. After this, multi should be off.
    client.multi = false;

    // Remove queued requests.
    client.requests.clear();

    request
        .data(ServerValue::RESP(RESP::SimpleString(String::from("OK"))))
        .await;
}

The structure of the function is taken from exec::command, even though the second part of the code is clearly different.
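
Seen from a single client, MULTI, EXEC and DISCARD form a small state machine with two states. The sketch below models it in isolation, with no channels and no server. Session, Reply and handle are hypothetical names, commands are plain strings, and "executing" a command is reduced to echoing it back.

```rust
#[derive(Debug, PartialEq)]
enum Reply {
    Ok,
    Queued,
    // The command ran immediately (outside a transaction).
    Executed(String),
    // The queued commands handed over by EXEC.
    Results(Vec<String>),
    Error(String),
}

// Hypothetical per-connection state, reduced to the two
// fields that matter for transactions.
struct Session {
    multi: bool,
    queue: Vec<String>,
}

impl Session {
    fn new() -> Self {
        Session { multi: false, queue: Vec::new() }
    }

    fn handle(&mut self, command: &str) -> Reply {
        let name = command
            .split_whitespace()
            .next()
            .unwrap_or("")
            .to_lowercase();

        match (self.multi, name.as_str()) {
            (false, "multi") => {
                self.multi = true;
                Reply::Ok
            }
            (false, "exec") => Reply::Error(String::from("ERR EXEC without MULTI")),
            (false, "discard") => Reply::Error(String::from("ERR DISCARD without MULTI")),
            (false, _) => Reply::Executed(command.to_string()),
            (true, "exec") => {
                self.multi = false;
                Reply::Results(std::mem::take(&mut self.queue))
            }
            (true, "discard") => {
                self.multi = false;
                self.queue.clear();
                Reply::Ok
            }
            // As in this chapter, only EXEC and DISCARD
            // escape the queue while in multi mode.
            (true, _) => {
                self.queue.push(command.to_string());
                Reply::Queued
            }
        }
    }
}
```

Both EXEC and DISCARD leave multi mode and empty the queue. The only difference is what happens to the queued commands on the way out.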

We can also add three tests.

src/commands/discard.rs
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::Client;
    use crate::server_result::ServerMessage;
    use tokio::sync::mpsc;

    #[tokio::test]
    // Test that the function command returns an error
    // when the client ID is not found.
    async fn test_command_without_client() {
        let cmd = vec![String::from("discard")];
        let mut server = Server::new("localhost".to_string(), 6379);
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let request = Request {
            value: RESP::Null,
            sender: connection_sender,
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        command(&mut server, &request, &cmd).await;

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Error(ServerError::CommandInternalError(String::from(
                "Cannot find client ID"
            ))),
        );
    }

    #[tokio::test]
    // Test that the function command clears
    // queued requests and the multi flag.
    async fn test_command() {
        let cmd = vec![String::from("discard")];
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let mut client = Client::new(1, connection_sender.clone());

        let request = Request {
            value: RESP::Null,
            sender: connection_sender.clone(),
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        client.multi = true;
        client.requests.push(request.clone());

        server.clients.insert(1, client);

        command(&mut server, &request, &cmd).await;

        let client = server.clients.get(&1).unwrap();

        assert_eq!(client.requests.len(), 0);
        assert_eq!(client.multi, false);

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("OK"))))
        );
    }

    #[tokio::test]
    // Test that the function command returns an error
    // when DISCARD is called without MULTI.
    async fn test_command_without_multi() {
        let cmd = vec![String::from("discard")];
        let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);

        let mut server = Server::new("localhost".to_string(), 6379);

        let mut client = Client::new(1, connection_sender.clone());

        let request = Request {
            value: RESP::Null,
            sender: connection_sender.clone(),
            binary: Vec::new(),
            client_id: 1,
            master_connection: false,
        };

        client.multi = false;

        server.clients.insert(1, client);

        command(&mut server, &request, &cmd).await;

        let client = server.clients.get(&1).unwrap();

        assert_eq!(client.requests.len(), 0);
        assert_eq!(client.multi, false);

        assert_eq!(
            connection_receiver.try_recv().unwrap(),
            ServerMessage::Data(ServerValue::RESP(RESP::SimpleError(String::from(
                "ERR DISCARD without MULTI"
            ))))
        );
    }
}

As we did before, the function must be added to server::process_request.

src/server.rs
use crate::commands::{
    discard, echo, exec, get, incr, info, multi, ping, psync, replconf, set, wait,
};

pub async fn process_request(request: Request, server: &mut Server) {

    // Process the request using the requested command.
    match command_name.as_str() {
        "discard" => {
            discard::command(server, &request, &command).await;
        }
        "echo" => {
            echo::command(server, &request, &command).await;
        }
        "exec" => {
            exec::command(server, &request, &command).await;
        }

        ...

There is a caveat, however. In process_request, we explicitly block the execution of all commands but EXEC when in multi mode. Now we need to whitelist DISCARD as well.

src/server.rs
pub async fn process_request(request: Request, server: &mut Server) {

    // Extract the command name to route the request.
    let command_name = command[0].to_lowercase();

    // List of commands that should be executed immediately
    // even when in multi mode.
    let ignored_commands = vec!["exec", "discard"];

    // If the client is in multi mode and the command
    // is not in the list above, queue the request
    // and reply QUEUED.
    if client.multi && !ignored_commands.contains(&command_name.as_str()) {
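
A small variation on the same check, given only as a sketch: the list can live in a constant, which avoids building a vector for every request, and the decision can be isolated in a function that is easy to test. The names IMMEDIATE_COMMANDS and should_queue are hypothetical.

```rust
// Commands that run immediately even when the client is in multi mode.
const IMMEDIATE_COMMANDS: &[&str] = &["exec", "discard"];

// Returns true when the command must be queued instead of executed.
// The command name is expected to be already lowercased.
fn should_queue(multi: bool, command_name: &str) -> bool {
    multi && !IMMEDIATE_COMMANDS.contains(&command_name)
}
```

With this helper, should_queue(true, "set") is true, while should_queue(true, "discard") and should_queue(false, "set") are both false.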

CodeCrafters

Transactions Stage 9: The DISCARD command

The code we wrote in this section passes Transactions - Stage 9 of the CodeCrafters challenge.

Step 9.10 - Failures within transactions

This step passes automatically with the code we wrote. A command that fails inside a transaction reports the failure to its sender as a normal message, so exec_handler collects it like any other output and routes everything correctly.
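
The behaviour can be illustrated with a toy executor. This is a sketch under heavy assumptions: Resp is a hypothetical three-variant enum rather than the RESP type of the project, the store is a plain HashMap, and only SET and INCR are modelled. What matters is that the failing command produces one element of the result vector and the commands after it still run.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Resp {
    Simple(String),
    Integer(i64),
    Error(String),
}

// Runs a single command against the store and returns its reply.
fn run(store: &mut HashMap<String, String>, command: &[&str]) -> Resp {
    match command {
        ["SET", key, value] => {
            store.insert(key.to_string(), value.to_string());
            Resp::Simple(String::from("OK"))
        }
        ["INCR", key] => {
            // A missing key counts as 0.
            let current = store.get(*key).map(String::as_str).unwrap_or("0");
            match current.parse::<i64>().ok().and_then(|n| n.checked_add(1)) {
                Some(next) => {
                    store.insert(key.to_string(), next.to_string());
                    Resp::Integer(next)
                }
                None => Resp::Error(String::from(
                    "ERR value is not an integer or out of range",
                )),
            }
        }
        _ => Resp::Error(String::from("ERR unknown command")),
    }
}

// Runs all queued commands in order. A failure becomes one
// element of the result and does not stop the others.
fn exec(queued: &[Vec<&str>]) -> Vec<Resp> {
    let mut store = HashMap::new();
    queued.iter().map(|command| run(&mut store, command)).collect()
}
```

Queueing SET foo abc, INCR foo, SET bar 41 and INCR bar produces four replies: OK, an error for the INCR on a non-integer value, OK, and the integer 42. There is no rollback, and foo keeps the value abc.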

CodeCrafters

Transactions Stage 10: Failures within transactions

The code we wrote in this section passes Transactions - Stage 10 of the CodeCrafters challenge.

Step 9.11 - Multiple transactions

This step also passes automatically with the code we wrote. Since we stored requests inside Client, and a new client is created for each connection, we naturally achieve the goal of this stage.
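
The reason is easier to see on a reduced model. In the sketch below, which uses hypothetical simplified versions of Server and Client with string requests and no channels, each client ID maps to its own flag and its own queue, so a transaction opened by one client never affects the commands of another.

```rust
use std::collections::HashMap;

// Hypothetical, reduced client: a flag and a queue of strings.
#[derive(Default)]
struct Client {
    multi: bool,
    requests: Vec<String>,
}

struct Server {
    next_client_id: u64,
    clients: HashMap<u64, Client>,
}

impl Server {
    fn new() -> Self {
        Server { next_client_id: 1, clients: HashMap::new() }
    }

    // Registers a new connection and returns its client ID.
    fn connect(&mut self) -> u64 {
        let client_id = self.next_client_id;
        self.next_client_id += 1;
        self.clients.insert(client_id, Client::default());
        client_id
    }

    // Starts a transaction for one client only.
    fn multi(&mut self, client_id: u64) {
        if let Some(client) = self.clients.get_mut(&client_id) {
            client.multi = true;
        }
    }

    // Queues the command if that client is in a transaction.
    // Returns false when the command would run immediately instead.
    fn queue(&mut self, client_id: u64, command: &str) -> bool {
        if let Some(client) = self.clients.get_mut(&client_id) {
            if client.multi {
                client.requests.push(command.to_string());
                return true;
            }
        }
        false
    }

    // Number of requests queued for the given client.
    fn queued(&self, client_id: u64) -> usize {
        self.clients
            .get(&client_id)
            .map_or(0, |client| client.requests.len())
    }
}
```

If two clients connect and only the first sends MULTI, a command from the first is queued while the same command from the second is not. Once both are in a transaction, each queue grows independently.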

CodeCrafters

Transactions Stage 11: Multiple transactions

The code we wrote in this section passes Transactions - Stage 11 of the CodeCrafters challenge and concludes the transactions extension.