blob: 4a8c7b15a29725d0685c75e9dc3a5fe9a305ca63 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit test for delayed page replacement mode.
*/
public class IgnitePageMemReplaceDelayedWriteUnitTest {
/** Per test timeout */
@Rule
public Timeout globalTimeout = new Timeout((int) GridTestUtils.DFLT_TEST_TIMEOUT);
/** CPU count. */
private static final int CPUS = 32;
/** 1 megabyte in bytes */
private static final long MB = 1024L * 1024;
/** Logger. */
private IgniteLogger log = new NullLogger();
/** Page memory, published here for backward call from replacement write callback. */
private PageMemoryImpl pageMemory;
/**
* Test delayed eviction causes locking in page reads
* @throws IgniteCheckedException if failed.
*/
@Test
public void testReplacementWithDelayCausesLockForRead() throws IgniteCheckedException {
IgniteConfiguration cfg = getConfiguration(16 * MB);
AtomicInteger totalEvicted = new AtomicInteger();
PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
log.info("Evicting " + fullPageId);
assert getLockedPages(fullPageId).contains(fullPageId);
assert !getSegment(fullPageId).writeLock().isHeldByCurrentThread();
totalEvicted.incrementAndGet();
};
int pageSize = 4096;
PageMemoryImpl memory = createPageMemory(cfg, pageWriter, pageSize);
this.pageMemory = memory;
long pagesTotal = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().getMaxSize() / pageSize;
long markDirty = pagesTotal * 2 / 3;
for (int i = 0; i < markDirty; i++) {
long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
long ptr = memory.acquirePage(1, pageId);
memory.releasePage(1, pageId, ptr);
}
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint();
int cpPages = ids.size();
log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages");
for (int i = 0; i < cpPages; i++) {
long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
long ptr = memory.acquirePage(1, pageId);
memory.releasePage(1, pageId, ptr);
}
List<Collection<FullPageId>> stripes = getAllLockedPages();
assert !stripes.isEmpty();
for (Collection<FullPageId> pageIds : stripes) {
assert pageIds.isEmpty();
}
assert totalEvicted.get() > 0;
memory.stop(true);
}
/**
* Test delayed eviction causes locking in page reads
* @throws IgniteCheckedException if failed.
*/
@Test
public void testBackwardCompatibilityMode() throws IgniteCheckedException {
IgniteConfiguration cfg = getConfiguration(16 * MB);
AtomicInteger totalEvicted = new AtomicInteger();
PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
log.info("Evicting " + fullPageId);
assert getSegment(fullPageId).writeLock().isHeldByCurrentThread();
totalEvicted.incrementAndGet();
};
System.setProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE, "false");
int pageSize = 4096;
PageMemoryImpl memory;
try {
memory = createPageMemory(cfg, pageWriter, pageSize);
}
finally {
System.clearProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE);
}
this.pageMemory = memory;
long pagesTotal = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().getMaxSize() / pageSize;
long markDirty = pagesTotal * 2 / 3;
for (int i = 0; i < markDirty; i++) {
long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
long ptr = memory.acquirePage(1, pageId);
memory.releasePage(1, pageId, ptr);
}
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint();
int cpPages = ids.size();
log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages");
for (int i = 0; i < cpPages; i++) {
long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
long ptr = memory.acquirePage(1, pageId);
memory.releasePage(1, pageId, ptr);
}
assert totalEvicted.get() > 0;
memory.stop(true);
}
/**
* @param fullPageId page ID to determine segment for
* @return segment related
*/
@SuppressWarnings("TypeMayBeWeakened") private ReentrantReadWriteLock getSegment(FullPageId fullPageId) {
ReentrantReadWriteLock[] segments = U.field(pageMemory, "segments");
int idx = PageMemoryImpl.segmentIndex(fullPageId.groupId(), fullPageId.pageId(),
segments.length);
return segments[idx];
}
/**
* @param cfg configuration
* @param pageWriter writer for page replacement.
* @param pageSize page size
* @return implementation for test
*/
@NotNull
private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, PageStoreWriter pageWriter, int pageSize) {
IgniteCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class);
when(db.checkpointLockIsHeldByThread()).thenReturn(true);
GridCacheSharedContext sctx = Mockito.mock(GridCacheSharedContext.class);
when(sctx.gridConfig()).thenReturn(cfg);
when(sctx.pageStore()).thenReturn(new NoOpPageStoreManager());
when(sctx.wal()).thenReturn(new NoOpWALManager());
when(sctx.database()).thenReturn(db);
when(sctx.logger(any(Class.class))).thenReturn(log);
GridKernalContext kernalCtx = mock(GridKernalContext.class);
when(kernalCtx.config()).thenReturn(cfg);
when(kernalCtx.log(any(Class.class))).thenReturn(log);
when(kernalCtx.internalSubscriptionProcessor()).thenAnswer(new Answer<Object>() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
return new GridInternalSubscriptionProcessor(kernalCtx);
}
});
when(kernalCtx.encryption()).thenAnswer(new Answer<Object>() {
@Override public Object answer(InvocationOnMock mock) throws Throwable {
return new GridEncryptionManager(kernalCtx);
}
});
when(sctx.kernalContext()).thenReturn(kernalCtx);
DataRegionConfiguration regCfg = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration();
DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(regCfg);
long[] sizes = prepareSegmentSizes(regCfg.getMaxSize());
DirectMemoryProvider provider = new UnsafeMemoryProvider(log);
PageMemoryImpl memory = new PageMemoryImpl(provider, sizes, sctx, pageSize,
pageWriter, null, () -> true, memMetrics, PageMemoryImpl.ThrottlingPolicy.DISABLED,
mock(CheckpointWriteProgressSupplier.class));
memory.start();
return memory;
}
/**
* @param overallSize default region size in bytes
* @return configuration for test.
*/
@NotNull private IgniteConfiguration getConfiguration(long overallSize) {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setEncryptionSpi(new NoopEncryptionSpi());
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setMaxSize(overallSize)));
return cfg;
}
/**
* @return collection with pages locked
* @param fullPageId page id to check lock.
*/
private Collection<FullPageId> getLockedPages(FullPageId fullPageId) {
Object tracker = delayedReplacementTracker();
Object[] stripes = U.field(tracker, "stripes");
int idx = PageMemoryImpl.segmentIndex(fullPageId.groupId(), fullPageId.pageId(),
stripes.length);
return U.field(stripes[idx], "locked");
}
/**
* @return delayed write tracked from page memory.
*/
@NotNull private Object delayedReplacementTracker() {
Object tracker = U.field(pageMemory, "delayedPageReplacementTracker");
if (tracker == null)
throw new IllegalStateException("Delayed replacement is not configured");
return tracker;
}
/**
* @return all locked pages stripes underlying collectinos
*/
private List<Collection<FullPageId>> getAllLockedPages() {
Object tracker = delayedReplacementTracker();
Object[] stripes = U.field(tracker, "stripes");
Stream<Collection<FullPageId>> locked = Arrays.asList(stripes).stream().map(stripe ->
(Collection<FullPageId>)U.field(stripe, "locked"));
return locked.collect(Collectors.toList());
}
/**
* @param overallSize all regions size
* @return segments size, cp pool is latest array element.
*/
private long[] prepareSegmentSizes(long overallSize) {
int segments = CPUS;
long[] sizes = new long[segments + 1];
for (int i = 0; i < sizes.length; i++)
sizes[i] = overallSize / segments;
sizes[segments] = overallSize / 100;
return sizes;
}
}