| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cache.store.spring; |
| |
| import javax.cache.integration.CacheWriterException; |
| import javax.sql.DataSource; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.store.CacheStoreSession; |
| import org.apache.ignite.cache.store.CacheStoreSessionListener; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lifecycle.LifecycleAware; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.springframework.jdbc.core.JdbcTemplate; |
| import org.springframework.jdbc.datasource.DataSourceTransactionManager; |
| import org.springframework.transaction.PlatformTransactionManager; |
| import org.springframework.transaction.TransactionDefinition; |
| import org.springframework.transaction.TransactionException; |
| import org.springframework.transaction.TransactionStatus; |
| import org.springframework.transaction.support.DefaultTransactionDefinition; |
| |
| /** |
| * Cache store session listener based on Spring transaction management. |
| * <p> |
| * This listener starts a new DB transaction for each session and commits |
| * or rolls it back when session ends. If there is no ongoing |
| * cache transaction, this listener is no-op. |
| * <p> |
| * Store implementation can use any Spring APIs like {@link JdbcTemplate} |
| * and others. The listener will guarantee that if there is an |
| * ongoing cache transaction, all store operations within this |
| * transaction will be automatically enlisted in the same database |
| * transaction. |
| * <p> |
| * {@link org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} requires that either |
| * {@link #setTransactionManager(PlatformTransactionManager) transaction manager} |
| * or {@link #setDataSource(DataSource) data source} is configured. If non of them is |
| * provided, exception is thrown. Is both are provided, data source will be |
| * ignored. |
| * <p> |
| * If there is a transaction, a {@link TransactionStatus} object will be saved |
| * as a store session {@link CacheStoreSession#attachment() attachment}. It |
| * can be used to acquire current DB transaction status. |
| */ |
| public class CacheSpringStoreSessionListener implements CacheStoreSessionListener, LifecycleAware { |
| /** Transaction manager. */ |
| private PlatformTransactionManager txMgr; |
| |
| /** Data source. */ |
| private DataSource dataSrc; |
| |
| /** Logger. */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** |
| * Sets transaction manager. |
| * <p> |
| * Either transaction manager or data source is required. |
| * If none is provided, exception will be thrown on startup. |
| * |
| * @param txMgr Transaction manager. |
| */ |
| public void setTransactionManager(PlatformTransactionManager txMgr) { |
| this.txMgr = txMgr; |
| } |
| |
| /** |
| * Gets transaction manager. |
| * |
| * @return Transaction manager. |
| */ |
| public PlatformTransactionManager getTransactionManager() { |
| return txMgr; |
| } |
| |
| /** |
| * Sets data source. |
| * <p> |
| * Either transaction manager or data source is required. |
| * If none is provided, exception will be thrown on startup. |
| * |
| * @param dataSrc Data source. |
| */ |
| public void setDataSource(DataSource dataSrc) { |
| this.dataSrc = dataSrc; |
| } |
| |
| /** |
| * Gets data source. |
| * |
| * @return Data source. |
| */ |
| public DataSource getDataSource() { |
| return dataSrc; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteException { |
| if (txMgr == null && dataSrc == null) |
| throw new IgniteException("Either transaction manager or data source is required by " + |
| getClass().getSimpleName() + '.'); |
| |
| if (dataSrc != null) { |
| if (txMgr == null) |
| txMgr = new DataSourceTransactionManager(dataSrc); |
| else |
| U.warn(log, "Data source configured in " + getClass().getSimpleName() + |
| " will be ignored (transaction manager is already set)."); |
| } |
| |
| assert txMgr != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop() throws IgniteException { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionStart(CacheStoreSession ses) { |
| if (ses.isWithinTransaction() && ses.attachment() == null) { |
| try { |
| TransactionDefinition def = definition(ses.transaction(), ses.cacheName()); |
| |
| ses.attach(txMgr.getTransaction(def)); |
| } |
| catch (TransactionException e) { |
| throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { |
| if (ses.isWithinTransaction()) { |
| TransactionStatus tx = ses.attach(null); |
| |
| if (tx != null) { |
| try { |
| if (commit) |
| txMgr.commit(tx); |
| else |
| txMgr.rollback(tx); |
| } |
| catch (TransactionException e) { |
| throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gets DB transaction isolation level based on ongoing cache transaction isolation. |
| * |
| * @return DB transaction isolation. |
| */ |
| private TransactionDefinition definition(Transaction tx, String cacheName) { |
| assert tx != null; |
| |
| DefaultTransactionDefinition def = new DefaultTransactionDefinition(); |
| |
| def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']'); |
| def.setIsolationLevel(isolationLevel(tx.isolation())); |
| |
| long timeoutSec = (tx.timeout() + 500) / 1000; |
| |
| if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE) |
| def.setTimeout((int)timeoutSec); |
| |
| return def; |
| } |
| |
| /** |
| * Gets DB transaction isolation level based on ongoing cache transaction isolation. |
| * |
| * @param isolation Cache transaction isolation. |
| * @return DB transaction isolation. |
| */ |
| private int isolationLevel(TransactionIsolation isolation) { |
| switch (isolation) { |
| case READ_COMMITTED: |
| return TransactionDefinition.ISOLATION_READ_COMMITTED; |
| |
| case REPEATABLE_READ: |
| return TransactionDefinition.ISOLATION_REPEATABLE_READ; |
| |
| case SERIALIZABLE: |
| return TransactionDefinition.ISOLATION_SERIALIZABLE; |
| |
| default: |
| throw new IllegalStateException(); // Will never happen. |
| } |
| } |
| } |