| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.processor; |
| |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Predicate; |
| import org.apache.camel.Processor; |
| import org.apache.camel.model.OnExceptionDefinition; |
| import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; |
| import org.apache.camel.util.ExchangeHelper; |
| import org.apache.camel.util.MessageHelper; |
| import org.apache.camel.util.ServiceHelper; |
| |
| /** |
| * Default error handler |
| * |
| * @version $Revision$ |
| */ |
| public class DefaultErrorHandler extends ErrorHandlerSupport implements Processor { |
| private Processor output; |
| |
| public DefaultErrorHandler(Processor output, ExceptionPolicyStrategy exceptionPolicyStrategy) { |
| this.output = output; |
| setExceptionPolicy(exceptionPolicyStrategy); |
| } |
| |
| @Override |
| public String toString() { |
| return "DefaultErrorHandler[" + output + "]"; |
| } |
| |
| public boolean supportTransacted() { |
| return false; |
| } |
| |
| public void process(Exchange exchange) throws Exception { |
| try { |
| output.process(exchange); |
| } catch (Exception e) { |
| exchange.setException(e); |
| } |
| |
| // do not handle transacted exchanges as this error handler does not support it |
| boolean handle = true; |
| if (exchange.isTransacted() && !supportTransacted()) { |
| handle = false; |
| if (log.isDebugEnabled()) { |
| log.debug("This error handler does not support transacted exchanges." |
| + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId()); |
| } |
| } |
| |
| if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) { |
| handleException(exchange); |
| } |
| } |
| |
| private void handleException(Exchange exchange) throws Exception { |
| Exception e = exchange.getException(); |
| |
| // store the original caused exception in a property, so we can restore it later |
| exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); |
| |
| // find the error handler to use (if any) |
| OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); |
| if (exceptionPolicy != null) { |
| Predicate handledPredicate = exceptionPolicy.getHandledPolicy(); |
| |
| Processor processor = exceptionPolicy.getErrorHandler(); |
| prepareExchangeBeforeOnException(exchange); |
| if (processor != null) { |
| deliverToFaultProcessor(exchange, processor); |
| } |
| prepareExchangeAfterOnException(exchange, handledPredicate); |
| } |
| } |
| |
| private void prepareExchangeBeforeOnException(Exchange exchange) { |
| // okay lower the exception as we are handling it by onException |
| if (exchange.getException() != null) { |
| exchange.setException(null); |
| } |
| |
| // clear rollback flags |
| exchange.setProperty(Exchange.ROLLBACK_ONLY, null); |
| |
| // reset cached streams so they can be read again |
| MessageHelper.resetStreamCache(exchange.getIn()); |
| } |
| |
| private void deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) throws Exception { |
| failureProcessor.process(exchange); |
| } |
| |
| private void prepareExchangeAfterOnException(Exchange exchange, Predicate handledPredicate) { |
| if (handledPredicate == null || !handledPredicate.matches(exchange)) { |
| if (log.isDebugEnabled()) { |
| log.debug("This exchange is not handled so its marked as failed: " + exchange); |
| } |
| // exception not handled, put exception back in the exchange |
| exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("This exchange is handled so its marked as not failed: " + exchange); |
| } |
| exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE); |
| } |
| } |
| |
| /** |
| * Returns the output processor |
| */ |
| public Processor getOutput() { |
| return output; |
| } |
| |
| protected void doStart() throws Exception { |
| ServiceHelper.startServices(output); |
| } |
| |
| protected void doStop() throws Exception { |
| ServiceHelper.stopServices(output); |
| } |
| |
| } |