Monday, October 3, 2011

Creating a queue in OracleAQ

Oracle provides database integrated messaging functionality,which is called OracleAQ. 
Oracle JMS (OJMS) is the JMS interface to the Oracle Database Streams Advanced Queuing (AQ) feature.
To work with OracleAQ, we need to install Oracle database server.(V11g)


In this post lets have a look on how we can create a queue programmatically using OJMS libraries and AQ libraries.
For this, specific user should have required authorization to create queues..
  • Connect as 'sys/admin' ;
  • create user ratha identified by ratha;
  • grant create session to ratha;
  • grant connect, resource to ratha;
  • grant aq_administrator_role to ratha  identified by  ratha;
  • grant execute on dbms_aq to  ratha;
  • grant execute on dbms_aqadm to  ratha;
  • exec dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','ratha');
  • exec dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','ratha'); 
  • grant execute on sys.aq$_jms_text_message to ratha; 
  • Connect as 'ratha/ratha'
After providing required authorization for the new user 'ratha' (password : ratha) you could be able to login as user 'ratha' and able to create table/queue etc..

With the particular user information, now lets try to create a queue in oracle database.

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class OracleAQClient {

public static QueueConnection getConnection() {

  String hostname = "localhost";
  String oracle_sid = "orcl";
  int portno = 1521;
  String userName = "ratha";
  String password = "ratha";
  String driver = "thin";
  QueueConnectionFactory QFac = null;
  QueueConnection QCon = null;
  try {
   // get connection factory , not going through JNDI here
   QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, 
portno,driver);
   // create connection
   QCon = QFac.createQueueConnection(userName, password);
   } catch (Exception e) {
   e.printStackTrace();
  }
  return QCon;
 }

 public static void createQueue(Session session, String user,
String qTable, String queueName) {
  try {
   /* Create Queue Tables */
   System.out.println("Creating Queue Table...");

   AQQueueTableProperty qt_prop;
   AQQueueTable q_table = null;
   AQjmsDestinationProperty dest_prop;
   Queue queue = null;
   qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");

   /* create a queue table *///
   // /* Drop the queue table if already exists */
   // try{
   // q_table = ((AQjmsSession) session).getQueueTable(user, qTable);
   // q_table.drop(true);
   // System.out.println("Droped older queuetable...");
   // }
   // catch(Exception e){
   // e.printStackTrace();
   // return;
   // }

   q_table = ((AQjmsSession) session).createQueueTable(user, qTable,
qt_prop);
   System.out.println("Qtable created");
   dest_prop = new AQjmsDestinationProperty();
   /* create a queue */
   queue = ((AQjmsSession) session).createQueue(q_table, queueName, 
dest_prop);
   System.out.println("Queue created");
   /* start the queue */
   ((AQjmsDestination) queue).start(session, true, true);
  } catch (Exception e) {
   e.printStackTrace();
   return;
  }
 }

 public static void sendMessage(String user, String queueName) {

  try {
   QueueConnection QCon = getConnection();  
   Session session = QCon.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
   QCon.start();
   Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
   MessageProducer producer = session.createProducer(queue);
   TextMessage tMsg = session.createTextMessage("test");
   producer.send(tMsg);
   System.out.println("Sent message = " + tMsg.getText());

   session.close();
   producer.close();
   QCon.close();

  } catch (JMSException e) {
   e.printStackTrace();
   return;
  }
 }

 public static void browseMessage(String user, String queueName) {
  Queue queue;
  try {
   QueueConnection QCon = getConnection();  
   Session session = QCon.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
  
   QCon.start();
   queue = ((AQjmsSession) session).getQueue(user, queueName);
   QueueBrowser browser = session.createBrowser(queue);
   Enumeration enu = browser.getEnumeration();
   List list = new ArrayList();  
   while (enu.hasMoreElements()) {
    TextMessage message = (TextMessage) enu.nextElement();   
    list.add(message.getText());
   }
   for (int i = 0; i < list.size(); i++) {
    System.out.println("Browsed msg " + list.get(i));
   }
   browser.close();
   session.close();
   QCon.close();

  } catch (JMSException e) {
   e.printStackTrace();
  }

 }

 public static void consumeMessage(String user, String queueName) {  
  Queue queue;
  try {
   QueueConnection QCon = getConnection();  
   Session session = QCon.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
   QCon.start();
   queue = ((AQjmsSession) session).getQueue(user, queueName);
   MessageConsumer consumer = session.createConsumer(queue);
   TextMessage msg = (TextMessage) consumer.receive();
   System.out.println("MESSAGE RECEIVED " + msg.getText());

   consumer.close();
   session.close();
   QCon.close();
  } catch (JMSException e) {  
   e.printStackTrace();
  }
 }

 public static void main(String args[]) {
  String userName = "ratha";
  String queue = "test";
  // createQueue( userName, qTable, queue);
  sendMessage(userName, queue);
  browseMessage(userName, queue);
  // consumeMessage(userName, queue);
 }
}

You might need following jars in your class-path in order to run above java client
  • ojdbc6.jar (can be found at db_home\jdbc\lib)
  • jta.jar (can be found at db_home\jdbc\jlib)
  • jmscommon.jar (can be found at db_home\RDBMS\jlib folder)
  • aqapi.jar(can be found at db_home\RDBMS\jlib folder)


