blob: a0504b78f450cfbeeb7c4ab82927eaa23d75de2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.datastructures;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridStringLogger;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Cache semaphore self test.
*/
public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstractTest
implements Externalizable {
/** */
private static final int NODES_CNT = 4;
/** */
protected static final int THREADS_CNT = 5;
/** */
private static final Random RND = new Random();
/** */
@Rule
public final ExpectedException exception = ExpectedException.none();
/** {@inheritDoc} */
@Override protected int gridCount() {
return NODES_CNT;
}
/**
* @throws Exception If failed.
*/
@Test
public void testSemaphore() throws Exception {
checkSemaphore();
checkSemaphoreSerialization();
}
/**
* @throws Exception If failed.
*/
@Test
public void testFailover() throws Exception {
if (atomicsCacheMode() == LOCAL)
return;
checkFailover(true);
checkFailover(false);
}
/**
* Implementation of ignite data structures internally uses special system caches, need make sure
* that transaction on these system caches do not intersect with transactions started by user.
*
* @throws Exception If failed.
*/
@Test
public void testIsolation() throws Exception {
Ignite ignite = grid(0);
CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);
cfg.setWriteSynchronizationMode(FULL_SYNC);
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cfg);
try {
IgniteSemaphore semaphore = ignite.semaphore("testIsolation", 1, true, true);
assertNotNull(semaphore);
try (Transaction tx = ignite.transactions().txStart()) {
cache.put(1, 1);
assertEquals(1, semaphore.availablePermits());
semaphore.acquire();
tx.rollback();
}
assertEquals(0, cache.size());
assertEquals(0, semaphore.availablePermits());
semaphore.close();
assertTrue(semaphore.removed());
}
finally {
ignite.destroyCache(cfg.getName());
}
}
/**
* @param failoverSafe Failover safe flag.
* @throws Exception If failed.
*/
private void checkFailover(boolean failoverSafe) throws Exception {
IgniteEx g = startGrid(NODES_CNT + 1);
// For vars locality.
{
// Ensure not exists.
assert g.semaphore("sem", 2, failoverSafe, false) == null;
IgniteSemaphore sem = g.semaphore(
"sem",
2,
failoverSafe,
true);
sem.acquire(2);
assert !sem.tryAcquire();
assertEquals(
0,
sem.availablePermits());
}
Ignite g0 = grid(0);
final IgniteSemaphore sem0 = g0.semaphore(
"sem",
-10,
false,
false);
assert !sem0.tryAcquire();
assertEquals(0, sem0.availablePermits());
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
sem0.acquire();
info("Acquired in separate thread.");
return null;
}
},
1);
Thread.sleep(100);
g.close();
try {
fut.get(500);
}
catch (IgniteCheckedException e) {
if (!failoverSafe && e.hasCause(InterruptedException.class))
info("Ignored expected exception: " + e);
else
throw e;
}
sem0.close();
}
/**
* @throws Exception If failed.
*/
private void checkSemaphore() throws Exception {
// Test API.
checkAcquire();
checkRelease();
checkFailoverSafe();
// Test main functionality.
final IgniteSemaphore semaphore1 = grid(0).semaphore("semaphore", -2, true, true);
assertEquals(-2, semaphore1.availablePermits());
IgniteFuture<Object> fut = grid(0).compute().callAsync(new IgniteCallable<Object>() {
@IgniteInstanceResource
private Ignite ignite;
@LoggerResource
private IgniteLogger log;
@Nullable @Override public Object call() throws Exception {
// Test semaphore in multiple threads on each node.
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
IgniteSemaphore semaphore = ignite.semaphore("semaphore", -2, true, true);
assert semaphore != null && semaphore.availablePermits() == -2;
log.info("Thread is going to wait on semaphore: " + Thread.currentThread().getName() +
", node = " + ignite.cluster().localNode() + ", sem = " + semaphore);
assert semaphore.tryAcquire(1, 1, MINUTES);
log.info("Thread is again runnable: " + Thread.currentThread().getName() +
", node = " + ignite.cluster().localNode() + ", sem = " + semaphore);
semaphore.release();
return null;
}
},
5,
"test-thread"
);
fut.get();
return null;
}
});
Thread.sleep(3000);
semaphore1.release(2);
assert semaphore1.availablePermits() == 0;
semaphore1.release(1);
// Ensure there are no hangs.
fut.get();
// Test operations on removed semaphore.
semaphore1.close();
checkRemovedSemaphore(semaphore1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testSemaphoreClosing() throws Exception {
IgniteConfiguration cfg;
GridStringLogger stringLogger;
stringLogger = new GridStringLogger();
cfg = optimize(getConfiguration("npeGrid"));
cfg.setGridLogger(stringLogger);
try (Ignite ignite = startGrid(cfg.getIgniteInstanceName(), cfg)) {
ignite.semaphore("semaphore", 1, true, true);
}
assertFalse(stringLogger.toString().contains(NullPointerException.class.getName()));
}
/**
* Test to verify the {@link IgniteSemaphore#acquireAndExecute(IgniteCallable, int)}.
*
* @throws Exception If failed.
*/
@Test
public void testAcquireAndExecute() throws Exception {
IgniteSemaphore semaphore = ignite(0).semaphore("testAcquireAndExecute", 1, true, true);
ExecutorService executorService = Executors.newSingleThreadExecutor();
IgniteCallable<Integer> callable = new IgniteCallable<Integer>() {
@Override public Integer call() {
assert (semaphore.availablePermits() == 0);
return 5;
}
};
IgniteFuture igniteFuture = semaphore.acquireAndExecute(callable, 1);
Runnable runnable = new Runnable() {
/** {@inheritDoc} */
@Override public void run() {
IgniteFutureImpl impl = (IgniteFutureImpl<Integer>)igniteFuture;
GridFutureAdapter fut = (GridFutureAdapter)(impl.internalFuture());
fut.onDone(true);
}
};
executorService.submit(runnable);
Thread.sleep(1000);
igniteFuture.get(7000, MILLISECONDS);
assertTrue(igniteFuture.isDone());
assertTrue(semaphore.availablePermits() == 1);
executorService.shutdown();
}
/**
* Test to verify the {@link IgniteSemaphore#acquireAndExecute(IgniteCallable, int)}'s behaviour in case of a failure.
*
* @throws Exception If failed.
*/
@Test
public void testAcquireAndExecuteIfFailure() {
IgniteSemaphore semaphore = ignite(0).semaphore("testAcquireAndExecuteIfFailure", 1, true, true);
ExecutorService executorService = Executors.newSingleThreadExecutor();
IgniteCallable<Integer> callable = new IgniteCallable<Integer>() {
@Override public Integer call() {
throw new RuntimeException("Foobar");
}
};
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
IgniteFuture igniteFuture = semaphore.acquireAndExecute(callable, 1);
Runnable runnable = new Runnable() {
/** {@inheritDoc} */
@Override public void run() {
try {
Thread.sleep(1000);
IgniteFutureImpl impl = (IgniteFutureImpl<Integer>)igniteFuture;
GridFutureAdapter fut = (GridFutureAdapter)(impl.internalFuture());
fut.onDone(true);
}
catch (InterruptedException e) {
throw new RuntimeException(e.getMessage());
}
}
};
executorService.submit(runnable);
((IgniteFutureImpl)igniteFuture).internalFuture().get();
assertTrue(igniteFuture.isDone());
return null;
}
}, RuntimeException.class, "Foobar");
executorService.shutdown();
}
/**
* @throws Exception If failed.
*/
private void checkSemaphoreSerialization() throws Exception {
final IgniteSemaphore sem = grid(0).semaphore("semaphore", -gridCount() + 1, true, true);
assertEquals(-gridCount() + 1, sem.availablePermits());
grid(0).compute().broadcast(new IgniteCallable<Object>() {
@Nullable @Override public Object call() throws Exception {
sem.release();
return null;
}
});
assert sem.availablePermits() == 1;
sem.acquire();
assert sem.availablePermits() == 0;
sem.release();
// Test operations on removed semaphore.
sem.close();
checkRemovedSemaphore(sem);
}
/**
* @param semaphore Semaphore.
* @throws Exception If failed.
*/
protected void checkRemovedSemaphore(final IgniteSemaphore semaphore) throws Exception {
assert GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return semaphore.removed();
}
}, 5000);
assert semaphore.removed();
}
/**
* This method only checks if parameter of new semaphore is initialized properly. For tests considering failure
* recovery see
*
* @throws Exception Exception.
*/
private void checkFailoverSafe() throws Exception {
// Checks only if semaphore is initialized properly
IgniteSemaphore semaphore = createSemaphore("rmv", 5, true);
assert semaphore.isFailoverSafe();
removeSemaphore("rmv");
IgniteSemaphore semaphore1 = createSemaphore("rmv1", 5, false);
assert !semaphore1.isFailoverSafe();
removeSemaphore("rmv1");
}
/**
* @throws Exception Exception.
*/
private void checkAcquire() throws Exception {
// Check only 'false' cases here. Successful await is tested over the grid.
IgniteSemaphore semaphore = createSemaphore("acquire", 5, false);
assert !semaphore.tryAcquire(10);
assert !semaphore.tryAcquire(10, 10, MICROSECONDS);
removeSemaphore("acquire");
}
/**
* @throws Exception Exception.
*/
private void checkRelease() throws Exception {
IgniteSemaphore semaphore = createSemaphore("release", 5, false);
semaphore.release();
assert semaphore.availablePermits() == 6;
semaphore.release(2);
assert semaphore.availablePermits() == 8;
assert semaphore.drainPermits() == 8;
assert semaphore.availablePermits() == 0;
removeSemaphore("release");
checkRemovedSemaphore(semaphore);
IgniteSemaphore semaphore2 = createSemaphore("release2", -5, false);
semaphore2.release();
assert semaphore2.availablePermits() == -4;
semaphore2.release(2);
assert semaphore2.availablePermits() == -2;
assert semaphore2.drainPermits() == -2;
assert semaphore2.availablePermits() == 0;
removeSemaphore("release2");
checkRemovedSemaphore(semaphore2);
}
/**
* @param semaphoreName Semaphore name.
* @param numPermissions Initial number of permissions.
* @param failoverSafe Fairness flag.
* @return New semaphore.
* @throws Exception If failed.
*/
private IgniteSemaphore createSemaphore(String semaphoreName, int numPermissions, boolean failoverSafe)
throws Exception {
IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, numPermissions, failoverSafe, true);
// Test initialization.
assert semaphoreName.equals(semaphore.name());
assert semaphore.availablePermits() == numPermissions;
assert semaphore.getQueueLength() == 0;
assert semaphore.isFailoverSafe() == failoverSafe;
return semaphore;
}
/**
* @param semaphoreName Semaphore name.
* @throws Exception If failed.
*/
private void removeSemaphore(final String semaphoreName) throws Exception {
IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 10, false, true);
assert semaphore != null;
if (semaphore.availablePermits() < 0)
semaphore.release(-semaphore.availablePermits());
// Remove semaphore on random node.
IgniteSemaphore semaphore0 = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 0, false, true);
assertNotNull(semaphore0);
semaphore0.close();
// Ensure semaphore is removed on all nodes.
assert GridTestUtils.waitForCondition(new GridAbsPredicateX() {
@Override public boolean applyx() throws IgniteCheckedException {
for (Ignite g : G.allGrids()) {
if (((IgniteKernal)g).context().dataStructures().semaphore(
semaphoreName, null, 10, true, false) != null)
return false;
}
return true;
}
}, 5_000);
checkRemovedSemaphore(semaphore);
}
/**
* @throws Exception If failed.
*/
@Test
public void testSemaphoreMultinode1() throws Exception {
if (gridCount() == 1)
return;
IgniteSemaphore semaphore = grid(0).semaphore("s1", 0, true, true);
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (int i = 0; i < gridCount(); i++) {
final Ignite ignite = grid(i);
futs.add(GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteSemaphore semaphore = ignite.semaphore("s1", 0, true, false);
assertNotNull(semaphore);
boolean wait = semaphore.tryAcquire(30_000, MILLISECONDS);
assertTrue(wait);
return null;
}
}));
}
for (int i = 0; i < 10; i++)
semaphore.release();
for (IgniteInternalFuture<?> fut : futs)
fut.get(30_000);
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
// No-op.
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// No-op.
}
}