| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.integration; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteTransactions; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
| * |
| */ |
| public class IgniteCacheTxStoreSessionTest extends IgniteCacheStoreSessionAbstractTest { |
| /** */ |
| @Before |
| public void beforeIgniteCacheTxStoreSessionTest() { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE); |
| |
| return super.cacheConfiguration(igniteInstanceName); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected int gridCount() { |
| return 3; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheMode cacheMode() { |
| return PARTITIONED; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheAtomicityMode atomicityMode() { |
| return TRANSACTIONAL; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected NearCacheConfiguration nearConfiguration() { |
| return null; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testStoreSessionTx() throws Exception { |
| testTxPut(jcache(0), null, null); |
| |
| testTxPut(ignite(0).cache(CACHE_NAME1), null, null); |
| |
| testTxRemove(null, null); |
| |
| testTxPutRemove(null, null); |
| |
| for (TransactionConcurrency concurrency : F.asList(PESSIMISTIC)) { |
| for (TransactionIsolation isolation : F.asList(REPEATABLE_READ)) { |
| testTxPut(jcache(0), concurrency, isolation); |
| |
| testTxRemove(concurrency, isolation); |
| |
| testTxPutRemove(concurrency, isolation); |
| } |
| } |
| } |
| |
| /** |
| * @param concurrency Concurrency mode. |
| * @param isolation Isolation mode. |
| * @throws Exception If failed. |
| */ |
| private void testTxPutRemove(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| log.info("Test tx put/remove [concurrency=" + concurrency + ", isolation=" + isolation + ']'); |
| |
| IgniteCache<Integer, Integer> cache = jcache(0); |
| |
| List<Integer> keys = testKeys(cache, 3); |
| |
| Integer key1 = keys.get(0); |
| Integer key2 = keys.get(1); |
| Integer key3 = keys.get(2); |
| |
| try (Transaction tx = startTx(concurrency, isolation)) { |
| log.info("Do tx put1."); |
| |
| cache.put(key1, key1); |
| |
| log.info("Do tx put2."); |
| |
| cache.put(key2, key2); |
| |
| log.info("Do tx remove."); |
| |
| cache.remove(key3); |
| |
| expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), DEFAULT_CACHE_NAME)); |
| expData.add(new ExpectedData(true, "delete", F.<Object, Object>asMap(0, "writeAll"), DEFAULT_CACHE_NAME)); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "writeAll", 1, "delete"), DEFAULT_CACHE_NAME)); |
| |
| log.info("Do tx commit."); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param concurrency Concurrency mode. |
| * @param isolation Isolation mode. |
| * @throws Exception If failed. |
| */ |
| private void testTxPut(IgniteCache<Object, Object> cache, |
| TransactionConcurrency concurrency, |
| TransactionIsolation isolation) throws Exception { |
| log.info("Test tx put [concurrency=" + concurrency + ", isolation=" + isolation + ']'); |
| |
| List<Integer> keys = testKeys(cache, 3); |
| |
| Integer key1 = keys.get(0); |
| |
| try (Transaction tx = startTx(concurrency, isolation)) { |
| log.info("Do tx get."); |
| expData.add(new ExpectedData(false, "load", new HashMap(), cache.getName())); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "load"), cache.getName())); |
| |
| cache.get(key1); |
| |
| expData.clear(); |
| |
| log.info("Do tx put."); |
| |
| cache.put(key1, key1); |
| |
| expData.add(new ExpectedData(true, "write", new HashMap<>(), cache.getName())); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write"), cache.getName())); |
| |
| log.info("Do tx commit."); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| |
| Integer key2 = keys.get(1); |
| Integer key3 = keys.get(2); |
| |
| try (Transaction tx = startTx(concurrency, isolation)) { |
| log.info("Do tx put1."); |
| |
| cache.put(key2, key2); |
| |
| log.info("Do tx put2."); |
| |
| cache.put(key3, key3); |
| |
| expData.add(new ExpectedData(true, "writeAll", new HashMap<>(), cache.getName())); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "writeAll"), cache.getName())); |
| |
| log.info("Do tx commit."); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| } |
| |
| /** |
| * @param concurrency Concurrency mode. |
| * @param isolation Isolation mode. |
| * @throws Exception If failed. |
| */ |
| private void testTxRemove(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| log.info("Test tx remove [concurrency=" + concurrency + ", isolation=" + isolation + ']'); |
| |
| IgniteCache<Integer, Integer> cache = jcache(0); |
| |
| List<Integer> keys = testKeys(cache, 3); |
| |
| Integer key1 = keys.get(0); |
| |
| try (Transaction tx = startTx(concurrency, isolation)) { |
| log.info("Do tx get."); |
| |
| cache.get(key1); |
| |
| log.info("Do tx remove."); |
| |
| cache.remove(key1, key1); |
| |
| expData.add(new ExpectedData(true, "delete", new HashMap<>(), DEFAULT_CACHE_NAME)); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "delete"), DEFAULT_CACHE_NAME)); |
| |
| log.info("Do tx commit."); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| |
| Integer key2 = keys.get(1); |
| Integer key3 = keys.get(2); |
| |
| try (Transaction tx = startTx(concurrency, isolation)) { |
| log.info("Do tx remove1."); |
| |
| cache.remove(key2, key2); |
| |
| log.info("Do tx remove2."); |
| |
| cache.remove(key3, key3); |
| |
| expData.add(new ExpectedData(true, "deleteAll", new HashMap<>(), DEFAULT_CACHE_NAME)); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "deleteAll"), DEFAULT_CACHE_NAME)); |
| |
| log.info("Do tx commit."); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| } |
| |
| /** |
| * @param concurrency Concurrency mode. |
| * @param isolation Isolation mode. |
| * @return Transaction. |
| */ |
| private Transaction startTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { |
| IgniteTransactions txs = ignite(0).transactions(); |
| |
| if (concurrency == null) |
| return txs.txStart(); |
| |
| return txs.txStart(concurrency, isolation); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSessionCrossCacheTx() throws Exception { |
| IgniteCache<Object, Object> cache0 = ignite(0).cache(DEFAULT_CACHE_NAME); |
| |
| IgniteCache<Object, Object> cache1 = ignite(0).cache(CACHE_NAME1); |
| |
| Integer key1 = primaryKey(cache0); |
| Integer key2 = primaryKeys(cache1, 1, key1 + 1).get(0); |
| |
| try (Transaction tx = startTx(null, null)) { |
| cache0.put(key1, 1); |
| |
| cache1.put(key2, 0); |
| |
| expData.add(new ExpectedData(true, "write", new HashMap<>(), DEFAULT_CACHE_NAME)); |
| expData.add(new ExpectedData(true, "write", F.<Object, Object>asMap(0, "write"), CACHE_NAME1)); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write", 1, "write"), DEFAULT_CACHE_NAME)); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| |
| try (Transaction tx = startTx(null, null)) { |
| cache1.put(key1, 1); |
| |
| cache0.put(key2, 0); |
| |
| expData.add(new ExpectedData(true, "write", new HashMap<>(), CACHE_NAME1)); |
| expData.add(new ExpectedData(true, "write", F.<Object, Object>asMap(0, "write"), DEFAULT_CACHE_NAME)); |
| expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write", 1, "write"), CACHE_NAME1)); |
| |
| tx.commit(); |
| } |
| |
| assertEquals(0, expData.size()); |
| } |
| } |