| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.database; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicLongArray; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.function.Predicate; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.pagemem.PageIdAllocator; |
| import org.apache.ignite.internal.pagemem.PageMemory; |
| import org.apache.ignite.internal.pagemem.PageUtils; |
| import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.DataStructure; |
| import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManager; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener; |
| import org.apache.ignite.internal.processors.failure.FailureProcessor; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.GridRandom; |
| import org.apache.ignite.internal.util.GridStripedLock; |
| import org.apache.ignite.internal.util.IgniteTree; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.lang.GridCursor; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.GridTestKernalContext; |
| import org.apache.ignite.testframework.junits.WithSystemProperty; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.jetbrains.annotations.Nullable; |
| import org.jsr166.ConcurrentLinkedHashMap; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_PAGE_LOCK_TRACKER_CHECK_INTERVAL; |
| import static org.apache.ignite.internal.pagemem.PageIdUtils.effectivePageId; |
| import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.rnd; |
| import static org.apache.ignite.internal.processors.database.BPlusTreeSelfTest.TestTree.threadId; |
| import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP; |
| import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT; |
| import static org.apache.ignite.internal.util.IgniteTree.OperationType.REMOVE; |
| |
| /** |
| */ |
| @WithSystemProperty(key = IGNITE_PAGE_LOCK_TRACKER_CHECK_INTERVAL, value = "20000") |
| public class BPlusTreeSelfTest extends GridCommonAbstractTest { |
| /** */ |
| private static final short LONG_INNER_IO = 30000; |
| |
| /** */ |
| private static final short LONG_LEAF_IO = 30001; |
| |
| /** */ |
| protected static final int PAGE_SIZE = 512; |
| |
| /** */ |
| protected static final long MB = 1024 * 1024; |
| |
| /** */ |
| protected static final int CPUS = Runtime.getRuntime().availableProcessors(); |
| |
| /** */ |
| private static final int CACHE_ID = 100500; |
| |
| /** */ |
| protected static int MAX_PER_PAGE = 0; |
| |
| /** */ |
| protected static int CNT = 10; |
| |
| /** */ |
| private static int PUT_INC = 1; |
| |
| /** */ |
| private static int RMV_INC = 1; |
| |
| /** Forces printing lock/unlock events on the test tree */ |
| private static boolean PRINT_LOCKS = false; |
| |
| /** */ |
| protected PageMemory pageMem; |
| |
| /** */ |
| private ReuseList reuseList; |
| |
| /** */ |
| private static final Collection<Long> rmvdIds = new GridConcurrentHashSet<>(); |
| |
| /** Stop. */ |
| private final AtomicBoolean stop = new AtomicBoolean(); |
| |
| /** Future. */ |
| private volatile GridCompoundFuture<?, ?> asyncRunFut; |
| |
| /** Tracking of locks holding. */ |
| private PageLockTrackerManager lockTrackerManager; |
| |
| /** |
| * Check that we do not keep any locks at the moment. |
| */ |
| protected void assertNoLocks() { |
| assertTrue(TestPageLockListener.checkNoLocks()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| stop.set(false); |
| |
| long seed = System.nanoTime(); |
| |
| X.println("Test seed: " + seed + "L; // "); |
| |
| rnd = new Random(seed); |
| |
| pageMem = createPageMemory(); |
| |
| reuseList = createReuseList(CACHE_ID, pageMem, 0, true); |
| |
| lockTrackerManager = new PageLockTrackerManager(log, "testTreeManager") { |
| @Override public PageLockListener createPageLockTracker(String name) { |
| return new TestPageLockListener(super.createPageLockTracker(name)); |
| } |
| }; |
| |
| lockTrackerManager.start(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return 10 * 60 * 1000L; |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| * @param pageMem Page memory. |
| * @param rootId Root page ID. |
| * @param initNew Init new flag. |
| * @return Reuse list. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected ReuseList createReuseList(int cacheId, PageMemory pageMem, long rootId, boolean initNew) |
| throws IgniteCheckedException { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| rnd = null; |
| |
| try { |
| if (asyncRunFut != null && !asyncRunFut.isDone()) { |
| stop.set(true); |
| |
| try { |
| asyncRunFut.cancel(); |
| asyncRunFut.get(60000); |
| } |
| catch (Throwable ex) { |
| //Ignore |
| } |
| } |
| |
| if (reuseList != null) { |
| long size = reuseList.recycledPagesCount(); |
| |
| assertTrue("Reuse size: " + size, size < 7000); |
| } |
| |
| for (int i = 0; i < 10; i++) { |
| if (acquiredPages() != 0) { |
| System.out.println("!!!"); |
| U.sleep(10); |
| } |
| } |
| |
| assertEquals(0, acquiredPages()); |
| } |
| finally { |
| if (pageMem != null) |
| pageMem.stop(true); |
| |
| if (lockTrackerManager != null) |
| lockTrackerManager.stop(); |
| |
| MAX_PER_PAGE = 0; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| CNT = 10; |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testFind() throws IgniteCheckedException { |
| TestTree tree = createTestTree(true); |
| TreeMap<Long, Long> map = new TreeMap<>(); |
| |
| long size = CNT * CNT; |
| |
| for (long i = 1; i <= size; i++) { |
| tree.put(i); |
| map.put(i, i); |
| } |
| |
| checkCursor(tree.find(null, null), map.values().iterator()); |
| checkCursor(tree.find(10L, 70L), map.subMap(10L, true, 70L, true).values().iterator()); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testRetries() throws IgniteCheckedException { |
| TestTree tree = createTestTree(true); |
| |
| tree.numRetries = 1; |
| |
| long size = CNT * CNT; |
| |
| try { |
| for (long i = 1; i <= size; i++) |
| tree.put(i); |
| |
| fail(); |
| } |
| catch (IgniteCheckedException ignored) { |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| @Test |
| public void testIsEmpty() throws Exception { |
| TestTree tree = createTestTree(true); |
| |
| assertTrue(tree.isEmpty()); |
| |
| for (long i = 1; i <= 500; i++) { |
| tree.put(i); |
| |
| assertFalse(tree.isEmpty()); |
| } |
| |
| for (long i = 1; i <= 500; i++) { |
| assertFalse(tree.isEmpty()); |
| |
| tree.remove(i); |
| } |
| |
| assertTrue(tree.isEmpty()); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testFindWithClosure() throws IgniteCheckedException { |
| TestTree tree = createTestTree(true); |
| TreeMap<Long, Long> map = new TreeMap<>(); |
| |
| long size = CNT * CNT; |
| |
| for (long i = 1; i <= size; i++) { |
| tree.put(i); |
| map.put(i, i); |
| } |
| |
| checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.<Long>emptySet()), null), |
| Collections.<Long>emptyList().iterator()); |
| |
| checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(map.keySet()), null), |
| map.values().iterator()); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| for (int i = 0; i < 100; i++) { |
| Long val = rnd.nextLong(size) + 1; |
| |
| checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.singleton(val)), null), |
| Collections.singleton(val).iterator()); |
| } |
| |
| for (int i = 0; i < 200; i++) { |
| long vals = rnd.nextLong(size) + 1; |
| |
| TreeSet<Long> exp = new TreeSet<>(); |
| |
| for (long k = 0; k < vals; k++) |
| exp.add(rnd.nextLong(size) + 1); |
| |
| checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); |
| |
| checkCursor(tree.find(0L, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); |
| |
| checkCursor(tree.find(0L, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); |
| |
| checkCursor(tree.find(null, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); |
| } |
| } |
| |
| /** |
| * @param cursor cursor to check. |
| * @param iterator iterator with expected result. |
| * @throws IgniteCheckedException If failed |
| */ |
| private void checkCursor(GridCursor<Long> cursor, Iterator<Long> iterator) throws IgniteCheckedException { |
| while (cursor.next()) { |
| assertTrue(iterator.hasNext()); |
| |
| assertEquals(iterator.next(), cursor.get()); |
| } |
| |
| assertFalse(iterator.hasNext()); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_mm_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = -1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_mm_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = -1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_pm_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_pm_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_pp_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = 1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_pp_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = 1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_mp_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = -1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_1_20_mp_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 20; |
| PUT_INC = -1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(false); |
| } |
| |
| // ------- 2 - 40 |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_mm_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = -1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_mm_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = -1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_pm_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_pm_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_pp_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = 1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_pp_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = 1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_mp_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = -1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_2_40_mp_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 2; |
| CNT = 40; |
| PUT_INC = -1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(false); |
| } |
| |
| // ------- 3 - 60 |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_mm_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = -1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_mm_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = -1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_pm_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_pm_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = 1; |
| RMV_INC = -1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_pp_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = 1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_pp_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = 1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_mp_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = -1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testPutRemove_3_60_mp_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 3; |
| CNT = 60; |
| PUT_INC = -1; |
| RMV_INC = 1; |
| |
| doTestPutRemove(false); |
| } |
| |
| /** |
| * @param canGetRow Can get row from inner page. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void doTestPutRemove(boolean canGetRow) throws IgniteCheckedException { |
| TestTree tree = createTestTree(canGetRow); |
| |
| long cnt = CNT; |
| |
| for (long x = PUT_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += PUT_INC) { |
| assertNull(tree.findOne(x)); |
| |
| tree.put(x); |
| |
| assertNoLocks(); |
| |
| assertEquals(x, tree.findOne(x).longValue()); |
| checkIterate(tree, x, x, x, true); |
| |
| assertNoLocks(); |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| X.println(tree.printTree()); |
| |
| assertNoLocks(); |
| |
| assertNull(tree.findOne(-1L)); |
| |
| for (long x = 0; x < cnt; x++) { |
| assertEquals(x, tree.findOne(x).longValue()); |
| checkIterate(tree, x, x, x, true); |
| } |
| |
| assertNoLocks(); |
| |
| assertNull(tree.findOne(cnt)); |
| checkIterate(tree, cnt, cnt, null, false); |
| |
| for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) { |
| X.println(" -- " + x); |
| |
| assertEquals(Long.valueOf(x), tree.remove(x)); |
| |
| assertNoLocks(); |
| |
| X.println(tree.printTree()); |
| |
| assertNoLocks(); |
| |
| assertNull(tree.findOne(x)); |
| checkIterate(tree, x, x, null, false); |
| |
| assertNoLocks(); |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| assertFalse(tree.find(null, null).next()); |
| assertEquals(0, tree.size()); |
| assertEquals(0, tree.rootLevel()); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * @param tree Tree. |
| * @param lower Lower bound. |
| * @param upper Upper bound. |
| * @param exp Value to find. |
| * @param expFound {@code True} if value should be found. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound) |
| throws IgniteCheckedException { |
| TestTreeRowClosure c = new TestTreeRowClosure(exp); |
| |
| tree.iterate(lower, upper, c); |
| |
| assertEquals(expFound, c.found); |
| } |
| |
| /** |
| * @param tree Tree. |
| * @param lower Lower bound. |
| * @param upper Upper bound. |
| * @param c Closure. |
| * @param expFound {@code True} if value should be found. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound) |
| throws IgniteCheckedException { |
| c.found = false; |
| |
| tree.iterate(lower, upper, c); |
| |
| assertEquals(expFound, c.found); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testRandomInvoke_1_30_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 30; |
| |
| doTestRandomInvoke(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testRandomInvoke_1_30_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 30; |
| |
| doTestRandomInvoke(false); |
| } |
| |
| /** |
| * @param canGetRow Can get row from inner page. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void doTestRandomInvoke(boolean canGetRow) throws IgniteCheckedException { |
| TestTree tree = createTestTree(canGetRow); |
| |
| Map<Long, Long> map = new HashMap<>(); |
| |
| int loops = reuseList == null ? 20_000 : 60_000; |
| |
| for (int i = 0; i < loops; i++) { |
| final Long x = (long)BPlusTree.randomInt(CNT); |
| final int rnd = BPlusTree.randomInt(11); |
| |
| if (i % 10_000 == 0) { |
| // X.println(tree.printTree()); |
| X.println(" --> " + i + " ++> " + x); |
| } |
| |
| // Update map. |
| if (!map.containsKey(x)) { |
| if (rnd % 2 == 0) { |
| map.put(x, x); |
| |
| // X.println("put0: " + x); |
| } |
| else { |
| // X.println("noop0: " + x); |
| } |
| } |
| else { |
| if (rnd % 2 == 0) { |
| // X.println("put1: " + x); |
| } |
| else if (rnd % 3 == 0) { |
| map.remove(x); |
| |
| // X.println("rmv1: " + x); |
| } |
| else { |
| // X.println("noop1: " + x); |
| } |
| } |
| |
| // Consistently update tree. |
| tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() { |
| |
| IgniteTree.OperationType op; |
| |
| Long newRow; |
| |
| @Override public void call(@Nullable Long row) throws IgniteCheckedException { |
| if (row == null) { |
| if (rnd % 2 == 0) { |
| op = PUT; |
| newRow = x; |
| } |
| else { |
| op = NOOP; |
| newRow = null; |
| } |
| } |
| else { |
| assertEquals(x, row); |
| |
| if (rnd % 2 == 0) { |
| op = PUT; |
| newRow = x; // We can not replace x with y here, because keys must be equal. |
| } |
| else if (rnd % 3 == 0) { |
| op = REMOVE; |
| newRow = null; |
| } |
| else { |
| op = NOOP; |
| newRow = null; |
| } |
| } |
| } |
| |
| @Override public Long newRow() { |
| return newRow; |
| } |
| |
| @Override public IgniteTree.OperationType operationType() { |
| return op; |
| } |
| }); |
| |
| assertNoLocks(); |
| |
| // X.println(tree.printTree()); |
| |
| tree.validateTree(); |
| |
| if (i % 100 == 0) |
| assertEqualContents(tree, map); |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testRandomPutRemove_1_30_0() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 30; |
| |
| doTestRandomPutRemove(false); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testRandomPutRemove_1_30_1() throws IgniteCheckedException { |
| MAX_PER_PAGE = 1; |
| CNT = 30; |
| |
| doTestRandomPutRemove(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassiveRemove3_false() throws Exception { |
| MAX_PER_PAGE = 3; |
| |
| doTestMassiveRemove(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassiveRemove3_true() throws Exception { |
| MAX_PER_PAGE = 3; |
| |
| doTestMassiveRemove(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassiveRemove2_false() throws Exception { |
| MAX_PER_PAGE = 2; |
| |
| doTestMassiveRemove(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassiveRemove2_true() throws Exception { |
| MAX_PER_PAGE = 2; |
| |
| doTestMassiveRemove(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassiveRemove1_false() throws Exception { |
| MAX_PER_PAGE = 1; |
| |
| doTestMassiveRemove(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassiveRemove1_true() throws Exception { |
| MAX_PER_PAGE = 1; |
| |
| doTestMassiveRemove(true); |
| } |
| |
| /** |
| * @param canGetRow Can get row in inner page. |
| * @throws Exception If failed. |
| */ |
| private void doTestMassiveRemove(final boolean canGetRow) throws Exception { |
| final int threads = 64; |
| final int keys = 3000; |
| |
| final AtomicLongArray rmvd = new AtomicLongArray(keys); |
| |
| final TestTree tree = createTestTree(canGetRow); |
| |
| // Put keys in reverse order to have a better balance in the tree (lower height). |
| for (long i = keys - 1; i >= 0; i--) { |
| tree.put(i); |
| // X.println(tree.printTree()); |
| } |
| |
| assertEquals(keys, tree.size()); |
| |
| tree.validateTree(); |
| |
| info("Remove..."); |
| |
| try { |
| GridTestUtils.runMultiThreaded(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| Random rnd = new GridRandom(); |
| |
| for (;;) { |
| int idx = 0; |
| boolean found = false; |
| |
| for (int i = 0, shift = rnd.nextInt(keys); i < keys; i++) { |
| idx = (i + shift) % keys; |
| |
| if (rmvd.get(idx) == 0 && rmvd.compareAndSet(idx, 0, 1)) { |
| found = true; |
| |
| break; |
| } |
| } |
| |
| if (!found) |
| break; |
| |
| assertEquals(Long.valueOf(idx), tree.remove((long)idx)); |
| |
| if (canGetRow) |
| rmvdIds.add((long)idx); |
| } |
| |
| return null; |
| } |
| }, threads, "remove"); |
| |
| assertEquals(0, tree.size()); |
| |
| tree.validateTree(); |
| } |
| finally { |
| rmvdIds.clear(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassivePut1_true() throws Exception { |
| MAX_PER_PAGE = 1; |
| |
| doTestMassivePut(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassivePut1_false() throws Exception { |
| MAX_PER_PAGE = 1; |
| |
| doTestMassivePut(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassivePut2_true() throws Exception { |
| MAX_PER_PAGE = 2; |
| |
| doTestMassivePut(true); |
| } |
| |
| /** */ |
| @Test |
| public void testMassivePut2_false() throws Exception { |
| MAX_PER_PAGE = 2; |
| |
| doTestMassivePut(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMassivePut3_true() throws Exception { |
| MAX_PER_PAGE = 3; |
| |
| doTestMassivePut(true); |
| } |
| |
| /** */ |
| @Test |
| public void testMassivePut3_false() throws Exception { |
| MAX_PER_PAGE = 3; |
| |
| doTestMassivePut(false); |
| } |
| |
| /** |
| * @param canGetRow Can get row in inner page. |
| * @throws Exception If failed. |
| */ |
| private void doTestMassivePut(final boolean canGetRow) throws Exception { |
| final int threads = 16; |
| final int keys = 26; // We may fail to insert more on small pages size because of tree height. |
| |
| final TestTree tree = createTestTree(canGetRow); |
| |
| info("Put..."); |
| |
| final AtomicLongArray k = new AtomicLongArray(keys); |
| |
| GridTestUtils.runMultiThreaded(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| Random rnd = new GridRandom(); |
| |
| for (;;) { |
| int idx = 0; |
| boolean found = false; |
| |
| for (int i = 0, shift = rnd.nextInt(keys); i < keys; i++) { |
| idx = (i + shift) % keys; |
| |
| if (k.get(idx) == 0 && k.compareAndSet(idx, 0, 1)) { |
| found = true; |
| |
| break; |
| } |
| } |
| |
| if (!found) |
| break; |
| |
| assertNull(tree.put((long)idx)); |
| |
| assertNoLocks(); |
| } |
| |
| return null; |
| } |
| }, threads, "put"); |
| |
| assertEquals(keys, tree.size()); |
| |
| tree.validateTree(); |
| |
| GridCursor<Long> c = tree.find(null, null); |
| |
| long x = 0; |
| |
| while (c.next()) |
| assertEquals(Long.valueOf(x++), c.get()); |
| |
| assertEquals(keys, x); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * @param canGetRow Can get row from inner page. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void doTestRandomPutRemove(boolean canGetRow) throws IgniteCheckedException { |
| TestTree tree = createTestTree(canGetRow); |
| |
| Map<Long, Long> map = new HashMap<>(); |
| |
| int loops = reuseList == null ? 100_000 : 300_000; |
| |
| for (int i = 0; i < loops; i++) { |
| Long x = (long)BPlusTree.randomInt(CNT); |
| |
| boolean put = BPlusTree.randomInt(2) == 0; |
| |
| if (i % 10_000 == 0) { |
| // X.println(tree.printTree()); |
| X.println(" --> " + (put ? "put " : "rmv ") + i + " " + x); |
| } |
| |
| if (put) |
| assertEquals(map.put(x, x), tree.put(x)); |
| else { |
| if (map.remove(x) != null) |
| assertEquals(x, tree.remove(x)); |
| |
| assertNull(tree.remove(x)); |
| } |
| |
| assertNoLocks(); |
| |
| // X.println(tree.printTree()); |
| tree.validateTree(); |
| |
| if (i % 100 == 0) |
| assertEqualContents(tree, map); |
| } |
| } |
| |
| /** |
| * @param tree Tree. |
| * @param map Map. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void assertEqualContents(IgniteTree<Long, Long> tree, Map<Long, Long> map) throws IgniteCheckedException { |
| GridCursor<Long> cursor = tree.find(null, null); |
| |
| while (cursor.next()) { |
| Long x = cursor.get(); |
| |
| assert x != null; |
| |
| assertEquals(map.get(x), x); |
| |
| assertNoLocks(); |
| } |
| |
| assertEquals(map.size(), tree.size()); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testEmptyCursors() throws IgniteCheckedException { |
| MAX_PER_PAGE = 5; |
| |
| TestTree tree = createTestTree(true); |
| |
| assertFalse(tree.find(null, null).next()); |
| assertFalse(tree.find(0L, 1L).next()); |
| |
| tree.put(1L); |
| tree.put(2L); |
| tree.put(3L); |
| |
| assertEquals(3, size(tree.find(null, null))); |
| |
| assertFalse(tree.find(4L, null).next()); |
| assertFalse(tree.find(null, 0L).next()); |
| |
| assertNoLocks(); |
| } |
| |
| /** */ |
| private void doTestCursor(boolean canGetRow) throws IgniteCheckedException { |
| TestTree tree = createTestTree(canGetRow); |
| |
| for (long i = 15; i >= 0; i--) |
| tree.put(i); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testCursorConcurrentMerge() throws IgniteCheckedException { |
| MAX_PER_PAGE = 5; |
| |
| // X.println(" " + pageMem.pageSize()); |
| |
| TestTree tree = createTestTree(true); |
| |
| TreeMap<Long, Long> map = new TreeMap<>(); |
| |
| for (int i = 0; i < 20_000 + rnd.nextInt(2 * MAX_PER_PAGE); i++) { |
| Long row = (long)rnd.nextInt(40_000); |
| |
| // X.println(" <-- " + row); |
| |
| assertEquals(map.put(row, row), tree.put(row)); |
| assertEquals(row, tree.findOne(row)); |
| |
| assertNoLocks(); |
| } |
| |
| final int off = rnd.nextInt(5 * MAX_PER_PAGE); |
| |
| Long upperBound = 30_000L + rnd.nextInt(2 * MAX_PER_PAGE); |
| |
| GridCursor<Long> c = tree.find(null, upperBound); |
| Iterator<Long> i = map.headMap(upperBound, true).keySet().iterator(); |
| |
| Long last = null; |
| |
| for (int j = 0; j < off; j++) { |
| assertTrue(c.next()); |
| |
| // X.println(" <-> " + c.get()); |
| |
| assertEquals(i.next(), c.get()); |
| |
| last = c.get(); |
| |
| assertNoLocks(); |
| } |
| |
| if (last != null) { |
| // X.println(" >-< " + last + " " + upperBound); |
| |
| c = tree.find(last, upperBound); |
| |
| assertTrue(c.next()); |
| assertEquals(last, c.get()); |
| |
| assertNoLocks(); |
| } |
| |
| while (c.next()) { |
| // X.println(" --> " + c.get()); |
| |
| assertNotNull(c.get()); |
| assertEquals(i.next(), c.get()); |
| assertEquals(c.get(), tree.remove(c.get())); |
| |
| i.remove(); |
| |
| assertNoLocks(); |
| } |
| |
| assertEquals(map.size(), size(tree.find(null, null))); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * Verifies that {@link BPlusTree#size} and {@link BPlusTree#size} methods behave correctly |
| * on single-threaded addition and removal of elements in random order. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testSizeForPutRmvSequential() throws IgniteCheckedException { |
| MAX_PER_PAGE = 5; |
| |
| boolean DEBUG_PRINT = false; |
| |
| int itemCnt = (int) Math.pow(MAX_PER_PAGE, 5) + rnd.nextInt(MAX_PER_PAGE * MAX_PER_PAGE); |
| |
| Long[] items = new Long[itemCnt]; |
| for (int i = 0; i < itemCnt; ++i) |
| items[i] = (long) i; |
| |
| TestTree testTree = createTestTree(true); |
| TreeMap<Long, Long> goldenMap = new TreeMap<>(); |
| |
| assertEquals(0, testTree.size()); |
| assertEquals(0, goldenMap.size()); |
| |
| final Predicate<Long> rowMatcher = new Predicate<Long>() { |
| @Override public boolean test(Long row) { |
| return row % 7 == 0; |
| } |
| }; |
| |
| final BPlusTree.TreeRowClosure<Long, Long> rowClosure = new BPlusTree.TreeRowClosure<Long, Long>() { |
| @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) |
| throws IgniteCheckedException { |
| return rowMatcher.test(io.getLookupRow(tree, pageAddr, idx)); |
| } |
| }; |
| |
| int correctMatchingRows = 0; |
| |
| Collections.shuffle(Arrays.asList(items), rnd); |
| |
| for (Long row : items) { |
| if (DEBUG_PRINT) { |
| X.println(" --> put(" + row + ")"); |
| X.print(testTree.printTree()); |
| } |
| |
| assertEquals(goldenMap.put(row, row), testTree.put(row)); |
| assertEquals(row, testTree.findOne(row)); |
| |
| if (rowMatcher.test(row)) |
| ++correctMatchingRows; |
| |
| assertEquals(correctMatchingRows, testTree.size(rowClosure)); |
| |
| long correctSize = goldenMap.size(); |
| |
| assertEquals(correctSize, testTree.size()); |
| assertEquals(correctSize, size(testTree.find(null, null))); |
| |
| assertNoLocks(); |
| } |
| |
| Collections.shuffle(Arrays.asList(items), rnd); |
| |
| for (Long row : items) { |
| if (DEBUG_PRINT) { |
| X.println(" --> rmv(" + row + ")"); |
| X.print(testTree.printTree()); |
| } |
| |
| assertEquals(row, goldenMap.remove(row)); |
| assertEquals(row, testTree.remove(row)); |
| assertNull(testTree.findOne(row)); |
| |
| if (rowMatcher.test(row)) |
| --correctMatchingRows; |
| |
| assertEquals(correctMatchingRows, testTree.size(rowClosure)); |
| |
| long correctSize = goldenMap.size(); |
| |
| assertEquals(correctSize, testTree.size()); |
| assertEquals(correctSize, size(testTree.find(null, null))); |
| |
| assertNoLocks(); |
| } |
| } |
| |
| /** |
| * Verifies that {@link BPlusTree#size()} method behaves correctly when run concurrently with |
| * {@link BPlusTree#put}, {@link BPlusTree#remove} methods. Please see details in |
| * {@link #doTestSizeForRandomPutRmvMultithreaded}. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSizeForRandomPutRmvMultithreaded_5_4() throws Exception { |
| MAX_PER_PAGE = 5; |
| CNT = 10_000; |
| |
| doTestSizeForRandomPutRmvMultithreaded(4); |
| } |
| |
| /** */ |
| @Test |
| public void testSizeForRandomPutRmvMultithreaded_3_256() throws Exception { |
| MAX_PER_PAGE = 3; |
| CNT = 10_000; |
| |
| doTestSizeForRandomPutRmvMultithreaded(256); |
| } |
| |
| /** |
| * Verifies that {@link BPlusTree#size()} method behaves correctly when run between series of |
| * concurrent {@link BPlusTree#put}, {@link BPlusTree#remove} methods. |
| * |
| * @param rmvPutSlidingWindowSize Sliding window size (distance between items being deleted and added). |
| * @throws Exception If failed. |
| */ |
| private void doTestSizeForRandomPutRmvMultithreaded(final int rmvPutSlidingWindowSize) throws Exception { |
| final TestTree tree = createTestTree(false); |
| |
| final boolean DEBUG_PRINT = false; |
| |
| final AtomicLong curRmvKey = new AtomicLong(0); |
| final AtomicLong curPutKey = new AtomicLong(rmvPutSlidingWindowSize); |
| |
| for (long i = curRmvKey.get(); i < curPutKey.get(); ++i) |
| assertNull(tree.put(i)); |
| |
| final int putRmvThreadCnt = Math.min(Runtime.getRuntime().availableProcessors(), rmvPutSlidingWindowSize); |
| |
| final int loopCnt = CNT / putRmvThreadCnt; |
| |
| final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt); |
| final CyclicBarrier sizeOpBarrier = new CyclicBarrier(putRmvThreadCnt); |
| |
| IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| |
| for (int i = 0; i < loopCnt && !stop.get(); ++i) { |
| putRmvOpBarrier.await(); |
| |
| Long putVal = curPutKey.getAndIncrement(); |
| |
| if (DEBUG_PRINT || (i & 0x7ff) == 0) |
| X.println(" --> put(" + putVal + ")"); |
| |
| assertNull(tree.put(putVal)); |
| |
| assertNoLocks(); |
| |
| Long rmvVal = curRmvKey.getAndIncrement(); |
| |
| if (DEBUG_PRINT || (i & 0x7ff) == 0) |
| X.println(" --> rmv(" + rmvVal + ")"); |
| |
| assertEquals(rmvVal, tree.remove(rmvVal)); |
| assertNull(tree.remove(rmvVal)); |
| |
| assertNoLocks(); |
| |
| if (stop.get()) |
| break; |
| |
| sizeOpBarrier.await(); |
| |
| long correctSize = curPutKey.get() - curRmvKey.get(); |
| |
| if (DEBUG_PRINT || (i & 0x7ff) == 0) |
| X.println("====> correctSize=" + correctSize); |
| |
| assertEquals(correctSize, size(tree.find(null, null))); |
| assertEquals(correctSize, tree.size()); |
| } |
| |
| return null; |
| } |
| }, putRmvThreadCnt, "put-remove-size"); |
| |
| IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get()) { |
| Thread.sleep(5000); |
| |
| X.println(TestTree.printLocks()); |
| } |
| |
| return null; |
| } |
| }, 1, "printLocks"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add((IgniteInternalFuture) putRmvFut); |
| asyncRunFut.add((IgniteInternalFuture) lockPrintingFut); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); |
| } |
| finally { |
| stop.set(true); |
| putRmvOpBarrier.reset(); |
| sizeOpBarrier.reset(); |
| |
| asyncRunFut.get(); |
| } |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence |
| * and {@link BPlusTree#size} methods results in correct calculation of tree size. |
| * |
| * @see #doTestSizeForRandomPutRmvMultithreadedAsync doTestSizeForRandomPutRmvMultithreadedAsync() for details. |
| */ |
| @Test |
| public void testSizeForRandomPutRmvMultithreadedAsync_16() throws Exception { |
| doTestSizeForRandomPutRmvMultithreadedAsync(16); |
| } |
| |
| /** |
| * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence |
| * and {@link BPlusTree#size} methods results in correct calculation of tree size. |
| * |
| * @see #doTestSizeForRandomPutRmvMultithreadedAsync doTestSizeForRandomPutRmvMultithreadedAsync() for details. |
| */ |
| @Test |
| public void testSizeForRandomPutRmvMultithreadedAsync_3() throws Exception { |
| doTestSizeForRandomPutRmvMultithreadedAsync(3); |
| } |
| |
| /** |
| * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence |
| * and {@link BPlusTree#size} methods results in correct calculation of tree size. |
| * |
| * Since in the presence of concurrent modifications the size may differ from the actual one, the test maintains |
| * sliding window of records in the tree, uses a barrier between concurrent runs to limit runaway delta in |
| * the calculated size, and checks that the measured size lies within certain bounds. |
| * |
| * NB: This test has to be changed with the integration of IGNITE-3478. |
| * |
| */ |
| public void doTestSizeForRandomPutRmvMultithreadedAsync(final int rmvPutSlidingWindowSize) throws Exception { |
| MAX_PER_PAGE = 5; |
| |
| final boolean DEBUG_PRINT = false; |
| |
| final TestTree tree = createTestTree(false); |
| |
| final AtomicLong curRmvKey = new AtomicLong(0); |
| final AtomicLong curPutKey = new AtomicLong(rmvPutSlidingWindowSize); |
| |
| for (long i = curRmvKey.get(); i < curPutKey.get(); ++i) |
| assertNull(tree.put(i)); |
| |
| final int putRmvThreadCnt = Math.min(Runtime.getRuntime().availableProcessors(), rmvPutSlidingWindowSize); |
| final int sizeThreadCnt = putRmvThreadCnt; |
| |
| final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt + sizeThreadCnt, new Runnable() { |
| @Override public void run() { |
| if (DEBUG_PRINT) { |
| try { |
| X.println("===BARRIER=== size=" + tree.size() |
| + "; contents=[" + tree.findFirst() + ".." + tree.findLast() + "]" |
| + "; rmvVal=" + curRmvKey.get() + "; putVal=" + curPutKey.get()); |
| |
| X.println(tree.printTree()); |
| } |
| catch (IgniteCheckedException e) { |
| // ignore |
| } |
| } |
| } |
| }); |
| |
| final int loopCnt = 500; |
| |
| IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| for (int i = 0; i < loopCnt && !stop.get(); ++i) { |
| int order; |
| try { |
| order = putRmvOpBarrier.await(); |
| } catch (BrokenBarrierException e) { |
| break; |
| } |
| |
| Long putVal = curPutKey.getAndIncrement(); |
| |
| if (DEBUG_PRINT || (i & 0x3ff) == 0) |
| X.println(order + ": --> put(" + putVal + ")"); |
| |
| assertNull(tree.put(putVal)); |
| |
| Long rmvVal = curRmvKey.getAndIncrement(); |
| |
| if (DEBUG_PRINT || (i & 0x3ff) == 0) |
| X.println(order + ": --> rmv(" + rmvVal + ")"); |
| |
| assertEquals(rmvVal, tree.remove(rmvVal)); |
| assertNull(tree.findOne(rmvVal)); |
| } |
| |
| return null; |
| } |
| }, putRmvThreadCnt, "put-remove"); |
| |
| IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| |
| final List<Long> treeContents = new ArrayList<>(rmvPutSlidingWindowSize * 2); |
| |
| final BPlusTree.TreeRowClosure<Long, Long> rowDumper = new BPlusTree.TreeRowClosure<Long, Long>() { |
| @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) |
| throws IgniteCheckedException { |
| |
| treeContents.add(io.getLookupRow(tree, pageAddr, idx)); |
| return true; |
| } |
| }; |
| |
| for (long iter = 0; !stop.get(); ++iter) { |
| int order = 0; |
| |
| try { |
| order = putRmvOpBarrier.await(); |
| } catch (BrokenBarrierException e) { |
| break; |
| } |
| |
| long correctSize = curPutKey.get() - curRmvKey.get(); |
| |
| treeContents.clear(); |
| long treeSize = tree.size(rowDumper); |
| |
| long minBound = correctSize - putRmvThreadCnt; |
| long maxBound = correctSize + putRmvThreadCnt; |
| |
| if (DEBUG_PRINT || (iter & 0x3ff) == 0) |
| X.println(order + ": size=" + treeSize + "; bounds=[" + minBound + ".." + maxBound |
| + "]; contents=" + treeContents); |
| |
| if (treeSize < minBound || treeSize > maxBound) { |
| fail("Tree size is not in bounds [" + minBound + ".." + maxBound + "]: " + treeSize |
| + "; Tree contents: " + treeContents); |
| } |
| } |
| |
| return null; |
| } |
| }, sizeThreadCnt, "size"); |
| |
| IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get()) { |
| Thread.sleep(5000); |
| |
| X.println(TestTree.printLocks()); |
| } |
| |
| return null; |
| } |
| }, 1, "printLocks"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add((IgniteInternalFuture) putRmvFut); |
| asyncRunFut.add((IgniteInternalFuture) sizeFut); |
| asyncRunFut.add((IgniteInternalFuture) lockPrintingFut); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); |
| } |
| finally { |
| stop.set(true); |
| putRmvOpBarrier.reset(); |
| |
| asyncRunFut.get(); |
| } |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * The test forces {@link BPlusTree#size} method to run into a livelock: during single run |
| * the method is picking up new pages which are concurrently added to the tree until the new pages are not added |
| * anymore. Test verifies that despite livelock condition a size from a valid range is returned. |
| * |
| * NB: This test has to be changed with the integration of IGNITE-3478. |
| * |
| * @throws Exception if test failed |
| */ |
| @Test |
| public void testPutSizeLivelock() throws Exception { |
| MAX_PER_PAGE = 5; |
| CNT = 800; |
| |
| final int SLIDING_WINDOW_SIZE = 16; |
| final boolean DEBUG_PRINT = false; |
| |
| final TestTree tree = createTestTree(false); |
| |
| final AtomicLong curRmvKey = new AtomicLong(0); |
| final AtomicLong curPutKey = new AtomicLong(SLIDING_WINDOW_SIZE); |
| |
| for (long i = curRmvKey.get(); i < curPutKey.get(); ++i) |
| assertNull(tree.put(i)); |
| |
| final int hwThreads = Runtime.getRuntime().availableProcessors(); |
| final int putRmvThreadCnt = Math.max(1, hwThreads / 2); |
| final int sizeThreadCnt = hwThreads - putRmvThreadCnt; |
| |
| final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt, new Runnable() { |
| @Override public void run() { |
| if (DEBUG_PRINT) { |
| try { |
| X.println("===BARRIER=== size=" + tree.size() |
| + " [" + tree.findFirst() + ".." + tree.findLast() + "]"); |
| } |
| catch (IgniteCheckedException e) { |
| // ignore |
| } |
| } |
| } |
| }); |
| |
| final int loopCnt = CNT / hwThreads; |
| |
| IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| for (int i = 0; i < loopCnt && !stop.get(); ++i) { |
| int order; |
| try { |
| order = putRmvOpBarrier.await(); |
| } catch (BrokenBarrierException e) { |
| // barrier reset() has been called: terminate |
| break; |
| } |
| |
| Long putVal = curPutKey.getAndIncrement(); |
| |
| if ((i & 0xff) == 0) |
| X.println(order + ": --> put(" + putVal + ")"); |
| |
| assertNull(tree.put(putVal)); |
| |
| Long rmvVal = curRmvKey.getAndIncrement(); |
| |
| if ((i & 0xff) == 0) |
| X.println(order + ": --> rmv(" + rmvVal + ")"); |
| |
| assertEquals(rmvVal, tree.remove(rmvVal)); |
| assertNull(tree.findOne(rmvVal)); |
| } |
| |
| return null; |
| } |
| }, putRmvThreadCnt, "put-remove"); |
| |
| IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| |
| final List<Long> treeContents = new ArrayList<>(SLIDING_WINDOW_SIZE * 2); |
| |
| final BPlusTree.TreeRowClosure<Long, Long> rowDumper = new BPlusTree.TreeRowClosure<Long, Long>() { |
| @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) |
| throws IgniteCheckedException { |
| |
| treeContents.add(io.getLookupRow(tree, pageAddr, idx)); |
| |
| final long endMs = System.currentTimeMillis() + 10; |
| final long endPutKey = curPutKey.get() + MAX_PER_PAGE; |
| |
| while (System.currentTimeMillis() < endMs && curPutKey.get() < endPutKey) |
| Thread.yield(); |
| |
| return true; |
| } |
| }; |
| |
| while (!stop.get()) { |
| treeContents.clear(); |
| |
| long treeSize = tree.size(rowDumper); |
| long curPutVal = curPutKey.get(); |
| |
| X.println(" ======> size=" + treeSize + "; last-put-value=" + curPutVal); |
| |
| if (treeSize < SLIDING_WINDOW_SIZE || treeSize > curPutVal) |
| fail("Tree size is not in bounds [" + SLIDING_WINDOW_SIZE + ".." + curPutVal + "]:" |
| + treeSize + "; contents=" + treeContents); |
| } |
| |
| return null; |
| } |
| }, sizeThreadCnt, "size"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add((IgniteInternalFuture) putRmvFut); |
| asyncRunFut.add((IgniteInternalFuture) sizeFut); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); |
| } |
| finally { |
| stop.set(true); |
| putRmvOpBarrier.reset(); |
| |
| asyncRunFut.get(); |
| } |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * Verifies that in case for threads concurrently calling put and remove |
| * on a tree with 1-3 pages, the size() method performs correctly. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutRmvSizeSinglePageContention() throws Exception { |
| MAX_PER_PAGE = 10; |
| CNT = 20_000; |
| final boolean DEBUG_PRINT = false; |
| final int SLIDING_WINDOWS_SIZE = MAX_PER_PAGE * 2; |
| |
| final TestTree tree = createTestTree(false); |
| |
| final AtomicLong curPutKey = new AtomicLong(0); |
| final BlockingQueue<Long> rowsToRemove = new ArrayBlockingQueue<>(MAX_PER_PAGE / 2); |
| |
| final int hwThreadCnt = Runtime.getRuntime().availableProcessors(); |
| final int putThreadCnt = Math.max(1, hwThreadCnt / 4); |
| final int rmvThreadCnt = Math.max(1, hwThreadCnt / 2 - putThreadCnt); |
| final int sizeThreadCnt = Math.max(1, hwThreadCnt - putThreadCnt - rmvThreadCnt); |
| |
| final AtomicInteger sizeInvokeCnt = new AtomicInteger(0); |
| |
| final int loopCnt = CNT; |
| |
| IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| int iter = 0; |
| while (!stop.get()) { |
| long size = tree.size(); |
| |
| if (DEBUG_PRINT || (++iter & 0xffff) == 0) |
| X.println(" --> size() = " + size); |
| |
| sizeInvokeCnt.incrementAndGet(); |
| } |
| |
| return null; |
| } |
| }, sizeThreadCnt, "size"); |
| |
| // Let the size threads ignite |
| while (sizeInvokeCnt.get() < sizeThreadCnt * 2) |
| Thread.yield(); |
| |
| IgniteInternalFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| int iter = 0; |
| while (!stop.get()) { |
| Long rmvVal = rowsToRemove.poll(200, TimeUnit.MILLISECONDS); |
| if (rmvVal != null) |
| assertEquals(rmvVal, tree.remove(rmvVal)); |
| |
| if (DEBUG_PRINT || (++iter & 0x3ff) == 0) |
| X.println(" --> rmv(" + rmvVal + ")"); |
| } |
| |
| return null; |
| } |
| }, rmvThreadCnt, "rmv"); |
| |
| IgniteInternalFuture<?> putFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| for (int i = 0; i < loopCnt && !stop.get(); ++i) { |
| Long putVal = curPutKey.getAndIncrement(); |
| assertNull(tree.put(putVal)); |
| |
| while (rowsToRemove.size() > SLIDING_WINDOWS_SIZE && !stop.get()) |
| Thread.yield(); |
| |
| rowsToRemove.put(putVal); |
| |
| if (DEBUG_PRINT || (i & 0x3ff) == 0) |
| X.println(" --> put(" + putVal + ")"); |
| } |
| |
| return null; |
| } |
| }, putThreadCnt, "put"); |
| |
| IgniteInternalFuture<?> treePrintFut = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get()) { |
| Thread.sleep(1000); |
| |
| X.println(TestTree.printLocks()); |
| X.println(tree.printTree()); |
| } |
| |
| return null; |
| } |
| }, 1, "printTree"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add((IgniteInternalFuture) sizeFut); |
| asyncRunFut.add((IgniteInternalFuture) rmvFut); |
| asyncRunFut.add((IgniteInternalFuture) putFut); |
| asyncRunFut.add((IgniteInternalFuture) treePrintFut); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| putFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); |
| } |
| finally { |
| stop.set(true); |
| |
| asyncRunFut.get(); |
| } |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * The test verifies that {@link BPlusTree#put}, {@link BPlusTree#remove}, {@link BPlusTree#find}, and |
| * {@link BPlusTree#size} run concurrently, perform correctly and report correct values. |
| * |
| * A sliding window of numbers is maintainted in the tests. |
| * |
| * NB: This test has to be changed with the integration of IGNITE-3478. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutRmvFindSizeMultithreaded() throws Exception { |
| MAX_PER_PAGE = 5; |
| CNT = 60_000; |
| |
| final int SLIDING_WINDOW_SIZE = 100; |
| |
| final TestTree tree = createTestTree(false); |
| |
| final AtomicLong curPutKey = new AtomicLong(0); |
| final BlockingQueue<Long> rowsToRemove = new ArrayBlockingQueue<>(SLIDING_WINDOW_SIZE); |
| |
| final int hwThreadCnt = Runtime.getRuntime().availableProcessors(); |
| final int putThreadCnt = Math.max(1, hwThreadCnt / 4); |
| final int rmvThreadCnt = Math.max(1, hwThreadCnt / 4); |
| final int findThreadCnt = Math.max(1, hwThreadCnt / 4); |
| final int sizeThreadCnt = Math.max(1, hwThreadCnt - putThreadCnt - rmvThreadCnt - findThreadCnt); |
| |
| final AtomicInteger sizeInvokeCnt = new AtomicInteger(0); |
| |
| final int loopCnt = CNT; |
| |
| IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| int iter = 0; |
| while (!stop.get()) { |
| long size = tree.size(); |
| |
| if ((++iter & 0x3ff) == 0) |
| X.println(" --> size() = " + size); |
| |
| sizeInvokeCnt.incrementAndGet(); |
| } |
| |
| return null; |
| } |
| }, sizeThreadCnt, "size"); |
| |
| // Let the size threads start |
| while (sizeInvokeCnt.get() < sizeThreadCnt * 2) |
| Thread.yield(); |
| |
| IgniteInternalFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| int iter = 0; |
| while (!stop.get()) { |
| Long rmvVal = rowsToRemove.poll(200, TimeUnit.MILLISECONDS); |
| if (rmvVal != null) |
| assertEquals(rmvVal, tree.remove(rmvVal)); |
| |
| if ((++iter & 0x3ff) == 0) |
| X.println(" --> rmv(" + rmvVal + ")"); |
| } |
| |
| return null; |
| } |
| }, rmvThreadCnt, "rmv"); |
| |
| IgniteInternalFuture<?> findFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| int iter = 0; |
| while (!stop.get()) { |
| Long findVal = curPutKey.get() |
| + SLIDING_WINDOW_SIZE / 2 |
| - rnd.nextInt(SLIDING_WINDOW_SIZE * 2); |
| |
| tree.findOne(findVal); |
| |
| if ((++iter & 0x3ff) == 0) |
| X.println(" --> fnd(" + findVal + ")"); |
| } |
| |
| return null; |
| } |
| }, findThreadCnt, "find"); |
| |
| IgniteInternalFuture<?> putFut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| for (int i = 0; i < loopCnt && !stop.get(); ++i) { |
| Long putVal = curPutKey.getAndIncrement(); |
| assertNull(tree.put(putVal)); |
| |
| while (rowsToRemove.size() > SLIDING_WINDOW_SIZE) { |
| if (stop.get()) |
| return null; |
| |
| Thread.yield(); |
| } |
| |
| rowsToRemove.put(putVal); |
| |
| if ((i & 0x3ff) == 0) |
| X.println(" --> put(" + putVal + ")"); |
| } |
| |
| return null; |
| } |
| }, putThreadCnt, "put"); |
| |
| IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get()) { |
| Thread.sleep(1000); |
| |
| X.println(TestTree.printLocks()); |
| } |
| |
| return null; |
| } |
| }, 1, "printLocks"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add((IgniteInternalFuture) sizeFut); |
| asyncRunFut.add((IgniteInternalFuture) rmvFut); |
| asyncRunFut.add((IgniteInternalFuture) findFut); |
| asyncRunFut.add((IgniteInternalFuture) putFut); |
| asyncRunFut.add((IgniteInternalFuture) lockPrintingFut); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| putFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); |
| } |
| finally { |
| stop.set(true); |
| |
| asyncRunFut.get(); |
| } |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTestRandomPutRemoveMultithreaded_1_30_0() throws Exception { |
| MAX_PER_PAGE = 1; |
| CNT = 30; |
| |
| doTestRandomPutRemoveMultithreaded(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTestRandomPutRemoveMultithreaded_1_30_1() throws Exception { |
| MAX_PER_PAGE = 1; |
| CNT = 30; |
| |
| doTestRandomPutRemoveMultithreaded(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTestRandomPutRemoveMultithreaded_2_50_0() throws Exception { |
| MAX_PER_PAGE = 2; |
| CNT = 50; |
| |
| doTestRandomPutRemoveMultithreaded(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTestRandomPutRemoveMultithreaded_2_50_1() throws Exception { |
| MAX_PER_PAGE = 2; |
| CNT = 50; |
| |
| doTestRandomPutRemoveMultithreaded(true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTestRandomPutRemoveMultithreaded_3_70_0() throws Exception { |
| MAX_PER_PAGE = 3; |
| CNT = 70; |
| |
| doTestRandomPutRemoveMultithreaded(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTestRandomPutRemoveMultithreaded_3_70_1() throws Exception { |
| MAX_PER_PAGE = 3; |
| CNT = 70; |
| |
| doTestRandomPutRemoveMultithreaded(true); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Test |
| public void testFindFirstAndLast() throws IgniteCheckedException { |
| MAX_PER_PAGE = 5; |
| |
| TestTree tree = createTestTree(true); |
| |
| Long first = tree.findFirst(); |
| assertNull(first); |
| |
| Long last = tree.findLast(); |
| assertNull(last); |
| |
| for (long idx = 1L; idx <= 10L; ++idx) |
| tree.put(idx); |
| |
| first = tree.findFirst(); |
| assertEquals((Long)1L, first); |
| |
| last = tree.findLast(); |
| assertEquals((Long)10L, last); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIterate() throws Exception { |
| MAX_PER_PAGE = 5; |
| |
| TestTree tree = createTestTree(true); |
| |
| checkIterate(tree, 0L, 100L, null, false); |
| |
| for (long idx = 1L; idx <= 10L; ++idx) |
| tree.put(idx); |
| |
| for (long idx = 1L; idx <= 10L; ++idx) |
| checkIterate(tree, idx, 100L, idx, true); |
| |
| checkIterate(tree, 0L, 100L, 1L, true); |
| |
| for (long idx = 1L; idx <= 10L; ++idx) |
| checkIterate(tree, idx, 100L, 10L, true); |
| |
| checkIterate(tree, 0L, 100L, 100L, false); |
| |
| for (long idx = 1L; idx <= 10L; ++idx) |
| checkIterate(tree, 0L, 100L, idx, true); |
| |
| for (long idx = 0L; idx <= 10L; ++idx) |
| checkIterate(tree, idx, 11L, -1L, false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIterateConcurrentPutRemove() throws Exception { |
| iterateConcurrentPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIterateConcurrentPutRemove_1() throws Exception { |
| MAX_PER_PAGE = 1; |
| |
| iterateConcurrentPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIterateConcurrentPutRemove_2() throws Exception { |
| MAX_PER_PAGE = 2; |
| |
| iterateConcurrentPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testIteratePutRemove_10() throws Exception { |
| MAX_PER_PAGE = 10; |
| |
| iterateConcurrentPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void iterateConcurrentPutRemove() throws Exception { |
| final TestTree tree = createTestTree(true); |
| |
| // Single key per page is a degenerate case: it is very hard to merge pages in a tree because |
| // to merge we need to remove a split key from a parent page and add it to a back page, but this |
| // is impossible if we already have a key in a back page, thus we will have lots of empty routing pages. |
| // This way the tree grows faster than shrinks and gets out of height limit of 26 (for this page size) quickly. |
| // Since the tree height can not be larger than the key count for this case, we can use 26 as a safe number. |
| final int KEYS = MAX_PER_PAGE == 1 ? 26 : GridTestUtils.SF.applyLB(10_000, 2_500); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| for (int i = 0; i < 10; i++) { |
| for (long idx = 0L; idx < KEYS; ++idx) |
| tree.put(idx); |
| |
| final Long findKey; |
| |
| if (MAX_PER_PAGE > 0) { |
| switch (i) { |
| case 0: |
| findKey = 1L; |
| |
| break; |
| |
| case 1: |
| findKey = (long)MAX_PER_PAGE; |
| |
| break; |
| |
| case 2: |
| findKey = (long)MAX_PER_PAGE - 1; |
| |
| break; |
| |
| case 3: |
| findKey = (long)MAX_PER_PAGE + 1; |
| |
| break; |
| |
| case 4: |
| findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE; |
| |
| break; |
| |
| case 5: |
| findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1; |
| |
| break; |
| |
| case 6: |
| findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1; |
| |
| break; |
| |
| case 7: |
| findKey = (long)KEYS - 1; |
| |
| break; |
| |
| default: |
| findKey = rnd.nextLong(KEYS); |
| } |
| } |
| else |
| findKey = rnd.nextLong(KEYS); |
| |
| info("Iteration [iter=" + i + ", key=" + findKey + ']'); |
| |
| assertEquals(findKey, tree.findOne(findKey)); |
| checkIterate(tree, findKey, findKey, findKey, true); |
| |
| IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| TestTreeRowClosure p = new TestTreeRowClosure(findKey); |
| |
| TestTreeRowClosure falseP = new TestTreeRowClosure(-1L); |
| |
| int cnt = 0; |
| |
| while (!stop.get()) { |
| int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100); |
| |
| checkIterateC(tree, findKey, findKey, p, true); |
| |
| checkIterateC(tree, findKey - shift, findKey, p, true); |
| |
| checkIterateC(tree, findKey - shift, findKey + shift, p, true); |
| |
| checkIterateC(tree, findKey, findKey + shift, p, true); |
| |
| checkIterateC(tree, -100L, KEYS + 100L, falseP, false); |
| |
| cnt++; |
| } |
| |
| info("Done, read count: " + cnt); |
| |
| return null; |
| } |
| }, 10, "find"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add(getFut); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| U.sleep(100); |
| |
| for (int j = 0; j < 20; j++) { |
| for (long idx = 0L; idx < KEYS / 2; ++idx) { |
| long toRmv = rnd.nextLong(KEYS); |
| |
| if (toRmv != findKey) |
| tree.remove(toRmv); |
| } |
| |
| for (long idx = 0L; idx < KEYS / 2; ++idx) { |
| long put = rnd.nextLong(KEYS); |
| |
| tree.put(put); |
| } |
| } |
| } |
| finally { |
| stop.set(true); |
| } |
| |
| asyncRunFut.get(); |
| |
| stop.set(false); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception { |
| //calculate tree size when split happens |
| final TestTree t = createTestTree(true); |
| long i = 0; |
| |
| for (; ; i++) { |
| t.put(i); |
| |
| if (t.rootLevel() > 0) //split happened |
| break; |
| } |
| |
| final long treeStartSize = i; |
| |
| final AtomicReference<Throwable> failed = new AtomicReference<>(); |
| |
| for (int k = 0; k < 100; k++) { |
| final TestTree tree = createTestTree(true); |
| |
| final AtomicBoolean start = new AtomicBoolean(); |
| |
| final AtomicInteger ready = new AtomicInteger(); |
| |
| Thread first = new Thread(new Runnable() { |
| @Override public void run() { |
| ready.incrementAndGet(); |
| |
| while (!start.get()); //waiting without blocking |
| |
| try { |
| tree.remove(treeStartSize / 2L); |
| } |
| catch (Throwable th) { |
| failed.set(th); |
| } |
| } |
| }); |
| |
| Thread second = new Thread(new Runnable() { |
| @Override public void run() { |
| ready.incrementAndGet(); |
| |
| while (!start.get()); //waiting without blocking |
| |
| try { |
| tree.put(treeStartSize + 1); |
| } |
| catch (Throwable th) { |
| failed.set(th); |
| } |
| } |
| }); |
| |
| for (int j = 0; j < treeStartSize; j++) |
| tree.put((long)j); |
| |
| first.start(); |
| second.start(); |
| |
| while (ready.get() != 2); |
| |
| start.set(true); |
| |
| first.join(); |
| second.join(); |
| |
| assertNull(failed.get()); |
| } |
| } |
| |
| /** |
| * @param canGetRow Can get row from inner page. |
| * @throws Exception If failed. |
| */ |
| private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws Exception { |
| final TestTree tree = createTestTree(canGetRow); |
| |
| final Map<Long, Long> map = new ConcurrentHashMap<>(); |
| |
| final int loops = reuseList == null ? 20_000 : 60_000; |
| |
| final GridStripedLock lock = new GridStripedLock(256); |
| |
| final String[] ops = {"put", "rmv", "inv_put", "inv_rmv"}; |
| |
| IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| for (int i = 0; i < loops && !stop.get(); i++) { |
| final Long x = (long)DataStructure.randomInt(CNT); |
| final int op = DataStructure.randomInt(4); |
| |
| if (i % 10000 == 0) |
| X.println(" --> " + ops[op] + "_" + i + " " + x); |
| |
| Lock l = lock.getLock(x.longValue()); |
| |
| l.lock(); |
| |
| try { |
| if (op == 0) { // Put. |
| assertEquals(map.put(x, x), tree.put(x)); |
| |
| assertNoLocks(); |
| } |
| else if (op == 1) { // Remove. |
| if (map.remove(x) != null) { |
| assertEquals(x, tree.remove(x)); |
| |
| assertNoLocks(); |
| } |
| |
| assertNull(tree.remove(x)); |
| |
| assertNoLocks(); |
| } |
| else if (op == 2) { |
| tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() { |
| IgniteTree.OperationType opType; |
| |
| @Override public void call(@Nullable Long row) { |
| opType = PUT; |
| |
| if (row != null) |
| assertEquals(x, row); |
| } |
| |
| @Override public Long newRow() { |
| return x; |
| } |
| |
| @Override public IgniteTree.OperationType operationType() { |
| return opType; |
| } |
| }); |
| |
| map.put(x, x); |
| } |
| else if (op == 3) { |
| tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() { |
| IgniteTree.OperationType opType; |
| |
| @Override public void call(@Nullable Long row) { |
| if (row != null) { |
| assertEquals(x, row); |
| opType = REMOVE; |
| } |
| else |
| opType = NOOP; |
| } |
| |
| @Override public Long newRow() { |
| return null; |
| } |
| |
| @Override public IgniteTree.OperationType operationType() { |
| return opType; |
| } |
| }); |
| |
| map.remove(x); |
| } |
| else |
| fail(); |
| } |
| finally { |
| l.unlock(); |
| } |
| } |
| |
| return null; |
| } |
| }, Runtime.getRuntime().availableProcessors(), "put-remove"); |
| |
| IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get()) { |
| Thread.sleep(5000); |
| |
| X.println(TestTree.printLocks()); |
| } |
| |
| return null; |
| } |
| }, 1, "printLocks"); |
| |
| IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get()) { |
| int low = DataStructure.randomInt(CNT); |
| int high = low + DataStructure.randomInt(CNT - low); |
| |
| GridCursor<Long> c = tree.find((long)low, (long)high); |
| |
| Long last = null; |
| |
| while (c.next()) { |
| // Correct bounds. |
| assertTrue(low + " <= " + c.get() + " <= " + high, c.get() >= low); |
| assertTrue(low + " <= " + c.get() + " <= " + high, c.get() <= high); |
| |
| if (last != null) // No duplicates. |
| assertTrue(low + " <= " + last + " < " + c.get() + " <= " + high, c.get() > last); |
| |
| last = c.get(); |
| } |
| |
| TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure(); |
| |
| tree.iterate((long)low, (long)high, cl); |
| |
| last = cl.val; |
| |
| if (last != null) { |
| assertTrue(low + " <= " + last + " <= " + high, last >= low); |
| assertTrue(low + " <= " + last + " <= " + high, last <= high); |
| } |
| } |
| |
| return null; |
| } |
| }, 4, "find"); |
| |
| asyncRunFut = new GridCompoundFuture<>(); |
| |
| asyncRunFut.add((IgniteInternalFuture)fut); |
| asyncRunFut.add((IgniteInternalFuture)fut2); |
| asyncRunFut.add((IgniteInternalFuture)fut3); |
| |
| asyncRunFut.markInitialized(); |
| |
| try { |
| fut.get(getTestTimeout(), TimeUnit.MILLISECONDS); |
| } |
| finally { |
| stop.set(true); |
| |
| asyncRunFut.get(); |
| } |
| |
| GridCursor<Long> cursor = tree.find(null, null); |
| |
| while (cursor.next()) { |
| Long x = cursor.get(); |
| |
| assert x != null; |
| |
| assertEquals(map.get(x), x); |
| } |
| |
| info("size: " + map.size()); |
| |
| assertEquals(map.size(), tree.size()); |
| |
| tree.validateTree(); |
| |
| assertNoLocks(); |
| } |
| |
| /** |
| * @param c Cursor. |
| * @return Number of elements. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private static int size(GridCursor<?> c) throws IgniteCheckedException { |
| int cnt = 0; |
| |
| while (c.next()) |
| cnt++; |
| |
| return cnt; |
| } |
| |
| /** |
| * @param pageId Page ID. |
| * @param pageAddr Page address. |
| */ |
| public static void checkPageId(long pageId, long pageAddr) { |
| long actual = PageIO.getPageId(pageAddr); |
| |
| // Page ID must be 0L for newly allocated page, for reused page effective ID must remain the same. |
| if (actual != 0L && pageId != actual) |
| throw new IllegalStateException("Page ID: " + U.hexLong(actual)); |
| } |
| |
| /** |
| * @param canGetRow Can get row from inner page. |
| * @return Test tree instance. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected TestTree createTestTree(boolean canGetRow) throws IgniteCheckedException { |
| TestTree tree = new TestTree( |
| reuseList, canGetRow, CACHE_ID, pageMem, allocateMetaPage().pageId(), lockTrackerManager); |
| |
| assertEquals(0, tree.size()); |
| assertEquals(0, tree.rootLevel()); |
| |
| return tree; |
| } |
| |
| /** |
| * @return Allocated meta page ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private FullPageId allocateMetaPage() throws IgniteCheckedException { |
| return new FullPageId(pageMem.allocatePage(CACHE_ID, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX), CACHE_ID); |
| } |
| |
| /** |
| * Test tree. |
| */ |
| protected static class TestTree extends BPlusTree<Long, Long> { |
| /** Number of retries. */ |
| private int numRetries = super.getLockRetries(); |
| |
| /** |
| * @param reuseList Reuse list. |
| * @param canGetRow Can get row from inner page. |
| * @param cacheId Cache ID. |
| * @param pageMem Page memory. |
| * @param metaPageId Meta page ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public TestTree( |
| ReuseList reuseList, |
| boolean canGetRow, |
| int cacheId, |
| PageMemory pageMem, |
| long metaPageId, |
| PageLockTrackerManager lockTrackerManager |
| ) throws IgniteCheckedException { |
| super( |
| "test", |
| cacheId, |
| null, |
| pageMem, |
| null, |
| new AtomicLong(), |
| metaPageId, |
| reuseList, |
| new IOVersions<>(new LongInnerIO(canGetRow)), |
| new IOVersions<>(new LongLeafIO()), |
| PageIdAllocator.FLAG_IDX, |
| new FailureProcessor(new GridTestKernalContext(log)) { |
| @Override public boolean process(FailureContext failureCtx) { |
| lockTrackerManager.dumpLocksToLog(); |
| |
| return true; |
| } |
| }, |
| lockTrackerManager |
| ); |
| |
| PageIO.registerTest(latestInnerIO(), latestLeafIO()); |
| |
| initTree(true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected int compare(BPlusIO<Long> io, long pageAddr, int idx, Long n2) |
| throws IgniteCheckedException { |
| Long n1 = io.getLookupRow(this, pageAddr, idx); |
| |
| return Long.compare(n1, n2); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore) |
| throws IgniteCheckedException { |
| assert io.canGetRow() : io; |
| |
| return io.getLookupRow(this, pageAddr, idx); |
| } |
| |
| /** |
| * @return Thread ID. |
| */ |
| static Object threadId() { |
| return Thread.currentThread().getId(); //.getName(); |
| } |
| |
| /** |
| * @param b String builder. |
| * @param locks Locks. |
| * @param beforeLock Before lock. |
| */ |
| private static void printLocks(SB b, ConcurrentMap<Object, Map<Long, Long>> locks, Map<Object, Long> beforeLock) { |
| for (Map.Entry<Object, Map<Long, Long>> entry : locks.entrySet()) { |
| Object thId = entry.getKey(); |
| |
| Long z = beforeLock.get(thId); |
| |
| Set<Map.Entry<Long, Long>> xx = entry.getValue().entrySet(); |
| |
| if (z == null && xx.isEmpty()) |
| continue; |
| |
| b.a(" ## " + thId); |
| |
| if (z != null) |
| b.a(" --> ").appendHex(z).a(" (").appendHex(effectivePageId(z)).a(')'); |
| |
| b.a('\n'); |
| |
| for (Map.Entry<Long, Long> x : xx) |
| b.a(" - ").appendHex(x.getValue()).a(" (").appendHex(x.getKey()).a(")\n"); |
| |
| b.a('\n'); |
| } |
| } |
| |
| /** |
| * @return List of locks as a String. |
| */ |
| static String printLocks() { |
| SB b = new SB(); |
| |
| b.a("\n--------read---------\n"); |
| |
| printLocks(b, TestPageLockListener.readLocks, TestPageLockListener.beforeReadLock); |
| |
| b.a("\n-+------write---------\n"); |
| |
| printLocks(b, TestPageLockListener.writeLocks, TestPageLockListener.beforeWriteLock); |
| |
| return b.toString(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected int getLockRetries() { |
| return numRetries; |
| } |
| } |
| |
| /** |
| * TODO refactor to use integer in inner page |
| * Long inner. |
| */ |
| private static final class LongInnerIO extends BPlusInnerIO<Long> { |
| /** |
| */ |
| protected LongInnerIO(boolean canGetRow) { |
| super(LONG_INNER_IO, 1, canGetRow, 8); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getMaxCount(long buf, int pageSize) { |
| if (MAX_PER_PAGE != 0) |
| return MAX_PER_PAGE; |
| |
| return super.getMaxCount(buf, pageSize); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void store(long dst, int dstIdx, BPlusIO<Long> srcIo, long src, int srcIdx) |
| throws IgniteCheckedException { |
| Long row = srcIo.getLookupRow(null, src, srcIdx); |
| |
| store(dst, dstIdx, row, null, false); |
| } |
| |
| /** |
| * @param row Row. |
| */ |
| private void checkNotRemoved(Long row) { |
| if (rmvdIds.contains(row)) |
| fail("Removed row: " + row); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void storeByOffset(long pageAddr, int off, Long row) { |
| checkNotRemoved(row); |
| |
| PageUtils.putLong(pageAddr, off, row); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Long getLookupRow(BPlusTree<Long, ?> tree, long pageAddr, int idx) { |
| Long row = PageUtils.getLong(pageAddr, offset(idx)); |
| |
| checkNotRemoved(row); |
| |
| return row; |
| } |
| } |
| |
| /** |
| * @return Page memory. |
| */ |
| protected PageMemory createPageMemory() throws Exception { |
| DataRegionConfiguration plcCfg = new DataRegionConfiguration() |
| .setInitialSize(1024 * MB) |
| .setMaxSize(1024 * MB); |
| |
| PageMemory pageMem = new PageMemoryNoStoreImpl(log, |
| new UnsafeMemoryProvider(log), |
| null, |
| PAGE_SIZE, |
| plcCfg, |
| new DataRegionMetricsImpl(plcCfg, new GridTestKernalContext(log())), |
| true); |
| |
| pageMem.start(); |
| |
| return pageMem; |
| } |
| |
| /** |
| * @return Number of acquired pages. |
| */ |
| protected long acquiredPages() { |
| return ((PageMemoryNoStoreImpl)pageMem).acquiredPages(); |
| } |
| |
| /** |
| * Long leaf. |
| */ |
| private static final class LongLeafIO extends BPlusLeafIO<Long> { |
| /** |
| */ |
| LongLeafIO() { |
| super(LONG_LEAF_IO, 1, 8); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getMaxCount(long pageAddr, int pageSize) { |
| if (MAX_PER_PAGE != 0) |
| return MAX_PER_PAGE; |
| |
| return super.getMaxCount(pageAddr, pageSize); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void storeByOffset(long pageAddr, int off, Long row) { |
| PageUtils.putLong(pageAddr, off, row); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void store(long dst, int dstIdx, BPlusIO<Long> srcIo, long src, int srcIdx) { |
| assert srcIo == this; |
| |
| PageUtils.putLong(dst, offset(dstIdx), PageUtils.getLong(src, offset(srcIdx))); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Long getLookupRow(BPlusTree<Long, ?> tree, long pageAddr, int idx) { |
| return PageUtils.getLong(pageAddr, offset(idx)); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class TestTreeRowClosure implements BPlusTree.TreeRowClosure<Long, Long> { |
| /** */ |
| private final Long expVal; |
| |
| /** */ |
| private boolean found; |
| |
| /** |
| * @param expVal Value to find or {@code null} to find first. |
| */ |
| TestTreeRowClosure(Long expVal) { |
| this.expVal = expVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) |
| throws IgniteCheckedException { |
| assert !found; |
| |
| found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal); |
| |
| return !found; |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure<Long, Long> { |
| /** */ |
| private Long val; |
| |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) |
| throws IgniteCheckedException { |
| assert val == null; |
| |
| val = io.getLookupRow(tree, pageAddr, idx); |
| |
| return false; |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class TestTreeFindFilteredClosure implements BPlusTree.TreeRowClosure<Long, Long> { |
| /** */ |
| private final Set<Long> vals; |
| |
| /** |
| * @param vals Values to allow in filter. |
| */ |
| TestTreeFindFilteredClosure(Set<Long> vals) { |
| this.vals = vals; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) |
| throws IgniteCheckedException { |
| Long val = io.getLookupRow(tree, pageAddr, idx); |
| |
| return vals.contains(val); |
| } |
| } |
| |
| /** */ |
| private static class TestPageLockListener implements PageLockListener { |
| /** */ |
| static ConcurrentMap<Object, Long> beforeReadLock = new ConcurrentHashMap<>(); |
| |
| /** */ |
| static ConcurrentMap<Object, Long> beforeWriteLock = new ConcurrentHashMap<>(); |
| |
| /** */ |
| static ConcurrentMap<Object, Map<Long, Long>> readLocks = new ConcurrentHashMap<>(); |
| |
| /** */ |
| static ConcurrentMap<Object, Map<Long, Long>> writeLocks = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final PageLockListener delegate; |
| |
| /** |
| * @param delegate Real implementation of page lock listener. |
| */ |
| private TestPageLockListener(PageLockListener delegate) { |
| this.delegate = delegate; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onBeforeReadLock(int cacheId, long pageId, long page) { |
| delegate.onBeforeReadLock(cacheId, pageId, page); |
| |
| if (PRINT_LOCKS) |
| X.println(" onBeforeReadLock: " + U.hexLong(pageId)); |
| |
| assertNull(beforeReadLock.put(threadId(), pageId)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onReadLock(int cacheId, long pageId, long page, long pageAddr) { |
| delegate.onReadLock(cacheId, pageId, page, pageAddr); |
| |
| if (PRINT_LOCKS) |
| X.println(" onReadLock: " + U.hexLong(pageId)); |
| |
| if (pageAddr != 0L) { |
| long actual = PageIO.getPageId(pageAddr); |
| |
| checkPageId(pageId, pageAddr); |
| |
| assertNull(locks(true).put(pageId, actual)); |
| } |
| |
| assertEquals(Long.valueOf(pageId), beforeReadLock.remove(threadId())); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onReadUnlock(int cacheId, long pageId, long page, long pageAddr) { |
| delegate.onReadUnlock(cacheId, pageId, page, pageAddr); |
| |
| if (PRINT_LOCKS) |
| X.println(" onReadUnlock: " + U.hexLong(pageId)); |
| |
| checkPageId(pageId, pageAddr); |
| |
| long actual = PageIO.getPageId(pageAddr); |
| |
| assertEquals(Long.valueOf(actual), locks(true).remove(pageId)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onBeforeWriteLock(int cacheId, long pageId, long page) { |
| delegate.onBeforeWriteLock(cacheId, pageId, page); |
| |
| if (PRINT_LOCKS) |
| X.println(" onBeforeWriteLock: " + U.hexLong(pageId)); |
| |
| assertNull(beforeWriteLock.put(threadId(), pageId)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onWriteLock(int cacheId, long pageId, long page, long pageAddr) { |
| delegate.onWriteLock(cacheId, pageId, page, pageAddr); |
| |
| if (PRINT_LOCKS) |
| X.println(" onWriteLock: " + U.hexLong(pageId)); |
| |
| if (pageAddr != 0L) { |
| checkPageId(pageId, pageAddr); |
| |
| long actual = PageIO.getPageId(pageAddr); |
| |
| if (actual == 0L) |
| actual = pageId; // It is a newly allocated page. |
| |
| assertNull(locks(false).put(pageId, actual)); |
| } |
| |
| assertEquals(Long.valueOf(pageId), beforeWriteLock.remove(threadId())); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onWriteUnlock(int cacheId, long pageId, long page, long pageAddr) { |
| delegate.onWriteUnlock(cacheId, pageId, page, pageAddr); |
| |
| if (PRINT_LOCKS) |
| X.println(" onWriteUnlock: " + U.hexLong(pageId)); |
| |
| assertEquals(effectivePageId(pageId), effectivePageId(PageIO.getPageId(pageAddr))); |
| |
| assertEquals(Long.valueOf(pageId), locks(false).remove(pageId)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() { |
| delegate.close(); |
| } |
| |
| /** |
| * @param read Read or write locks. |
| * @return Locks map. |
| */ |
| private static Map<Long, Long> locks(boolean read) { |
| ConcurrentMap<Object, Map<Long, Long>> m = read ? readLocks : writeLocks; |
| |
| Object thId = threadId(); |
| |
| Map<Long, Long> locks = m.get(thId); |
| |
| if (locks == null) { |
| locks = new ConcurrentLinkedHashMap<>(); |
| |
| if (m.putIfAbsent(thId, locks) != null) |
| locks = m.get(thId); |
| } |
| |
| return locks; |
| } |
| |
| /** |
| * @return {@code true} If current thread does not keep any locks. |
| */ |
| static boolean checkNoLocks() { |
| return locks(true).isEmpty() && locks(false).isEmpty(); |
| } |
| } |
| } |