blob: 351062bb3ec71d9aea4ba1fb1b4852985014d25f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.bam.processor;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
/**
* A base {@link Processor} for working on <a
* href="http://camel.apache.org/bam.html">BAM</a> which a derived
* class would do the actual persistence such as the {@link JpaBamProcessor}
*
* @version
*/
public abstract class BamProcessorSupport<T> implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(BamProcessorSupport.class);
private Class<T> entityType;
private Expression correlationKeyExpression;
private TransactionTemplate transactionTemplate;
private int retryCount = 20;
private long retrySleep = 1000L;
@SuppressWarnings("unchecked")
protected BamProcessorSupport(TransactionTemplate transactionTemplate,
Expression correlationKeyExpression) {
this.transactionTemplate = transactionTemplate;
this.correlationKeyExpression = correlationKeyExpression;
Type type = getClass().getGenericSuperclass();
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType)type;
Type[] arguments = parameterizedType.getActualTypeArguments();
if (arguments.length > 0) {
Type argumentType = arguments[0];
if (argumentType instanceof Class) {
this.entityType = (Class<T>) argumentType;
}
}
}
if (entityType == null) {
throw new IllegalArgumentException("Could not infer the entity type!");
}
}
protected BamProcessorSupport(TransactionTemplate transactionTemplate,
Expression correlationKeyExpression, Class<T> entitytype) {
this.transactionTemplate = transactionTemplate;
this.entityType = entitytype;
this.correlationKeyExpression = correlationKeyExpression;
}
public void process(final Exchange exchange) {
for (int i = 1; i <= retryCount; i++) {
if (i > 1) {
LOG.info("Retrying attempt: " + i);
try {
Thread.sleep(retrySleep);
} catch (InterruptedException e) {
// ignore
}
}
try {
transactionTemplate.execute(new TransactionCallback<T>() {
public T doInTransaction(TransactionStatus status) {
try {
Object key = getCorrelationKey(exchange);
T entity = loadEntity(exchange, key);
LOG.debug("Correlation key: {} with entity: {}", key, entity);
processEntity(exchange, entity);
return entity;
} catch (Exception e) {
onError(status, e);
return null;
}
}
});
if (i > 1) {
LOG.debug("Attempt {} worked!", i);
}
return;
} catch (Exception e) {
LOG.warn("Failed to complete transaction. This exception is ignored.", e);
}
}
}
// Properties
// -----------------------------------------------------------------------
public Expression getCorrelationKeyExpression() {
return correlationKeyExpression;
}
public Class<T> getEntityType() {
return entityType;
}
// Implemenation methods
// -----------------------------------------------------------------------
protected abstract void processEntity(Exchange exchange, T entity) throws Exception;
protected abstract T loadEntity(Exchange exchange, Object key) throws Exception;
protected abstract Class<?> getKeyType();
protected Object getCorrelationKey(Exchange exchange) throws NoCorrelationKeyException {
Object value;
Class<?> keyType = getKeyType();
if (keyType != null) {
value = correlationKeyExpression.evaluate(exchange, keyType);
} else {
value = correlationKeyExpression.evaluate(exchange, Object.class);
}
if (value == null) {
throw new NoCorrelationKeyException(this, exchange);
}
return value;
}
protected void onError(TransactionStatus status, Exception e) throws RuntimeCamelException {
status.setRollbackOnly();
LOG.warn("Caught: " + e, e);
throw wrapRuntimeCamelException(e);
}
}