blob: 9883c16d74483afe3f9ad47691c8d5f599a7bb92 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.quickfixj.examples;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Body;
import org.apache.camel.Exchange;
import org.apache.camel.Header;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.quickfixj.MessagePredicate;
import org.apache.camel.component.quickfixj.QuickfixjComponent;
import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
import org.apache.camel.component.quickfixj.QuickfixjProducer;
import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonTransformer;
import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.FieldNotFound;
import quickfix.SessionID;
import quickfix.field.AvgPx;
import quickfix.field.ClOrdID;
import quickfix.field.CumQty;
import quickfix.field.ExecID;
import quickfix.field.ExecTransType;
import quickfix.field.ExecType;
import quickfix.field.LeavesQty;
import quickfix.field.MsgType;
import quickfix.field.OrdStatus;
import quickfix.field.OrderID;
import quickfix.field.Side;
import quickfix.field.Symbol;
import quickfix.fix42.ExecutionReport;
import quickfix.fix42.OrderStatusRequest;
public class RequestReplyExample {
private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
public static void main(String[] args) throws Exception {
new RequestReplyExample().run();
}
public void run() throws Exception {
DefaultCamelContext context = new DefaultCamelContext();
final CountDownLatch logonLatch = new CountDownLatch(1);
final String orderStatusServiceUrl = "http://localhost:9123/order/status";
RouteBuilder routes = new RouteBuilder() {
@Override
public void configure() throws Exception {
// Synchronize the logon so we don't start sending status requests too early
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET").
filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
bean(new CountDownLatchDecrementer("logon", logonLatch));
// Incoming status requests are converted to InOut exchange pattern and passed to the
// order status service. The response is sent back to the session making the request.
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut")
.filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST))
.bean(new MarketOrderStatusService());
from("jetty:" + orderStatusServiceUrl)
.bean(new OrderStatusRequestTransformer())
.routingSlip(bean(FixSessionRouter.class, "route"))
.bean(new QuickfixjMessageJsonTransformer());
}
};
context.addRoutes(routes);
LOG.info("Starting Camel context");
context.start();
if (!logonLatch.await(5L, TimeUnit.SECONDS)) {
throw new IllegalStateException("Logon did not succeed");
}
// Send a request to the order status web service.
// Verify that the response is a JSON response.
URL orderStatusUrl = new URL(orderStatusServiceUrl + "?sessionID=FIX.4.2:TRADER->MARKET&orderID=abc");
HttpURLConnection connection = (HttpURLConnection) orderStatusUrl.openConnection();
BufferedReader orderStatusReply = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String line = orderStatusReply.readLine();
if (!line.equals("\"message\": {")) {
throw new Exception("Don't appear to be a JSON response");
} else {
StringBuilder sb = new StringBuilder();
while (line != null) {
sb.append(line);
sb.append('\n');
line = orderStatusReply.readLine();
}
LOG.info("Web request response:\n" + sb);
}
orderStatusReply.close();
LOG.info("Shutting down Camel context");
context.stop();
LOG.info("Example complete");
}
public static class OrderStatusRequestTransformer {
public void transform(Exchange exchange) throws FieldNotFound {
String sessionID = (String) exchange.getIn().getHeader("sessionID");
String orderID = (String) exchange.getIn().getHeader("orderID");
OrderStatusRequest request = new OrderStatusRequest(new ClOrdID("XYZ"), new Symbol("GOOG"), new Side(Side.BUY));
request.set(new OrderID(orderID));
// Look for a reply execution report back to the requester session
// and having the requested OrderID. This is a loose correlation but the best
// we can do with FIX 4.2. Newer versions of FIX have an optional explicit correlation field.
exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY, new MessagePredicate(
new SessionID(sessionID), MsgType.EXECUTION_REPORT).withField(OrderID.FIELD, request.getString(OrderID.FIELD)));
exchange.getIn().setBody(request);
}
}
public static class MarketOrderStatusService {
private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
public ExecutionReport getOrderStatus(OrderStatusRequest request) throws FieldNotFound {
LOG.info("Received order status request for orderId=" + request.getOrderID().getValue());
return new ExecutionReport(request.getOrderID(),
new ExecID(UUID.randomUUID().toString()),
new ExecTransType(ExecTransType.STATUS),
new ExecType(ExecType.REJECTED),
new OrdStatus(OrdStatus.REJECTED),
new Symbol("GOOG"),
new Side(Side.BUY),
new LeavesQty(100),
new CumQty(0),
new AvgPx(0));
}
}
public static class FixSessionRouter {
public String route(@Header("sessionID") String sessionID, @Body Object body) {
return String.format("quickfix:examples/inprocess.cfg?sessionID=%s", sessionID);
}
}
}