| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.near; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.affinity.AffinityKey; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
| * Tests near transactions. |
| */ |
| public class GridCacheNearTxMultiNodeSelfTest extends GridCommonAbstractTest { |
| /** */ |
| private static final int GRID_CNT = 3; |
| |
| /** Number of backups for partitioned tests. */ |
| protected int backups = 1; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| // Default cache configuration. |
| CacheConfiguration cacheCfg = defaultCacheConfiguration(); |
| |
| cacheCfg.setCacheMode(PARTITIONED); |
| cacheCfg.setAtomicityMode(TRANSACTIONAL); |
| cacheCfg.setNearConfiguration(new NearCacheConfiguration()); |
| cacheCfg.setWriteSynchronizationMode(FULL_SYNC); |
| cacheCfg.setBackups(backups); |
| cacheCfg.setRebalanceMode(SYNC); |
| |
| cfg.setCacheConfiguration(cacheCfg); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| backups = 1; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings( {"unchecked"}) |
| @Test |
| public void testTxCleanup() throws Exception { |
| backups = 1; |
| |
| Ignite ignite = startGrids(GRID_CNT); |
| |
| try { |
| Integer mainKey = 0; |
| |
| ClusterNode priNode = ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(mainKey); |
| ClusterNode backupNode = F.first(F.view(ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(mainKey), |
| F.notIn(F.asList(priNode)))); |
| ClusterNode otherNode = F.first(ignite.cluster().forPredicate(F.notIn(F.asList(priNode, backupNode))).nodes()); |
| |
| assert priNode != backupNode; |
| assert backupNode != otherNode; |
| assert priNode != otherNode; |
| |
| final Ignite priIgnite = grid(priNode); |
| Ignite backupIgnite = grid(backupNode); |
| Ignite otherIgnite = grid(otherNode); |
| |
| List<Ignite> ignites = F.asList(otherIgnite, priIgnite, backupIgnite); |
| |
| int cntr = 0; |
| |
| // Update main key from all nodes. |
| for (Ignite g : ignites) |
| g.cache(DEFAULT_CACHE_NAME).put(mainKey, ++cntr); |
| |
| info("Updated mainKey from all nodes."); |
| |
| int keyCnt = 200; |
| |
| Set<Integer> keys = new TreeSet<>(); |
| |
| // Populate cache from all nodes. |
| for (int i = 1; i <= keyCnt; i++) { |
| keys.add(i); |
| |
| Ignite g = F.rand(ignites); |
| |
| g.cache(DEFAULT_CACHE_NAME).put(new AffinityKey<>(i, mainKey), Integer.toString(cntr++)); |
| } |
| |
| IgniteCache cache = priIgnite.cache(DEFAULT_CACHE_NAME); |
| |
| Transaction tx = priIgnite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); |
| |
| try { |
| cache.get(mainKey); |
| |
| cache.removeAll(keys); |
| |
| cache.put(mainKey, ++cntr); |
| |
| tx.commit(); |
| } |
| catch (Error | Exception e) { |
| error("Transaction failed: " + tx, e); |
| |
| throw e; |
| } |
| finally { |
| tx.close(); |
| } |
| |
| stopGrid(priIgnite.name(), true); |
| stopGrid(backupIgnite.name(), true); |
| |
| Ignite newIgnite = startGrid(GRID_CNT); |
| |
| ignites = F.asList(otherIgnite, newIgnite); |
| |
| for (Ignite g : ignites) { |
| GridNearCacheAdapter near = ((IgniteKernal)g).internalCache(DEFAULT_CACHE_NAME).context().near(); |
| GridDhtCacheAdapter dht = near.dht(); |
| |
| checkTm(g, near.context().tm()); |
| checkTm(g, dht.context().tm()); |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxReadersUpdate() throws Exception { |
| startGridsMultiThreaded(GRID_CNT); |
| |
| try { |
| testReadersUpdate(OPTIMISTIC, REPEATABLE_READ); |
| |
| testReadersUpdate(PESSIMISTIC, REPEATABLE_READ); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @param concurrency Transaction concurrency. |
| * @param isolation Transaction isolation. |
| * @throws Exception If failed. |
| */ |
| private void testReadersUpdate(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { |
| Ignite ignite = grid(0); |
| IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); |
| |
| try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { |
| for (int i = 0; i < 100; i++) |
| cache.put(i, 1); |
| |
| tx.commit(); |
| } |
| |
| // Create readers. |
| for (int g = 0; g < GRID_CNT; g++) { |
| IgniteCache<Integer, Integer> c = grid(g).cache(DEFAULT_CACHE_NAME); |
| |
| for (int i = 0; i < 100; i++) |
| assertEquals((Integer)1, c.get(i)); |
| } |
| |
| try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) { |
| for (int i = 0; i < 100; i++) |
| cache.put(i, 2); |
| |
| tx.commit(); |
| } |
| |
| for (int g = 0; g < GRID_CNT; g++) { |
| IgniteCache<Integer, Integer> c = grid(g).cache(DEFAULT_CACHE_NAME); |
| |
| for (int i = 0; i < 100; i++) |
| assertEquals((Integer)2, c.get(i)); |
| } |
| } |
| |
| /** |
| * @param g Grid. |
| * @param tm Transaction manager. |
| */ |
| private void checkTm(Ignite g, IgniteTxManager tm) { |
| Collection<IgniteInternalTx> txs = tm.activeTransactions(); |
| |
| info(">>> Number of transactions in the set [size=" + txs.size() + |
| ", nodeId=" + g.cluster().localNode().id() + ']'); |
| |
| for (IgniteInternalTx tx : txs) |
| assert tx.done() : "Transaction is not finished: " + tx; |
| } |
| } |