Are you looking for information on delay queues and how they should be implemented using AWS SQS? Funny, that’s exactly what i’m going to talk about.
As we’re on the verge of a new Formula 1 season, lets use a real-life racing use-case. Nowadays Formula 1 cars are full of sensors to monitor everything you can imagine on a car, from tire and engine temperatures to slow punctures.
During a race, engine temperatures can rise for a number of reasons without it immediately meaning there is an issue. For example, our car is following another car closely and is therefore picking up hot air from the car in front, making our engine temperatures run high. Generally this isn’t a problem, as running closely behind another car is needed to make an overtake, but if the driver fails to make a successfull overtake and keeps driving within this hot air for a prolonged period of time, the risk of engine damage becomes bigger and bigger.
Since we have excellent sensors on our car, we receive real-time engine temperature updates and can notify the pitwall whenever the engine temperature has been too high for too long.
The Theory
Let’s get theoretical shall we?
Based on the schematic above we can list the responsibilities of our Spring Boot service:
Receive temperature changes, and keep the engine state updated.
Whenever the temperature has crossed our threshold, send a delayed temperature check message to the queue.
Receive the delayed message and check whether the engine temperature is still above our threshold and if so, raise the severity.
Send notifications on severity changes.
The Execution
On to the implementation we go and first things first, we setup the Gradle project using the following build.gradle file.
Now that we got that out of the way, wouldn’t it be nice to be able to use JMS with SQS?
Well, we weren’t the first ones to think of that, if we look at the amazon-sqs-java-messaging-lib library, we can see that there is an SQSConnectionFactory that implements the JMS ConnectionFactory interface.
Alright, so once we create a bean for it we should be able to create a connection, add a message listener and get on our way!
Notice that we put the @Profile("!localstack") on it, as in the Github Repository there is also a @Profile("localstack") configuration which allows us to connect to a localstack environment to test the integration on our own computer.
Right, so we got the SQSConnectionFactory bean, lets create a connection and register a message listener!
Let’s make a quick pitstop; As you can see in addition to the SQSConnectionFactory, we also autowired a ConfigurationProperties bean to be able to specify our target sqs queue in the application.yaml, but also a JMS MessageListener bean to be registered when creating the connection.
In the SQSMessageListener we will make a distinction between the two types of messages that we can receive:
Bonus: Did you notice how we used the Java 17 pattern matching in our switch statement above?
Alright, so we can now identify each message that comes in and call its respective listener.
Let’s start with the TemperatureChangeMessageListener who has the responsibility of keeping the EngineState up to date and trigger a TemperatureCheckMessage whenever the threshold was exceeded.
Once again we autowire our SQSConnectionProperties in case we need to send out a temperature check message, and we autowire the EscalationConfigurationProperties which define the temperature threshold and the delay in which the engine temperature should have returned to normal.
We update the EngineState and afterwards we check whether the temperature has exceeded the threshold, if so we let the TemperatureCheckScheduler schedule a message to check the temperature for when the delay has passed.
In the TemperatureCheckMessage we put the severity that the engine currently has, so that when the delay has expired and the temperature is still too high, we know to what severity the engine state should be raised.
Best practice: When running this in production, it might be best to put these messages on a seperate queue to avoid any additional delays caused by other messages in the queue.
Note here the delaySeconds in the request that prevents the message from being picked up before the delay has expired.
Alright so we have completed the logic for our TemperatureChangeMessage, now lets look at what we should do when we receive our delayed TemperatureCheckMessage.
log.info("Sending notification: Engine temperature has been above configured threshold of '{}' degrees celsius.", escalation.getThreshold().getTemperature()); log.info("Engine severity raised to {}", nextSeverity); engine.setSeverity(nextSeverity); } } }
First thing first, we check whether the engine temperature is still above the configured threshold, otherwise we don’t care. We could extend the logic to de-escalate when the engine has gone below the threshold, but lets leave that for now.
So the temperature is still too high and based on the previous severity in the message, we can determine what the next severity should be. As we can have multiple temperature readings within the configured delay we only update the state, and send notifications, if the engine severity has changed.
That’s it, we should now be able to escalate temperature readings to our pitwall based on our configured temperature threshold!
The Test
When we create the following configuration in our application.yaml:
2022-02-07 15:00:46.166 INFO : Temperature Change: 48.0 2022-02-07 15:00:56.777 INFO : Temperature Change: 53.0 2022-02-07 15:01:07.677 INFO : Temperature Change: 54.0 2022-02-07 15:01:56.950 INFO : Sending notification: Engine temperature has been above configured threshold of '50.0' degrees celsius. 2022-02-07 15:01:56.950 INFO : Engine severity raised to WARNING 2022-02-07 15:02:08.359 INFO : Temperature Change: 58.0 2022-02-07 15:03:08.414 INFO : Sending notification: Engine temperature has been above configured threshold of '50.0' degrees celsius. 2022-02-07 15:03:08.414 INFO : Engine severity raised to CRITICAL
Call me a Geek, but if that’s not cool, i don’t know what is.
Hope you enjoyed the long post, as always if you’re interested in the whole code base; the Github Repository is just one click away!