Initial Steps

We'll go step by step and cut off every bulkhead and every vent until we have it cornered.

Alien (1979)

The initial steps are fairly typical for an application of this kind. We will create a server that first listens on a specific TCP port. Then we will change the code to respond to a single incoming request and terminate. As a third step, the server will respond to multiple incoming requests from the same client, and finally it will serve multiple clients.

At the end of the chapter we will have a running application that can be used with the official Redis CLI tool to serve the command PING.

Step 1.1 - Bind to a port#

The first thing we need to do is to create a server that binds to TCP port 6379.

The Rust standard library provides TcpListener [docs] that can be bound to a socket.

src/main.rs
use std::net::TcpListener;

fn main() {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").unwrap();

    // Process each incoming connection.
    for stream in listener.incoming() {
        match stream {
            Ok(_stream) => {
                println!("accepted new connection");
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
}

This is the classic initial step for a server. We hard-coded the address of the server and the port (127.0.0.1:6379) but in the future we might want to make both values configurable through a specific command line option.
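As a sketch of that future option (not part of the book's code yet), the bind address could be read from the first command line argument, falling back to the hard-coded default. The `bind_address` helper name is hypothetical.

```rust
use std::env;

// Hypothetical helper: take the bind address from the first command
// line argument, falling back to the standard Redis address.
fn bind_address() -> String {
    env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:6379".to_string())
}

fn main() {
    // With no argument, the default address is used.
    println!("{}", bind_address());
}
```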

Let's have a look at the code in detail.

src/main.rs
use std::net::TcpListener;

fn main() {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").unwrap(); 1

    // Process each incoming connection.
    for stream in listener.incoming() { 2
        match stream {
            Ok(_stream) => {
                println!("accepted new connection"); 3
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
}
  • unwrap [docs] 1 is a crude way to extract the Ok value from a Result. It's a convenient method for tests, as it panics if the result is an error, but in production code it might not be the best solution.
  • println! [docs] 3 is how we print text on the standard output. While there are more sophisticated ways to debug programs, printing is still a good way to understand what is going on in a system, and to log events.
  • A concept implicit in the code above is that of Iterator [docs] which in this case is implemented by Incoming, that is the return type of TcpListener::incoming 2. Iterators are important concepts in every programming language, and we will see many throughout the book.
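To see why unwrap is crude, compare it with the non-panicking alternatives that Result offers. The values below are hypothetical stand-ins for what a fallible call like bind might return.

```rust
fn main() {
    // Hypothetical Result values, standing in for a fallible call.
    let ok: Result<i32, &str> = Ok(3);
    let err: Result<i32, &str> = Err("boom");

    // unwrap panics on Err; these alternatives do not.
    assert_eq!(ok.unwrap(), 3);
    assert_eq!(err.unwrap_or(0), 0);
    assert_eq!(err.unwrap_or_else(|e| e.len() as i32), 4);
}
```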

TCP protocol

You don't need to know the (rather complicated) details of the TCP protocol for this project, but as a software developer you should be familiar with IP addresses and TCP ports.

Testing with the Redis CLI#

Other than using the CodeCrafters tests, you can interact with your server using the Redis CLI. Once you have installed it on your system, you can first run your server with

$ cargo run

or, if you are using the CodeCrafters setup,

$ spawn_redis_server.sh

and then open a new terminal and run redis-cli

$ redis-cli
127.0.0.1:6379>

The server should react by printing the message accepted new connection. You cannot issue commands at the moment, as the server ignores data received through the stream.


CodeCrafters

Stage 1: Bind to a port

The application doesn't do much at the moment, and running cargo run won't show anything interesting. However, this passes Stage 1 of the CodeCrafters challenge.

Step 1.2 - Respond to PING#

The server has to respond to incoming requests, so the next step is to start listening on the TCP connection and parse the incoming data. Redis uses a binary protocol called Redis Serialization Protocol (RESP), so to understand the request we will eventually have to implement a parser for this protocol.

As a first step, however, we can simply discard the incoming data and send back the same response regardless of the request. The simplest Redis command is PING, which receives the response +PONG\r\n. This is the string PONG encoded as a RESP simple string, as we will see later.
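The framing of a RESP simple string is just a `+` sign, the payload, and a CRLF terminator. A minimal sketch (the `encode_simple_string` helper is hypothetical, not part of the server code):

```rust
// RESP encodes a simple string as '+', the payload, then CRLF.
fn encode_simple_string(payload: &str) -> String {
    format!("+{}\r\n", payload)
}

fn main() {
    // PONG encoded as a RESP simple string.
    assert_eq!(encode_simple_string("PONG"), "+PONG\r\n");
}
```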

src/main.rs
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").unwrap();

    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                // The connection is valid, handle it.
                handle_connection(&mut stream);
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
}

// The main entry point for valid TCP connections.
fn handle_connection(stream: &mut TcpStream) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    // Read from the stream into the buffer.
    stream.read(&mut buffer).unwrap();

    // Hardcoded response.
    let response = "+PONG\r\n";

    // Write the response to the stream.
    stream.write(response.as_bytes()).unwrap();

    // Make sure the stream is flushed.
    stream.flush().unwrap();
}

In this version we isolate the management of the incoming connections.

src/main.rs
fn main() {

    // Process each incoming connection.
    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                // The connection is valid, handle it.
                handle_connection(&mut stream); 1
            }

...

// The main entry point for valid TCP connections.
fn handle_connection(stream: &mut TcpStream) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    // Read from the stream into the buffer.
    stream.read(&mut buffer).unwrap(); 2

    // Hardcoded response.
    let response = "+PONG\r\n";

    // Write the response to the stream.
    stream.write(response.as_bytes()).unwrap(); 3

    // Make sure the stream is flushed.
    stream.flush().unwrap();
}

We pass each new connection, represented by a TcpStream [docs], to the function handle_connection 1. There, we read [docs] from the stream into a binary buffer 2, then we prepare the response and write [docs] it to the stream 3. At the moment, the code still uses unwrap to manage the error cases, but in the future it will be worth handling errors more appropriately.

Iterator and Item

How do we know the type of stream? Have a look at TcpListener::incoming [docs] and you will see that the return type is the struct Incoming [docs]. This in turn implements Iterator [docs] and specifies type Item = Result<TcpStream, Error>, which leads us to TcpStream [docs].
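A minimal Iterator implementation makes the role of the associated type Item concrete; Incoming works the same way, with Item being the Result shown above. The Countdown type here is hypothetical, just for illustration.

```rust
// A minimal iterator: counts down from an initial value to 1.
struct Countdown(u32);

impl Iterator for Countdown {
    // The associated type that next() yields, like Incoming's Item.
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.0 == 0 {
            None
        } else {
            let current = self.0;
            self.0 -= 1;
            Some(current)
        }
    }
}

fn main() {
    let values: Vec<u32> = Countdown(3).collect();
    assert_eq!(values, vec![3, 2, 1]);
}
```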

Testing and TDD#

Testing is arguably more complicated in strongly typed compiled languages like Rust than it is in languages like Python. In particular, while it is possible to create mocks, it is in general hard to replace functions at run time. So, a strict TDD approach is, in my opinion, impossible. This forces us to apply inversion of control more often, but it's important to understand that we cannot achieve the same level of inspection that we have in languages with a higher level of abstraction.

For example, in the current case the classic strategy in a high-level OOP language would be:

  • create a mock of a TcpStream that has only the method read
  • run the function handle_connection
  • check that the mock method has been called with the right value

In Rust, however, this is more complicated. We have to pass the function a value of type TcpStream and while we can redefine TcpStream just for tests with some clever use of conditional checks like #[cfg(...)], this would clutter the production code and cannot be done for a single test only.
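One form of inversion of control that does work is making the handler generic over the Read and Write traits, so a test can pass an in-memory Cursor instead of a real TcpStream. This is a hypothetical refactor, not the code the book settles on.

```rust
use std::io::{Cursor, Read, Write};

// Hypothetical refactor: generic over Read + Write, so tests can
// substitute an in-memory type for TcpStream.
fn handle<S: Read + Write>(stream: &mut S) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();
    stream.write_all(b"+PONG\r\n").unwrap();
}

fn main() {
    // A Cursor over a Vec implements both Read and Write.
    let mut fake = Cursor::new(b"PING\r\n".to_vec());
    handle(&mut fake);
    // The response is appended after the consumed request bytes.
    assert!(fake.into_inner().ends_with(b"+PONG\r\n"));
}
```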

So, we will add tests to the code following the TDD approach, but not all cases will be covered. For now, it's perfectly fine to print messages and data to the standard output to understand what the code is doing. Here, a quick way to debug the code is to add a println! to see what the server receives from the client.

src/main.rs
fn handle_connection(stream: &mut TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    println!("Received: {:?}", buffer);

    let response = "+PONG\r\n";
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Memory and safety

A prominent part of the Rust language deals with safe memory usage, so it's paramount to understand how to use references and mutability, in particular when it comes to passing values to functions. I highly recommend Patterns Are Not Expressions by Árpád Goretity and There are no mutable parameters in Rust by Michael Snoyman.

Ownership, which stems directly from safe memory usage, is everywhere in Rust, and at times it can be a pain to deal with. Understanding the rationale of ownership upfront is paramount; otherwise you'll spend a lot of time frustrated by the compiler's complaints. I recommend reading at least chapter 4 of the Rust book.
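The two parameter styles we will keep running into can be sketched side by side: borrowing mutably (the caller keeps the value) versus taking ownership (the value is moved into the function). The helper names are hypothetical.

```rust
// Borrowing mutably: the caller keeps ownership of the buffer.
fn fill_borrowed(buffer: &mut Vec<u8>) {
    buffer.push(42);
}

// Taking ownership: the buffer is moved into the function.
fn consume_owned(mut buffer: Vec<u8>) -> Vec<u8> {
    buffer.push(43);
    buffer
}

fn main() {
    let mut buf = vec![1u8];
    fill_borrowed(&mut buf); // buf is still usable afterwards
    assert_eq!(buf, vec![1, 42]);

    let moved = consume_owned(buf); // buf is moved and can't be used again
    assert_eq!(moved, vec![1, 42, 43]);
}
```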


CodeCrafters

Stage 2: Respond to PING

The application passes the Stage 2 of the CodeCrafters challenge. However, we can't use the Redis CLI to interact with it. The server terminates the connection immediately after sending the response, which is something the CLI tool doesn't like.

Step 1.3 - Respond to multiple PING#

The problem at this point is that the function handle_connection terminates once it processes one request, while it should keep the connection open, receiving bytes from the client and sending responses.

This is easily fixed by moving the function's code into a loop.

src/main.rs
// The main entry point for valid TCP connections.
fn handle_connection(stream: &mut TcpStream) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    loop {
        // Read from the stream into the buffer.
        match stream.read(&mut buffer) {
            // If the stream returned some data,
            // process the request.
            Ok(size) if size != 0 => {
                // Hardcoded response.
                let response = "+PONG\r\n";

                // Write the response to the stream.
                stream.write(response.as_bytes()).unwrap();

                // Make sure the stream is flushed.
                stream.flush().unwrap();
            }
            // If the stream returned no data
            // the connection has been closed.
            Ok(_) => {
                println!("Connection closed");
                break;
            }
            Err(e) => {
                println!("Error: {}", e);
                break;
            }
        }
    }
}

While we do that, it is worth taking the time to properly manage the output of TcpStream::read [docs]. The function returns a Result<usize> containing the number of bytes received. A return value of 0 means that the peer has closed the connection, while an Err result means something went wrong with the stream.
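The Ok(0) convention can be seen without a network at all, using a Cursor as a stand-in for the stream: once the data is exhausted, read returns Ok(0), just like a closed connection.

```rust
use std::io::{Cursor, Read};

fn main() {
    // A Cursor stands in for the TCP stream.
    let mut stream = Cursor::new(vec![1u8, 2, 3]);
    let mut buffer = [0u8; 512];

    // First read: three bytes are available.
    assert_eq!(stream.read(&mut buffer).unwrap(), 3);

    // Second read: no data left, Ok(0) signals end of stream.
    assert_eq!(stream.read(&mut buffer).unwrap(), 0);
}
```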

It is important to understand the blocking nature of TcpStream::read. When the code executes the function the whole program sits waiting for something to happen on the TCP connection, without doing anything until data appears or an error happens.

This is clearly unacceptable for a server that has to interact with multiple clients, so in the next step we will see a solution to this problem.

Loops and pattern matching

loop [docs] is an interesting construct. In other languages infinite loops with breaks are often frowned upon, while in Rust they are not only accepted but considered an idiomatic solution.

Pattern matching [docs] is a powerful concept that is predominant in functional programming languages. It's important to learn how to deconstruct types and how to use guards. Rust is very strict about covering all possible patterns, which greatly helps to avoid bugs.
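The three-way match with a guard used by handle_connection can be reduced to a small standalone function (the classify helper is hypothetical, just to show the pattern in isolation):

```rust
// The same deconstruction used in handle_connection: a guard on the
// Ok value distinguishes data from a closed connection.
fn classify(result: Result<usize, String>) -> &'static str {
    match result {
        Ok(size) if size != 0 => "data",
        Ok(_) => "closed",
        Err(_) => "error",
    }
}

fn main() {
    assert_eq!(classify(Ok(7)), "data");
    assert_eq!(classify(Ok(0)), "closed");
    assert_eq!(classify(Err("boom".to_string())), "error");
}
```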

Testing with the Redis CLI#

Now that the server is actually listening you can open a new terminal, run redis-cli, and have a brief interaction with your server sending a PING command.

$ redis-cli
127.0.0.1:6379> ping
PONG

At the moment your server will respond with PONG whatever the request is, but we will soon implement other commands.

127.0.0.1:6379> echo 42
PONG
127.0.0.1:6379> 

CodeCrafters

Stage 3: Respond to multiple PINGs

The code we have at the moment passes Stage 3 of the CodeCrafters challenge. The application is not truly processing the incoming request yet, but the backbone is in place.

Step 1.4 - Handle concurrent clients#

Now the game starts to become a bit more serious, both in terms of requirements and in terms of the technical implementation. A server needs to interact with multiple clients, so we have to introduce concurrency.

There are mainly three ways to implement concurrency: multiprocessing, multithreading, and asynchronous programming. They have different pros and cons and depending on the language you might find one of them easier to implement than others.

In this case we will use asynchronous programming. The application we are implementing is mostly running I/O bound tasks and it's worth exploring how Rust's strict memory model works with async/await.

To run the asynchronous loop we will use Tokio. Let's add the crate to the project

$ cargo add tokio --features full

which will add a dependency to Cargo.toml

Cargo.toml
[dependencies]
tokio = { version = "1.51.0", features = ["full"] }

Please note that the actual version of Tokio will likely be different for you. We can now import the relevant components, replacing the previous imports

src/main.rs
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};

The changes to the code are minimal, reflecting the fact that the syntax of asynchronous programs tries to follow the synchronous paradigm, hiding the complexity of concurrency.

src/main.rs
// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    loop {
        // Read from the stream into the buffer.
        match stream.read(&mut buffer).await {
            // If the stream returned some data,
            // process the request.
            Ok(size) if size != 0 => {
                // Hardcoded response.
                let response = "+PONG\r\n";

                // Write the response to the stream.
                if let Err(e) = stream.write_all(response.as_bytes()).await {
                    eprintln!("Error writing to socket: {}", e);
                }
            }

...

There are four main changes in the function handle_connection.

src/main.rs
// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream) { 1
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];

    loop {
        // Read from the stream into the buffer.
        match stream.read(&mut buffer).await { 2
            // If the stream returned some data,
            // process the request.
            Ok(size) if size != 0 => {
                // Hardcoded response.
                let response = "+PONG\r\n";

                // Write the response to the stream.
                if let Err(e) = stream.write_all(response.as_bytes()).await { 3
                    eprintln!("Error writing to socket: {}", e);
                }
            }
            // If the stream returned no data
            // the connection has been closed.
            Ok(_) => {
                println!("Connection closed");
                break;
            }
            Err(e) => {
                println!("Error: {}", e);
                break;
            }
        }
    }
}
  • The function becomes asynchronous thanks to the word async in front of its definition 1.
  • The function takes ownership of the stream 1.
  • I/O operations are now asynchronous as well, so we need to await the result of stream.read(&mut buffer) 2.
  • The calls to write and flush have been merged into write_all, which needs to be awaited as well 3.

We also added some error management around stream.write_all that wasn't there previously. This is completely unrelated to async, as we could still use unwrap and let the server panic, but that doesn't sound like a good idea now that the code starts to evolve.

Asynchronous programming

There is a huge amount of resources available on asynchronous programming both in general and specifically to Rust. I highly recommend reading first one or more generic introductions about multiprocessing and multithreading at the operating system level.

As for Rust, threads are explained in chapter 16 of the Rust book [docs] while async has its own book [docs]. Among many others, Jon Gjengset is doing incredible work, digging deep into Rust topics on his YouTube channel. The video Crust of Rust: async/await, in particular, can answer many questions.

The core changes are in the function main. Here, we need to instantiate the asynchronous loop and create an independent handler for each connection

src/main.rs
#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").await?;

    loop {
        // Process each incoming connection.
        match listener.accept().await {
            // The connection is valid, handle it.
            Ok((stream, _)) => {
                // Spawn a task to take care of this connection.
                tokio::spawn(handle_connection(stream));
            }
            Err(e) => {
                println!("Error: {}", e);
                continue;
            }
        }
    }
}

Let's have a look at the changes in detail.

src/main.rs
#[tokio::main] 1
async fn main() -> std::io::Result<()> { 2
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").await?; 3

    loop { 4
        // Process each incoming connection.
        match listener.accept().await { 5
            // The connection is valid, handle it.
            Ok((stream, _)) => {
                // Spawn a task to take care of this connection.
                tokio::spawn(handle_connection(stream)); 6
            }
            Err(e) => {
                println!("Error: {}", e);
                continue;
            }
        }
    }
}

For the function main to be async we need to decorate it with #[tokio::main] 1 which, behind the scenes, transforms it into a normal entry point that creates the main loop and runs the rest of the code. TcpListener::bind returns a std::io::Result, and giving main the compatible return type std::io::Result<()> 2 allows us to use the operator ? 3 to unwrap the result of bind or return the error.

The new connection monitoring function TcpListener::accept [docs] is not an iterator any more. It's an asynchronous function, so the usual pattern is to wrap it in a loop 4 and await it 5. Each time a new connection is accepted, the function returns a tuple (TcpStream, SocketAddr), of which we keep only the first element (the stream).

The whole solution revolves around tokio::spawn 6, that creates a new task. This means that the provided function will be run in isolation in the asynchronous loop, and that the system will pause it every time an await is invoked in its code.

A common pattern for spawn is to run it in combination with async move [docs], passing some closure defined on the spot. This is done to make sure that ownership of the closure context is transferred to the task. In this case there is no need to use async move, as the function handle_connection already takes ownership of stream.

Error checking and propagation

Error handling [docs] is an important topic in every programming language, but it requires even more planning in a strongly typed one.

Rust has a very elegant way to manage error conditions that can arise in complicated chains of functions or in loops: the operator ? [docs]. To use it, you need to ensure compatibility between the error and the return type of the current function, so you might want to look into automatic type conversion with From [docs].

The question mark operator is syntactic sugar for a match, and using it keeps the code tidy without sacrificing readability.
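Roughly, the question mark operator expands to the explicit match below (the real desugaring also applies From::from to convert the error type). The two hypothetical parsing functions are equivalent:

```rust
use std::num::ParseIntError;

// Roughly what the question mark operator expands to.
fn parse_explicit(s: &str) -> Result<i32, ParseIntError> {
    match s.parse::<i32>() {
        Ok(value) => Ok(value),
        Err(e) => Err(e),
    }
}

// The same function written with ?.
fn parse_short(s: &str) -> Result<i32, ParseIntError> {
    Ok(s.parse::<i32>()?)
}

fn main() {
    assert_eq!(parse_explicit("42"), parse_short("42"));
    assert!(parse_short("not a number").is_err());
}
```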


CodeCrafters

Stage 4: Handle concurrent clients

This version of the code passes Stage 4 of the CodeCrafters challenge. We can actually connect to the server using multiple Redis CLI instances in different terminals.