Showing posts with label JMS. Show all posts
Showing posts with label JMS. Show all posts

Friday, July 1, 2016

Sample Kafka Producer and Consumer

Applied to: Apache Kafka Version: 0.9.X


Producer 



import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer { 
private void generateMessgaes() throws IOException {
    String topic = "myTopic";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("client.id", "test");

    KafkaProducer producer = null;
    try {
          producer = new KafkaProducer<>(props);
          producer.send(new ProducerRecord(topic, "test msg"));
      } catch (Throwable e) {
        e.printStackTrace();

     } finally {
          producer.close(100,TimeUnit.MILLISECONDS);
      }
   }

public static void main(String[] args) throws IOException {
       Producer producer = new Producer();
       producer.generateMessgaes();
     }
}
Consumer


import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;

public class Listener {

public void start() throws CoreException {
String topic = "myTopic";
List topics = Arrays.asList(topic);
Properties props = new Properties();
props.put("bootstrap.servers", "aukk1.leightonobrien.com:9092");
props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000");; props.put("heartbeat.interval.ms", "10000"); props.put("auto.offset.reset", "earliest"); props.put("group.id", "test");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("fetch.min.bytes", 1); props.put("receive.buffer.bytes", "10000"); props.put("max.partition.fetch.bytes", "10000"); props.put("request.timeout.ms", "40000"); KafkaConsumer consumer = new KafkaConsumer(props);
try { consumer.subscribe(topics);
} catch (Exception e) {
e.printStackTrace();
}
try {
while (true) {
ConsumerRecords records = consumer.poll(3000);
System.out.println("polling msges : " + records.count());
for (ConsumerRecord record : records) {
System.out.println("kafka record : " + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally { consumer.close();
}
}
public static void main(String args[]) throws CoreException {
Listener listener = new Listener();
listener.start();
}
}

Wednesday, March 30, 2016

The session timeout is not within an acceptable range : Kafka v0.9.0.x

To overcome above issue [1] in the kafka 0.9.0.x, check following properties ;

  1. group.max.session.timeout.ms in the server.properties > session.timeout.ms in the consumer.properties
  2. group.min.session.timeout.ms in the server.properties < session.timeout.ms in the consumer.properties.

[1]

org.apache.kafka.common.errors.ApiException: The session timeout is not within an acceptable range.

Thursday, May 10, 2012

Java client to send/receive messages for ActiveMQ

ActiveMQ is a messagebroker supporting JMS 1.1 specification..Here i share a simple java client which can be used to send/browse  messages  in a destination which(ie: queue) is created in ActiveMQ server.
You might need ActiveMQ libraries in your classpath.

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;

public class ActiveMQTest {
 private static ActiveMQDestination destination;

    public static void offer() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");

        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) sendSession.createQueue("TEST.FOO");

        MessageProducer producer = sendSession.createProducer(destination);

        for (int i = 1; i < 5; i++) {
            TextMessage message = sendSession.createTextMessage(String.valueOf(i));
            // Send the messages
            producer.send(message);
            System.out.println("########Sent message : " + message.getText());
        }
        producer.close();
        sendSession.close();
        connection.close();
    }

    public static void peek() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        QueueBrowser browser = session.createBrowser((Queue) destination);
        Enumeration enumeration = browser.getEnumeration();

        if (enumeration.hasMoreElements()) {
            Object msg = enumeration.nextElement();
            TextMessage m = (TextMessage) msg;
            System.out.println(" !!!!!!!!Browsed  msg " +m.getText());
        }

        browser.close();
        session.close();
        connection.close();

    }

    public static void poll() throws Exception {

        ActiveMQConnectionFactory connectionFactory =
                                                      new ActiveMQConnectionFactory(
                                                                                    "vm://localhost");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage m = (TextMessage) consumer.receive(1000);
        if (m != null) {
            System.out.println("*********** Polled Messg" + m.getText());
        }
        consumer.close();
        session.close();
        connection.close();
    }

  
    public static void main(String args[]) {
        try {
            offer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
             for (int i = 1; i < 5; i++) {
                peek();
                poll();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Saturday, April 28, 2012

Configuring JMS Transaction Rollback in WSO2ESB

WSO2 ESB supports transactions in two ways..
  • Transaction mediator
  • JMS transport transaction

Here lets have a look on how we can use jms transport transaction to roll back the transactions in case of failures..
In my previous post, i explained how can we configure a proxy to listen to a particular queue..
Here , we'll  add two more additional parameters, to make it rollback the transaction when failures occur..

  1. Add a service level parameter to enable local JMS transaction...
 <parameter name="transport.jms.SessionTransacted">true</parameter>
You can define this parameter at your transport configuration, which is defined in the axis2 configuration..
Service level parameters will overwrite the transport level parameters..
 
    2. Add "SET_ROLLBACK_ONLY" property in the mediation flow(ie: @ your InSequence). When failure occurs , this property helps to rollback the local transaction.

    <property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>
Here is the final modified configuration...

<proxy name="ErrorProxy" transports="https http jms" startOnLoad="true" trace="disable">
    <target endpoint="ErrorQueueEndpoint" inSequence="ErrorInSequence" faultSequence="fault"/>
  <parameter name="transport.jms.ContentType">
      <rules>
        <jmsProperty>contentType</jmsProperty>
        <default>text/xml</default>
      </rules>
  </parameter>
  <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
  <parameter name="transport.jms.DestinationType">queue</parameter>
  <parameter name="transport.jms.SessionTransacted">true</parameter>
  <parameter name="transport.jms.Destination">errorqueue</parameter>
</proxy>
<sequence name="ErrorInSequence">
   <property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>
    <log level="custom">
        <property name="Picked message" value="************"/>
    </log>
   <description/>
</sequence>
<endpoint xmlns="http://ws.apache.org/ns/synapse" name="ErrorQueueEndpoint">
    <address uri="http://localhost:9000/services/SimpleStockQuoteService1">
     <timeout>
       <duration>30000</duration>
       <responseAction>fault</responseAction>
     </timeout>
    </address>
</endpoint>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="fault">
    <log level="full">
        <property name="FAULT SEQ" value="***"/>
        <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
        <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
    </log>
</sequence>
 If you down your endpoint(ie:ErrorQueueEndpoint); you will see that your message will remain the queue..
There is a detail article written by Rajika about the transactions support in WSO2ESB...

Monday, January 2, 2012

Defining JMS proxy to listen to a particular queue in wso2esb

There are  use cases, where user needs to define number of jms queues for different purposes and proxies will listen to those queues and do further processing with messages.
There is a predefined service level parameter available in axis2   to make 'jms' proxies to listen particular queues.
Check the following simple sample(for qpid), where 'ErrorProxy" listens a  queue named as "errorqueue".

<proxy xmlns="http://ws.apache.org/ns/synapse" name="ErrorProxy" transports="https,http,jms" statistics="disable" trace="disable" startOnLoad="true">
   <target inSequence="ErrorInSequence" endpoint="ErrorQueueEndpoint" />
   <parameter name="transport.jms.ContentType">
      <rules>
         <jmsProperty>contentType</jmsProperty>
         <default>text/xml</default>
      </rules>
   </parameter>
   <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
   <parameter name="transport.jms.DestinationType">queue</parameter>
   <parameter name="transport.jms.Destination">errorqueue</parameter>
</proxy> 
You have to enable jms transport in your axis2.xml.

eg:
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
  <parameter name="myQueueConnectionFactory" locked="false">
   <parameter name="java.naming.factory.initial" locked="false"> org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
   <parameter name="java.naming.provider.url" locked="false">repository/conf /jndi.properties</parameter>
   <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false"> QueueConnectionFactory</parameter>
   <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue< /parameter>
  </parameter>

Need to add following entry at jndi.properties file
  queue.errorqueue = example.errorqueue


ErrorInSequence Configuration
--------------------------------------
<sequence xmlns="http://ws.apache.org/ns/synapse" name="ErrorInSequence">
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" />
   <property name="OUT_ONLY" value="true" scope="default"/>
     <log level="custom">
      <property name="Picked message" value="************" />
   </log>
   <description></description>
</sequence> 

This sequence receives the error message,which is stored at error queue

Monday, October 3, 2011

Creating a queue in OracleAQ

Oracle provides database integrated messaging functionality,which is called OracleAQ. 
Oracle JMS (OJMS) is the JMS interface to the Oracle Database Streams Advanced Queuing (AQ) feature.
To work with OracleAQ, we need to install Oracle database server.(V11g)


In this post lets have a look on how we can create a queue programmatically using OJMS libraries and AQ libraries.
For this, specific user should have required authorization to create queues..
  • Connect as 'sys/admin' ;
  • create user ratha identified by ratha;
  • grant create session to ratha;
  • grant connect, resource to ratha;
  • grant aq_administrator_role to ratha  identified by  ratha;
  • grant execute on dbms_aq to  ratha;
  • grant execute on dbms_aqadm to  ratha;
  • exec dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','ratha');
  • exec dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','ratha'); 
  • grant execute on sys.aq$_jms_text_message to ratha; 
  • Connect as 'ratha/ratha'
After providing required authorization for the new user 'ratha' (password : ratha) you could be able to login as user 'ratha' and able to create table/queue etc..

With the particular user information, now lets try to create a queue in oracle database.

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class OracleAQClient {

public static QueueConnection getConnection() {

  String hostname = "localhost";
  String oracle_sid = "orcl";
  int portno = 1521;
  String userName = "ratha";
  String password = "ratha";
  String driver = "thin";
  QueueConnectionFactory QFac = null;
  QueueConnection QCon = null;
  try {
   // get connection factory , not going through JNDI here
   QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, 
portno,driver);
   // create connection
   QCon = QFac.createQueueConnection(userName, password);
   } catch (Exception e) {
   e.printStackTrace();
  }
  return QCon;
 }

 public static void createQueue(Session session, String user,
String qTable, String queueName) {
  try {
   /* Create Queue Tables */
   System.out.println("Creating Queue Table...");

   AQQueueTableProperty qt_prop;
   AQQueueTable q_table = null;
   AQjmsDestinationProperty dest_prop;
   Queue queue = null;
   qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");

   /* create a queue table *///
   // /* Drop the queue table if already exists */
   // try{
   // q_table = ((AQjmsSession) session).getQueueTable(user, qTable);
   // q_table.drop(true);
   // System.out.println("Droped older queuetable...");
   // }
   // catch(Exception e){
   // e.printStackTrace();
   // return;
   // }

   q_table = ((AQjmsSession) session).createQueueTable(user, qTable,
qt_prop);
   System.out.println("Qtable created");
   dest_prop = new AQjmsDestinationProperty();
   /* create a queue */
   queue = ((AQjmsSession) session).createQueue(q_table, queueName, 
dest_prop);
   System.out.println("Queue created");
   /* start the queue */
   ((AQjmsDestination) queue).start(session, true, true);
  } catch (Exception e) {
   e.printStackTrace();
   return;
  }
 }

 public static void sendMessage(String user, String queueName) {

  try {
   QueueConnection QCon = getConnection();  
   Session session = QCon.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
   QCon.start();
   Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
   MessageProducer producer = session.createProducer(queue);
   TextMessage tMsg = session.createTextMessage("test");
   producer.send(tMsg);
   System.out.println("Sent message = " + tMsg.getText());

   session.close();
   producer.close();
   QCon.close();

  } catch (JMSException e) {
   e.printStackTrace();
   return;
  }
 }

 public static void browseMessage(String user, String queueName) {
  Queue queue;
  try {
   QueueConnection QCon = getConnection();  
   Session session = QCon.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
  
   QCon.start();
   queue = ((AQjmsSession) session).getQueue(user, queueName);
   QueueBrowser browser = session.createBrowser(queue);
   Enumeration enu = browser.getEnumeration();
   List list = new ArrayList();  
   while (enu.hasMoreElements()) {
    TextMessage message = (TextMessage) enu.nextElement();   
    list.add(message.getText());
   }
   for (int i = 0; i < list.size(); i++) {
    System.out.println("Browsed msg " + list.get(i));
   }
   browser.close();
   session.close();
   QCon.close();

  } catch (JMSException e) {
   e.printStackTrace();
  }

 }

 public static void consumeMessage(String user, String queueName) {  
  Queue queue;
  try {
   QueueConnection QCon = getConnection();  
   Session session = QCon.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
   QCon.start();
   queue = ((AQjmsSession) session).getQueue(user, queueName);
   MessageConsumer consumer = session.createConsumer(queue);
   TextMessage msg = (TextMessage) consumer.receive();
   System.out.println("MESSAGE RECEIVED " + msg.getText());

   consumer.close();
   session.close();
   QCon.close();
  } catch (JMSException e) {  
   e.printStackTrace();
  }
 }

 public static void main(String args[]) {
  String userName = "ratha";
  String queue = "test";
  // createQueue( userName, qTable, queue);
  sendMessage(userName, queue);
  browseMessage(userName, queue);
  // consumeMessage(userName, queue);
 }
}

You might need following jars in your class-path in order to run above java client
  • ojdbc6.jar (can be found at db_home\jdbc\lib)
  • jta.jar (can be found at db_home\jdbc\jlib)
  • jmscommon.jar (can be found at db_home\RDBMS\jlib folder)
  • aqapi.jar(can be found at db_home\RDBMS\jlib folder)