+OK\r\nThis conversation can serve no purpose any more.
Now that we have a basic generic server we need to focus on an important feature that is more closely related to Redis: the RESP protocol.
Redis clients and servers communicate with a custom binary protocol called RESP (REdis Serialization Protocol). On the Redis website you can find the full specification, but we won't implement the whole thing as it would be overkill, considering the scope of the project.
To complete the basic stages of the challenge we need to implement three types of data: simple strings, bulk strings, and arrays. This might sound trivial, but when it comes to binary protocols there are a lot of low-level functions to implement. So, while it won't be complicated, it will take the whole chapter to reach a working implementation.
Once these elements and the underlying machinery are in place it will be a breeze to implement other types (like integers) for later stages.
RESP is roughly made of a binary representation of certain data types and of a format for requests and responses (both successful and unsuccessful).
The official documentation explains the difference between RESP types, even though some corner cases could be clearer with additional examples. For the time being, you should familiarise yourself with the way simple strings, bulk strings, and arrays are implemented.
Please also note that Redis commands shall be sent by the client as RESP arrays of bulk strings and that responses will be sent according to the requested command. For example, the command GET [docs] will reply with either a bulk string with the value of the key or with a null.
The first important question to ask is: what is a binary communication protocol, and why do we need it?
In brief, as the topic is huge, we can keep the following concepts in mind.
A communication protocol is a set of rules that machines (or even humans, sometimes) have to follow to transfer information successfully. Examples of protocols are everywhere: TCP and HTTP/HTTPS for the Web, Wi-Fi and Ethernet for networks, SMTP and POP3 for emails. From a certain point of view, file and data formats in general can be seen as "protocols", as they provide a way to convey information. Strictly speaking, however, a communication protocol implies different parties that exchange information.
When we discuss digital protocols (for machine-to-machine communication) we are always considering sequences of bits, as this is all our computers can understand. However, sometimes such bits can be organised to represent information according to some specific encoding. For example, HTTP/1.x requests are text encoded using the US-ASCII format (as specified in RFC 7230), which means that the bits of each HTTP request and response are organised in groups of 8 (1 byte) and are always interpreted as ASCII strings.
A binary protocol uses arbitrary groups of bits, that might or might not align with the classic powers of 2 (8, 16, 32, and so on). TCP, for example, has several fields that are just one bit (SYN, ACK, and so on). The RESP documentation describes RESP as "a binary protocol that uses control sequences encoded in standard ASCII". In practice, this means that the structural parts of a RESP message (type prefixes like + and $, lengths, and \r\n terminators) are ASCII, but the payload of bulk strings can contain arbitrary binary data. As we will see shortly, this makes it a hybrid of sorts between a text protocol and a pure binary one.
Why would a system use such a low-level representation of data? According to the RESP specification, the protocol is designed as a compromise between three goals: being simple to implement, fast to parse, and human readable. RESP achieves its parsing speed by using prefixed lengths to transfer bulk data, which eliminates the need to scan payloads for special characters or to quote and escape their content. This allows a parser to process data in a single pass, with performance similar to that of a pure binary protocol.
Before we start writing code, let's get a feel for what RESP data actually looks like on the wire. Every RESP element begins with a single character that identifies its type, followed by the actual data, and terminated by \r\n (carriage return + line feed).
A simple string is prefixed with +:
+OK\r\nA bulk string is prefixed with $, followed by the length of the string, \r\n, the string data itself, and a final \r\n:
$5\r\nHELLO\r\nAn array is prefixed with *, followed by the number of elements, \r\n, and then the elements themselves one after another:
*2\r\n$4\r\nECHO\r\n$5\r\nHELLO\r\nThis last example is an array of two bulk strings: ECHO and HELLO. This is exactly how a Redis client sends the command ECHO HELLO to the server.
As you can see, \r\n is the separator between all RESP parts. Parsing RESP means reading bytes from a buffer and extracting the right pieces of data. Since this is a binary protocol, we need to work at the byte level, which means we'll need some low-level utility functions before we can parse RESP types themselves.
The following is an example of how RESP can be processed by the server. This is precisely what we will develop in the rest of the chapter, and will be detailed below.
$5\r\nHELLO\r\n.$ tells us that we are looking at a bulk string, so a dedicated function will be run on the bytes $5\r\nHELLO\r\n.$, then it will extract the length of the bulk string. To do this, it will read bytes until \r\n is met and discarded. This results in the characters 5, which is converted into the integer 5.H, E, L, L, O, \r, \n). The terminator is discarded and the remaining characters are transformed into a Rust string HELLO.As RESP commands are sent as arrays like *2\r\n$4\r\nECHO\r\n$5\r\nHELLO\r\n, the process described above is iterated multiple times to extract all components of the received command.
Each of these steps needs to contain several checks for data consistency and correctness. For example, we need to check that the length is an integer, and that the terminator is present after that number of characters. This won't be quick, but it will establish once and for all a foundation on which to build the actual Redis functions: PING, ECHO, GET, SET, and so on.
The steps we will follow in this chapter are:
PING.PING with a proper process.ECHO.This chapter is long, and it's easy to lose track of what we are doing. Unfortunately, there is no real way to avoid it, other than having a sloppy implementation that is going to make it extremely complicated to solve the next steps of the challenge. My recommendation is to come back to this roadmap after each step to make sure the final goal is still clear.
It's always worth having a proper description of errors, and this is even more true when it comes to parsing, which is notoriously error-prone. Let's start implementing an enum for errors and a type for results
src/resp_result.rs #[derive(Debug)]
pub enum RESPError {}
pub type RESPResult<T> = Result<T, RESPError>;We will add enum values to RESPError later when we need to encapsulate specific errors in the parsing stage.
Before writing any parsing logic, let's set up the testing infrastructure. We will develop the RESP parser using TDD (Test-Driven Development): write a test first, watch it fail, then write just enough code to make it pass.
The function we need to build is binary_extract_line. Its job is to read bytes from a buffer until a \r\n separator is found. Let's begin with a test for the simplest case: a well-formed buffer like OK\r\n.
First, we need to declare the new modules in main.rs so that the compiler knows about them
src/main.rs use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
};
mod resp;
mod resp_result;Rust has built-in support for testing [docs]. You can add a test module at the bottom of any source file using #[cfg(test)], which tells the compiler to include this module only when running tests. Each test function is annotated with #[test], and you can run all tests with cargo test.
This is one of the things that makes Rust particularly pleasant for TDD: the test infrastructure is part of the language itself, with no external dependencies or configuration needed.
That said, TDD in Rust feels different from TDD in interpreted languages like Python. In Python, the classic red-green-refactor cycle goes smoothly because the language is dynamic: you can write a test that calls a function that doesn't exist yet, run the test, and see it fail with a clear error. In Rust, that same test won't even compile. This means the "red" phase in Rust is often a compilation error rather than a test failure, and you need to write at least a stub of the function before you can run anything.
The difference goes deeper when it comes to mocking. In dynamic languages, mock libraries rely on runtime introspection to replace methods, intercept calls, and verify behaviour. Python's unittest.mock, for example, can monkey-patch virtually any object at runtime. Rust has no runtime reflection, so none of these techniques are available. Instead, mocking in Rust requires an upfront design decision: dependencies must be abstracted behind traits so that a mock implementation can be substituted at compile time. Libraries like mockall docs use procedural macros to generate mock structs from trait definitions, but the fundamental approach is very different from dynamic mocking.
In practice, this means that Rust code tends to be tested more through direct unit tests of pure functions (as we will do in this chapter) and through integration tests, rather than through heavy use of mocks and stubs.
Now let's write our first test. Since RESP separates elements with \r\n, we want to build a function called binary_extract_line that accepts a byte buffer and a mutable index, and returns the bytes before the \r\n separator. After the call, the index should point past the separator.
src/resp.rs #[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_binary_extract_line() {
let buffer = "OK\r\n".as_bytes();
let mut index: usize = 0;
let output = binary_extract_line(buffer, &mut index).unwrap();
assert_eq!(output, "OK".as_bytes());
assert_eq!(index, 4);
}
}Please note a few things about this test.
src/resp.rs #[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_binary_extract_line() {
let buffer = "OK\r\n".as_bytes(); 1
let mut index: usize = 0; 2
let output = binary_extract_line(buffer, &mut index).unwrap();
assert_eq!(output, "OK".as_bytes()); 3
assert_eq!(index, 4); 4
}
}The buffer is a byte slice &[u8], and we create it from a string slice with .as_bytes() 1. The index is 0 2 because we start extracting from the beginning of the slice. After the call, we expect output to be the bytes for OK (without the \r\n) 3, and index to be 4 (past O, K, \r, \n) 4.
If you run cargo test, you will see that the code does not compile because binary_extract_line doesn't exist. This is the correct first stage in TDD: the test exists and fails.
warning: unused import: `super::*`
--> src/resp.rs:3:9
|
3 | use super::*;
| ^^^^^^^^
|
= note: `#[warn(unused_imports)]` on by default
error[E0425]: cannot find function `binary_extract_line` in this scope
--> src/resp.rs:9:22
|
9 | let output = binary_extract_line(buffer, &mut index).unwrap();
| ^^^^^^^^^^^^^^^^^^^ not found in this scope
For more information about this error, try `rustc --explain E0425`.Now we need to come up with an implementation of the function binary_extract_line. We should write the simplest non-trivial implementation that makes our test pass. Here, trivial means a solution that is bespoke, created in order to pass the test but not to solve the general problem.
However, this is the first time we look at tests in this book. Therefore, it is worth implementing a trivial solution and discussing it.
The test requires us to read the first two bytes O and K and to skip the following two \r and \n. In this specific case, a trivial solution would do precisely that, with hard coded values.
src/resp.rs fn binary_extract_line(buffer: &[u8], index: &mut usize) -> Result<Vec<u8>, ()> {
*index += 4;
Ok(Vec::from(&buffer[0..2]))
}If you run cargo test you will see the following output
running 1 test
test resp::tests::test_binary_extract_line ... okwhich indicates that the code is correct.
However, it's pretty clear that the current code of binary_extract_line is not generic enough to be the right solution, and we can expose that with a new test.
src/resp.rs mod tests {
#[test]
fn test_binary_extract_line_longer_string() {
let buffer = "ECHO\r\n".as_bytes();
let mut index: usize = 0;
let output = binary_extract_line(buffer, &mut index).unwrap();
assert_eq!(output, "ECHO".as_bytes());
assert_eq!(index, 6);
}The output of cargo test is in this case
running 2 tests
test resp::tests::test_binary_extract_line ... ok
test resp::tests::test_binary_extract_line_longer_string ... FAILEDfollowed by an explanation of the failure
---- resp::tests::test_binary_extract_line_longer_string stdout ----
thread 'resp::tests::test_binary_extract_line_longer_string' panicked at src/resp.rs:30:9:
assertion `left == right` failed
left: [69, 67]
right: [69, 67, 72, 79]The values [69, 67, 72, 79] are the decimal ASCII codes for the letters E, C, H, O, so the test error is showing us that the function reads only EC instead of ECHO, which is coherent with the trivial implementation we provided.
We clearly need a more generic solution to the problem. Instead of using hard coded values, we need to scan the buffer looking for the string terminator \r\n.
I highly recommend you try to write your own code that passes the test before having a look at the proposed solution, as this is the best way to learn anything. Besides, my solution might not be the best!
This is my solution:
src/resp.rs use crate::resp_result::RESPResult;
// Extracts bytes from the buffer until a `\r\n` is reached.
fn binary_extract_line(buffer: &[u8], index: &mut usize) -> RESPResult<Vec<u8>> {
// The extracted bytes will be stored in this vector.
let mut output = Vec::new();
// Since the terminator \r\n is made of two characters,
// we keep track of the previous element we read.
// We start with a clone of the initial element of
// the buffer.
let mut previous_elem: u8 = buffer[*index].clone();
// We don't want to change the index until
// we are sure the function succeeds.
// This is a temporary index to keep track
// of what we are reading in the buffer.
let mut final_index: usize = *index;
// Scan the whole buffer looking for \r\n.
// Keep track of the current index and
// of the previous element.
for &elem in buffer[*index..].iter() {
// Advance the index, consider the
// current element valid.
final_index += 1;
// Check if we just passed the terminator \r\n.
if elem == b'\n' && previous_elem == b'\r' {
break;
}
// Store the current element for the next loop.
previous_elem = elem.clone();
}
// Copy the bytes from the buffer to the output vector.
// Skip the final \r\n removing two bytes.
output.extend_from_slice(&buffer[*index..final_index - 2]);
// Make sure the index passed to the function
// is updated with the final position,
// including the terminator \r\n.
*index = final_index;
Ok(output)
}The idea of this function is very simple, despite the apparent complexity of the code. The input is a byte slice buffer: &[u8], and we need to transfer bytes from it to a Vec<u8> until we find the two characters \r\n.
When the terminator of a sequence is a single character (like for example \0 for strings in the C language), the implementation of any copy function is pretty straightforward. Here, the terminator is made of two characters, which makes the solution slightly more complicated. It is not enough to reach a \r, as we need to be sure that it is followed by a \n.
src/resp.rs fn binary_extract_line(buffer: &[u8], index: &mut usize) -> RESPResult<Vec<u8>> {
// Scan the whole buffer looking for \r\n.
// Keep track of the current index and
// of the previous element.
for &elem in buffer[*index..].iter() {
// Advance the index, consider the
// current element valid.
final_index += 1;
// Check if we just passed the terminator \r\n.
if elem == b'\n' && previous_elem == b'\r' { 1
break;
}
// Store the current element for the next loop.
previous_elem = elem.clone();
}
// Copy the bytes from the buffer to the output vector.
// Skip the final \r\n removing two bytes.
output.extend_from_slice(&buffer[*index..final_index - 2]); 2
// Make sure the index passed to the function
// is updated with the final position,
// including the terminator \r\n.
*index = final_index;
Ok(output)
}As it is much simpler to keep track of what we read in the previous loop than to look ahead, the code reads every byte until we reach \n 1, and when we do we look back to check if the previous byte was a \r. The variable previous_elem is there to cache the last byte read precisely for this reason.
As you will notice, the loop doesn't copy a byte at each iteration. The reason is that it is pointless to copy bytes before knowing if the function succeeds. So, the loop tries to find the final index, and when that is done proceeds to copy the bytes between the initial index and the final one 2. Since the result should not contain the terminator \r\n, we stop two bytes before the final index.
Let's run the test to check that the solution works.
$ cargo test
running 2 tests
test resp::tests::test_binary_extract_line ... ok
test resp::tests::test_binary_extract_line_longer_string ... okEven if you are an experienced coder, it might take you several attempts to write a function that first compiles and then actually passes the test. Remember that what you see here looks very tidy, but has cost me many attempts!
The code written so far passes the happy path test. We give this name to all tests that check that a component works in the intended way.
However, we also need to verify that the code behaves properly when things don't go well. Usually, for each happy path there are several unhappy paths. For example, in this case we should ask ourselves what happens if we call this function with an empty buffer, with a buffer that doesn't contain \r\n, or with a buffer that contains just \n?
Let's use tests to develop the error handling.
Broadly speaking, you generally have a small number of tests checking the happy path, while the number of unhappy path tests will be higher. The number of ways things can go wrong is usually much higher than the number of correct solutions.
A particularly important subset of happy paths is that represented by edge cases. These are all the conditions that should lead to a successful result but are not the generic case. For example, an algorithm that calculates the sum of an array of integers should test what happens when the array is empty, which is arguably a non-generic condition that could cause issues if the code is written in a certain way.
When the buffer is empty we should get an error. Let's write a test for that
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_line
// returns the correct error when we try to
// read an empty buffer.
fn test_binary_extract_line_empty_buffer() {
let buffer = "".as_bytes();
let mut index: usize = 0;
match binary_extract_line(buffer, &mut index) {
Err(RESPError::OutOfBounds(index)) => {
assert_eq!(index, 0);
}
_ => panic!(),
}
}Rust tests should panic when they fail. This is the behaviour of the macro assert_eq!, so we need to do the same in the match statement.
This test expects an OutOfBounds error, which doesn't exist yet, so the compilation will fail. We need to add the error as a variant of RESPError and implement fmt::Display so the error can be printed.
src/resp_result.rs use std::fmt;
#[derive(Debug)]
pub enum RESPError {}
pub enum RESPError {
OutOfBounds(usize),
}
impl fmt::Display for RESPError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RESPError::OutOfBounds(index) => write!(f, "Out of bounds at index {}", index),
}
}
}Rust is not an object-oriented programming language, but some constructs can definitely be compared to equivalent OOP ones. Rust structs and enums, for example, cannot inherit from a parent structure, but we can write an implementation that adds methods to the data type, in a way that reminds of classes.
One of the most powerful concepts in Rust is that of a trait [docs]. For those coming from object-oriented languages, traits are halfway between interfaces and mixins.
As interfaces, they can be used in function prototypes to represent types, with the rich trait bound syntax impl SomeTrait (or its longer form <T: SomeTrait> after the function name). As mixins, they can behave like "classes without data", providing a default method implementation that can be attached to an existing type with an empty impl block or with #[derive].
A lot of language features like automatic type conversion depend on traits, so it's highly recommended to become familiar with the concept and its syntax.
Now that the error is present, the test will compile and run. However, it will fail as the function binary_extract_line doesn't contain any checks. Let's add them.
src/resp.rs use crate::resp_result::RESPResult;
use crate::resp_result::{RESPError, RESPResult};
// Extracts bytes from the buffer until a `\r\n` is reached.
fn binary_extract_line(buffer: &[u8], index: &mut usize) -> RESPResult<Vec<u8>> {
// The extracted bytes will be stored in this vector.
let mut output = Vec::new();
// Exit with an error if we are trying to read
// after the end of the buffer.
if *index >= buffer.len() {
return Err(RESPError::OutOfBounds(*index));
}
// Since the terminator \r\n is made of two characters,
// we keep track of the previous element we read.
// We start with a clone of the initial element of
// the buffer.
let mut previous_elem: u8 = buffer[*index].clone();
...There's another case to handle: what if the buffer doesn't contain \r\n at all? For example, "OK" without a separator. Currently our loop will reach the end of the buffer without finding the separator, and blindly try to extract data. We need a flag that tells us whether the separator was found.
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_line
// returns the correct error when we try to
// read a buffer that doesn't contain the
// terminator \r\n.
fn test_binary_extract_line_no_separator() {
let buffer = "OK".as_bytes();
let mut index: usize = 0;
match binary_extract_line(buffer, &mut index) {
Err(RESPError::OutOfBounds(index)) => {
assert_eq!(index, 2);
}
_ => panic!(),
}
}To make this pass, we add a separator_found flag and check it after the loop
src/resp.rs // Extracts bytes from the buffer until a `\r\n` is reached.
fn binary_extract_line(buffer: &[u8], index: &mut usize) -> RESPResult<Vec<u8>> {
// The extracted bytes will be stored in this vector.
let mut output = Vec::new();
// Exit with an error if we are trying to read
// after the end of the buffer.
if *index >= buffer.len() {
return Err(RESPError::OutOfBounds(*index));
}
// Since the terminator \r\n is made of two characters,
// we keep track of the previous element we read.
// We start with a clone of the initial element of
// the buffer.
let mut previous_elem: u8 = buffer[*index].clone();
// A flag to signal that we found the characters \r\n.
let mut separator_found: bool = false;
// We don't want to change the index until
// we are sure the function succeeds.
// This is a temporary index to keep track
// of what we are reading in the buffer.
let mut final_index: usize = *index;
// Scan the whole buffer looking for \r\n.
// Keep track of the current index and
// of the previous element.
for &elem in buffer[*index..].iter() {
// Advance the index, consider the
// current element valid.
final_index += 1;
// Check if we just passed the terminator \r\n.
if elem == b'\n' && previous_elem == b'\r' {
// Toggle the flag to signal that all is good.
separator_found = true;
break;
}
// Store the current element for the next loop.
previous_elem = elem.clone();
}
// If the previous element is not \n
// we are out of bounds
if !separator_found {
*index = final_index;
return Err(RESPError::OutOfBounds(*index));
}
// Copy the bytes from the buffer to the output vector.
// Skip the final \r\n removing two bytes.
output.extend_from_slice(&buffer[*index..final_index - 2]);
// Make sure the index passed to the function
// is updated with the final position,
// including the terminator \r\n.
*index = final_index;
Ok(output)
}
In this section we will go slightly faster, adding several tests and the required code to pass them. As you saw in the previous sections, the process is always the same: identify a missing feature or a bug, write a test, change the code to pass the test (if needed).
We need to check what happens if we pass an index that is too advanced into the buffer.
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_line
// returns the correct error when we try to
// read bytes starting with an index that is
// already greater than the length of the buffer.
fn test_binary_extract_line_index_too_advanced() {
let buffer = "OK".as_bytes();
let mut index: usize = 1;
match binary_extract_line(buffer, &mut index) {
Err(RESPError::OutOfBounds(index)) => {
assert_eq!(index, 2);
}
_ => panic!(),
}
}This test passes with the current version of the code.
We need to check what happens if the separator is incomplete: we can have only \r (half separator) or just \n instead of \r\n (incorrect separator).
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_line
// returns the correct error when we try to
// read a buffer that contains only \r.
fn test_binary_extract_line_half_separator() {
let buffer = "OK\r".as_bytes();
let mut index: usize = 0;
match binary_extract_line(buffer, &mut index) {
Err(RESPError::OutOfBounds(index)) => {
assert_eq!(index, 3);
}
_ => panic!(),
}
}
#[test]
// Test that the function binary_extract_line
// returns the correct error when we try to
// read a buffer that contains only \n.
fn test_binary_extract_line_incorrect_separator() {
let buffer = "OK\n".as_bytes();
let mut index: usize = 0;
match binary_extract_line(buffer, &mut index) {
Err(RESPError::OutOfBounds(index)) => {
assert_eq!(index, 3);
}
_ => panic!(),
}
}Even these tests pass without changes to the code.
Please note the pattern in all the error tests: each one verifies both that the right error variant is returned and that the index is updated to reflect where parsing stopped. This is important for debugging, as it tells the caller exactly where the problem occurred.
$ cargo test
running 6 tests
test resp::tests::test_binary_extract_line_empty_buffer ... ok
test resp::tests::test_binary_extract_line_half_separator ... ok
test resp::tests::test_binary_extract_line ... ok
test resp::tests::test_binary_extract_line_incorrect_separator ... ok
test resp::tests::test_binary_extract_line_no_separator ... ok
test resp::tests::test_binary_extract_line_index_too_advanced ... ok
test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00sStrings are a surprisingly complicated topic in computer science. While binary values are relatively simple once we agree on the definition of byte, strings can be implemented in several different ways. In particular, programming languages differ in the way they represent strings in memory, and in how they deal with encoding.
When it comes to Rust, it's important to understand the two types of strings String [docs] and &str [docs]. As for the encoding, all modern languages embraced Unicode and UTF-8, so once again these are concepts one should be at least superficially familiar with.
You can learn more about Rust Strings and UTF-8 in the the Rust book.
In certain cases we know for sure that the RESP content is meant to represent UTF-8 data, which means that it will be useful to have a function that extracts binary data and converts it into a Rust String.
The test for such a function is
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_line_as_string
// correctly converts a buffer with a terminator
// into a String.
fn test_binary_extract_line_as_string() {
let buffer = "OK\r\n".as_bytes();
let mut index: usize = 0;
let output = binary_extract_line_as_string(buffer, &mut index).unwrap();
assert_eq!(output, String::from("OK"));
assert_eq!(index, 4);
}and a simple solution that passes the test is
src/resp.rs // Extracts bytes from the buffer until a `\r\n` is reached and converts them into a string.
pub fn binary_extract_line_as_string(buffer: &[u8], index: &mut usize) -> RESPResult<String> {
// Extract all possible bytes updating the index.
let line = binary_extract_line(buffer, index)?;
// Convert the bytes to a UTF-8 String.
Ok(String::from_utf8(line)?)
}The function String::from_utf8() [docs] returns Result<String, FromUtf8Error>, so we need to handle that error. The code String::from_utf8(line)? is at the moment incompatible with the return type RESPResult<String> that contains a RESPError.
We can implement the trait From that implicitly converts FromUtf8Error into RESPError
src/resp_result.rs use std::fmt;
use std::string::FromUtf8Error;
#[derive(Debug)]
pub enum RESPError {
FromUtf8,
OutOfBounds(usize),
}
impl fmt::Display for RESPError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RESPError::FromUtf8 => write!(f, "Cannot convert from UTF-8"),
RESPError::OutOfBounds(index) => write!(f, "Out of bounds at index {}", index),
}
}
}
impl From<FromUtf8Error> for RESPError {
fn from(_err: FromUtf8Error) -> Self {
Self::FromUtf8
}
}
pub type RESPResult<T> = Result<T, RESPError>;Whenever data is passed to a function, Rust checks that the data type corresponds to the function signature. If it doesn't, Rust tries to perform a conversion between the two data types using the method from, which is provided by a specific trait From<T>. As we see in the code above, this is extremely useful when it comes to dealing with discrepancies between return types.
We can also verify that the function correctly returns an error when the buffer contains bytes that are not valid UTF-8.
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_line_as_string
// returns the correct error when we try to
// read a buffer that contains an invalid UTF-8
// sequence of bytes.
fn test_binary_extract_line_as_string_invalid_utf8() {
let buffer: Vec<u8> = vec![0xFF, 0xFE, b'\r', b'\n'];
let mut index: usize = 0;
let error = binary_extract_line_as_string(&buffer, &mut index).unwrap_err();
assert_eq!(error, RESPError::FromUtf8);
}The bytes 0xFF and 0xFE are not valid in any UTF-8 sequence, so String::from_utf8 fails and the From<FromUtf8Error> implementation we wrote converts the error into RESPError::FromUtf8.
This test won't compile unless we add PartialEq to RESPError. This provides a way to compare error values during the tests (in assert_eq!)
src/resp_result.rs #[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum RESPError {
FromUtf8,
OutOfBounds(usize),
}The bytes 0xFF and 0xFE are outside the encoding scheme of UTF-8. They are actually the bytes used by UTF-16 to indicate the byte order (little- or big-endian) and are thus safe to use in any test that checks code that deals with UTF-8.
RESP types are always prefixed with a specific character (+ for simple strings, $ for bulk strings, * for arrays), so we need something that checks and removes the prefix before we can parse the rest of the data.
We could just skip the first byte, but it's better to verify that the buffer contains exactly what we expect. Two tests should be sufficient to check both the correct behaviour and the error handling.
src/resp.rs mod tests {
#[test]
// Test that the function resp_remove_type
// checks and returns the given type from a buffer
// and updates the index.
fn test_resp_remove_type() {
let buffer = "+OK\r\n".as_bytes();
let mut index: usize = 0;
resp_remove_type('+', buffer, &mut index).unwrap();
assert_eq!(index, 1);
}
#[test]
// Test that the function resp_remove_type
// returns the correct error when we try to
// read a given type from a buffer that
// contains a different type.
fn test_resp_remove_type_error() {
let buffer = "*OK\r\n".as_bytes();
let mut index: usize = 0;
let error = resp_remove_type('+', buffer, &mut index).unwrap_err();
assert_eq!(index, 0);
assert_eq!(error, RESPError::WrongType);
}A function that passes both tests is
src/resp.rs // Checks that the first character of a RESP buffer is the given one and removes it.
pub fn resp_remove_type(value: char, buffer: &[u8], index: &mut usize) -> RESPResult<()> {
// Check if the character at the given position
// of the buffer corresponds to the provided value.
if buffer[*index] != value as u8 {
return Err(RESPError::WrongType);
}
// Increment the index to skip the type character.
*index += 1;
Ok(())
}The code relies on the error RESPError::WrongType, so we must add that variant and the relative implementation of fmt::Display.
src/resp_result.rs use std::fmt;
use std::string::FromUtf8Error;
#[derive(Debug, PartialEq)]
pub enum RESPError {
FromUtf8,
OutOfBounds(usize),
WrongType,
}
impl fmt::Display for RESPError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RESPError::FromUtf8 => write!(f, "Cannot convert from UTF-8"),
RESPError::OutOfBounds(index) => write!(f, "Out of bounds at index {}", index),
RESPError::WrongType => write!(f, "Wrong prefix for RESP type"),
}
}
}
impl From<FromUtf8Error> for RESPError {
fn from(_err: FromUtf8Error) -> Self {
Self::FromUtf8
}
}
pub type RESPResult<T> = Result<T, RESPError>;As you can see, we picked up the pace. Remember that behind the scenes code is never written in one go, unless it's trivial, and that it might take several attempts to find the correct solution.
Also, remember that what you see here is my solution. The code works correctly, but this doesn't mean what you see here is the only or even the most correct way to solve the problem.
With the utility functions in place, we can now start parsing actual RESP types. Let's define an enum called RESP that will represent the different data types and start with the simplest one: a simple string.
src/resp.rs #[derive(Debug, PartialEq)]
pub enum RESP {
SimpleString(String),
}A simple string like +OK\r\n is parsed by removing the + prefix and then reading the rest of the line as a string of UTF-8 characters. The test for this operation is
src/resp.rs mod tests {
#[test]
// Test that the function parse_simple_string
// returns the correct RESP variant when
// we read a buffer that contains a
// RESP simple string.
fn test_parse_simple_string() {
let buffer = "+OK\r\n".as_bytes();
let mut index: usize = 0;
let output = parse_simple_string(buffer, &mut index).unwrap();
assert_eq!(output, RESP::SimpleString(String::from("OK")));
assert_eq!(index, 5);
}The function parse_simple_string is at this point a straightforward composition of the utilities we just created.
src/resp.rs // Parse a simple string in the form `+VALUE\r\n`.
fn parse_simple_string(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
// Remove the type for a simple string.
resp_remove_type('+', buffer, index)?;
// Read all possible bytes and convert them into UTF-8.
let line: String = binary_extract_line_as_string(buffer, index)?;
// Add the String to a custom RESP type.
Ok(RESP::SimpleString(line))
}This is where the investment in utility functions starts to pay off: parse_simple_string is just three lines of code because all the hard work of reading bytes, checking boundaries, and converting to strings is already done.
Strictly speaking, we should check that parse_simple_string returns an error when the input buffer causes resp_remove_type to fail (the type is not present, the type is different from +), and explore all the possible conditions that cause binary_extract_line_as_string to fail.
However, as these conditions have been tested on those specific functions, it seems overkill to test them all again. In languages with high introspection like Python, you can actually mock functions on the fly and check that they are called during the test, which is a good solution to avoid repeating all tests.
In Rust, we need to be more pragmatic, due to the characteristics of the language. I would personally advise against trying to achieve this type of coverage, but it's ultimately up to you if you want to go down that route.
Let's take a quick break from the implementation of RESP types and use RESP::SimpleString to send the response to PING. It's a small change, but it will give us a sense of the link between what we are doing with RESP and the overall system.
src/main.rs use crate::resp::RESP;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
};
async fn handle_connection(mut stream: TcpStream) {
loop {
// Read from the stream into the buffer.
match stream.read(&mut buffer).await {
// If the stream returned some data,
// process the request.
Ok(size) if size != 0 => {
// Hardcoded response.
let response = "+PONG\r\n";
// Hardcoded response using a specific variant.
let response = RESP::SimpleString(String::from("PONG"));
// Write the response to the stream.
if let Err(e) = stream.write_all(response.as_bytes()).await {
if let Err(e) = stream.write_all(response.to_string().as_bytes()).await {
eprintln!("Error writing to socket: {}", e);
}
}
...This code has an issue: the type RESP cannot be converted to a String with to_string() because it doesn't implement the proper trait. Let's add this feature.
src/resp.rs use crate::resp_result::{RESPError, RESPResult};
use std::fmt;
#[derive(Debug, PartialEq)]
pub enum RESP {
SimpleString(String),
}
impl fmt::Display for RESP {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let data = match self {
Self::SimpleString(data) => format!("+{}\r\n", data),
};
write!(f, "{}", data)
}
}The Display trait converts a RESP value back into its wire format. This is what the server will send over TCP to the client.
To provide the code that converts a type into a string we implement two different traits:
Into<String>, when implemented, is called automatically by Rust to convert a type into a String, and is considered a general-purpose conversion trait.fmt:Display, automatically implements the trait ToString as well. This trait is usually considered more suitable for strings that have to be printed on screen.While you can implement either, the Rust documentation [docs] says "Prefer implementing the Display trait for a type, rather than ToString". The reason is that Display provides a blanket implementation of ToString (so you get .to_string() for free), but it also makes the type work with formatting macros like format!() and println!(). Implementing ToString directly would only give you the former.
Now that the basic functions have been created we can move to a higher level. We eventually need to parse a generic RESP buffer, so we should implement a function that accepts a binary slice and returns the right variant of RESP.
The idea is to have a router function that inspects the first byte and returns the appropriate parsing function, and a dispatcher that calls it.
src/resp.rs // Select the correct parsing function according to the data type found
// in the buffer at the given index.
fn parser_router(
buffer: &[u8],
index: &mut usize,
) -> Option<fn(&[u8], &mut usize) -> RESPResult<RESP>> {
// Get the character at the current index
// and associate it with a parsing function.
match buffer[*index] {
b'+' => Some(parse_simple_string),
_ => None,
}
}
// Parse the bytes of the buffer at the given index and return a RESP data type.
pub fn bytes_to_resp(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
// Call the selected parsing function and
// manage the result.
match parser_router(buffer, index) {
Some(parse_func) => {
let result: RESP = parse_func(buffer, index)?;
Ok(result)
}
None => Err(RESPError::Unknown),
}
}The function parser_router returns one of the parsing functions according to the initial character in the buffer. The function bytes_to_resp uses the returned function to parse the buffer and return the result. This is done mainly to simplify the code by separating responsibilities and having shorter, more essential functions.
It's interesting to highlight the return type of parser_router:
src/resp.rs // Select the correct parsing function according to the data type found
// in the buffer at the given index.
fn parser_router(
buffer: &[u8],
index: &mut usize,
) -> Option<fn(&[u8], &mut usize) -> RESPResult<RESP>> {
...At a first glance, it looks very complicated (as is often the case with Rust types), so let's split it into smaller components:
Option<TYPE> is something we used before. It's an enum that contains either Some(TYPE) or None.fn(ARGTYPES) -> OUTTYPE is a function signature. This way, we tell Rust that what we are dealing with is a function (and not pure data). When we describe a function in Rust, we do it though the type of its arguments (ARGTYPES) and its output OUTTYPE.&[u8], &mut usize is the list of types passed to the function as arguments.RESPResult<RESP> is the type of the function output.The signature of the returned function comes directly from the definition of parse_simple_string:
src/resp.rs // Parse a simple string in the form `+VALUE\r\n`.
fn parse_simple_string(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
// Remove the type for a simple string.
resp_remove_type('+', buffer, index)?;
// Read all possible bytes and convert them into UTF-8.
let line: String = binary_extract_line_as_string(buffer, index)?;
// Add the String to a custom RESP type.
Ok(RESP::SimpleString(line))
}Using functions as data is a feature that exists in programming languages since their inception. At machine code level, data and instructions are just bytes in the same memory, and early programmers exploited this to craft techniques like self-modifying code and jump tables that felt like pure magic.
In C, function pointers allow us to pass functions as arguments like any other value, but in Rust we need to be a bit more explicit and declare precisely what we are doing.
Rust doesn't fall cleanly into either the category of object-oriented or functional languages, but borrows interesting features from both. On the OOP side, traits and impl blocks provide polymorphism and encapsulation. On the functional side, closures, iterators, and pattern matching encourage a style where data flows through transformations rather than being mutated in place.
We will extend parser_router each time we add a new RESP type. For now, it only recognises simple strings.
When the content of the RESP buffer cannot be parsed we return a specific error that has to be added to the RESPError enum.
src/resp_result.rs #[derive(Debug, PartialEq)]
pub enum RESPError {
FromUtf8,
OutOfBounds(usize),
Unknown,
WrongType,
}
impl fmt::Display for RESPError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RESPError::FromUtf8 => write!(f, "Cannot convert from UTF-8"),
RESPError::OutOfBounds(index) => write!(f, "Out of bounds at index {}", index),
RESPError::Unknown => write!(f, "Unknown format for RESP string"),
RESPError::WrongType => write!(f, "Wrong prefix for RESP type"),
}
}
}The tests for this code are the following. Once again, please note that we are not testing the calls with mocks, but actually performing an end-to-end test on the full process, router included.
src/resp.rs mod tests {
#[test]
// Test the parsing process end-to-end, checking
// that the function bytes_to_resp can parse
// a buffer with a RESP simple string.
fn test_bytes_to_resp_simple_string() {
let buffer = "+OK\r\n".as_bytes();
let mut index: usize = 0;
let output = bytes_to_resp(buffer, &mut index).unwrap();
assert_eq!(output, RESP::SimpleString(String::from("OK")));
assert_eq!(index, 5);
}
#[test]
// Test the parsing process end-to-end, checking
// that the function bytes_to_resp can returns
// the correct error when used to parse a
// buffer with an unknown data type.
fn test_bytes_to_resp_unknown() {
let buffer = "?OK\r\n".as_bytes();
let mut index: usize = 0;
let error = bytes_to_resp(buffer, &mut index).unwrap_err();
assert_eq!(error, RESPError::Unknown);
assert_eq!(index, 0);
}The main mantra of TDD is "test first, code later", so we are always supposed to write a test, see it fail, and write the minimal amount of code that makes the test pass. This way, coverage is always very high (typically 95%-100%) and the process forces us to write small chunks of well-organised code.
Unfortunately, due to the nature of Rust that we mentioned several times, a strongly typed compiled language with low introspection, we cannot follow the process strictly. From now on, tests will be provided alongside code, but they won't always be presented in the order mentioned above. Behind the scenes, my process is a mixture of pure TDD and traditional development, and I don't think it's useful to highlight every single step of what is, ultimately, a very personal approach.
Before we can parse bulk strings, we need one more utility function. While binary_extract_line reads bytes until \r\n, bulk strings provide the length of the data upfront. So we need a function that reads exactly length bytes from the buffer.
src/resp.rs // Extracts a given amount of bytes from the buffer.
fn binary_extract_bytes(buffer: &[u8], index: &mut usize, length: usize) -> RESPResult<Vec<u8>> {
// The output vector that will contain the
// extracted bytes.
let mut output = Vec::new();
// Check if we are allowed to read
// the requested amount of bytes.
if *index + length > buffer.len() {
return Err(RESPError::OutOfBounds(buffer.len()));
}
// Copy the bytes from the buffer into
// the output vector.
output.extend_from_slice(&buffer[*index..*index + length]);
// Update the index.
*index += length;
Ok(output)
}This is simpler than binary_extract_line because we know exactly how many bytes to read, so there's no scanning involved.
src/resp.rs mod tests {
#[test]
// Test that the function binary_extract_bytes
// correctly extracts the requested number of bytes
// from a buffer, updating the index.
fn test_binary_extract_bytes() {
let buffer = "SOMEBYTES".as_bytes();
let mut index: usize = 0;
let output = binary_extract_bytes(buffer, &mut index, 6).unwrap();
assert_eq!(output, "SOMEBY".as_bytes().to_vec());
assert_eq!(index, 6);
}
#[test]
// Test that the function binary_extract_bytes
// returns the correct error when we try to
// read more bytes than are available in the buffer.
fn test_binary_extract_bytes_out_of_bounds() {
let buffer = "SOMEBYTES".as_bytes();
let mut index: usize = 0;
let error = binary_extract_bytes(buffer, &mut index, 10).unwrap_err();
assert_eq!(error, RESPError::OutOfBounds(9));
assert_eq!(index, 0);
}A RESP bulk string looks like $5\r\nHELLO\r\n. After the $ prefix comes the length of the string, then \r\n, then the string data, and a final \r\n. A special case is the null string, represented as $-1\r\n.
Let's first add the new variants to the RESP enum. We need BulkString for regular bulk strings and Null for the empty case.
src/resp.rs #[derive(Debug, PartialEq)]
pub enum RESP {
BulkString(String),
Null,
SimpleString(String),
}
impl fmt::Display for RESP {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let data = match self {
Self::BulkString(data) => format!("${}\r\n{}\r\n", data.len(), data),
Self::Null => String::from("$-1\r\n"),
Self::SimpleString(data) => format!("+{}\r\n", data),
};
write!(f, "{}", data)
}
}To get the length of the data contained in a RESP bulk string we need to parse the part of the buffer that contains it. As the length ends with \r\n we can reuse binary_extract_line_as_string for this purpose and convert the string into a numeric value.
src/resp.rs // Extracts a single line from a RESP buffer and interprets it as length.
// The type used for the number is RESPLength.
pub fn resp_extract_length(buffer: &[u8], index: &mut usize) -> RESPResult<RESPLength> {
// Extract all bytes until a terminator is found
// and transform them into a String.
let line: String = binary_extract_line_as_string(buffer, index)?;
// Convert the String into RESPLength.
let length: RESPLength = line.parse()?;
Ok(length)
}This relies on the type RESPLength that we can define in src/resp_result.rs and import into src/resp.rs
src/resp_result.rs pub type RESPResult<T> = Result<T, RESPError>;
pub type RESPLength = i32;src/resp.rs use crate::resp_result::{RESPError, RESPResult};
use crate::resp_result::{RESPError, RESPLength, RESPResult};We cannot use usize here. The length of a string might be -1 (null string) and usize is an unsigned integer [docs]. The conversion into RESPLength requires the trait From<ParseIntError> [docs] to be implemented for RESPError, since we are returning the error directly through the operator ?. We also need an error variant for incorrect length values (negative values that are not -1).
src/resp_result.rs use std::fmt;
use std::num;
use std::string::FromUtf8Error;
#[derive(Debug, PartialEq)]
pub enum RESPError {
FromUtf8,
IncorrectLength(RESPLength),
OutOfBounds(usize),
ParseInt,
Unknown,
WrongType,
}
impl fmt::Display for RESPError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RESPError::FromUtf8 => write!(f, "Cannot convert from UTF-8"),
RESPError::IncorrectLength(length) => write!(f, "Incorrect length {}", length),
RESPError::OutOfBounds(index) => write!(f, "Out of bounds at index {}", index),
RESPError::ParseInt => write!(f, "Cannot parse string into integer"),
RESPError::Unknown => write!(f, "Unknown format for RESP string"),
RESPError::WrongType => write!(f, "Wrong prefix for RESP type"),
}
}
}
impl From<FromUtf8Error> for RESPError {
fn from(_err: FromUtf8Error) -> Self {
Self::FromUtf8
}
}
impl From<num::ParseIntError> for RESPError {
fn from(_err: num::ParseIntError) -> Self {
Self::ParseInt
}
}
pub type RESPResult<T> = Result<T, RESPError>;With those utility functions in place, we can parse a bulk string. The function removes the type prefix, extracts the length, handles the null case, reads the right number of bytes, and converts them to a string.
src/resp.rs // Parse a bulk string in the form `$NUM\r\nVALUE\r\n`.
fn parse_bulk_string(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
// Remove the type for a bulk string.
resp_remove_type('$', buffer, index)?;
// Read bytes from the buffer and interpret them
// as the length of the bulk string.
let length = resp_extract_length(buffer, index)?;
// If the length is -1 we are looking
// at an empty bulk string.
if length == -1 {
return Ok(RESP::Null);
}
// If the length is negative there is an error.
if length < -1 {
return Err(RESPError::IncorrectLength(length));
}
// Read all possible bytes from the buffer
let bytes = binary_extract_bytes(buffer, index, length as usize)?;
// Convert the bytes into UTF-8.
let data: String = String::from_utf8(bytes)?;
// Increment the index to skip the \r\n.
*index += 2;
Ok(RESP::BulkString(data))
}Please note how the function reads naturally from top to bottom, each step building on the previous one: remove the prefix, get the length, handle special cases, extract the data, convert it.
Let's test the happy path and the most important error cases
src/resp.rs mod tests {
#[test]
// Test that the function parse_bulk_string
// returns the correct RESP variant when
// we read a buffer that contains a
// RESP bulk string.
fn test_parse_bulk_string() {
let buffer = "$2\r\nOK\r\n".as_bytes();
let mut index: usize = 0;
let output = parse_bulk_string(buffer, &mut index).unwrap();
assert_eq!(output, RESP::BulkString(String::from("OK")));
assert_eq!(index, 8);
}
#[test]
// Test that the function parse_bulk_string
// returns the correct RESP variant when
// we read a buffer that contains an empty
// RESP bulk string.
fn test_parse_bulk_string_empty() {
let buffer = "$-1\r\n".as_bytes();
let mut index: usize = 0;
let output = parse_bulk_string(buffer, &mut index).unwrap();
assert_eq!(output, RESP::Null);
assert_eq!(index, 5);
}
#[test]
// Test that the function parse_bulk_string
// returns the correct error when we try to
// parse a RESP bulk string with an unparsable length.
fn test_parse_bulk_string_unparsable_length() {
let buffer = "$wrong\r\nOK\r\n".as_bytes();
let mut index: usize = 0;
let error = parse_bulk_string(buffer, &mut index).unwrap_err();
assert_eq!(error, RESPError::ParseInt);
assert_eq!(index, 8);
}
#[test]
// Test that the function parse_bulk_string
// returns the correct error when we try to
// parse a RESP bulk string with a negative
// length less than -1.
fn test_parse_bulk_string_negative_length() {
let buffer = "$-7\r\nOK\r\n".as_bytes();
let mut index: usize = 0;
let error = parse_bulk_string(buffer, &mut index).unwrap_err();
assert_eq!(error, RESPError::IncorrectLength(-7));
assert_eq!(index, 5);
}
#[test]
// Test that the function parse_bulk_string
// returns the correct error when we try to
// parse a RESP bulk string but the buffer
// doesn't contain enough bytes.
fn test_parse_bulk_string_data_too_short() {
let buffer = "$7\r\nOK\r\n".as_bytes();
let mut index: usize = 0;
let error = parse_bulk_string(buffer, &mut index).unwrap_err();
assert_eq!(error, RESPError::OutOfBounds(8));
assert_eq!(index, 4);
}As always, when it comes to low-level protocols, there are many more tests that we might add, but these should be enough for now. The repository also includes a test for unparsable length values.
Finally, let's add parse_bulk_string to parser_router
src/resp.rs // Select the correct parsing function according to the data type found
// in the buffer at the given index.
fn parser_router(
buffer: &[u8],
index: &mut usize,
) -> Option<fn(&[u8], &mut usize) -> RESPResult<RESP>> {
// Get the character at the current index
// and associate it with a parsing function.
match buffer[*index] {
b'+' => Some(parse_simple_string),
b'$' => Some(parse_bulk_string),
_ => None,
}
}with a test to check that bytes_to_resp can parse the new type
src/resp.rs mod tests {
#[test]
// Test the parsing process end-to-end, checking
// that the function bytes_to_resp can parse
// a buffer with a RESP bulk string.
fn test_bytes_to_resp_bulk_string() {
let buffer = "$2\r\nOK\r\n".as_bytes();
let mut index: usize = 0;
let output = bytes_to_resp(buffer, &mut index).unwrap();
assert_eq!(output, RESP::BulkString(String::from("OK")));
assert_eq!(index, 8);
}The final RESP type we need is the array. Arrays are more complicated than bulk strings because they are recursive, being containers of other RESP types (including other arrays).
A RESP array comes in the form *NUM_ELEMENTS\r\nELEM_1\r\nELEM_2\r\n...ELEM_N\r\n. For example: *2\r\n+OK\r\n$5\r\nVALUE\r\n, where 2 is the number of elements, which are +OK\r\n and $5\r\nVALUE\r\n. Parsing an array, however, is not too complicated. We can first read the number of elements and then loop over them calling parser_router on each element.
As before, we first add the relevant variant to the RESP enum
src/resp.rs #[derive(Debug, PartialEq)]
pub enum RESP {
Array(Vec<RESP>),
BulkString(String),
Null,
SimpleString(String),
}
impl fmt::Display for RESP {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let data = match self {
Self::Array(data) => {
let mut output = String::from("*");
output.push_str(format!("{}\r\n", data.len()).as_str());
for elem in data.iter() {
output.push_str(elem.to_string().as_str());
}
output
}
Self::BulkString(data) => format!("${}\r\n{}\r\n", data.len(), data),
Self::Null => String::from("$-1\r\n"),
Self::SimpleString(data) => format!("+{}\r\n", data),
};
write!(f, "{}", data)
}
}The parse_array function reads the array length, then calls parser_router for each element. Because the router already knows how to parse any RESP type, arrays can naturally contain simple strings, bulk strings, or even other arrays.
src/resp.rs // Parse an array in the form `*NUM\r\nELEM1\r\nELEM2\r\n...`.
fn parse_array(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
// Remove the type for an array.
resp_remove_type('*', buffer, index)?;
// Read bytes from the buffer and interpret them
// as the number of elements in the array.
let length = resp_extract_length(buffer, index)?;
// If the length is negative there is an error.
if length < 0 {
return Err(RESPError::IncorrectLength(length));
}
// The output vector that will contain the elements
// extracted from the array.
let mut data = Vec::new();
// Extract all elements.
for _ in 0..length {
// Automatically detect the type of element
// and parse it.
match parser_router(buffer, index) {
Some(parse_func) => {
// Run the selected parser function
// for the current element.
let array_element: RESP = parse_func(buffer, index)?;
// Store the parsed element in the vector.
data.push(array_element);
}
None => return Err(RESPError::Unknown),
}
}
Ok(RESP::Array(data))
}The tests are
src/resp.rs mod tests {
#[test]
// Test that the function parse_array
// returns the correct RESP variant when
// we read a buffer that contains an
// array of two elements, a simple string
// and a bulk string.
fn test_parse_array() {
let buffer = "*2\r\n+OK\r\n$5\r\nVALUE\r\n".as_bytes();
let mut index: usize = 0;
let output = parse_array(buffer, &mut index).unwrap();
assert_eq!(
output,
RESP::Array(vec![
RESP::SimpleString(String::from("OK")),
RESP::BulkString(String::from("VALUE"))
])
);
assert_eq!(index, 20);
}
#[test]
// Test that the function parse_array
// returns the correct error when we try to
// parse an array with negative length.
fn test_parse_array_invalid_length() {
let buffer = "*-1\r\n+OK\r\n$5\r\nVALUE\r\n".as_bytes();
let mut index: usize = 0;
let error = parse_array(buffer, &mut index).unwrap_err();
assert_eq!(error, RESPError::IncorrectLength(-1));
assert_eq!(index, 5);
}Finally, let's add parse_array itself to parser_router
src/resp.rs // Select the correct parsing function according to the data type found
// in the buffer at the given index.
fn parser_router(
buffer: &[u8],
index: &mut usize,
) -> Option<fn(&[u8], &mut usize) -> RESPResult<RESP>> {
// Get the character at the current index
// and associate it with a parsing function.
match buffer[*index] {
b'+' => Some(parse_simple_string),
b'$' => Some(parse_bulk_string),
b'*' => Some(parse_array),
_ => None,
}
}With a test to check the behaviour of bytes_to_resp
src/resp.rs mod tests {
#[test]
// Test the parsing process end-to-end, checking
// that the function bytes_to_resp can parse
// a buffer with a RESP array.
fn test_bytes_to_resp_array() {
let buffer = "*2\r\n+OK\r\n$5\r\nVALUE\r\n".as_bytes();
let mut index: usize = 0;
let output = bytes_to_resp(buffer, &mut index).unwrap();
assert_eq!(
output,
RESP::Array(vec![
RESP::SimpleString(String::from("OK")),
RESP::BulkString(String::from("VALUE"))
])
);
assert_eq!(index, 20);
}We can at this point refactor the part of the server that processes an incoming request, parses it, and executes the command. Redis commands are sent as arrays of bulk strings, so it's a good idea to have a generic processor of requests to simplify the addition of new commands in the future.
Let's start creating a result type for our server in a new file, src/server.rs
src/server.rs use std::fmt;
#[derive(Debug, PartialEq)]
pub enum ServerError {
CommandError,
}
impl fmt::Display for ServerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ServerError::CommandError => write!(f, "Error while processing!"),
}
}
}
pub type ServerResult<T> = Result<T, ServerError>;For now, a single variant of ServerError will be sufficient. Later, when different error cases emerge, we will add more.
We need to add the module to main.rs to make it visible.
src/main.rs mod resp;
mod resp_result;
mod server;We can then create a function to process an incoming RESP request. This function needs to parse the incoming RESP data, figure out which command it contains, and call the right function to process it.
src/server.rs use crate::RESP;
use std::fmt;
...
// Process an incoming request and return a result.
pub fn process_request(request: RESP) -> ServerResult<RESP> {
// Check if the request is expressed using
// a RESP array and extract the elements.
let elements = match request {
RESP::Array(v) => v,
_ => {
return Err(ServerError::CommandError);
}
};
// The vector that contains all the commands we need to process.
let mut command = Vec::new();
// Check that each element of the array is a
// bulk string, extract the content, and add it
// to the vector.
for elem in elements.iter() {
match elem {
RESP::BulkString(v) => command.push(v),
_ => {
return Err(ServerError::CommandError);
}
}
}
// Match the first element of the vector (the name
// of the command) with the code that implements
// that command.
match command[0].to_lowercase().as_str() {
"ping" => Ok(RESP::SimpleString(String::from("PONG"))),
_ => {
return Err(ServerError::CommandError);
}
}
}We accept only a RESP::Array, and from it we can extract the inner Vec<RESP> and verify that each element is a RESP::BulkString. At that point the vector command contains each element of the command that was sent (e.g. ["PING"] or ["ECHO", "42"], where each element is a Rust String). We can then route the processing according to the first element of the vector.
The relative tests are
src/server.rs #[cfg(test)]
mod tests {
use super::*;
#[test]
// Test that the function process_request
// processes a PING request and that it
// responds with a PONG.
fn test_process_request_ping() {
let request = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
let output = process_request(request).unwrap();
assert_eq!(output, RESP::SimpleString(String::from("PONG")));
}
#[test]
// Test that the function process_request
// returns the correct error when it is
// given a request that doesn't contain
// a RESP array.
fn test_process_request_not_array() {
let request = RESP::BulkString(String::from("PING"));
let error = process_request(request).unwrap_err();
assert_eq!(error, ServerError::CommandError);
}
#[test]
// Test that the function process_request
// returns the correct error when it is
// given a request that contains a RESP array
// but the content of the array is not
// a bulk string.
fn test_process_request_not_bulkstrings() {
let request = RESP::Array(vec![RESP::SimpleString(String::from("PING"))]);
let error = process_request(request).unwrap_err();
assert_eq!(error, ServerError::CommandError);
}
}The last step is to call process_request from handle_connection
src/main.rs use crate::resp::RESP;
use crate::resp::{bytes_to_resp, RESP};
use crate::server::process_request;
...
async fn handle_connection(mut stream: TcpStream) {
loop {
// Read from the stream into the buffer.
match stream.read(&mut buffer).await {
// If the stream returned some data,
// process the request.
Ok(size) if size != 0 => {
// Hardcoded response using a specific variant.
let response = RESP::SimpleString(String::from("PONG"));
// Initialise the index to start at the
// beginning of the buffer.
let mut index: usize = 0;
// Process the bytes in the buffer according to
// the content and extract the request. Update the index.
let request = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
Ok(v) => v,
Err(e) => {
eprintln!("Error: {}", e);
return;
}
};
// Process the request.
let response = match process_request(request) {
Ok(v) => v,
Err(e) => {
eprintln!("Error parsing command: {}", e);
return;
}
};
// Write the response to the stream.
if let Err(e) = stream.write_all(response.to_string().as_bytes()).await {
eprintln!("Error writing to socket: {}", e);
}
}Stage 4: Handle concurrent clients
Everything we did in this chapter was in preparation for future changes, but the last refactoring altered code that was actually used by the server. However, as we haven't changed the behaviour, this version of the code still passes Stage 4 of the CodeCrafters challenge.
All the work we have done in this chapter was meant to simplify our job when it comes to adding new commands. We will see that in action now.
Let's add the command ECHO, that will return its argument to the client as a bulk string. As the logic behind ECHO is trivial, the only thing we need to do is to add a new arm to the match construct in process_request
src/server.rs pub fn process_request(request: RESP) -> ServerResult<RESP> {
// Match the first element of the vector (the name
// of the command) with the code that implements
// that command.
match command[0].to_lowercase().as_str() {
"ping" => Ok(RESP::SimpleString(String::from("PONG"))),
"echo" => Ok(RESP::BulkString(command[1].clone())),
_ => {
return Err(ServerError::CommandError);
}
}with this test
src/server.rs mod tests {
#[test]
// Test that the function process_request
// processes an ECHO request and that it
// responds with a copy of the input.
fn test_process_request_echo() {
let request = RESP::Array(vec![
RESP::BulkString(String::from("ECHO")),
RESP::BulkString(String::from("42")),
]);
let output = process_request(request).unwrap();
assert_eq!(output, RESP::BulkString(String::from("42")));
}Stage 5: Implement the ECHO command
At this point your code should pass Stage 5 of the CodeCrafters challenge.