Using JMS in Spring Boot

JMS (Java Message Service) is a Java Message Oriented Middleware used to send messages between clients and works by sending messages to a message queue which are then taken when possible to execute a transaction. This post will focus on implementing JMS with Spring Boot, which doesn’t take long at all to setup.

JMS and message queues in general bring some certain advantages over using RESTful services such as:

  • Redundancy – A message must confirm that it has completed it’s transaction and that it can now be removed from the queue, but if the transaction fails it can be reprocessed. The messages can also be stored in a database allowing them to continue later on even if the server stops.
  • Asynchronous messaging – As the process time of the message cannot be garenteed the client that sent the message can carry on asynchronously to the completion of the transaction. Due to this the queue should be used to write data (POST if your thinking in a RESTful mindset).
  • Loose Coupling – The services do not interact directly and only know where the message queue is, where one service sends messages and the other receives them.

Now lets get on to actually implementing it. As mentioned earlier we will be using Spring Boot which makes everything nice and easy to setup and Apache ActiveMQ to create and manage the message queue.

The Maven dependencies required for setting up JMS are shown below (some extra dependencies not related to JMS were used and are not shown in the code snippet)

The first thing we will look at is the receiver which will take a message from the front of the queue and perform a transaction.

In this scenario the OrderTransactionReceiver takes messages from the OrderTransactionQueue and save them to the database by using the transactionRepository. The name of the method that receives the message is irrelevant and can be called whatever you want, although receiveMessage is quite appropriate, but it must have the @JmsListener annotation with destination property defining the name of the queue. Included in this annotation is the containerFactory property which is not required if you are happy with the default DefaultJmsListenerContainerFactory that is provided by Spring Boot.

So now we can take messages from the queue its probably a good idea to know how to put them there in the first place.

There is quite a lot of noise in this example as there code that is not related to posting to the message queue. There is only one line that is needed to send the message and in case it wasn’t clear I added a comment into the example. Actually that earlier statement a lie, it is two lines of code, but that is only if you included injecting in the JmsTemplate into the controller. The reason that I wrote this example inside a @RestController is to demonstrate a possible use of the message queue, a user makes a request via the REST API which will send a message to the queue to be executed at some point. While this happens the user is continuing with what they were doing as they do not need to wait for the request’s execution to finish.

The final piece to this simple puzzle is main application defined by the class with @SpringBootApplication.

Lets start with the @EnableJms annotation which gives a clear indication what it is for, but to be a bit more precise it triggers the discovery of methods marked with the @JmsListener and creates the listeners themselves behind the scenes. So if you remember this will be the recieveMessage method defined in OrderTransactionReceiver. The next two annotations, @ComponentScan and @EnableMongoRepositories are not required to setup JMS, but due to how the classes in this example are spread out they must be added so that the OrderTransactionController and OrderTransactionRepository can be found.

Going past the annotations on the class, remember the myFactory that was specified in the @JmsListener here is the code that defines it. This implementation matches what the default DefaultJmsListenerContainerFactory would be if we decided not to specify a factory inside the @JmsListener. A MessageConverter has to be defined as the default implementation can only convert basic types, which the OrderTransaction object is not. This implementation uses JSON to pass the messages to and from the queue. Spring Boot is kind enough to detect the MessageConverter and make use of it in the JmsTemplate and JmsListenerContainerFactory.

Now we have everything put together it can be tested to check that it actually works, through the use of some nicely placed print lines that you can see from the examples we can see how it makes it’s way from OrderTransactionController and to OrderTransactionReceiver.

By making a POST request to

localhost:8080/transaction/send

With the request body of

{
  "from":"you",
  "to":"me",
  "amount":200
}

And looking at the console we can see

Sending a transaction.
Received <OrderTransaction(from=you, to=me, amount=200)>

So we have proved that it works, but what happens if the transaction fails due to an exception occurring? As mentioned at the beginning of this post message queues provide redundancy as the transaction will be retried if it fails. To test this I have thrown an exception and added a counter into the receiveMessage method in OrderTransactionReceiver.

Sending a transaction.
<1> Received <OrderTransaction(from=you, to=me, amount=200)>
2017-06-17 19:12:59.748  WARN 2352 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.
...
<7> Received <OrderTransaction(from=you, to=me, amount=200)>

Obviously I have removed the actual exceptions and messages from the console output but this displays what happens when the transaction fails quite clearly. As we can see as the transaction fails each time the message is redelivered until a maximum attempt of 7 tries has been made (1 initial try and 6 retries).

The amount of re-deliveries can be configured but it requires a bit more setup. To be able to alter this we need to install Apache ActiveMQ which allows extra configuration past what is provided by Spring Boot by default. When ActiveMQ is installed and the service is up and running (extra installation information found here) only a small change to the actual code is required, in fact its not actually a code change but a property change made in the application.properties file, which should be placed in the resources folder if one does not already exist.

