| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.datastructures; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCountDownLatch; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.lang.IgniteCallable; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.transactions.Transaction; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| |
| /** |
| * Cache count down latch self test. |
| */ |
| public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomicsAbstractTest |
| implements Externalizable { |
| /** */ |
| private static final int NODES_CNT = 4; |
| |
| /** */ |
| protected static final int THREADS_CNT = 5; |
| |
| /** */ |
| private static final Random RND = new Random(); |
| |
| /** {@inheritDoc} */ |
| @Override protected int gridCount() { |
| return NODES_CNT; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLatch() throws Exception { |
| checkLatch(); |
| } |
| |
| /** |
| * Implementation of ignite data structures internally uses special system caches, need make sure |
| * that transaction on these system caches do not intersect with transactions started by user. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIsolation() throws Exception { |
| Ignite ignite = grid(0); |
| |
| CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME); |
| |
| cfg.setName("myCache"); |
| cfg.setAtomicityMode(TRANSACTIONAL); |
| cfg.setWriteSynchronizationMode(FULL_SYNC); |
| |
| IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cfg); |
| |
| try { |
| IgniteCountDownLatch latch = ignite.countDownLatch("latch1", 10, false, true); |
| |
| assertNotNull(latch); |
| |
| try (Transaction tx = ignite.transactions().txStart()) { |
| cache.put(1, 1); |
| |
| assertEquals(8, latch.countDown(2)); |
| |
| tx.rollback(); |
| } |
| |
| assertEquals(0, cache.size()); |
| |
| assertEquals(7, latch.countDown(1)); |
| } |
| finally { |
| ignite.destroyCache(cfg.getName()); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void checkLatch() throws Exception { |
| // Test API. |
| checkAutoDelete(); |
| |
| checkAwait(); |
| |
| checkCountDown(); |
| |
| // Test main functionality. |
| IgniteCountDownLatch latch1 = grid(0).countDownLatch("latch", 2, false, true); |
| |
| assertEquals(2, latch1.count()); |
| |
| IgniteFuture<Object> fut = grid(0).compute().callAsync(new IgniteCallable<Object>() { |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| @LoggerResource |
| private IgniteLogger log; |
| |
| @Nullable @Override public Object call() throws Exception { |
| // Test latch in multiple threads on each node. |
| IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( |
| new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| IgniteCountDownLatch latch = ignite.countDownLatch("latch", 2, false, true); |
| |
| assert latch != null && latch.count() == 2; |
| |
| log.info("Thread is going to wait on latch: " + Thread.currentThread().getName()); |
| |
| assert latch.await(1, MINUTES); |
| |
| log.info("Thread is again runnable: " + Thread.currentThread().getName()); |
| |
| return null; |
| } |
| }, |
| 5, |
| "test-thread" |
| ); |
| |
| fut.get(); |
| |
| return null; |
| } |
| }); |
| |
| Thread.sleep(3000); |
| |
| assert latch1.countDown() == 1; |
| |
| assert latch1.countDown() == 0; |
| |
| // Ensure there are no hangs. |
| fut.get(); |
| |
| // Test operations on removed latch. |
| latch1.close(); |
| |
| checkRemovedLatch(latch1); |
| } |
| |
| /** |
| * @param latch Latch. |
| * @throws Exception If failed. |
| */ |
| protected void checkRemovedLatch(final IgniteCountDownLatch latch) throws Exception { |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return latch.removed(); |
| } |
| }, 5000); |
| |
| assert latch.removed(); |
| |
| assert latch.count() == 0; |
| |
| // Test await on removed future. |
| latch.await(); |
| assert latch.await(10); |
| assert latch.await(10, SECONDS); |
| |
| latch.await(); |
| |
| // Test countdown. |
| assert latch.countDown() == 0; |
| assert latch.countDown(5) == 0; |
| latch.countDownAll(); |
| } |
| |
| /** |
| * @throws Exception Exception. |
| */ |
| private void checkAutoDelete() throws Exception { |
| IgniteCountDownLatch latch = createLatch("rmv", 5, true); |
| |
| latch.countDownAll(); |
| |
| // Latch should be removed since autoDelete = true |
| checkRemovedLatch(latch); |
| |
| IgniteCountDownLatch latch1 = createLatch("rmv1", 5, false); |
| |
| latch1.countDownAll(); |
| |
| // Latch should NOT be removed since autoDelete = false |
| assert !latch1.removed(); |
| |
| removeLatch("rmv1"); |
| } |
| |
| /** |
| * @throws Exception Exception. |
| */ |
| private void checkAwait() throws Exception { |
| // Check only 'false' cases here. Successful await is tested over the grid. |
| IgniteCountDownLatch latch = createLatch("await", 5, false); |
| |
| assert !latch.await(10); |
| assert !latch.await(10, MILLISECONDS); |
| |
| removeLatch("await"); |
| } |
| |
| /** |
| * @throws Exception Exception. |
| */ |
| private void checkCountDown() throws Exception { |
| IgniteCountDownLatch latch = createLatch("cnt", 10, true); |
| |
| assert latch.countDown() == 9; |
| assert latch.countDown(2) == 7; |
| |
| latch.countDownAll(); |
| |
| assert latch.count() == 0; |
| |
| checkRemovedLatch(latch); |
| |
| IgniteCountDownLatch latch1 = createLatch("cnt1", 10, true); |
| |
| assert latch1.countDown() == 9; |
| assert latch1.countDown(2) == 7; |
| |
| latch1.countDownAll(); |
| |
| assert latch1.count() == 0; |
| |
| checkRemovedLatch(latch1); |
| } |
| |
| /** |
| * @param latchName Latch name. |
| * @param cnt Count. |
| * @param autoDel Auto delete flag. |
| * @return New latch. |
| * @throws Exception If failed. |
| */ |
| private IgniteCountDownLatch createLatch(String latchName, int cnt, boolean autoDel) |
| throws Exception { |
| IgniteCountDownLatch latch = grid(RND.nextInt(NODES_CNT)).countDownLatch(latchName, cnt, autoDel, true); |
| |
| // Test initialization. |
| assert latchName.equals(latch.name()); |
| assert latch.count() == cnt; |
| assert latch.initialCount() == cnt; |
| assert latch.autoDelete() == autoDel; |
| |
| return latch; |
| } |
| |
| /** |
| * @param latchName Latch name. |
| * @throws Exception If failed. |
| */ |
| private void removeLatch(String latchName) |
| throws Exception { |
| IgniteCountDownLatch latch = grid(RND.nextInt(NODES_CNT)).countDownLatch(latchName, 10, false, true); |
| |
| assert latch != null; |
| |
| if (latch.count() > 0) |
| latch.countDownAll(); |
| |
| // Remove latch on random node. |
| IgniteCountDownLatch latch0 = grid(RND.nextInt(NODES_CNT)).countDownLatch(latchName, 0, false, false); |
| |
| assertNotNull(latch0); |
| |
| latch0.close(); |
| |
| // Ensure latch is removed on all nodes. |
| for (Ignite g : G.allGrids()) |
| assertNull(((IgniteKernal)g).context().dataStructures().countDownLatch(latchName, null, 10, true, false)); |
| |
| checkRemovedLatch(latch); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLatchMultinode1() throws Exception { |
| if (gridCount() == 1) |
| return; |
| |
| IgniteCountDownLatch latch = grid(0).countDownLatch("l1", 10, |
| true, |
| true); |
| |
| List<IgniteInternalFuture<?>> futs = new ArrayList<>(); |
| |
| final AtomicBoolean countedDown = new AtomicBoolean(); |
| |
| CountDownLatch allLatchesObtained = new CountDownLatch(gridCount()); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| final Ignite ignite = grid(i); |
| |
| futs.add(GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| IgniteCountDownLatch latch = ignite.countDownLatch("l1", 10, |
| true, |
| false); |
| |
| allLatchesObtained.countDown(); |
| |
| assertNotNull(latch); |
| |
| boolean wait = latch.await(30_000); |
| |
| assertTrue(countedDown.get()); |
| |
| assertEquals(0, latch.count()); |
| |
| assertTrue(wait); |
| |
| return null; |
| } |
| })); |
| } |
| |
| for (int i = 0; i < 10; i++) { |
| if (i == 9) { |
| countedDown.set(true); |
| |
| allLatchesObtained.await(); |
| } |
| |
| latch.countDown(); |
| } |
| |
| for (IgniteInternalFuture<?> fut : futs) |
| fut.get(30_000); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLatchBroadcast() throws Exception { |
| Ignite ignite = grid(0); |
| ClusterGroup srvsGrp = ignite.cluster().forServers(); |
| |
| int numOfSrvs = srvsGrp.nodes().size(); |
| |
| ignite.destroyCache("testCache"); |
| IgniteCache<Object, Object> cache = ignite.createCache("testCache"); |
| |
| for (ClusterNode node : srvsGrp.nodes()) |
| cache.put(String.valueOf(node.id()), 0); |
| |
| for (int i = 0; i < 500; i++) { |
| IgniteCountDownLatch latch1 = createLatch1(ignite, numOfSrvs); |
| IgniteCountDownLatch latch2 = createLatch2(ignite, numOfSrvs); |
| |
| ignite.compute(srvsGrp).broadcast(new IgniteRunnableJob(latch1, latch2, i)); |
| assertTrue(latch2.await(10000)); |
| } |
| } |
| |
| /** |
| * @param client Ignite client. |
| * @param numOfSrvs Number of server nodes. |
| * @return Ignite latch. |
| */ |
| private IgniteCountDownLatch createLatch1(Ignite client, int numOfSrvs) { |
| return client.countDownLatch( |
| "testName1", // Latch name. |
| numOfSrvs, // Initial count. |
| true, // Auto remove, when counter has reached zero. |
| true // Create if it does not exist. |
| ); |
| } |
| |
| /** |
| * @param client Ignite client. |
| * @param numOfSrvs Number of server nodes. |
| * @return Ignite latch. |
| */ |
| private IgniteCountDownLatch createLatch2(Ignite client, int numOfSrvs) { |
| return client.countDownLatch( |
| "testName2", // Latch name. |
| numOfSrvs, // Initial count. |
| true, // Auto remove, when counter has reached zero. |
| true // Create if it does not exist. |
| ); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLatchMultinode2() throws Exception { |
| if (gridCount() == 1) |
| return; |
| |
| IgniteCountDownLatch latch = grid(0).countDownLatch("l2", gridCount() * 3, |
| true, |
| true); |
| |
| assertNotNull(latch); |
| |
| List<IgniteInternalFuture<?>> futs = new ArrayList<>(); |
| |
| final AtomicInteger cnt = new AtomicInteger(); |
| |
| for (int i = 0; i < gridCount(); i++) { |
| final Ignite ignite = grid(i); |
| |
| futs.add(GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| IgniteCountDownLatch latch = ignite.countDownLatch("l2", 10, |
| true, |
| false); |
| |
| assertNotNull(latch); |
| |
| for (int i = 0; i < 3; i++) { |
| cnt.incrementAndGet(); |
| |
| latch.countDown(); |
| } |
| |
| boolean wait = latch.await(30_000); |
| |
| assertEquals(gridCount() * 3, cnt.get()); |
| |
| assertEquals(0, latch.count()); |
| |
| assertTrue(wait); |
| |
| return null; |
| } |
| })); |
| } |
| |
| for (IgniteInternalFuture<?> fut : futs) |
| fut.get(30_000); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| // No-op. |
| } |
| |
| /** |
| * Ignite job |
| */ |
| public class IgniteRunnableJob implements IgniteRunnable { |
| |
| /** |
| * Ignite. |
| */ |
| @IgniteInstanceResource |
| Ignite igniteInstance; |
| |
| /** |
| * Number of iteration. |
| */ |
| protected final int iteration; |
| |
| /** |
| * Ignite latch 1. |
| */ |
| private final IgniteCountDownLatch latch1; |
| |
| /** |
| * Ignite latch 2. |
| */ |
| private final IgniteCountDownLatch latch2; |
| |
| /** |
| * @param latch1 Ignite latch 1. |
| * @param latch2 Ignite latch 2. |
| * @param iteration Number of iteration. |
| */ |
| public IgniteRunnableJob(IgniteCountDownLatch latch1, IgniteCountDownLatch latch2, int iteration) { |
| this.iteration = iteration; |
| this.latch1 = latch1; |
| this.latch2 = latch2; |
| } |
| |
| /** |
| * @return Ignite latch. |
| */ |
| IgniteCountDownLatch createLatch1() { |
| return latch1; |
| } |
| |
| /** |
| * @return Ignite latch. |
| */ |
| IgniteCountDownLatch createLatch2() { |
| return latch2; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| |
| IgniteCountDownLatch latch1 = createLatch1(); |
| IgniteCountDownLatch latch2 = createLatch2(); |
| |
| IgniteCache<Object, Object> cache = igniteInstance.cache("testCache"); |
| |
| for (ClusterNode node : igniteInstance.cluster().forServers().nodes()) { |
| Integer val = (Integer)cache.get(String.valueOf(node.id())); |
| assertEquals(val, (Integer)iteration); |
| } |
| |
| latch1.countDown(); |
| |
| assertTrue(latch1.await(10000)); |
| |
| cache.put(getUID(), (iteration + 1)); |
| |
| latch2.countDown(); |
| |
| } |
| |
| /** |
| * @return Node UUID as string. |
| */ |
| String getUID() { |
| String id = ""; |
| Collection<ClusterNode> nodes = igniteInstance.cluster().forLocal().nodes(); |
| for (ClusterNode node : nodes) { |
| if (node.isLocal()) |
| id = String.valueOf(node.id()); |
| } |
| return id; |
| } |
| |
| /** |
| * @return Ignite. |
| */ |
| public Ignite igniteInstance() { |
| return igniteInstance; |
| } |
| } |
| } |