8 comments:

  1. Hi,

    The code is the best. But why the queue remains in the database after consuming???

    I called consumeCode & it displayed the consumed message too but the database still has the same message in queue. Also, the retry_count column in the queue keeps on increasing whenever I call consume.

    Appreciate if you can answer 2 Question:
    1. How to make consumecode to remove the message from database queue?
    2. Why retry_count column in queueu in database increases?
    3. How to receive bulk messages or how to consume bulk messages at once? If queue has 5 records, how to retrieve all 5 in one attempt?

    Thanks!

    ReplyDelete
  2. Hi,
    If you notice above code there are 4 separate methods..
    1.CreateQueue(): This is to create a queue at OracleDB. Once you created you don't need to execute this method.Because queue is created . Thats why, i commented at main() method.
    2.sendMessage(): This is to send messages to the queue;
    3.browseMessage(): This is just to browse messages.
    4.consumeMessage(): This is to consume Messages. But after consuming the message, message WILL BE REMOVED from the queue..This is how JMS works..

    I don't know, why it remains in your queue. I think you are executing the code with send message() and browseMessage()..

    Commentout browseMessage()+consumeMessage() and send no of messages and after that run only consumeMessage() code and see..

    If you want to delete the queue, you can use OracleDB console and delete the queue..

    To retrieve bulk messages at once, use a 'for' loop and aggregate all messages in an array and print them:)..

    ReplyDelete
  3. Hi Vijayratha,

    Thanks for your reply. However, I tied invoking only Consumemessage code. Please note that queue is already having records in my case, in other words, I'm not using your sendmessage method to send messages to queue. And when I invoke consumemessage method alone using your code, the message remains in the queue with increasing retry_count column in queue table. However, if I change the code line from your consumemessage to a shown below:

    Session session = QCon.createQueueSession(false,
    Session.AUTO_ACKNOWLEDGE);

    then the messages get removed. I used Auto acknowledge instead of client acknowledge. Can you explain why I need to use auto to remove messages from queue?

    Also, my DBA told me specifically not to use for loop to avoid hits to DB. Is there other efficient way to retrieve messages in bulk say like bulk receive or so???

    Also, do you have an example where we can use messagelistener using your code? Using this example, how would I know whether there are messages in the queue? I mean messages may keep coming at any time & I need to retrieve them as soon as they arrive, how can I do that?

    I really appreciate your efforts taken for this example!

    Thanks!

    ReplyDelete
  4. AUTO_ACKNOWLEDGE : the session automatically acknowledges a client's receipt of a message either when the session has successfully returned from a call to receive or when the message listener the session has called to process the message successfully returns

    CLIENT_ACKNOWLEDGE: the client acknowledges a consumed message by calling the message's acknowledge method.

    So, using which mode depends on your program..

    I dont have any sample for ojms message listener..But this guide will be helpful
    http://docs.oracle.com/cd/B19306_01/server.102/b14257/jm_create.htm

    ReplyDelete
  5. Hi,

    I could not understand "To retrieve bulk messages at once, use a 'for' loop and aggregate all messages in an array and print them:).. "

    Can you please provide the for loop sample? What would be the size of the for loop as the consumemessage does not have any size??

    Is there any method which will tell us queue size??

    I'll really appreciate if you can provide sample which will show us on how to receive messages at once?

    Thanks!

    ReplyDelete
  6. Well you can find lot of samples on how to use for loop in java..
    to find queue depth;(check the broser code)

    Enumeration enu = browser.getEnumeration();
    List list = new ArrayList();
    while (enu.hasMoreElements()) {
    TextMessage message = (TextMessage) enu.nextElement();
    list.add(message.getText());
    }
    for (int i = 0; i < list.size(); i++) {
    System.out.println("Browsed msg " + list.get(i));
    }

    ReplyDelete
  7. Thanks for the quick reply. However, I do not want to use browse api. Since I'll be using consume method only, how can I do for loop in consume???

    The reason I need to do in consume is because my queue will keep on getting records and I'm calling ur method of consume but it returns me one record & removes from the queue.

    How can I get the queue size from consume method & then remove them from queue immdly?? Kind of bulk remove...

    I'll really appreciate if you can provide consume method with for loop code, in that manner, it'll be helpful for other readers too.

    Thanks again for excellent tutorial!

    ReplyDelete
  8. Can I use browse API to loop through & then use receive inside it???

    If the queue has 10 records, how can I receive them? Basically, I'm trying to understand how people consume messages?

    If I invoke thread every minute, if it consumes only 1 record at a time, it'll take long time to process even 10 records. Imagine, if queue has 10000 records in 5 minutes, then how should we retrieve it using this code??

    Earlier you mentioned that I can use for loop & thats where I'm still struggling how to write for loop & consume those messages when thread wakes up. Basically, I'm running a simple thread which checks the queue & I want that thread to process all data in the queue at that time & then go to sleep for 1 min & then again do the same thing. Here, I'm struggling on getting all the data from the queue when the thread wakes up.

    I'll really appreciate if you can provide the for loop sample which will consume messages in the queue when thread wakes up.

    Thanks!

    ReplyDelete