FIX (Financial Information eXchange) is a communication protocol widely used in the finance sectors. QuickFIX/J is an open source FIX engine. In WSO2ESB, we use quickfix/j library as our base FIX engine to enable the fix transport.
Communication between the FIX initiator and executor happens via fix sessions. Initiator first initiates a session with the server. After the successful acknowledgement from the executor, client is able to send fix messages.Session related configurations are defined in a configuration file..
Sample executor config file:
[default]
FileStorePath=/root/FixPerformancetest-setup/executor
FileLogPath=/root/FixPerformancetest-setup/executor/executorlogs
FileIncludeMilliseconds=Y
FileIncludeTimeStampForMessages=Y
PersistMessages=N
ConnectionType=acceptor
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
SenderCompID=EXEC
TargetCompID=INIT
SocketAcceptPort=19876
ValidOrderTypes=1,2,F
DefaultMarketPrice=12.30
SocketKeepAlive=Y
SocketTcpNoDelay=Y
SocketReuseAddress=Y
CheckLatency=N
ResetOnLogon=Y
[session]
AcceptorTemplate=Y
DataDictionary=FIX42.xml
BeginString=FIX.4.2
SocketAcceptPort=19876
At the initiator side configuration file we interchange the SenderCompID and TargetCompID.
eg:
SenderCompID=INIT
TargetCompID=EXEC
Initiator
To write an initiator/executor we should implement the "quickfix.Application" interface.
public class FIXInitiatorApplication implements Application {
@Override
public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
}
@Override
public void fromApp(Message message, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
System.out.println("Received reply from executor");
}
@Override
public void onCreate(SessionID arg0) {
// TODO Auto-generated method stub
}
@Override
public void onLogon(SessionID sessionId) {
System.out.println("Initiator LOGGED ON.......");
NewOrderSingle order = new NewOrderSingle(new ClOrdID("MISYS1001"),
new HandlInst(HandlInst.MANUAL_ORDER), new Symbol("MISYS"), new Side(Side.BUY), new TransactTime(new Date()), new OrdType(OrdType.LIMIT));
Session.sendToTarget(order, sessionId);
}
@Override
public void onLogout(SessionID arg0) {
System.out.println("Session logged out");
}
@Override
public void toAdmin(Message arg0, SessionID arg1) {
// TODO Auto-generated method stub
}
@Override
public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
// TODO Auto-generated method stub
}
}
public class FIXInitiator {Executor
private SocketInitiator socketInitiator;
public static void main(String[] args) throws ConfigError, InterruptedException, IOException {
InputStream inputStream = FIXAcceptorExecutor.class.getResourceAsStream("initiator.cfg");
startInitiator(inputStream);
}
private static void startInitiator(InputStream inputStream) throws ConfigError,
InterruptedException, IOException {
FIXInitiator fixIniator = new FIXInitiator();
SessionSettings sessionSettings = new SessionSettings(inputStream);
FIXInitiatorApplication application = new FIXInitiatorApplication();
FileStoreFactory fileStoreFactory = new FileStoreFactory(sessionSettings);
LogFactory logFactory = new FileLogFactory(sessionSettings);
MessageFactory messageFactory = new DefaultMessageFactory();
fixIniator.socketInitiator = new SocketInitiator(application, fileStoreFactory,
sessionSettings, logFactory, messageFactory);
fixIniator.socketInitiator.start();
System.out.println("pressto quit");
System.in.read();
fixIniator.socketInitiator.stop();
}
Like as initiator, we need to implement the Application interface.To send reply back to the client(ie: to initiator) we need to extend the MessageCracker class and need to override onMessage() with the response message.
public class FIXAcceptorApplication extends MessageCracker implements Application {
@Override
public void fromAdmin(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
}
@Override
public void fromApp(Message arg0, SessionID arg1) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
System.out.println("("Acceptor received new message.. ");
crack(arg0, arg1);
}
@Override
public void onCreate(SessionID arg0) {
}
@Override
public void onLogon(SessionID arg0) {
System.out.println("Acceptor logged on.........");
}
@Override
public void onLogout(SessionID arg0) {
}
@Override
public void toAdmin(Message arg0, SessionID arg1) {
// TODO
}
@Override
public void toApp(Message arg0, SessionID arg1) throws DoNotSend {
}
public void onMessage(NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
OrderQty orderQty = new OrderQty(10.0);
Price price = new Price(10.0);
ExecutionReport executionReport =
new ExecutionReport(getOrderIDCounter(),
getExecutionIDCounter(),
new ExecTransType(ExecTransType.NEW),
new ExecType(ExecType.FILL),
new OrdStatus(OrdStatus.FILLED),
order.getSymbol(), order.getSide(),
new LeavesQty(0),
new CumQty(orderQty.getValue()),
new AvgPx(price.getValue()));
executionReport.set(order.getClOrdID());
executionReport.set(orderQty);
executionReport.set(new LastShares(orderQty.getValue()));
executionReport.set(new LastPx(price.getValue()));
try {
Session session = Session.lookupSession(sessionID);
Session.sendToTarget(executionReport, sessionID);
System.out.println("NewOrderSingle Execution Completed-----");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("Error during order execution" + ex.getMessage());
}
}
}
public class FIXAcceptorExecutor {
private final SocketAcceptor acceptor;
private final static Map> dynamicSessionMappings = new HashMap >();
public FIXAcceptorExecutor(SessionSettings settings) throws ConfigError, FieldConvertError {
Application application = new FIXAcceptorApplication();
MessageStoreFactory messageStoreFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
MessageFactory messageFactory = new DefaultMessageFactory();
acceptor = new SocketAcceptor(application, messageStoreFactory, settings, logFactory, messageFactory);
configureDynamicSessions(settings, application, messageStoreFactory, logFactory,
messageFactory);
}
private void configureDynamicSessions(SessionSettings settings, Application application,
MessageStoreFactory messageStoreFactory, LogFactory logFactory, MessageFactory messageFactory) throws ConfigError, FieldConvertError {
IteratorsectionIterator = settings.sectionIterator();
while (sectionIterator.hasNext()) {
SessionID sessionID = sectionIterator.next();
if (isSessionTemplate(settings, sessionID)) {
InetSocketAddress address = getAcceptorSocketAddress(settings, sessionID);
getMappings(address).add(new TemplateMapping(sessionID, sessionID));
}
}
for (Map.Entry> entry : dynamicSessionMappings.entrySet()) {
acceptor.setSessionProvider(entry.getKey(),
new DynamicAcceptorSessionProvider(settings,
entry.getValue(),
application,
messageStoreFactory,
logFactory,
messageFactory));
}
}
private ListgetMappings(InetSocketAddress address) {
Listmappings = dynamicSessionMappings.get(address);
if (mappings == null) {
mappings = new ArrayList();
dynamicSessionMappings.put(address, mappings);
}
return mappings;
}
private InetSocketAddress getAcceptorSocketAddress(SessionSettings settings, SessionID sessionID) throws ConfigError, FieldConvertError {
String acceptorHost = "0.0.0.0";
if (settings.isSetting(sessionID, SETTING_SOCKET_ACCEPT_ADDRESS)) {
acceptorHost = settings.getString(sessionID, SETTING_SOCKET_ACCEPT_ADDRESS);
}
int acceptorPort = (int) settings.getLong(sessionID, SETTING_SOCKET_ACCEPT_PORT);
InetSocketAddress address = new InetSocketAddress(acceptorHost, acceptorPort);
return address;
}
private boolean isSessionTemplate(SessionSettings settings, SessionID sessionID)
throws ConfigError,
FieldConvertError {
return settings.isSetting(sessionID, SETTING_ACCEPTOR_TEMPLATE) &&
settings.getBool(sessionID, SETTING_ACCEPTOR_TEMPLATE);
}
private void start() throws RuntimeError, ConfigError {
acceptor.start();
}
private void stop() {
acceptor.stop();
}
public static void main(String args[]) throws Exception {
InputStream inputStream = null;
if (args.length == 0) {
inputStream = FIXAcceptorExecutor.class.getResourceAsStream("executor.cfg");
} else if (args.length == 1) {
inputStream = new FileInputStream(args[0]);
}
SessionSettings settings = new SessionSettings(inputStream);
FIXAcceptorExecutor executor = new FIXAcceptorExecutor(settings);
executor.start();
System.out.println("pressto quit");
System.in.read();
executor.stop();
}
}