Synchronous Request Response with ActiveMQ and Spring

A few months ago I did a deep dive into Efficient Lightweight JMS with Spring and ActiveMQ, where I focused on the details of asynchronous sending and receiving of messages. In an ideal world all messaging would be asynchronous. If you need a response, you should set up an asynchronous listener and store enough state, either in the service or in the message, to continue processing once the response has been received. However, when ideals lead to complexity, we have to decide how much complexity we can tolerate for the performance we need. Sometimes it just makes more sense to use a synchronous request/response.

Request Response with JMS

The ActiveMQ documentation actually has a pretty good overview of how request-response semantics work in JMS:

You might think at first that to implement request-response type operations in JMS that you should create a new consumer with a selector per request; or maybe create a new temporary queue per request.

Creating temporary destinations, consumers, producers and connections are all synchronous request-response operations with the broker and so should be avoided for processing each request as it results in lots of chat with the JMS broker.

The best way to implement request-response over JMS is to create a temporary queue and consumer per client on startup, set JMSReplyTo property on each message to the temporary queue and then use a correlationID on each message to correlate request messages to response messages. This avoids the overhead of creating and closing a consumer for each request (which is expensive). It also means you can share the same producer & consumer across many threads if you want (or pool them maybe).

This is a pretty good start, but it requires some tweaking to work best in Spring. It should also be noted that Lingo and Camel are suggested as options when using ActiveMQ. In my previous post I addressed why I don’t use either of these options. In short, Camel is more power than is needed for basic messaging, and Lingo is built on Jencks, neither of which has been updated in years.
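As a rough illustration of the correlation idea in the ActiveMQ documentation quoted above, here is a minimal in-memory sketch. It uses no JMS at all: CorrelatingClient, transport, and onReply are hypothetical names invented for this example, with a BiConsumer standing in for the producer and onReply standing in for the single per-client reply consumer. It only shows how one shared consumer can hand each reply to the thread waiting on that correlation ID.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Toy in-memory model of correlation-ID matching; not a JMS client. */
final class CorrelatingClient {

    // Requests still waiting for a reply, keyed by correlation ID
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Hypothetical stand-in for the producer: receives (correlationId, requestBody)
    private final BiConsumer<String, String> transport;

    CorrelatingClient( final BiConsumer<String, String> transport ) {
        this.transport = transport;
    }

    /** Sends a request and blocks until the matching reply arrives; returns null on timeout. */
    String request( final String body, final long timeoutMillis ) {
        final String correlationId = UUID.randomUUID().toString();
        final CompletableFuture<String> reply = new CompletableFuture<>();
        // Register interest before sending so a fast reply cannot be missed
        pending.put( correlationId, reply );
        try {
            transport.accept( correlationId, body );
            return reply.get( timeoutMillis, TimeUnit.MILLISECONDS );
        }
        catch ( TimeoutException e ) {
            return null;
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            return null;
        }
        catch ( ExecutionException e ) {
            throw new IllegalStateException( e.getCause() );
        }
        finally {
            pending.remove( correlationId );
        }
    }

    /** Called by the one shared reply consumer for every reply that arrives. */
    void onReply( final String correlationId, final String body ) {
        final CompletableFuture<String> reply = pending.get( correlationId );
        if ( reply != null ) {
            reply.complete( body );
        }
    }
}
```

Because the pending map is concurrent, any number of threads can call request() while a single consumer thread calls onReply(). Replies whose requester has already timed out are simply dropped.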

Request Response in Spring

The first thing to notice is that it’s infeasible to create a consumer and temporary queue per client in Spring, since pooling resources is required to overcome the JmsTemplate gotchas. To get around this, I suggest using predefined request and response queues, which removes the overhead of creating a temporary queue for each request/response. To allow for multiple consumers and producers on the same queue, the JMSCorrelationID is used to correlate each request with its response message.
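To make the shared response queue idea concrete, here is a small in-memory sketch of selective receive. It is a toy model, not JMS: SharedReplyQueue, put, and receiveSelected are hypothetical names made up for this illustration, and a real broker evaluates the selector on its side. The point is only that several requesters can wait on the same queue and each one removes nothing but the reply carrying its own correlation ID.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of one response queue shared by many requesters. */
final class SharedReplyQueue {

    private static final class Reply {
        final String correlationId;
        final String body;

        Reply( final String correlationId, final String body ) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final List<Reply> replies = new ArrayList<>();

    /** The responder side: enqueue a reply tagged with the request's correlation ID. */
    synchronized void put( final String correlationId, final String body ) {
        replies.add( new Reply( correlationId, body ) );
        notifyAll();
    }

    /** The requester side: take only the reply matching correlationId, or null on timeout. */
    synchronized String receiveSelected( final String correlationId, final long timeoutMillis ) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while ( true ) {
            for ( final Iterator<Reply> it = replies.iterator(); it.hasNext(); ) {
                final Reply reply = it.next();
                if ( reply.correlationId.equals( correlationId ) ) {
                    it.remove();
                    return reply.body;
                }
            }
            final long remaining = deadline - System.currentTimeMillis();
            if ( remaining <= 0 ) {
                return null;
            }
            try {
                wait( remaining );
            }
            catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```

Replies for other requesters stay on the queue untouched, which is the behaviour the correlation ID selector provides on the real response queue.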

At this point I implemented the following naive solution:

import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.Message;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.stereotype.Component;

@Component
public class Requestor {

    // Stamps each outgoing request with the correlation ID used to select its response
    private static final class CorrelationIdPostProcessor implements MessagePostProcessor {

        private final String correlationId;

        public CorrelationIdPostProcessor( final String correlationId ) {
            this.correlationId = correlationId;
        }

        @Override
        public Message postProcessMessage( final Message msg ) throws JMSException {
            msg.setJMSCorrelationID( correlationId );
            return msg;
        }
    }

    private final JmsTemplate jmsTemplate;

    @Autowired
    public Requestor( final JmsTemplate jmsTemplate ) {
        this.jmsTemplate = jmsTemplate;
    }

    public String request( final String request, String queue ) {
        final String correlationId = UUID.randomUUID().toString();
        // Send first, then receive: this ordering is what makes the solution naive
        jmsTemplate.convertAndSend( queue+".request", request, new CorrelationIdPostProcessor( correlationId ) );
        return (String) jmsTemplate.receiveSelectedAndConvert( queue+".response", "JMSCorrelationID='" + correlationId + "'" );
    }
}

This worked for a while, until the system started occasionally timing out when making a request against a particularly fast-responding service. After some debugging it became apparent that the service was responding so quickly that the receive() call was not yet fully initialized, causing it to miss the message. Once it finished initializing, it would wait until the timeout and fail. Unfortunately, there is very little documentation on this, and the best suggestion I could find still seemed to leave the race condition open by creating the consumer after sending the message. Luckily, according to the JMS spec, a consumer becomes active as soon as it is created and, assuming the connection has been started, it will start consuming messages. This allows the method calls to be reordered, leading to a slightly more verbose but also more correct solution. (NOTE: Thanks to Aaron Korver for pointing out that ProducerConsumer needs to implement SessionCallback and that true needs to be passed to JmsTemplate.execute() for the connection to be started.)

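import java.util.UUID;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.stereotype.Component;

@Component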
public class Requestor {

    private static final class ProducerConsumer implements SessionCallback<Message> {

        private static final int TIMEOUT = 5000;

        private final String msg;

        private final DestinationResolver destinationResolver;

        private final String queue;

        public ProducerConsumer( final String msg, String queue, final DestinationResolver destinationResolver ) {
            this.msg = msg;
            this.queue = queue;
            this.destinationResolver = destinationResolver;
        }

        public Message doInJms( final Session session ) throws JMSException {
            MessageConsumer consumer = null;
            MessageProducer producer = null;
            try {
                final String correlationId = UUID.randomUUID().toString();
                final Destination requestQueue =
                        destinationResolver.resolveDestinationName( session, queue+".request", false );
                final Destination replyQueue =
                        destinationResolver.resolveDestinationName( session, queue+".response", false );
                // Create the consumer first!
                consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );
                final TextMessage textMessage = session.createTextMessage( msg );
                textMessage.setJMSCorrelationID( correlationId );
                textMessage.setJMSReplyTo( replyQueue );
                // Send the request second!
                producer = session.createProducer( requestQueue );
                // The producer is already bound to requestQueue, so send without repeating the destination
                producer.send( textMessage );
                // Block on receiving the response with a timeout
                return consumer.receive( TIMEOUT );
            }
            finally {
                // Don't forget to close your resources
                JmsUtils.closeMessageConsumer( consumer );
                JmsUtils.closeMessageProducer( producer );
            }
        }
    }

    private final JmsTemplate jmsTemplate;

    @Autowired
    public Requestor( final JmsTemplate jmsTemplate ) {
        this.jmsTemplate = jmsTemplate;
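    }

    public String request( final String request, String queue ) throws JMSException {
        // Must pass true as the second param to start the connection
        final Message response = jmsTemplate.execute( new ProducerConsumer( request, queue, jmsTemplate.getDestinationResolver() ), true );
        // receive() returns null on timeout; otherwise unwrap the text payload
        return response == null ? null : ( (TextMessage) response ).getText();
    }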
}

About Pooling

Once the request/response logic was correct, it was time to load test. Almost instantly, memory usage exploded and the garbage collector started thrashing. Inspecting ActiveMQ with the Web Console showed that MessageConsumers were hanging around even though they were being explicitly closed using Spring’s own JmsUtils. It turns out that the CachingConnectionFactory JavaDoc held the key to what was going on: “Note also that MessageConsumers obtained from a cached Session won’t get closed until the Session will eventually be removed from the pool.” If the MessageConsumers could be reused, this wouldn’t be an issue. Unfortunately, CachingConnectionFactory caches MessageConsumers based on a hash key which contains the selector among other values. Each request/response call, with its necessarily unique selector, was therefore creating a new consumer that could never be reused. Luckily, ActiveMQ provides a PooledConnectionFactory which does not cache MessageConsumers, and switching to it fixed the problem instantly. However, this means that each request/response requires a new MessageConsumer to be created. This adds overhead, but it’s the price that must be paid to do synchronous request/response.
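The buildup described above can be pictured with a toy model. The sketch below is not Spring’s implementation; ConsumerCacheModel and its methods are hypothetical names, and the key format is an assumption made purely for illustration. It only shows why a cache keyed on destination plus selector never gets a hit when every selector embeds a fresh UUID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model of a consumer cache keyed by destination and selector. */
final class ConsumerCacheModel {

    // Each entry stands in for one cached MessageConsumer
    private final Map<String, Object> cache = new HashMap<>();

    /** Returns the cached "consumer" for this key, creating one on a miss. */
    Object getConsumer( final String destination, final String selector ) {
        return cache.computeIfAbsent( destination + "|" + selector, key -> new Object() );
    }

    int size() {
        return cache.size();
    }

    /** Simulates the given number of request/response calls, each with a unique selector. */
    static int simulateRequests( final int count ) {
        final ConsumerCacheModel model = new ConsumerCacheModel();
        for ( int i = 0; i < count; i++ ) {
            final String selector = "JMSCorrelationID = '" + UUID.randomUUID() + "'";
            model.getConsumer( "orders.response", selector );
        }
        // One entry per call: nothing is ever reused
        return model.size();
    }
}
```

In this model the cache grows by one entry per request, while repeated calls with the same selector would reuse a single entry. Commenters below report that calling setCacheConsumers(false) on CachingConnectionFactory is another way to avoid the buildup while staying within Spring.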

About Benjamin Darfler
Coder, Meditator, Photographer

29 Responses to Synchronous Request Response with ActiveMQ and Spring

  1. Pingback: Efficient Lightweight JMS with Spring and ActiveMQ « CodeDependents

  2. Aaron Korver says:

    Okay, this is great…EXCEPT, that the connection.start() has to be called in order to consume messages. The JMSTemplate from Spring specifically doesn’t call connection.start() when using a ProducerCallback. Thus, I’m not sure how your code would work as displayed. If you wrap the ProducerCallback in a SessionCallback, and then call the template.execute(sessionCallback(),true) then all is fine.

    Thoughts?

  3. Mandy Warren says:

    Hi, Thanks very much for the article. I’ve also just recently tried to implement request-reply using Spring in a similar manner to your example, except I am using Sonic MQ as my JMS Provider.

    Sonic have asked that I create a temporary destination per session for best performance and, exactly as you discussed, this doesn’t seem feasible in Spring, so I’m reverting to writing the code by hand and not using Spring! I am assuming that when you talk about having a predefined response queue you mean it has been administratively set up and is shared by all threads/sessions. I therefore wonder in your example how you cope with the scenario where multiple threads are calling request() and hence there are a number of responses on the predefined response queue? How do you correlate that a particular response is associated with a particular request? I would have thought you’d need a message selector?

    Mandy

    • Exactly right, you use a selector coupled with the JMSCorrelationId. If you look through the code sample you can see where it is set and where the selector is set. Apologies if that wasn’t clear enough.

  4. alex says:

    Shouldn’t your constructor be named Requestor instead of RequestGateway?

  5. Jay says:

    This was really very helpful. Solved my problem with IBM MQ/SPRING JMS. Thanks a lot

    Cheers !!!
    – Jay

  6. Pingback: ConnectionFactories and Caching with Spring and ActiveMQ « CodeDependents

  7. Vishal Saxena says:

    Benjamin,
    Do you know why this runs but doesn’t publish?

    public String request(final String request) throws JMSException {
    ProducerConsumer prodCons = new ProducerConsumer(request,jmsTemplate);
    Message respJmsMsg = jmsTemplate.execute(prodCons,true);
    String respStrMsg = ((TextMessage)respJmsMsg).getText();
    return respStrMsg;
    }
    }

    public ProducerConsumer(final String msg, final JmsTemplate jmsTemplate) {
    this.msg = msg;
    this.jmsTemplate = jmsTemplate;
    }

    @Override
    public Message doInJms(Session session) throws JMSException {
    MessageProducer msgProd=null;
    MessageConsumer msgCons=null;
    try{
    String destName = jmsTemplate.getDefaultDestinationName();
    DestinationResolver destResolver = jmsTemplate.getDestinationResolver();
    Destination dest = destResolver.resolveDestinationName(session, destName, true);
    msgProd = session.createProducer(dest);
    TextMessage txtMsg = session.createTextMessage(msg);
    Destination tempReplyQ = session.createTemporaryQueue();
    txtMsg.setJMSReplyTo(tempReplyQ);
    final String correlationId = UUID.randomUUID().toString();
    txtMsg.setJMSCorrelationID(correlationId);
    msgProd.send(txtMsg);
    logger.debug(String.format("Send: %s; On: %s; With ReplyTo: %s",txtMsg, dest,
    tempReplyQ));

    msgCons = session.createConsumer(tempReplyQ);
    Message respJmsMsg = msgCons.receive(TIMEOUT);
    logger.debug(String.format("RecievedSynchReply: %s; On ReplyTo: %s",respJmsMsg, tempReplyQ));
    return respJmsMsg;
    }
    finally{
    JmsUtils.closeMessageConsumer(msgCons);
    JmsUtils.closeMessageProducer(msgProd);
    }
    }

    • Try asking the amq list or the spring forums; it’s been a while since I’ve done this code and it’s pretty rusty.

      • Vishal Saxena says:

        Thx Benjamin any ways.

        It turned out that the receiver (not shown in the code) was missing destination-type="topic" (the default destination-type is queue). So messages were being written to a topic with a particular name while the receiver was listening to a queue of the same name and hence not getting them, which I falsely concluded meant ‘messages not being published’.

        You may delete this post.

  8. raj says:

    Can you please let me know how to run the code that you have given? I am new to JMS and Spring and am trying to correlate request/response messages, but I was unable to run this code. Please help.
    thanks
    Raj

      • raj says:

        Thank you for the response. I am using WebSphere MQ with permanent queues.
        In the message creator I have set reply-to to the response queue. Will your code work if I replace the temporary queue with a permanent queue, setting reply-to to the response queue?
        I am trying to correlate synchronous request-response with WebSphere MQ using a permanent queue, since WebSphere MQ doesn’t support temporary queues. I appreciate your response.

        thanks
        raj

      • I’m not quite sure what you are asking (please check for spelling at least before sending). However, if you are asking whether you can use permanent queues, I don’t see why not. The best way to find out, of course, is to code and test it.

  9. HV says:

    Hi Benjamin,

    I have run into a problem where my Active MQ listener seems to receive XX no of messages where XX is equal to number of concurrent consumers.

    ActiveMQ consumer stops receiving messages

    I am using Spring 3 and Active MQ 5.3.0.

    The problem that I am facing is that it looks like each consumer receives 1 message.
    I have set the concurrent consumers to 30 and maximum concurrent consumers to 100.

    I have a TaskListener (message listener) which receives a message to run process A. In
    process A I need to trigger sub-processes. Triggering a sub-process involves sending
    a message to TaskListener to run sub-process A1… In the Process A class I am polling/waiting
    until all the sub-processes complete. It looks like I am limited to running only 30
    sub-processes (the same as the number of concurrent consumers).

    The JMS set up is as follows:

    vm://localhost

    What am I overlooking? Please advise.

    • That’s a great question. I feel like it sounds familiar, but I haven’t touched AMQ in a year now. I would suggest you post on the mailing list or in the IRC channel. The guys there are very good and very helpful. Best of luck.

  10. Lars Hvile says:

    >> Unfortunately, CachingConnectionFactory caches MessageConsumers based on a hash
    >> key which contains the selector among other values
    This can be disabled with CachingConnectionFactory’s setCacheConsumers(false). This fixes the consumer ‘leak’ and has no noticeable performance hit compared with using cached consumers.

    I’ve been using ActiveMQ 5.5 while testing this. It seems like the only thing that really helps with performance is using an existing response-queue, instead of creating new ones for each request.

  11. Hi,

    This blog post has been very helpful because “it just makes more sense to use a synchronous request/response” as you said ;o)

    By the way, I am not sure I have understood how to use the permanent response queue. Does it mean that the request queue listener has to use a queue sender to send a response to the initial requester (which is waiting for an answer on the response queue)? Or is there a smarter way to code that? How should the queue listener be coded to use the response queue?

    Thanks!

    • Yes the responder sends the response on a known queue with a correlation id. The requester listens on the known response queue for a message with that correlation id. Does that help?

      • It helps! But one more question: does it work for several requesters (concurrent processes) using the same queue with a single responder? How does each requester manage to get the message with its own correlation ID without removing the other ones from the queue?

      • Yes, when you subscribe to the reply queue you do so with a selector for that correlation id. ActiveMQ takes care of the rest. You can see all this in the code samples. I promise it works ;-)

  12. Shiva says:

    Hi Benjamin

    I tried this and it works great in my system… I am getting a response in sync mode. But I see that the message consumer count keeps on increasing forever… I tried switching to the ActiveMQ pooled connection factory too… that doesn’t seem to solve my issue… Can you please advise?

  13. James Barwick says:

    guys…

    can’t you simply use CachingConnectionFactory.setCacheConsumers(false); to fix the caching issue without using ActiveMQ’s pool objects? (keep it in spring).

    Also, this code seems like it is simply a workaround for a Spring Bug. Right?

    we “SHOULD” be able to do jmstemplate.send() then jmstemplate.receive() without losing synchronous message response?

    Thanks for this blog post. It certainly did help.

    By the way, if you are interested…I changed the code such that the constructor took in a “MessageCreator” object. Helps when sending different types of messages (Text, Properties, Object…etc).

    Thanks!

    James

    • Hey James,

      Thanks for the comment. It’s now been over 3 years since I wrote this, and I haven’t touched the system I was working on at the time in almost as long. For that reason I can’t really answer your question with any certainty, but I’m glad the post helped. Feel free to follow up here if you find out any further information.

      Cheers,

      Ben