As we can see from the snippet above the maximum amount of re-deliveries will now be limited to 1, the other properties are the default username and password of ActiveMQ. In case you start wondering about what port is being used by here by the broker-url, this is the default port that ActiveMQ is ran on so it should (hopefully…) work straight away if you try it yourself.

Going back to the console output it also mentioned not having an ErrorHandler defined, so lets set one up by adding some extra code to the factory that was created earlier.

Now when an error occurs the ugly stacktrace wont plague the console log, unless you want it to of course. I have included both the anonymous class and lambda function versions of implementing the ErrorHandler just so it is a bit clear in what it is doing.

By configuring the maximum re-deliveries and adding the ErrorHandler the console output will now look like

Sending a transaction.
<1> Received <OrderTransaction(from=you, to=me, amount=200)>
An error has occurred in the transaction
<2> Received <OrderTransaction(from=you, to=me, amount=200)>
An error has occurred in the transaction

So there we have it, we have set up a simple JMS using Spring Boot and Apache ActiveMQ, along with a little introduction into why message queues like JMS can be useful such as providing redundancy, asynchronous messaging and loose coupling. As usual Spring and Spring Boot makes things quite simple for us to implement allowing the basic code to be written quickly and without loads of code.

For all the source code included in this tutorial along with any not shown here can be found on my GitHub.

  1. […] my previous post Using JMS in Spring Boot I mentioned some characteristics that message queues exhibit, redundancy and asynchronous messaging […]

    Like

    Reply

  2. Hi, I just downloaded your github project and run it, but I get an error…………………….ERROR 618 — [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination ‘OrderTransactionQueue’ – retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused (Connection refused)
    what is this about? Should I install something extra or make a special configuration on my machine? thanks!!!!

    Like

    Reply

    1. I do not believe you should need anything for what is covered in this post but I would need to double check myself.

      For the short term, try install and run ActiveMQ. This will start a service on localhost:61616 which should fix this problem.

      I hope this helps and I will look into it more when I can.

      Thanks
      Dan

      Like

      Reply

      1. When i installed ActiveMQ the errors went away… Thanks!

        Liked by 1 person

  3. Thank you very much for your response, I will try what you tell me !!

    Like

    Reply

  4. How would you suspend listening to a queue if there is any issue in the downstream jobs? For example, In the receiveMessage method, the save operation is failing (perhaps database is not available) and don’t want to pick up all the messages from the queue. How would I pause until the backend is back to normal?

    Like

    Reply

    1. Hi David,

      Hopefully something like the below works.

      if(something isnt working) {
          registry.getListenerContainer("OrderTransactionQueue").stop();
      }
      if(something is working) {
          registry.getListenerContainer("OrderTransactionQueue").start();
      }
      

      I havent had time to try this manually but I think that might do it. The following stack overflow post might be useful, https://stackoverflow.com/questions/36046837/how-to-pause-and-start-consuming-message-using-jmslistener .

      Let me know if that helps.

      Thanks
      Dan

      Like

      Reply

  5. Nice post. I like the flow in which it explains the finer details that happen behind the scenes. I have a couple of questions that i would like to get your valuable inputs on

    1. When does the org.springframework.util.ErrorHandler implementation get invoked? Does the handleError method get called when the publisher or the subscriber loses connection to activemq? Does spring constantly checks internally?

    2. I have read that the DefaultMessageListenerContainer disconnects with non-durable subscribers on a jms topic (https://bit.ly/2rLP8cW). Assuming this is true, it would mean that the subscriber could miss some messages. How can this be avoided?

    I would appreciate your thoughts on the above queries. Thanks.

    Like

    Reply

    1. Hi Amit,

      Thank you for the compliment.

      Its a bit hard for me to test some of this locally, since I need to keep the message queue running but disconnect the subscriber from it to answer your first question.

      But, from looking at the setErrorHandler JavaDocs only triggers on exceptions that occur within the receive method of the subscriber. Therefore I assume it will not trigger if the publisher or subscriber loses connection to the queue. Spring does constantly check and will output errors like below if the queue cannot be accessed anymore:

      2018-05-19 08:53:07.409  WARN 18420 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'OrderTransactionQueue' - trying to recover. Cause: java.io.EOFException
      2018-05-19 08:53:08.433 ERROR 18420 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'OrderTransactionQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
      

      For question 2, I honestly don’t have an answer and would need to spend some time looking into it myself.

      Thanks
      Dan

      Like

      Reply

  6. […] let’s take a look at the existing project. Here are links to the code and the corresponding blog post. The blog post covers all the information about the code. Here’s the quick rundown so we can […]

    Like

    Reply

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: