Tuesday, November 27, 2012

XML validation against schema

We could use simple online tools to validate XML files against a single schema. But if validating a single XML file needs more than one schema file (sub-schemas of a base schema), we can use other tools which are freely available.
I used libxml, which works well. For the 64-bit Windows platform, though, a binary distribution is not available, so we need to compile the source files. On Linux, compilation is easy.

Run xmllint from the "bin" folder

# xmllint --noout --schema <BASE_XSD_FILE>  <XML_FILE_TO_BE_VALIDATED>

If there are any errors, they will be shown in the command prompt.
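
The same kind of check can also be done from Java with the JDK's built-in javax.xml.validation API. The snippet below is only a minimal sketch and not a replacement for xmllint: the class name, the tiny inline schema and the sample documents are made up for illustration. With a real multi-file schema you would probably load the base XSD from a file (for example with new StreamSource(new File(...))) so that relative include/import locations can be resolved.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

class SchemaValidationSketch {

    // Hypothetical schema, used only for this illustration.
    static final String XSD =
        "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
      + "  <xs:element name='user'>"
      + "    <xs:complexType><xs:sequence>"
      + "      <xs:element name='name' type='xs:string'/>"
      + "      <xs:element name='age' type='xs:int'/>"
      + "    </xs:sequence></xs:complexType>"
      + "  </xs:element>"
      + "</xs:schema>";

    /** Returns null when the document is valid, otherwise the first error message. */
    static String validate(String xsd, String xml) {
        try {
            SchemaFactory factory =
                SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(xsd)));
            Validator validator = schema.newValidator();
            // Without a custom ErrorHandler, the first validation error is thrown.
            validator.validate(new StreamSource(new StringReader(xml)));
            return null;
        } catch (SAXException | IOException e) {
            return String.valueOf(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(validate(XSD, "<user><name>Alice</name><age>30</age></user>"));
        System.out.println(validate(XSD, "<user><name>Alice</name><age>thirty</age></user>"));
    }
}
```

The first call should print null (valid document), while the second should print a validator message complaining about the non-numeric age.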

Monday, November 26, 2012

Configuring wso2esb to pass messages through proxy server

Organizations may expose their services through a proxy server for several purposes. In such a case, when configuring the ESB, the user has to provide the proxy server configuration.
In the axis2 configuration, two parameters have to be provided in the transport sender configuration.
  • http.proxyHost : Proxy server's IP
  • http.proxyPort : Proxy server's port
eg :
 <transportSender name="http" class="org.apache.synapse.transport.nhttp.HttpCoreNIOSender">
        <parameter name="non-blocking" locked="false">true</parameter>
        <parameter name="http.proxyHost" locked="false">  </parameter>
        <parameter name="http.proxyPort" locked="false">3128</parameter>
 </transportSender>
And a property (POST_TO_URI) has to be set in the synapse configuration to make the ESB's outgoing URL a complete URL.

    <property name="POST_TO_URI" value="true" scope="axis2"/>
            <address uri=""/>
Depending on the proxy server's behaviour, we may need to set some additional properties.
  • DISABLE_CHUNKING : If the proxy server doesn't support HTTP chunking.
<property name="DISABLE_CHUNKING" value="true" scope="axis2"/>
  • FORCE_HTTP_1.0 : If the proxy server supports only HTTP/1.0 messages.
<property name="FORCE_HTTP_1.0" value="true" scope="axis2"/>
These properties can be applied to WSO2 API Manager as well, since WSO2 ESB is used as the gateway for API Manager.

Saturday, November 24, 2012

Extracting CDATA section using XSLT

In an XML message, we sometimes pass data which we do not want XML parsers to parse.
Characters like "<" and "&" are illegal in XML element content. To carry such characters we use a CDATA section in our XML message.
For instance, in the following XML message we use a CDATA block to pass <metadata> to the other end without the "<" and ">" signs being parsed.

    <ser:getArtifactContentResponse xmlns:ser="http://services.generic.governance.carbon.wso2.org">
   <ser:return><![CDATA[
   <metadata xmlns="http://www.wso2.org/governance/metadata">
        <description>scann doc</description>
   </metadata>]]></ser:return>
    </ser:getArtifactContentResponse>

To extract the CDATA section (here it contains an XML message), we could use a simple XSLT script.

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="1.0" xmlns:ns="http://services.generic.governance.carbon.wso2.org">
    <xsl:output method="xml" version="1.0" encoding="UTF-8" />
    <xsl:template match="/">
        <xsl:value-of select="//ns:getArtifactContentResponse/ns:return/text()" disable-output-escaping="yes"/>
    </xsl:template>
</xsl:stylesheet>
The output would be:

   <metadata xmlns="http://www.wso2.org/governance/metadata">
        <description>scann doc</description>
   </metadata>
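
To try this kind of stylesheet quickly, it can be run with the XSLT processor that ships with the JDK. This is just a sketch under some assumptions: the class name and the sample input are made up for illustration, the payload is assumed to sit in a CDATA block inside a "return" element (as the select expression suggests), and omit-xml-declaration is added only to keep the printed result short.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

class CdataExtractSketch {

    static final String XSLT =
        "<xsl:stylesheet xmlns:xsl='http://www.w3.org/1999/XSL/Transform' version='1.0'"
      + "    xmlns:ns='http://services.generic.governance.carbon.wso2.org'>"
      + "  <xsl:output method='xml' omit-xml-declaration='yes'/>"
      + "  <xsl:template match='/'>"
      + "    <xsl:value-of select='//ns:getArtifactContentResponse/ns:return/text()'"
      + "                  disable-output-escaping='yes'/>"
      + "  </xsl:template>"
      + "</xsl:stylesheet>";

    // Hypothetical input: the XML payload travels as CDATA text inside <ser:return>.
    static final String INPUT =
        "<ser:getArtifactContentResponse"
      + " xmlns:ser='http://services.generic.governance.carbon.wso2.org'>"
      + "<ser:return><![CDATA[<metadata xmlns=\"http://www.wso2.org/governance/metadata\">"
      + "<description>scann doc</description></metadata>]]></ser:return>"
      + "</ser:getArtifactContentResponse>";

    /** Applies the stylesheet to the XML and returns the serialized result. */
    static String transform(String xslt, String xml) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(xslt)));
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)),
                                  new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        // Because output escaping is disabled, the CDATA text comes out as markup.
        System.out.println(transform(XSLT, INPUT));
    }
}
```

Note that disable-output-escaping only has an effect when the processor serializes the result itself (as with the StreamResult here).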

Thursday, November 22, 2012

Overcoming character encoding issues in Windows for Java applications

I faced invalid UTF-8/character encoding issues multiple times recently when compiling/running applications in Java. We can force the Java VM to use UTF-8 as its encoding. To do that, set the following environment variable.

variable name :   JAVA_TOOL_OPTIONS
variable value :   -Dfile.encoding=UTF8
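
To see which encoding the JVM actually picked up, and what a mismatch looks like, something like the following can be used. It is only a small illustration: the class and method names are made up, and the sample word is written with a Unicode escape so that the encoding of the source file itself does not matter.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class EncodingMismatchSketch {

    /** Encodes the text as UTF-8, then decodes those bytes with another charset. */
    static String decodeUtf8BytesAs(String text, Charset decodeWith) {
        byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
        return new String(utf8, decodeWith);
    }

    public static void main(String[] args) {
        // What the running JVM uses by default (affected by -Dfile.encoding).
        System.out.println("file.encoding   = " + System.getProperty("file.encoding"));
        System.out.println("default charset = " + Charset.defaultCharset());

        String original = "caf\u00e9"; // "cafe" with an accented e
        String garbled = decodeUtf8BytesAs(original, StandardCharsets.ISO_8859_1);
        // The accented letter is two bytes in UTF-8, so the wrong decoder
        // turns it into two characters.
        System.out.println(original.length() + " chars originally, "
                + garbled.length() + " chars after the wrong decode");
    }
}
```

When the bytes are decoded with the same charset that produced them, the text round-trips unchanged, which is the situation the JAVA_TOOL_OPTIONS setting tries to guarantee.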

Wednesday, October 3, 2012

REST Support in WSO2ESB - Handling JSON messages

Mobile applications mostly use JSON messages, since they are easy to process, whereas XML messages need more computational power to build.

 To process json messages, we need to enable the required message builder and formatter in the axis2 configuration.

 <messageFormatter contentType="application/json" class="org.apache.axis2.json.JSONMessageFormatter"/>

 <messageBuilder contentType="application/json" class="org.apache.axis2.json.JSONBuilder"/>

Let's look at a sample scenario, where we need to 'GET' a JSON message from the backend service.
In this sample, we try to retrieve a user's details, which are stored in a database. To retrieve the user details, we may define a simple "select" operation as a data service.
Say our data service runs at the following endpoint:


As a second step, we need to define a suitable API for this.
Let's assume "GET" is the only verb we are going to handle.
<api name="getusers" context="/public/user">
      <resource methods="GET"

In the "get-users-in-seq" we might need to call our backend data service to retrieve user details.

 <sequence name="get-users-in-seq">  
      <property xmlns:ns="http://org.apache.synapse/xsd"
           <p:getUserInfo xmlns:p="http://ws.wso2.org/dataservice">
            <arg xmlns:ns="http://org.apache.synapse/xsd"
      <property name="SOAPAction" value="urn:getUserInfo" scope="transport"/>
      <send receive="recevingGetUserSeq">
            <address uri="http://localhost:9764/services/GetUserDetailsService"
In the above sequence, we construct the SOAP message using the payloadFactory mediator, in the form expected by the backend data service, and send it to the backend service. (Note that you can identify the SOAP message format by creating a SoapUI project with the data service's WSDL.)

The backend service will now return the SOAP response, which we need to convert to a JSON message and send back to the client.

   <sequence name="recevingGetUserSeq">
         <xslt key="gov:/transformations/getUserTransform.xslt">
         <property xmlns:ns="http://org.apache.synapse/xsd"
      <property name="messageType" value="application/json" scope="axis2"/>
Here we use an XSLT script, which formulates the JSON format that the client expects (ie: it picks the relevant info from the SOAP response and constructs a JSON message out of it).
Note that we need to set the "messageType" property to "application/json", else axis2 won't pick the right message formatter.

We can execute this API with the following curl command to retrieve the details of the user "Alice".

   curl -v http://localhost:8280/public/user/Alice

Related post:
REST support in WSO2ESB - Introduction

Friday, September 14, 2012

REST support in WSO2ESB - Introduction

WSO2 ESB version 4.0.3 and later versions have an effective REST API mechanism to support REST invocations. HTTP verbs such as GET/POST/PUT/DELETE can be handled with a simple configuration.

Sample REST API configuration to handle a GET request:
 <api name="echoAPI" context="/echo">
      <resource methods="GET"
Note that here we define the context as "/echo", which appears in the HTTP URL. The user has to define a meaningful context if others are going to use this API.
The URL of the above API would be:
In this API we restricted the ESB to handle only GET requests. So, if a user sends requests with other verbs, those will be dropped at the ESB end.

For the uri-template, we get only a single parameter from the end user.
So, to invoke this API, the user has to send a single query parameter.

If the user sends multiple query parameters, he has to define something like:

But how the template is handled depends entirely on the service implementation.

If we have a number of query parameters, we could simply define the url-mapping like:


This will accept all GET requests coming with the following URL format:

Related post:
REST Support in WSO2ESB - Handling JSON messages

Tuesday, September 4, 2012

Executing Carbon admin services from Proxy service

Carbon V4.0.0 provides the ability to invoke admin services through a proxy service. We can point to the admin service as an endpoint.
Each request must carry Basic-Auth headers, which have to be set as transport headers.
In the mediation flow, we can use the property mediator to set the authentication headers.

<property name="Authorization" expression="fn:concat('Basic ', base64Encode('UserName:Password'))" scope="transport"/>

We have to provide the admin username and password, which are encoded and set as a transport header.
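
For reference, the header value that the expression builds is the usual HTTP Basic scheme: the word "Basic", a space, and the Base64 encoding of "username:password". A plain Java sketch of the same computation is shown below; the class and method names are made up, and this is not ESB code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthHeaderSketch {

    /** Builds the value of an HTTP "Authorization" header for Basic auth. */
    static String basicAuth(String user, String password) {
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // prints: Basic YWRtaW46YWRtaW4=
        System.out.println(basicAuth("admin", "admin"));
    }
}
```

Since Base64 is only an encoding and not encryption, the credentials are readable by anyone who sees the header, which is one reason these calls go over HTTPS.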

You might need to set the <HostnameVerifier> parameter to AllowAll in the HTTPS transport sender configuration, which is defined in axis2.xml.

Sample proxy configuration:

<proxy name="adminServiceProxy" transports="https http"
          startOnLoad="true" trace="disable">
            <address uri="https://localhost:9444/services/UserProfileMgtService"/>
            <property name="Authorization"
                      expression="fn:concat('Basic ', base64Encode('admin:admin'))"

Tuesday, August 28, 2012

Running multiple servers at once using linux bash script

You might need to run multiple servers at once, which is easier than starting them one by one. This is useful in the development phase, when we deal with a number of servers.
Here is a sample script which can be used to start a number of servers. I've used WSO2 Carbon servers as sample servers.

#!/bin/bash
# carbon
# chkconfig:
# description:     Start/stop  Carbon  servers.
# Source function library.
#. /etc/rc.d/init.d/functions
# ESB_HOME, AS1_HOME, APIMGR_HOME, AS2_HOME, GREG_HOME, IS_HOME and BAM_HOME
# must point to the server installation folders before this script runs.
RETVAL=0
case "$1" in
    start)
        if [ -f "$ESB_HOME/bin/wso2server.sh" ]; then
            echo "Starting servers"
            "$ESB_HOME/bin/wso2server.sh" start
            "$AS1_HOME/bin/wso2server.sh" start
            "$APIMGR_HOME/bin/wso2server.sh" start
            "$AS2_HOME/bin/wso2server.sh" start
            "$GREG_HOME/bin/wso2server.sh" start
            "$IS_HOME/bin/wso2server.sh" start
            "$BAM_HOME/bin/wso2server.sh" start
        fi
        ;;
    stop)
        if [ -f "$ESB_HOME/bin/wso2server.sh" ]; then
            echo "Stopping servers"
            "$ESB_HOME/bin/wso2server.sh" stop
            "$AS1_HOME/bin/wso2server.sh" stop
            "$APIMGR_HOME/bin/wso2server.sh" stop
            "$AS2_HOME/bin/wso2server.sh" stop
            "$GREG_HOME/bin/wso2server.sh" stop
            "$IS_HOME/bin/wso2server.sh" stop
            "$BAM_HOME/bin/wso2server.sh" stop
        fi
        ;;
    *)
        echo "Usage: $0 {start|stop}"
        exit 1
        ;;
esac
exit $RETVAL
You can use any text editor to write the script and save it in the /etc/init.d/ folder with your preferred name. Say the script name is "carbon".
Start/stop the servers as follows:
# /etc/init.d/carbon start
# /etc/init.d/carbon stop
You may face a permission issue when running your script. Use the following command to overcome it:

# chmod 744 /etc/init.d/carbon

Friday, August 17, 2012

Excluding namespaces in XSLT

When we write a complex XSLT script, we need to introduce functions, templates etc. An XSLT function needs a namespace definition, but we may not need those namespaces in the output.
There is an extra attribute we need to define in our XSLT script to keep those additional namespaces out of the output.

<xsl:stylesheet version="1.0"
                xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                xmlns:func="http://www.function.com" exclude-result-prefixes="func">
Note that I have added an attribute (ie: exclude-result-prefixes) to the stylesheet to exclude my additional namespace, which is http://www.function.com.
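
The effect is easy to observe by running the same tiny stylesheet with and without the attribute, using the JDK's built-in XSLT processor. The following is a rough sketch only; the class name, the "result" element and the dummy input are made up for illustration.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

class ExcludePrefixesSketch {

    /** Builds a one-template stylesheet, optionally with exclude-result-prefixes. */
    static String stylesheet(boolean exclude) {
        return "<xsl:stylesheet version='1.0'"
             + " xmlns:xsl='http://www.w3.org/1999/XSL/Transform'"
             + " xmlns:func='http://www.function.com'"
             + (exclude ? " exclude-result-prefixes='func'" : "") + ">"
             + "<xsl:output method='xml' omit-xml-declaration='yes'/>"
             + "<xsl:template match='/'><result>ok</result></xsl:template>"
             + "</xsl:stylesheet>";
    }

    /** Runs the stylesheet against a dummy document and returns the output. */
    static String transform(String xslt) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(xslt)));
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader("<in/>")),
                                  new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        // Without the attribute, the func namespace declaration is copied
        // onto the literal result element.
        System.out.println(transform(stylesheet(false)));
        // With the attribute, the declaration is left out.
        System.out.println(transform(stylesheet(true)));
    }
}
```

Several prefixes can be listed in the attribute, separated by whitespace.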

Tuesday, August 7, 2012

Debugging XSLT script

To see the value of an XSLT function or variable, we can simply use <xsl:message> as a print statement in the XSLT transformation script.

Eg: In the following template, when we do the substring operation, we might need to know what input we are getting in our template.

<xsl:template name="FindPhoneNum">
    <xsl:param name="DigitsLength" />
    <xsl:param name="PhoneNum" />
    <xsl:message>PhoneNum---------: <xsl:value-of select="$PhoneNum" /></xsl:message>
    <xsl:message>DigitsLength---------: <xsl:value-of select="$DigitsLength" /></xsl:message>
    <xsl:variable name="number">
        <xsl:value-of select="substring($PhoneNum, $DigitsLength -6, $DigitsLength)" />
    </xsl:variable>
    <xsl:value-of select="$number" />
</xsl:template>
The output will look like:

PhoneNum---------: 18168918984
DigitsLength---------: 11
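
With those two values, the substring() call itself can be checked outside the stylesheet using the XPath engine in the JDK. The snippet below is a sketch with made-up class and method names; it feeds the same two variables to the same expression. Keep in mind that XPath positions start at 1 and that the third argument of substring() is a length, not an end position.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class XPathSubstringSketch {

    /** Evaluates the template's substring() expression for the given inputs. */
    static String lastDigits(String phoneNum, int digitsLength) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        // Supply $PhoneNum and $DigitsLength to the expression.
        xpath.setXPathVariableResolver(name -> {
            switch (name.getLocalPart()) {
                case "PhoneNum":     return phoneNum;
                case "DigitsLength": return Double.valueOf(digitsLength);
                default:             return null;
            }
        });
        try {
            // The expression needs no input data, so a dummy document is enough.
            return xpath.evaluate(
                    "substring($PhoneNum, $DigitsLength - 6, $DigitsLength)",
                    new InputSource(new StringReader("<dummy/>")));
        } catch (XPathExpressionException e) {
            throw new IllegalStateException("XPath evaluation failed", e);
        }
    }

    public static void main(String[] args) {
        // Start position is 11 - 6 = 5, so this prints the last seven digits: 8918984
        System.out.println(lastDigits("18168918984", 11));
    }
}
```

Asking for 11 characters from position 5 simply runs to the end of the string, which is why the result is the last seven digits.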

Saturday, August 4, 2012

Logging with Log4j Nested Diagnostic Contexts(NDC) in WSO2ESB

As I explained in my previous post, this is another way to capture more information about various clients in a multi-threaded environment.

A Nested Diagnostic Context, or NDC in short, is an instrument to distinguish interleaved log output from different sources. Log output is typically interleaved when a server handles multiple clients near-simultaneously. Note that NDCs are managed on a per thread basis.
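
To make the "per thread" part concrete, here is a toy model of the idea written with plain JDK classes. It is NOT Log4j's NDC implementation, only an illustration of a per-thread stack of context strings; all the names and sample values in it are made up.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class NestedContextSketch {

    // Every thread gets its own, independent stack.
    private static final ThreadLocal<Deque<String>> STACK =
            ThreadLocal.withInitial(ArrayDeque::new);

    static void push(String context) {
        STACK.get().addLast(context);
    }

    /** Removes and returns the innermost context, or "" if there is none. */
    static String pop() {
        Deque<String> stack = STACK.get();
        return stack.isEmpty() ? "" : stack.removeLast();
    }

    /** All contexts of the current thread, outermost first, space separated. */
    static String current() {
        return String.join(" ", STACK.get());
    }

    static int depth() {
        return STACK.get().size();
    }

    static void clear() {
        STACK.remove();
    }

    /** Reads current() from a fresh thread to show that stacks are not shared. */
    static String currentInNewThread() {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = current());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        push("client=10.0.0.1");      // hypothetical values
        push("service=testProxy");
        System.out.println("this thread : [" + current() + "]");
        System.out.println("other thread: [" + currentInNewThread() + "]");
        clear();
    }
}
```

Because worker threads are usually pooled and reused, whatever is pushed for one request has to be popped or cleared before the thread picks up the next one.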

Users may need more contextual information in the logs, and it would be good for them to have some centralized configuration to control the information logged for all the services.
eg: If a user wants to track the client IP address, he needs to add 'REMOTE_ADDR'/'REMOTE_HOST' in the log mediator within the proxy service. He has to do this for all proxy services, and it is a performance hit.

To achieve the above requirement, we could use the NDC API in an axis2 handler placed in the in/out message flow, where we can use the NDC stack to store all the required data.

If we keep such a handler, it can pick up only axis2 level info. If the user needs error information that occurred at the synapse level (which, of course, is the main purpose of keeping logs), we could add a custom mediator which picks up all the error information and sets it in the Axis2MessageContext. From the axis2 handler we can then extract that information and keep it in the NDC stack.

Here is a simple example that applies NDC to an axis2 handler.

    import org.apache.axis2.AxisFault;
    import org.apache.axis2.context.MessageContext;
    import org.apache.axis2.description.AxisService;
    import org.apache.axis2.description.Parameter;
    import org.apache.axis2.description.WSDL2Constants;
    import org.apache.axis2.handlers.AbstractHandler;
    import org.apache.log4j.Logger;
    import org.apache.log4j.NDC;

    /**
     * Custom handler for NDC type logging
     */
    public class CustomLog4jDiagnosticContextSetterHandler extends AbstractHandler {
        private static final Logger LOG = Logger.getLogger(CustomLog4jDiagnosticContextSetterHandler.class);

        public InvocationResponse invoke(MessageContext messageContext) throws AxisFault {
            boolean loggingContextIsSet = addLoggingContextData(messageContext);
            if (loggingContextIsSet) {
                // Before leaving remove the diagnostic context for this thread.
                removeLoggingContextData();
            }
            return InvocationResponse.CONTINUE;
        }

        private boolean addLoggingContextData(MessageContext messageContext) {
            if (LOG.isDebugEnabled() || LOG.isTraceEnabled()) {
                try {
                    getLoggData(messageContext);
                } catch (Exception e) {
                    LOG.error("Error pushing log data onto Log4J NDC stack.", e);
                    return false;
                }
                return true;
            } else {
                return false;
            }
        }

        private void getLoggData(MessageContext messageContext) {

            AxisService service = messageContext.getAxisService();
            String service_name = service.getName();

            String remote_ip = (String) messageContext.getProperty("REMOTE_ADDR");
            String operation_name = messageContext.getAxisOperation().getSoapAction();
            // log all the requests
            String log = "Request comes from the client '" + remote_ip + "' for the service '" +
                         service_name + "' and operation is '" + operation_name + "'";
            NDC.push(log);

            Exception failure_reason = messageContext.getFailureReason();
            // if an axis2 error occurred, log its details
            if (failure_reason != null) {
                String axis_error_msg = failure_reason.getLocalizedMessage();
                String axis_error_log =
                        "Exception occured for the client '" + remote_ip +
                        "' sent request for the service '" + service_name +
                        "'. The root cause is '" + axis_error_msg + "'";
                NDC.push(axis_error_log);
            }
            // get synapse error logs
            getSynapseFaultSeqLogData(messageContext);
        }

        private void removeLoggingContextData() {
            try {
                while (NDC.getDepth() != 0) {
                    NDC.pop();
                }
            } catch (Exception e) {
                LOG.error("Error popping log data off of Log4J NDC stack.", e);
            }
        }

        /**
         * if synapse error occurs log that also (these details are retrieved from
         * custom mediator)
         * @param messageContext
         */

        private void getSynapseFaultSeqLogData(MessageContext messageContext) {
            MessageContext axis2InMsgcontext = null;
            try {
                axis2InMsgcontext = messageContext.getOperationContext()
                                                  .getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
            } catch (AxisFault e) {

            if (axis2InMsgcontext.getProperty("custom_errorMesssage") != null ||
                messageContext.getProperty("custom_errorMesssage") != null) {
                String synapse_error_msg = null;
                String synapse_error_code = null;
                String synapse_error_detail = null;
                Object synapse_error_exceptionObj;
                Exception synapse_error_exception = null;
                String remote_ip = null;
                // if fault sequence invoked in the messageInflow, properties
                // will be set to InMessageContext.
                if (axis2InMsgcontext.getProperty("custom_errorMesssage") != null) {
                    synapse_error_msg = (String) axis2InMsgcontext.getProperty("custom_errorMesssage");
                    synapse_error_code = (String) axis2InMsgcontext.getProperty("custom_errorCode");
                    synapse_error_detail = (String) axis2InMsgcontext.getProperty("custom_errorDetail");
                    synapse_error_exceptionObj = axis2InMsgcontext.getProperty("custom_exception");
                    synapse_error_exception = (Exception) synapse_error_exceptionObj;
                    remote_ip = (String) axis2InMsgcontext.getProperty("REMOTE_ADDR");
                String synapse_error_log = "Exception occured at ESB. The clientIP is '" + remote_ip + "' Error message is '" +
                                                   (synapse_error_msg == null ? " not available *"
                                                                             : synapse_error_msg) +
                                                   "' Eror code is '" +
                                                   (synapse_error_code == null ? " not available*"
                                                                              : synapse_error_code) +
                                                   "'. Error detail is '" +
                                                   (synapse_error_detail == null ? " not available*"
                                                                                : synapse_error_detail) +
                                                   "'. Exception is '" +
                                                   (synapse_error_exception == null ? " not available*"


In the above sample, you can see that we are extracting some synapse level information (synapse_error_msg, synapse_error_code etc.), which we can set via a custom mediator as I explained earlier.

To test the above sample,
  • Package it as a jar file and place it in the ESB_HOME/repository/components/lib folder.
  • Add the handler in axis2.xml
    <phase name="LoggingInPhase">
    <handler name="CustomLog4jDiagnosticContextSetterHandler"
    </phase>
    Do this for the inflow, outflow, infault flow and outfault flow phases (ie: we are registering the handler for all flows).
  • Add the following line to the log4j.properties file (ESB\lib\log4j.properties)
Now you will see logs of the following type:

2012-07-13 15:30:26,410 [-] [HttpServerWorker-1] DEBUG CustomLog4jDiagnosticContextSetterHandler Request comes from the client '' for the service 'testProxy' and operation is 'urn:mediate'
2012-07-13 15:30:26,481 [-] [HttpClientWorker-1] DEBUG CustomLog4jDiagnosticContextSetterHandler Exception occured at ESB. The clientIP is '' Error message is 'Couldn't find the endpoint with the key : bogus' Eror code is '305100'. Error detail is 'Couldn't find the endpoint with the key : bogus'. Exception is ' not available*

Monday, July 16, 2012

Different ways of logging in WSO2ESB

All WSO2 Carbon products have a standard way of defining logs. All logging mechanisms are handled by the Carbon logging-mgt component, which is controlled by a central log4j properties file. Users can edit the properties file to get the desired log level for different packages.

In the ESB product, users have different requirements for acquiring log information at different layers/levels of the mediation flow. There are a number of ways you can get certain log info in WSO2 ESB.
  • Log mediators :- Users can keep the log mediator in the sequence, and get certain info about the messages which pass through certain ESB artifacts
<log level='full'>
         <property name ='Request message' value='message arrived to insequence'/>
</log>
  • Editing the log4j.properties file to get proxy service level log info :- In this approach, the user needs to add a separate block of configuration for each configured proxy. This may be inappropriate in a real production system, where hundreds of proxy services are deployed and the user would need to add hundreds of lines of configuration to the log4j.properties file
 log4j.category.SERVICE_LOGGER.SimpleStockQuoteProxy=INFO, PROXY_APPENDER
log4j.appender.PROXY_APPENDER.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%X{ip}-%X{host}] [%t] %5p %c{1} %m%n

  • There is another, code level approach, which can be used to extract details at the level of certain synapse artifacts.
    • eg: Proxy level/Sequence level...
SynapseObserver is an interface which gets notified whenever an artifact is deployed/undeployed; AbstractSynapseObserver is its abstract base class.
We can extend this observer to get all the proxy service level log information, rather than adding hundreds of lines of configuration to the log4j.properties file.
Code :
import java.io.IOException;
import org.apache.synapse.config.AbstractSynapseObserver;
import org.apache.synapse.core.axis2.ProxyService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

/**
 * This is a custom synapse observer to programmatically engage the appender
 * for proxy services.
 */
public class CustomProxyObserver extends AbstractSynapseObserver {
    private static final Log log = LogFactory.getLog(CustomProxyObserver.class);

    public void proxyServiceAdded(ProxyService proxy) {
        try {
            setLogger(proxy);
        } catch (IOException e) {
            log.error("CustomProxyObserver could not set service level logger for the proxy : " +
                      proxy.getName(), e);
        }
    }

    public void proxyServiceRemoved(ProxyService proxy) {
        try {
        } catch (IOException e) {
            log.error("CustomProxyObserver could not set service level logger for the proxy : " +
                      proxy.getName(), e);

    /**
     * Method to set proxy service specific logger
     * @param proxy
     * @throws IOException
     */
    private void setLogger(ProxyService proxy) throws IOException {

        String filename = "logs/" + proxy.getName() + ".log";
        String datePattern = "yyyy-MM-dd";
        String SYSTEM_LOG_PATTERN = "[%d] %5p - %x %m {%c}%n";

        PatternLayout layout = new PatternLayout(SYSTEM_LOG_PATTERN);
        DailyRollingFileAppender appender = new DailyRollingFileAppender(layout, filename, datePattern);

        Logger proxyLogger = Logger.getLogger("SERVICE_LOGGER." + proxy.getName());
        proxyLogger.setLevel(Level.DEBUG);
        // engage the file appender for this proxy's logger
        proxyLogger.addAppender(appender);
    }
}


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">



The user needs to keep the bundle in the repository/components/dropins folder and edit the synapse.properties file to register the custom observer.



You will see the generated log files in the ESB_HOME/logs folder.

There is another way, which also needs some coding effort, to extract specific debug level information that is really useful for debugging certain use cases. I'll write about that with sample code and a scenario in my next post.

Wednesday, July 11, 2012

Class Endpoints in Synapse

In Apache Synapse, there are predefined endpoints which can be used as service endpoints to send messages out from the mediation engine. The currently available endpoints are:
  • Address Endpoint
  • Default Endpoint 
  • WSDL endpoint
  • Load balance Endpoint 
  • Failover Endpoint 
  • Dynamic Load balance Endpoint

However, Synapse does not allow the endpoint capability to be extended with a custom endpoint according to user needs, as it does for mediators (ie: class mediator).
To provide such a feature, similar to the class mediator functionality, the class endpoint concept has been implemented. The patch has been provided to the Synapse project.

To add a class endpoint in the mediation flow, the user should add the following configuration:
<endpoint name="CustomEndpoint">
                    <class name="org.apache.synapse.endpoint.CustomEndpoint">
                                 <parameter name="foo">XYZ</parameter>*
                    </class>
</endpoint>
The "CustomEndpoint" class should be a child class of the AbstractEndpoint class. Using this type of class endpoint, the user can add his own message sending logic or can load a custom synapse environment for a particular endpoint.

Related post: http://vvratha.blogspot.com/2013/06/class-endpointssample.html

Wednesday, June 6, 2012

Writing a simple FIX initiator and executor using QuickFIX/J

FIX (Financial Information eXchange) is a communication protocol widely used in the finance sector. QuickFIX/J is an open source FIX engine. In WSO2 ESB, we use the QuickFIX/J library as our base FIX engine to enable the FIX transport.
Communication between the FIX initiator and executor happens via FIX sessions. The initiator (the client) first initiates a session with the executor (the server). After a successful acknowledgement from the executor, the initiator is able to send FIX messages.
Session related configurations are defined in a configuration file.
Sample executor config file:


In the initiator side configuration file, we interchange the SenderCompID and TargetCompID.


To write an initiator/executor, we should implement the "quickfix.Application" interface.

public class FIXInitiatorApplication implements Application {
public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {

public void fromApp(Message message, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
   System.out.println("Received reply from executor");

public void onCreate(SessionID arg0) {
  // TODO Auto-generated method stub

public void onLogon(SessionID sessionId) {
   System.out.println("Initiator LOGGED ON.......");
   NewOrderSingle order = new NewOrderSingle(new ClOrdID("MISYS1001"),
   new HandlInst(HandlInst.MANUAL_ORDER), new Symbol("MISYS"), new Side(Side.BUY), new TransactTime(new Date()), new OrdType(OrdType.LIMIT));

   Session.sendToTarget(order, sessionId);

public void onLogout(SessionID arg0) {
   System.out.println("Session logged out");

public void toAdmin(Message arg0, SessionID arg1) {
   // TODO Auto-generated method stub

public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
   // TODO Auto-generated method stub
public class FIXInitiator {
private SocketInitiator socketInitiator;

public static void main(String[] args) throws ConfigError, InterruptedException, IOException {

  InputStream inputStream = FIXAcceptorExecutor.class.getResourceAsStream("initiator.cfg");

private static void startInitiator(InputStream inputStream) throws ConfigError,
InterruptedException, IOException {

  FIXInitiator fixIniator = new FIXInitiator();
  SessionSettings sessionSettings = new SessionSettings(inputStream);
  FIXInitiatorApplication application = new FIXInitiatorApplication();
  FileStoreFactory fileStoreFactory = new FileStoreFactory(sessionSettings);
  LogFactory logFactory = new FileLogFactory(sessionSettings);
  MessageFactory messageFactory = new DefaultMessageFactory();
  fixIniator.socketInitiator = new SocketInitiator(application, fileStoreFactory,
                                     sessionSettings, logFactory, messageFactory);
  System.out.println("press to quit");

As with the initiator, we need to implement the Application interface. To send a reply back to the client (ie: to the initiator), we need to extend the MessageCracker class and override onMessage() to build the response message.

public class FIXAcceptorApplication extends MessageCracker implements Application {

    public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound,  IncorrectDataFormat, IncorrectTagValue, RejectLogon {

    public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
        System.out.println("Acceptor received new message..  ");
        crack(arg0, arg1);

    public void onCreate(SessionID arg0) {

    public void onLogon(SessionID arg0) {
        System.out.println("Acceptor logged on.........");

    public void onLogout(SessionID arg0) {

    public void toAdmin(Message arg0, SessionID arg1) {
        // TODO


    public void toApp(Message arg0, SessionID arg1) throws DoNotSend {


    public void onMessage(NewOrderSingle order, SessionID sessionID) throws FieldNotFound,  UnsupportedMessageType,  IncorrectTagValue {
        OrderQty orderQty = new OrderQty(10.0);
        Price price = new Price(10.0);
        ExecutionReport executionReport =
                                          new ExecutionReport(getOrderIDCounter(),
                                                              new ExecTransType(ExecTransType.NEW),
                                                              new ExecType(ExecType.FILL),
                                                              new OrdStatus(OrdStatus.FILLED),
                                                              order.getSymbol(), order.getSide(),
                                                              new LeavesQty(0),
                                                              new CumQty(orderQty.getValue()),
                                                              new AvgPx(price.getValue()));

        executionReport.set(new LastShares(orderQty.getValue()));
        executionReport.set(new LastPx(price.getValue()));  

        try {
            Session session = Session.lookupSession(sessionID);
            Session.sendToTarget(executionReport, sessionID);
            System.out.println("NewOrderSingle Execution  Completed-----");
        } catch (Exception ex) {
            System.out.println("Error during order execution" + ex.getMessage());

 public class FIXAcceptorExecutor {
    private final SocketAcceptor acceptor;
    private final static Map<InetSocketAddress, List<TemplateMapping>> dynamicSessionMappings = new HashMap<InetSocketAddress, List<TemplateMapping>>();

    public FIXAcceptorExecutor(SessionSettings settings) throws ConfigError,  FieldConvertError {
        Application application = new FIXAcceptorApplication();
        MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings);
        LogFactory logFactory = new FileLogFactory(settings);
        MessageFactory messageFactory = new DefaultMessageFactory();

        acceptor =  new SocketAcceptor(application, messageStoreFactory, settings, logFactory, messageFactory);

        configureDynamicSessions(settings, application, messageStoreFactory, logFactory,


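    private void configureDynamicSessions(SessionSettings settings, Application application,
                                          MessageStoreFactory messageStoreFactory, LogFactory logFactory,
                                          MessageFactory messageFactory) throws ConfigError, FieldConvertError {

        // Collect every session marked as an acceptor template under its accept address.
        Iterator<SessionID> sectionIterator = settings.sectionIterator();

        while (sectionIterator.hasNext()) {
            SessionID sessionID = sectionIterator.next();
            if (isSessionTemplate(settings, sessionID)) {
                InetSocketAddress address = getAcceptorSocketAddress(settings, sessionID);
                getMappings(address).add(new TemplateMapping(sessionID, sessionID));
            }
        }

        // Register one dynamic session provider per accept address.
        for (Map.Entry<InetSocketAddress, List<TemplateMapping>> entry : dynamicSessionMappings.entrySet()) {
            acceptor.setSessionProvider(entry.getKey(),
                                        new DynamicAcceptorSessionProvider(settings, entry.getValue(),
                                                                           application, messageStoreFactory,
                                                                           logFactory, messageFactory));
        }
    }
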
    private List<TemplateMapping> getMappings(InetSocketAddress address) {
        List<TemplateMapping> mappings = dynamicSessionMappings.get(address);
        if (mappings == null) {
            mappings = new ArrayList<TemplateMapping>();
            dynamicSessionMappings.put(address, mappings);
        }
        return mappings;
    }

    private InetSocketAddress getAcceptorSocketAddress(SessionSettings settings, SessionID sessionID)
            throws ConfigError, FieldConvertError {
        String acceptorHost = "";
        if (settings.isSetting(sessionID, SETTING_SOCKET_ACCEPT_ADDRESS)) {
            acceptorHost = settings.getString(sessionID, SETTING_SOCKET_ACCEPT_ADDRESS);
        }
        int acceptorPort = (int) settings.getLong(sessionID, SETTING_SOCKET_ACCEPT_PORT);

        InetSocketAddress address = new InetSocketAddress(acceptorHost, acceptorPort);
        return address;
    }

    private boolean isSessionTemplate(SessionSettings settings, SessionID sessionID)
            throws ConfigError, FieldConvertError {
        return settings.isSetting(sessionID, SETTING_ACCEPTOR_TEMPLATE) &&
               settings.getBool(sessionID, SETTING_ACCEPTOR_TEMPLATE);
    }

    private void start() throws RuntimeError, ConfigError {
        acceptor.start();
    }

    private void stop() {
        acceptor.stop();
    }

    public static void main(String args[]) throws Exception {
        InputStream inputStream = null;
        if (args.length == 0) {
            // No argument: load executor.cfg from the classpath.
            inputStream = FIXAcceptorExecutor.class.getResourceAsStream("executor.cfg");
        } else if (args.length == 1) {
            inputStream = new FileInputStream(args[0]);
        }

        SessionSettings settings = new SessionSettings(inputStream);
        FIXAcceptorExecutor executor = new FIXAcceptorExecutor(settings);
        executor.start();

        System.out.println("press <enter> to quit");
        System.in.read();

        executor.stop();
    }
}
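
The getMappings method above is a get-or-create lookup: it returns the list registered for an address and creates an empty one on first use. On Java 8 or later the same idiom can be written with Map.computeIfAbsent. Here is a minimal sketch of that, using only the JDK. The class name MappingsSketch, the plain String entries (standing in for TemplateMapping) and the port 9876 are my own assumptions for illustration, not part of the executor.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the executor's address-to-templates map.
class MappingsSketch {
    private final Map<InetSocketAddress, List<String>> mappings = new HashMap<>();

    // Returns the list for this address, creating an empty one on first use.
    List<String> getMappings(InetSocketAddress address) {
        return mappings.computeIfAbsent(address, key -> new ArrayList<>());
    }

    public static void main(String[] args) {
        MappingsSketch sketch = new MappingsSketch();
        // createUnresolved avoids a DNS lookup; host and port are placeholders.
        InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 9876);
        sketch.getMappings(address).add("template-A");
        sketch.getMappings(address).add("template-B");
        System.out.println(sketch.getMappings(address));
    }
}
```

Both calls with the same address reach the same list, so the second add appends to it instead of replacing it.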

Thursday, May 10, 2012

Java client to send/receive messages for ActiveMQ

ActiveMQ is a message broker that supports the JMS 1.1 specification. Here I share a simple Java client that can send, browse and receive messages on a destination (i.e. a queue) created on the ActiveMQ server.
You will need the ActiveMQ libraries on your classpath.
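
The client's three methods are named offer, peek and poll, after the methods of java.util.Queue. Before the JMS code, here is a small in-memory analogy of what each one does to the queue. It is only a sketch: it uses an ArrayDeque instead of a broker, and the class name QueueSemanticsSketch is made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// In-memory analogy only: java.util.Queue stands in for the broker queue.
class QueueSemanticsSketch {

    // Adds four messages, browses the head, then consumes one message.
    static String run() {
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 1; i < 5; i++) {
            queue.offer(String.valueOf(i)); // comparable to sending with a MessageProducer
        }
        String browsed = queue.peek();      // comparable to a QueueBrowser: message stays queued
        int sizeAfterPeek = queue.size();
        String polled = queue.poll();       // comparable to a MessageConsumer: message is removed
        int sizeAfterPoll = queue.size();
        return browsed + "," + sizeAfterPeek + "," + polled + "," + sizeAfterPoll;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1,4,1,3
    }
}
```

Browsing returns the first message but leaves all four on the queue, while polling returns the same message and removes it.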

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;

public class ActiveMQTest {
    private static ActiveMQDestination destination;

    public static void offer() throws Exception {

        // Pass your broker URL as the constructor argument; with no argument
        // the factory falls back to its default broker URL.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

        Connection connection = connectionFactory.createConnection();
        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) sendSession.createQueue("TEST.FOO");

        MessageProducer producer = sendSession.createProducer(destination);

        for (int i = 1; i < 5; i++) {
            TextMessage message = sendSession.createTextMessage(String.valueOf(i));
            // Send the messages
            producer.send(message);
            System.out.println("########Sent message : " + message.getText());
        }
        // Release the connection once all messages are sent.
        connection.close();
    }

    public static void peek() throws Exception {

        // No broker URL given: the factory uses its default broker URL.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

        Connection connection = connectionFactory.createConnection();
        // Start the connection so the broker delivers messages to the browser.
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        QueueBrowser browser = session.createBrowser((Queue) destination);
        Enumeration enumeration = browser.getEnumeration();

        // Browsing reads the first message but leaves it on the queue.
        if (enumeration.hasMoreElements()) {
            Object msg = enumeration.nextElement();
            TextMessage m = (TextMessage) msg;
            System.out.println(" !!!!!!!!Browsed  msg " + m.getText());
        }
        connection.close();
    }

    public static void poll() throws Exception {

        // No broker URL given: the factory uses its default broker URL.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        Connection connection = connectionFactory.createConnection();
        // A consumer receives nothing until the connection is started.
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer(destination);
        // Wait up to one second; receive returns null if the queue is empty.
        TextMessage m = (TextMessage) consumer.receive(1000);
        if (m != null) {
            System.out.println("*********** Polled Messg" + m.getText());
        }
        connection.close();
    }

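    public static void main(String args[]) {
        try {
            // Assumed call: the statement on this line is missing from the listing.
            offer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            for (int i = 1; i < 5; i++) {
                // Assumed call: the loop body is missing from the listing.
                poll();
            }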
        } catch (Exception e) {