Key Expiry
No code, no riddle, no fancy little countdown.
Things start to become interesting, and somewhat convoluted, with expiry.
Redis supports two types of key expiry, passive and active. With passive expiry, a key is checked upon retrieval. A GET operation might thus find a key, discover that it has expired, and remove it instead of returning it. With active expiry, the server periodically scans keys and removes the expired ones.
From the end user's point of view the two mechanisms produce the same result: keys created with an explicit expiry time will eventually be removed. At the end of the chapter we will be able to send commands like SET answer 42 ex 5 and see the key answer disappear after 5 seconds.
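To make the two mechanisms concrete before we touch the real code, here is a toy sketch. The type ToyStore and its methods are hypothetical and are not part of the project; the current time is passed in explicitly so that the behaviour is easy to check.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Hypothetical toy store: each key maps to a value and an optional deadline.
pub struct ToyStore {
    entries: HashMap<String, (String, Option<SystemTime>)>,
}

impl ToyStore {
    pub fn new() -> Self {
        ToyStore { entries: HashMap::new() }
    }

    // Store a value; if a time to live is given, compute the absolute deadline.
    pub fn set(&mut self, key: &str, value: &str, ttl: Option<Duration>, now: SystemTime) {
        let deadline = ttl.map(|t| now + t);
        self.entries.insert(key.to_string(), (value.to_string(), deadline));
    }

    // Passive expiry: the deadline is checked only when the key is read.
    pub fn get(&mut self, key: &str, now: SystemTime) -> Option<String> {
        let expired = match self.entries.get(key) {
            Some((_, Some(deadline))) => now >= *deadline,
            _ => false,
        };
        if expired {
            self.entries.remove(key);
            return None;
        }
        self.entries.get(key).map(|(value, _)| value.clone())
    }

    // Active expiry: a sweep removes every key whose deadline has passed,
    // whether or not anyone asked for it.
    pub fn sweep(&mut self, now: SystemTime) {
        self.entries.retain(|_, (_, deadline)| match deadline {
            Some(d) => now < *d,
            None => true,
        });
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

In both cases the client sees the same outcome: the key is gone once its deadline has passed. The difference is only in who triggers the removal, a read or a periodic sweep.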
The reason why things start to become interesting is that in this chapter we will add a component to the system that goes beyond the simple request/response model we implemented so far. The active expiry mechanism requires an independent process that is a reaction to an internal timeout rather than to a client request.
We will implement a working solution, but we will also discuss the limits of the current architecture. In the next two chapters we will go through a major refactoring that will introduce a much more powerful structure based on actors.
For now, we will implement the following requirements:
- Add creation time and expiry to stored data.
- Keep a list of keys whose expiry time has been set.
- Check the expiry time of retrieved keys.
- Periodically scan the list of expiring keys and remove the ones whose time is up.
More specifically, we will go through the following steps:
- Creation time and expiry - We will augment the data contained in the storage, adding fields to manage the expiry.
- Storage support for expiry - A new structure will be added to the storage to keep track of expiring keys.
- Run a function periodically - We will learn how to run the expiry function periodically in the background.
- SET parameters - The Redis command SET accepts several options to control key expiry, so we need to parse the command line and extract them.
- SET with expiry - Both the commands SET and GET need to be updated to actually use key expiry.
Step 4.1 - Creation time and expiry
We can start adding the creation time and expiry to the data contained inside the storage. To do this we need to define a struct that represents stored data
src/storage.rs
use crate::resp::RESP;
use crate::storage_result::{StorageError, StorageResult};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
#[derive(Debug, PartialEq)]
pub enum StorageValue {
String(String),
}
#[derive(Debug)]
pub struct StorageData {
pub value: StorageValue,
pub creation_time: SystemTime,
pub expiry: Option<Duration>,
}
Here, we are using the two types SystemTime [docs] (creation time, absolute) and Duration [docs] (expiry, relative).
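As a small illustration of how an absolute SystemTime and a relative Duration work together, the following sketch computes how much life a piece of data has left. The function remaining_ttl is hypothetical and is not used in the rest of the chapter.

```rust
use std::time::{Duration, SystemTime};

// Hypothetical helper: how long until the data expires.
// Returns None when no expiry is set, and a zero Duration
// when the data has already expired.
pub fn remaining_ttl(
    creation_time: SystemTime,
    expiry: Option<Duration>,
    now: SystemTime,
) -> Option<Duration> {
    let expiry = expiry?;
    // SystemTime is not monotonic: duration_since returns an error if
    // `now` is earlier than `creation_time`. Here we treat that case
    // as "no time has elapsed".
    let elapsed = now.duration_since(creation_time).unwrap_or(Duration::ZERO);
    Some(expiry.saturating_sub(elapsed))
}
```

The fallible duration_since is a reminder that the system clock can be adjusted, which is one reason why expiry times based on SystemTime are approximate.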
We can then create a method add_expiry and a function to create StorageData from a String
src/storage.rs
impl StorageData {
pub fn add_expiry(&mut self, expiry: Duration) {
self.expiry = Some(expiry);
}
}
impl From<String> for StorageData {
fn from(s: String) -> StorageData {
StorageData {
value: StorageValue::String(s),
creation_time: SystemTime::now(),
expiry: None,
}
}
}
To make sure that we can check the equality between two pieces of data we need to implement PartialEq [docs]
src/storage.rs
impl PartialEq for StorageData {
fn eq(&self, other: &Self) -> bool {
self.value == other.value && self.expiry == other.expiry
}
}
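Note that the comparison above leaves out creation_time. The sketch below repeats the definitions in a self-contained form to show the consequence: two values built at different instants still compare equal, which is presumably what makes the tests later in this step possible. The helper data_created_at is hypothetical and exists only to control the creation time.

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, PartialEq)]
pub enum StorageValue {
    String(String),
}

#[derive(Debug)]
pub struct StorageData {
    pub value: StorageValue,
    pub creation_time: SystemTime,
    pub expiry: Option<Duration>,
}

// Equality deliberately ignores creation_time.
impl PartialEq for StorageData {
    fn eq(&self, other: &Self) -> bool {
        self.value == other.value && self.expiry == other.expiry
    }
}

// Hypothetical helper used only in this example: build a StorageData
// with an explicit creation time.
pub fn data_created_at(
    value: &str,
    creation_time: SystemTime,
    expiry: Option<Duration>,
) -> StorageData {
    StorageData {
        value: StorageValue::String(value.to_string()),
        creation_time,
        expiry,
    }
}
```

With a derived PartialEq, a test that builds the expected value a few microseconds after the stored one would fail, because the two creation times would differ.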
Last, we need to use the new data structure in the storage
src/storage.rs
pub struct Storage {
store: HashMap<String, StorageData>,
}
This requires a straightforward set of changes in the rest of the code.
src/storage.rs
impl Storage {
pub fn new() -> Self {
let store: HashMap<String, StorageData> = HashMap::new();
Self { store: store }
}
...
// Implement the `set` operation for the storage.
fn set(&mut self, key: String, value: String) -> StorageResult<String> {
self.store.insert(key, StorageData::from(value));
Ok(String::from("OK"))
}
// Implement the `get` operation for the storage.
fn get(&self, key: String) -> StorageResult<Option<String>> {
match self.store.get(&key) {
Some(StorageData {
value: StorageValue::String(v),
creation_time: _,
expiry: _,
}) => return Ok(Some(v.clone())),
None => return Ok(None),
}
}
...
}
And in the tests.
src/storage.rs
#[test]
// Test that the function set works as expected.
// When a key and value pair is stored the
// output is OK, the storage contains
// an element, and the value can be retrieved.
fn test_set_value() {
let mut storage: Storage = Storage::new();
let avalue = StorageData::from(String::from("avalue"));
let output = storage
.set(String::from("akey"), String::from("avalue"))
.unwrap();
assert_eq!(output, String::from("OK"));
assert_eq!(storage.store.len(), 1);
match storage.store.get(&String::from("akey")) {
Some(value) => assert_eq!(value, &avalue),
None => panic!(),
}
}
#[test]
// Test that the function get works as expected.
// When a key value is retrieved, the output
// is the value, and the key is not deleted
// from the storage.
fn test_get_value() {
let mut storage: Storage = Storage::new();
storage.store.insert(
String::from("akey"),
StorageData::from(String::from("avalue")),
);
let result = storage.get(String::from("akey")).unwrap();
assert_eq!(storage.store.len(), 1);
assert_eq!(result, Some(String::from("avalue")));
}
#[test]
// Test that the storage provides the function
// command_get and that its output is correct.
fn test_process_command_get() {
let mut storage: Storage = Storage::new();
storage.store.insert(
String::from("akey"),
StorageData::from(String::from("avalue")),
);
let command = vec![String::from("get"), String::from("akey")];
let output = storage.process_command(&command).unwrap();
assert_eq!(output, RESP::BulkString(String::from("avalue")));
assert_eq!(storage.store.len(), 1);
}
Step 4.2 - Storage support for expiry
At this point we need to add support for expiry into the Storage struct. We need to create a function expire_keys that is triggered periodically and whose task is to decide if expiring keys are still valid or not.
A simple way to speed up the execution of such an operation is to keep a separate account of keys with an expiry, so that we don't need to scan all the keys stored in the system every time we trigger expire_keys. This structure should be a HashMap as it needs to contain both the key and the expiry time.
src/storage.rs
pub struct Storage {
store: HashMap<String, StorageData>,
expiry: HashMap<String, SystemTime>,
active_expiry: bool,
}
The idea behind the flag active_expiry is to allow the process to be halted. Keep in mind that this is not a faithful reproduction of the Redis internals but an implementation of similar concepts, so details may differ from what Redis actually does.
The new fields have to be initialised when the storage is created
src/storage.rs
impl Storage {
pub fn new() -> Self {
let store: HashMap<String, StorageData> = HashMap::new();
Self {
store: store,
expiry: HashMap::<String, SystemTime>::new(),
active_expiry: true,
}
}
and the creation test changes accordingly
src/storage.rs
mod tests {
#[test]
fn test_create_new() {
let storage: Storage = Storage::new();
assert_eq!(storage.store.len(), 0);
assert_eq!(storage.expiry.len(), 0);
assert_eq!(storage.expiry, HashMap::<String, SystemTime>::new());
assert!(storage.active_expiry);
}
Finally, the method expire_keys is the function that we want to run periodically to clean up expired keys.
src/storage.rs
impl Storage {
// Check all keys with an expiry time.
// If the key has expired remove it from the storage.
pub fn expire_keys(&mut self) {
// This function works only if active
// expiry is turned on.
if !self.active_expiry {
return;
}
// Get the current time.
let now = SystemTime::now();
// Filter all expiry keys and find
// those that expired in the past.
let expired_keys: Vec<String> = self
.expiry
.iter()
.filter_map(|(key, &value)| if value < now { Some(key.clone()) } else { None })
.collect();
// Remove all expired keys from the storage
// and from the list of expiry keys.
for k in expired_keys {
self.store.remove(&k);
self.expiry.remove(&k);
}
}
The tests for this method, with active expiry turned on and off, are
src/storage.rs
mod tests {
#[test]
// Test that the function expire_keys removes
// keys that have an expiry time in the past.
fn test_expire_keys() {
let mut storage: Storage = Storage::new();
storage
.set(String::from("akey"), String::from("avalue"))
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 0);
}
#[test]
// Test that the function expire_keys doesn't remove
// keys that have an expiry time in the past
// if active expiry is turned off.
fn test_expire_keys_deactivated() {
let mut storage: Storage = Storage::new();
storage.active_expiry = false;
storage
.set(String::from("akey"), String::from("avalue"))
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 1);
}
Step 4.3 - Run a function periodically
So far the structure of the code is pretty linear, despite the fact that we are using asynchronous code. Every time a new client connection is established, the server spawns a new connection handler task that receives a reference to the storage.
When the task needs to interact with the storage it can acquire a lock and access the resource in an exclusive fashion. Locking the whole storage regardless of the nature of the operation is clearly suboptimal, but we won't dig into performance optimisation in this project.
Now we need to periodically run the method expire_keys of the storage. A straightforward approach is to set up a timer that runs the function at a fixed interval.
To do this we first create an asynchronous function that locks the storage and runs the method.
src/main.rs
// The entry point for key expiry operations.
async fn expire_keys(storage: Arc<Mutex<Storage>>) {
// Acquire a lock on the storage.
let mut guard = storage.lock().unwrap();
// Trigger the expiry process.
guard.expire_keys();
}
Now we can add an asynchronous timer to the main loop and have this function called every 10 milliseconds. In a production system this value would be fetched from a configuration file, but in this case we will just hard-code it.
The current loop in main relies on the fact that there is only one asynchronous source of events, that is listener.accept()
src/main.rs
async fn main() -> std::io::Result<()> {
loop {
// Process each incoming connection.
match listener.accept().await {
...
}
but this is not the case any more, as we have both the listener and the timer to await. This is the perfect job for select! [docs], so we'll first introduce it in the current version of main
src/main.rs
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Create the TCP listener, bound to the
// standard Redis port.
let listener = TcpListener::bind("127.0.0.1:6379").await?;
// Create a storage and protect it against concurrency issues.
let storage = Arc::new(Mutex::new(Storage::new()));
loop {
// Process each incoming connection.
tokio::select! {
// Process a new connection.
connection = listener.accept() => {
match connection {
// The connection is valid, handle it.
Ok((stream, _)) => {
// Spawn a task to take care of this connection.
tokio::spawn(handle_connection(stream, storage.clone()));
}
Err(e) => {
println!("Error: {}", e);
continue;
}
}
}
}
}
}
and at this point we can easily add the new task
src/main.rs
use crate::server::process_request;
use crate::storage::Storage;
use std::sync::{Arc, Mutex};
use std::time::Duration;
...
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Create the TCP listener, bound to the
// standard Redis port.
let listener = TcpListener::bind("127.0.0.1:6379").await?;
// Create a storage and protect it against concurrency issues.
let storage = Arc::new(Mutex::new(Storage::new()));
// Create a timer that expires every 10 milliseconds.
let mut interval_timer = tokio::time::interval(Duration::from_millis(10));
loop {
// Process each incoming connection.
tokio::select! {
// Process a new connection.
connection = listener.accept() => {
match connection {
// The connection is valid, handle it.
Ok((stream, _)) => {
// Spawn a task to take care of this connection.
tokio::spawn(handle_connection(stream, storage.clone()));
}
Err(e) => {
println!("Error: {}", e);
continue;
}
}
}
// Process the expired timer.
_ = interval_timer.tick() => {
tokio::spawn(expire_keys(storage.clone()));
}
}
}
}
Blocking functions
Please note that expire_keys is an asynchronous function but its true nature is that of a blocking one. The method expire_keys might in theory run for a very long time, thus making the whole asynchronous assumption false. In this project we will not address this problem, but keep in mind that a production system should.
Should you want to try, a simple improvement is to make the function remove only a limited number of keys per run to keep execution time short.
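The idea of capping the work done in each run could be sketched as follows. This is not part of the project: BoundedStorage is a simplified stand-in for Storage (values are plain strings), and both the method name expire_keys_bounded and the explicit now parameter are assumptions made for the example.

```rust
use std::collections::HashMap;
use std::time::SystemTime;

// Simplified stand-in for the chapter's Storage.
pub struct BoundedStorage {
    pub store: HashMap<String, String>,
    pub expiry: HashMap<String, SystemTime>,
}

impl BoundedStorage {
    // Remove at most `limit` expired keys and return how many were removed.
    // Keys that are left over will be picked up by a later run.
    pub fn expire_keys_bounded(&mut self, now: SystemTime, limit: usize) -> usize {
        let expired: Vec<String> = self
            .expiry
            .iter()
            .filter(|(_, deadline)| **deadline < now)
            .map(|(key, _)| key.clone())
            .take(limit)
            .collect();
        for key in &expired {
            self.store.remove(key);
            self.expiry.remove(key);
        }
        expired.len()
    }
}
```

Note that this only limits the number of removals. The scan itself still walks the expiry map until it has found enough keys, so a large map with few expired keys can still take a while.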
Let's pause for a moment to consider what we have done here, as it will be crucial in the next chapter. Since the function expire_keys is triggered by an internal timer, the assumption that the server waits for incoming connections only is not valid any more. There might be several events that we need to process in select!, and each one of them must receive a protected copy of the shared resources it affects (clone of Arc<Mutex<T>>).
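The pattern of handing each event source its own clone of the shared resource can be shown in isolation. To keep the example free of external dependencies, the sketch below uses plain OS threads from the standard library instead of Tokio tasks and select!, so it illustrates the ownership pattern and not the asynchronous machinery. The struct Counters and the function run_event_sources are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical shared resource standing in for Storage.
#[derive(Default)]
pub struct Counters {
    pub connections: u32,
    pub ticks: u32,
}

// Run two independent event sources. Each one owns its own clone of
// the Arc, and locks the Mutex only for the time needed to update it.
pub fn run_event_sources(shared: Arc<Mutex<Counters>>, events: u32) {
    // First source: stands in for incoming connections.
    let for_connections = Arc::clone(&shared);
    let connections = thread::spawn(move || {
        for _ in 0..events {
            for_connections.lock().unwrap().connections += 1;
        }
    });

    // Second source: stands in for the periodic timer.
    let for_timer = Arc::clone(&shared);
    let timer = thread::spawn(move || {
        for _ in 0..events {
            thread::sleep(Duration::from_millis(1));
            for_timer.lock().unwrap().ticks += 1;
        }
    });

    connections.join().unwrap();
    timer.join().unwrap();
}
```

Cloning an Arc only increments a reference count, so both sources end up working on the same Counters value, and the Mutex guarantees that their updates never interleave.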
Step 4.4 - SET parameters
To be able to add an expiry time to keys we need to change the way the command SET works. As there are many parameters supported by this command it's clear that we need a separate function to parse them. This is the first symptom that commands should be more than just methods of the struct Storage, but for now we won't refactor anything on that side.
Command line parsing is a complicated and messy process, and generally it's better to rely on specific libraries. In this case, however, it might make sense to implement something custom since the SET command is not a traditional shell command with long and short options preceded by hyphens.
We will add support for the options ex, px, nx, xx, and get. There are some requirements:
- nx and xx are mutually exclusive
- ex and px are mutually exclusive
- ex and px have to be followed by an integer (respectively seconds and milliseconds)
- options can be listed in any order
The Redis CLI is case insensitive, so all these options can be used in lowercase or uppercase form. However, in the following sections they will always appear in lowercase form.
We can start creating a new file called src/set.rs, and defining some enums and a struct that represent the possible settings
src/set.rs
#[derive(Debug, PartialEq)]
pub enum KeyExistence {
NX,
XX,
}
#[derive(Debug, PartialEq)]
pub enum KeyExpiry {
EX(u64),
PX(u64),
}
#[derive(Debug, PartialEq)]
pub struct SetArgs {
pub expiry: Option<KeyExpiry>,
pub existence: Option<KeyExistence>,
pub get: bool,
}
impl SetArgs {
pub fn new() -> Self {
SetArgs {
expiry: None,
existence: None,
get: false,
}
}
}
Enums are a good way to represent mutually exclusive options like nx and xx (similar to a boolean exclusive OR, since only one variant can be present at a time), while a struct is great to group values that exist together (similar to a boolean AND).
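One practical benefit of the enum is that a match on it is exhaustive, so the compiler tells us if we forget a case. The sketch below shows how the existence option might be consumed when deciding whether a write is allowed. The function may_write is hypothetical and is not implemented in this chapter; it follows the documented meaning of the options in Redis, where nx sets the key only if it does not exist and xx only if it does.

```rust
#[derive(Debug, PartialEq)]
pub enum KeyExistence {
    NX,
    XX,
}

// Hypothetical helper: decide whether SET may write the key,
// given the parsed option and whether the key is already stored.
pub fn may_write(existence: &Option<KeyExistence>, key_exists: bool) -> bool {
    match existence {
        // NX: only set the key if it does not exist yet.
        Some(KeyExistence::NX) => !key_exists,
        // XX: only set the key if it already exists.
        Some(KeyExistence::XX) => key_exists,
        // No option: always write.
        None => true,
    }
}
```

Since the two options live in a single Option<KeyExistence> field, a state where both nx and xx are active cannot even be represented.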
We also need to add the new file as a module
src/main.rs
mod resp;
mod resp_result;
mod server;
mod set;
mod storage;
mod storage_result;
Then we can create a function parse_set_arguments that receives a Vec<String> and creates a SetArgs. It makes sense to reuse StorageResult as a result type, since the parsing happens in the storage.
src/set.rs
use crate::storage_result::{StorageError, StorageResult};
...
// Parse the arguments passed to the command SET and
// collect them into a SetArgs struct.
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
// Create a new SetArgs struct.
let mut args = SetArgs::new();
// PARSE COMMANDS HERE
Ok(args)
}
Inside that function we need to go through the input vector and check whether each string corresponds to a supported option. A plain for loop over the elements is not enough, because ex and px are followed by a value that has to be consumed together with the option, so we will use loop with an explicit index.
Basic structure and the argument nx
This is the implementation that matches nx
src/set.rs
// Parse the arguments passed to the command SET and
// collect them into a SetArgs struct.
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
// Create a new SetArgs struct.
let mut args = SetArgs::new();
// An index to keep track of the argument we processed.
let mut idx: usize = 0;
// Loop through all arguments.
loop {
// If we processed all arguments stop the loop.
if idx >= arguments.len() {
break;
}
// Process the current argument.
match arguments[idx].to_lowercase().as_str() {
"nx" => {
// NX and XX are mutually exclusive.
if args.existence == Some(KeyExistence::XX) {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
args.existence = Some(KeyExistence::NX);
idx += 1;
}
_ => {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
}
}
Ok(args)
}
Let's have a look at this initial implementation, as the rest of the options will follow the same logic.
src/set.rs
// Parse the arguments passed to the command SET and
// collect them into a SetArgs struct.
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
// Create a new SetArgs struct.
let mut args = SetArgs::new();
// An index to keep track of the argument we processed.
let mut idx: usize = 0; // (1)
// Loop through all arguments.
loop { // (2)
// If we processed all arguments stop the loop.
if idx >= arguments.len() { // (3)
break;
}
// Process the current argument.
match arguments[idx].to_lowercase().as_str() { // (4)
"nx" => { // (5)
// NX and XX are mutually exclusive.
if args.existence == Some(KeyExistence::XX) { // (6)
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
args.existence = Some(KeyExistence::NX);
idx += 1;
}
_ => { // (7)
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
}
}
Ok(args)
}
Two arguments are followed by a number so we need to be free to look at the next element, which means that we need to loop (2) and to keep an eye on the current index (1). When the index shows that we are past the last argument (3) we terminate the loop.
In each iteration we match (4) the current element of the input vector against the name of an option and perform the corresponding actions. If the option is not among the supported ones we stop with an error (7).
The only argument we support at the moment is nx (5). As nx and xx are mutually exclusive, we need to check if args.existence is already set to KeyExistence::XX (6) and in that case we need to stop with an error. Otherwise, we can set args.existence to KeyExistence::NX and increment the index.
To complete the implementation of this first argument let's add the following tests
src/set.rs
#[cfg(test)]
mod tests {
use super::*;
#[test]
// Check that the function parse_set_arguments
// processes the arguments NX correctly.
fn test_parse_nx() {
let commands: Vec<String> = vec![String::from("NX")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(args.existence, Some(KeyExistence::NX));
}
#[test]
// Check that the function parse_set_arguments
// processes the arguments NX correctly (lowercase)
fn test_parse_nx_lowercase() {
let commands: Vec<String> = vec![String::from("nx")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(args.existence, Some(KeyExistence::NX));
}
}
The argument xx
The argument xx mirrors the above implementation with minor changes
src/set.rs
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
match arguments[idx].to_lowercase().as_str() {
"xx" => {
// XX and NX are mutually exclusive.
if args.existence == Some(KeyExistence::NX) {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
args.existence = Some(KeyExistence::XX);
idx += 1;
}
_ => {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
Please note that you need to put the branch "xx" before _, because the latter matches anything.
The tests for this argument are
src/set.rs
mod tests {
#[test]
// Check that the function parse_set_arguments
// processes the arguments XX correctly.
fn test_parse_xx() {
let commands: Vec<String> = vec![String::from("XX")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(args.existence, Some(KeyExistence::XX));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when we pass XX and
// NX together.
fn test_parse_xx_and_nx() {
let commands: Vec<String> = vec![String::from("XX"), String::from("NX")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when we pass XX and
// NX together. (reverse order)
fn test_parse_nx_and_xx() {
let commands: Vec<String> = vec![String::from("NX"), String::from("XX")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
The argument get
The argument get of the command SET has no specific requirements, as it does not conflict with anything
src/set.rs
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
match arguments[idx].to_lowercase().as_str() {
"get" => {
args.get = true;
idx += 1;
}
_ => {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
The tests for this argument are
src/set.rs
mod tests {
#[test]
// Check that the function parse_set_arguments
// processes the arguments GET correctly.
fn test_parse_get() {
let commands: Vec<String> = vec![String::from("GET")];
let args = parse_set_arguments(&commands).unwrap();
assert!(args.get);
}
#[test]
// Check that the function parse_set_arguments
// behaves correctly when we pass NX and GET.
fn test_parse_nx_and_get() {
let commands: Vec<String> = vec![String::from("NX"), String::from("GET")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(
args,
SetArgs {
existence: Some(KeyExistence::NX),
expiry: None,
get: true
}
);
}
#[test]
// Check that the function parse_set_arguments
// behaves correctly when we pass XX and GET.
fn test_parse_xx_and_get() {
let commands: Vec<String> = vec![String::from("XX"), String::from("GET")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(
args,
SetArgs {
existence: Some(KeyExistence::XX),
expiry: None,
get: true
}
);
}
The arguments ex and px
The two final options ex and px have the same structure as nx and xx, being mutually exclusive, but with the additional complexity of requiring a numeric value. We need to check that the option is followed by a value and that the value is a number. The implementation of ex is
src/set.rs
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
match arguments[idx].to_lowercase().as_str() {
"ex" => {
// EX and PX are mutually exclusive.
if let Some(KeyExpiry::PX(_)) = args.expiry {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
// EX requires an argument, check that it is present.
if idx + 1 == arguments.len() {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
// The argument passed to EX must be a u64.
let value: u64 = arguments[idx + 1]
.parse()
.map_err(|_| StorageError::CommandSyntaxError(arguments.join(" ")))?;
args.expiry = Some(KeyExpiry::EX(value));
idx += 2;
}
_ => {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
and that of px mirrors it
src/set.rs
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
match arguments[idx].to_lowercase().as_str() {
"px" => {
// PX and EX are mutually exclusive.
if let Some(KeyExpiry::EX(_)) = args.expiry {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
// PX requires an argument, check that it is present.
if idx + 1 == arguments.len() {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
// The argument passed to PX must be a u64.
let value: u64 = arguments[idx + 1]
.parse()
.map_err(|_| StorageError::CommandSyntaxError(arguments.join(" ")))?;
args.expiry = Some(KeyExpiry::PX(value));
idx += 2;
}
_ => {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
The tests for these arguments are
src/set.rs
mod tests {
#[test]
// Check that the function parse_set_arguments
// processes the arguments EX correctly.
fn test_parse_ex() {
let commands: Vec<String> = vec![String::from("EX"), String::from("100")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(args.expiry, Some(KeyExpiry::EX(100)));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when the argument
// of EX is not a u64.
fn test_parse_ex_wrong_value() {
let commands: Vec<String> = vec![String::from("EX"), String::from("value")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when EX doesn't
// have an argument.
fn test_parse_ex_end_of_vector() {
let commands: Vec<String> = vec![String::from("EX")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
#[test]
// Check that the function parse_set_arguments
// processes the arguments PX correctly.
fn test_parse_px() {
let commands: Vec<String> = vec![String::from("PX"), String::from("100")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(args.expiry, Some(KeyExpiry::PX(100)));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when the argument
// of PX is not a u64.
fn test_parse_px_wrong_value() {
let commands: Vec<String> = vec![String::from("PX"), String::from("value")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when PX doesn't
// have an argument.
fn test_parse_px_end_of_vector() {
let commands: Vec<String> = vec![String::from("PX")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
#[test]
// Check that the function parse_set_arguments
// returns the correct error when we pass EX and
// PX together.
fn test_parse_ex_and_px() {
let commands: Vec<String> = vec![
String::from("EX"),
String::from("100"),
String::from("PX"),
String::from("100"),
];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
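Since the branches were introduced one at a time, it may help to see them assembled. The following is a self-contained sketch of the whole parser, with a few deviations from the project code that are assumptions of this example: SyntaxError is a local stand-in for StorageError::CommandSyntaxError, the function takes a slice instead of a &Vec<String>, SetArgs derives Default instead of providing new, and the loop is written with while.

```rust
#[derive(Debug, PartialEq)]
pub enum KeyExistence { NX, XX }

#[derive(Debug, PartialEq)]
pub enum KeyExpiry { EX(u64), PX(u64) }

#[derive(Debug, PartialEq, Default)]
pub struct SetArgs {
    pub expiry: Option<KeyExpiry>,
    pub existence: Option<KeyExistence>,
    pub get: bool,
}

// Local stand-in for StorageError::CommandSyntaxError (assumption).
#[derive(Debug, PartialEq)]
pub struct SyntaxError(pub String);

pub fn parse_set_arguments(arguments: &[String]) -> Result<SetArgs, SyntaxError> {
    // Every failure reports the full list of arguments.
    let syntax_error = || SyntaxError(arguments.join(" "));
    let mut args = SetArgs::default();
    let mut idx: usize = 0;
    while idx < arguments.len() {
        match arguments[idx].to_lowercase().as_str() {
            "nx" => {
                if args.existence == Some(KeyExistence::XX) {
                    return Err(syntax_error());
                }
                args.existence = Some(KeyExistence::NX);
                idx += 1;
            }
            "xx" => {
                if args.existence == Some(KeyExistence::NX) {
                    return Err(syntax_error());
                }
                args.existence = Some(KeyExistence::XX);
                idx += 1;
            }
            "get" => {
                args.get = true;
                idx += 1;
            }
            "ex" => {
                if let Some(KeyExpiry::PX(_)) = args.expiry {
                    return Err(syntax_error());
                }
                // The value must be present and must be a u64.
                let raw = arguments.get(idx + 1).ok_or_else(|| syntax_error())?;
                let value: u64 = raw.parse().map_err(|_| syntax_error())?;
                args.expiry = Some(KeyExpiry::EX(value));
                idx += 2;
            }
            "px" => {
                if let Some(KeyExpiry::EX(_)) = args.expiry {
                    return Err(syntax_error());
                }
                let raw = arguments.get(idx + 1).ok_or_else(|| syntax_error())?;
                let value: u64 = raw.parse().map_err(|_| syntax_error())?;
                args.expiry = Some(KeyExpiry::PX(value));
                idx += 2;
            }
            _ => return Err(syntax_error()),
        }
    }
    Ok(args)
}
```

Using arguments.get(idx + 1) folds the "is there a following value" check into the lookup, which is equivalent to comparing idx + 1 with arguments.len() before indexing.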
Step 4.5 - SET with expiry
It's time to update Storage to include the work we have done on SET. We can start importing the relevant components
src/storage.rs
use crate::resp::RESP;
use crate::set::{parse_set_arguments, KeyExpiry, SetArgs};
use crate::storage_result::{StorageError, StorageResult};
use std::collections::HashMap;
The internal command (implemented by Storage::set) should receive an argument of type SetArgs and act accordingly
src/storage.rs
impl Storage {
// Implement the `set` operation for the storage.
fn set(&mut self, key: String, value: String, args: SetArgs) -> StorageResult<String> {
let mut data = StorageData::from(value);
if let Some(value) = args.expiry {
let expiry = match value {
KeyExpiry::EX(v) => Duration::from_secs(v),
KeyExpiry::PX(v) => Duration::from_millis(v),
};
data.add_expiry(expiry);
self.expiry
.insert(key.clone(), SystemTime::now().add(expiry));
}
self.store.insert(key.clone(), data);
Ok(String::from("OK"))
}
Here, ex or px are transformed into a std::time::Duration and added to the data that is about to be stored. The expiring key is also added to self.expiry for faster retrieval by self.expire_keys. To be able to run SystemTime::now().add(expiry) we also need to import a trait
src/storage.rs
use crate::resp::RESP;
use crate::set::{parse_set_arguments, KeyExpiry, SetArgs};
use crate::storage_result::{StorageError, StorageResult};
use std::collections::HashMap;
use std::ops::Add;
use std::time::{Duration, SystemTime};
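The conversion from ex or px to a Duration, and from there to an absolute deadline, can be sketched in isolation. The functions to_duration and deadline are hypothetical helpers, not part of the project. The sketch uses SystemTime::checked_add instead of the operator: adding a Duration to a SystemTime may panic if the result cannot be represented, and the value of ex comes straight from the client, so a very large number should not be able to bring the server down.

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, PartialEq)]
pub enum KeyExpiry {
    EX(u64),
    PX(u64),
}

// Hypothetical helper: EX is expressed in seconds, PX in milliseconds.
pub fn to_duration(expiry: &KeyExpiry) -> Duration {
    match expiry {
        KeyExpiry::EX(seconds) => Duration::from_secs(*seconds),
        KeyExpiry::PX(millis) => Duration::from_millis(*millis),
    }
}

// Hypothetical helper: compute the absolute deadline, returning None
// instead of panicking when the result cannot be represented.
pub fn deadline(now: SystemTime, expiry: &KeyExpiry) -> Option<SystemTime> {
    now.checked_add(to_duration(expiry))
}
```

A caller could turn the None case into a syntax error, so that an absurdly large expiry is rejected like any other malformed command.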
As we also want to implement passive expiry, we must change the method get as well.
src/storage.rs
impl Storage {
// Implement the `get` operation for the storage.
fn get(&mut self, key: String) -> StorageResult<Option<String>> {
if let Some(&expiry) = self.expiry.get(&key) {
if SystemTime::now() >= expiry {
self.expiry.remove(&key);
self.store.remove(&key);
return Ok(None);
}
}
match self.store.get(&key) {
Some(StorageData {
value: StorageValue::String(v),
creation_time: _,
expiry: _,
}) => return Ok(Some(v.clone())),
None => return Ok(None),
}
}
The parameter self becomes mutable, as we are removing the key in case it's expired. Then we perform a simple check of the expiry time against SystemTime::now() to decide if the key is still valid. The last change to the methods is in Storage::command_set, where we need to parse the arguments and give them to Storage::set.
src/storage.rs
impl Storage {
fn command_set(&mut self, command: &Vec<String>) -> StorageResult<RESP> {
// Check the command length. The command
// requires at least 2 parameters.
if command.len() < 3 {
return Err(StorageError::CommandSyntaxError(command.join(" ")));
}
let key = command[1].clone();
let value = command[2].clone();
let args = parse_set_arguments(&command[3..].to_vec())?;
// Use the function set to store the key and value pair.
let _ = self.set(key, value, args);
Ok(RESP::SimpleString(String::from("OK")))
}
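The way the command is split into key, value, and trailing options can also be expressed with a slice pattern, which combines the length check and the extraction in one step. This is only an alternative sketch, and the function split_set_command is hypothetical.

```rust
// Hypothetical helper: split a SET command into key, value, and the
// remaining options. Returns None when key or value are missing.
pub fn split_set_command(command: &[String]) -> Option<(&str, &str, &[String])> {
    match command {
        // The first element is the command name itself. `options @ ..`
        // captures everything after the value, possibly nothing.
        [_name, key, value, options @ ..] => Some((key.as_str(), value.as_str(), options)),
        _ => None,
    }
}
```

When the command has exactly three elements the options slice is empty, which mirrors what happens with command[3..] in the code above: the parser receives no arguments and returns the default settings.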
Existing tests require some changes to match the new function prototypes
src/storage.rs
mod tests {
#[test]
// Test that the function set works as expected.
// When a key and value pair is stored the
// output is OK, the storage contains
// an element, and the value can be retrieved.
fn test_set_value() {
let mut storage: Storage = Storage::new();
let avalue = StorageData::from(String::from("avalue"));
let output = storage
.set(String::from("akey"), String::from("avalue"), SetArgs::new())
.unwrap();
assert_eq!(output, String::from("OK"));
assert_eq!(storage.store.len(), 1);
match storage.store.get(&String::from("akey")) {
Some(value) => assert_eq!(value, &avalue),
None => panic!(),
}
}
...
#[test]
// Test that the function get works as expected.
// When a key doesn't exist the output is None, and
// the storage is left unchanged.
fn test_get_value_key_does_not_exist() {
let mut storage: Storage = Storage::new();
let result = storage.get(String::from("akey")).unwrap();
assert_eq!(storage.store.len(), 0);
assert_eq!(result, None);
}
...
#[test]
// Test that the function expire_keys removes
// keys that have an expiry time in the past.
fn test_expire_keys() {
let mut storage: Storage = Storage::new();
storage
.set(String::from("akey"), String::from("avalue"), SetArgs::new())
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 0);
}
#[test]
// Test that the function expire_keys doesn't remove
// keys that have an expiry time in the past
// if active expiry is turned off.
fn test_expire_keys_deactivated() {
let mut storage: Storage = Storage::new();
storage.active_expiry = false;
storage
.set(String::from("akey"), String::from("avalue"), SetArgs::new())
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 1);
}
We can also add a new test to check that px works
src/storage.rs
mod tests {
#[test]
// Test that the function set works as expected
// when a key is created with passive expiration.
// This doesn't test the expiry mechanism itself,
// only that the key is created with the correct
// parameters.
fn test_set_value_with_px() {
let mut storage: Storage = Storage::new();
let mut avalue = StorageData::from(String::from("avalue"));
avalue.add_expiry(Duration::from_millis(100));
let output = storage
.set(
String::from("akey"),
String::from("avalue"),
SetArgs {
expiry: Some(KeyExpiry::PX(100)),
existence: None,
get: false,
},
)
.unwrap();
assert_eq!(output, String::from("OK"));
assert_eq!(storage.store.len(), 1);
match storage.store.get(&String::from("akey")) {
Some(value) => assert_eq!(value, &avalue),
None => panic!(),
}
storage.expiry.get(&String::from("akey")).unwrap();
}
CodeCrafters
Stage 7: Expiry
This version of the code passes Stage 7 of the CodeCrafters challenge.