| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import javax.cache.CacheException; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cache.affinity.AffinityFunction; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.apache.ignite.transactions.TransactionOptimisticException; |
| |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; |
| |
| /** |
| * Tests for local transactions. |
| */ |
| @SuppressWarnings({"BusyWait"}) |
| abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { |
| /** Random number generator. */ |
| private static final Random RAND = new Random(); |
| |
| /** Execution count. */ |
| private static final AtomicInteger cntr = new AtomicInteger(); |
| |
| /** |
| * Start grid by default. |
| */ |
| protected IgniteTxAbstractTest() { |
| super(false /*start grid. */); |
| } |
| |
| /** |
| * @return Grid count. |
| */ |
| protected abstract int gridCount(); |
| |
| /** |
| * @return Key count. |
| */ |
| protected abstract int keyCount(); |
| |
| /** |
| * @return Maximum key value. |
| */ |
| protected abstract int maxKeyValue(); |
| |
| /** |
| * @return Thread iterations. |
| */ |
| protected abstract int iterations(); |
| |
| /** |
| * @return True if in-test logging is enabled. |
| */ |
| protected abstract boolean isTestDebug(); |
| |
| /** |
| * @return {@code True} if memory stats should be printed. |
| */ |
| protected abstract boolean printMemoryStats(); |
| |
| /** */ |
| private void debug(String msg) { |
| if (isTestDebug()) |
| info(msg); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| startGridsMultiThreaded(gridCount()); |
| } |
| |
| /** |
| * @return Keys. |
| */ |
| protected Iterable<Integer> getKeys() { |
| List<Integer> keys = new ArrayList<>(keyCount()); |
| |
| for (int i = 0; i < keyCount(); i++) |
| keys.add(RAND.nextInt(maxKeyValue()) + 1); |
| |
| Collections.sort(keys); |
| |
| return Collections.unmodifiableList(keys); |
| } |
| |
| /** |
| * @return Random cache operation. |
| */ |
| protected OP getOp() { |
| switch (RAND.nextInt(3)) { |
| case 0: { return OP.READ; } |
| case 1: { return OP.WRITE; } |
| case 2: { return OP.REMOVE; } |
| |
| // Should never be reached. |
| default: { assert false; return null; } |
| } |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws Exception If check failed. |
| */ |
| protected void checkCommit(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| int gridIdx = RAND.nextInt(gridCount()); |
| |
| Ignite ignite = grid(gridIdx); |
| |
| if (isTestDebug()) |
| debug("Checking commit on grid: " + ignite.cluster().localNode().id()); |
| |
| for (int i = 0; i < iterations(); i++) { |
| IgniteCache<Integer, String> cache = jcache(gridIdx); |
| |
| try (Transaction tx = ignite(gridIdx).transactions().txStart(concurrency, isolation, 0, 0)) { |
| int prevKey = -1; |
| |
| for (Integer key : getKeys()) { |
| // Make sure we have the same locking order for all concurrent transactions. |
| assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey; |
| |
| if (isTestDebug()) { |
| AffinityFunction aff = cache.getConfiguration(CacheConfiguration.class).getAffinity(); |
| |
| int part = aff.partition(key); |
| |
| debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + |
| U.toShortString(ignite(gridIdx).affinity(DEFAULT_CACHE_NAME).mapPartitionToPrimaryAndBackups(part)) + ']'); |
| } |
| |
| String val = Integer.toString(key); |
| |
| switch (getOp()) { |
| case READ: { |
| if (isTestDebug()) |
| debug("Reading key [key=" + key + ", i=" + i + ']'); |
| |
| val = cache.get(key); |
| |
| if (isTestDebug()) |
| debug("Read value for key [key=" + key + ", val=" + val + ']'); |
| |
| break; |
| } |
| |
| case WRITE: { |
| if (isTestDebug()) |
| debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']'); |
| |
| cache.put(key, val); |
| |
| break; |
| } |
| |
| case REMOVE: { |
| if (isTestDebug()) |
| debug("Removing key [key=" + key + ", i=" + i + ']'); |
| |
| cache.remove(key); |
| |
| break; |
| } |
| |
| default: { assert false; } |
| } |
| } |
| |
| tx.commit(); |
| |
| if (isTestDebug()) |
| debug("Committed transaction [i=" + i + ", tx=" + tx + ']'); |
| } |
| catch (TransactionOptimisticException e) { |
| if (!(concurrency == OPTIMISTIC && isolation == SERIALIZABLE)) { |
| log.error("Unexpected error: " + e, e); |
| |
| throw e; |
| } |
| } |
| catch (CacheException e) { |
| MvccFeatureChecker.assertMvccWriteConflict(e); |
| } |
| catch (Throwable e) { |
| log.error("Unexpected error: " + e, e); |
| |
| throw e; |
| } |
| } |
| |
| Transaction tx = ignite(gridIdx).transactions().tx(); |
| |
| assertNull("Thread should not have transaction upon completion", tx); |
| |
| if (printMemoryStats()) { |
| if (cntr.getAndIncrement() % 100 == 0) |
| // Print transaction memory stats. |
| ((IgniteKernal)grid(gridIdx)).internalCache(DEFAULT_CACHE_NAME).context().tm().printMemoryStats(); |
| } |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws IgniteCheckedException If check failed. |
| */ |
| protected void checkRollback(TransactionConcurrency concurrency, TransactionIsolation isolation) |
| throws Exception { |
| checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, isolation); |
| } |
| |
| /** |
| * @param map Map to check. |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws IgniteCheckedException If check failed. |
| */ |
| protected void checkRollback(ConcurrentMap<Integer, String> map, TransactionConcurrency concurrency, |
| TransactionIsolation isolation) throws Exception { |
| int gridIdx = RAND.nextInt(gridCount()); |
| |
| Ignite ignite = grid(gridIdx); |
| |
| if (isTestDebug()) |
| debug("Checking commit on grid: " + ignite.cluster().localNode().id()); |
| |
| for (int i = 0; i < iterations(); i++) { |
| IgniteCache<Integer, String> cache = jcache(gridIdx); |
| |
| Transaction tx = ignite(gridIdx).transactions().txStart(concurrency, isolation, 0, 0); |
| |
| try { |
| for (Integer key : getKeys()) { |
| if (isTestDebug()) { |
| AffinityFunction aff = cache.getConfiguration(CacheConfiguration.class).getAffinity(); |
| |
| int part = aff.partition(key); |
| |
| debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + |
| U.toShortString(ignite(gridIdx).affinity(DEFAULT_CACHE_NAME).mapPartitionToPrimaryAndBackups(part)) + ']'); |
| } |
| |
| String val = Integer.toString(key); |
| |
| switch (getOp()) { |
| case READ: { |
| debug("Reading key: " + key); |
| |
| checkMap(map, key, cache.get(key)); |
| |
| break; |
| } |
| |
| case WRITE: { |
| debug("Writing key and value [key=" + key + ", val=" + val + ']'); |
| |
| checkMap(map, key, cache.getAndPut(key, val)); |
| |
| break; |
| } |
| |
| case REMOVE: { |
| debug("Removing key: " + key); |
| |
| checkMap(map, key, cache.getAndRemove(key)); |
| |
| break; |
| } |
| |
| default: { assert false; } |
| } |
| } |
| |
| tx.rollback(); |
| |
| debug("Rolled back transaction: " + tx); |
| } |
| catch (TransactionOptimisticException e) { |
| tx.rollback(); |
| |
| log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']'); |
| |
| throw e; |
| } |
| catch (Exception e) { |
| tx.rollback(); |
| |
| error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']'); |
| |
| throw e; |
| } |
| finally { |
| Transaction t1 = ignite(gridIdx).transactions().tx(); |
| |
| debug("t1=" + t1); |
| |
| assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) + |
| ", t=" + t1 + ']'; |
| } |
| } |
| } |
| |
| /** |
| * @param map Map to check against. |
| * @param key Key. |
| * @param val Value. |
| */ |
| private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) { |
| if (val != null) { |
| String v = map.putIfAbsent(key, val); |
| |
| assert v == null || v.equals(val); |
| } |
| } |
| |
| /** |
| * Checks integrity of all caches after tests. |
| * |
| * @throws IgniteCheckedException If check failed. |
| */ |
| @SuppressWarnings({"ErrorNotRethrown"}) |
| protected void finalChecks() throws Exception { |
| for (int i = 1; i <= maxKeyValue(); i++) { |
| for (int k = 0; k < 3; k++) { |
| try { |
| String v1 = null; |
| |
| for (int j = 0; j < gridCount(); j++) { |
| IgniteCache<Integer, String> cache = jcache(j); |
| |
| Transaction tx = ignite(j).transactions().tx(); |
| |
| assertNull("Transaction is not completed: " + tx, tx); |
| |
| if (j == 0) { |
| v1 = cache.get(i); |
| } |
| else { |
| String v2 = cache.get(i); |
| |
| if (!F.eq(v2, v1)) { |
| v1 = this.<Integer, String>jcache(0).get(i); |
| v2 = cache.get(i); |
| } |
| |
| assert F.eq(v2, v1) : |
| "Invalid cached value [key=" + i + ", v1=" + v1 + ", v2=" + v2 + ", grid=" + j + ']'; |
| } |
| } |
| |
| break; |
| } |
| catch (AssertionError e) { |
| if (k == 2) |
| throw e; |
| else |
| // Wait for transactions to complete. |
| Thread.sleep(500); |
| } |
| } |
| } |
| |
| for (int i = 1; i <= maxKeyValue(); i++) { |
| for (int k = 0; k < 3; k++) { |
| try { |
| for (int j = 0; j < gridCount(); j++) { |
| IgniteCache<Integer, String> cache = jcache(j); |
| |
| cache.removeAll(); |
| |
| // assert cache.keySet().isEmpty() : "Cache is not empty: " + cache.entrySet(); |
| } |
| |
| break; |
| } |
| catch (AssertionError e) { |
| if (k == 2) |
| throw e; |
| else |
| // Wait for transactions to complete. |
| Thread.sleep(500); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Cache operation. |
| */ |
| protected enum OP { |
| /** Cache read. */ |
| READ, |
| |
| /** Cache write. */ |
| WRITE, |
| |
| /** Cache remove. */ |
| REMOVE |
| } |
| } |