The code we developed so far is a basic (but working!) implementation of a remote key/value store. We developed an internal core that manages data and a request processor that can route commands, and with such devices we can implement many other services.
However, the component that manages the key expiry mechanism showed us that not everything in the system works according to the traditional request/response logic. For example, we can have components triggered by timers and, more generally, parts of the system that respond to different events than the mere incoming client request.
The current architecture of the system is not ideal for such a task. For starters, the core of the system is the loop that runs select! in the function main, and adding more components would quickly lead to a version of the function that contains too many arms. While this is not a problem in terms of performance, it makes the system increasingly difficult to maintain and understand.
A second and more significant problem is connected with the Rust ownership model. Remember that in Rust, every time we pass a variable by value to a function, the function takes ownership of it (unless the type implements Copy).
Because of that, in a concurrent scenario we need to wrap shared resources with an Arc<Mutex<T>> and clone the reference before passing the value to a function, which will try to lock the underlying resource before accessing it.
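To make the pattern concrete, here is a minimal sketch of a resource shared through Arc<Mutex<T>>. It is only an illustration under a few assumptions: it uses standard library threads instead of Tokio tasks so that it runs without external crates, and the names SharedStore and insert_from_workers are made up for this example.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the storage shared between tasks.
type SharedStore = Arc<Mutex<HashMap<String, String>>>;

// Spawn `workers` threads. Each one receives its own clone of the
// Arc, locks the map, and inserts a single entry.
fn insert_from_workers(workers: usize) -> usize {
    let store: SharedStore = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();
    for id in 0..workers {
        // Cloning the Arc copies the pointer, not the map.
        let store = Arc::clone(&store);
        handles.push(thread::spawn(move || {
            // Lock the resource before accessing it.
            let mut guard = store.lock().unwrap();
            guard.insert(format!("key{}", id), format!("value{}", id));
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Every worker inserted a different key.
    let len = store.lock().unwrap().len();
    len
}
```

Calling insert_from_workers(4) leaves four entries in the map. Every function that touches the map has to receive a clone of the Arc and take the lock, which is exactly the bookkeeping that grows with the state of the system.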
Which resources are we discussing? In general, we need to pass relevant parts of the state of the system to every function we run in an asynchronous task. So far, the only shared resource we are managing is the storage, but in the future the complexity will increase. For example, to implement replication we will have to pass the list of connected replicas, and to implement transactions we will need to know if a client is currently in a transaction.
Overall, the Arc<Mutex<T>> pattern works very well for small systems, where the state passed to each function is compact. For more complex cases, we need a different approach. This is, specifically, the actor model.
The idea of actors is extremely simple: they are independent components of an application that communicate with each other through messages. Introducing a message-based communication layer is one of the best strategies to promote decoupling between parts of a system, which is exactly what we need here.
Ultimately, actors and messaging systems inside a single application are yet another implementation of the concept of service and API, which you can find at different scales in any computer system (e.g. cloud computing, microservices, REST APIs).
When the system is run by actors, there is no need to use the Arc<Mutex<T>> model. A specific actor will be the unique owner of a certain resource, and other tasks can access the latter through messages. As you can see, this fits the Rust ownership model perfectly.
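A minimal sketch of this idea follows. It is an illustration rather than the code we will write in this chapter: it uses standard library threads and std::sync::mpsc instead of Tokio tasks and channels, and StoreMessage, run_store, and apply_messages are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical messages understood by the actor.
enum StoreMessage {
    Set(String, String),
    Delete(String),
}

// The actor is the unique owner of the map, so there is no Arc
// and no Mutex. It returns the final state when the channel closes.
fn run_store(store_receiver: mpsc::Receiver<StoreMessage>) -> HashMap<String, String> {
    let mut data = HashMap::new();
    // The loop ends when every sender has been dropped.
    for message in store_receiver {
        match message {
            StoreMessage::Set(key, value) => {
                data.insert(key, value);
            }
            StoreMessage::Delete(key) => {
                data.remove(&key);
            }
        }
    }
    data
}

// Run the actor, send it the given messages, and collect its final state.
fn apply_messages(messages: Vec<StoreMessage>) -> HashMap<String, String> {
    let (store_sender, store_receiver) = mpsc::channel();
    let actor = thread::spawn(move || run_store(store_receiver));
    for message in messages {
        store_sender.send(message).unwrap();
    }
    // Dropping the only sender closes the channel and stops the actor.
    drop(store_sender);
    actor.join().unwrap()
}
```

The map is moved into the actor and never leaves it while the actor runs. Other parts of the program can only change it by sending a StoreMessage.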
In this chapter, we will refactor the existing system into a concurrent server based on actors, and we will learn to use some components of the Tokio runtime that simplify our job. In a later chapter we will use the new structure to implement replication.
The overall architecture we are going to implement is shown in the following picture.

The storage is owned by the server, and the two interact via internal method calls. There is no need to lock the storage as the server is a single entity and won't therefore access the storage concurrently.
The server receives messages from other components, wrapped in a convenient structure Request. These messages will travel between Rust actors through Tokio channels, and are represented by specific Rust types.
At the moment, the only components that will send such messages are the Connection Handlers (one per client), but the picture shows that in the future there might be other components that implement different functionalities.
Connection Handlers receive RESP commands in binary form through a network connection, and their task is to convert them into Request messages. It's a good idea to decouple the interface we show to the client (RESP commands) from the interface the server uses internally.
The flexibility of the system will be evident once we tackle replication and transactions, adding new components to interact with the server to extend its functionalities.
To implement the new architecture we need two main components: a way to run actors and a way to exchange messages between them.
In this scenario, actors are just asynchronous tasks, so the way to run them is the asynchronous runtime, which in our case is Tokio. In particular, tokio::spawn will be the way to create actors. Rust is not an object-oriented programming language, so actors will always be functions.
As for messages, Tokio allows us to create channels that can be used to send and receive data to and from an actor (or any other part of a system). Tokio provides several kinds of channels, and the two that matter here are one-shot channels, created with tokio::sync::oneshot, and multi-producer/single-consumer (MPSC) channels, created with tokio::sync::mpsc. We can think of channels as queues shared between asynchronous tasks (and thus among threads).
Tokio channels are always one way. A sender can push messages to the receiver, but the channel doesn't offer the latter a way to reply.
One-shot channels are a way to send a single message between a producer and a consumer. In a scenario where an actor doesn't keep track of the components that connect to it, it's useful to use one-shot messages. Each message sent to the server will contain the one-shot channel that can be used to deliver responses. This mechanism is similar to that of pre-printed envelopes used in postal ballots, where you receive documents and the envelope that you have to use to send them back.
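The envelope metaphor can be sketched as follows. This is an illustration under stated assumptions: the standard library has no one-shot channel, so an ordinary std::sync::mpsc channel created for a single reply plays that role here, threads replace Tokio tasks, and EchoRequest, run_echo, and ask are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical request: the payload plus the "pre-printed envelope",
// that is the sender the actor must use to deliver the answer.
struct EchoRequest {
    text: String,
    reply_to: mpsc::Sender<String>,
}

// The actor doesn't keep track of who is asking. It just answers
// through the sender found in each request.
fn run_echo(echo_receiver: mpsc::Receiver<EchoRequest>) {
    for request in echo_receiver {
        // Ignore the error if the requester is gone.
        let _ = request.reply_to.send(request.text.to_uppercase());
    }
}

// Send a single request and wait for the answer.
fn ask(text: &str) -> String {
    let (echo_sender, echo_receiver) = mpsc::channel();
    let actor = thread::spawn(move || run_echo(echo_receiver));
    // Create the reply channel and ship its sender with the request.
    let (reply_sender, reply_receiver) = mpsc::channel();
    echo_sender
        .send(EchoRequest {
            text: text.to_string(),
            reply_to: reply_sender,
        })
        .unwrap();
    let answer = reply_receiver.recv().unwrap();
    // Close the channel so that the actor terminates.
    drop(echo_sender);
    actor.join().unwrap();
    answer
}
```

The request channel itself stays one way. The answer travels on a second channel whose sender is carried inside the message.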
MPSC channels, on the other hand, are useful to establish a more permanent communication route. They allow us to create multiple senders (producers) and a single receiver (consumer), but they are perfect also for a scenario with a single sender.
For reasons that will be clear in the next sections, in this book we will mostly use MPSC channels. We will however retain the idea of sending messages that contain the response channel.
An MPSC channel is made of a Sender and a Receiver.
Since MPSC channels have multiple producers, the sender can be cloned, while the receiver cannot.
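The following sketch shows several producers that share one receiver through cloned senders. As an assumption made to keep the example free of external crates, it uses std::sync::mpsc and threads, whose Sender is cloneable in the same way. The function collect_from_producers is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

// Spawn `producers` threads, each with its own clone of the sender,
// and collect everything that reaches the single receiver.
fn collect_from_producers(producers: u32) -> Vec<u32> {
    let (server_sender, server_receiver) = mpsc::channel();
    let mut handles = Vec::new();
    for id in 0..producers {
        // Each producer owns a clone of the sender.
        let sender = server_sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(id).unwrap();
        }));
    }
    // Drop the original sender, otherwise the channel never closes.
    drop(server_sender);
    for handle in handles {
        handle.join().unwrap();
    }
    // The iterator ends once all the senders have been dropped.
    let mut received: Vec<u32> = server_receiver.iter().collect();
    // The arrival order depends on scheduling, so sort the values.
    received.sort();
    received
}
```

The receiver is moved, never cloned, so there is always exactly one consumer for the messages.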
In the rest of the book, we will stick to the following naming scheme for channels:
COMPONENT_sender is the cloneable mpsc::Sender used to send a message to COMPONENT.
COMPONENT_receiver is the mpsc::Receiver used by COMPONENT to receive messages.
To convert the server into an actor we will go through the following steps:
select! to begin the transition to an asynchronous system.
Request type to communicate with the server.
The steps are (hopefully) small enough to be understandable. The code will be in a working state that can be compiled and used until we turn the server into an actor (step 5). That and the following step have been split for clarity's sake, at the cost of having a stage where the system doesn't work properly.
Let's start, as usual, taking care of result types and errors. In a previous chapter, we put ServerError aside when we isolated the storage. It's time to bring it back, but for the time being we don't have any specific variants to add to the enum.
src/server_result.rs
#[derive(Debug, PartialEq)]
pub enum ServerError {}
We also need to add the file as a module.
src/main.rs
mod resp;
mod resp_result;
mod server;
mod server_result;
mod set;
mod storage;
mod storage_result;
The connection handler receives the client's requests in RESP binary format and needs to send them to the server. It makes sense to wrap this piece of information in a struct Request to capture all the data that we want to exchange. At the moment we want to send the RESP enum variant and a sender for the response, but in the future we will need to add more fields, for example the original bytes received from the client.
First of all, the connection handler will send messages to the server, and we need a type to represent them.
src/connection.rs
use crate::request::Request;
#[derive(Debug)]
pub enum ConnectionMessage {
    Request(Request),
}
The reason for the abstraction is that in the future we might want to send other types of data, such as internal commands.
As explained before, the Request should contain the client's RESP request, but also the channel used to send a response. There is no such entity as "the channel", though, and the only way to access it is through a Sender. Remember the metaphor of the postal ballot envelope.
src/request.rs
use crate::{resp::RESP, server_result::ServerMessage};
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct Request {
    pub value: RESP,
    pub sender: mpsc::Sender<ServerMessage>,
}
The sender will carry a response, that is a message that comes from the server, so let's define ServerMessage. Currently, any request sent to the server produces either a response in RESP binary format or an error.
src/server_result.rs
use crate::resp::RESP;
use std::fmt;
#[derive(Debug, PartialEq)]
pub enum ServerError {
    CommandError,
}
impl fmt::Display for ServerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServerError::CommandError => write!(f, "Error while processing!"),
        }
    }
}
pub type ServerResult<T> = Result<T, ServerError>;
#[derive(Debug)]
pub enum ServerMessage {
    Data(RESP),
    Error(ServerError),
}
It's in general a good idea to abstract messages with enums. They are a powerful way to express data types that can evolve in the future without affecting the function prototypes that have been already developed.
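As a small illustration of why enums help here, consider a function that receives a message. The sketch below is simplified on purpose: the payload is a String instead of the RESP type, and describe is a hypothetical helper that does not exist in the project.

```rust
// Simplified copies of the types defined above (assumption: the
// real ServerMessage::Data carries a RESP value, not a String).
#[derive(Debug)]
enum ServerError {
    CommandError,
}

#[derive(Debug)]
enum ServerMessage {
    Data(String),
    Error(ServerError),
}

// The prototype mentions only the enum. Adding a variant later
// requires a new match arm here, but callers don't change.
fn describe(message: &ServerMessage) -> String {
    match message {
        ServerMessage::Data(payload) => format!("data: {}", payload),
        ServerMessage::Error(error) => format!("error: {:?}", error),
    }
}
```

Since match must be exhaustive, the compiler points out every place that needs attention when a new variant is introduced.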
Finally, let's add the new files to the list of modules.
src/main.rs
mod connection;
mod request;
mod resp;
mod resp_result;
mod server;
mod server_result;
mod set;
mod storage;
mod storage_result;
In this section we will refactor the connection handler to use the macro select! instead of awaiting directly stream.read. Being a refactor, we are not going to change the behaviour, but we are preparing the handler to host multiple asynchronous actions. This will be useful later when we receive messages sent by the server.
The refactoring is straightforward. The core difference is that instead of
match stream.read(&mut buffer).await {
we will have
select! {
    result = stream.read(&mut buffer) => {
        match result {
The complete change is
src/main.rs
use crate::resp::{bytes_to_resp, RESP};
use crate::server::process_request;
use crate::storage::Storage;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
    select,
};
...
// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream, storage: Arc<Mutex<Storage>>) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];
    loop {
        select! {
            // Data is available in the stream, read it.
            result = stream.read(&mut buffer) => {
                // Check if the incoming data is valid
                // and act accordingly.
                match result {
                    // If the stream returned some data,
                    // process the request.
                    Ok(size) if size != 0 => {
                        // Initialise the index to start at the
                        // beginning of the buffer.
                        let mut index: usize = 0;
                        // Process the bytes in the buffer according to
                        // the content and extract the request. Update the index.
                        let request = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
                            Ok(v) => v,
                            Err(e) => {
                                eprintln!("Error: {}", e);
                                return;
                            }
                        };
                        // Process the request.
                        let response = match process_request(request, storage.clone()) {
                            Ok(v) => v,
                            Err(e) => {
                                eprintln!("Error parsing command: {}", e);
                                return;
                            }
                        };
                        // Write the response to the stream.
                        if let Err(e) = stream.write_all(response.to_string().as_bytes()).await {
                            eprintln!("Error writing to socket: {}", e);
                        }
                    }
                    // If the stream returned no data
                    // the connection has been closed.
                    Ok(_) => {
                        println!("Connection closed");
                        break;
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        break;
                    }
                }
            }
        }
    }
}
This step involves more changes than the previous ones, but there is no additional complexity. At the moment the connection handler calls process_request passing the RESP request directly, but we will change it in order to use a Request.
This change prepares the system for a later step, where we won't call process_request directly any more, but will instead send a message to the server.
Let's start with the connection handler. Since we need to create a Request, we also need to create a channel that will send and receive messages of type ServerMessage.
src/main.rs
use crate::request::Request;
use crate::resp::{bytes_to_resp, RESP};
use crate::server::process_request;
use crate::storage::Storage;
use server_result::ServerMessage;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
    select,
    sync::mpsc,
};
...
// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream, storage: Arc<Mutex<Storage>>) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];
    // Create the MPSC channel to communicate with the connection.
    let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
    loop {
        select! {
            // Data is available in the stream, read it.
            result = stream.read(&mut buffer) => {
                // Check if the incoming data is valid
                // and act accordingly.
                match result {
                    // If the stream returned some data,
                    // process the request.
                    Ok(size) if size != 0 => {
                        // Initialise the index to start at the
                        // beginning of the buffer.
                        let mut index: usize = 0;
                        // Process the bytes in the buffer according to
                        // the content and extract the request. Update the index.
                        let resp = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
                            Ok(v) => v,
                            Err(e) => {
                                eprintln!("Error: {}", e);
                                return;
                            }
                        };
                        // Create the request.
                        let request = Request {
                            value: resp,
                            sender: connection_sender.clone(),
                        };
                        // Process the request.
                        let response = match process_request(request, storage.clone()) {
                            Ok(v) => v,
                            Err(e) => {
                                eprintln!("Error parsing command: {}", e);
                                return;
                            }
                        };
                        ...
Now we need to change process_request accordingly and adjust the tests. The function can be fixed very quickly.
src/server.rs
use crate::request::Request;
use crate::storage::Storage;
use crate::storage_result::{StorageError, StorageResult};
use crate::RESP;
use std::sync::{Arc, Mutex};
...
// Process an incoming request and return a result.
pub fn process_request(request: Request, storage: Arc<Mutex<Storage>>) -> StorageResult<RESP> {
    // Check if the request is expressed using
    // a RESP array and extract the elements.
    let elements = match request.value {
        RESP::Array(v) => v,
        _ => {
            return Err(StorageError::IncorrectRequest);
        }
    };
    ...
Testing this function now requires a Request, which means that we also need to create the MPSC channels in the tests.
src/server.rs
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server_result::ServerMessage;
    use tokio::sync::mpsc;
    #[test]
    // Test that the function process_request
    // processes a PING request and that it
    // responds with a PONG.
    fn test_process_request_ping() {
        let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
        let request = Request {
            value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
            sender: connection_sender,
        };
        let storage = Arc::new(Mutex::new(Storage::new()));
        let output = process_request(request, storage).unwrap();
        assert_eq!(output, RESP::SimpleString(String::from("PONG")));
    }
    #[test]
    // Test that the function process_request
    // returns the correct error when it is
    // given a request that doesn't contain
    // a RESP array.
    fn test_process_request_not_array() {
        let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
        let request = Request {
            value: RESP::BulkString(String::from("PING")),
            sender: connection_sender,
        };
        let storage = Arc::new(Mutex::new(Storage::new()));
        let error = process_request(request, storage).unwrap_err();
        assert_eq!(error, StorageError::IncorrectRequest);
    }
    #[test]
    // Test that the function process_request
    // returns the correct error when it is
    // given a request that contains a RESP array
    // but the content of the array is not
    // a bulk string.
    fn test_process_request_not_bulkstrings() {
        let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
        let request = Request {
            value: RESP::Array(vec![RESP::SimpleString(String::from("PING"))]),
            sender: connection_sender,
        };
        let storage = Arc::new(Mutex::new(Storage::new()));
        let error = process_request(request, storage).unwrap_err();
        assert_eq!(error, StorageError::IncorrectRequest);
    }
    #[test]
    // Test that the function process_request
    // processes an ECHO request and that it
    // responds with a copy of the input.
    fn test_process_request_echo() {
        let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
        let request = Request {
            value: RESP::Array(vec![
                RESP::BulkString(String::from("ECHO")),
                RESP::BulkString(String::from("42")),
            ]),
            sender: connection_sender,
        };
        let storage = Arc::new(Mutex::new(Storage::new()));
        let output = process_request(request, storage).unwrap();
        assert_eq!(output, RESP::BulkString(String::from("42")));
    }
}
At this point we can convert the server into a full-fledged actor. This clearly involves the connection handler as well, but in this step we will focus solely on the server. In the next step we will fix the connection handler and make sure both ends of the communication work properly. For this reason, at the end of this section the code won't work.
Let's create a structure to capture the information the server needs to manage.
src/server.rs
pub struct Server {
    pub storage: Option<Storage>,
}
impl Server {
    pub fn new() -> Self {
        Self { storage: None }
    }
    pub fn set_storage(&mut self, storage: Storage) {
        self.storage = Some(storage);
    }
}
As you can see, this is the core of the idea behind actors: the server owns the storage. In this implementation the server can be initialised without the storage, but this is just a stylistic choice that doesn't impact the way the actor works.
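To show that the choice is indeed stylistic, the sketch below adds a constructor that receives the storage directly. Two assumptions are at work: Storage is reduced to an empty placeholder so that the example is self-contained, and with_storage is a hypothetical function that is not part of the code of this chapter.

```rust
// Placeholder for the real storage type (assumption: the actual
// Storage is defined in src/storage.rs and does much more).
#[derive(Debug, Default)]
pub struct Storage {}

pub struct Server {
    pub storage: Option<Storage>,
}

impl Server {
    // The style used in the chapter: create an empty server first.
    pub fn new() -> Self {
        Self { storage: None }
    }

    // The storage is moved into the server, which becomes its owner.
    pub fn set_storage(&mut self, storage: Storage) {
        self.storage = Some(storage);
    }

    // Hypothetical shortcut: build a server that already owns
    // its storage.
    pub fn with_storage(storage: Storage) -> Self {
        let mut server = Self::new();
        server.set_storage(storage);
        server
    }
}
```

In both styles the storage is moved into the server, so after the call no other part of the program can access it directly.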
The next change introduces the actor itself, which is a function called run_server
src/server.rs
use crate::connection::ConnectionMessage;
use crate::request::Request;
use crate::storage::Storage;
use crate::storage_result::{StorageError, StorageResult};
use crate::RESP;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
...
// Listen for incoming messages and process the contained requests.
pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {
    loop {
        tokio::select! {
            // Keep listening for messages on the incoming channel.
            Some(message) = crx.recv() => {
                match message {
                    // If the message contains a request, extract it.
                    ConnectionMessage::Request(request) => {
                        // Process the request.
                        process_request(request, &mut server).await;
                    }
                }
            }
        }
    }
}
As you can see this is initialised with a Server and a channel that receives messages from the connection handler. The body is an infinite loop that fetches a message, extracts the Request, and runs process_request on it.
Speaking of which, the function process_request needs to be changed as well, since we are now passing directly a reference to Server instead of an Arc<Mutex<Storage>>.
src/server.rs
// Process an incoming request and return a result.
pub async fn process_request(request: Request, server: &mut Server) {
    // Check if the request is expressed using
    // a RESP array and extract the elements.
    let elements = match &request.value {
        RESP::Array(v) => v,
        _ => {
            panic!()
        }
    };
    // The vector that contains all the commands we need to process.
    let mut command = Vec::new();
    // Check that each element of the array is a
    // bulk string, extract the content, and add it
    // to the vector.
    for elem in elements.iter() {
        match elem {
            RESP::BulkString(v) => command.push(v.clone()),
            _ => {
                panic!()
            }
        }
    }
    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => panic!(),
    };
    // Process the command contained in the request.
    let _response = storage.process_command(&command);
}
There are other important changes in the function. The first one is that the code to return an error containing StorageError::IncorrectRequest has been removed and replaced with panic!(). The reason is that we haven't set up the other half of the system that receives messages yet, so we don't have any other way to signal an error. panic!() is a good placeholder for now.
The second change is in the logic at the bottom of the function. We don't need to lock the storage any more as the server is the sole owner of that resource. The response is not returned by the function as it was previously. Once again, we need a mechanism to return the response to the connection handler, to be implemented in the next step.
As we created a new type, it's a good idea to have tests, so we'll create test_create_new and test_set_storage. As process_request at the moment doesn't output the response in any meaningful way, we should also remove the remaining tests
src/server.rs
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_create_new() {
        let server: Server = Server::new();
        match server.storage {
            Some(_) => panic!(),
            None => (),
        };
    }
    #[test]
    fn test_set_storage() {
        let storage = Storage::new();
        let mut server: Server = Server::new();
        server.set_storage(storage);
        match server.storage {
            Some(_) => (),
            None => panic!(),
        };
    }
}
Now that the server has been converted, we need to change the connection handler so that it uses a message channel to send requests.
The function handle_connection doesn't need to receive an Arc<Mutex<Storage>> any more, but will receive a sender for ConnectionMessage entities.
The call to process_request is replaced by a call to server_sender.send, and since the latter doesn't directly return a response, the code that managed the error has been removed. Once again, this will be completed in the next step.
src/main.rs
// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream, server_sender: mpsc::Sender<ConnectionMessage>) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];
    // Create the MPSC channel to communicate with the connection.
    let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
    loop {
        select! {
            // Data is available in the stream, read it.
            result = stream.read(&mut buffer) => {
                // Check if the incoming data is valid
                // and act accordingly.
                match result {
                    // If the stream returned some data,
                    // process the request.
                    Ok(size) if size != 0 => {
                        // Initialise the index to start at the
                        // beginning of the buffer.
                        let mut index: usize = 0;
                        // Process the bytes in the buffer according to
                        // the content and extract the request. Update the index.
                        let resp = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
                            Ok(v) => v,
                            Err(e) => {
                                eprintln!("Error: {}", e);
                                return;
                            }
                        };
                        // Create the request.
                        let request = Request {
                            value: resp,
                            sender: connection_sender.clone(),
                        };
                        // Send the request to the server.
                        match server_sender.send(ConnectionMessage::Request(request)).await {
                            Ok(()) => {},
                            Err(e) => {
                                eprintln!("Error sending request: {}", e);
                                return;
                            }
                        }
                    }
                    // If the stream returned no data
                    // the connection has been closed.
                    Ok(_) => {
                        println!("Connection closed");
                        break;
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        break;
                    }
                }
            }
        }
    }
}
As handle_connection changed its prototype, we need to change the code of main accordingly. As you can see, we also create the channel that will be used to send messages to the server.
src/main.rs
use crate::request::Request;
use crate::resp::{bytes_to_resp, RESP};
use crate::storage::Storage;
use connection::ConnectionMessage;
use server_result::ServerMessage;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::{
    io::AsyncReadExt,
    net::{TcpListener, TcpStream},
    select,
    sync::mpsc,
};
...
#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create the TCP listener, bound to the
    // standard Redis port.
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    // Create a storage and protect it against concurrency issues.
    let storage = Arc::new(Mutex::new(Storage::new()));
    // Create a timer that expires every 10 milliseconds.
    let mut interval_timer = tokio::time::interval(Duration::from_millis(10));
    // Create the MPSC channel to communicate with the server.
    let (server_sender, _) = mpsc::channel::<ConnectionMessage>(32);
    loop {
        // Process each incoming connection.
        tokio::select! {
            // Process a new connection.
            connection = listener.accept() => {
                match connection {
                    // The connection is valid, handle it.
                    Ok((stream, _)) => {
                        // Spawn a task to take care of this connection.
                        tokio::spawn(handle_connection(stream, server_sender.clone()));
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        continue;
                    }
                }
            }
            // Process the expired timer.
            _ = interval_timer.tick() => {
                tokio::spawn(expire_keys(storage.clone()));
            }
        }
    }
}
In the current state, the code cannot work properly because the function process_request doesn't have any way to send its response back to the connection handler. Previously, the function was called directly, so it was just a matter of awaiting its completion and reading the output value.
In this new configuration, however, the function is called through a message, so there is no immediate return value. The most natural way to send back the response is through a message from the server, and to do that we need to modify the connection handler to receive and process messages. We also need to run the actor itself, which in this case is the function run_server.
Let's start as usual defining a type to represent errors in the connection.
src/connection.rs
use crate::{request::Request, server_result::ServerError};
use std::fmt;
#[derive(Debug)]
pub enum ConnectionError {
    ServerError(ServerError),
}
impl fmt::Display for ConnectionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectionError::ServerError(e) => {
                write!(f, "Server error: {}", e)
            }
        }
    }
}
#[derive(Debug)]
pub enum ConnectionMessage {
    Request(Request),
}
At the moment there is only one possible error in the connection handler, which is an unsuccessful response from the server, represented here by the variant ConnectionError::ServerError.
The function handle_connection that represents the core of the connection handler has been already converted into a select! loop, so we just need to add a new arm that uses the connection_receiver
src/main.rs
// The main entry point for valid TCP connections.
async fn handle_connection(mut stream: TcpStream, server_sender: mpsc::Sender<ConnectionMessage>) {
    // Create a buffer to host incoming data.
    let mut buffer = [0; 512];
    // Create the MPSC channel to communicate with the connection.
    let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
    loop {
        select! {
            // Data is available in the stream, read it.
            result = stream.read(&mut buffer) => {
                ...
            }
            // A response arrived from the server.
            Some(response) = connection_receiver.recv() => {
                let _ = match response {
                    ServerMessage::Data(v) => stream.write_all(v.to_string().as_bytes()).await,
                    ServerMessage::Error(e) => {
                        eprintln!("Error: {}", ConnectionError::ServerError(e));
                        return;
                    }
                };
            }
        }
    }
}
Here, connection_receiver.recv() is awaited by select! and when a response is detected it is written to the stream if successful and to the standard error otherwise. Where does this response come from? As you remember, in handle_connection we clone connection_sender and we store it into the request, so that the server can use it.
src/server.rs
use crate::connection::ConnectionMessage;
use crate::request::Request;
use crate::server_result::ServerMessage;
use crate::storage::Storage;
use crate::RESP;
use tokio::sync::mpsc;
...
// Process an incoming request and return a result.
pub async fn process_request(request: Request, server: &mut Server) {
    ...
    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => panic!(),
    };
    // Process the command contained in the request.
    let response = storage.process_command(&command);
    match response {
        Ok(v) => {
            request.sender.send(ServerMessage::Data(v)).await.unwrap();
        }
        Err(_e) => (),
    }
}
Please note that process_request is still heavily under construction, as we are not properly dealing with the error in the code above and still haven't replaced the panic!() calls.
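One possible way to deal with the error branch is to send a ServerMessage::Error through the same sender. The sketch below only illustrates the idea and is not the solution adopted later in the book: it uses std::sync::mpsc instead of Tokio channels, String values instead of RESP and StorageError, and the hypothetical helpers reply and reply_and_receive.

```rust
use std::sync::mpsc;

// Simplified copies of the types defined in src/server_result.rs
// (assumption: the real Data variant carries a RESP value).
#[derive(Debug, PartialEq)]
enum ServerError {
    CommandError,
}

#[derive(Debug, PartialEq)]
enum ServerMessage {
    Data(String),
    Error(ServerError),
}

// Hypothetical helper: convert the outcome of a command into a
// message and deliver it through the sender stored in the request.
fn reply(outcome: Result<String, String>, sender: &mpsc::Sender<ServerMessage>) {
    let message = match outcome {
        Ok(payload) => ServerMessage::Data(payload),
        // The details of the error are dropped in this sketch.
        Err(_) => ServerMessage::Error(ServerError::CommandError),
    };
    // Ignore the failure if the connection handler is gone.
    let _ = sender.send(message);
}

// Play both roles: reply as the server, receive as the handler.
fn reply_and_receive(outcome: Result<String, String>) -> ServerMessage {
    let (connection_sender, connection_receiver) = mpsc::channel();
    reply(outcome, &connection_sender);
    connection_receiver.recv().unwrap()
}
```

With this approach both outcomes reach the connection handler, which already knows how to treat ServerMessage::Data and ServerMessage::Error.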
The last change of this step is to run the server as an actor, so that the connection handler has something to exchange messages with. To do this we have to change the function main to spawn the actor
src/main.rs use crate::connection::ConnectionError;
use crate::request::Request;
use crate::resp::{bytes_to_resp, RESP};
use crate::storage::Storage;
use connection::ConnectionMessage;
use server::{run_server, Server};
use server_result::ServerMessage;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::{
io::AsyncReadExt,
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
...
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Create the TCP listener, bound to the
// standard Redis port.
let listener = TcpListener::bind("127.0.0.1:6379").await?;
// Create and connect the storage and the server.
let storage = Storage::new();
let mut server = Server::new();
server.set_storage(storage);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));
loop {
// Process each incoming connection.
tokio::select! {
// Process a new connection.
connection = listener.accept() => {
...
}
}
}
}
As you can see, the storage becomes part of the server, and the server is run as an actor through tokio::spawn. The server receiver is now needed by run_server, so we store it in server_receiver. Please note that we also got rid of the second arm of select!, the one that awaited interval_timer.tick(), and of the initialisation of interval_timer. As the storage is now owned by the server, it is no longer possible to work on it directly from main, and in the next step we will move the management of key expiry somewhere else.
With the latest changes, the system comes back to life, and we can once again pass the end-to-end tests. Active expiry is not working any more, but passive expiry is, and that's enough to make the tests pass.
The code is clearly in a terrible state. Imports should be tidied up, unit tests have been deleted, and error management has been replaced by panic! calls. So, the plan for the next steps is to tidy it up.
Stage 7: Expiry
This version of the code passes Stage 7 of the CodeCrafters challenge, just like the version we had at the end of the previous chapter. This shows that our refactoring worked.
When we go through a major refactoring, it's always important to keep the system in a working state as much as possible. The byproduct of this succession of intermediate states, where the system is transitioning from the old architecture to the new one, is a lot of temporary code and unused imports, and more generally an untidiness of the code base.
In this step we need to fill in the gaps and to make sure the code is well written.
The first move is to restore active expiry, which was removed in the previous step as storage was no longer accessible from main. The strategy here is very simple.
First of all, let's remove expire_keys from main.rs and recreate it as a method of the struct Server.
src/main.rs
use crate::connection::ConnectionError;
use crate::request::Request;
use crate::resp::{bytes_to_resp, RESP};
use crate::storage::Storage;
use connection::ConnectionMessage;
use server::{run_server, Server};
use server_result::ServerMessage;
use std::sync::{Arc, Mutex};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
...
// The entry point for key expiry operations.
async fn expire_keys(storage: Arc<Mutex<Storage>>) {
// Acquire a lock on the storage.
let mut guard = storage.lock().unwrap();
// Trigger the expiry process.
guard.expire_keys();
}
src/server.rs
pub struct Server {
pub storage: Option<Storage>,
}
impl Server {
pub fn new() -> Self {
Self { storage: None }
}
pub fn set_storage(&mut self, storage: Storage) {
self.storage = Some(storage);
}
// The entry point for key expiry operations.
pub fn expire_keys(&mut self) {
// Get the storage contained in the server.
let storage = match self.storage.as_mut() {
Some(storage) => storage,
None => return,
};
// Trigger the expiry process.
storage.expire_keys();
}
}
The fact that expire_keys is now a method instead of a simple function is just a matter of preference.
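As a side note, the match with an early return used in expire_keys can also be written with if let. The sketch below is a self-contained illustration, not the project's code: MiniStorage and MiniServer are hypothetical stand-ins that keep only a map of expiry times, so that the behaviour can be tried in isolation.

```rust
use std::collections::HashMap;
use std::time::SystemTime;

// Hypothetical stand-in for Storage: it only tracks expiry times.
pub struct MiniStorage {
    expiry: HashMap<String, SystemTime>,
}

impl MiniStorage {
    pub fn new() -> Self {
        Self {
            expiry: HashMap::new(),
        }
    }

    // Register a key together with the moment it expires.
    pub fn set_expiry(&mut self, key: &str, when: SystemTime) {
        self.expiry.insert(key.to_string(), when);
    }

    // Keep only the keys whose expiry time is still in the future.
    pub fn expire_keys(&mut self) {
        let now = SystemTime::now();
        self.expiry.retain(|_, when| *when > now);
    }

    // Number of keys currently tracked.
    pub fn len(&self) -> usize {
        self.expiry.len()
    }
}

// Hypothetical stand-in for Server, owning an optional storage.
pub struct MiniServer {
    pub storage: Option<MiniStorage>,
}

impl MiniServer {
    // Same behaviour as the match with an early return:
    // nothing happens when no storage has been attached.
    pub fn expire_keys(&mut self) {
        if let Some(storage) = self.storage.as_mut() {
            storage.expire_keys();
        }
    }
}
```

Calling expire_keys on a MiniServer whose storage is None is a no-op, which mirrors the None => return branch of the method shown above.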
Now, since the server actor run_server is a select! loop we can once again create interval_timer and await interval_timer.tick().
src/server.rs
use crate::connection::ConnectionMessage;
use crate::request::Request;
use crate::server_result::ServerMessage;
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
...
// Listen for incoming messages and process the contained requests.
pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {
let mut interval_timer = tokio::time::interval(Duration::from_millis(10));
loop {
tokio::select! {
// Keep listening for messages on the incoming channel.
Some(message) = crx.recv() => {
match message {
// If the message contains a request, extract it.
ConnectionMessage::Request(request) => {
// Process the request.
process_request(request, &mut server).await;
}
}
}
_ = interval_timer.tick() => {
server.expire_keys();
}
}
}
}
Once again, as the server owns the storage, there is no need to lock the resource here.
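The ownership argument can be seen in isolation with a sketch that uses only the standard library. It is not the project's code: it swaps Tokio tasks and select! for a thread and std::sync::mpsc::Receiver::recv_timeout, and the names Msg, run_actor and spawn_actor are hypothetical. The point is that the map is owned by a single loop, so both message handling and the periodic work mutate it without Arc or Mutex.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

// Hypothetical messages understood by the actor.
pub enum Msg {
    // Store a key/value pair.
    Set(String, String),
    // Ask for a value; the answer goes back on the enclosed sender.
    Get(String, Sender<Option<String>>),
}

// The actor loop: it owns the map, so no locking is needed.
// It returns the number of timeouts observed before shutting down.
pub fn run_actor(rx: Receiver<Msg>) -> u64 {
    let mut store: HashMap<String, String> = HashMap::new();
    let mut ticks = 0;
    loop {
        // Wait for a message, but wake up after 10 ms of silence.
        match rx.recv_timeout(Duration::from_millis(10)) {
            Ok(Msg::Set(k, v)) => {
                store.insert(k, v);
            }
            Ok(Msg::Get(k, reply)) => {
                // Ignore the error if the requester has gone away.
                let _ = reply.send(store.get(&k).cloned());
            }
            // No message arrived in time: periodic work would go here.
            Err(RecvTimeoutError::Timeout) => ticks += 1,
            // All senders were dropped: stop the actor.
            Err(RecvTimeoutError::Disconnected) => return ticks,
        }
    }
}

// Spawn the actor on its own thread and return the means to talk to it.
pub fn spawn_actor() -> (Sender<Msg>, thread::JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || run_actor(rx));
    (tx, handle)
}
```

Unlike tokio::time::interval, the timeout used here restarts after every message, so the periodic branch only runs when the channel has been idle. This keeps the sketch short, but it is a simplification compared to the select! loop of run_server.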
So far we assumed that the server could return any type of result, but it makes sense to identify the actual types and to group them into an enum. The only one we need so far is RESP.
src/server_result.rs
use crate::resp::RESP;
use std::fmt;
#[derive(Debug, PartialEq)]
pub enum ServerError {
CommandError,
}
impl fmt::Display for ServerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ServerError::CommandError => write!(f, "Error while processing!"),
}
}
}
#[derive(Debug)]
pub enum ServerValue {
RESP(RESP),
}
pub type ServerResult = Result<ServerValue, ServerError>;
#[derive(Debug)]
pub enum ServerMessage {
Data(ServerValue),
Error(ServerError),
}
The variant RESP(RESP) might look odd at first glance. The first RESP is the name of the variant, while the second is the type it wraps, that is the RESP enum we defined in chapter 2. The two share the same name because the variant represents a RESP value, and the type is called RESP as well.
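A reduced example may help to see how a variant and the type it wraps can share a name. The types below are hypothetical miniatures (Resp and Value stand in for RESP and ServerValue), and describe is an invented helper that unwraps both layers with a nested pattern.

```rust
// Hypothetical, reduced version of the RESP type.
#[derive(Debug, PartialEq)]
pub enum Resp {
    SimpleString(String),
    Null,
}

// A wrapper enum whose only variant shares its name with the wrapped type.
#[derive(Debug, PartialEq)]
pub enum Value {
    // The first Resp is the variant name, the second is the wrapped type.
    Resp(Resp),
}

// Unwrap both layers with a single nested pattern.
pub fn describe(value: &Value) -> String {
    match value {
        Value::Resp(Resp::SimpleString(s)) => format!("simple string: {}", s),
        Value::Resp(Resp::Null) => String::from("null"),
    }
}
```

There is no ambiguity for the compiler: the variant is always reached through its enum (Value::Resp), while a bare Resp refers to the type.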
This causes a couple of changes in other functions. In handle_connection
src/main.rs
use crate::connection::ConnectionError;
use crate::request::Request;
use crate::resp::{bytes_to_resp, RESP};
use crate::server_result::ServerValue;
use crate::storage::Storage;
use connection::ConnectionMessage;
use server::{run_server, Server};
use server_result::ServerMessage;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
...
async fn handle_connection(mut stream: TcpStream, server_sender: mpsc::Sender<ConnectionMessage>) {
// A response arrived from the server.
Some(response) = connection_receiver.recv() => {
let _ = match response {
ServerMessage::Data(ServerValue::RESP(v)) => stream.write_all(v.to_string().as_bytes()).await,
ServerMessage::Error(e) => {
eprintln!("Error: {}", ConnectionError::ServerError(e));
return;
}
};
}
and in process_request
src/server.rs
use crate::connection::ConnectionMessage;
use crate::request::Request;
use crate::server_result::{ServerMessage, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
...
pub async fn process_request(request: Request, server: &mut Server) {
// Process the command contained in the request.
let response = storage.process_command(&command);
match response {
Ok(v) => {
request
.sender
.send(ServerMessage::Data(ServerValue::RESP(v)))
.await
.unwrap();
}
Err(_e) => (),
}
}
The struct Request is pretty important for the new architecture, as a lot of the functionality of the system is connected with it. It makes sense to add a couple of helper methods to simplify its usage:
src/request.rs
use crate::{
resp::RESP,
server_result::{ServerError, ServerMessage, ServerValue},
};
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct Request {
pub value: RESP,
pub sender: mpsc::Sender<ServerMessage>,
}
impl Request {
pub async fn error(&self, e: ServerError) {
self.sender.send(ServerMessage::Error(e)).await.unwrap();
}
pub async fn data(&self, d: ServerValue) {
self.sender.send(ServerMessage::Data(d)).await.unwrap();
}
}
We can use them directly in process_request to simplify a call:
src/server.rs
pub async fn process_request(request: Request, server: &mut Server) {
// Process the command contained in the request.
let response = storage.process_command(&command);
match response {
Ok(v) => {
request.data(ServerValue::RESP(v)).await;
}
Err(_e) => (),
}
}
The next step is to replace the calls to panic!. To do that we first need to define a couple of new variants of ServerError.
src/server_result.rs
#[derive(Debug, PartialEq)]
pub enum ServerError {
CommandError,
IncorrectData,
StorageNotInitialised,
}
impl fmt::Display for ServerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ServerError::CommandError => write!(f, "Error while processing!"),
ServerError::IncorrectData => {
write!(f, "Data received from stream is incorrect.")
}
ServerError::StorageNotInitialised => {
write!(f, "Storage has not been initialised.")
}
}
}
}
Then we can finish the work on process_request, replacing the calls to panic!() with actual errors.
src/server.rs
use crate::connection::ConnectionMessage;
use crate::request::Request;
use crate::server_result::{ServerError, ServerValue};
use crate::storage::Storage;
use crate::RESP;
use std::time::Duration;
use tokio::sync::mpsc;
...
// Process an incoming request and send the result back through its sender.
pub async fn process_request(request: Request, server: &mut Server) {
// Check if the request is expressed using
// a RESP array and extract the elements.
let elements = match &request.value {
RESP::Array(v) => v,
_ => {
request.error(ServerError::IncorrectData).await;
return;
}
};
// The vector that contains all the commands we need to process.
let mut command = Vec::new();
// Check that each element of the array is a
// bulk string, extract the content, and add it
// to the vector.
for elem in elements.iter() {
match elem {
RESP::BulkString(v) => command.push(v.clone()),
_ => {
request.error(ServerError::IncorrectData).await;
return;
}
}
}
let storage = match server.storage.as_mut() {
Some(storage) => storage,
None => {
request.error(ServerError::StorageNotInitialised).await;
return;
}
};
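The validation above can also be expressed as a function that returns a Result, which keeps the error paths in one place. This is a sketch under assumptions, not the implementation used in this chapter: Resp and ParseError are reduced, hypothetical stand-ins for RESP and ServerError, and extract_command is an invented name.

```rust
// Hypothetical, reduced version of the RESP type.
#[derive(Debug, PartialEq)]
pub enum Resp {
    BulkString(String),
    SimpleString(String),
    Array(Vec<Resp>),
}

// Hypothetical, reduced version of ServerError.
#[derive(Debug, PartialEq)]
pub enum ParseError {
    IncorrectData,
}

// Turn a request into the list of strings that make up the command,
// or report an error instead of panicking.
pub fn extract_command(value: &Resp) -> Result<Vec<String>, ParseError> {
    // The request must be expressed as an array.
    let elements = match value {
        Resp::Array(v) => v,
        _ => return Err(ParseError::IncorrectData),
    };
    // Every element must be a bulk string; collecting into a Result
    // stops at the first element that is not.
    elements
        .iter()
        .map(|elem| match elem {
            Resp::BulkString(s) => Ok(s.clone()),
            _ => Err(ParseError::IncorrectData),
        })
        .collect()
}
```

With a helper of this kind, the caller would match on the result once and forward the error to the client in the Err branch, instead of repeating the error handling at each step.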
Connection handlers are arguably the central part of the system, which after all is a server whose main task is to accept incoming requests and send appropriate responses. When we converted the server into an actor we did the same to connection handlers, which now react to "messages" from clients (requests) and to messages from the server. For each incoming connection, the main loop spawns a dedicated connection handler to monitor it.
The specific actor for the connection handler is the function handle_connection, so it is worth isolating it in a separate space. To make the code tidier, it's also useful to capture the main select! loop in a separate space.
Let's start by moving handle_connection to src/connection.rs
src/connection.rs
use crate::resp::bytes_to_resp;
use crate::server_result::{ServerMessage, ServerValue};
use crate::{request::Request, server_result::ServerError};
use std::fmt;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
select,
sync::mpsc,
};
...
// The main entry point for valid TCP connections.
pub async fn handle_connection(
mut stream: TcpStream,
server_sender: mpsc::Sender<ConnectionMessage>,
) {
// Create a buffer to host incoming data.
let mut buffer = [0; 512];
...
}
The function is moved as it is. To isolate the listener loop, instead, we need to create a new function
src/connection.rs
use crate::resp::bytes_to_resp;
use crate::server_result::{ServerMessage, ServerValue};
use crate::{request::Request, server_result::ServerError};
use std::fmt;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
select,
sync::mpsc,
};
...
// Run a task that listens to the bound TCP port
// for incoming connection from clients and
// spawns a connection handler for each one of them.
pub async fn run_listener(host: String, port: u16, server_sender: mpsc::Sender<ConnectionMessage>) {
// Create the TCP listener, bound to the given port.
let listener = TcpListener::bind(format!("{}:{}", host, port))
.await
.unwrap();
loop {
// Process each incoming connection.
tokio::select! {
// Process a new connection.
connection = listener.accept() => {
match connection {
// The connection is valid, handle it.
Ok((stream, _)) => {
// Spawn a task to take care of this connection.
tokio::spawn(handle_connection(stream, server_sender.clone()));
}
Err(e) => {
eprintln!("Error: {}", e);
continue;
}
}
}
}
}
}
and this needs to be called in main.
src/main.rs
use crate::resp::RESP;
use crate::storage::Storage;
use connection::{run_listener, ConnectionMessage};
use server::{run_server, Server};
use server_result::ServerMessage;
use tokio::sync::mpsc;
mod connection;
mod request;
mod resp;
mod resp_result;
mod server;
mod server_result;
mod set;
mod storage;
mod storage_result;
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Create and connect the storage and the server.
let storage = Storage::new();
let mut server = Server::new();
server.set_storage(storage);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));
// Run the listener, which spawns a handler for each connection.
run_listener("127.0.0.1".to_string(), 6379, server_sender).await;
Ok(())
}
In Step 5 we commented out some tests because process_request wasn't returning any response during our refactoring. Now we can restore them, adapting them to the new setup, where process_request receives the request and the server. Let's consider test_process_request_ping to highlight the changes, as the other tests will follow a similar pattern.
The original test was
src/server.rs
#[test]
// Test that the function process_request
// processes a PING request and that it
// responds with a PONG.
fn test_process_request_ping() {
let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
sender: connection_sender,
};
let storage = Arc::new(Mutex::new(Storage::new()));
let output = process_request(request, storage).unwrap();
assert_eq!(output, RESP::SimpleString(String::from("PONG")));
}
while the new version is
src/server.rs
#[tokio::test] // (1)
// Test that the function process_request
// processes a PING request and that it
// responds with a PONG.
async fn test_process_request_ping() { // (2)
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32); // (3)
let request = Request {
value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
sender: connection_sender,
};
let storage = Storage::new(); // (4)
let mut server: Server = Server::new();
server.set_storage(storage);
process_request(request, &mut server).await; // (5)
assert_eq!(
connection_receiver.try_recv().unwrap(), // (6)
ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("PONG"))))
);
}
First of all, the result of process_request is not going to be the output of the function any more. Rather, we know that the result will be accessible through the receiver, initialised here as connection_receiver (3). For this reason, the test is now an asynchronous function (async fn) (2) and has to be marked with #[tokio::test] (1) in order to run. This is because the testing framework has to initialise the engine that runs the asynchronous tasks.
The storage doesn't need to be wrapped by Arc<Mutex<>> any more (4). The server is initialised and given ownership of the storage.
As mentioned before, the function process_request is now asynchronous, so we need to await it (5), and it receives the request and the server rather than the request and the storage.
Last, the output is accessible through the receiver, so we use try_recv (6) on it to make sure a message is available. The message format is now more structured, as the RESP data is wrapped by ServerValue and ServerMessage.
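The role of try_recv in the test can be tried outside the project with the channel provided by the standard library, whose try_recv behaves in a comparable way for this purpose: it never waits, and it returns an error when no message is queued. The sketch assumes a hypothetical Message enum and two invented helpers, answer and poll. Deriving Debug and PartialEq on the message type is what allows assert_eq! to compare the received value with the expected one.

```rust
use std::sync::mpsc::{self, TryRecvError};

// Hypothetical message type; PartialEq and Debug are needed by assert_eq!.
#[derive(Debug, PartialEq)]
pub enum Message {
    Data(String),
    Error(String),
}

// Stand-in for a function that answers through a channel
// instead of returning a value.
pub fn answer(sender: &mpsc::Sender<Message>, input: &str) {
    let message = if input.is_empty() {
        Message::Error(String::from("empty input"))
    } else {
        Message::Data(input.to_uppercase())
    };
    // A send error only means that nobody is listening any more.
    let _ = sender.send(message);
}

// Return the queued message, or None if nothing is available yet
// or the channel has been closed.
pub fn poll(receiver: &mpsc::Receiver<Message>) -> Option<Message> {
    match receiver.try_recv() {
        Ok(message) => Some(message),
        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
    }
}
```

In a test, calling try_recv right after the function under test has completed is enough, because by then the message has already been queued; if it had not, unwrap would make the test fail immediately instead of waiting.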
The function test_process_request_echo goes through the same transformation
src/server.rs
#[tokio::test]
// Test that the function process_request
// processes an ECHO request and that it
// responds with a copy of the input.
async fn test_process_request_echo() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![
RESP::BulkString(String::from("ECHO")),
RESP::BulkString(String::from("42")),
]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(String::from("42"))))
);
}
The same is true for the remaining tests, with the additional use of ServerError::IncorrectData instead of StorageError::IncorrectRequest.
src/server.rs
#[cfg(test)]
mod tests {
use super::*;
use crate::server_result::ServerMessage;
...
#[test]
// Test that an existing Storage structure
// can be associated with a Server.
fn test_set_storage() {
let storage = Storage::new();
let mut server: Server = Server::new();
server.set_storage(storage);
match server.storage {
Some(_) => (),
None => panic!(),
};
}
#[tokio::test]
// Test that the function process_request
// processes a PING request and that it
// responds with a PONG.
async fn test_process_request_ping() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::SimpleString(String::from("PONG"))))
);
}
#[tokio::test]
// Test that the function process_request
// processes an ECHO request and that it
// responds with a copy of the input.
async fn test_process_request_echo() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![
RESP::BulkString(String::from("ECHO")),
RESP::BulkString(String::from("42")),
]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Data(ServerValue::RESP(RESP::BulkString(String::from("42"))))
);
}
#[tokio::test]
// Test that the function process_request
// returns the correct error when it is
// given a request that doesn't contain
// a RESP array.
async fn test_process_request_not_array() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::BulkString(String::from("PING")),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Error(ServerError::IncorrectData)
);
}
#[tokio::test]
// Test that the function process_request
// returns the correct error when it is
// given a request that contains a RESP array
// but the content of the array is not
// a bulk string.
async fn test_process_request_not_bulkstrings() {
let (connection_sender, mut connection_receiver) = mpsc::channel::<ServerMessage>(32);
let request = Request {
value: RESP::Array(vec![RESP::SimpleString(String::from("PING"))]),
sender: connection_sender,
};
let storage = Storage::new();
let mut server: Server = Server::new();
server.set_storage(storage);
process_request(request, &mut server).await;
assert_eq!(
connection_receiver.try_recv().unwrap(),
ServerMessage::Error(ServerError::IncorrectData)
);
}
}
To make the test comparisons possible, we need to make sure that both ServerValue and ServerMessage derive PartialEq
src/server_result.rs
#[derive(Debug, PartialEq)]
pub enum ServerValue {
RESP(RESP),
}
pub type ServerResult = Result<ServerValue, ServerError>;
#[derive(Debug, PartialEq)]
pub enum ServerMessage {
Data(ServerValue),
Error(ServerError),
}
Last, since StorageError::IncorrectRequest is not used any more, we can remove the variant.
src/storage_result.rs
use std::fmt;
#[derive(Debug, PartialEq)]
pub enum StorageError {
CommandNotAvailable(String),
CommandSyntaxError(String),
CommandInternalError(String),
}
impl fmt::Display for StorageError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StorageError::CommandNotAvailable(c) => {
write!(f, "The requested command {} is not available!", c)
}
StorageError::CommandSyntaxError(string) => {
write!(f, "Syntax error while processing {}!", string)
}
StorageError::CommandInternalError(string) => {
write!(f, "Internal error while processing {}!", string)
}
}
}
}
pub type StorageResult<T> = Result<T, StorageError>;
The next warning points out that Storage::set_active_expiry is never used. Clearly, the function is optional, and might be connected with a command-line option, but for now we can just turn it on by default.
src/storage.rs
impl Storage {
pub fn new() -> Self {
let store: HashMap<String, StorageData> = HashMap::new();
Self {
store: store,
expiry: HashMap::<String, SystemTime>::new(),
active_expiry: true,
}
}
// Turns the storage active expiry on or off.
pub fn set_active_expiry(&mut self, value: bool) {
self.active_expiry = value;
}
...
mod tests {
#[test]
// Test that the function expire_keys doesn't remove
// keys that have an expiry time in the past
// if active expiry is turned off.
fn test_expire_keys_deactivated() {
let mut storage: Storage = Storage::new();
storage.set_active_expiry(false);
storage
.set(String::from("akey"), String::from("avalue"), SetArgs::new())
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 1);
}
src/main.rs
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Create the storage and activate expiry.
let mut storage = Storage::new();
storage.set_active_expiry(true);
// Create the server and attach the storage.
let mut server = Server::new();
server.set_storage(storage);
// Create the MPSC channel to communicate with the server.
let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
// Spawn the server task.
tokio::spawn(run_server(server, server_receiver));
run_listener("127.0.0.1".to_string(), 6379, server_sender).await;
Ok(())
}
Stage 7: Expiry
Before moving on, we can double check that the code still passes Stage 7 of the CodeCrafters challenge. This concludes the base challenge for "Build your own Redis".