Wednesday, June 6, 2012

Writing a simple FIX initiator and executor using QuickFIX/J

FIX (Financial Information eXchange) is a communication protocol widely used in the financial sector. QuickFIX/J is an open-source FIX engine. In WSO2 ESB, we use the QuickFIX/J library as the base FIX engine that enables the FIX transport.
Communication between the FIX initiator and the executor happens via FIX sessions. The initiator (the client) first initiates a session with the executor (the server). After a successful acknowledgement from the executor, the initiator is able to send FIX messages.
Session-related settings are defined in a configuration file.
Sample executor config file:
[default]
FileStorePath=/root/FixPerformancetest-setup/executor
FileLogPath=/root/FixPerformancetest-setup/executor/executorlogs
FileIncludeMilliseconds=Y
FileIncludeTimeStampForMessages=Y
PersistMessages=N
ConnectionType=acceptor
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
SenderCompID=EXEC
TargetCompID=INIT
SocketAcceptPort=19876
ValidOrderTypes=1,2,F
DefaultMarketPrice=12.30
SocketKeepAlive=Y
SocketTcpNoDelay=Y
SocketReuseAddress=Y
CheckLatency=N
ResetOnLogon=Y

[session]
AcceptorTemplate=Y
DataDictionary=FIX42.xml
BeginString=FIX.4.2
SocketAcceptPort=19876

In the initiator-side configuration file we swap the values of SenderCompID and TargetCompID.
eg:
SenderCompID=INIT
TargetCompID=EXEC

Initiator

To write an initiator/executor we should implement the "quickfix.Application" interface.

public class FIXInitiatorApplication implements Application {
@Override
public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
}

@Override
public void fromApp(Message message, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
   System.out.println("Received reply from executor");
}

@Override
public void onCreate(SessionID arg0) {
  // TODO Auto-generated method stub
}

@Override
public void onLogon(SessionID sessionId) {
   System.out.println("Initiator LOGGED ON.......");
   NewOrderSingle order = new NewOrderSingle(new ClOrdID("MISYS1001"),
   new HandlInst(HandlInst.MANUAL_ORDER), new Symbol("MISYS"), new   Side(Side.BUY), new TransactTime(new Date()), new OrdType(OrdType.LIMIT));

   Session.sendToTarget(order, sessionId);
}

@Override
public void onLogout(SessionID arg0) {
   System.out.println("Session logged out");
}

@Override
public void toAdmin(Message arg0, SessionID arg1) {
   // TODO Auto-generated method stub
}

@Override
public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
   // TODO Auto-generated method stub
   }
}
public class FIXInitiator {
private SocketInitiator socketInitiator;

public static void main(String[] args) throws ConfigError, InterruptedException, IOException {

  InputStream inputStream = FIXAcceptorExecutor.class.getResourceAsStream("initiator.cfg");
  startInitiator(inputStream);
}

private static void startInitiator(InputStream inputStream) throws ConfigError,
InterruptedException, IOException {

  FIXInitiator fixIniator = new FIXInitiator();
  SessionSettings sessionSettings = new SessionSettings(inputStream);
  FIXInitiatorApplication application = new FIXInitiatorApplication();
  FileStoreFactory fileStoreFactory = new FileStoreFactory(sessionSettings);
  LogFactory logFactory = new FileLogFactory(sessionSettings);
  MessageFactory messageFactory = new DefaultMessageFactory();
  fixIniator.socketInitiator = new SocketInitiator(application, fileStoreFactory,
                                     sessionSettings, logFactory, messageFactory);
  fixIniator.socketInitiator.start();
  System.out.println("press to quit");
  System.in.read();
  fixIniator.socketInitiator.stop();
}
Executor

As with the initiator, we need to implement the Application interface. To send a reply back to the client (i.e. the initiator), we also extend the MessageCracker class and provide an onMessage() method that builds and sends the response message.


public class FIXAcceptorApplication extends MessageCracker implements Application {

    @Override
    public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound,  IncorrectDataFormat, IncorrectTagValue, RejectLogon {
    }

    @Override
    public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
        System.out.println("("Acceptor received new message..  ");
        crack(arg0, arg1);
    }

    @Override
    public void onCreate(SessionID arg0) {
    }

    @Override
    public void onLogon(SessionID arg0) {
        System.out.println("Acceptor logged on.........");
    }

    @Override
    public void onLogout(SessionID arg0) {
    }

    @Override
    public void toAdmin(Message arg0, SessionID arg1) {
        // TODO

    }

    @Override
    public void toApp(Message arg0, SessionID arg1) throws DoNotSend {

    }

