$ cargo add clap --features deriveYou're not a replicant.
In this chapter we start implementing the first extension of the challenge "Build your own Redis": replication.
Replication is a cornerstone of high-availability architecture in database systems. At its core, replication means maintaining multiple copies of your data across different nodes. This allows the system to offload read traffic to replicas, improving scalability and responsiveness, while also insulating against hardware or software failures on the primary node.
But while the concept of replication is straightforward, its implementation is anything but. Making an exact static copy of a dataset is trivial; the real challenge begins when data changes. Every write — an insertion, update, or deletion — must be reliably propagated to all replicas, and doing this quickly and correctly is where things get complex.
Add networks to the mix, and the challenge deepens. Networks are unpredictable: messages can be delayed, arrive out of order, or disappear entirely. In the context of replication, this unreliability means the system must carefully coordinate updates, deal with lagging nodes, and handle inconsistencies — all without sacrificing performance.
Redis uses a master-replica model for replication. In this setup, a single master node handles all writes, while one or more replica nodes receive a stream of updates and serve read-only queries. This architecture simplifies consistency guarantees—since only the master accepts writes—but introduces challenges around replication lag and failover.
Replication in Redis is asynchronous by default. The master does not wait for replicas to confirm receipt of changes, which improves performance but comes with a risk: if the master crashes before changes reach the replicas, data may be lost. Despite this, the simplicity and speed of Redis replication make it a powerful tool for building highly available and horizontally scalable systems.
The CodeCrafters challenge extension is pretty long. In this first part we will implement replica servers, some new commands, and the handshake process between the master and replicas. In the next chapter we will discuss and implement actual data replication.
On the CodeCrafters website, you can activate the extension by clicking on the "Extensions" button at the end of the challenge and turning on "Replication".
If you are running the CodeCrafters tests manually with make test_base_with_redis_prog, you need to open the Makefile and start a new suite. Clone test_repl_with_redis into test_repl_with_redis_prog and follow the same process to add tests as you work through the next chapters.
The first step of the challenge is to pass the server listening port as command line parameter. In general, it's a good idea not to hard code TCP ports or other configuration parameters, but in this specific case the reason why we need the parameter is so that we can create replicas on the same machine during tests.
Parsing the command line is a notoriously difficult task, so it's always a good idea to use a library to do that. In Rust, we can use Clap [docs].
The first thing to do is to install it
$ cargo add clap --features deriveThis will automatically change the files Cargo.lock and Cargo.toml. Once this is done, we can define the command line option --port. While it's not required by the CodeCrafters stage, the option --host was added to make the server more configurable.
src/main.rs use crate::resp::RESP;
use crate::storage::Storage;
use clap::Parser;
use connection::{run_listener, ConnectionMessage};
use server::{run_server, Server};
use tokio::sync::mpsc;
...
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(
short = 'H',
long,
help = "The host address to bind the server to",
default_value_t = String::from("127.0.0.1")
)]
host: String,
#[arg(
short,
long,
help = "The TCP port to use for the server",
default_value_t = 6379
)]
port: u16,
}The option --features of cargo add is used to enable feature flags when installing a library. In this case, we are activating support for deriving command-line parsers using Rust's procedural macros #[derive(Parser)]. Cargo features are documented here and Clap features are documented here.
In Clap, default_value_t is used to match the type of the argument (u16 in this case). Remember that all values from the command line are always strings and have to be parsed, which is exactly what a library like this can help us to do.
To capture and use arguments we need to run Args::parse, which returns a structure Args as defined previously. We can then use the attribute port of that structure.
src/main.rs #[tokio::main]
async fn main() -> std::io::Result<()> {
// Parse the command line arguments.
let args = Args::parse();
// Create the storage and activate expiry.
let mut storage = Storage::new();
storage.set_active_expiry(true);
// Create the server and attach the storage.
let mut server = Server::new();
server.set_storage(storage);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));
// Run the listener on the configured address and port.
run_listener("127.0.0.1".to_string(), 6379, server_sender).await;
run_listener(args.host, args.port, server_sender).await;
Ok(())
}Replication Stage 1: Configure listening port
The code we wrote in this section passes Replication - Stage 1 of the CodeCrafters challenge.
In this section we will change the server so that it can be a master or a replica. We will add structures that capture the replication configuration (on a replica) and implement a basic version of the command INFO.
In this step we are going to set up the data structures to manage replication and implement the first version of the command INFO. From the official documentation, we can see that the full server configuration is a rather long list of entries, so it's best to go for a solution that can be easily extended in the future.
There are many different ways to capture the state of the replication in a server. One of these is an Option that contains either None (on the master) or the relevant configuration (on the replica).
src/replication.rs #[derive(Debug, PartialEq)]
pub struct MasterConfig {
pub host: String,
pub port: u16,
}
pub struct ReplicationConfig {
pub master: Option<MasterConfig>,
}The initial implementation of ReplicationConfig should give us a way to create it both for the master and for the replica.
src/replication.rs impl ReplicationConfig {
pub fn new_master() -> Self {
ReplicationConfig { master: None }
}
pub fn new_replica(master_host: String, master_port: u16) -> Self {
ReplicationConfig {
master: Some(MasterConfig {
host: master_host,
port: master_port,
}),
}
}
}As we have done before, we also add some tests.
src/replication.rs #[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_replication_config_master() {
let config = ReplicationConfig::new_master();
assert_eq!(config.master, None);
}
#[test]
fn test_replication_config_replica() {
let config = ReplicationConfig::new_replica(String::from("other"), 1234);
assert_eq!(
config.master,
Some(MasterConfig {
host: String::from("other"),
port: 1234
})
);
}
}With this in place, we can add the structure ReplicationConfig to the server
src/server.rs use crate::commands::{echo, get, ping, set};
use crate::connection::ConnectionMessage;
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::server_result::ServerError;
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
pub struct Server {
pub storage: Option<Storage>,
pub replication: ReplicationConfig,
}
impl Server {
pub fn new() -> Self {
Self { storage: None }
Self {
storage: None,
replication: ReplicationConfig::new_master(),
}
}
pub fn set_storage(&mut self, storage: Storage) {
self.storage = Some(storage);
}
pub fn set_replication(&mut self, config: ReplicationConfig) {
self.replication = config;
}
// The entry point for key expiry operations.
pub fn expire_keys(&mut self) {
// Get the storage contained in the server.
let storage = match self.storage.as_mut() {
Some(storage) => storage,
None => return,
};
// Trigger the expiry process.
storage.expire_keys();
}
}As you can see, ReplicationConfig has two associated functions new_master and new_replica, while Server has a method new and an additional method set_replication. The choice here is mostly a matter of style, but there is a practical reason.
Replication is only one of the possible additional configurations of a server, and in the future we might want to add more. It seems appropriate, then, to be able to first create the server no matter what and then to trigger the additional states with specific methods.
src/main.rs let mut server = Server::new();
server.set_replication(replication_config);ReplicationConfig, on the other hand, has only two possible states, so it makes sense to provide two different initialisation functions.
src/main.rs let master_config = ReplicationConfig::new_master();
let replica_config = ReplicationConfig::new_replica(...);As mentioned before, however, the whole thing is mostly a matter of style and personal preference.
At this point we have no way to specify if we want to run a replica (it will be part of the next step), so we just need to initialise everything for the master
src/main.rs use crate::replication::ReplicationConfig;
use crate::resp::RESP;
use crate::storage::Storage;
use clap::Parser;
use connection::{run_listener, ConnectionMessage};
use server::{run_server, Server};
use tokio::sync::mpsc;
mod commands;
mod connection;
mod replication;
mod request;
mod resp;
mod resp_result;
mod server;
mod server_result;
mod set;
mod storage;
mod storage_result;
...
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Parse the command line arguments.
let args = Args::parse();
// Create the replication configuration.
let replication_config = ReplicationConfig::new_master();
// Create the storage and activate expiry.
let mut storage = Storage::new();
storage.set_active_expiry(true);
// Create the server and attach the storage.
// Create the server, attach the storage
// and configure replication.
let mut server = Server::new();
server.set_storage(storage);
server.set_replication(replication_config);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));
// Run the listener on the configured address and port.
run_listener(args.host, args.port, server_sender).await;
Ok(())
}What we did so far allows us to capture some information about replication, but there is no way to access that from outside. We can then implement the first version of the command INFO that will return the configuration of the server in a bulk string where each line is a key-value pair separated by : and with headers prefixed by #.
As we have no way to set up a replica yet, for this step of the challenge the command should just return
# Replication
role:masterThis is a rather simple task, and we could just output those strings. However, since we will soon have to manage a richer set of values, this step is worth a certain amount of planning.
In the future, the amount of information contained in the output of INFO will increase as we add more details on the configuration of the server. This means that we need proper structures to host the content of that report, and that we have to decide where to put the presentation logic (that is, the code that formats it into the content of the bulk string).
We can start to create relevant data structures
src/replication.rs #[derive(Debug, PartialEq)]
pub enum Role {
Master,
Replica,
}
#[derive(Debug, PartialEq)]
pub struct ReplicationInfo {
pub role: Role,
}We can also add a method to ReplicationConfig that returns a structure ReplicationInfo
src/replication.rs impl ReplicationConfig {
// Return the current replication configuration.
pub fn info(&self) -> ReplicationInfo {
ReplicationInfo {
role: match &self.master {
Some(_) => Role::Replica,
None => Role::Master,
},
}
}As you can see, the method ReplicationConfig::info could in theory return the INFO command's formatted output directly. This is an architectural choice that depends both on technical and on style reasons. It seems reasonable to keep them separated mostly because there might be a need to store or transfer the configuration data in a different format. At the moment, either way would work.
We can also add two tests for the new function
src/replication.rs mod tests {
#[test]
// Test that the replication configuration
// returns the correct information when
// in master mode.
fn test_replication_info_master() {
let config = ReplicationConfig::new_master();
assert_eq!(config.info(), ReplicationInfo { role: Role::Master });
}
#[test]
// Test that the replication configuration
// returns the correct information when
// in replica mode.
fn test_replication_info_replica() {
let config = ReplicationConfig::new_replica(String::from("other"), 1234);
assert_eq!(
config.info(),
ReplicationInfo {
role: Role::Replica
}
);
}To implement the command INFO it will be useful to have a function that transforms a Vec<String> into a RESP::BulkString. This way we can add all the required keys and headers to the vector and then transform it before we send the response.
src/resp.rs // Create a bulk string from a vector of strings.
pub fn bulk_string_from_vec(strings: Vec<String>) -> RESP {
RESP::BulkString(strings.join("\r\n"))
}We can also add a test for this function
src/resp.rs mod tests {
#[test]
fn test_bulk_string_from_vec() {
let strings = vec![
String::from("First string"),
String::from("Second string"),
String::from("Third string"),
];
let bulk_string: RESP = bulk_string_from_vec(strings);
assert_eq!(
bulk_string,
RESP::BulkString(String::from(
"First string\r\nSecond string\r\nThird string"
))
);
}Implementing the command INFO is at this point very simple, thanks to the changes we made previously.
The core function is
src/commands/info.rs use crate::replication::Role;
use crate::request::Request;
use crate::resp::bulk_string_from_vec;
use crate::server::Server;
use crate::server_result::ServerValue;
pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
// Get the replication info from the server.
let replication_info = server.replication.info();
// Add a header to the output.
let mut output = vec![String::from("# Replication")];
// Add the correct content to the output
// according to the replication role.
match replication_info.role {
Role::Replica => output.push(String::from("role:slave")),
Role::Master => output.push(String::from("role:master")),
};
request
.data(ServerValue::RESP(bulk_string_from_vec(output)))
.await;
}As we created a new module, we need to declare it
src/commands/mod.rs pub mod echo;
pub mod get;
pub mod info;
pub mod ping;
pub mod set;We can also add tests for this function
src/commands/info.rs #[cfg(test)]
mod tests {
use super::*;
use crate::replication::ReplicationConfig;
use crate::resp::RESP;
use crate::server_result::{ServerMessage, ServerValue};
use tokio::sync::mpsc;
#[tokio::test]
async fn test_command_master() {
let mut server: Server = Server::new();
let cmd = vec![String::from("info")];
let (request_channel_tx, mut request_channel_rx) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Null,
sender: request_channel_tx.clone(),
};
command(&mut server, &request, &cmd).await;
let response = match request_channel_rx.try_recv().unwrap() {
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(value))) => value,
_ => panic!(),
};
assert!(response.contains("# Replication"));
assert!(response.contains("role:master"));
}
#[tokio::test]
async fn test_command_replica() {
let config = ReplicationConfig::new_replica(String::from("someserver"), 4242);
let mut server: Server = Server::new();
server.set_replication(config);
let cmd = vec![String::from("info")];
let (request_channel_tx, mut request_channel_rx) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Null,
sender: request_channel_tx.clone(),
};
command(&mut server, &request, &cmd).await;
let response = match request_channel_rx.try_recv().unwrap() {
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(value))) => value,
_ => panic!(),
};
assert!(response.contains("# Replication"));
assert!(response.contains("role:slave"));
}
}Last, we add the new command to the server
src/server.rs use crate::commands::{echo, get, ping, set};
use crate::commands::{echo, get, info, ping, set};
use crate::connection::ConnectionMessage;
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::server_result::ServerError;
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
...
pub async fn process_request(request: Request, server: &mut Server) {
// Process the request using the requested command.
match command_name.as_str() {
"echo" => {
echo::command(server, &request, &command).await;
}
"get" => {
get::command(server, &request, &command).await;
}
"info" => {
info::command(server, &request, &command).await;
}
"ping" => {
ping::command(server, &request, &command).await;
}
"set" => {
set::command(server, &request, &command).await;
}
_ => {
request
.error(ServerError::CommandNotAvailable(command[0].clone()))
.await;
}
}Replication Stage 2: The INFO command
The code we wrote in this section passes Replication - Stage 2 of the CodeCrafters challenge.
In this step we will add a command line option to configure our server as a replica. The option is --replicaof [HOST] [PORT], where HOST is the address of the master server and PORT its TCP port.
We first need to add the option to the command line arguments
src/main.rs #[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(
short = 'H',
long,
help = "The host address to bind the server to",
default_value_t = String::from("127.0.0.1")
)]
host: String,
#[arg(
short,
long,
help = "The TCP port to use for the server",
default_value_t = 6379
)]
port: u16,
#[arg(
short,
long,
help = "The master server for this replica, in the form `address port`"
)]
replicaof: Option<String>,
}With Clap, it's sufficient to identify the type of the argument as Option<String> to make it optional (see the documentation).
Now we can use the argument to decide which type of server we want to create
src/main.rs async fn main() -> std::io::Result<()> {
// Parse the command line arguments.
let args = Args::parse();
// Create the replication configuration.
let replication_config = ReplicationConfig::new_master();
// Create the replication configuration
// according to the options passed on the command line.
let replication_config = match args.replicaof {
None => ReplicationConfig::new_master(),
Some(params) => {
// Split host and port that are provided as a single string.
let (host, port_string) = match params.split_once(" ") {
Some(value) => value,
None => {
eprintln!("Please provide 'HOST PORT' separated by space");
std::process::exit(1);
}
};
// Convert the port into a number.
let port: u16 = match port_string.parse() {
Ok(p) => p,
Err(_) => {
eprintln!("Port is not a number");
std::process::exit(1);
}
};
ReplicationConfig::new_replica(host.to_owned(), port)
}
};As always when it comes to parsing, there are many possible errors to handle, but Clap does it for us. We need to deal with two specific conditions, however. The first is that the value passed to --replicaof might not be formatted correctly and the second is that the provided port might not be a number.
To check the format, we can use split_once [docs] to separate HOST and PORT, and use the returned Option to detect an error. We can then use parse [docs] on the port string to convert it into a number. The compiler infers the target type u16 automatically.
I think it's pretty interesting to have a deeper look at how the compiler infers the output type for parse in the code above.
Let's start with the explicit syntax that we should use, should the Rust compiler be less smart
let port = match port_string.parse::<u16>() {
Ok(p) => p,
...
};The syntax ::<> (in this case ::<u16>) is colloquially called the turbofish which is used to disambiguate expressions where the type inference isn't enough. For example, the code
let port = match port_string.parse() {
...
};would be ambiguous, as the string port_string can be parsed into many different types, and since port has no associated type the compiler would be at a loss. You can find the official definition of the turbofish in The Rust Reference Glossary.
Now, the Rust compiler can however use type inference, so the following code works.
let port: u16 = match port_string.parse() {
Ok(p) => p,
...
};Please note that the type inference is not trivial. The signature of parse is
pub fn parse<F>(&self) -> Result<F, <F as FromStr>::Err>so here F is inferred to be u16 because of the output of the pattern matching case Ok.
At this point, Rust just needs an implementation of parse for the type u16, which can be found in the trait FromStr, is implemented in the Rust Core Library.
Replication Stage 3: The INFO command on a replica
The code we wrote in this section passes Replication - Stage 3 of the CodeCrafters challenge.
The next step of the challenge is to add keys to the replication configuration, namely replid and repl_offset. The master replication ID is a pseudo-random alphanumeric string of 40 characters, and is used to identify the master session (the ID is regenerated when the server is rebooted). The master replication offset, instead, starts at 0 and is incremented every time someone writes to the master. You can read a full explanation in the official documentation.
In this step, the two values will be just printed by the command INFO and not actively used.
It is simple enough to add fields to ReplicationConfig, but to initialise replid we need a way to create a pseudo-random string of 40 characters. To do this we can use the crate rand [docs] that provides convenient functions to generate random numbers. Let's install it the usual way
$ cargo add randand then modify the code
src/replication.rs use rand::{
distr::{Alphanumeric, SampleString},
rng,
};
...
pub struct ReplicationConfig {
pub master: Option<MasterConfig>,
pub replid: String,
pub repl_offset: usize,
}
impl ReplicationConfig {
pub fn new_master() -> Self {
ReplicationConfig { master: None }
ReplicationConfig {
master: None,
replid: Alphanumeric.sample_string(&mut rng(), 40),
repl_offset: 0,
}
}
pub fn new_replica(master_host: String, master_port: u16) -> Self {
ReplicationConfig {
master: Some(MasterConfig {
host: master_host,
port: master_port,
}),
replid: Alphanumeric.sample_string(&mut rng(), 40),
repl_offset: 0,
}
}The trait SampleString [docs] is required to provide sample_string, while rng [docs] is just one of the possible ways to sample a distribution.
We can also improve the tests to cover the two new fields
src/replication.rs mod tests {
#[test]
// Test that the replication configuration
// can be initialised for a master.
fn test_replication_config_master() {
let config = ReplicationConfig::new_master();
assert_eq!(config.master, None);
assert_eq!(config.replid.len(), 40);
assert_eq!(config.repl_offset, 0);
}
#[test]
// Test that the replication configuration
// can be initialised for a replica.
fn test_replication_config_replica() {
let config = ReplicationConfig::new_replica(String::from("other"), 1234);
assert_eq!(
config.master,
Some(MasterConfig {
host: String::from("other"),
port: 1234
})
);
assert_eq!(config.replid.len(), 40);
assert_eq!(config.repl_offset, 0);
}When this is done we need to focus on the command INFO that should include the two new fields. We can start adding them to ReplicationInfo
src/replication.rs #[derive(Debug, PartialEq)]
pub struct ReplicationInfo {
pub role: Role,
pub replid: String,
pub repl_offset: usize,
}
...
impl ReplicationConfig {
// Return the current replication configuration.
pub fn info(&self) -> ReplicationInfo {
ReplicationInfo {
role: match &self.master {
Some(_) => Role::Replica,
None => Role::Master,
},
replid: self.replid.clone(),
repl_offset: self.repl_offset,
}
}
mod tests {
#[test]
// Test that the replication configuration
// returns the correct information when
// in master mode.
fn test_replication_info_master() {
let config = ReplicationConfig::new_master();
assert_eq!(config.info(), ReplicationInfo { role: Role::Master });
assert_eq!(config.info().role, Role::Master);
assert_eq!(config.info().replid.len(), 40);
assert_eq!(config.info().repl_offset, 0);
}
#[test]
// Test that the replication configuration
// returns the correct information when
// in replica mode.
fn test_replication_info_replica() {
let config = ReplicationConfig::new_replica(String::from("other"), 1234);
assert_eq!(
config.info(),
ReplicationInfo {
role: Role::Replica
}
);
assert_eq!(config.info().role, Role::Replica);
assert_eq!(config.info().replid.len(), 40);
assert_eq!(config.info().repl_offset, 0);
}With this in place we only need to add the fields to the output of the command itself and to the tests.
src/commands/info.rs pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
let replication_info = server.replication.info();
let mut output = vec![String::from("# Replication")];
match replication_info.role {
Role::Replica => output.push(String::from("role:slave")),
Role::Master => output.push(String::from("role:master")),
};
output.push(format!("master_replid:{}", replication_info.replid));
output.push(format!(
"master_repl_offset:{}",
replication_info.repl_offset
));
request
.data(ServerValue::RESP(bulk_string_from_vec(output)))
.await;
}
mod tests {
async fn test_command_master() {
let response = match request_channel_rx.try_recv().unwrap() {
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(value))) => value,
_ => panic!(),
};
assert!(response.contains("# Replication"));
assert!(response.contains("role:master"));
assert!(response.contains("master_replid:"));
assert!(response.contains("master_repl_offset:0"));
}
async fn test_command_replica() {
let response = match request_channel_rx.try_recv().unwrap() {
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(value))) => value,
_ => panic!(),
};
assert!(response.contains("# Replication"));
assert!(response.contains("role:slave"));
assert!(response.contains("master_repl_offset:0"));
}Replication Stage 4: Initial Replication ID and Offset
The code we wrote in this section passes Replication - Stage 4 of the CodeCrafters challenge.
The handshake between a replica and the master is the perfect example of something that doesn't work according to the standard request/response model. When a replica is created, it has to actively send messages to the master to synchronise with it, as opposed to the standard reactions to incoming requests from clients.
The process we are going to implement in this and the following sections is:
The connection to the master will then be used to keep the replica in sync. Please note, however, that the replica has to manage both the connection to the master and the one with clients, and that the system we are going to implement doesn't replace the one we developed in the past chapters.
In the first stage of the handshake challenge, the test will check that the replica sends PING to the master and that the response PONG is accepted by the replica.
We begin with a simple implementation of the process that doesn't involve any handshake, just to make sure we understand the data flow. The handshake is a rich process, but ultimately is just a way to decide if the connection can be established or not. Here, we skip that complex process to focus on the simpler problem of managing a connection to the master.
First of all, we need a function to manage the connection between the replica and the master.
src/connection.rs // Initialise and manage a connection with the master.
pub async fn run_master_listener(
host: String,
port: u16,
server_sender: mpsc::Sender<ConnectionMessage>,
) {
// Actively connect to the master
let stream = TcpStream::connect(format!("{}:{}", host, port))
.await
.unwrap();
// Spawn a task to take care of this connection.
tokio::spawn(async move { handle_connection(stream, server_sender.clone()).await });
}The prototype of this function is the same of the function run_listener, but the code is very different. Here, instead of just listening for data coming on connections initiated by clients, we create a new connection to the master and then we listen to it.
For now, no data will be sent on that stream. The server will receive our connection request and spawn an actor to manage it, but neither the replica nor the master will send anything through it.
For now, let's add some logic to run the function run_master_listener when the server is a replica.
src/main.rs use crate::replication::ReplicationConfig;
use crate::resp::RESP;
use crate::storage::Storage;
use clap::Parser;
use connection::{run_listener, ConnectionMessage};
use connection::{run_listener, run_master_listener, ConnectionMessage};
use server::{run_server, Server};
use tokio::sync::mpsc;
...
async fn main() -> std::io::Result<()> {
// Create the server, attach the storage
// and configure replication.
let mut server = Server::new();
server.set_storage(storage);
server.set_replication(replication_config);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// If this server is a replica,
// connect to the master and listen.
if let Some(master_config) = server.replication.master.clone() {
run_master_listener(
master_config.host.clone(),
master_config.port,
server_sender.clone(),
)
.await;
}To know if the server is a replica we can check if server.replication.master is Some or None. To avoid complaints about partially moved structures, we need to be able to clone MasterConfig adding the trait Clone
src/replication.rs #[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct MasterConfig {
pub host: String,
pub port: u16,
}We can now implement the first version of the handshake process. While the idea behind the handshake is very simple, we need several structures for its implementation, most of them to handle errors, as is usually the case with such processes.
The idea is to create a function called handshake that will exchange data with the master and ultimately either succeed or fail. We will call the function in run_master_listener, and in case of failure the replica will shut down.
src/connection.rs use crate::resp::bytes_to_resp;
use crate::server::handshake;
use crate::server_result::{ServerMessage, ServerValue};
use crate::{request::Request, server_result::ServerError};
use std::fmt;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
...
// Initialise and manage a connection with the master.
pub async fn run_master_listener(
host: String,
port: u16,
server_sender: mpsc::Sender<ConnectionMessage>,
) {
// Actively connect to the master
let stream = TcpStream::connect(format!("{}:{}", host, port))
.await
.unwrap();
// Run the handshake protocol
if let Err(e) = handshake().await {
eprintln!("Handshake failed: {}", e.to_string());
std::process::exit(1);
}
// Spawn a task to take care of this connection.
tokio::spawn(async move { handle_connection(stream, server_sender.clone()).await });
}A trivial implementation of that function is
src/server.rs use crate::commands::{echo, get, info, ping, set};
use crate::connection::ConnectionMessage;
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::server_result::ServerError;
use crate::server_result::{ServerError, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
...
// The implementation of the handshake process
// between a replica and the master.
pub async fn handshake() -> ServerResult {
Ok(ServerValue::None)
}The function could clearly just return a normal Result. However, it is part of the server, so it's a good idea to use the structures ServerValue and ServerError that we defined previously. We need to define the new variant ServerValue::None, and to bring back the type alias ServerResult, which we removed in a previous chapter.
src/server_result.rs #[derive(Debug, PartialEq)]
pub enum ServerValue {
None,
RESP(RESP),
}
pub type ServerResult = Result<ServerValue, ServerError>;This will make the compiler raise an error, as the new variant of ServerValue is not covered by one of the match conditions in our code
src/connection.rs pub async fn handle_connection(
// A response arrived from the server.
Some(response) = connection_receiver.recv() => {
let _ = match response {
ServerMessage::Data(ServerValue::RESP(v)) => stream.write_all(v.to_string().as_bytes()).await,
ServerMessage::Data(ServerValue::None) => Ok(()),
ServerMessage::Error(e) => {
eprintln!("Error: {}", ConnectionError::ServerError(e));
return;
}
};
}Here, we return the variant Ok with a unit () (which is the Rust way to express an empty result). The idea is that if the server doesn't return anything (ServerValue::None) we don't need to do anything.
During the first stage of the Redis handshake protocol the replica sends the message PING to the master and checks that the master sends PONG. In this step we will implement this part of the handshake and our code will finally pass the challenge.
The first addition to the function handshake is the definition of the message that we want to send.
src/server.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake() -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
Ok(ServerValue::None)
}Please note that when we send commands to the master, we wrap everything in a RESP array.
At this point we can write the command on the TCP stream that represents the connection with the master.
src/server.rs use crate::commands::{echo, get, info, ping, set};
use crate::connection::ConnectionMessage;
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::bytes_to_resp;
use crate::server_result::{ServerError, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
...
// The implementation of the handshake process
// between a replica and the master.
pub async fn handshake() -> ServerResult {
pub async fn handshake(stream: &mut TcpStream) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
// Send the PING command.
stream
.write_all(ping.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
ping.to_string(),
e.to_string()
))
})?;
Ok(ServerValue::None)
}Despite the amount of code that we added, the change is rather simple. First of all we need to make sure the function receives a mutable TcpStream so that we can read from and write to it. Then we can asynchronously call stream.write_all to send the RESP string (converted to bytes). The documentation of this function says that the return type is io::Result<()>.
Since handshake returns a ServerResult we need to convert the return value. One way to do this is to use match, mapping both the Ok and the Err values, but another interesting option is to transform the error using map_err and then to use the question mark operator ?.
Stripped of all arguments, the code looks like
src/server.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(...);
// Send the PING command.
stream
.write_all(...) 1
.await 2
.map_err(...)?; 3
Ok(ServerValue::None)
}Basically, we call write_all 1, we await the result 2, we transform its error to be compatible with ServerResult 3 and we use ? to implement a match behind the scenes.
To make this work we need two changes. The first is the variant ServerError::HandshakeFailed
src/server_result.rs use crate::resp::RESP;
use std::fmt;
#[derive(Debug, PartialEq)]
pub enum ServerError {
CommandInternalError(String),
CommandNotAvailable(String),
CommandSyntaxError(String),
HandshakeFailed(String),
IncorrectData,
StorageNotInitialised,
}
impl fmt::Display for ServerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ServerError::CommandInternalError(string) => {
write!(f, "Internal error while processing {}.", string)
}
ServerError::CommandNotAvailable(c) => {
write!(f, "The requested command {} is not available.", c)
}
ServerError::CommandSyntaxError(string) => {
write!(f, "Syntax error while processing {}.", string)
}
ServerError::HandshakeFailed(string) => {
write!(f, "Handshake process failed: {}.", string)
}
ServerError::IncorrectData => {
write!(f, "Data received from stream is incorrect.")
}
ServerError::StorageNotInitialised => {
write!(f, "Storage has not been initialised.")
}
}
}
}and the second is the way we call the function handshake. The TCP stream becomes mutable and has to be passed to the function as a mutable reference.
src/connection.rs // Initialise and manage a connection with the master.
pub async fn run_master_listener(
host: String,
port: u16,
server_sender: mpsc::Sender<ConnectionMessage>,
) {
// Actively connect to the master
let stream = TcpStream::connect(format!("{}:{}", host, port))
let mut stream = TcpStream::connect(format!("{}:{}", host, port))
.await
.unwrap();
// Run the handshake protocol
if let Err(e) = handshake().await {
if let Err(e) = handshake(&mut stream).await {
eprintln!("Handshake failed: {}", e.to_string());
std::process::exit(1);
}
// Spawn a task to take care of this connection.
tokio::spawn(async move { handle_connection(stream, server_sender.clone()).await });
}Now that we sent the command we need to receive the response from the master
src/server.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
// Send the PING command.
stream
.write_all(ping.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
ping.to_string(),
e.to_string()
))
})?;
// Create a buffer to contain the response bytes.
let mut buffer = [0; 512];
// Read the response bytes from the stream.
let size = stream.read(&mut buffer).await.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot read from stream: {}",
ping.to_string(),
e.to_string()
))
})?;
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
ping.to_string()
)));
}
Ok(ServerValue::None)
}As you can see, the pattern explained previously repeats again. This time we call stream.read which stores data in the buffer that we provide and returns its length. If the stream returned no data, the master has probably closed the connection, so we need to terminate the handshake process.
Last, we need to transform the binary data received from the master into RESP and to process it according to the handshake protocol.
src/server.rs pub async fn handshake(stream: &mut TcpStream) -> ServerResult {
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
ping.to_string()
)));
}
let mut index: usize = 0;
// Convert the raw bytes into RESP.
let resp = bytes_to_resp(&buffer, &mut index).map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot convert binary to RESP: {}",
ping.to_string(),
e.to_string()
))
})?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
ping.to_string(),
resp.to_string()
)));
};
Ok(ServerValue::None)
}We developed the function bytes_to_resp in chapter 2, and we can conveniently reuse it here. The last part of the code performs a simple check to see if the master responded with PONG as the protocol dictates.
The code we wrote to send PING and to check the response is simple but also very long, and the handshake protocol is made of many of these steps. It's tempting to refactor the code now, but we will take another path. The plan is to complete all three stages of "Send handshake" and only at the end refactor the code.
Stripped down to its essential components, the full function now looks like the following.
src/server.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(...);
// Send the PING command.
stream.write_all()...
// Read the response bytes from the stream.
let size = stream.read()...
// Check that the response contains data.
if size == 0 {...}
// Convert the raw bytes into RESP.
let resp = bytes_to_resp()...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {...};
Ok(ServerValue::None)
}This is clearly a matter of coding style, and nothing stops you from writing some functions that make the code above tidier. However, I learned that premature optimisation and refactoring are often dangerous, so I prefer to postpone that step. At the end of the three stages we will have a clear view of what functions can make our code shorter and clearer.
Replication Stage 5: Send handshake (1/3)
The code we wrote in this section passes Replication - Stage 5 of the CodeCrafters challenge.
The next step to complete this part of the replication extension is to implement REPLCONF. This is an internal command that the replica uses to send its configuration to the master. In particular, during the handshake process, the replica is going to send REPLCONF listening-port PORT (where PORT is the one the replica is using) and REPLCONF capa psync2 to activate PSYNC support. Both times, the master is supposed to reply with the simple string OK.
In this step, we are going to implement both calls in the handshake function and the command REPLCONF in the master.
Since the first call has to send the listening port, the server itself (in this case the replica) needs to be aware of this information. So far, we created a listener in main using args.host and args.port, but neither the hostname nor the port have been stored for later use.
We can create a structure ServerInfo to contain those values
src/server.rs pub struct ServerInfo {
pub host: String,
pub port: u16,
}
pub struct Server {
pub info: ServerInfo,
pub storage: Option<Storage>,
pub replication: ReplicationConfig,
}and it seems natural to have the server constructor require them as arguments.
src/server.rs impl Server {
pub fn new() -> Self {
pub fn new(host: String, port: u16) -> Self {
Self {
info: ServerInfo {
host: host,
port: port,
},
storage: None,
replication: ReplicationConfig::new_master(),
}
}
mod tests {
#[test]
// Test that the structure Server can
// be initialised with an empty storage.
fn test_create_new() {
let server: Server = Server::new();
let server: Server = Server::new("localhost".to_string(), 6379);
match server.storage {
Some(_) => panic!(),
None => (),
};
}
#[test]
// Test that an existing Storage structure
// can be associated with a Server.
fn test_set_storage() {
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);
match server.storage {
Some(_) => (),
None => panic!(),
};
}
#[tokio::test]
// Test that the function process_request
// processes a PING request and that it
// responds with a PONG.
async fn test_process_request_ping() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("PONG"))))
);
}
#[tokio::test]
// Test that the function process_request
// processes an ECHO request and that it
// responds with a copy of the input.
async fn test_process_request_echo() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![
RESP::BulkString(String::from("ECHO")),
RESP::BulkString(String::from("42")),
]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(String::from("42"))))
);
}
#[tokio::test]
// Test that the function process_request
// returns the correct error when it is
// given a request that doesn't contain
// a RESP array.
async fn test_process_request_not_array() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::BulkString(String::from("PING")),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Error(ServerError::IncorrectData)
);
}
#[tokio::test]
// Test that the function process_request
// returns the correct error when it is
// given a request that contains a RESP array
// but the content of the array is not
// a bulk string.
async fn test_process_request_not_bulkstrings() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![RESP::SimpleString(String::from("PING"))]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Error(ServerError::IncorrectData)
);
}This change must be reflected throughout the code whenever we create a new server. First of all in main
src/main.rs async fn main() -> std::io::Result<()> {
// Parse the command line arguments.
let args = Args::parse();
...
// Create the storage and activate expiry.
let mut storage = Storage::new();
storage.set_active_expiry(true);
// Create the server, attach the storage
// and configure replication.
let mut server = Server::new();
let mut server = Server::new(args.host.clone(), args.port);
server.set_storage(storage);
server.set_replication(replication_config);
...
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));
// Run the listener on the configured address and port.
run_listener("127.0.0.1".to_string(), args.port, server_sender).await;
run_listener(args.host, args.port, server_sender).await;
Ok(())
}and then in several tests throughout the code base, you can find them running cargo test. All tests require the same change.
src/commands/echo.rs #[cfg(test)]
mod tests {
use super::*;
use crate::server_result::ServerMessage;
use tokio::sync::mpsc;
#[tokio::test]
// Test that the function command processes
// an `ECHO` request and that it
// responds with a copy of the input.
async fn test_command() {
let cmd = vec![String::from("echo"), String::from("hey")];
let server = Server::new();
let server = Server::new("localhost".to_string(), 6379);
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Null,
sender: connection_sender,
};
command(&server, &request, &cmd).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(String::from("hey"))))
);
}
}src/commands/get.rs mod tests {
async fn test_command() {
let mut server = Server::new();
let mut server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);
async fn test_storage_not_initialised() {
let mut server = Server::new();
let mut server = Server::new("localhost".to_string(), 6379);
let cmd = vec![String::from("get"), String::from("key")];
async fn test_wrong_syntax_missing_key() {
let mut server = Server::new();
let mut server = Server::new("localhost".to_string(), 6379);
server.set_storage(storage);src/commands/info.rs mod tests {
async fn test_command_master() {
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
let cmd = vec![String::from("info")];
async fn test_command_replica() {
let config = ReplicationConfig::new_replica(String::from("someserver"), 4242);
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
server.set_replication(config);src/commands/ping.rs mod tests {
async fn test_command_ping() {
let cmd = vec![String::from("ping")];
let mut server: Server = Server::new();
let server = Server::new("localhost".to_string(), 6379);
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
async fn test_command_ping_uppercase() {
let cmd = vec![String::from("PING")];
let mut server: Server = Server::new();
let server = Server::new("localhost".to_string(), 6379);
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);src/commands/set.rs mod tests {
async fn test_command() {
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
async fn test_storage_not_initialised() {
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);
async fn test_wrong_syntax_missing_key() {
let storage = Storage::new();
let mut server: Server = Server::new();
let mut server: Server = Server::new("localhost".to_string(), 6379);In this step we need to change the definition of run_master_listener to accept ServerInfo and to pass it to handshake
src/connection.rs use crate::resp::bytes_to_resp;
use crate::server::handshake;
use crate::server::{handshake, ServerInfo};
use crate::server_result::{ServerMessage, ServerValue};
use crate::{request::Request, server_result::ServerError};
use std::fmt;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
...
// Initialise and manage a connection with the master.
pub async fn run_master_listener(
host: String,
port: u16,
info: &ServerInfo,
server_sender: mpsc::Sender<ConnectionMessage>,
) {
// Actively connect to the master
let mut stream = TcpStream::connect(format!("{}:{}", host, port))
.await
.unwrap();
// Run the handshake protocol
if let Err(e) = handshake(&mut stream).await {
if let Err(e) = handshake(&mut stream, info).await {
eprintln!("Handshake failed: {}", e.to_string());
std::process::exit(1);
}
// Spawn a task to take care of this connection.
tokio::spawn(async move { handle_connection(stream, server_sender.clone()).await });
}Once that is done, we need to change the call in main
src/main.rs async fn main() -> std::io::Result<()> {
// Create the server, attach the storage
// and configure replication.
let mut server = Server::new(args.host.clone(), args.port);
server.set_storage(storage);
server.set_replication(replication_config);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// If this server is a replica,
// connect to the master and listen.
if let Some(master_config) = server.replication.master.clone() {
run_master_listener(
master_config.host.clone(),
master_config.port,
&server.info,
server_sender.clone(),
)
.await;
}
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));At this point, we can change the definition of handshake itself.
src/server.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream) -> ServerResult {
pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
...The purpose of the change is to be able to access data that we might want to send to the server during the handshake. In particular, during the second phase of the protocol the replica needs to send the message REPLCONF listening-port PORT_NUMBER, where PORT_NUMBER is the actual port used by the replica. The new code added to the function handshake is essentially a copy of the code we wrote in the first part of the process (PING).
First of all we prepare and send the command REPLCONF.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
ping.to_string(),
resp.to_string()
)));
};
// Handshake: REPLCONF
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(vec![
RESP::BulkString(String::from("REPLCONF")),
RESP::BulkString(String::from("listening-port")),
RESP::BulkString(info.port.to_string()),
]);
// Send the REPLCONF command.
stream
.write_all(replconf.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;Then we need to receive the response, so we prepare a buffer and read bytes from the stream. We also check that the response contains actual bytes and didn't terminate the connection.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Send the REPLCONF command.
stream
.write_all(replconf.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;
// Create a buffer to contain the response bytes.
let mut buffer = [0; 512];
// Read the response bytes from the stream.
let size = stream.read(&mut buffer).await.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot read from stream: {}",
replconf.to_string(),
e.to_string()
))
})?;
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
replconf.to_string()
)));
}Finally, we convert the response bytes into RESP and check if it's valid in terms of the handshake process.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
replconf.to_string()
)));
}
let mut index: usize = 0;
// Convert the raw bytes into RESP.
let resp = bytes_to_resp(&buffer, &mut index).map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot convert binary to RESP: {}",
replconf.to_string(),
e.to_string()
))
})?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};As we discussed before, there is a great deal of repetition in this function. For the time being, we will keep it and go on until the handshake is fully working, at which point we will refactor the function. This is purely a matter of personal style, as some developers prefer to refactor as soon as they spot the first code duplication.
I'd like however to highlight the reasons behind my choice:
PING and the first REPLCONF, and the next commands might require a different approach. Rather than generalising now and having to do it again later, I prefer to wait.Once again, let's have a look at the function stripped down to its essential components to get a feeling of the data flow.
src/server.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(...);
// Send the PING command.
stream.write_all()...
// Read the response bytes from the stream.
let size = stream.read()...
// Check that the response contains data.
if size == 0 {...}
// Convert the raw bytes into RESP.
let resp = bytes_to_resp()...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {...};
// Handshake: REPLCONF
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(...);
// Send the REPLCONF command.
stream.write_all()...
// Read the response bytes from the stream.
let size = stream.read()...
// Check that the response contains data.
if size == 0 {...}
// Convert the raw bytes into RESP.
let resp = bytes_to_resp()...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {...};
Ok(ServerValue::None)
}We need to send the second REPLCONF with the supported capabilities, and the challenge asks us to hard code capa psync2. Once again, we can duplicate the code we wrote for PING (or for the first REPLCONF), using a different RESP array.
As we have done before, the first stage is to prepare and send the second command REPLCONF.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};
// Handshake: REPLCONF
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(vec![
RESP::BulkString(String::from("REPLCONF")),
RESP::BulkString(String::from("capa")),
RESP::BulkString(String::from("psync2")),
]);
// Send the REPLCONF command.
stream
.write_all(replconf.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;After that, we receive and check the binary response.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Send the REPLCONF command.
stream
.write_all(replconf.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;
// Create a buffer to contain the response bytes.
let mut buffer = [0; 512];
// Read the response bytes from the stream.
let size = stream.read(&mut buffer).await.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot read from stream: {}",
replconf.to_string(),
e.to_string()
))
})?;
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
replconf.to_string()
)));
}Finally, we check the response in terms of the handshake logic.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
replconf.to_string()
)));
}
let mut index: usize = 0;
// Convert the raw bytes into RESP.
let resp = bytes_to_resp(&buffer, &mut index).map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot convert binary to RESP: {}",
replconf.to_string(),
e.to_string()
))
})?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};There is a final detail to take care of. The field port of ServerInfo is read by the function handshake, but the field host is ignored. We added it for completeness, but for the time being nothign uses it, so cargo build flags it.
warning: field `host` is never read
--> src/server.rs:17:9
|
16 | pub struct ServerInfo {
| ---------- field in this struct
17 | pub host: String,
| ^^^^
|
= note: `#[warn(dead_code)]` (part of `#[warn(unused)]`) on by defaultWe could apply the suggested fix, flagging it as dead code, but the solution is not perfect. The warning silences the message if the field is unused, but doesn't trigger if the field is used, so the risk of #[warn(dead_code)] is that our code might be littered by useless warnings. A much better solution in this case is #[expect(dead_code)] (stabilised in Rust 1.81, which provides precisely what we need.
The official description is "allows explicitly noting that a particular lint should occur, and warning if it doesn't. The intended use case for this is temporarily silencing a lint, whether due to lint implementation bugs or ongoing refactoring, while wanting to know when the lint is no longer required."
Therefore, we can silence the warning with expect.
src/server.rs pub struct ServerInfo {
#[expect(dead_code)]
pub host: String,
pub port: u16,
}Replication Stage 6: Send handshake (2/3)
The code we wrote in this section passes Replication - Stage 6 of the CodeCrafters challenge.
Here we send PSYNC ? -1 and complete this part of the challenge. This command has a complex response which is not just pure RESP, so we are tasked only to send the command and to ignore any server response for now.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};
// Handshake: PSYNC
// Prepare the RESP PSYNC command.
let psync = RESP::Array(vec![
RESP::BulkString(String::from("PSYNC")),
RESP::BulkString(String::from("?")),
RESP::BulkString(String::from("-1")),
]);
// Send the PSYNC command.
stream
.write_all(psync.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;Replication Stage 7: Send handshake (3/3)
The code we wrote in this section passes Replication - Stage 7 of the CodeCrafters challenge.
Now that we have a working system, we can review what we have done and refactor it to reduce code duplication.
Let's have another look at the stripped down skeleton of the code that we duplicated.
src/server.rs // Prepare the RESP command.
let command = RESP::Array(...);
// Send the command.
stream.write_all()...
// Read the response bytes from the stream.
let size = stream.read()...
// Check that the response contains data.
if size == 0 {...}
// Convert the raw bytes into RESP.
let resp = bytes_to_resp()...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {...};When it comes to refactoring, it's always best to move forward cautiously step by step, checking that every transformation works properly. This is true in particular in Rust, where heavy mocking is not usually employed, thus making it harder to test if the new code behaves exactly like the old one.
Let's start isolating the code that sends a RESP message on a TCP stream
src/connection.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
// Send the PING command.
stream
.write_all(ping.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
ping.to_string(),
e.to_string()
))
})?;
// Create a buffer to contain the response bytes.
let mut buffer = [0; 512];To do it we can write a function called stream_write_resp. The best place for such a function is the connection module, as this is a very low-level interaction with the TCP stream.
src/connection.rs use crate::resp::bytes_to_resp;
use crate::resp::{bytes_to_resp, RESP};
use crate::server::{handshake, ServerInfo};
use crate::server_result::{ServerMessage, ServerValue};
use crate::{request::Request, server_result::ServerError};
use std::fmt;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
#[derive(Debug)]
pub enum ConnectionError {
CannotWriteToStream(String),
ServerError(ServerError),
}
impl fmt::Display for ConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionError::CannotWriteToStream(string) => {
write!(f, "Cannot write to stream: {}.", string)
}
ConnectionError::ServerError(e) => {
write!(f, "{}", format!("Server error: {}", e))
}
}
}
}
type ConnectionResult<T> = Result<T, ConnectionError>;
...
// Write RESP data to the stream.
pub async fn stream_write_resp(stream: &mut TcpStream, data: &RESP) -> ConnectionResult<usize> {
// Convert the RESP message to a String.
let string_data = data.to_string();
// Convert the String into bytes.
let bytes = string_data.as_bytes();
// Write the bytes to the TCP stream.
match stream.write_all(bytes).await {
Ok(_) => Ok(bytes.len()),
Err(e) => Err(ConnectionError::CannotWriteToStream(e.to_string())),
}
}The function stream_write_resp is not going to simplify our code that much. Should we try to use it in handshake, we would go from the code shown above to
src/connection.rs // The implementation of the handshake process
// between a replica and the master.
pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
// Send the PING command.
stream_write_resp(stream, &ping)
.await
.map_err(|e| ServerError::HandshakeFailed(e.to_string()))?;
// Create a buffer to contain the response bytes.
let mut buffer = [0; 512];which is a bit more compact but doesn't drastically change the readability of the whole code. For now we will not replace the code in handshake.
Indeed, the purpose of this change is mostly to migrate the management of the write errors into the connection space, and this is the reason why we need to define ConnectionResult and CannotWriteToStream.
The second piece of code that we can isolate is
src/connection.rs // Create a buffer to contain the response bytes.
let mut buffer = [0; 512];
// Read the response bytes from the stream.
let size = stream.read(&mut buffer).await.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot read from stream: {}",
ping.to_string(),
e.to_string()
))
})?;
// Check that the response contains data.
if size == 0 {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Connection terminated",
ping.to_string()
)));
}
let mut index: usize = 0;
// Convert the raw bytes into RESP.
let resp = bytes_to_resp(&buffer, &mut index).map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot convert binary to RESP: {}",
ping.to_string(),
e.to_string()
))
})?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
ping.to_string(),
resp.to_string()
)));
};Please note that the code above doesn't include the buffer creation. Memory allocation has a cost, so it's worth avoiding it in functions called repeatedly. Rather, it's best to allocate the memory space and reuse it as many times as possible.
Here, we read the server response, deal with potential errors (connection closed), and convert the received bytes into RESP. The new function is
src/connection.rs #[derive(Debug)]
pub enum ConnectionError {
CannotReadFromStream(String),
CannotWriteToStream(String),
MalformedRESP(String),
ServerError(ServerError),
}
impl fmt::Display for ConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionError::CannotReadFromStream(string) => {
write!(f, "Cannot read from stream: {}.", string)
}
ConnectionError::CannotWriteToStream(string) => {
write!(f, "Cannot write to stream: {}.", string)
}
ConnectionError::MalformedRESP(string) => {
write!(f, "Cannot convert bytes to RESP: {}.", string)
}
ConnectionError::ServerError(e) => {
write!(f, "{}", format!("Server error: {}", e))
}
}
}
}
...
// Read RESP data from the stream.
async fn stream_read_resp(stream: &mut TcpStream, buffer: &mut [u8]) -> ConnectionResult<RESP> {
// Read bytes from the TCP stream.
match stream.read(buffer).await {
Ok(size) => Ok(size),
Err(e) => Err(ConnectionError::CannotReadFromStream(e.to_string())),
}?;
// Set the index to start reading from the first byte.
let mut index: usize = 0;
// Convert bytes to RESP.
bytes_to_resp(&buffer, &mut index).map_err(|e| ConnectionError::MalformedRESP(e.to_string()))
}We can put the two functions stream_write_resp and stream_read_resp into a single function stream_send_receive_resp
src/connection.rs #[derive(Debug)]
pub enum ConnectionError {
CannotReadFromStream(String),
CannotWriteToStream(String),
MalformedRESP(String),
RequestFailed(String, String),
ServerError(ServerError),
}
impl fmt::Display for ConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionError::CannotReadFromStream(string) => {
write!(f, "Cannot read from stream: {}.", string)
}
ConnectionError::CannotWriteToStream(string) => {
write!(f, "Cannot write to stream: {}.", string)
}
ConnectionError::MalformedRESP(string) => {
write!(f, "Cannot convert bytes to RESP: {}.", string)
}
ConnectionError::RequestFailed(request, e) => {
write!(f, "Request {} failed: {}.", request, e)
}
ConnectionError::ServerError(e) => {
write!(f, "{}", format!("Server error: {}", e))
}
}
}
}
...
// Write a RESP request to the stream and read the RESP response.
pub async fn stream_send_receive_resp(
stream: &mut TcpStream,
data: &RESP,
buffer: &mut [u8],
) -> ConnectionResult<RESP> {
// Write the RESP command to the stream.
stream_write_resp(stream, &data)
.await
.map_err(|e| ConnectionError::RequestFailed(data.to_string(), e.to_string()))?;
// Read the response in RESP format from the stream.
stream_read_resp(stream, buffer)
.await
.map_err(|e| ConnectionError::RequestFailed(data.to_string(), e.to_string()))
}and we can finally replace the code of handshake with a compact and readable alternative. The PING message first.
src/server.rs use crate::commands::{echo, get, info, ping, set};
use crate::connection::ConnectionMessage;
use crate::connection::{stream_send_receive_resp, ConnectionMessage};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::bytes_to_resp;
use crate::server_result::{ServerError, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};
...
pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Create a buffer to contain the response bytes.
let mut buffer = [0; 512];
// Handshake: PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
ping.to_string(),
resp.to_string()
)));
};
/////////////////////////////////////
// Send PING
// Prepare the RESP PING command.
let ping = RESP::Array(vec![RESP::BulkString(String::from("PING"))]);
// Send the command and read the response.
let resp = stream_send_receive_resp(stream, &ping, &mut buffer)
.await
.map_err(|e| ServerError::HandshakeFailed(e.to_string()))?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("PONG")) {
return Err(ServerError::HandshakeFailed(String::from("PING failed")));
};Please note that the imports change slightly as some modules are not needed any more. Such unused imports can be easily spotted running cargo build.
We can then simplify the first REPLCONF messages.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: REPLCONF
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(vec![
RESP::BulkString(String::from("REPLCONF")),
RESP::BulkString(String::from("listening-port")),
RESP::BulkString(info.port.to_string()),
]);
...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};
/////////////////////////////////////
// Send REPLCONF listening-port xxx
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(vec![
RESP::BulkString(String::from("REPLCONF")),
RESP::BulkString(String::from("listening-port")),
RESP::BulkString(info.port.to_string()),
]);
// Send the command and read the response.
let resp = stream_send_receive_resp(stream, &replconf, &mut buffer)
.await
.map_err(|e| ServerError::HandshakeFailed(e.to_string()))?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};The second REPLCONF messages.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: REPLCONF
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(vec![
RESP::BulkString(String::from("REPLCONF")),
RESP::BulkString(String::from("capa")),
RESP::BulkString(String::from("psync2")),
]);
...
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};
/////////////////////////////////////
// Send REPLCONF capa psync2
// Prepare the RESP REPLCONF command.
let replconf = RESP::Array(vec![
RESP::BulkString(String::from("REPLCONF")),
RESP::BulkString(String::from("capa")),
RESP::BulkString(String::from("psync2")),
]);
// Send the command and read the response.
let resp = stream_send_receive_resp(stream, &replconf, &mut buffer)
.await
.map_err(|e| ServerError::HandshakeFailed(e.to_string()))?;
// Check that the response is correct.
if resp != RESP::SimpleString(String::from("OK")) {
return Err(ServerError::HandshakeFailed(format!(
"Sending {} - Wrong server answer: {}",
replconf.to_string(),
resp.to_string()
)));
};Finally, we can simplify PSYNC.
src/server.rs pub async fn handshake(stream: &mut TcpStream, info: &ServerInfo) -> ServerResult {
// Handshake: PSYNC
// Prepare the RESP PSYNC command.
let psync = RESP::Array(vec![
RESP::BulkString(String::from("PSYNC")),
RESP::BulkString(String::from("?")),
RESP::BulkString(String::from("-1")),
]);
// Send the PSYNC command.
stream
.write_all(psync.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;
/////////////////////////////////////
// Send PSYNC ? -1
// Prepare the RESP PSYNC command.
let psync = RESP::Array(vec![
RESP::BulkString(String::from("PSYNC")),
RESP::BulkString(String::from("?")),
RESP::BulkString(String::from("-1")),
]);
// Send the PSYNC command.
stream
.write_all(psync.to_string().as_bytes())
.await
.map_err(|e| {
ServerError::HandshakeFailed(format!(
"Sending {} - Cannot write to stream: {}",
replconf.to_string(),
e.to_string()
))
})?;Replication Stage 7: Send handshake (3/3)
As this was a mere refactoring, the code we wrote in this section still passes Replication - Stage 7 of the CodeCrafters challenge.
So far, we implemented the handshake process purely on the replica's side. In the next two steps, the challenge asks us to work on the server, in order to make that part of our code capable of carrying out the handshake.
Our code already supports PING and answers correctly. So, in this step we need to add support for the command REPLCONF. At this stage, we don't need to properly process the two arguments listening-port and capa, and we can safely just respond with the simple string OK.
First, let's implement the command.
src/commands/replconf.rs use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::ServerValue;
pub async fn command(_server: &Server, request: &Request, _command: &Vec<String>) {
request
.data(ServerValue::RESP(RESP::SimpleString("OK".to_string())))
.await;
}We can add two tests for this function.
src/commands/replconf.rs #[cfg(test)]
mod tests {
use super::*;
use crate::server_result::ServerMessage;
use tokio::sync::mpsc;
#[tokio::test]
// Test that the function command processes
// a `REPLCONF listening-port` request
// and that it responds with the correct value.
async fn test_command_replconf_listening_port() {
let cmd = vec![
String::from("replconf"),
String::from("listening-port"),
String::from("1234"),
];
let server = Server::new("localhost".to_string(), 6379);
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Null,
sender: connection_sender,
};
command(&server, &request, &cmd).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("OK"))))
)
}
#[tokio::test]
// Test that the function command processes
// a `REPLCONF capa` request and
// that it responds with the correct value.
async fn test_command_replconf_capa() {
let cmd = vec![
String::from("replconf"),
String::from("capa"),
String::from("psync"),
];
let server = Server::new("localhost".to_string(), 6379);
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Null,
sender: connection_sender,
};
command(&server, &request, &cmd).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("OK"))))
)
}
}This file needs to be first added to the module
src/commands/mod.rs pub mod echo;
pub mod get;
pub mod info;
pub mod ping;
pub mod replconf;
pub mod set;and then imported and used as a server command
src/server.rs use crate::commands::{echo, get, info, ping, set};
use crate::commands::{echo, get, info, ping, replconf, set};
use crate::connection::{stream_send_receive_resp, ConnectionMessage};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::server_result::{ServerError, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{io::AsyncWriteExt, net::TcpStream};
...
pub async fn process_request(request: Request, server: &mut Server) {
// Process the request using the requested command.
match command_name.as_str() {
"echo" => {
echo::command(server, &request, &command).await;
}
"get" => {
get::command(server, &request, &command).await;
}
"info" => {
info::command(server, &request, &command).await;
}
"ping" => {
ping::command(server, &request, &command).await;
}
"replconf" => {
replconf::command(server, &request, &command).await;
}
"set" => {
set::command(server, &request, &command).await;
}
_ => {
request
.error(ServerError::CommandNotAvailable(command[0].clone()))
.await;
}
}Replication Stage 8: Receive handshake (1/2)
The code we wrote in this section passes Replication - Stage 8 of the CodeCrafters challenge.
In this step we need to implement the initial version of command PSYNC. When the master receives PSYNC ? -1 it should respond with its replication ID that we created in one of the first steps of this chapter.
The implementation doesn't deviate much from the REPLCONF one
src/commands/psync.rs use crate::request::Request;
use crate::resp::RESP;
use crate::server::Server;
use crate::server_result::ServerValue;
pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
// Reset the master replication offset.
server.replication.repl_offset = 0;
// Create the FULLRESYNC message.
let resp = ServerValue::RESP(RESP::SimpleString(format!(
"FULLRESYNC {} {}",
server.replication.replid.clone(),
server.replication.repl_offset.to_string()
)));
request.data(resp).await;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::server_result::{ServerMessage, ServerValue};
use tokio::sync::mpsc;
#[tokio::test]
// Test that the function command can send
// a `FULLRESYNC` request and that the server
// responds with the correct value.
async fn test_command() {
let mut server = Server::new("localhost".to_string(), 6379);
let cmd = vec![String::from("psync"), String::from("?"), String::from("-1")];
let (request_channel_tx, mut request_channel_rx) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Null,
sender: request_channel_tx.clone(),
};
server.replication.replid = String::from("some_repl_id");
server.replication.repl_offset = 1234;
command(&mut server, &request, &cmd).await;
assert_eq!(
request_channel_rx.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from(
"FULLRESYNC some_repl_id 0"
))))
);
}
}As we did before, to enable the command we need to add it to the module
src/commands/mod.rs pub mod echo;
pub mod get;
pub mod info;
pub mod ping;
pub mod psync;
pub mod replconf;
pub mod set;and then to the request routing
src/server.rs use crate::commands::{echo, get, info, ping, replconf, set};
use crate::commands::{echo, get, info, ping, psync, replconf, set};
use crate::connection::{stream_send_receive_resp, ConnectionMessage};
use crate::replication::ReplicationConfig;
use crate::request::Request;
use crate::resp::bytes_to_resp;
use crate::server_result::{ServerError, ServerResult, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
...
pub async fn process_request(request: Request, server: &mut Server) {
// Process the request using the requested command.
match command_name.as_str() {
"echo" => {
echo::command(server, &request, &command).await;
}
"get" => {
get::command(server, &request, &command).await;
}
"info" => {
info::command(server, &request, &command).await;
}
"ping" => {
ping::command(server, &request, &command).await;
}
"psync" => {
psync::command(server, &request, &command).await;
}
"replconf" => {
replconf::command(server, &request, &command).await;
}
"set" => {
set::command(server, &request, &command).await;
}
_ => {
request
.error(ServerError::CommandNotAvailable(command[0].clone()))
.await;
}
}Replication Stage 9: Receive handshake (2/2)
The code we wrote in this section passes Replication - Stage 9 of the CodeCrafters challenge.
After a full resync, the Redis master sends the replica a full copy of its data. This comes in a custom binary form called RDB, which we don't need to implement to solve the challenge. We are given a binary string that contains an empty database and that we can use directly.
The interesting problem we need to solve here is that the RDB file is sent in the format
$<length_of_file>\r\n<binary_content_of_file>which is very similar to the format of RESP bulk strings, but not exactly the same: unlike RESP bulk strings, the RDB payload is not terminated by \r\n after the binary data, because the length prefix is enough to know where the content ends.
Let's start defining a way for the server to return the content in RDB format.
src/server.rs impl Server {
pub fn generate_rdb(&self) -> Vec<u8> {
let v: Vec<u8> = vec![
0x52, 0x45, 0x44, 0x49, 0x53, 0x30, 0x30, 0x31, 0x31, 0xfa, 0x09, 0x72, 0x65, 0x64,
0x69, 0x73, 0x2d, 0x76, 0x65, 0x72, 0x05, 0x37, 0x2e, 0x32, 0x2e, 0x30, 0xfa, 0x0a,
0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x62, 0x69, 0x74, 0x73, 0xc0, 0x40, 0xfa, 0x05,
0x63, 0x74, 0x69, 0x6d, 0x65, 0xc2, 0x6d, 0x08, 0xbc, 0x65, 0xfa, 0x08, 0x75, 0x73,
0x65, 0x64, 0x2d, 0x6d, 0x65, 0x6d, 0xc2, 0xb0, 0xc4, 0x10, 0x00, 0xfa, 0x08, 0x61,
0x6f, 0x66, 0x2d, 0x62, 0x61, 0x73, 0x65, 0xc0, 0x00, 0xff, 0xf0, 0x6e, 0x3b, 0xfe,
0xc0, 0xff, 0x5a, 0xa2,
];
v
}It makes sense in this context for generate_rdb to be a method of Server, but the code could be part of a simple function that accepts the server just as we do for the commands.
To be able to send the RDB content we need a way to convert it into the given format. One way to do this is to first send the prefix $<length_of_file>\r\n and then the whole binary content. The reason behind this choice is that the prefix is clearly RESP data, which starts with a prefix and ends with \r\n, while the binary data is just binary and doesn't have any terminating sequence.
To implement this strategy we first need to create a type for the RDB prefix
src/resp.rs #[derive(Debug, PartialEq)]
pub enum RESP {
Array(Vec<RESP>),
BulkString(String),
Null,
RDBPrefix(usize),
SimpleString(String),
}
impl fmt::Display for RESP {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let data = match self {
Self::Array(data) => {
let mut output = String::from("*");
output.push_str(format!("{}\r\n", data.len()).as_str());
for elem in data.iter() {
output.push_str(elem.to_string().as_str());
}
output
}
Self::BulkString(data) => format!("${}\r\n{}\r\n", data.len(), data),
Self::Null => String::from("$-1\r\n"),
Self::RDBPrefix(data) => format!("${}\r\n", data.to_string()),
Self::SimpleString(data) => format!("+{}\r\n", data),
};
write!(f, "{}", data)
}
}This allows us to send the first part of the RDB, the length of the binary content.
src/commands/psync.rs pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
// Reset the master replica offset.
server.replication.repl_offset = 0;
// Create the FULLRESYNC message.
let resp = ServerValue::RESP(RESP::SimpleString(format!(
"FULLRESYNC {} {}",
server.replication.replid.clone(),
server.replication.repl_offset.to_string()
)));
request.data(resp).await;
// Generate the RDB data for the server.
let rdb = server.generate_rdb();
// Calculate the length of the RDB data.
let rdb_len = RESP::RDBPrefix(rdb.len());
// Send the RDB length.
request.data(ServerValue::RESP(rdb_len)).await;
// Send the RDB data.
request.data(ServerValue::Binary(rdb)).await;
}To send the actual bytes we need to define a new variant of ServerValue that contains binary data.
src/server_result.rs #[derive(Debug, PartialEq)]
pub enum ServerValue {
None,
RESP(RESP),
Binary(Vec<u8>),
}and since we are using that in handle_connection, the compiler will remind us that we need to cover it.
src/connection.rs pub async fn handle_connection(
loop {
// A response arrived from the server.
Some(response) = connection_receiver.recv() => {
let _ = match response {
ServerMessage::Data(ServerValue::RESP(v)) => stream.write_all(v.to_string().as_bytes()).await,
ServerMessage::Data(ServerValue::None) => Ok(()),
ServerMessage::Data(ServerValue::Binary(data)) => stream.write_all(&data).await,
ServerMessage::Error(e) => {
eprintln!("Error: {}", ConnectionError::ServerError(e));
return;
}
};
}After this change we need to actually send the binary part of the RDB file
src/commands/psync.rs pub async fn command(server: &mut Server, request: &Request, _command: &Vec<String>) {
// Reset the master replication offset.
server.replication.repl_offset = 0;
// Create the FULLRESYNC message.
let resp = ServerValue::RESP(RESP::SimpleString(format!(
"FULLRESYNC {} {}",
server.replication.replid.clone(),
server.replication.repl_offset.to_string()
)));
request.data(resp).await;
let rdb = server.generate_rdb();
let rdb_len = RESP::RDBPrefix(rdb.len());
request.data(ServerValue::RESP(rdb_len)).await;
request.data(ServerValue::Binary(rdb)).await;
}The RDB format is Redis's binary snapshot format, used for persistence and for the initial data transfer during replication. A full description is available in the RDB File Format documentation, which is an excellent resource if you want to implement a real RDB parser.
Here, we are hard-coding the bytes of an empty RDB file. Since it is worth knowing what they mean, here is a breakdown of the 88 bytes:
-------------------------------#
52 45 44 49 53 # Magic String "REDIS"
30 30 31 31 # RDB Version Number as ASCII string. "0011" = 11
-------------------------------#
fa # Auxiliary field (Redis strings)
09 # Length of the string in bytes (9)
72 65 64 69 73 2d 76 65 72 # String "redis-ver"
05 # Length of the string in bytes (5)
37 2e 32 2e 30 # String "7.2.0"
-------------------------------#
fa # Auxiliary field (Redis strings)
0a # Length of the string in bytes (10)
72 65 64 69 73 2d 62 69 74 73 # String "redis-bits"
c0 # 11xxxxxx special encoding
# 11000000 = next 1 byte is an 8-bit integer
40 # 0x40 = 64
-------------------------------#
fa # Auxiliary field (Redis strings)
05 # Length of the string in bytes (5)
63 74 69 6d 65 # String "ctime"
c2 # 11xxxxxx special encoding
# 11000010 = next 4 bytes are a 32-bit little-endian integer
6d 08 bc 65 # Little endian encoding = 0x65BC086D = 1706821741
# Unix timestamp = "2024-02-01 21:09:01 UTC"
-------------------------------#
fa # Auxiliary field (Redis strings)
08 # Length of the string in bytes (8)
75 73 65 64 2d 6d 65 6d # String "used-mem"
c2 # 11xxxxxx special encoding
# 11000010 = next 4 bytes are a 32-bit little-endian integer
b0 c4 10 00 # Little endian encoding = 0x0010C4B0 = 1098928 (~ 1 MiB)
-------------------------------#
fa # Auxiliary field (Redis strings)
08 # Length of the string in bytes (8)
61 6f 66 2d 62 61 73 65 # String "aof-base"
c0 # 11xxxxxx special encoding
# 11000000 = next 1 byte is an 8-bit integer
00 # 0x00 = 0
-------------------------------#
ff # End of RDB file indicator
f0 6e 3b fe c0 ff 5a a2 # CRC64 checksum of the entire fileReplication Stage 10: Empty RDB Transfer
The code we wrote in this section passes Replication - Stage 10 of the CodeCrafters challenge.