blob: 1cc5be73fd4eb989a7dc7a5cd2f24a77bb1683b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.eip.patterns;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.common.util.MessageUtil;
/**
* A RoutingSlip component can be used to route an incoming In-Out exchange
* through a series of target services.
* This endpoint implements the
* <a href="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a>
* pattern, with the limitation that the routing table is static.
* This endpoint only uses In-Out MEPs and errors or faults sent by targets are reported
* back to the consumer, thus interrupting the routing process.
* In addition, this endpoint is fully asynchronous and uses an exchange store to provide
* full HA and recovery for clustered / persistent flows.
*
* @author gnodet
* @version $Revision: 376451 $
* @org.apache.xbean.XBean element="static-routing-slip"
*/
public class StaticRoutingSlip extends EIPEndpoint {
/**
* List of target components used in the RoutingSlip
*/
private ExchangeTarget[] targets;
/**
* The correlation property used by this component
*/
private String correlation;
/**
* The current index of the target
*/
private String index;
/**
* The id of the previous target exchange
*/
private String previous;
/**
* @return Returns the targets.
*/
public ExchangeTarget[] getTargets() {
return targets;
}
/**
* List of target endpoints used in the RoutingSlip
*
* @param targets The targets to set.
*/
public void setTargets(ExchangeTarget[] targets) {
this.targets = targets;
}
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#validate()
*/
public void validate() throws DeploymentException {
super.validate();
// Check target
if (targets == null || targets.length == 0) {
throw new IllegalArgumentException("targets should contain at least one ExchangeTarget");
}
// Create correlation properties
correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint();
index = "RoutingSlip.Index." + getService() + "." + getEndpoint();
previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint();
}
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
protected void processSync(MessageExchange exchange) throws Exception {
if (!(exchange instanceof InOut)) {
throw new IllegalStateException("Use an InOut MEP");
}
MessageExchange current = exchange;
for (int i = 0; i < targets.length; i++) {
InOut me = getExchangeFactory().createInOutExchange();
targets[i].configureTarget(me, getContext());
if (i == 0) {
MessageUtil.transferInToIn(current, me);
} else {
MessageUtil.transferOutToIn(current, me);
}
sendSync(me);
if (i != 0) {
done(current);
}
if (me.getStatus() == ExchangeStatus.DONE) {
throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
} else if (me.getStatus() == ExchangeStatus.ERROR) {
fail(exchange, me.getError());
return;
} else if (me.getFault() != null) {
Fault fault = MessageUtil.copyFault(me);
MessageUtil.transferToFault(fault, exchange);
done(me);
sendSync(exchange);
return;
} else if (me.getOutMessage() == null) {
throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+ " but has no Out nor Fault message");
}
current = me;
}
MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange);
done(current);
sendSync(exchange);
}
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
*/
protected void processAsync(MessageExchange exchange) throws Exception {
// This exchange comes from the consumer
if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
processProviderAsync(exchange);
// The exchange comes from a target
} else {
processConsumerAsync(exchange);
}
}
protected void processProviderAsync(MessageExchange exchange) throws Exception {
if (exchange.getStatus() == ExchangeStatus.DONE) {
String correlationId = (String) exchange.getProperty(correlation);
if (correlationId == null) {
throw new IllegalStateException(correlation + " property not found");
}
// Ack last target hit
MessageExchange me = (MessageExchange) store.load(correlationId);
done(me);
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
String correlationId = (String) exchange.getProperty(correlation);
if (correlationId == null) {
throw new IllegalStateException(correlation + " property not found");
}
// Ack last target hit
MessageExchange me = (MessageExchange) store.load(correlationId);
done(me);
} else if (!(exchange instanceof InOut)) {
throw new IllegalStateException("Use an InOut MEP");
} else {
MessageExchange me = getExchangeFactory().createInOutExchange();
me.setProperty(correlation, exchange.getExchangeId());
me.setProperty(index, new Integer(0));
targets[0].configureTarget(me, getContext());
store.store(exchange.getExchangeId(), exchange);
MessageUtil.transferInToIn(exchange, me);
send(me);
}
}
private void processConsumerAsync(MessageExchange exchange) throws Exception {
String correlationId = (String) exchange.getProperty(correlation);
String previousId = (String) exchange.getProperty(previous);
Integer prevIndex = (Integer) exchange.getProperty(index);
if (correlationId == null) {
throw new IllegalStateException(correlation + " property not found");
}
if (prevIndex == null) {
throw new IllegalStateException(previous + " property not found");
}
// This should never happen, as we can only send DONE
if (exchange.getStatus() == ExchangeStatus.DONE) {
throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
// ERROR are sent back to the consumer
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
MessageExchange me = (MessageExchange) store.load(correlationId);
fail(me, exchange.getError());
// Ack the previous target
if (previousId != null) {
me = (MessageExchange) store.load(previousId);
done(me);
}
// Faults are sent back to the consumer
} else if (exchange.getFault() != null) {
MessageExchange me = (MessageExchange) store.load(correlationId);
me.setProperty(correlation, exchange.getExchangeId());
store.store(exchange.getExchangeId(), exchange);
MessageUtil.transferFaultToFault(exchange, me);
send(me);
// Ack the previous target
if (previousId != null) {
me = (MessageExchange) store.load(previousId);
done(me);
}
// Out message, give it to next target or back to consumer
} else if (exchange.getMessage("out") != null) {
// This is the answer from the last target
if (prevIndex.intValue() == targets.length - 1) {
MessageExchange me = (MessageExchange) store.load(correlationId);
me.setProperty(correlation, exchange.getExchangeId());
store.store(exchange.getExchangeId(), exchange);
MessageUtil.transferOutToOut(exchange, me);
send(me);
if (previousId != null) {
me = (MessageExchange) store.load(previousId);
done(me);
}
// We still have a target to hit
} else {
MessageExchange me = getExchangeFactory().createInOutExchange();
Integer curIndex = new Integer(prevIndex.intValue() + 1);
me.setProperty(correlation, correlationId);
me.setProperty(index, curIndex);
me.setProperty(previous, exchange.getExchangeId());
targets[curIndex.intValue()].configureTarget(me, getContext());
store.store(exchange.getExchangeId(), exchange);
MessageUtil.transferOutToIn(exchange, me);
try {
send(me);
} catch (RuntimeException re) {
// send delivery channel errors back to calling endpoint
if (correlationId != null) {
me = (MessageExchange) store.load(correlationId);
}
fail(me, re);
}
if (previousId != null) {
me = (MessageExchange) store.load(previousId);
done(me);
}
}
// This should not happen
} else {
throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+ " but has no Out nor Fault message");
}
}
}