| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.locks.Lock; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteState; |
| import org.apache.ignite.Ignition; |
| import org.apache.ignite.configuration.DeploymentMode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.P1; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.IgniteState.STOPPED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; |
| |
| /** |
| * Tests for node failure in transactions. |
| */ |
| public abstract class GridCacheNodeFailureAbstractTest extends GridCommonAbstractTest { |
| /** Random number generator. */ |
| private static final Random RAND = new Random(); |
| |
| /** Grid count. */ |
| private static final int GRID_CNT = 2; |
| |
| /** */ |
| private static final Integer KEY = 1; |
| |
| /** */ |
| private static final String VALUE = "test"; |
| |
| /** Grid instances. */ |
| private static final List<Ignite> IGNITEs = new ArrayList<>(); |
| |
| /** |
| * Start grid by default. |
| */ |
| protected GridCacheNodeFailureAbstractTest() { |
| super(false /*start grid. */); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration c = super.getConfiguration(igniteInstanceName); |
| |
| c.setDeploymentMode(DeploymentMode.SHARED); |
| |
| return c; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| for (int i = 0; i < GRID_CNT; i++) |
| IGNITEs.add(startGrid(i)); |
| |
| // Wait for stable topology to avoid deadlocks. |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Override protected void afterTestsStopped() throws Exception { |
| IGNITEs.clear(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Override protected void beforeTest() throws Exception { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK); |
| |
| for (int i = 0; i < GRID_CNT; i++) { |
| if (Ignition.state(IGNITEs.get(i).name()) == STOPPED) { |
| info("Restarting grid: " + i); |
| |
| IGNITEs.set(i, startGrid(i)); |
| } |
| |
| assert !jcache(i).isLocalLocked(KEY, false) : "Entry is locked for grid [idx=" + i + ']'; |
| } |
| |
| // Wait for stable topology to avoid deadlocks. |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * @param i Grid index. |
| * @return Cache. |
| */ |
| @Override protected <K, V> IgniteCache<K, V> jcache(int i) { |
| return IGNITEs.get(i).cache(DEFAULT_CACHE_NAME); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If test failed. |
| * |
| * Note: test was disabled for REPPLICATED cache case because IGNITE-601. |
| * This comment should be removed if test passed stably. |
| */ |
| @Test |
| public void testPessimisticReadCommitted() throws Throwable { |
| checkTransaction(PESSIMISTIC, READ_COMMITTED); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If test failed. |
| */ |
| @Test |
| public void testPessimisticRepeatableRead() throws Throwable { |
| checkTransaction(PESSIMISTIC, REPEATABLE_READ); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If test failed. |
| */ |
| @Test |
| public void testPessimisticSerializable() throws Throwable { |
| checkTransaction(PESSIMISTIC, SERIALIZABLE); |
| } |
| |
| /** |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @throws Exception If check failed. |
| */ |
| private void checkTransaction(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Throwable { |
| int idx = RAND.nextInt(GRID_CNT); |
| |
| info("Grid will be stopped: " + idx); |
| |
| Ignite g = grid(idx); |
| |
| Transaction tx = g.transactions().txStart(concurrency, isolation); |
| |
| try { |
| g.cache(DEFAULT_CACHE_NAME).put(KEY, VALUE); |
| |
| int checkIdx = (idx + 1) % G.allGrids().size(); |
| |
| info("Check grid index: " + checkIdx); |
| |
| IgniteFuture<?> f = waitForLocalEvent(grid(checkIdx).events(), new P1<Event>() { |
| @Override public boolean apply(Event e) { |
| info("Received grid event: " + e); |
| |
| return true; |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| stopGrid(idx); |
| |
| f.get(); |
| |
| IgniteCache<Integer, String> checkCache = jcache(checkIdx); |
| |
| boolean locked = false; |
| Lock lock = checkCache.lock(KEY); |
| |
| for (int i = 0; !locked && i < 3; i++) { |
| locked = lock.tryLock(); |
| |
| if (!locked) |
| U.sleep(500); |
| else |
| break; |
| } |
| |
| assert locked : "Failed to lock key on cache [idx=" + checkIdx + ", key=" + KEY + ']'; |
| |
| lock.unlock(); |
| } |
| catch (IgniteTxOptimisticCheckedException e) { |
| U.warn(log, "Optimistic transaction failure (will rollback) [msg=" + e.getMessage() + ", tx=" + tx + ']'); |
| |
| if (G.state(g.name()) == IgniteState.STARTED) |
| tx.rollback(); |
| |
| assert concurrency == OPTIMISTIC && isolation == SERIALIZABLE; |
| } |
| catch (Throwable e) { |
| error("Transaction failed (will rollback): " + tx, e); |
| |
| if (G.state(g.name()) == IgniteState.STARTED) |
| tx.rollback(); |
| |
| throw e; |
| } |
| } |
| |
| /** |
| * @throws Exception If check failed. |
| * |
| * Note: test was disabled for REPPLICATED cache case because IGNITE-601. |
| * This comment should be removed if test passed stably. |
| */ |
| @Test |
| public void testLock() throws Exception { |
| int idx = 0; |
| |
| info("Grid will be stopped: " + idx); |
| |
| info("Nodes for key [id=" + grid(idx).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(KEY) + |
| ", key=" + KEY + ']'); |
| |
| IgniteCache<Integer, String> cache = jcache(idx); |
| |
| cache.put(KEY, VALUE); |
| |
| Lock lock = cache.lock(KEY); |
| |
| assert lock.tryLock(); |
| |
| int checkIdx = 1; |
| |
| info("Check grid index: " + checkIdx); |
| |
| IgniteCache<Integer, String> checkCache = jcache(checkIdx); |
| |
| assert !checkCache.lock(KEY).tryLock(); |
| |
| IgniteFuture<?> f = waitForLocalEvent(grid(checkIdx).events(), new P1<Event>() { |
| @Override public boolean apply(Event e) { |
| info("Received grid event: " + e); |
| |
| return true; |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| stopGrid(idx); |
| |
| f.get(); |
| |
| boolean locked = false; |
| Lock checkLock = checkCache.lock(KEY); |
| |
| for (int i = 0; !locked && i < 3; i++) { |
| locked = checkLock.tryLock(); |
| |
| if (!locked) { |
| info("Still not locked..."); |
| |
| U.sleep(1500); |
| } |
| else |
| break; |
| } |
| |
| assert locked : "Failed to lock"; |
| |
| checkLock.unlock(); |
| |
| assert !checkCache.isLocalLocked(KEY, false); |
| } |
| } |