Developing JMS Applications

This chapter will review how to develop some simple JMS applications.

Managed Objects

Establishing a connection to the JMS provider is one of the first actions a JMS application takes. The connection is established through the use of a Managed Object. It is important that you understand why using Managed Objects is beneficial to an application.

The JMS API recommends that the connection factory and destination objects be accessed via JNDI. Since they are objects retrieved from JNDI, they are called Managed Objects. They can be managed outside of the application code. The JMS connection factory being used by a client application can be changed by an administrator.

The JMS connection factory objects can be thought of as objects that store the configuration information that allows a client to establish a connection to the JMS provider. Typically a connection factory stores information such as the host name and port on which the JMS service is running.

The two main reasons to use JNDI to look up the connection factory and the destination objects are:

  • Application code remains JMS provider independent. The application does not have to create provider-specific objects. It simply uses the JNDI API to obtain the provider-specific objects.

  • Enhanced application administration. An administrator can change where an application connects by adjusting the contents of the JNDI tree.

Doing a JNDI Lookup

This section will show you how to do a JNDI look up of a managed object. The first thing you must do before doing a JNDI lookup is to create an InitialContext object. The easiest way to initialize the InitialContext object is to use the default constructor.

When the default constructor is used, the JNDI API will look for a jndi.properties file in the CLASSPATH and use the information in it to connect to the correct JNDI server. The configuration of the jndi.properties file will depend on the JMS provider that you use.

You can use the following "jndi.properties" file to configure JNDI to use the JBoss JNDI server on the localhost machine:

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
java.naming.provider.url=localhost
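
The same settings can also be supplied in code instead of through a jndi.properties file. The following is a minimal sketch under that assumption: the class name JndiEnvironment and the method jbossEnvironment are hypothetical, and only the property values come from the file shown above. The constants on javax.naming.Context are the standard names for the three property keys.

```java
import java.util.Properties;
import javax.naming.Context;

/** Builds the same JNDI settings as the jndi.properties file, but in code. */
class JndiEnvironment {

    /** Returns the JNDI environment for a JBoss naming server on the given host. */
    static Properties jbossEnvironment(String host) {
        Properties env = new Properties();
        // Context.INITIAL_CONTEXT_FACTORY is the key "java.naming.factory.initial"
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        // Context.URL_PKG_PREFIXES is the key "java.naming.factory.url.pkgs"
        env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        // Context.PROVIDER_URL is the key "java.naming.provider.url"
        env.put(Context.PROVIDER_URL, host);
        return env;
    }

    public static void main(String[] args) {
        Properties env = jbossEnvironment("localhost");
        env.forEach((key, value) -> System.out.println(key + "=" + value));
        // With the JBoss client libraries on the CLASSPATH, the context
        // could then be created with: new InitialContext(env)
    }
}
```

Passing the environment to the InitialContext constructor is useful when one application has to talk to more than one JNDI server, since a single jndi.properties file on the CLASSPATH can describe only one.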

Learning by example is one of the best ways to pick up on new technology. If you have never done a JNDI lookup before, just take a look at the following listings and you will see how easy it is.

  • Listing 3.3 shows you how to look up a QueueConnectionFactory.

  • Listing 3.4 shows you how to look up a TopicConnectionFactory.

  • Listing 3.5 shows you how to look up a Queue and a Topic destination.

JBossMQ connection factories implement both the QueueConnectionFactory and the TopicConnectionFactory interfaces. This is the reason that both Listing 3.3 and Listing 3.4 look up the same JNDI location.

JBossMQ, by default, binds all topics under the "topic/" JNDI sub-context and all queues under the "queue/" JNDI sub-context.

An administrator can change the location where the managed objects are bound under JNDI. For more information about how to configure the JNDI locations the objects are bound to, please see JBossMQ 3.2.1 - The Core Administration Guide.

Listing 3.3 Looking up a QueueConnectionFactory

try {
   InitialContext ic;
   QueueConnectionFactory cf;

   ic = new InitialContext();
   cf = (QueueConnectionFactory)ic.lookup("ConnectionFactory");
   
} catch (NamingException e) {
   System.out.println("JNDI lookup failed: " + e.toString());
}

Listing 3.4 Looking up a TopicConnectionFactory

try {
   InitialContext ic;
   TopicConnectionFactory cf;

   ic = new InitialContext();
   cf = (TopicConnectionFactory)ic.lookup("ConnectionFactory");

} catch (NamingException e) {
   System.out.println("JNDI lookup failed: " + e.toString());
}

Listing 3.5 Looking up a Queue and a Topic.

try {
   InitialContext ic;
   Queue q1;
   Topic t1;

   ic = new InitialContext();
   q1 = (Queue)ic.lookup("queue/testQueue");
   t1 = (Topic)ic.lookup("topic/testTopic");

} catch (NamingException e) {
   System.out.println("JNDI lookup failed: " + e.toString());
}

Creating a JMS Producer

This section shows you how you can create a simple application that acts as a JMS message producer for the Point-to-Point and the Pub-Sub messaging models. The sample programs in this section do not try to implement proper error handling to keep the source code easy to follow.

A Point-to-Point Message Producer

Listing 3.6 provides the sample program that sends a "Hello World!" message to the "testQueue" queue. After all of the local variables have been declared, you see the following lines of code:

ctx = new InitialContext();
cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
destination = (Queue)ctx.lookup("queue/testQueue");

These lines of code look up, from JNDI, the managed objects that the application will be working with. For more information on JNDI lookups, see chapter 3.4.

The default JBoss configuration has the following queues deployed: testQueue, A, B, C, D, DLQ, and ex. You can use them for testing and development.

Once the connection factory object is obtained, it will be used to create the connection to the JMS provider. After that, the connection can then be used to establish a JMS session:

connection = cf.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 

The first argument to the createQueueSession(...) method specifies whether or not the JMS session is going to be transacted. Our example passes in "false" so that the session is not transacted. Transacted sessions will be covered in chapter 3.8. The second argument to the method sets up the acknowledgment mode. The acknowledgment mode only affects how messages are consumed, so this setting does not affect our example. Acknowledgment modes are covered in chapter 3.6.3.

Once the session has been created, it can be used to create the sender object and the message that the sender will be sending. In our example, the following code:

sender = session.createSender(destination);
message = session.createTextMessage();
message.setText("Hello World!"); 

creates the sender object and sets it up so that all messages that it sends go to the destination that was initially looked up via JNDI. The code also creates a TextMessage and sets the text of the message to "Hello World!".

At this point in the code, all the objects that we need to send the message have been initialized. Finally, the send method is called on the sender object to send the message to the queue.

sender.send(message);

Once this method returns, the "Hello World!" message will be sitting on the destination.

One way to see if the message has made it to the destination is to use the JBoss jmx-console and view the MBean which manages the "testQueue" destination. The MBean will be named jboss.mq.destination:name=testQueue,service=Queue. Once you select the MBean, look for the QueueDepth MBean attribute. This attribute will tell you how many messages are sitting on the queue. Figure 8, MBean Attributes from a Queue MBean, shows a queue that is holding two messages.

Listing 3.6 Example: Sending a TextMessage to a Queue

import javax.jms.*;
import javax.naming.*;
              
public class MessageToQueue
{
   public static void main(String[] args) 
   throws NamingException,JMSException
   {
      InitialContext ctx;
      QueueConnectionFactory cf;
      QueueConnection connection;
      QueueSession session;
      Queue destination;
      QueueSender sender;
      TextMessage message;
      
      ctx = new InitialContext();
      cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
      destination = (Queue)ctx.lookup("queue/testQueue");
      
      connection = cf.createQueueConnection();
      session = connection.createQueueSession(false,
      Session.AUTO_ACKNOWLEDGE);
      sender = session.createSender(destination);
      message = session.createTextMessage();
      message.setText("Hello World!");
      
      System.out.println("Sending Message.");
      sender.send(message);
   
      connection.close();
      System.out.println("Done.");
   }
}

A Pub-Sub Message Producer

The previous example covered sending a "Hello World!" message to a queue. We will now cover sending the same message to a topic instead. If you compare sending a message to a Queue with sending a message to a Topic, you will notice the following differences:

  • QueueXXX classes become the equivalent TopicXXX classes.

  • The QueueSender class is converted to the TopicPublisher class.

  • The method to send the message is publish(...) instead of send(...).

Since there is a direct mapping between the examples of the Point-to-Point case and the Pub-Sub case, there is no need to further explain the source code. The important thing to remember about this example is that the message is being sent to a Topic destination and, therefore, the message will be delivered to all the consumers that are receiving messages from that destination.

Listing 3.7 Example: Sending a TextMessage to a Topic

import javax.jms.*;
import javax.naming.*;

/**
 * Sends a TextMessage to a Topic
 */
public class MessageToTopic
{
   public static void main(String[] args) throws NamingException, JMSException
   {
      InitialContext ctx;
      TopicConnectionFactory cf;
      TopicConnection connection;
      TopicSession session;
      Topic destination;
      TopicPublisher publisher;
      TextMessage message;
      ctx = new InitialContext();
      cf = (TopicConnectionFactory)ctx.lookup("ConnectionFactory");
      destination = (Topic)ctx.lookup("topic/testTopic");
      
      connection = cf.createTopicConnection();
      session = connection.createTopicSession(false,
      Session.AUTO_ACKNOWLEDGE); 
      publisher = session.createPublisher(destination);
      message = session.createTextMessage();
      message.setText("Hello World!");
      
      System.out.println("Sending Message.");
      publisher.publish(message);
      
      connection.close();
      System.out.println("Done.");
   }
}

Specifying the QoS

There are several factors that affect the QoS that a JMS provider will use while delivering a message to a consumer.

QoS Factor      Description
Delivery Mode   If the delivery mode of the message is persistent, then the message should not get dropped due to a server restart.
Priority        Messages are typically delivered to consumers in the order that they are produced. If a message is sent with a higher priority than the other previous messages, then the higher priority message will be delivered before the lower priority messages. This is a number in the range of 0 to 9.
Time To Live    The Time To Live is the amount of time that a message, once sent, will remain in the JMS system without being dropped. Once the Time To Live of the message has passed, most JMS providers will quietly drop the message and it will not be delivered to a consumer.

These factors can be configured on a per message basis when the messages are produced. The easiest way to set the QoS settings is to set the corresponding bean properties on the QueueSender or TopicPublisher object.

The following example configures a sender and a publisher so that they send persistent messages with a priority of 7 and a Time To Live value of one minute:

QueueSender sender=...
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setPriority(7);
sender.setTimeToLive(60000);
sender.send(message);
....
TopicPublisher publisher=...
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
publisher.setPriority(7);
publisher.setTimeToLive(60000);
publisher.publish(message);
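
The Time To Live value is easiest to reason about as plain arithmetic. The JMS specification defines a message's expiration time as the send time plus the time to live, with a time to live of 0 meaning that the message never expires. The sketch below illustrates only that arithmetic; it does not use the JMS API, and the class and method names are hypothetical.

```java
/** In-memory sketch of how a time-to-live value turns into an expiration time. */
class TimeToLiveSketch {

    /** Expiration = send time + TTL; a TTL of 0 means the message never expires. */
    static long expirationTime(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** A message is expired once the clock has passed a non-zero expiration time. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sent = System.currentTimeMillis();
        // 60000 ms = one minute, the value passed to setTimeToLive above
        long expiration = expirationTime(sent, 60000);
        System.out.println("Expired after 30 s: " + isExpired(expiration, sent + 30000)); // false
        System.out.println("Expired after 90 s: " + isExpired(expiration, sent + 90000)); // true
    }
}
```

Because the value is in milliseconds, a common mistake is to pass 60 when one minute is intended; such a message would expire almost immediately.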

Creating a JMS Message Consumer

This section will focus on how to create JMS message consumers. Many different options are available to a developer that is creating a message consumer. Consumers can:

  • Operate in either the Point-to-Point model or the Pub-Sub model.

  • Use different Acknowledgement Modes to control the redelivery of messages.

  • Choose to have the messages delivered synchronously or asynchronously.

  • Receive a subset of messages by using selectors.

Receiving a Message from a Queue

We will first describe how to create one of the simplest message consumers, a synchronous consumer that receives a text message from a queue. Listing 3.8 is the example program that will be reviewed.

The first thing the application does is look up the managed objects using JNDI. More information about managed objects can be found in chapter 3.4.

ctx = new InitialContext();
cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
destination = (Queue)ctx.lookup("queue/testQueue");

Once the managed objects have been looked up, we can establish our connection and session with the JMS provider. To keep it simple, the session that our example establishes is non-transactional and it auto-acknowledges messages when they are received.

A receiver is created to receive messages from the destination that was looked up via JNDI earlier.

connection = cf.createQueueConnection();
session = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
receiver = session.createReceiver(destination);

Now that all of the JMS objects have been initialized, our application is ready to receive a message from the queue.

The code below shows you how the connection is started and how the receiver is used to get the next message from the destination.

connection.start();
message = (TextMessage)receiver.receive();

JMS Provider not Delivering Messages?

One of the most common errors made when creating a consumer is forgetting to start the connection. You must start the connection so that the JMS provider knows that it is OK to start delivering messages to the consumers. The need to start() the connection becomes more apparent once we cover asynchronous message delivery.

The receive() method call will block until the JMS provider delivers a message to the receiver. Since it is a blocking method call, the JMS provider is doing a synchronous message delivery.
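
If the blocking behavior is unfamiliar, it can be observed without a JMS provider. The following sketch models it with a java.util.concurrent.BlockingQueue: the calling thread waits in take() until another thread supplies a message, much as a consumer waits in receive(). This is only an analogy built on assumptions; the class and method names are hypothetical and no JMS classes are involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Models a synchronous receive with a plain in-memory blocking queue. */
class BlockingReceiveSketch {

    /** Starts a producer that sends after a delay, then blocks until the message arrives. */
    static String receiveOne(String text, long delayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // the message is not available yet
                queue.put(text);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // take() blocks the calling thread until a message is available
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Waiting For A Message.");
        System.out.println("The message was: " + receiveOne("Hello World!", 200));
    }
}
```

When waiting forever is not acceptable, the JMS MessageConsumer interface also provides receive(long timeout) and receiveNoWait().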

Listing 3.8 The source code for a Point-to-Point consumer

import javax.jms.*;
import javax.naming.*;

/**
 * Receives a TextMessage from a Queue
 */
public class QueueToMessage
{
   public static void main(String[] args) throws NamingException, JMSException
   {
      InitialContext ctx;
      QueueConnectionFactory cf;
      QueueConnection connection;
      QueueSession session;
      Queue destination;
      QueueReceiver receiver;
      TextMessage message;
      
      ctx = new InitialContext();
      cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
      destination = (Queue)ctx.lookup("queue/testQueue");
      
      connection = cf.createQueueConnection();
      session = connection.createQueueSession(false,
      Session.AUTO_ACKNOWLEDGE);
      receiver = session.createReceiver(destination);
      
      System.out.println("Waiting For A Message.");
      connection.start();
      message = (TextMessage)receiver.receive();
      System.out.println("The message was: "+message.getText());
   
      connection.close();
      System.out.println("Done.");
   }
}

Receiving a Message from a Topic

Now we will examine the counterpart to the previous example: a consumer that receives a TextMessage from a Topic. This example is almost the same as the Queue example, except that it was converted to operate in the Pub-Sub model. Only two changes were needed:

  • QueueXXX classes become the equivalent TopicXXX classes.

  • The QueueReceiver class is changed to the TopicSubscriber class.

import javax.jms.*;
import javax.naming.*;

/**
 * Receives a TextMessage from a Topic
 */
public class TopicToMessage
{
   public static void main(String[] args) throws NamingException,
   JMSException
   {
      InitialContext ctx;
      TopicConnectionFactory cf;
      TopicConnection connection;
      TopicSession session;
      Topic destination;
      TopicSubscriber subscriber;
      TextMessage message;
      
      ctx = new InitialContext();
      cf = (TopicConnectionFactory)ctx.lookup("ConnectionFactory");
      destination = (Topic)ctx.lookup("topic/testTopic");
      
      connection = cf.createTopicConnection();
      session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      subscriber = session.createSubscriber(destination);
      
      System.out.println("Waiting For A Message.");
      connection.start();
      message = (TextMessage)subscriber.receive();
      System.out.println("The message was: "+message.getText());
         
      connection.close();
      System.out.println("Done.");
   }
}

Acknowledgement Modes

JMS consumers should be careful about the acknowledgment mode that they use. The acknowledgment mode is specified when the session is created. Any receivers created using the same session will have the same acknowledgment mode. The acknowledgment mode controls when the JMS provider considers that a message has been delivered successfully and when the message can be backed out so that it can be redelivered. In general, if a client application terminates before a message is acknowledged, the message is backed out and redelivered to the consumer.

If your session is transacted, the acknowledgment mode setting is ignored. For more information about how JMS transactions work, see chapter 3.8.

The examples that you have seen so far have used the AUTO_ACKNOWLEDGE acknowledgment mode. For simple JMS applications where there is not a critical need to ensure complete processing of all messages, this setting is sufficient.

  • AUTO_ACKNOWLEDGE - The JMS server considers that a message has been successfully delivered as soon as the message is passed back to the calling application. The only way that a message will be redelivered to the application is if the application or the provider terminates unexpectedly.

  • DUPS_OK_ACKNOWLEDGE - Same as AUTO_ACKNOWLEDGE, but it also means that the application can handle having a duplicate message delivered to it. Some providers could use this to optimize how they handle message acknowledgments.

  • CLIENT_ACKNOWLEDGE - In this case, the client application must call the Message.acknowledge() method to let the JMS provider know that the message and all previous messages were delivered successfully. Messages that have not been acknowledged by using Message.acknowledge() will get redelivered if the application or JMS provider terminates unexpectedly or if the Session.recover() method is called.
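
The bookkeeping behind CLIENT_ACKNOWLEDGE can be pictured with a small in-memory model. The sketch below is not the JMS API and is not how any particular provider is implemented; the class ClientAckSketch and its methods are hypothetical stand-ins for a session, for Message.acknowledge(), and for Session.recover().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of CLIENT_ACKNOWLEDGE bookkeeping; it does not use the JMS API. */
class ClientAckSketch {
    private final Deque<String> pending = new ArrayDeque<>();      // waiting for delivery
    private final List<String> unacknowledged = new ArrayList<>(); // delivered, not yet acknowledged

    void send(String message) {
        pending.addLast(message);
    }

    /** Delivers the next message, or null if none is waiting. */
    String receive() {
        String message = pending.pollFirst();
        if (message != null) {
            unacknowledged.add(message);
        }
        return message;
    }

    /** Acknowledges every message delivered so far. */
    void acknowledge() {
        unacknowledged.clear();
    }

    /** Puts unacknowledged messages back, oldest first, so they are delivered again. */
    void recover() {
        for (int i = unacknowledged.size() - 1; i >= 0; i--) {
            pending.addFirst(unacknowledged.get(i));
        }
        unacknowledged.clear();
    }

    public static void main(String[] args) {
        ClientAckSketch session = new ClientAckSketch();
        session.send("m1");
        session.send("m2");
        session.send("m3");
        session.receive();     // m1
        session.receive();     // m2
        session.acknowledge(); // m1 and m2 are done
        session.receive();     // m3, never acknowledged
        session.recover();
        System.out.println("Redelivered: " + session.receive()); // prints "Redelivered: m3"
    }
}
```

The point of the model is that a single acknowledgment covers a batch of messages, so a consumer can process several messages and acknowledge them once the whole batch has been handled.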

Asynchronous Message Delivery

The consumers that have been described so far have been synchronous receivers. They are considered synchronous because the client application must do a blocking synchronous receive(...) call to get the next message. JMS also allows you to deliver messages asynchronously to objects that implement the MessageListener interface.

The MessageListener interface only contains one method:

public void onMessage(Message msg);

An application only needs to implement the MessageListener interface and the JMS provider will send messages asynchronously to the onMessage(...) method.

Receiving an Asynchronous Message from a Queue

We will now review an example of an asynchronous Point-to-Point message consumer. Listing 3.9 contains the full source listing for the example. This example will pick up a TextMessage from the "testQueue" destination, display it on the console, and exit.

In this example we create a new AsynchQueueToMessage instance and call the run() method on it.

public static void main(String[] args) throws NamingException, JMSException
{
   new AsynchQueueToMessage().run();
}

The run() method now does the bulk of initializing the JMS objects. You will notice that the JMS objects are initialized much as they were in Listing 3.8. The main difference is that the local variables have become instance variables.

public void run() throws NamingException, JMSException
{
   ctx = new InitialContext();
   cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
   destination = (Queue)ctx.lookup("queue/testQueue");

   connection = cf.createQueueConnection();
   session = connection.createQueueSession(false,
   Session.AUTO_ACKNOWLEDGE);
   receiver = session.createReceiver(destination);

Near the end of the run() method is where you will notice a difference from our previous QueueToMessage example. This example calls the setMessageListener() method and passes a new instance of a custom MessageListener that will be defined a little later.

receiver.setMessageListener(new MyMessageListener());

Once all the objects have been initialized and are ready to receive messages, the connection's start() method is called so that the MessageListener can receive its messages.

Are you sure you want to start that connection?

Once you start to create MessageListener objects that are not trivial like our example, it is important that you do not call the start() method before the message listener has been fully initialized. Otherwise, the MessageListener will receive messages before it is ready to process the messages delivered to it.

   System.out.println("Waiting For A Message.");
   connection.start();
}

The final part of the example that we will examine is the implementation of the MessageListener. The MessageListener is implemented as an inner class. Its onMessage(...) method displays the TextMessage on the console and then closes the connection.

class MyMessageListener implements MessageListener {

   public void onMessage(Message msg)
   {
      try
      {
         TextMessage message = (TextMessage)msg;
         System.out.println("The message was: "+message.getText());
         connection.close();
         System.out.println("Done.");
      }
      catch (JMSException e)
      {
         e.printStackTrace();
      }
   }
}

What is keeping the application running?

An advanced Java programmer may notice that the main thread of execution returns normally after connection.start() executes. Normally a program would terminate after the main thread of execution returns. The reason the program does not terminate is that the connection starts a non-daemon thread in the background. The program will continue to run until the connection is closed.
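
The effect of a non-daemon thread can be reproduced with plain Java threads. In the sketch below, a worker thread stands in for whatever threads a real JMS connection starts; the class BackgroundThreadSketch and its methods are hypothetical and no JMS classes are used.

```java
import java.util.concurrent.CountDownLatch;

/** Shows a non-daemon background thread outliving the method that started it. */
class BackgroundThreadSketch {
    private final CountDownLatch closed = new CountDownLatch(1);
    private final Thread worker;

    BackgroundThreadSketch() {
        worker = new Thread(this::awaitClose);
        worker.setDaemon(false); // explicit: a non-daemon thread keeps the JVM alive
    }

    /** Stands in for a delivery loop: runs until close() is called. */
    private void awaitClose() {
        try {
            closed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        worker.start();
    }

    /** Releases the worker and waits for it to finish. */
    void close() {
        closed.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return worker.isAlive();
    }

    boolean isDaemon() {
        return worker.isDaemon();
    }

    public static void main(String[] args) {
        BackgroundThreadSketch connection = new BackgroundThreadSketch();
        connection.start();
        // Simulate a listener closing the connection a little later, from another thread.
        Thread closer = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            connection.close();
            System.out.println("Done.");
        });
        closer.start();
        System.out.println("main() returns now; the JVM stays up until the worker ends.");
    }
}
```

If the worker were marked as a daemon thread instead, the JVM would be free to exit as soon as the last non-daemon thread finished.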

Listing 3.9 The source code for an Asynchronous Point-to-Point consumer

import javax.jms.*;
import javax.naming.*;

/**
 * Listens for a TextMessage from a Queue
 */
public class AsynchQueueToMessage
{
   InitialContext ctx;
   QueueConnectionFactory cf;
   QueueConnection connection;
   QueueSession session;
   Queue destination;
   QueueReceiver receiver;
   
   public static void main(String[] args) throws NamingException,
   JMSException
   {
      new AsynchQueueToMessage().run();
   }

   public void run() throws NamingException, JMSException
   {
      ctx = new InitialContext();
      cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
      destination = (Queue)ctx.lookup("queue/testQueue");
      
      connection = cf.createQueueConnection();
      session = connection.createQueueSession(false,
      Session.AUTO_ACKNOWLEDGE);
      receiver = session.createReceiver(destination);
      receiver.setMessageListener(new MyMessageListener());
      
      System.out.println("Waiting For A Message.");
      connection.start();
   }

   class MyMessageListener implements MessageListener {

      public void onMessage(Message msg)
      {
         try
         {
            TextMessage message = (TextMessage)msg;
            System.out.println("The message was: "+message.getText());
            connection.close();
            System.out.println("Done.");
         }
         catch (JMSException e)
         {
            e.printStackTrace();
         }
      }
   }
}

Selectors

Selectors are used to pick out the subset of messages that a consumer is interested in receiving. When you know that a destination will hold more messages than the consumer is interested in consuming, the consuming application should use a selector. A selector works much like an SQL WHERE clause. The selector expression can reference message header fields and message property fields.

You specify the selector when you create the consumer. Once a consumer is created with a selector, the selector cannot be changed. An example of a consumer being created with a selector can be found below.

// For the point-to-point case:
receiver = session.createReceiver(destination, "JMSType = 'order' AND total>100 AND tax <1");
// For the pub-sub case:
subscriber = session.createSubscriber(destination, "JMSType = 'order' AND total>100 AND tax <1", false);

The consumer would only receive messages of type 'order' whose total is over 100 and whose tax is less than one. For more in-depth information on selectors, see the Message class in the JMS javadocs.
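
To make the meaning of that selector concrete, the sketch below expresses the same condition as ordinary Java over a map of field values. It is an illustration only: a real selector is evaluated by the JMS provider, and the class SelectorSketch, its methods, and the use of a Map to hold the fields are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java equivalent of the selector "JMSType = 'order' AND total>100 AND tax <1". */
class SelectorSketch {

    /** Returns true when the fields satisfy all three conditions of the selector. */
    static boolean matches(Map<String, Object> fields) {
        return "order".equals(fields.get("JMSType"))
                && number(fields, "total") > 100
                && number(fields, "tax") < 1;
    }

    /** Missing or non-numeric fields yield NaN, so every comparison on them is false. */
    private static double number(Map<String, Object> fields, String name) {
        Object value = fields.get(name);
        return value instanceof Number ? ((Number) value).doubleValue() : Double.NaN;
    }

    /** Builds the fields of a hypothetical message. */
    static Map<String, Object> message(String type, double total, double tax) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("JMSType", type);
        fields.put("total", total);
        fields.put("tax", tax);
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(matches(message("order", 150.0, 0.5)));   // true
        System.out.println(matches(message("order", 50.0, 0.5)));    // false
        System.out.println(matches(message("invoice", 150.0, 0.5))); // false
    }
}
```

A message that lacks the total or tax property does not match, which mirrors the way a selector does not select a message when a referenced property is absent.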

Durable Subscriptions

You already learned from chapter 3.3 that a topic does not hold messages like a queue does. Topics allow you to broadcast a message to multiple receivers. So what do you do if you want to broadcast a message to multiple consumers but you want those messages to be held for the clients that are not currently connected? The answer is that the client needs to establish a durable subscription with the topic.

A client establishes a durable subscription by taking the following actions:

  • Establish a connection with a user id and password that has the security permissions needed to create durable subscriptions.

  • Use the Session.createDurableSubscriber method to actually create the durable subscription. The additional string argument of the method is the subscription id of the durable subscription.

The following code snippet shows you how a durable subscription could be established:

connection = cf.createTopicConnection("john", "needle");
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
subscriber =
session.createDurableSubscriber(destination,"sub1");

Once a durable subscription has been established for the first time, the JMS provider will retain messages for the subscriber even if the client is not currently connected. When the client reconnects with the same durable subscription, all retained messages will be delivered.

Once a client no longer needs the JMS provider to retain the messages of a durable subscription, the client should cancel its durable subscription. A client would accomplish this by using the Session.unsubscribe() method:

session.unsubscribe("sub1");

If a client just wants to change an existing durable subscription to receive messages from a different topic and/or with a different message selector, then the client does not need to unsubscribe first. When a new durable subscription is established using an existing subscription id, the previous durable subscription is canceled first.
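
The lifecycle of a durable subscription can be pictured with a small in-memory model. This sketch does not use the JMS API and makes simplifying assumptions: every subscriber is treated as disconnected while messages are published, and the class DurableTopicSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a topic that retains messages for durable subscriptions. */
class DurableTopicSketch {
    // subscription id -> messages retained for that subscription
    private final Map<String, List<String>> retained = new LinkedHashMap<>();

    /** Creates the subscription if it is new; returns and clears anything retained for it. */
    List<String> connect(String subscriptionId) {
        List<String> backlog = retained.computeIfAbsent(subscriptionId, id -> new ArrayList<>());
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    /** Retains the message for every durable subscription that currently exists. */
    void publish(String message) {
        for (List<String> backlog : retained.values()) {
            backlog.add(message);
        }
    }

    /** Cancels the subscription and discards its retained messages. */
    void unsubscribe(String subscriptionId) {
        retained.remove(subscriptionId);
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.connect("sub1");                     // first connect establishes the subscription
        topic.publish("m1");                       // published while the client is away
        topic.publish("m2");
        System.out.println(topic.connect("sub1")); // prints [m1, m2]
        topic.unsubscribe("sub1");
        topic.publish("m3");                       // nobody is subscribed, so it is not retained
        System.out.println(topic.connect("sub1")); // prints []
    }
}
```

The last two lines show why unsubscribing matters in both directions: the provider stops storing messages, and a later subscription with the same id starts with nothing retained.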

JMS Messages

The messages that producers and consumers pass to each other are very important since in essence the message is the interface between the two components. The JMS spec specifies that JMS providers must support the message types listed in Table 3.10.

A JMS message is made up of three parts:

  1. Message Header - All JMS message types support a common set of header properties such as the JMS Message ID and the JMS Priority. Message selectors can operate on fields in the Message Header.

  2. Message Properties - Allows application defined fields that message selectors can operate on. The amount of data you can store in the message properties is usually constrained in size by the JMS provider.

  3. Message Body - Holds the bulk of the message data that the application is sending.

The type of JMS message that your application will use will depend on the type of message body that it will need to send. Use Table 3.10 to determine the type of JMS message you need to use.

Table 3.10 The Message types that the JMS API defines.

javax.jms Class Name   Description
BytesMessage           Designed to transport a binary message body.
MapMessage             Designed to transport a set of key-value pairs in the body of the message.
Message                Base class of all messages. Useful if you do not need to transport a message body.
ObjectMessage          Use to transport a serializable object in the body of the message.
StreamMessage          Provides a stream interface to transport a binary message body.
TextMessage            Use to transport a text-based message body.

JMS Transactions

A JMS transaction allows a client application to perform multiple JMS operations as an atomic unit of work. In other words, it allows you to roll back or commit a set of messages that were sent or received. Doing a rollback undoes any JMS operations that were previously done by the session. Doing a commit finalizes all the work that was done by the session.

The JMS session object is what is used to control the JMS transaction. When you create a Session object, the first argument specifies if the session will be transacted. The code sample below creates a transacted JMS session.

// for the point-to-point case:
session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);

// for the pub-sub case:
session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);

When you are working with a transacted JMS session, there are a few issues you need to be aware of:

  • Messages sent to a destination do not get delivered to the destination until the session is committed. If the session is rolled back, the messages are discarded.

  • Messages received from a destination do not get acknowledged until the session is committed.

  • Any uncommitted work that a session has done will be rolled back when the session is closed. If your application is unexpectedly terminated, all uncommitted transactions are rolled back.

The following code snippet uses a transacted session to publish two messages in a single unit of work. If an exception occurs at any time while the messages are being sent, the transaction is rolled back:

try
{
   publisher.publish(message1);
   publisher.publish(message2);
   session.commit();
}
catch (Throwable e)
{
   session.rollback();
}
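
The commit and rollback behavior for sent messages can be modeled in a few lines of plain Java. The sketch below is an in-memory illustration under stated assumptions, not the JMS API: the class TransactedSendSketch is hypothetical, and it models only the sending side of a transacted session.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of sends in a transacted session; it does not use the JMS API. */
class TransactedSendSketch {
    private final List<String> destination = new ArrayList<>(); // visible to consumers
    private final List<String> uncommitted = new ArrayList<>(); // sent in the current transaction

    /** Holds the message back until the transaction is committed. */
    void send(String message) {
        uncommitted.add(message);
    }

    /** Makes every message sent since the last commit or rollback visible. */
    void commit() {
        destination.addAll(uncommitted);
        uncommitted.clear();
    }

    /** Discards every message sent since the last commit or rollback. */
    void rollback() {
        uncommitted.clear();
    }

    /** Returns a copy of the messages that consumers could receive. */
    List<String> delivered() {
        return new ArrayList<>(destination);
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch();
        session.send("message1");
        session.send("message2");
        System.out.println(session.delivered()); // prints []
        session.commit();
        System.out.println(session.delivered()); // prints [message1, message2]
        session.send("message3");
        session.rollback();
        System.out.println(session.delivered()); // prints [message1, message2]
    }
}
```

Either both messages of a unit of work reach the destination or neither does, which is what makes a transacted session suitable for sends that must succeed or fail together.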