Thursday, May 10, 2012

Java client to send/receive messages for ActiveMQ

ActiveMQ is a messagebroker supporting JMS 1.1 specification..Here i share a simple java client which can be used to send/browse  messages  in a destination which(ie: queue) is created in ActiveMQ server.
You might need ActiveMQ libraries in your classpath.

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;

public class ActiveMQTest {
 private static ActiveMQDestination destination;

    public static void offer() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");

        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) sendSession.createQueue("TEST.FOO");

        MessageProducer producer = sendSession.createProducer(destination);

        for (int i = 1; i < 5; i++) {
            TextMessage message = sendSession.createTextMessage(String.valueOf(i));
            // Send the messages
            producer.send(message);
            System.out.println("########Sent message : " + message.getText());
        }
        producer.close();
        sendSession.close();
        connection.close();
    }

    public static void peek() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        QueueBrowser browser = session.createBrowser((Queue) destination);
        Enumeration enumeration = browser.getEnumeration();

        if (enumeration.hasMoreElements()) {
            Object msg = enumeration.nextElement();
            TextMessage m = (TextMessage) msg;
            System.out.println(" !!!!!!!!Browsed  msg " +m.getText());
        }

        browser.close();
        session.close();
        connection.close();

    }

    public static void poll() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage m = (TextMessage) consumer.receive(1000);
        if (m != null) {
            System.out.println("*********** Polled Messg" + m.getText());
        }
        consumer.close();
        session.close();
        connection.close();
    }

  
    public static void main(String args[]) {
        try {
            offer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
             for (int i = 1; i < 5; i++) {
                peek();
                poll();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}