| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.servicemix.eip.patterns; |
| |
| import javax.jbi.management.DeploymentException; |
| import javax.jbi.messaging.ExchangeStatus; |
| import javax.jbi.messaging.Fault; |
| import javax.jbi.messaging.InOut; |
| import javax.jbi.messaging.MessageExchange; |
| |
| import org.apache.servicemix.eip.EIPEndpoint; |
| import org.apache.servicemix.eip.support.ExchangeTarget; |
| import org.apache.servicemix.common.util.MessageUtil; |
| |
| /** |
| * A RoutingSlip component can be used to route an incoming In-Out exchange |
| * through a series of target services. |
| * This endpoint implements the |
| * <a href="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a> |
| * pattern, with the limitation that the routing table is static. |
| * This endpoint only uses In-Out MEPs and errors or faults sent by targets are reported |
| * back to the consumer, thus interrupting the routing process. |
| * In addition, this endpoint is fully asynchronous and uses an exchange store to provide |
| * full HA and recovery for clustered / persistent flows. |
| * |
| * @author gnodet |
| * @version $Revision: 376451 $ |
| * @org.apache.xbean.XBean element="static-routing-slip" |
| */ |
| public class StaticRoutingSlip extends EIPEndpoint { |
| |
| /** |
| * List of target components used in the RoutingSlip |
| */ |
| private ExchangeTarget[] targets; |
| /** |
| * The correlation property used by this component |
| */ |
| private String correlation; |
| /** |
| * The current index of the target |
| */ |
| private String index; |
| /** |
| * The id of the previous target exchange |
| */ |
| private String previous; |
| |
| /** |
| * @return Returns the targets. |
| */ |
| public ExchangeTarget[] getTargets() { |
| return targets; |
| } |
| |
| /** |
| * List of target endpoints used in the RoutingSlip |
| * |
| * @param targets The targets to set. |
| */ |
| public void setTargets(ExchangeTarget[] targets) { |
| this.targets = targets; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.servicemix.eip.EIPEndpoint#validate() |
| */ |
| public void validate() throws DeploymentException { |
| super.validate(); |
| // Check target |
| if (targets == null || targets.length == 0) { |
| throw new IllegalArgumentException("targets should contain at least one ExchangeTarget"); |
| } |
| // Create correlation properties |
| correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint(); |
| index = "RoutingSlip.Index." + getService() + "." + getEndpoint(); |
| previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint(); |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange) |
| */ |
| protected void processSync(MessageExchange exchange) throws Exception { |
| if (!(exchange instanceof InOut)) { |
| throw new IllegalStateException("Use an InOut MEP"); |
| } |
| MessageExchange current = exchange; |
| for (int i = 0; i < targets.length; i++) { |
| InOut me = getExchangeFactory().createInOutExchange(); |
| targets[i].configureTarget(me, getContext()); |
| if (i == 0) { |
| MessageUtil.transferInToIn(current, me); |
| } else { |
| MessageUtil.transferOutToIn(current, me); |
| } |
| sendSync(me); |
| if (i != 0) { |
| done(current); |
| } |
| if (me.getStatus() == ExchangeStatus.DONE) { |
| throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE); |
| } else if (me.getStatus() == ExchangeStatus.ERROR) { |
| fail(exchange, me.getError()); |
| return; |
| } else if (me.getFault() != null) { |
| Fault fault = MessageUtil.copyFault(me); |
| MessageUtil.transferToFault(fault, exchange); |
| done(me); |
| sendSync(exchange); |
| return; |
| } else if (me.getOutMessage() == null) { |
| throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE |
| + " but has no Out nor Fault message"); |
| } |
| current = me; |
| } |
| MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange); |
| done(current); |
| sendSync(exchange); |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange) |
| */ |
| protected void processAsync(MessageExchange exchange) throws Exception { |
| // This exchange comes from the consumer |
| if (exchange.getRole() == MessageExchange.Role.PROVIDER) { |
| processProviderAsync(exchange); |
| // The exchange comes from a target |
| } else { |
| processConsumerAsync(exchange); |
| } |
| } |
| |
| protected void processProviderAsync(MessageExchange exchange) throws Exception { |
| if (exchange.getStatus() == ExchangeStatus.DONE) { |
| String correlationId = (String) exchange.getProperty(correlation); |
| if (correlationId == null) { |
| throw new IllegalStateException(correlation + " property not found"); |
| } |
| // Ack last target hit |
| MessageExchange me = (MessageExchange) store.load(correlationId); |
| done(me); |
| } else if (exchange.getStatus() == ExchangeStatus.ERROR) { |
| String correlationId = (String) exchange.getProperty(correlation); |
| if (correlationId == null) { |
| throw new IllegalStateException(correlation + " property not found"); |
| } |
| // Ack last target hit |
| MessageExchange me = (MessageExchange) store.load(correlationId); |
| done(me); |
| } else if (!(exchange instanceof InOut)) { |
| throw new IllegalStateException("Use an InOut MEP"); |
| } else { |
| MessageExchange me = getExchangeFactory().createInOutExchange(); |
| me.setProperty(correlation, exchange.getExchangeId()); |
| me.setProperty(index, new Integer(0)); |
| targets[0].configureTarget(me, getContext()); |
| store.store(exchange.getExchangeId(), exchange); |
| MessageUtil.transferInToIn(exchange, me); |
| send(me); |
| } |
| } |
| |
| private void processConsumerAsync(MessageExchange exchange) throws Exception { |
| String correlationId = (String) exchange.getProperty(correlation); |
| String previousId = (String) exchange.getProperty(previous); |
| Integer prevIndex = (Integer) exchange.getProperty(index); |
| if (correlationId == null) { |
| throw new IllegalStateException(correlation + " property not found"); |
| } |
| if (prevIndex == null) { |
| throw new IllegalStateException(previous + " property not found"); |
| } |
| // This should never happen, as we can only send DONE |
| if (exchange.getStatus() == ExchangeStatus.DONE) { |
| throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE); |
| // ERROR are sent back to the consumer |
| } else if (exchange.getStatus() == ExchangeStatus.ERROR) { |
| MessageExchange me = (MessageExchange) store.load(correlationId); |
| fail(me, exchange.getError()); |
| // Ack the previous target |
| if (previousId != null) { |
| me = (MessageExchange) store.load(previousId); |
| done(me); |
| } |
| // Faults are sent back to the consumer |
| } else if (exchange.getFault() != null) { |
| MessageExchange me = (MessageExchange) store.load(correlationId); |
| me.setProperty(correlation, exchange.getExchangeId()); |
| store.store(exchange.getExchangeId(), exchange); |
| MessageUtil.transferFaultToFault(exchange, me); |
| send(me); |
| // Ack the previous target |
| if (previousId != null) { |
| me = (MessageExchange) store.load(previousId); |
| done(me); |
| } |
| // Out message, give it to next target or back to consumer |
| } else if (exchange.getMessage("out") != null) { |
| // This is the answer from the last target |
| if (prevIndex.intValue() == targets.length - 1) { |
| MessageExchange me = (MessageExchange) store.load(correlationId); |
| me.setProperty(correlation, exchange.getExchangeId()); |
| store.store(exchange.getExchangeId(), exchange); |
| MessageUtil.transferOutToOut(exchange, me); |
| send(me); |
| if (previousId != null) { |
| me = (MessageExchange) store.load(previousId); |
| done(me); |
| } |
| // We still have a target to hit |
| } else { |
| MessageExchange me = getExchangeFactory().createInOutExchange(); |
| Integer curIndex = new Integer(prevIndex.intValue() + 1); |
| me.setProperty(correlation, correlationId); |
| me.setProperty(index, curIndex); |
| me.setProperty(previous, exchange.getExchangeId()); |
| targets[curIndex.intValue()].configureTarget(me, getContext()); |
| store.store(exchange.getExchangeId(), exchange); |
| MessageUtil.transferOutToIn(exchange, me); |
| try { |
| send(me); |
| } catch (RuntimeException re) { |
| // send delivery channel errors back to calling endpoint |
| if (correlationId != null) { |
| me = (MessageExchange) store.load(correlationId); |
| } |
| fail(me, re); |
| } |
| if (previousId != null) { |
| me = (MessageExchange) store.load(previousId); |
| done(me); |
| } |
| } |
| // This should not happen |
| } else { |
| throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE |
| + " but has no Out nor Fault message"); |
| } |
| } |
| |
| } |