blob: 4fdca6f1cfdbf19ef96c6042c130ffdb5db351d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor.aggregate.jdbc;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobCreator;
import org.springframework.jdbc.support.lob.LobHandler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
/**
* JDBC based {@link org.apache.camel.spi.AggregationRepository}
* JdbcAggregationRepository will only preserve any Serializable compatible
* data types. If a data type is not such a type its dropped and a WARN is
* logged. And it only persists the Message body and the Message headers.
* The Exchange properties are not persisted.
*/
public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
private static final String ID = "id";
private static final String EXCHANGE = "exchange";
private static final String BODY = "body";
private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
private TransactionTemplate transactionTemplate;
private TransactionTemplate transactionTemplateReadOnly;
private int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;
private JdbcTemplate jdbcTemplate;
private LobHandler lobHandler = new DefaultLobHandler();
private String repositoryName;
private boolean returnOldExchange;
private JdbcCamelCodec codec = new JdbcCamelCodec();
private long recoveryInterval = 5000;
private boolean useRecovery = true;
private int maximumRedeliveries;
private String deadLetterUri;
private List<String> headersToStoreAsText;
private boolean storeBodyAsText;
private boolean allowSerializedHeaders;
/**
* Creates an aggregation repository
*/
public JdbcAggregationRepository() {
}
/**
* Creates an aggregation repository with the three mandatory parameters
*/
public JdbcAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName, DataSource dataSource) {
this.setRepositoryName(repositoryName);
this.setTransactionManager(transactionManager);
this.setDataSource(dataSource);
}
/**
* Sets the name of the repository
*/
public final void setRepositoryName(String repositoryName) {
this.repositoryName = repositoryName;
}
public final void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(propagationBehavior);
transactionTemplateReadOnly = new TransactionTemplate(transactionManager);
transactionTemplateReadOnly.setPropagationBehavior(propagationBehavior);
transactionTemplateReadOnly.setReadOnly(true);
}
/**
* Sets the DataSource to use for accessing the database
*/
public final void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Exchange add(final CamelContext camelContext, final String correlationId,
final Exchange oldExchange, final Exchange newExchange) throws OptimisticLockingException {
try {
return add(camelContext, correlationId, newExchange);
} catch (Exception e) {
if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
throw new OptimisticLockingException();
} else {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
}
}
@Override
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
return transactionTemplate.execute(new TransactionCallback<Exchange>() {
public Exchange doInTransaction(TransactionStatus status) {
Exchange result = null;
final String key = correlationId;
try {
LOG.debug("Adding exchange with key: [{}]", key);
boolean present = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM " + getRepositoryName() + " WHERE " + ID + " = ?", Integer.class, key) != 0;
// Recover existing exchange with that ID
if (isReturnOldExchange() && present) {
result = get(key, getRepositoryName(), camelContext);
}
if (present) {
update(camelContext, correlationId, exchange, getRepositoryName());
} else {
insert(camelContext, correlationId, exchange, getRepositoryName());
}
} catch (Exception e) {
throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e);
}
return result;
}
});
}
/**
* Updates the current exchange details in the given repository table
*
* @param camelContext the current CamelContext
* @param key the correlation key
* @param exchange the aggregated exchange
* @param repositoryName The name of the table
*/
protected void update(final CamelContext camelContext, final String key, final Exchange exchange, String repositoryName) throws Exception {
StringBuilder queryBuilder = new StringBuilder()
.append("UPDATE ").append(repositoryName)
.append(" SET ")
.append(EXCHANGE).append(" = ?");
if (storeBodyAsText) {
queryBuilder.append(", ").append(BODY).append(" = ?");
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
queryBuilder.append(", ").append(headerName).append(" = ?");
}
}
queryBuilder.append(" WHERE ").append(ID).append(" = ?");
String sql = queryBuilder.toString();
insertAndUpdateHelper(camelContext, key, exchange, sql, false);
}
/**
* Inserts a new record into the given repository table.
* note : the exchange properties are NOT persisted.
*
* @param camelContext the current CamelContext
* @param correlationId the correlation key
* @param exchange the aggregated exchange to insert. The headers will be persisted but not the properties.
* @param repositoryName The name of the table
*/
protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName) throws Exception {
// The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased
int totalParameterIndex = 2;
StringBuilder queryBuilder = new StringBuilder()
.append("INSERT INTO ").append(repositoryName)
.append('(')
.append(EXCHANGE).append(", ")
.append(ID);
if (storeBodyAsText) {
queryBuilder.append(", ").append(BODY);
totalParameterIndex++;
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
queryBuilder.append(", ").append(headerName);
totalParameterIndex++;
}
}
queryBuilder.append(") VALUES (");
for (int i = 0; i < totalParameterIndex - 1; i++) {
queryBuilder.append("?, ");
}
queryBuilder.append("?)");
String sql = queryBuilder.toString();
insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true);
}
protected void insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception {
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
jdbcTemplate.execute(sql,
new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
@Override
protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
int totalParameterIndex = 0;
lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data);
if (idComesFirst) {
ps.setString(++totalParameterIndex, key);
}
if (storeBodyAsText) {
ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class));
}
if (hasHeadersToStoreAsText()) {
for (String headerName : headersToStoreAsText) {
String headerValue = exchange.getIn().getHeader(headerName, String.class);
ps.setString(++totalParameterIndex, headerValue);
}
}
if (!idComesFirst) {
ps.setString(++totalParameterIndex, key);
}
}
});
}
@Override
public Exchange get(final CamelContext camelContext, final String correlationId) {
final String key = correlationId;
Exchange result = get(key, getRepositoryName(), camelContext);
LOG.debug("Getting key [{}] -> {}", key, result);
return result;
}
private Exchange get(final String key, final String repositoryName, final CamelContext camelContext) {
return transactionTemplateReadOnly.execute(new TransactionCallback<Exchange>() {
public Exchange doInTransaction(TransactionStatus status) {
try {
final byte[] data = jdbcTemplate.queryForObject(
"SELECT " + EXCHANGE + " FROM " + repositoryName + " WHERE " + ID + " = ?",
new Object[]{key}, byte[].class);
return codec.unmarshallExchange(camelContext, data);
} catch (EmptyResultDataAccessException ex) {
return null;
} catch (IOException ex) {
// Rollback the transaction
throw new RuntimeException("Error getting key " + key + " from repository " + repositoryName, ex);
} catch (ClassNotFoundException ex) {
// Rollback the transaction
throw new RuntimeException(ex);
}
}
});
}
@Override
public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
final String key = correlationId;
final String confirmKey = exchange.getExchangeId();
try {
LOG.debug("Removing key [{}]", key);
jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ?", key);
insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted());
} catch (Exception e) {
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e);
}
}
});
}
@Override
public void confirm(final CamelContext camelContext, final String exchangeId) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
LOG.debug("Confirming exchangeId [{}]", exchangeId);
final String confirmKey = exchangeId;
jdbcTemplate.update("DELETE FROM " + getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
new Object[]{confirmKey});
}
});
}
@Override
public Set<String> getKeys() {
return getKeys(getRepositoryName());
}
@Override
public Set<String> scan(CamelContext camelContext) {
return getKeys(getRepositoryNameCompleted());
}
/**
* Returns the keys in the given repository
*
* @param repositoryName The name of the table
* @return Set of keys in the given repository name
*/
protected Set<String> getKeys(final String repositoryName) {
return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() {
public LinkedHashSet<String> doInTransaction(TransactionStatus status) {
List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName,
new RowMapper<String>() {
public String mapRow(ResultSet rs, int rowNum) throws SQLException {
String id = rs.getString(ID);
LOG.trace("getKey [{}]", id);
return id;
}
});
return new LinkedHashSet<>(keys);
}
});
}
@Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
final String key = exchangeId;
Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
LOG.debug("Recovering exchangeId [{}] -> {}", key, answer);
return answer;
}
/**
* If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover
* and resubmit. By default this interval is 5000 millis.
*/
public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
this.recoveryInterval = timeUnit.toMillis(interval);
}
public void setRecoveryInterval(long interval) {
this.recoveryInterval = interval;
}
public long getRecoveryIntervalInMillis() {
return recoveryInterval;
}
public boolean isUseRecovery() {
return useRecovery;
}
/**
* Whether or not recovery is enabled. This option is by default true. When enabled the Camel
* Aggregator automatic recover failed aggregated exchange and have them resubmitted.
*/
public void setUseRecovery(boolean useRecovery) {
this.useRecovery = useRecovery;
}
public int getMaximumRedeliveries() {
return maximumRedeliveries;
}
public void setMaximumRedeliveries(int maximumRedeliveries) {
this.maximumRedeliveries = maximumRedeliveries;
}
public String getDeadLetterUri() {
return deadLetterUri;
}
/**
* An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be
* moved. If this option is used then the maximumRedeliveries option must also be provided.
* Important note : if the deadletter route throws an exception, it will be send again to DLQ until it succeed !
*/
public void setDeadLetterUri(String deadLetterUri) {
this.deadLetterUri = deadLetterUri;
}
public boolean isReturnOldExchange() {
return returnOldExchange;
}
/**
* Whether the get operation should return the old existing Exchange if any existed.
* By default this option is false to optimize as we do not need the old exchange when aggregating.
*/
public void setReturnOldExchange(boolean returnOldExchange) {
this.returnOldExchange = returnOldExchange;
}
public void setJdbcCamelCodec(JdbcCamelCodec codec) {
this.codec = codec;
}
public boolean hasHeadersToStoreAsText() {
return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty();
}
/**
* Allows to store headers as String which is human readable. By default this option is disabled,
* storing the headers in binary format.
*
* @param headersToStoreAsText the list of headers to store as String
*/
public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
this.headersToStoreAsText = headersToStoreAsText;
}
/**
* Whether to store the message body as String which is human readable.
* By default this option is false storing the body in binary format.
*/
public void setStoreBodyAsText(boolean storeBodyAsText) {
this.storeBodyAsText = storeBodyAsText;
}
public boolean isAllowSerializedHeaders() {
return allowSerializedHeaders;
}
public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
this.allowSerializedHeaders = allowSerializedHeaders;
}
public int getPropagationBehavior() {
return propagationBehavior;
}
/**
* Sets propagation behavior to use with spring transaction template which are used for database access.
* The default is TransactionDefinition.PROPAGATION_REQUIRED.
*/
public void setPropagationBehavior(int propagationBehavior) {
this.propagationBehavior = propagationBehavior;
}
public LobHandler getLobHandler() {
return lobHandler;
}
/**
* Sets a custom LobHandler to use
*/
public void setLobHandler(LobHandler lobHandler) {
this.lobHandler = lobHandler;
}
public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
return jdbcOptimisticLockingExceptionMapper;
}
public void setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
}
public String getRepositoryName() {
return repositoryName;
}
public String getRepositoryNameCompleted() {
return getRepositoryName() + "_completed";
}
@Override
protected void doStart() throws Exception {
ObjectHelper.notNull(repositoryName, "RepositoryName");
ObjectHelper.notNull(transactionManager, "TransactionManager");
ObjectHelper.notNull(dataSource, "DataSource");
// log number of existing exchanges
int current = getKeys().size();
int completed = scan(null).size();
if (current > 0) {
LOG.info("On startup there are " + current + " aggregate exchanges (not completed) in repository: " + getRepositoryName());
} else {
LOG.info("On startup there are no existing aggregate exchanges (not completed) in repository: {}", getRepositoryName());
}
if (completed > 0) {
LOG.warn("On startup there are " + completed + " completed exchanges to be recovered in repository: " + getRepositoryNameCompleted());
} else {
LOG.info("On startup there are no completed exchanges to be recovered in repository: {}", getRepositoryNameCompleted());
}
}
@Override
protected void doStop() throws Exception {
// noop
}
}