| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.processor; |
| |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.camel.AsyncCallback; |
| import org.apache.camel.AsyncProcessor; |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.LoggingLevel; |
| import org.apache.camel.Message; |
| import org.apache.camel.Predicate; |
| import org.apache.camel.Processor; |
| import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; |
| import org.apache.camel.model.OnExceptionDefinition; |
| import org.apache.camel.spi.SubUnitOfWorkCallback; |
| import org.apache.camel.util.AsyncProcessorHelper; |
| import org.apache.camel.util.CamelContextHelper; |
| import org.apache.camel.util.EventHelper; |
| import org.apache.camel.util.ExchangeHelper; |
| import org.apache.camel.util.MessageHelper; |
| import org.apache.camel.util.ObjectHelper; |
| import org.apache.camel.util.ServiceHelper; |
| |
| /** |
| * Base redeliverable error handler that also supports a final dead letter queue in case |
| * all redelivery attempts fail. |
| * <p/> |
| * This implementation should contain all the error handling logic and the sub classes |
| * should only configure it according to what they support. |
| * |
| * @version |
| */ |
| public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor { |
| |
| private static ScheduledExecutorService executorService; |
| protected final String executorServiceRef; |
| protected final CamelContext camelContext; |
| protected final Processor deadLetter; |
| protected final String deadLetterUri; |
| protected final Processor output; |
| protected final AsyncProcessor outputAsync; |
| protected final Processor redeliveryProcessor; |
| protected final RedeliveryPolicy redeliveryPolicy; |
| protected final Predicate retryWhilePolicy; |
| protected final CamelLogger logger; |
| protected final boolean useOriginalMessagePolicy; |
| protected boolean redeliveryEnabled; |
| |
| /** |
| * Contains the current redelivery data |
| */ |
| protected class RedeliveryData { |
| Exchange original; |
| boolean sync = true; |
| int redeliveryCounter; |
| long redeliveryDelay; |
| Predicate retryWhilePredicate = retryWhilePolicy; |
| boolean redeliverFromSync; |
| |
| // default behavior which can be overloaded on a per exception basis |
| RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; |
| Processor deadLetterProcessor = deadLetter; |
| Processor failureProcessor; |
| Processor onRedeliveryProcessor = redeliveryProcessor; |
| Predicate handledPredicate = getDefaultHandledPredicate(); |
| Predicate continuedPredicate; |
| boolean useOriginalInMessage = useOriginalMessagePolicy; |
| boolean asyncDelayedRedelivery = redeliveryPolicy.isAsyncDelayedRedelivery(); |
| } |
| |
| /** |
| * Tasks which performs asynchronous redelivery attempts, and being triggered by a |
| * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task |
| * has to be delayed before a redelivery attempt is performed. |
| */ |
| private class AsyncRedeliveryTask implements Callable<Boolean> { |
| |
| private final Exchange exchange; |
| private final AsyncCallback callback; |
| private final RedeliveryData data; |
| |
| public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { |
| this.exchange = exchange; |
| this.callback = callback; |
| this.data = data; |
| } |
| |
| public Boolean call() throws Exception { |
| // prepare for redelivery |
| prepareExchangeForRedelivery(exchange, data); |
| |
| // letting onRedeliver be executed at first |
| deliverToOnRedeliveryProcessor(exchange, data); |
| |
| if (log.isTraceEnabled()) { |
| log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange}); |
| } |
| |
| // emmit event we are doing redelivery |
| EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); |
| |
| // process the exchange (also redelivery) |
| boolean sync; |
| if (data.redeliverFromSync) { |
| // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from |
| // this error handler, which means we have to invoke the callback with false, to have the callback |
| // be notified when we are done |
| sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { |
| public void done(boolean doneSync) { |
| log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); |
| |
| // mark we are in sync mode now |
| data.sync = false; |
| |
| // only process if the exchange hasn't failed |
| // and it has not been handled by the error processor |
| if (isDone(exchange)) { |
| callback.done(false); |
| return; |
| } |
| |
| // error occurred so loop back around which we do by invoking the processAsyncErrorHandler |
| processAsyncErrorHandler(exchange, callback, data); |
| } |
| }); |
| } else { |
| // this redelivery task was scheduled from asynchronous, which means we should only |
| // handle when the asynchronous task was done |
| sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { |
| public void done(boolean doneSync) { |
| log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); |
| |
| // this callback should only handle the async case |
| if (doneSync) { |
| return; |
| } |
| |
| // mark we are in async mode now |
| data.sync = false; |
| |
| // only process if the exchange hasn't failed |
| // and it has not been handled by the error processor |
| if (isDone(exchange)) { |
| callback.done(doneSync); |
| return; |
| } |
| // error occurred so loop back around which we do by invoking the processAsyncErrorHandler |
| processAsyncErrorHandler(exchange, callback, data); |
| } |
| }); |
| } |
| |
| return sync; |
| } |
| } |
| |
| public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, |
| Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, |
| String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, String executorServiceRef) { |
| |
| ObjectHelper.notNull(camelContext, "CamelContext", this); |
| ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); |
| |
| this.camelContext = camelContext; |
| this.redeliveryProcessor = redeliveryProcessor; |
| this.deadLetter = deadLetter; |
| this.output = output; |
| this.outputAsync = AsyncProcessorTypeConverter.convert(output); |
| this.redeliveryPolicy = redeliveryPolicy; |
| this.logger = logger; |
| this.deadLetterUri = deadLetterUri; |
| this.useOriginalMessagePolicy = useOriginalMessagePolicy; |
| this.retryWhilePolicy = retryWhile; |
| this.executorServiceRef = executorServiceRef; |
| } |
| |
| public boolean supportTransacted() { |
| return false; |
| } |
| |
| public void process(Exchange exchange) throws Exception { |
| if (output == null) { |
| // no output then just return |
| return; |
| } |
| AsyncProcessorHelper.process(this, exchange); |
| } |
| |
| public boolean process(Exchange exchange, final AsyncCallback callback) { |
| return processErrorHandler(exchange, callback, new RedeliveryData()); |
| } |
| |
| /** |
| * Process the exchange using redelivery error handling. |
| */ |
| protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { |
| |
| // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the |
| // original Exchange is being redelivered, and not a mutated Exchange |
| data.original = defensiveCopyExchangeIfNeeded(exchange); |
| |
| // use looping to have redelivery attempts |
| while (true) { |
| |
| // can we still run |
| if (!isRunAllowed()) { |
| if (exchange.getException() == null) { |
| exchange.setException(new RejectedExecutionException()); |
| } |
| // we cannot process so invoke callback |
| callback.done(data.sync); |
| return data.sync; |
| } |
| |
| // did previous processing cause an exception? |
| boolean handle = shouldHandleException(exchange); |
| if (handle) { |
| handleException(exchange, data); |
| } |
| |
| // compute if we are exhausted or not |
| boolean exhausted = isExhausted(exchange, data); |
| if (exhausted) { |
| Processor target = null; |
| boolean deliver = true; |
| |
| // the unit of work may have an optional callback associated we need to leverage |
| SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); |
| if (uowCallback != null) { |
| // signal to the callback we are exhausted |
| uowCallback.onExhausted(exchange); |
| // do not deliver to the failure processor as its been handled by the callback instead |
| deliver = false; |
| } |
| |
| if (deliver) { |
| // should deliver to failure processor (either from onException or the dead letter channel) |
| target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; |
| } |
| // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair |
| // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) |
| boolean sync = deliverToFailureProcessor(target, exchange, data, callback); |
| // we are breaking out |
| return sync; |
| } |
| |
| if (data.redeliveryCounter > 0) { |
| // calculate delay |
| data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter); |
| |
| if (data.redeliveryDelay > 0) { |
| // okay there is a delay so create a scheduled task to have it executed in the future |
| |
| if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { |
| // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to |
| // have it being executed in the future, or immediately |
| // we are continuing asynchronously |
| |
| // mark we are routing async from now and that this redelivery task came from a synchronous routing |
| data.sync = false; |
| data.redeliverFromSync = true; |
| AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); |
| |
| // schedule the redelivery task |
| if (log.isTraceEnabled()) { |
| log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); |
| } |
| executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); |
| |
| return false; |
| } else { |
| // async delayed redelivery was disabled or we are transacted so we must be synchronous |
| // as the transaction manager requires to execute in the same thread context |
| try { |
| data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); |
| } catch (InterruptedException e) { |
| // we was interrupted so break out |
| exchange.setException(e); |
| // mark the exchange to stop continue routing when interrupted |
| // as we do not want to continue routing (for example a task has been cancelled) |
| exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); |
| callback.done(data.sync); |
| return data.sync; |
| } |
| } |
| } |
| |
| // prepare for redelivery |
| prepareExchangeForRedelivery(exchange, data); |
| |
| // letting onRedeliver be executed |
| deliverToOnRedeliveryProcessor(exchange, data); |
| |
| // emmit event we are doing redelivery |
| EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); |
| } |
| |
| // process the exchange (also redelivery) |
| boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { |
| public void done(boolean sync) { |
| // this callback should only handle the async case |
| if (sync) { |
| return; |
| } |
| |
| // mark we are in async mode now |
| data.sync = false; |
| |
| // if we are done then notify callback and exit |
| if (isDone(exchange)) { |
| callback.done(sync); |
| return; |
| } |
| |
| // error occurred so loop back around which we do by invoking the processAsyncErrorHandler |
| // method which takes care of this in a asynchronous manner |
| processAsyncErrorHandler(exchange, callback, data); |
| } |
| }); |
| |
| if (!sync) { |
| // the remainder of the Exchange is being processed asynchronously so we should return |
| return false; |
| } |
| // we continue to route synchronously |
| |
| // if we are done then notify callback and exit |
| boolean done = isDone(exchange); |
| if (done) { |
| callback.done(true); |
| return true; |
| } |
| |
| // error occurred so loop back around..... |
| } |
| } |
| |
| /** |
| * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. |
| * <p/> |
| * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use |
| * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b> |
| * in terms of logic. |
| */ |
| protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { |
| // can we still run |
| if (!isRunAllowed()) { |
| if (exchange.getException() == null) { |
| exchange.setException(new RejectedExecutionException()); |
| } |
| callback.done(data.sync); |
| return; |
| } |
| |
| // did previous processing cause an exception? |
| boolean handle = shouldHandleException(exchange); |
| if (handle) { |
| handleException(exchange, data); |
| } |
| |
| // compute if we are exhausted or not |
| boolean exhausted = isExhausted(exchange, data); |
| if (exhausted) { |
| Processor target = null; |
| boolean deliver = true; |
| |
| // the unit of work may have an optional callback associated we need to leverage |
| SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); |
| if (uowCallback != null) { |
| // signal to the callback we are exhausted |
| uowCallback.onExhausted(exchange); |
| // do not deliver to the failure processor as its been handled by the callback instead |
| deliver = false; |
| } |
| |
| if (deliver) { |
| // should deliver to failure processor (either from onException or the dead letter channel) |
| target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; |
| } |
| // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair |
| // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) |
| deliverToFailureProcessor(target, exchange, data, callback); |
| // we are breaking out |
| return; |
| } |
| |
| if (data.redeliveryCounter > 0) { |
| // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to |
| // have it being executed in the future, or immediately |
| // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously |
| // to ensure the callback will continue routing from where we left |
| AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); |
| |
| // calculate the redelivery delay |
| data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter); |
| if (data.redeliveryDelay > 0) { |
| // schedule the redelivery task |
| if (log.isTraceEnabled()) { |
| log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); |
| } |
| executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); |
| } else { |
| // execute the task immediately |
| executorService.submit(task); |
| } |
| } |
| } |
| |
| /** |
| * Performs a defensive copy of the exchange if needed |
| * |
| * @param exchange the exchange |
| * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled). |
| */ |
| protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) { |
| // only do a defensive copy if redelivery is enabled |
| if (redeliveryEnabled) { |
| return ExchangeHelper.createCopy(exchange, true); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Strategy whether the exchange has an exception that we should try to handle. |
| * <p/> |
| * Standard implementations should just look for an exception. |
| */ |
| protected boolean shouldHandleException(Exchange exchange) { |
| return exchange.getException() != null; |
| } |
| |
| /** |
| * Strategy to determine if the exchange is done so we can continue |
| */ |
| protected boolean isDone(Exchange exchange) { |
| boolean answer = isCancelledOrInterrupted(exchange); |
| |
| // only done if the exchange hasn't failed |
| // and it has not been handled by the failure processor |
| // or we are exhausted |
| if (!answer) { |
| answer = exchange.getException() == null |
| || ExchangeHelper.isFailureHandled(exchange) |
| || ExchangeHelper.isRedeliveryExhausted(exchange); |
| } |
| |
| log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); |
| return answer; |
| } |
| |
| /** |
| * Strategy to determine if the exchange was cancelled or interrupted |
| */ |
| protected boolean isCancelledOrInterrupted(Exchange exchange) { |
| boolean answer = false; |
| |
| if (ExchangeHelper.isInterrupted(exchange)) { |
| // mark the exchange to stop continue routing when interrupted |
| // as we do not want to continue routing (for example a task has been cancelled) |
| exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); |
| answer = true; |
| } |
| |
| log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); |
| return answer; |
| } |
| |
| /** |
| * Returns the output processor |
| */ |
| public Processor getOutput() { |
| return output; |
| } |
| |
| /** |
| * Returns the dead letter that message exchanges will be sent to if the |
| * redelivery attempts fail |
| */ |
| public Processor getDeadLetter() { |
| return deadLetter; |
| } |
| |
| public String getDeadLetterUri() { |
| return deadLetterUri; |
| } |
| |
| public boolean isUseOriginalMessagePolicy() { |
| return useOriginalMessagePolicy; |
| } |
| |
| public RedeliveryPolicy getRedeliveryPolicy() { |
| return redeliveryPolicy; |
| } |
| |
| public CamelLogger getLogger() { |
| return logger; |
| } |
| |
| protected Predicate getDefaultHandledPredicate() { |
| // Default is not not handle errors |
| return null; |
| } |
| |
| protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) { |
| Exception caught = exchange.getException(); |
| |
| // we continue so clear any exceptions |
| exchange.setException(null); |
| // clear rollback flags |
| exchange.setProperty(Exchange.ROLLBACK_ONLY, null); |
| // reset cached streams so they can be read again |
| MessageHelper.resetStreamCache(exchange.getIn()); |
| |
| // its continued then remove traces of redelivery attempted and caught exception |
| exchange.getIn().removeHeader(Exchange.REDELIVERED); |
| exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); |
| exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); |
| exchange.removeProperty(Exchange.FAILURE_HANDLED); |
| // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception |
| |
| // create log message |
| String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId(); |
| msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; |
| msg = msg + ". Handled and continue routing."; |
| |
| // log that we failed but want to continue |
| logFailedDelivery(false, false, true, exchange, msg, data, null); |
| } |
| |
| protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { |
| // there must be a defensive copy of the exchange |
| ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); |
| |
| // okay we will give it another go so clear the exception so we can try again |
| exchange.setException(null); |
| |
| // clear rollback flags |
| exchange.setProperty(Exchange.ROLLBACK_ONLY, null); |
| |
| // TODO: We may want to store these as state on RedelieryData so we keep them in case end user messes with Exchange |
| // and then put these on the exchange when doing a redelivery / fault processor |
| |
| // preserve these headers |
| Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); |
| Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); |
| Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); |
| |
| // we are redelivering so copy from original back to exchange |
| exchange.getIn().copyFrom(data.original.getIn()); |
| exchange.setOut(null); |
| // reset cached streams so they can be read again |
| MessageHelper.resetStreamCache(exchange.getIn()); |
| |
| // put back headers |
| if (redeliveryCounter != null) { |
| exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); |
| } |
| if (redeliveryMaxCounter != null) { |
| exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); |
| } |
| if (redelivered != null) { |
| exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); |
| } |
| } |
| |
| protected void handleException(Exchange exchange, RedeliveryData data) { |
| Exception e = exchange.getException(); |
| |
| // store the original caused exception in a property, so we can restore it later |
| exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); |
| |
| // find the error handler to use (if any) |
| OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); |
| if (exceptionPolicy != null) { |
| data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); |
| data.handledPredicate = exceptionPolicy.getHandledPolicy(); |
| data.continuedPredicate = exceptionPolicy.getContinuedPolicy(); |
| data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy(); |
| data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage(); |
| data.asyncDelayedRedelivery = exceptionPolicy.isAsyncDelayedRedelivery(exchange.getContext()); |
| |
| // route specific failure handler? |
| Processor processor = exceptionPolicy.getErrorHandler(); |
| if (processor != null) { |
| data.failureProcessor = processor; |
| } |
| // route specific on redelivery? |
| processor = exceptionPolicy.getOnRedelivery(); |
| if (processor != null) { |
| data.onRedeliveryProcessor = processor; |
| } |
| } |
| |
| // only log if not failure handled or not an exhausted unit of work |
| if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) { |
| String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId() |
| + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; |
| logFailedDelivery(true, false, false, exchange, msg, data, e); |
| } |
| |
| data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data); |
| } |
| |
| /** |
| * Gives an optional configure redelivery processor a chance to process before the Exchange |
| * will be redelivered. This can be used to alter the Exchange. |
| */ |
| protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { |
| if (data.onRedeliveryProcessor == null) { |
| return; |
| } |
| |
| if (log.isTraceEnabled()) { |
| log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered", |
| data.onRedeliveryProcessor, exchange); |
| } |
| |
| // run this synchronously as its just a Processor |
| try { |
| data.onRedeliveryProcessor.process(exchange); |
| } catch (Throwable e) { |
| exchange.setException(e); |
| } |
| log.trace("Redelivery processor done"); |
| } |
| |
| /** |
| * All redelivery attempts failed so move the exchange to the dead letter queue |
| */ |
| protected boolean deliverToFailureProcessor(final Processor processor, final Exchange exchange, |
| final RedeliveryData data, final AsyncCallback callback) { |
| boolean sync = true; |
| |
| Exception caught = exchange.getException(); |
| |
| // we did not success with the redelivery so now we let the failure processor handle it |
| // clear exception as we let the failure processor handle it |
| exchange.setException(null); |
| |
| boolean handled = false; |
| // regard both handled or continued as being handled |
| if (shouldHandled(exchange, data) || shouldContinue(exchange, data)) { |
| // its handled then remove traces of redelivery attempted |
| exchange.getIn().removeHeader(Exchange.REDELIVERED); |
| exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); |
| exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); |
| handled = true; |
| } else { |
| // must decrement the redelivery counter as we didn't process the redelivery but is |
| // handling by the failure handler. So we must -1 to not let the counter be out-of-sync |
| decrementRedeliveryCounter(exchange); |
| } |
| |
| // is the a failure processor to process the Exchange |
| if (processor != null) { |
| |
| // prepare original IN body if it should be moved instead of current body |
| if (data.useOriginalInMessage) { |
| log.trace("Using the original IN message instead of current"); |
| Message original = exchange.getUnitOfWork().getOriginalInMessage(); |
| exchange.setIn(original); |
| if (exchange.hasOut()) { |
| log.trace("Removing the out message to avoid some uncertain behavior"); |
| exchange.setOut(null); |
| } |
| } |
| |
| // reset cached streams so they can be read again |
| MessageHelper.resetStreamCache(exchange.getIn()); |
| |
| log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); |
| |
| // store the last to endpoint as the failure endpoint |
| exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); |
| |
| // the failure processor could also be asynchronous |
| AsyncProcessor afp = AsyncProcessorTypeConverter.convert(processor); |
| sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() { |
| public void done(boolean sync) { |
| log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); |
| try { |
| prepareExchangeAfterFailure(exchange, data); |
| // fire event as we had a failure processor to handle it, which there is a event for |
| boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null; |
| EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel); |
| } finally { |
| // if the fault was handled asynchronously, this should be reflected in the callback as well |
| data.sync &= sync; |
| callback.done(data.sync); |
| } |
| } |
| }); |
| } else { |
| try { |
| // no processor but we need to prepare after failure as well |
| prepareExchangeAfterFailure(exchange, data); |
| } finally { |
| // callback we are done |
| callback.done(data.sync); |
| } |
| } |
| |
| // create log message |
| String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId(); |
| msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; |
| if (processor != null) { |
| msg = msg + ". Processed by failure processor: " + processor; |
| } |
| |
| // log that we failed delivery as we are exhausted |
| logFailedDelivery(false, handled, false, exchange, msg, data, null); |
| |
| return sync; |
| } |
| |
| protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data) { |
| // we could not process the exchange so we let the failure processor handled it |
| ExchangeHelper.setFailureHandled(exchange); |
| |
| // honor if already set a handling |
| boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; |
| if (alreadySet) { |
| boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class); |
| log.trace("This exchange has already been marked for handling: {}", handled); |
| if (handled) { |
| exchange.setException(null); |
| } else { |
| // exception not handled, put exception back in the exchange |
| exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); |
| // and put failure endpoint back as well |
| exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); |
| } |
| return; |
| } |
| |
| if (shouldHandled(exchange, data)) { |
| log.trace("This exchange is handled so its marked as not failed: {}", exchange); |
| exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); |
| } else if (shouldContinue(exchange, data)) { |
| log.trace("This exchange is continued: {}", exchange); |
| // okay we want to continue then prepare the exchange for that as well |
| prepareExchangeForContinue(exchange, data); |
| } else { |
| log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); |
| // exception not handled, put exception back in the exchange |
| exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); |
| exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); |
| // and put failure endpoint back as well |
| exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); |
| } |
| } |
| |
| private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) { |
| if (logger == null) { |
| return; |
| } |
| |
| if (!exchange.isRollbackOnly()) { |
| // if we should not rollback, then check whether logging is enabled |
| if (handled && !data.currentRedeliveryPolicy.isLogHandled()) { |
| // do not log handled |
| return; |
| } |
| |
| if (continued && !data.currentRedeliveryPolicy.isLogContinued()) { |
| // do not log handled |
| return; |
| } |
| |
| if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) { |
| // do not log retry attempts |
| return; |
| } |
| |
| if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) { |
| // do not log exhausted |
| return; |
| } |
| } |
| |
| LoggingLevel newLogLevel; |
| boolean logStackTrace; |
| if (exchange.isRollbackOnly()) { |
| newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); |
| logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); |
| } else if (shouldRedeliver) { |
| newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); |
| logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace(); |
| } else { |
| newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); |
| logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); |
| } |
| if (e == null) { |
| e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); |
| } |
| |
| if (exchange.isRollbackOnly()) { |
| String msg = "Rollback exchangeId: " + exchange.getExchangeId(); |
| Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); |
| if (cause != null) { |
| msg = msg + " due: " + cause.getMessage(); |
| } |
| if (newLogLevel == LoggingLevel.ERROR) { |
| // log intended rollback on maximum WARN level (no ERROR) |
| logger.log(msg, LoggingLevel.WARN); |
| } else { |
| // otherwise use the desired logging level |
| logger.log(msg, newLogLevel); |
| } |
| } else if (e != null && logStackTrace) { |
| logger.log(message, e, newLogLevel); |
| } else { |
| logger.log(message, newLogLevel); |
| } |
| } |
| |
| /** |
| * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback). |
| * <p/> |
| * If the exchange is exhausted, then we will not continue processing, but let the |
| * failure processor deal with the exchange. |
| * |
| * @param exchange the current exchange |
| * @param data the redelivery data |
| * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. |
| */ |
| private boolean isExhausted(Exchange exchange, RedeliveryData data) { |
| // if marked as rollback only then do not continue/redeliver |
| boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); |
| if (rollbackOnly) { |
| log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); |
| return true; |
| } |
| // its the first original call so continue |
| if (data.redeliveryCounter == 0) { |
| return false; |
| } |
| // its a potential redelivery so determine if we should redeliver or not |
| boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate); |
| return !redeliver; |
| } |
| |
| /** |
| * Determines whether or not to continue if we are exhausted. |
| * |
| * @param exchange the current exchange |
| * @param data the redelivery data |
| * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. |
| */ |
| private boolean shouldContinue(Exchange exchange, RedeliveryData data) { |
| if (data.continuedPredicate != null) { |
| return data.continuedPredicate.matches(exchange); |
| } |
| // do not continue by default |
| return false; |
| } |
| |
| /** |
| * Determines whether or not to handle if we are exhausted. |
| * |
| * @param exchange the current exchange |
| * @param data the redelivery data |
| * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. |
| */ |
| private boolean shouldHandled(Exchange exchange, RedeliveryData data) { |
| if (data.handledPredicate != null) { |
| return data.handledPredicate.matches(exchange); |
| } |
| // do not handle by default |
| return false; |
| } |
| |
| /** |
| * Increments the redelivery counter and adds the redelivered flag if the |
| * message has been redelivered |
| */ |
| private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) { |
| Message in = exchange.getIn(); |
| Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); |
| int next = 1; |
| if (counter != null) { |
| next = counter + 1; |
| } |
| in.setHeader(Exchange.REDELIVERY_COUNTER, next); |
| in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); |
| // if maximum redeliveries is used, then provide that information as well |
| if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) { |
| in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries()); |
| } |
| return next; |
| } |
| |
| /** |
| * Prepares the redelivery counter and boolean flag for the failure handle processor |
| */ |
| private void decrementRedeliveryCounter(Exchange exchange) { |
| Message in = exchange.getIn(); |
| Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); |
| if (counter != null) { |
| int prev = counter - 1; |
| in.setHeader(Exchange.REDELIVERY_COUNTER, prev); |
| // set boolean flag according to counter |
| in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); |
| } else { |
| // not redelivered |
| in.setHeader(Exchange.REDELIVERY_COUNTER, 0); |
| in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); |
| } |
| } |
| |
| /** |
| * Determines if redelivery is enabled by checking if any of the redelivery policy |
| * settings may allow redeliveries. |
| * |
| * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise |
| * @throws Exception can be thrown |
| */ |
| private boolean determineIfRedeliveryIsEnabled() throws Exception { |
| // determine if redeliver is enabled either on error handler |
| if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { |
| // must check for != 0 as (-1 means redeliver forever) |
| return true; |
| } |
| if (retryWhilePolicy != null) { |
| return true; |
| } |
| |
| // or on the exception policies |
| if (!exceptionPolicies.isEmpty()) { |
| // walk them to see if any of them have a maximum redeliveries > 0 or retry until set |
| for (OnExceptionDefinition def : exceptionPolicies.values()) { |
| |
| if (def.getRedeliveryPolicy() != null) { |
| String ref = def.getRedeliveryPolicyRef(); |
| if (ref != null) { |
| // lookup in registry if ref provided |
| RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class); |
| if (policy.getMaximumRedeliveries() != 0) { |
| // must check for != 0 as (-1 means redeliver forever) |
| return true; |
| } |
| } else { |
| Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries()); |
| if (max != null && max != 0) { |
| // must check for != 0 as (-1 means redeliver forever) |
| return true; |
| } |
| } |
| } |
| |
| if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) { |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| @Override |
| protected void doStart() throws Exception { |
| ServiceHelper.startServices(output, outputAsync, deadLetter); |
| // use a shared scheduler |
| if (executorService == null || executorService.isShutdown()) { |
| // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping |
| if (executorServiceRef != null) { |
| executorService = camelContext.getExecutorServiceStrategy().lookupScheduled(this, "ErrorHandlerRedeliveryTask", executorServiceRef); |
| if (executorService == null) { |
| throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry."); |
| } |
| } else { |
| executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "ErrorHandlerRedeliveryTask"); |
| } |
| } |
| |
| // determine if redeliver is enabled or not |
| redeliveryEnabled = determineIfRedeliveryIsEnabled(); |
| if (log.isDebugEnabled()) { |
| log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this); |
| } |
| } |
| |
| @Override |
| protected void doStop() throws Exception { |
| // noop, do not stop any services which we only do when shutting down |
| // as the error handler can be context scoped, and should not stop in case |
| // a route stops |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| ServiceHelper.stopServices(deadLetter, output, outputAsync); |
| } |
| } |