blob: d3a46d8c61d33e9c7fa22c040ec620e14e699618 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.store.spring;
import javax.cache.integration.CacheWriterException;
import javax.sql.DataSource;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionIsolation;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
/**
* Cache store session listener based on Spring transaction management.
* <p>
* This listener starts a new DB transaction for each session and commits
* or rolls it back when session ends. If there is no ongoing
* cache transaction, this listener is no-op.
* <p>
* Store implementation can use any Spring APIs like {@link JdbcTemplate}
* and others. The listener will guarantee that if there is an
* ongoing cache transaction, all store operations within this
* transaction will be automatically enlisted in the same database
* transaction.
* <p>
* {@link org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} requires that either
* {@link #setTransactionManager(PlatformTransactionManager) transaction manager}
* or {@link #setDataSource(DataSource) data source} is configured. If none of them is
* provided, an exception is thrown. If both are provided, the data source will be
* ignored.
* <p>
* If there is a transaction, a {@link TransactionStatus} object will be saved
* as a store session {@link CacheStoreSession#attachment() attachment}. It
* can be used to acquire current DB transaction status.
*/
public class CacheSpringStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
    /** Transaction manager used to begin, commit and roll back DB transactions. */
    private PlatformTransactionManager txMgr;

    /** Data source; only consulted in {@link #start()} to build a transaction manager when none was set. */
    private DataSource dataSrc;

    /** Logger. */
    @LoggerResource
    private IgniteLogger log;

    /**
     * Sets transaction manager.
     * <p>
     * Either transaction manager or data source is required.
     * If none is provided, exception will be thrown on startup.
     *
     * @param txMgr Transaction manager.
     */
    public void setTransactionManager(PlatformTransactionManager txMgr) {
        this.txMgr = txMgr;
    }

    /**
     * Gets transaction manager.
     *
     * @return Transaction manager.
     */
    public PlatformTransactionManager getTransactionManager() {
        return txMgr;
    }

    /**
     * Sets data source.
     * <p>
     * Either transaction manager or data source is required.
     * If none is provided, exception will be thrown on startup.
     *
     * @param dataSrc Data source.
     */
    public void setDataSource(DataSource dataSrc) {
        this.dataSrc = dataSrc;
    }

    /**
     * Gets data source.
     *
     * @return Data source.
     */
    public DataSource getDataSource() {
        return dataSrc;
    }

    /**
     * Validates the configuration and makes sure a transaction manager is available.
     * <p>
     * When only a data source is configured, a {@link DataSourceTransactionManager} is created
     * on top of it. When both are configured, the explicitly set transaction manager wins
     * and a warning is logged.
     *
     * @throws IgniteException If neither transaction manager nor data source is configured.
     */
    @Override public void start() throws IgniteException {
        if (txMgr == null && dataSrc == null)
            throw new IgniteException("Either transaction manager or data source is required by " +
                getClass().getSimpleName() + '.');

        if (dataSrc != null) {
            if (txMgr == null)
                txMgr = new DataSourceTransactionManager(dataSrc);
            else
                U.warn(log, "Data source configured in " + getClass().getSimpleName() +
                    " will be ignored (transaction manager is already set).");
        }

        assert txMgr != null;
    }

    /** {@inheritDoc} */
    @Override public void stop() throws IgniteException {
        // No-op.
    }

    /**
     * Begins a DB transaction for the session and stores its {@link TransactionStatus}
     * as the session attachment.
     * <p>
     * Nothing happens when the session is not within a cache transaction or when
     * something is already attached to the session.
     *
     * @param ses Store session.
     * @throws CacheWriterException If the transaction manager fails to begin the transaction.
     */
    @Override public void onSessionStart(CacheStoreSession ses) {
        if (ses.isWithinTransaction() && ses.attachment() == null) {
            try {
                TransactionDefinition def = definition(ses.transaction(), ses.cacheName());

                ses.attach(txMgr.getTransaction(def));
            }
            catch (TransactionException e) {
                throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
            }
        }
    }

    /**
     * Commits or rolls back the DB transaction previously attached to the session.
     * <p>
     * The attachment is reset to {@code null} before the transaction is finished, so the
     * status is detached from the session even if commit or rollback fails.
     *
     * @param ses Store session.
     * @param commit {@code True} to commit the DB transaction, {@code false} to roll it back.
     * @throws CacheWriterException If the transaction manager fails to finish the transaction.
     */
    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
        if (ses.isWithinTransaction()) {
            // Replace the attachment with null and take whatever was attached before.
            TransactionStatus tx = ses.attach(null);

            if (tx != null) {
                try {
                    if (commit)
                        txMgr.commit(tx);
                    else
                        txMgr.rollback(tx);
                }
                catch (TransactionException e) {
                    throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
                }
            }
        }
    }

    /**
     * Builds a Spring transaction definition (name, isolation level and timeout)
     * from the ongoing cache transaction.
     * <p>
     * The cache transaction timeout is converted to seconds, rounded to the nearest second.
     * A positive timeout shorter than half a second is rounded up to one second so that
     * it does not silently become "no timeout". A non-positive timeout, or one that does
     * not fit into an {@code int} number of seconds, leaves the definition's default timeout.
     *
     * @param tx Ongoing cache transaction.
     * @param cacheName Cache name, {@code null} is reported as {@code <default>} in the definition name.
     * @return DB transaction definition.
     */
    private TransactionDefinition definition(Transaction tx, String cacheName) {
        assert tx != null;

        DefaultTransactionDefinition def = new DefaultTransactionDefinition();

        def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
        def.setIsolationLevel(isolationLevel(tx.isolation()));

        long timeoutMs = tx.timeout();

        if (timeoutMs > 0) {
            // Round to the nearest second without adding to the raw value (no overflow for huge timeouts).
            long timeoutSec = timeoutMs / 1000 + (timeoutMs % 1000 >= 500 ? 1 : 0);

            // A sub-half-second timeout must still bound the DB transaction.
            if (timeoutSec == 0)
                timeoutSec = 1;

            if (timeoutSec < Integer.MAX_VALUE)
                def.setTimeout((int)timeoutSec);
        }

        return def;
    }

    /**
     * Gets DB transaction isolation level based on ongoing cache transaction isolation.
     *
     * @param isolation Cache transaction isolation.
     * @return DB transaction isolation (one of the {@code TransactionDefinition.ISOLATION_*} constants).
     * @throws IllegalStateException If the isolation value is not one of the handled constants.
     */
    private int isolationLevel(TransactionIsolation isolation) {
        switch (isolation) {
            case READ_COMMITTED:
                return TransactionDefinition.ISOLATION_READ_COMMITTED;

            case REPEATABLE_READ:
                return TransactionDefinition.ISOLATION_REPEATABLE_READ;

            case SERIALIZABLE:
                return TransactionDefinition.ISOLATION_SERIALIZABLE;

            default:
                // Not expected to happen for the isolation levels handled above.
                throw new IllegalStateException("Unsupported cache transaction isolation: " + isolation);
        }
    }
}