blob: f0234329a65a4e97c74fe5310dced09a20e51093 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor.aggregate.jdbc;
import javax.sql.DataSource;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.transaction.PlatformTransactionManager;
/**
* PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL Violation Exceptions
* using a special {@code INSERT INTO .. ON CONFLICT DO NOTHING} clause.
*/
public class PostgresAggregationRepository extends JdbcAggregationRepository {

    /**
     * Creates an unconfigured aggregation repository.
     */
    public PostgresAggregationRepository() {
    }

    /**
     * Creates an aggregation repository from its three mandatory parameters,
     * all of which are handed straight to the parent constructor.
     *
     * @param transactionManager the transaction manager passed to the parent repository
     * @param repositoryName     the repository name passed to the parent repository
     * @param dataSource         the data source passed to the parent repository
     */
    public PostgresAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName, DataSource dataSource) {
        super(transactionManager, repositoryName, dataSource);
    }

    /**
     * Inserts a new record into the given repository table using an
     * {@code INSERT INTO .. ON CONFLICT DO NOTHING} statement.
     * <p>
     * The statement always contains the exchange and id columns; the body
     * column and one column per configured text header are added when the
     * corresponding options are enabled. Every column gets exactly one
     * {@code ?} placeholder.
     *
     * @param camelContext   the current CamelContext
     * @param correlationId  the correlation key
     * @param exchange       the aggregated exchange
     * @param repositoryName the name of the table to insert into
     * @throws DataIntegrityViolationException if no row was inserted and the target table
     *                                         is this repository's own table
     * @throws Exception                       if the underlying insert helper fails
     */
    protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName) throws Exception {
        // The two mandatory columns (exchange, id) and their two placeholders come first;
        // every optional column added below contributes one more placeholder.
        StringBuilder columns = new StringBuilder()
                .append(EXCHANGE)
                .append(", ")
                .append(ID);
        StringBuilder placeholders = new StringBuilder("?, ?");

        if (isStoreBodyAsText()) {
            columns.append(", ").append(BODY);
            placeholders.append(", ?");
        }

        if (hasHeadersToStoreAsText()) {
            for (String header : getHeadersToStoreAsText()) {
                columns.append(", ").append(header);
                placeholders.append(", ?");
            }
        }

        String sql = "INSERT INTO " + repositoryName
                + "(" + columns + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT DO NOTHING";

        // NOTE(review): the trailing 'true' flag is forwarded unchanged to the inherited helper;
        // its exact meaning is defined in the parent class - confirm there.
        boolean nothingInserted = insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true) == 0;

        // ON CONFLICT DO NOTHING turns a conflict into a zero update count instead of an SQL error,
        // so the violation is raised explicitly - but only for this repository's own table.
        if (nothingInserted && getRepositoryName().equals(repositoryName)) {
            throw new DataIntegrityViolationException("No row was inserted due to data violation");
        }
    }
}