Thursday, May 10, 2012

Java client to send/receive messages for ActiveMQ

ActiveMQ is a message broker supporting the JMS 1.1 specification. Here I share a simple Java client which can be used to send, browse, and consume messages in a destination (i.e. a queue) created in an ActiveMQ server.
You might need ActiveMQ libraries in your classpath.

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;

public class ActiveMQTest {
 private static ActiveMQDestination destination;

    public static void offer() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");

        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) sendSession.createQueue("TEST.FOO");

        MessageProducer producer = sendSession.createProducer(destination);

        for (int i = 1; i < 5; i++) {
            TextMessage message = sendSession.createTextMessage(String.valueOf(i));
            // Send the messages
            producer.send(message);
            System.out.println("########Sent message : " + message.getText());
        }
        producer.close();
        sendSession.close();
        connection.close();
    }

    public static void peek() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        QueueBrowser browser = session.createBrowser((Queue) destination);
        Enumeration enumeration = browser.getEnumeration();

        if (enumeration.hasMoreElements()) {
            Object msg = enumeration.nextElement();
            TextMessage m = (TextMessage) msg;
            System.out.println(" !!!!!!!!Browsed  msg " +m.getText());
        }

        browser.close();
        session.close();
        connection.close();

    }

    public static void poll() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage m = (TextMessage) consumer.receive(1000);
        if (m != null) {
            System.out.println("*********** Polled Messg" + m.getText());
        }
        consumer.close();
        session.close();
        connection.close();
    }

  
    public static void main(String args[]) {
        try {
            offer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
             for (int i = 1; i < 5; i++) {
                peek();
                poll();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1 comment:

  1. I have a Restful service API developed with JAX-RS and jersey. I have deployed the same in TOMCAT 7. Now I would like to implement Activemq so that I would keep all request in a queue and process the request resource. How to do this and integrate with tomcat7. How to integrate ActiveMq with Tomcat7 or my rest service webapp. How to call the service.

    Important :- Inside the Rest Api, I am using FilterChaining concept for security concern and after verification of the calling party, I am simply forwarding the request to the resource. For this I have added in web.xml.

    ReplyDelete