Efficient Lightweight JMS with Spring and ActiveMQ

Asynchronicity: it’s the number one design principle for highly scalable systems, and for Java that means JMS, which in turn means ActiveMQ. But how do I use JMS efficiently? One can quickly become overwhelmed with talk of containers, frameworks, and a plethora of options, most of which are outdated. So let’s pick it apart.

Frameworks

The ActiveMQ documentation mentions two frameworks: Camel and Spring. The decision here comes down to simplicity vs. functionality. Camel supports an immense number of Enterprise Integration Patterns that can greatly simplify integrating a variety of services and orchestrating complicated message flows between components. It’s certainly best of breed if your system requires such functionality. However, if you are looking for simplicity and support for the basic best practices, then Spring has the upper hand. For me, simplicity wins out any day of the week.

JCA (Use It Or Lose It)

Reading through ActiveMQ’s Spring support page, one is instantly introduced to the idea of a JCA container and ActiveMQ’s various proxies and adaptors for working inside of one. However, this is all a red herring. JCA is part of the EJB specification and, as with most of the EJB specification, Spring doesn’t support it. Then there is a mention of Jencks, a “lightweight JCA container for Spring”, which was spun off of ActiveMQ’s JCA container. At first this seems like the ideal solution, but let me stop you there. Jencks was last updated on January 3rd, 2007. At that time ActiveMQ was at version 4.1.x and Spring was at version 2.0.x, and things have come a long way since, a very long way. Even trying to get Jencks from the Maven repository fails due to dependencies on ActiveMQ 4.1.x jars that no longer exist. The simple fact is there are better and simpler ways to ensure resource caching.

Sending Messages

The core of Spring’s message sending architecture is the JmsTemplate. In typical Spring template fashion, the JmsTemplate abstracts away all the cruft of opening and closing sessions and producers, so all the application developer needs to worry about is the actual business logic. However, ActiveMQ is quick to point out the JmsTemplate gotchas, chiefly that JmsTemplate is designed to open and close the session and producer on each call. To prevent this from absolutely destroying messaging performance, the documentation recommends using ActiveMQ’s PooledConnectionFactory, which caches the sessions and message producers. However, this too is outdated. Starting with version 2.5.3, Spring ships its own CachingConnectionFactory, which I believe to be the preferred caching method. (UPDATE: In my more recent post, I talk about when you might want to use PooledConnectionFactory.) There is one catch to point out, though. By default the CachingConnectionFactory only caches one session, which the javadoc claims to be sufficient for low concurrency situations. By contrast, the PooledConnectionFactory defaults to 500. As with most settings of this type, some amount of experimentation is probably in order. I’ve started with 100, which seems like a good compromise.
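To get a feel for what the session cache size buys you, here is a toy model of a bounded session cache in plain Java. It is only a sketch under stated assumptions, not Spring’s implementation: SessionCacheSketch, FakeSession, and sessionsCreated are made-up names, and the one behaviour it borrows from the CachingConnectionFactory javadoc is that sessions beyond the cache size are created on demand and then closed rather than kept.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a bounded session cache; NOT Spring's CachingConnectionFactory. */
final class SessionCacheSketch {

    /** Hypothetical stand-in for a JMS Session; it holds no real resources. */
    static final class FakeSession {
        boolean closed;
    }

    private final int cacheSize;
    private final Deque<FakeSession> idle = new ArrayDeque<>();
    private int created;

    SessionCacheSketch(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    /** Reuses an idle session if one is cached, otherwise creates one on demand. */
    synchronized FakeSession borrow() {
        FakeSession session = idle.poll();
        if (session == null) {
            session = new FakeSession();
            created++;
        }
        return session;
    }

    /** Keeps the session only if the cache has room; otherwise it is really closed. */
    synchronized void giveBack(FakeSession session) {
        if (idle.size() < cacheSize) {
            idle.push(session);
        } else {
            session.closed = true;
        }
    }

    /** Runs `rounds` bursts in which `concurrent` senders each hold a session at once. */
    static int sessionsCreated(int cacheSize, int concurrent, int rounds) {
        SessionCacheSketch cache = new SessionCacheSketch(cacheSize);
        for (int r = 0; r < rounds; r++) {
            FakeSession[] inUse = new FakeSession[concurrent];
            for (int i = 0; i < concurrent; i++) {
                inUse[i] = cache.borrow();
            }
            for (FakeSession session : inUse) {
                cache.giveBack(session);
            }
        }
        return cache.created;
    }

    public static void main(String[] args) {
        System.out.println("cache size 1:  " + sessionsCreated(1, 10, 100));
        System.out.println("cache size 10: " + sessionsCreated(10, 10, 100));
    }
}
```

With 10 senders holding a session at the same time over 100 bursts, the model creates 901 sessions with a cache of one and only 10 with a cache of ten. Real numbers depend on your broker and traffic, so treat this as a reason to measure rather than as a benchmark.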

Receiving Messages

As you may have noticed, the JmsTemplate gotchas strongly discourage using the receive() call on the JmsTemplate, again because there is no pooling of sessions and consumers. Moreover, all calls on the JmsTemplate are synchronous, which means the calling thread will block until the method returns. This is fine when using JmsTemplate to send messages, since the method returns almost instantly. However, when using the receive() call, the thread will block until a message is received, which has a huge impact on performance. Unfortunately, neither the JmsTemplate gotchas nor the Spring support documentation mentions the simple Spring solution for these problems. In fact, they both recommend using Jencks, which we already debunked. The actual solution, using the DefaultMessageListenerContainer, is buried in the “how do I use JMS efficiently” documentation. The DefaultMessageListenerContainer allows for the asynchronous receipt of messages as well as caching sessions and message consumers. Even more interesting, the DefaultMessageListenerContainer can dynamically grow and shrink the number of listeners based on message volume. In short, this is why we can completely ignore JCA.
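The difference between a blocking receive() and a listener callback can be sketched without a broker. The class below is a toy stand-in written against the plain JDK, not the DefaultMessageListenerContainer itself: ListenerContainerSketch and deliver are hypothetical names, an in-memory BlockingQueue plays the role of the destination, and the number of worker threads is fixed rather than growing and shrinking with message volume.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Toy listener container; NOT Spring's DefaultMessageListenerContainer. */
final class ListenerContainerSketch implements AutoCloseable {

    private final ExecutorService workers;
    private volatile boolean running = true;

    ListenerContainerSketch(BlockingQueue<String> queue, int concurrency, Consumer<String> listener) {
        workers = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < concurrency; i++) {
            // Each worker does the blocking wait so that application threads never have to.
            workers.submit(() -> {
                while (running) {
                    try {
                        String message = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            listener.accept(message);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
    }

    @Override
    public void close() {
        running = false;
        workers.shutdownNow();
    }

    /** Sends `count` messages and returns how many the listener handled within five seconds. */
    static int deliver(int count, int concurrency) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        Consumer<String> listener = message -> {
            handled.incrementAndGet();
            done.countDown();
        };
        try (ListenerContainerSketch container = new ListenerContainerSketch(queue, concurrency, listener)) {
            for (int i = 0; i < count; i++) {
                queue.add("message " + i); // the sending thread returns immediately
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled: " + deliver(1000, 10));
    }
}
```

The point of the sketch is the shape of the code: the application hands over a callback and the container owns the threads that wait for messages. In the real container the listener would be a javax.jms.MessageListener such as the QueueListener shown later in this post.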

Putting It All Together

Spring Context XML

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">

<!-- enables annotation based configuration -->
<context:annotation-config />
<!-- scans for annotated classes in the com.company package -->
<context:component-scan base-package="com.company"/>
<!-- allows for ${} replacement in the spring xml configuration from the system.properties file on the classpath -->
<context:property-placeholder location="classpath:system.properties"/>
<!-- creates an activemq connection factory using the amq namespace -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="${jms.url}" userName="${jms.username}" password="${jms.password}" />
<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
    <property name="exceptionListener" ref="jmsExceptionListener" />
    <property name="sessionCacheSize" value="100" />
</bean>
<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <constructor-arg ref="connectionFactory"/>
</bean>
<!-- listener container definition using the jms namespace, concurrency is the max number of concurrent listeners that can be started -->
<jms:listener-container concurrency="10" >
    <jms:listener id="QueueListener" destination="Queue.Name" ref="queueListener" />
</jms:listener-container>
</beans>

There are two things to notice here. First, I’ve added the amq and jms namespaces to the opening beans tag. Second, I’m using the Spring 2.5 annotation based configuration. By using the annotation based configuration I can simply add the @Component annotation to my Java classes instead of having to specify them in the Spring context XML explicitly. Additionally, I can add @Autowired on my constructors to have objects such as JmsTemplate automatically wired into my objects.

QueueSender

package com.company;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class QueueSender
{
    private final JmsTemplate jmsTemplate;

    @Autowired
    public QueueSender( final JmsTemplate jmsTemplate )
    {
        this.jmsTemplate = jmsTemplate;
    }

    public void send( final String message )
    {
        jmsTemplate.convertAndSend( "Queue.Name", message );
    }
}

QueueListener


package com.company;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class QueueListener implements MessageListener
{
    public void onMessage( final Message message )
    {
        if ( message instanceof TextMessage )
        {
            final TextMessage textMessage = (TextMessage) message;
            try
            {
                System.out.println( textMessage.getText() );
            }
            catch (final JMSException e)
            {
                e.printStackTrace();
            }
        }
    }
}

JmsExceptionListener

package com.company;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;

import org.springframework.stereotype.Component;

@Component
public class JmsExceptionListener implements ExceptionListener
{
    public void onException( final JMSException e )
    {
        e.printStackTrace();
    }
}

Update

I have finally updated the wiki at activemq.apache.org. The following pages now recommend using MessageListenerContainers and JmsTemplate with a Pooling ConnectionFactory instead of JCA and Jencks.

About Benjamin Darfler
Coder, Meditator, Photographer

55 Responses to Efficient Lightweight JMS with Spring and ActiveMQ

  1. Rob Davies says:

    Great article. We so need to tidy up the ActiveMQ website!

  2. Michal Cevela says:

    Great article, Benjamin! I’m just trying to integrate ActiveMQ v5.3 into SpringSource dm Server v2.0.4 and it seems to work .-) Well done, keep writing!

  3. javaguy44 says:

    Nice post.

    Thanks for clearing up the Jencks / session pooling issues, I think this was very much needed.

    When you update the ActiveMQ website, would be great if you can followup this post with the link…and any other resource optimizations you’ve learned

  4. Jon Drew says:

    Very nice write up of this topic. I wish I had seen it before I had finished my project. Thanks for the information about Jencks, I had started looking down that path, but never got that far… I guess I don’t need to revisit.

    Do you have any numbers to compare the ActiveMQ pooled connection factory vs the one shipped with Spring? I agree that Spring usually does a decent job with performance, but with the JMSTemplate written the way it is, I just wasn’t sure whether to use Spring or ActiveMQ’s connection pooling.

    Again, thanks for the write up on this topic.

    • I don’t have numbers comparing the two, but it’s a very simple change in the XML config, so switching them back and forth in your system should be trivial. My gut feeling is that the Spring version is more aware of how to deal with the rest of the Spring JMS classes and with the Spring lifecycle, which is why I went with it. Additionally, the ActiveMQ pool requires the activemq-pool jar and has a dependency on Apache Commons Pool, whereas the Spring version has no dependencies and comes for free with the spring-jms jar. If you do test this, make sure you set the number of sessions to be the same on both.

  5. Kurt says:

    Thanks for this concise article.
    A lot information and explanation such a short way.

    Would be great if we can follow on your blog about transaction integration, sth like sessionTransacted on AbstractPollingMessageListenerContainer, in the future…

    • Kurt, thanks for the compliment. Unfortunately I don’t use transactions, so I’m not that familiar with them, though you probably need to use a JmsTransactionManager. If you need to coordinate message and database transactions, I think you need to use JCA, which Spring supports as of 2.5. Hope that helps.

  6. Apurav Chauhan says:

    Hey Benjamin,
    This is very nice stuff. But I am a little skeptical about how to execute JMS efficiently in a non-Spring framework. I am working in Struts currently and I have initialized the factory and producers in a static way. As you just mentioned that it creates a connection on every request, that’s why I kept the connection and producers in static variables.

    /**
     * Start the broker
     */
    static {
        BrokerService broker = new BrokerService();
        try {
            broker.addConnector("vm://localhost");
            broker.start();
        } catch (Exception e) {}
    }

    /*
     * Make the connection
     */
    static {
        try {
            connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "vm://localhost");
            connection = connectionFactory.createConnection();
            connection.setExceptionListener(new JMSExceptionListener());
            connection.start();
            session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("Message Center");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        } catch (Exception e) {}
    }

    After this I’ll use my producer object directly from other classes without using the new operator.

    Please advise whether the way I am doing it is efficient, or do you think something else can be done?

    Really thanks in advance

    • Only connections are thread safe, producers and sessions are not. If you are using the producer object in a multi threaded environment (which you probably are with struts) you will probably run into concurrency issues.

      I would recommend using the PooledConnectionFactory since it will cache sessions and message producers. Then you can get a new session and producer every time you want to send a message.

      Luckily message sending is pretty straight forward using the PooledConnectionFactory. If you are also doing message consumption then you have a whole other set of issues.

      • John Russell says:

        What he said, but also starting that stuff in static initializers inside try’s with no catches is going to make that next to impossible to a) test b) debug if there is actually a problem with initialization. In static blocks means that you won’t even be able to write a method to check to see if the broker is running without starting it. Any time you load the class in any way you’ll start up the message bus.

        Just a thought.

      • Apurav Chauhan says:

        @Benjamin
        PooledConnectionFactory is a good idea.Thanks Benjamin.
        —————————————————–
        @John
        Reason for Static initialzers:
        Actually I implemented the singleton approach for the producer object. That’s why I was initializing it once in the static block, and thereafter I’ll keep using the same object to send different messages.

        I was just curious about whether I’ll run into any thread conflicts if I use the same producer object for sending messages.

        Well, I think even though the producer object is shared between threads, because I am not modifying the internal state of the producer object, it should not cause any concurrency problems.

        Please bear with my questions coz am a novice and u both are pros.

      • Apurav,

        Maybe I’m not fully understanding you but you said the producer is shared between threads. If thats the case then you will probably run into concurrency problems.

        Also you should look into lazy loading the singleton ( http://en.wikipedia.org/wiki/Initialization_on_demand_holder_idiom ). Note that this idiom is tricky and relies on some weirdness in the JVM, so you should use that code to the letter, and do something useful in your catch block even if all you do is write to standard error.
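For reference, the initialization-on-demand holder idiom mentioned in this reply looks roughly like the sketch below. MessagingSetup and its CONSTRUCTED counter are hypothetical names used only to make the laziness visible; a real version would create the broker connection in the constructor and handle failures there instead of swallowing them. The idiom only makes the one-time initialization lazy and thread safe. It does nothing to make a shared session or producer safe to use from several threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Lazy singleton via the initialization-on-demand holder idiom (illustrative names). */
final class MessagingSetup {

    /** Counts constructor calls so the lazy, one-time initialization can be observed. */
    static final AtomicInteger CONSTRUCTED = new AtomicInteger();

    private MessagingSetup() {
        // A real implementation would open its connection here and report failures.
        CONSTRUCTED.incrementAndGet();
    }

    /** The JVM initializes Holder only on first access, and does so exactly once. */
    private static final class Holder {
        static final MessagingSetup INSTANCE = new MessagingSetup();
    }

    static MessagingSetup getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println("before first use: " + CONSTRUCTED.get());
        MessagingSetup first = getInstance();
        MessagingSetup second = getInstance();
        System.out.println("after first use:  " + CONSTRUCTED.get());
        System.out.println("same instance:    " + (first == second));
    }
}
```

Loading MessagingSetup does not run the constructor; only the first call to getInstance() does, which addresses the objection above that a static block starts everything the moment the class is touched.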

      • Apurav Chauhan says:

        hmm! thanks

  7. ludovicianul says:

    I’m working with Flex, Spring, ActiveMQ and BlazeDS. I have a producer that posts messages to a Topic every second. When I first subscribe to that topic with a Flex client everything is OK. After I close the browser and re-open the application, I receive all the messages that were sent while I was offline. The topic is NOT durable. It seems that ActiveMQ is not closing the connection with the Flex client even though I closed the browser. Do you know what the problem might be? If I subscribe/unsubscribe from a Java client everything is OK.

    • 1) How have you checked that the topic is non durable? It never hurts to question assumptions.

      2) Is it possible to explicitly unsubscribe and or close the connection / socket in flex before you close the browser? I know that flash doesn’t have a shutdown hook but I’m not sure about flex.

      3) Is it possible to explicitly start up a new subscription / connection /socket when you reopen the app?

      We have seen some crazy behavior in flash re: sockets. The vm will hold on to them and keep trying to reconnect gracefully for the client app. Seems nice when you are a novice programmer dealing with intermittent network issues but can actually be kinda annoying and unexpected if you are more advanced.

      Besides that I have no clue, I’m not a flex/flash/blazeds guy.

    • Apurav Chauhan says:

      Try doing:
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

      Might work

  8. Pingback: Synchronous Request Response with ActiveMQ and Spring « CodeDependents

  9. Pingback: ConnectionFactories and Caching with Spring and ActiveMQ « CodeDependents

  10. Mark says:

    Certainly, the documentation surrounding the Jencks libraries is confusing, and performance comparisons between the various connection pooling options are few and far between, but I thought I should point out that there are more recent releases than stated above (Jencks 2.2 is the latest, from Sept 2010, and can be found at http://repo1.maven.org/maven2/org/jencks/ ).
    Mark

  11. Nitin says:

    Hello !

    I am new to spring and amq.
    I have created springcontext.xml and system.properties and classes as mentioned in article.
    and wrote Test class is as follows:

    public class Test {
        public static void main(String[] args) {
            String[] configuration = new String[] { "/springcontext.xml" };
            Test.applicationContext = new ClassPathXmlApplicationContext(configuration);
            Test.applicationContext.registerShutdownHook();
            QueueSender queueSender = (QueueSender) Test.applicationContext.getBean("queueSender");
            queueSender.send("FIRST MSG");
            Test.applicationContext.close();
        }
    }

    Spring obviously throws exception “NoSuchBeanDefinitionException” queueSender.

    May be its basic but please help me find better way to create/access instance of QueueSender and send sample message to queue.

    Your help will get me started.

    Thank you,
    Nitin.

    • I noticed that I had the package name wrong in QueueSender. Try it again. If you have changed any of the packages, be sure to change <context:component-scan base-package="com.company"/> in the Spring XML to match your new package names.

      • Nitin says:

        Hi Benjamin,

        Thank you for your instant reply :)

        I had changed package names and xml but found It was classpath for xml and few “class not found” issues :)

        It is working perfectly fine now, receiver gets message and admin page of activemq shows right count too

        Please let me know if i should get instance of QueueSender in any better way at all :)

        Thanks again,
        Nitin

  12. Neeraj Kumar says:

    This is by far the best article on Integrating ActiveMQ and Spring ….
    Nice, tidy and neat.

    Thanks Benjamin

  13. Prem says:

    I have an issue with using CachingConnectionFactory and JMSTemplate on IBM Websphere 6.1

    1) How to reconnect to MQ when a queue manager goes down and comes back up.

    2) how to use reconnectOnException in IBM web sphere.

    Any help is appreciated.

  14. Maxim Valyanskiy says:

    What is the proper way to destroy embedded broker with vmtransport? Tomcat logs following lines when application stops:

    25.11.2010 13:49:23 org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
    SEVERE: The web application [] appears to have started a thread named [VMTransport] but has failed to stop it. This is very likely to create a memory leak.
    25.11.2010 13:49:23 org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
    SEVERE: The web application [] appears to have started a thread named [VMTransport] but has failed to stop it. This is very likely to create a memory leak.
    25.11.2010 13:49:23 org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
    SEVERE: The web application [] appears to have started a thread named [VMTransport] but has failed to stop it. This is very likely to create a memory leak.
    25.11.2010 13:49:23 org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
    SEVERE: The web application [] appears to have started a thread named [ActiveMQ Task] but has failed to stop it. This is very likely to create a memory leak.
    25.11.2010 13:49:23 org.apache.coyote.http11.Http11AprProtocol destroy

  15. Marc Hollins says:

    Thank you for this post, it has definitely helped me with my Spring JMS configuration.

    One question though:
    In the Spring Context XML, does the listener-container need a reference to the connection factory?

    eg.
    <jms:listener-container concurrency="10" connection-factory="connectionFactory">

    thanks.

    • If I remember correctly if there is a connection factory object configured in spring with the name connectionFactory then it will be used by default. If you have named your connection factory something else then yes you need to give it a reference.

  16. SAURABH says:

    Hi, Benjamin
    I am an absolute starter with ActiveMQ. The above code snippets need to be supported with some other code. Can you share me the link to the full code repo for this example which would contain all config and properties file. Thanks in advance

    • The ActiveMQ website has some great resources for beginners and I updated the Spring documentation there to reflect this blog post. Try starting with this. You might also find some other useful blog posts here or you can splurge and buy the just released book on ActiveMQ.

  17. fei long says:

    Hi, Benjamin
    I am an absolute starter with ActiveMQ,I just use the above code in my project, but i find an error

    Unable to locate Spring NamespaceHandler for XML schema namespace [http://activemq.apache.org/schema/core]

  18. fei long says:

    Hello !

    I am new to spring and amq

    I used the example above for fast data transfer. I used 100 threads to send 100,000 messages and opened 100 consumers to receive them. There was no problem the first time. Then I stopped all the threads. A few minutes later I started the 100 threads again and found that the Messages Enqueued count on the ActiveMQ page was increasing slowly. After the threads completed, Messages Enqueued and Messages Dequeued showed only 1,000 messages, so 99,000 were missing. Then I restarted ActiveMQ and the 99,000 messages appeared under Number Of Pending Messages. So I think all the messages were sent to ActiveMQ the second time, but only 1/10 of them were displayed. I have set sessionCacheSize to 100. Can you help me analyze the reasons?
    thanks

  19. Sridhar says:

    Thanks for the post extremely useful
    The configuration does not work out of the box for Spring 2.5.5: org.springframework.jms.connection.CachingConnectionFactory does not have the constructor used in the example. However, Spring 2.5.6 works.

  20. Thank god some bloggers can write. Thank you for this piece of writing.

  21. Dan Be says:

    I keep getting the following error:

    Error creating bean with name ‘queueSender’: Unsatisfied dependency expressed through constructor argument with index 0 of type [org.springframework.jms.core.JmsTemplate]: Error creating bean with name ‘jmsTemplate’ defined in ServletContext resource [/WEB-INF/applicationContext.xml]: Cannot resolve reference to bean ‘connectionFactory’ while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘connectionFactory’ defined in ServletContext resource [/WEB-INF/applicationContext.xml]: 1 constructor arguments specified but no matching constructor found in bean ‘connectionFactory’ (hint: specify index and/or type arguments for simple parameters to avoid type ambiguities)

    did any one run into the same error?

    • @Sridhar above mentions that the xml only works with Spring 2.5.6 not 2.5.5. The issue is with the constructor for org.springframework.jms.connection.CachingConnectionFactory. I would check the javadoc for your version and make the necessary changes.

  22. Shiva says:

    Hi Benjamin

    I tried this and it works great in my system… I am getting a response in sync mode.. But I see that the message consumers count keeps on increasing for ever… I tried switching to active mq pooled connection factory too…. that doesnt seem to solve my issue…Can you please advise…

  23. Good article. I will be dealing with a few
    of these issues as well..

  24. ray says:

    Hi,
    I would like to know by your experience what do you think of having ActiveMQ comparing to HornetQ both integrated in Spring.

    In terms of latency and throughput.

    thanks,

    • I’m glad to see Spring integrates with both. When I looked at this a few years ago HornetQ had some compelling performance numbers but I found ActiveMQ to be a more welcoming community which was very important when things didn’t work as expected. Additionally, when I started the project, HornetQ wasn’t an option so I chose ActiveMQ by default. I haven’t looked at either of these in years so I cannot comment on their current performance but I’m glad there is competition. Apollo looks compelling as well and will eventually become ActiveMQ 6.0.

  25. samsbalu says:

    Hi, I configured it like this to maintain queue ordering, but there are still 2 concurrent listeners consuming the queue messages. Why, and how do I get rid of this?

  26. hosting says:

    I like this web site extremely a lot so much great information. “Books are not made to be believed, but to be subjected to inquiry.” by Umberto Eco.

  27. Pingback: Active MQ – Producer / Consumer Spring | technicalpractical
