| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.util.Collection; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheRebalanceMode; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.WALMode; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.pagemem.PageIdAllocator; |
| import org.apache.ignite.internal.pagemem.PageIdUtils; |
| import org.apache.ignite.internal.pagemem.PageMemory; |
| import org.apache.ignite.internal.pagemem.PageUtils; |
| import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; |
| import org.apache.ignite.internal.pagemem.store.PageStore; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; |
| import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| /** |
| * This test generates WAL & Page Store with N pages, then rewrites pages with zeroes and tries to acquire all pages. |
| */ |
| public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstractTest { |
| /** Total pages. */ |
| private static final int totalPages = 512; |
| |
| /** Cache name. */ |
| private final String cacheName = "cache"; |
| |
| /** Policy name. */ |
| private final String policyName = "dfltDataRegion"; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(gridName); |
| |
| CacheConfiguration ccfg = new CacheConfiguration(cacheName); |
| ccfg.setAffinity(new RendezvousAffinityFunction(true, 1)); |
| |
| if (MvccFeatureChecker.forcedMvcc()) |
| ccfg.setRebalanceDelay(Long.MAX_VALUE); |
| else |
| ccfg.setRebalanceMode(CacheRebalanceMode.NONE); |
| |
| ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); |
| |
| cfg.setCacheConfiguration(ccfg); |
| |
| DataStorageConfiguration memCfg = new DataStorageConfiguration() |
| .setDefaultDataRegionConfiguration( |
| new DataRegionConfiguration() |
| .setMaxSize(1024L * 1024 * 1024) |
| .setPersistenceEnabled(true) |
| .setName(policyName)) |
| .setWalMode(WALMode.LOG_ONLY) |
| .setCheckpointFrequency(500) |
| .setAlwaysWriteFullPages(true); |
| |
| cfg.setDataStorageConfiguration(memCfg); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| @Test |
| public void testPageRecoveryAfterFileCorruption() throws Exception { |
| IgniteEx ig = startGrid(0); |
| |
| ig.cluster().active(true); |
| |
| IgniteCache<Integer, Integer> cache = ig.cache(cacheName); |
| |
| // Put for create data store and init meta page. |
| cache.put(1, 1); |
| |
| GridCacheSharedContext sharedCtx = ig.context().cache().context(); |
| |
| GridCacheDatabaseSharedManager psMgr = (GridCacheDatabaseSharedManager)sharedCtx.database(); |
| |
| FilePageStoreManager pageStore = (FilePageStoreManager)sharedCtx.pageStore(); |
| |
| U.sleep(1_000); |
| |
| // Disable integrated checkpoint thread. |
| psMgr.enableCheckpoints(false).get(); |
| |
| PageMemory mem = sharedCtx.database().dataRegion(policyName).pageMemory(); |
| |
| DummyPageIO pageIO = new DummyPageIO(); |
| |
| int cacheId = sharedCtx.cache().cache(cacheName).context().cacheId(); |
| |
| int pagesCnt = getTotalPagesToTest(); |
| |
| FullPageId[] pages = new FullPageId[pagesCnt]; |
| |
| // Get lock to prevent assertion. A new page should be allocated under checkpoint lock. |
| psMgr.checkpointReadLock(); |
| |
| try { |
| for (int i = 0; i < pagesCnt; i++) { |
| pages[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); |
| |
| initPage(mem, pageIO, pages[i]); |
| } |
| |
| generateWal( |
| (PageMemoryImpl)mem, |
| sharedCtx.pageStore(), |
| sharedCtx.wal(), |
| cacheId, |
| pages |
| ); |
| } |
| finally { |
| psMgr.checkpointReadUnlock(); |
| } |
| |
| eraseDataFromDisk(pageStore, cacheId, pages[0]); |
| |
| stopAllGrids(); |
| |
| ig = startGrid(0); |
| |
| ig.cluster().active(true); |
| |
| checkRestore(ig, pages); |
| } |
| |
| /** |
| * @return count of pages to test. Note complexity of test is N^2. |
| */ |
| protected int getTotalPagesToTest() { |
| return totalPages; |
| } |
| |
| /** |
| * Initializes page. |
| * @param mem page memory implementation. |
| * @param pageIO page io implementation. |
| * @param fullId full page id. |
| * @throws IgniteCheckedException if error occurs. |
| */ |
| private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException { |
| long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); |
| |
| try { |
| final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); |
| |
| try { |
| pageIO.initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId())); |
| } |
| finally { |
| mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true); |
| } |
| } |
| finally { |
| mem.releasePage(fullId.groupId(), fullId.pageId(), page); |
| } |
| } |
| |
| /** |
| * @param pageStore Page store. |
| * @param cacheId Cache id. |
| * @param page Page. |
| */ |
| private void eraseDataFromDisk( |
| FilePageStoreManager pageStore, |
| int cacheId, |
| FullPageId page |
| ) throws IgniteCheckedException, IOException { |
| PageStore store = pageStore.getStore( |
| cacheId, |
| PageIdUtils.partId(page.pageId()) |
| ); |
| |
| FilePageStore filePageStore = (FilePageStore)store; |
| |
| FileIO fileIO = U.field(filePageStore, "fileIO"); |
| |
| long size = fileIO.size(); |
| |
| fileIO.writeFully(ByteBuffer.allocate((int)size - filePageStore.headerSize()), filePageStore.headerSize()); |
| |
| fileIO.force(); |
| } |
| |
| /** |
| * @param ig Ig. |
| * @param pages Pages. |
| */ |
| private void checkRestore(IgniteEx ig, FullPageId[] pages) throws IgniteCheckedException { |
| GridCacheSharedContext<Object, Object> shared = ig.context().cache().context(); |
| |
| GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database(); |
| |
| dbMgr.enableCheckpoints(false).get(); |
| |
| PageMemory mem = shared.database().dataRegion(null).pageMemory(); |
| |
| dbMgr.checkpointReadLock(); |
| |
| try { |
| for (FullPageId fullId : pages) { |
| long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); |
| |
| try { |
| long pageAddr = mem.readLock(fullId.groupId(), fullId.pageId(), page); |
| |
| for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(fullId.groupId()); j += 4) |
| assertEquals(j + (int)fullId.pageId(), PageUtils.getInt(pageAddr, j)); |
| |
| mem.readUnlock(fullId.groupId(), fullId.pageId(), page); |
| } |
| finally { |
| mem.releasePage(fullId.groupId(), fullId.pageId(), page); |
| } |
| } |
| } |
| finally { |
| dbMgr.checkpointReadUnlock(); |
| } |
| } |
| |
| /** |
| * @param mem Mem. |
| * @param storeMgr Store manager. |
| * @param wal Wal. |
| * @param cacheId Cache id. |
| * @param pages Pages. |
| */ |
| private void generateWal( |
| final PageMemoryImpl mem, |
| final IgnitePageStoreManager storeMgr, |
| final IgniteWriteAheadLogManager wal, |
| final int cacheId, FullPageId[] pages |
| ) throws Exception { |
| // Mark the start position. |
| CheckpointRecord cpRec = new CheckpointRecord(null); |
| |
| WALPointer start = wal.log(cpRec); |
| |
| wal.flush(start, false); |
| |
| for (FullPageId fullId : pages) { |
| long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); |
| |
| try { |
| long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); |
| |
| PageIO.setPageId(pageAddr, fullId.pageId()); |
| |
| try { |
| for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(fullId.groupId()); j += 4) |
| PageUtils.putInt(pageAddr, j, j + (int)fullId.pageId()); |
| } |
| finally { |
| mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true); |
| } |
| } |
| finally { |
| mem.releasePage(fullId.groupId(), fullId.pageId(), page); |
| } |
| } |
| |
| Collection<FullPageId> pageIds = mem.beginCheckpoint(new GridFinishedFuture()); |
| |
| info("Acquired pages for checkpoint: " + pageIds.size()); |
| |
| try { |
| long begin = System.currentTimeMillis(); |
| |
| long cp = 0; |
| |
| AtomicLong write = new AtomicLong(); |
| |
| PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> { |
| int groupId = fullPageId.groupId(); |
| long pageId = fullPageId.pageId(); |
| |
| for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(groupId); j += 4) |
| assertEquals(j + (int)pageId, buf.getInt(j)); |
| |
| buf.rewind(); |
| |
| long writeStart = System.nanoTime(); |
| |
| storeMgr.write(cacheId, pageId, buf, tag, true); |
| |
| long writeEnd = System.nanoTime(); |
| |
| write.getAndAdd(writeEnd - writeStart); |
| }; |
| |
| ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize()); |
| |
| tmpBuf.order(ByteOrder.nativeOrder()); |
| |
| for (FullPageId fullId : pages) { |
| if (pageIds.contains(fullId)) { |
| long cpStart = System.nanoTime(); |
| |
| mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null); |
| |
| long cpEnd = System.nanoTime(); |
| |
| cp += cpEnd - cpStart; |
| } |
| } |
| |
| long syncStart = System.currentTimeMillis(); |
| |
| storeMgr.sync(cacheId, 0); |
| |
| long end = System.currentTimeMillis(); |
| |
| info("Written pages in " + (end - begin) + "ms, copy took " + (cp / 1_000_000) + "ms, " + |
| "write took " + (write.get() / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms"); |
| } |
| finally { |
| info("Finishing checkpoint..."); |
| |
| mem.finishCheckpoint(); |
| |
| info("Finished checkpoint"); |
| } |
| |
| wal.flush(wal.log(new CheckpointRecord(null)), false); |
| |
| for (FullPageId fullId : pages) { |
| long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); |
| |
| try { |
| assertFalse("Page has a temp heap copy after the last checkpoint: [cacheId=" + |
| fullId.groupId() + ", pageId=" + fullId.pageId() + "]", mem.hasTempCopy(page)); |
| |
| assertFalse("Page is dirty after the last checkpoint: [cacheId=" + |
| fullId.groupId() + ", pageId=" + fullId.pageId() + "]", mem.isDirty(fullId.groupId(), fullId.pageId(), page)); |
| } |
| finally { |
| mem.releasePage(fullId.groupId(), fullId.pageId(), page); |
| } |
| } |
| } |
| } |