    public void onMessage(NewOrderSingle order, SessionID sessionID) throws FieldNotFound,  UnsupportedMessageType,  IncorrectTagValue {
        OrderQty orderQty = new OrderQty(10.0);
        Price price = new Price(10.0);
        ExecutionReport executionReport =
                                          new ExecutionReport(getOrderIDCounter(),
                                                              getExecutionIDCounter(),
                                                              new ExecTransType(ExecTransType.NEW),
                                                              new ExecType(ExecType.FILL),
                                                              new OrdStatus(OrdStatus.FILLED),
                                                              order.getSymbol(), order.getSide(),
                                                              new LeavesQty(0),
                                                              new CumQty(orderQty.getValue()),
                                                              new AvgPx(price.getValue()));

        executionReport.set(order.getClOrdID());
        executionReport.set(orderQty);
        executionReport.set(new LastShares(orderQty.getValue()));
        executionReport.set(new LastPx(price.getValue()));  

        try {
            Session session = Session.lookupSession(sessionID);
            Session.sendToTarget(executionReport, sessionID);
            System.out.println("NewOrderSingle Execution  Completed-----");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("Error during order execution" + ex.getMessage());
        }
    }
}

 public class FIXAcceptorExecutor {
    private final SocketAcceptor acceptor;
    private final static Map>  dynamicSessionMappings =   new HashMap>();

    public FIXAcceptorExecutor(SessionSettings settings) throws ConfigError,  FieldConvertError {
        Application application = new FIXAcceptorApplication();
        MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings);
        LogFactory logFactory = new FileLogFactory(settings);
        MessageFactory messageFactory = new DefaultMessageFactory();

        acceptor =  new SocketAcceptor(application, messageStoreFactory, settings, logFactory, messageFactory);

        configureDynamicSessions(settings, application, messageStoreFactory, logFactory,
messageFactory);

    }

    private void configureDynamicSessions(SessionSettings settings, Application application,
  MessageStoreFactory messageStoreFactory,  LogFactory logFactory, MessageFactory messageFactory) throws ConfigError, FieldConvertError {

        Iterator sectionIterator = settings.sectionIterator();

        while (sectionIterator.hasNext()) {
            SessionID sessionID = sectionIterator.next();      
            if (isSessionTemplate(settings, sessionID)) {
                InetSocketAddress address = getAcceptorSocketAddress(settings, sessionID);
                getMappings(address).add(new TemplateMapping(sessionID, sessionID));
            }
        }

        for (Map.Entry> entry : dynamicSessionMappings.entrySet()) {
            acceptor.setSessionProvider(entry.getKey(),
                                        new DynamicAcceptorSessionProvider(settings,
                                                                           entry.getValue(),
                                                                           application,
                                                                           messageStoreFactory,
                                                                           logFactory,
                                                                           messageFactory));
        }
    }

    private List getMappings(InetSocketAddress address) {
        List mappings = dynamicSessionMappings.get(address);
        if (mappings == null) {
            mappings = new ArrayList();
            dynamicSessionMappings.put(address, mappings);
        }
        return mappings;
    }

    private InetSocketAddress getAcceptorSocketAddress(SessionSettings settings, SessionID sessionID)    throws ConfigError,  FieldConvertError {
        String acceptorHost = "0.0.0.0";
        if (settings.isSetting(sessionID, SETTING_SOCKET_ACCEPT_ADDRESS)) {
            acceptorHost = settings.getString(sessionID, SETTING_SOCKET_ACCEPT_ADDRESS);          
        }
      
        int acceptorPort = (int) settings.getLong(sessionID, SETTING_SOCKET_ACCEPT_PORT);

        InetSocketAddress address = new InetSocketAddress(acceptorHost, acceptorPort);
        return address;
    }

    private boolean isSessionTemplate(SessionSettings settings, SessionID sessionID)
                                                                                    throws ConfigError,
                                                                                    FieldConvertError {
        return settings.isSetting(sessionID, SETTING_ACCEPTOR_TEMPLATE) &&
               settings.getBool(sessionID, SETTING_ACCEPTOR_TEMPLATE);
    }

    private void start() throws RuntimeError, ConfigError {
        acceptor.start();
    }

    private void stop() {
        acceptor.stop();
    }

    public static void main(String args[]) throws Exception {
        InputStream inputStream = null;
        if (args.length == 0) {
            inputStream = FIXAcceptorExecutor.class.getResourceAsStream("executor.cfg");
        } else if (args.length == 1) {
            inputStream = new FileInputStream(args[0]);
        }

        SessionSettings settings = new SessionSettings(inputStream);
        FIXAcceptorExecutor executor = new FIXAcceptorExecutor(settings);

        executor.start();
        System.out.println("press to quit");
        System.in.read();
        executor.stop();
    }
}