blob: a978eb0b20996e052b8bc5b729bec4573c8685b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.spring.spi;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.processor.CamelLogger;
import org.apache.camel.processor.RedeliveryErrorHandler;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.util.ObjectHelper;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
/**
* The <a href="http://camel.apache.org/transactional-client.html">Transactional Client</a>
* EIP pattern.
*
* @version
*/
public class TransactionErrorHandler extends RedeliveryErrorHandler {
private final TransactionTemplate transactionTemplate;
private final String transactionKey;
/**
* Creates the transaction error handler.
*
* @param camelContext the camel context
* @param output outer processor that should use this default error handler
* @param logger logger to use for logging failures and redelivery attempts
* @param redeliveryProcessor an optional processor to run before redelivery attempt
* @param redeliveryPolicy policy for redelivery
* @param exceptionPolicyStrategy strategy for onException handling
* @param transactionTemplate the transaction template
* @param retryWhile retry while
* @param executorServiceRef reference to a {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>.
*/
public TransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy,
TransactionTemplate transactionTemplate, Predicate retryWhile, String executorServiceRef) {
super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, retryWhile, executorServiceRef);
setExceptionPolicy(exceptionPolicyStrategy);
this.transactionTemplate = transactionTemplate;
this.transactionKey = ObjectHelper.getIdentityHashCode(transactionTemplate);
}
public boolean supportTransacted() {
return true;
}
@Override
public String toString() {
if (output == null) {
// if no output then don't do any description
return "";
}
return "TransactionErrorHandler:"
+ propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
+ "[" + getOutput() + "]";
}
@Override
public void process(Exchange exchange) throws Exception {
// we have to run this synchronously as Spring Transaction does *not* support
// using multiple threads to span a transaction
if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
// already transacted by this transaction template
// so lets just let the error handler process it
processByErrorHandler(exchange);
} else {
// not yet wrapped in transaction so lets do that
// and then have it invoke the error handler from within that transaction
processInTransaction(exchange);
}
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// invoke ths synchronous method as Spring Transaction does *not* support
// using multiple threads to span a transaction
try {
process(exchange);
} catch (Throwable e) {
exchange.setException(e);
}
// notify callback we are done synchronously
callback.done(true);
return true;
}
protected void processInTransaction(final Exchange exchange) throws Exception {
try {
// mark the beginning of this transaction boundary
exchange.getUnitOfWork().beginTransactedBy(transactionKey);
log.debug("Transaction begin ({}) for ExchangeId: {}", transactionKey, exchange.getExchangeId());
doInTransactionTemplate(exchange);
log.debug("Transaction commit ({}) for ExchangeId: {}", transactionKey, exchange.getExchangeId());
} catch (TransactionRollbackException e) {
// ignore as its just a dummy exception to force spring TX to rollback
log.debug("Transaction rollback ({}) for ExchangeId: {} due exchange was marked for rollbackOnly", transactionKey, exchange.getExchangeId());
} catch (Throwable e) {
log.warn("Transaction rollback (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId() + " due exception: " + e.getMessage());
exchange.setException(e);
} finally {
// mark the end of this transaction boundary
exchange.getUnitOfWork().endTransactedBy(transactionKey);
}
// if it was a local rollback only then remove its marker so outer transaction wont see the marker
Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
if (onlyLast != null && onlyLast) {
if (log.isDebugEnabled()) {
// log exception if there was a cause exception so we have the stacktrace
Exception cause = exchange.getException();
if (cause != null) {
log.debug("Transaction rollback (" + transactionKey + ") for ExchangeId: " + exchange.getExchangeId()
+ " due exchange was marked for rollbackOnlyLast and due exception: ", cause);
} else {
log.debug("Transaction rollback ({}) for ExchangeId: {} due exchange was marked for rollbackOnlyLast",
transactionKey, exchange.getExchangeId());
}
}
// remove caused exception due we was marked as rollback only last
// so by removing the exception, any outer transaction will not be affected
exchange.setException(null);
}
}
protected void doInTransactionTemplate(final Exchange exchange) {
// spring transaction template is working best with rollback if you throw it a runtime exception
// otherwise it may not rollback messages send to JMS queues etc.
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
// wrapper exception to throw if the exchange failed
// IMPORTANT: Must be a runtime exception to let Spring regard it as to do "rollback"
RuntimeException rce;
// and now let process the exchange by the error handler
processByErrorHandler(exchange);
// after handling and still an exception or marked as rollback only then rollback
if (exchange.getException() != null || exchange.isRollbackOnly()) {
// wrap exception in transacted exception
if (exchange.getException() != null) {
rce = ObjectHelper.wrapRuntimeCamelException(exchange.getException());
} else {
// create dummy exception to force spring transaction manager to rollback
rce = new TransactionRollbackException();
}
if (!status.isRollbackOnly()) {
status.setRollbackOnly();
}
// throw runtime exception to force rollback (which works best to rollback with Spring transaction manager)
throw rce;
}
}
});
}
/**
* Processes the {@link Exchange} using the error handler.
* <p/>
* This implementation will invoke ensure this occurs synchronously, that means if the async routing engine
* did kick in, then this implementation will wait for the task to complete before it continues.
*
* @param exchange the exchange
*/
protected void processByErrorHandler(final Exchange exchange) {
// must invoke the async method with empty callback to have it invoke the
// super.processErrorHandler
// we are transacted so we have to route synchronously so don't worry about returned
// value from the process method
// and the camel routing engine will detect this is an transacted Exchange and route
// it fully synchronously so we don't have to wait here if we hit an async endpoint
// all that is taken care of in the camel-core
super.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// noop
}
});
}
private static String propagationBehaviorToString(int propagationBehavior) {
String rc;
switch (propagationBehavior) {
case TransactionDefinition.PROPAGATION_MANDATORY:
rc = "PROPAGATION_MANDATORY";
break;
case TransactionDefinition.PROPAGATION_NESTED:
rc = "PROPAGATION_NESTED";
break;
case TransactionDefinition.PROPAGATION_NEVER:
rc = "PROPAGATION_NEVER";
break;
case TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
rc = "PROPAGATION_NOT_SUPPORTED";
break;
case TransactionDefinition.PROPAGATION_REQUIRED:
rc = "PROPAGATION_REQUIRED";
break;
case TransactionDefinition.PROPAGATION_REQUIRES_NEW:
rc = "PROPAGATION_REQUIRES_NEW";
break;
case TransactionDefinition.PROPAGATION_SUPPORTS:
rc = "PROPAGATION_SUPPORTS";
break;
default:
rc = "UNKNOWN";
}
return rc;
}
}