Java JMS Publish-Subscribe Message Example

The publish/subscribe messaging domain is a one-to-many model in which a publisher sends a message through a topic to all active subscribers who receive it.

Publish Subscribe JMS Messaging

Java Message Service (JMS) is a Java API that allows applications to create, send, receive, and read messages. In this example, we will go through one example of the publish/subscribe messaging domain.

The publish/subscribe messaging domain is a one-to-many model in which a publisher sends a message through a topic to all active subscribers who receive it. It enables asynchronous communication between message producers (publishers) and message consumers (subscribers).

In the Pub/Sub model:

  • Publishers send messages to a topic.
  • Subscribers register to the topic to receive messages.
  • A topic acts as a logical channel, ensuring all registered subscribers receive the published messages.
Publish Subscribe JMS Messaging
Publish-Subscribe JMS Messaging

1. Prerequisites

To follow along with the examples, ensure the following:

  1. JDK Installed: Java Development Kit (JDK) 8 or later.
  2. JMS Provider: A JMS provider, such as Apache ActiveMQ, must be installed and running.
  3. JMS Library: Include the necessary JMS libraries in your project.

For Maven-based projects, add the following dependency for ActiveMQ:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.17.1</version>
</dependency>

2. Setting Up JMS in Pub/Sub Model

2.1. Connecting to a JMS Provider

Use the InitialContext to look up the ConnectionFactory and establish a connection.

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;

public class JMSConnectionUtil {
    public static Connection createConnection() throws Exception {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616"); // Default ActiveMQ URL

        Context context = new InitialContext(env);
        ConnectionFactory factory = (ConnectionFactory) context.lookup("ConnectionFactory");
        return factory.createConnection();
    }
}

2.2. Creating a Topic

A topic is a logical channel used for communication in the Pub/Sub model. Topics can also be looked up using the InitialContext:

Context context = new InitialContext(env);
Topic topic = (Topic) context.lookup("dynamicTopics/MyTopic");

3. Publisher Flow Example

In a typical publish/subscribe message sender application, the following steps are performed:

  • Firstly, we will obtain the InitialContext object for the JMS server.
  • After that, use the initial context object to look up a topic object.
  • Again, we will use the initial context object to look at the topic connection factory.
  • Then, use the topic connection factory to create the topic connection, which represents the physical connection of the JMS server.
  • After creating the topic connection factory, we will create the topic session. The first parameter will decide whether the session is transacted or not, but we will use a non-transacted session, and the second parameter will decide the delivery mode.
  • After this, we will create a topic publisher for the topic object and then make a message.
  • Then, send a message such as “Hello Subscribers!” to the topic object.
  • After that, close the topic connection. Closing the topic connection automatically closes the session and topic publisher.

Let’s look at example below:

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;

public class Publisher {
    public static void main(String[] args) {
        try {
            Connection connection = JMSConnectionUtil.createConnection();
            connection.start();

            Properties env = new Properties();
            env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
            env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
            Context context = new InitialContext(env);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) context.lookup("dynamicTopics/MyTopic");

            MessageProducer producer = session.createProducer(topic);
            TextMessage message = session.createTextMessage("Hello Subscribers!");

            producer.send(message);
            System.out.println("Message sent: " + message.getText());

            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. Subscriber Flow Example

Most of the steps for the receiver side are the same as for the sender application – except it will listen to the message rather than send the JMS message.

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;

public class Subscriber {
    public static void main(String[] args) {
        try {
            Connection connection = JMSConnectionUtil.createConnection();
            connection.start();

            Properties env = new Properties();
            env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
            env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
            Context context = new InitialContext(env);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) context.lookup("dynamicTopics/MyTopic");

            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(message -> {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message: " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            System.out.println("Subscriber is waiting for messages...");
            Thread.sleep(10000); // Keep the program running to receive messages

            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

In the above code, the receiver will receive the message from the sender and print it, i.e. “Hello Subscribers!“.

5. FAQs

5.1. How to create durable queues?

We can ensure that the subscribers receive messages even when they are offline by enabling durable subscriptions.

MessageConsumer consumer = session.createDurableSubscriber(topic, "SubscriberName");

5.2. How to improve message durability?

We can use DeliveryMode.PERSISTENT to ensure that messages sent by the producer are not lost in the event of a JMS provider failure or crash.

MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

By setting the delivery mode to PERSISTENT, the message broker will persist the message data (e.g., to disk) before acknowledging receipt, ensuring the message’s durability.

Happy Learning !!

Weekly Newsletter

Stay Up-to-Date with Our Weekly Updates. Right into Your Inbox.

Comments

Subscribe
Notify of
5 Comments
Most Voted
Newest Oldest
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.