// blob: 7a9298dbf7327726ee6bf6d4c3d62d30245ffe75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.transactions.Transaction;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
 * Tests that the partition release future and the per-key finish futures are not completed while
 * a key is still held by a pessimistic transaction or by an explicit cache lock.
 */
public class GridCacheFinishPartitionsSelfTest extends GridCacheAbstractSelfTest {
    /** Value returned by {@link #gridCount()}. */
    private static final int GRID_CNT = 1;

    /** Grid kernal. */
    private IgniteKernal grid;

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return GRID_CNT;
    }

    /** Delegates to {@link MvccFeatureChecker#skipIfNotSupported} for the {@code ENTRY_LOCK} feature. */
    @Before
    public void beforeGridCacheFinishPartitionsSelfTest() {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        // NOTE(review): super.beforeTest() is not invoked here - confirm this is intentional.
        grid = (IgniteKernal)grid(0);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // NOTE(review): super.afterTest() is not invoked here - confirm this is intentional.
        grid = null;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(igniteInstanceName);

        // Partitioned transactional cache with one backup and a near cache.
        CacheConfiguration cc = defaultCacheConfiguration();

        cc.setCacheMode(PARTITIONED);
        cc.setBackups(1);
        cc.setAtomicityMode(TRANSACTIONAL);
        cc.setNearConfiguration(new NearCacheConfiguration());

        c.setCacheConfiguration(cc);

        return c;
    }

    /**
     * @return New topology version built from {@code GRID_CNT + 1}; used for every finish future requested
     *      by this test.
     */
    private static AffinityTopologyVersion nextTopologyVersion() {
        return new AffinityTopologyVersion(GRID_CNT + 1);
    }

    /**
     * Creates a future listener that records the completion time, releases the latch and logs the time.
     *
     * @param end Holder that receives {@link System#currentTimeMillis()} when the listener is invoked.
     * @param latch Latch counted down when the listener is invoked.
     * @return Listener.
     */
    private CI1<IgniteInternalFuture<?>> completionListener(final AtomicLong end, final CountDownLatch latch) {
        return new CI1<IgniteInternalFuture<?>>() {
            @Override public void apply(IgniteInternalFuture<?> e) {
                end.set(System.currentTimeMillis());

                latch.countDown();

                info("End time: " + end.get());
            }
        };
    }

    /**
     * Puts a value and then runs {@link #runTransactions(String, int, Collection)} three times with
     * different "partitions to wait" collections.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxFinishPartitions() throws Exception {
        String key = "key";
        String val = "value";

        IgniteCache<String, String> cache = grid.cache(DEFAULT_CACHE_NAME);

        int keyPart = grid.<String, String>internalCache(DEFAULT_CACHE_NAME).context().affinity().partition(key);

        cache.put(key, val);

        // Wait for tx-enlisted partition.
        long waitTime = runTransactions(key, keyPart, F.asList(keyPart));

        info("Wait time, ms: " + waitTime);

        // Wait for not enlisted partition.
        waitTime = runTransactions(key, keyPart, F.asList(keyPart + 1));

        info("Wait time, ms: " + waitTime);

        // Wait for both partitions.
        waitTime = runTransactions(key, keyPart, F.asList(keyPart, keyPart + 1));

        info("Wait time, ms: " + waitTime);
    }

    /**
     * Reads the key inside a pessimistic repeatable-read transaction, requests the partition release future
     * and asserts that the future is not done before the transaction is committed.
     *
     * @param key Key.
     * @param keyPart Key partition (used only in the assertion message).
     * @param waitParts Partitions to wait (used only in the assertion message).
     * @return Time in milliseconds from the worker thread passing the barrier until the release future
     *      listener has fired.
     * @throws Exception If failed.
     */
    private long runTransactions(final String key, final int keyPart, final Collection<Integer> waitParts)
        throws Exception {
        int threadNum = 1;

        final CyclicBarrier barrier = new CyclicBarrier(threadNum);
        final CountDownLatch latch = new CountDownLatch(threadNum);
        final AtomicLong start = new AtomicLong();

        GridTestUtils.runMultiThreaded(new Callable<Object>() {
            @Override public Object call() throws Exception {
                // Only the thread that gets barrier index 0 records the start time.
                if (barrier.await() == 0)
                    start.set(System.currentTimeMillis());

                IgniteCache<String, String> cache = grid(0).cache(DEFAULT_CACHE_NAME);

                // try-with-resources closes the transaction even when the assertion below fails.
                try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                    cache.get(key);

                    IgniteInternalFuture<?> fut = grid.context().cache().context().partitionReleaseFuture(
                        nextTopologyVersion());

                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
                        @Override public void apply(IgniteInternalFuture<?> e) {
                            latch.countDown();
                        }
                    });

                    assert !fut.isDone() : "Failed waiting for locks " +
                        "[keyPart=" + keyPart + ", waitParts=" + waitParts + ", done=" + fut.isDone() + ']';

                    tx.commit();
                }

                return null;
            }
        }, threadNum, "test-finish-partitions-thread");

        latch.await();

        return System.currentTimeMillis() - start.get();
    }

    /**
     * Runs {@link #runLock(String, int, Collection)} three times with different "partitions to wait"
     * collections.
     * <p>
     * NOTE(review): earlier documentation named
     * {@link GridCacheMvccManager#finishLocks(org.apache.ignite.lang.IgnitePredicate,
     * AffinityTopologyVersion)} as the method under test; the code here only calls
     * {@code partitionReleaseFuture}, so confirm that method is actually reached.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMvccFinishPartitions() throws Exception {
        String key = "key";

        int keyPart = grid.internalCache(DEFAULT_CACHE_NAME).context().affinity().partition(key);

        // Wait for tx-enlisted partition.
        long waitTime = runLock(key, keyPart, F.asList(keyPart));

        info("Wait time, ms: " + waitTime);

        // Wait for not enlisted partition.
        waitTime = runLock(key, keyPart, F.asList(keyPart + 1));

        info("Wait time, ms: " + waitTime);

        // Wait for both partitions.
        waitTime = runLock(key, keyPart, F.asList(keyPart, keyPart + 1));

        info("Wait time, ms: " + waitTime);
    }

    /**
     * Tests finish future for particular set of keys: both the near and the DHT {@code finishKeys} futures
     * must not be done while the key is read inside an uncommitted pessimistic transaction.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMvccFinishKeys() throws Exception {
        IgniteCache<String, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);

        try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
            final String key = "key";

            cache.get(key);

            GridCacheAdapter<String, Integer> internal = grid.internalCache(DEFAULT_CACHE_NAME);

            KeyCacheObject cacheKey = internal.context().toCacheKeyObject(key);

            IgniteInternalFuture<?> nearFut = internal.context().mvcc().finishKeys(
                Collections.singletonList(cacheKey),
                internal.context().cacheId(),
                nextTopologyVersion());

            IgniteInternalFuture<?> dhtFut = internal.context().near().dht().context().mvcc().finishKeys(
                Collections.singletonList(cacheKey),
                internal.context().cacheId(),
                nextTopologyVersion());

            assert !nearFut.isDone() : "Near finish future is done before the transaction is committed";
            assert !dhtFut.isDone() : "DHT finish future is done before the transaction is committed";

            tx.commit();
        }
    }

    /**
     * Tests chained locks and partitions release future: a new lock is always acquired before the previous
     * one is released, and the release future requested under the first lock must not be done while the
     * last lock of the chain is still held.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMvccFinishPartitionsContinuousLockAcquireRelease() throws Exception {
        int key = 1;

        GridCacheSharedContext<Object, Object> ctx = grid.context().cache().context();

        final AtomicLong end = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(1);

        IgniteCache<Integer, String> cache = grid.cache(DEFAULT_CACHE_NAME);

        // Lock currently owned by this thread; always released in the finally block below.
        Lock held = cache.lock(key);

        held.lock();

        try {
            long start = System.currentTimeMillis();

            info("Start time: " + start);

            IgniteInternalFuture<?> fut = ctx.partitionReleaseFuture(nextTopologyVersion());

            assert fut != null;

            fut.listen(completionListener(end, latch));

            // Hand the lock over twice (key + 1, key + 2): acquire the next one, then release the previous.
            for (int i = 1; i <= 2; i++) {
                Lock next = cache.lock(key + i);

                next.lock();

                Lock prev = held;

                held = next;

                prev.unlock();
            }

            assert !fut.isDone() : "Failed waiting for locks";
        }
        finally {
            held.unlock();
        }

        latch.await();
    }

    /**
     * Takes an explicit lock on the key, requests the partition release future and asserts that the future
     * is not done while the lock is held; then releases the lock and waits for the future listener.
     *
     * @param key Key.
     * @param keyPart Key partition (used only in the assertion message).
     * @param waitParts Partitions to wait (used only in the assertion message).
     * @return Time in milliseconds between taking the lock and the release future listener being invoked.
     * @throws Exception If failed.
     */
    private long runLock(String key, int keyPart, Collection<Integer> waitParts) throws Exception {
        GridCacheSharedContext<Object, Object> ctx = grid.context().cache().context();

        final AtomicLong end = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(1);

        IgniteCache<String, String> cache = grid.cache(DEFAULT_CACHE_NAME);

        Lock lock = cache.lock(key);

        lock.lock();

        long start;

        try {
            start = System.currentTimeMillis();

            info("Start time: " + start);

            IgniteInternalFuture<?> fut = ctx.partitionReleaseFuture(nextTopologyVersion());

            assert fut != null;

            fut.listen(completionListener(end, latch));

            assert !fut.isDone() : "Failed waiting for locks [keyPart=" + keyPart + ", waitParts=" + waitParts +
                ", done=" + fut.isDone() + ']';
        }
        finally {
            lock.unlock();
        }

        latch.await();

        return end.get() - start;
    }
}