| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence.wal.memtracker; |
| |
| import java.nio.ByteBuffer; |
| import java.util.BitSet; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.mem.DirectMemoryProvider; |
| import org.apache.ignite.internal.mem.DirectMemoryRegion; |
| import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.pagemem.PageIdUtils; |
| import org.apache.ignite.internal.pagemem.PageMemory; |
| import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; |
| import org.apache.ignite.internal.processors.cache.CacheGroupContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheProcessor; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; |
| import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; |
| import org.apache.ignite.internal.processors.cache.persistence.DataRegion; |
| import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; |
| import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; |
| import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; |
| import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.plugin.IgnitePlugin; |
| import org.apache.ignite.plugin.PluginContext; |
| import org.apache.ignite.spi.encryption.EncryptionSpi; |
| import org.mockito.Mockito; |
| |
| import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_CACHE_ID_DATA_REF_MVCC_LEAF; |
| import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA_REF_MVCC_LEAF; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| /** |
| * Page memory tracker. |
| * |
| * Replicates Ignite's page memory changes to own managed memory region by intercepting WAL records and |
| * applying page snapshots and deltas. |
| */ |
| public class PageMemoryTracker implements IgnitePlugin { |
| /** */ |
| private final long DUMP_DIFF_BYTES_LIMIT = 65536L; |
| |
| /** Plugin context. */ |
| private final PluginContext ctx; |
| |
| /** Config. */ |
| private final PageMemoryTrackerConfiguration cfg; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Grid context. */ |
| private final GridKernalContext gridCtx; |
| |
| /** Page allocator mutex. */ |
| private final Object pageAllocatorMux = new Object(); |
| |
| /** Pages. */ |
| private final Map<FullPageId, DirectMemoryPage> pages = new ConcurrentHashMap<>(); |
| |
| /** Dumped diff bytes. */ |
| private volatile long dumpedDiffBytes = 0L; |
| |
| /** Page slots. */ |
| private volatile DirectMemoryPageSlot[] pageSlots; |
| |
| /** Free page slots. */ |
| private final BitSet freeSlots = new BitSet(); |
| |
| /** Last allocated page index. */ |
| private volatile int lastPageIdx; |
| |
| /** Free page slots count. */ |
| private volatile int freeSlotsCnt; |
| |
| /** Page size. */ |
| private volatile int pageSize; |
| |
| /** Page memory mock. */ |
| private volatile PageMemory pageMemoryMock; |
| |
| /** Memory provider. */ |
| private volatile DirectMemoryProvider memoryProvider; |
| |
| /** Memory region. */ |
| private volatile DirectMemoryRegion memoryRegion; |
| |
| /** Memory region lock, to prevent race between memory region deallocation and delta records applying. */ |
| private final ReadWriteLock memoryRegionLock = new ReentrantReadWriteLock(); |
| |
| /** Max pages. */ |
| private volatile int maxPages; |
| |
| /** Tracking started. */ |
| private volatile boolean started; |
| |
| /** Tracker was started with empty PDS. */ |
| private volatile boolean emptyPds; |
| |
| /** Statistics. */ |
| private final ConcurrentMap<WALRecord.RecordType, AtomicInteger> stats = new ConcurrentHashMap<>(); |
| |
| /** Checkpoint listener. */ |
| private CheckpointListener checkpointLsnr; |
| |
| /** Temporary byte buffer, used to compact local pages. */ |
| private volatile ByteBuffer tmpBuf1; |
| |
| /** Temporary byte buffer, used to compact remote pages. */ |
| private volatile ByteBuffer tmpBuf2; |
| |
| /** |
| * @param ctx Plugin context. |
| * @param cfg Configuration. |
| */ |
| PageMemoryTracker(PluginContext ctx, PageMemoryTrackerConfiguration cfg) { |
| this.ctx = ctx; |
| this.cfg = cfg; |
| log = ctx.log(getClass()); |
| gridCtx = ((IgniteEx)ctx.grid()).context(); |
| } |
| |
| /** |
| * Creates WAL manager. |
| */ |
| IgniteWriteAheadLogManager createWalManager() { |
| if (isEnabled()) { |
| return new FileWriteAheadLogManager(gridCtx) { |
| @Override public WALPointer log(WALRecord record) throws IgniteCheckedException { |
| WALPointer res = super.log(record); |
| |
| applyWalRecord(record); |
| |
| return res; |
| } |
| |
| @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException { |
| super.resumeLogging(lastPtr); |
| |
| if (lastPtr == null) |
| emptyPds = true; |
| } |
| }; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Creates page store manager. |
| */ |
| IgnitePageStoreManager createPageStoreManager() { |
| if (isEnabled()) { |
| return new FilePageStoreManager(gridCtx) { |
| @Override public void shutdownForCacheGroup(CacheGroupContext grp, |
| boolean destroy) throws IgniteCheckedException { |
| super.shutdownForCacheGroup(grp, destroy); |
| |
| cleanupPages(fullPageId -> fullPageId.groupId() == grp.groupId()); |
| } |
| |
| @Override public void truncate(int grpId, int partId, int tag) throws IgniteCheckedException { |
| super.truncate(grpId, partId, tag); |
| |
| cleanupPages(fullPageId -> fullPageId.groupId() == grpId |
| && PageIdUtils.partId(fullPageId.pageId()) == partId); |
| } |
| }; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Start tracking pages. |
| */ |
| synchronized void start() { |
| if (!isEnabled() || started) |
| return; |
| |
| pageSize = ctx.igniteConfiguration().getDataStorageConfiguration().getPageSize(); |
| |
| pageMemoryMock = mockPageMemory(); |
| |
| GridCacheSharedContext sharedCtx = gridCtx.cache().context(); |
| |
| // Initialize one memory region for all data regions of target ignite node. |
| long maxMemorySize = 0; |
| |
| for (DataRegion dataRegion : sharedCtx.database().dataRegions()) { |
| if (dataRegion.pageMemory() instanceof PageMemoryImpl) |
| maxMemorySize += dataRegion.config().getMaxSize(); |
| } |
| |
| long[] chunks = new long[] {maxMemorySize}; |
| |
| memoryProvider = new UnsafeMemoryProvider(log); |
| |
| memoryProvider.initialize(chunks); |
| |
| memoryRegion = memoryProvider.nextRegion(); |
| |
| GridUnsafe.setMemory(memoryRegion.address(), memoryRegion.size(), (byte)0); |
| |
| maxPages = (int)(maxMemorySize / pageSize); |
| |
| pageSlots = new DirectMemoryPageSlot[maxPages]; |
| |
| freeSlotsCnt = maxPages; |
| |
| tmpBuf1 = ByteBuffer.allocateDirect(pageSize); |
| tmpBuf2 = ByteBuffer.allocateDirect(pageSize); |
| |
| if (cfg.isCheckPagesOnCheckpoint()) { |
| checkpointLsnr = new CheckpointListener() { |
| @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException { |
| if (!checkPages(false, true)) |
| throw new IgniteCheckedException("Page memory is inconsistent after applying WAL delta records."); |
| } |
| |
| @Override public void beforeCheckpointBegin(Context ctx) { |
| /* No-op. */ |
| } |
| |
| @Override public void onCheckpointBegin(Context ctx) { |
| /* No-op. */ |
| } |
| }; |
| |
| ((GridCacheDatabaseSharedManager)gridCtx.cache().context().database()).addCheckpointListener(checkpointLsnr); |
| } |
| |
| lastPageIdx = 0; |
| |
| started = true; |
| |
| log.info("PageMemory tracker started, " + U.readableSize(maxMemorySize, false) + " offheap memory allocated."); |
| } |
| |
| /** |
| * Creates a mock for the Page Memory. |
| */ |
| private PageMemory mockPageMemory() { |
| PageMemory mock = mock(PageMemory.class); |
| |
| when(mock.pageSize()).thenReturn(pageSize); |
| |
| when(mock.realPageSize(Mockito.anyInt())).then(invocation -> { |
| int grpId = (Integer)invocation.getArguments()[0]; |
| |
| if (gridCtx.encryption().getActiveKey(grpId) == null) |
| return pageSize; |
| |
| EncryptionSpi encSpi = ctx.igniteConfiguration().getEncryptionSpi(); |
| |
| return pageSize |
| - (encSpi.encryptedSizeNoPadding(pageSize) - pageSize) |
| - encSpi.blockSize() /* For CRC. */; |
| }); |
| |
| when(mock.pageBuffer(Mockito.anyLong())).then(invocation -> { |
| long pageAddr = (Long)invocation.getArguments()[0]; |
| |
| return GridUnsafe.wrapPointer(pageAddr, pageSize); |
| }); |
| |
| when(mock.metrics()).thenReturn(new DataRegionMetricsImpl(new DataRegionConfiguration(), gridCtx)); |
| |
| return mock; |
| } |
| |
| /** |
| * Stop tracking, release resources. |
| */ |
| synchronized void stop() { |
| if (!started) |
| return; |
| |
| started = false; |
| |
| pages.clear(); |
| |
| pageSlots = null; |
| |
| freeSlots.clear(); |
| |
| stats.clear(); |
| |
| memoryRegionLock.writeLock().lock(); |
| |
| try { |
| memoryProvider.shutdown(true); |
| } |
| finally { |
| memoryRegionLock.writeLock().unlock(); |
| } |
| |
| if (checkpointLsnr != null) { |
| ((GridCacheDatabaseSharedManager)gridCtx.cache().context().database()) |
| .removeCheckpointListener(checkpointLsnr); |
| |
| checkpointLsnr = null; |
| } |
| |
| log.info("PageMemory tracker stopped."); |
| } |
| |
| /** |
| * Is plugin enabled. |
| */ |
| private boolean isEnabled() { |
| return (cfg != null && cfg.isEnabled() && CU.isPersistenceEnabled(ctx.igniteConfiguration())); |
| } |
| |
| /** |
| * Cleanup pages by predicate. |
| * |
| * @param pred Predicate. |
| */ |
| private void cleanupPages(IgnitePredicate<FullPageId> pred) { |
| synchronized (pageAllocatorMux) { |
| for (Map.Entry<FullPageId, DirectMemoryPage> pageEntry : pages.entrySet()) { |
| if (pred.apply(pageEntry.getKey())) { |
| pages.remove(pageEntry.getKey()); |
| |
| freeSlots.set(pageEntry.getValue().slot().index()); |
| |
| freeSlotsCnt++; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Allocates new page for given FullPageId. |
| * |
| * @param fullPageId Full page id. |
| */ |
| private DirectMemoryPage allocatePage(FullPageId fullPageId) throws IgniteCheckedException { |
| synchronized (pageAllocatorMux) { |
| // Double check. |
| DirectMemoryPage page = pages.get(fullPageId); |
| |
| if (page != null) |
| return page; |
| |
| if (freeSlotsCnt == 0) |
| throw new IgniteCheckedException("Can't allocate new page"); |
| |
| int pageIdx; |
| |
| if (lastPageIdx < maxPages) |
| pageIdx = lastPageIdx++; |
| else { |
| pageIdx = freeSlots.nextSetBit(0); |
| |
| assert pageIdx >= 0; |
| |
| freeSlots.clear(pageIdx); |
| } |
| |
| freeSlotsCnt--; |
| |
| long pageAddr = memoryRegion.address() + ((long)pageIdx) * pageSize; |
| |
| DirectMemoryPageSlot pageSlot = pageSlots[pageIdx]; |
| if (pageSlot == null) |
| pageSlot = pageSlots[pageIdx] = new DirectMemoryPageSlot(pageAddr, pageIdx); |
| |
| pageSlot.lock(); |
| |
| try { |
| page = new DirectMemoryPage(pageSlot, fullPageId); |
| |
| pages.put(fullPageId, page); |
| |
| if (pageSlot.owningPage() != null) { |
| // Clear memory if slot was already used. |
| GridUnsafe.setMemory(pageAddr, pageSize, (byte)0); |
| } |
| |
| pageSlot.owningPage(page); |
| } |
| finally { |
| pageSlot.unlock(); |
| } |
| |
| return page; |
| } |
| } |
| |
| /** |
| * Gets or allocates page for given FullPageId. |
| * |
| * @param fullPageId Full page id. |
| * @return Page. |
| */ |
| private DirectMemoryPage page(FullPageId fullPageId) throws IgniteCheckedException { |
| DirectMemoryPage page = pages.get(fullPageId); |
| |
| if (page == null) |
| page = allocatePage(fullPageId); |
| |
| return page; |
| } |
| |
| /** |
| * Apply WAL record to local memory region. |
| */ |
| private void applyWalRecord(WALRecord record) throws IgniteCheckedException { |
| memoryRegionLock.readLock().lock(); |
| |
| try { |
| if (!started) |
| return; |
| |
| if (record instanceof MemoryRecoveryRecord && !emptyPds) { |
| synchronized (pageAllocatorMux) { |
| pages.clear(); |
| |
| lastPageIdx = 0; |
| |
| freeSlotsCnt = maxPages; |
| |
| freeSlots.clear(); |
| |
| stats.clear(); |
| } |
| } |
| else if (record instanceof PageSnapshot) { |
| PageSnapshot snapshot = (PageSnapshot)record; |
| |
| int grpId = snapshot.fullPageId().groupId(); |
| long pageId = snapshot.fullPageId().pageId(); |
| |
| FullPageId fullPageId = new FullPageId(pageId, grpId); |
| |
| DirectMemoryPage page = page(fullPageId); |
| |
| page.lock(); |
| |
| try { |
| GridUnsafe.copyHeapOffheap(snapshot.pageData(), GridUnsafe.BYTE_ARR_OFF, page.address(), pageSize); |
| |
| page.changeHistory().clear(); |
| |
| page.changeHistory().add(record); |
| } |
| finally { |
| page.unlock(); |
| } |
| } |
| else if (record instanceof PageDeltaRecord) { |
| PageDeltaRecord deltaRecord = (PageDeltaRecord)record; |
| |
| int grpId = deltaRecord.groupId(); |
| long pageId = deltaRecord.pageId(); |
| |
| FullPageId fullPageId = new FullPageId(pageId, grpId); |
| |
| DirectMemoryPage page = page(fullPageId); |
| |
| page.lock(); |
| |
| try { |
| deltaRecord.applyDelta(pageMemoryMock, page.address()); |
| |
| page.changeHistory().add(record); |
| } |
| finally { |
| page.unlock(); |
| } |
| } |
| else |
| return; |
| |
| // Increment statistics. |
| stats.computeIfAbsent(record.type(), r -> new AtomicInteger()).incrementAndGet(); |
| } |
| finally { |
| memoryRegionLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Total count of allocated pages in page store. |
| */ |
| private long pageStoreAllocatedPages() { |
| IgnitePageStoreManager pageStoreMgr = gridCtx.cache().context().pageStore(); |
| |
| assert pageStoreMgr != null; |
| |
| long totalAllocated = pageStoreMgr.pagesAllocated(MetaStorage.METASTORAGE_CACHE_ID); |
| |
| if (MvccUtils.mvccEnabled(gridCtx)) |
| totalAllocated += pageStoreMgr.pagesAllocated(TxLog.TX_LOG_CACHE_ID); |
| |
| for (CacheGroupContext ctx : gridCtx.cache().cacheGroups()) |
| totalAllocated += pageStoreMgr.pagesAllocated(ctx.groupId()); |
| |
| return totalAllocated; |
| } |
| |
| /** |
| * Checks if there are any differences between the Ignite's data regions content and pages inside the tracker. |
| * |
| * @param checkAll Check all tracked pages, otherwise check until first error. |
| * @return {@code true} if content of all tracked pages equals to content of these pages in the ignite instance. |
| */ |
| public boolean checkPages(boolean checkAll) throws IgniteCheckedException { |
| return checkPages(checkAll, false); |
| } |
| |
| /** |
| * Checks if there are any differences between the Ignite's data regions content and pages inside the tracker. |
| * |
| * @param checkAll Check all tracked pages, otherwise check until first error. |
| * @param checkPageCnt Check tracked and allocated pages count. This check can be done only if there is no |
| * concurrent modification of pages in the system (for example when checkpointWriteLock is held). Some threads |
| * (for example MVCC vacuum cleaner) can modify pages even if there is no activity from a users point of view. |
| * @return {@code true} if content of all tracked pages equals to content of these pages in the ignite instance. |
| */ |
| private boolean checkPages(boolean checkAll, boolean checkPageCnt) throws IgniteCheckedException { |
| if (!started) |
| throw new IgniteCheckedException("Page memory checking only possible when tracker is started."); |
| |
| GridCacheProcessor cacheProc = gridCtx.cache(); |
| |
| boolean res = true; |
| |
| synchronized (pageAllocatorMux) { |
| long totalAllocated = pageStoreAllocatedPages(); |
| |
| log.info(">>> Total tracked pages: " + pages.size()); |
| log.info(">>> Total allocated pages: " + totalAllocated); |
| |
| dumpStats(); |
| |
| if (emptyPds && checkPageCnt && pages.size() != totalAllocated) { |
| res = false; |
| |
| log.error("Started from empty PDS, but tracked pages count not equals to allocated pages count"); |
| |
| dumpPagesCountDiff(); |
| |
| if (!checkAll) |
| return false; |
| } |
| } |
| |
| Set<Integer> groupsWarned = new HashSet<>(); |
| |
| for (DirectMemoryPage page : pages.values()) { |
| FullPageId fullPageId = page.fullPageId(); |
| |
| PageMemory pageMem; |
| |
| if (fullPageId.groupId() == MetaStorage.METASTORAGE_CACHE_ID) |
| pageMem = cacheProc.context().database().metaStorage().pageMemory(); |
| else if (fullPageId.groupId() == TxLog.TX_LOG_CACHE_ID) |
| pageMem = cacheProc.context().database().dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory(); |
| else { |
| CacheGroupContext ctx = cacheProc.cacheGroup(fullPageId.groupId()); |
| |
| if (ctx != null) |
| pageMem = ctx.dataRegion().pageMemory(); |
| else { |
| if (!groupsWarned.contains(fullPageId.groupId())) { |
| log.warning("Cache group " + fullPageId.groupId() + " not found."); |
| |
| groupsWarned.add(fullPageId.groupId()); |
| } |
| |
| continue; |
| } |
| } |
| |
| assert pageMem instanceof PageMemoryImpl; |
| |
| long rmtPage = pageMem.acquirePage(fullPageId.groupId(), fullPageId.pageId()); |
| |
| try { |
| long rmtPageAddr = pageMem.readLockForce(fullPageId.groupId(), fullPageId.pageId(), rmtPage); |
| |
| try { |
| page.lock(); |
| |
| try { |
| if (rmtPageAddr == 0L) { |
| res = false; |
| |
| log.error("Can't lock page: " + fullPageId); |
| |
| dumpHistory(page); |
| } |
| else if (!comparePages(fullPageId, page, rmtPageAddr)) |
| res = false; |
| |
| if (!res && !checkAll) |
| return false; |
| } |
| finally { |
| page.unlock(); |
| } |
| } |
| finally { |
| if (rmtPageAddr != 0L) |
| pageMem.readUnlock(fullPageId.groupId(), fullPageId.pageId(), rmtPage); |
| } |
| } |
| finally { |
| pageMem.releasePage(fullPageId.groupId(), fullPageId.pageId(), rmtPage); |
| } |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Compare pages content. |
| * |
| * @param fullPageId Full page ID. |
| * @param expPage Expected page. |
| * @param actualPageAddr Actual page address. |
| * @return {@code True} if pages are equals, {@code False} otherwise. |
| * @throws IgniteCheckedException If fails. |
| */ |
| private boolean comparePages(FullPageId fullPageId, DirectMemoryPage expPage, long actualPageAddr) throws IgniteCheckedException { |
| long expPageAddr = expPage.address(); |
| |
| GridCacheProcessor cacheProc = gridCtx.cache(); |
| |
| ByteBuffer locBuf = GridUnsafe.wrapPointer(expPageAddr, pageSize); |
| ByteBuffer rmtBuf = GridUnsafe.wrapPointer(actualPageAddr, pageSize); |
| |
| PageIO pageIo = PageIO.getPageIO(actualPageAddr); |
| |
| if (pageIo.getType() == T_DATA_REF_MVCC_LEAF || pageIo.getType() == T_CACHE_ID_DATA_REF_MVCC_LEAF) { |
| assert cacheProc.cacheGroup(fullPageId.groupId()).mvccEnabled(); |
| |
| AbstractDataLeafIO io = (AbstractDataLeafIO)pageIo; |
| |
| int cnt = io.getMaxCount(actualPageAddr, pageSize); |
| |
| // Reset lock info as there is no sense to log it into WAL. |
| for (int i = 0; i < cnt; i++) { |
| io.setMvccLockCoordinatorVersion(expPageAddr, i, io.getMvccLockCoordinatorVersion(actualPageAddr, i)); |
| io.setMvccLockCounter(expPageAddr, i, io.getMvccLockCounter(actualPageAddr, i)); |
| } |
| } |
| |
| // Compare only meaningful data. |
| if (pageIo instanceof CompactablePageIO) { |
| tmpBuf1.clear(); |
| tmpBuf2.clear(); |
| |
| ((CompactablePageIO)pageIo).compactPage(locBuf, tmpBuf1, pageSize); |
| ((CompactablePageIO)pageIo).compactPage(rmtBuf, tmpBuf2, pageSize); |
| |
| locBuf = tmpBuf1; |
| rmtBuf = tmpBuf2; |
| } |
| |
| if (!locBuf.equals(rmtBuf)) { |
| log.error("Page buffers are not equals [fullPageId=" + fullPageId + ", pageIo=" + pageIo + ']'); |
| |
| dumpDiff(locBuf, rmtBuf); |
| |
| dumpHistory(expPage); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Dump statistics to log. |
| */ |
| private void dumpStats() { |
| log.info(">>> Processed WAL records:"); |
| |
| for (Map.Entry<WALRecord.RecordType, AtomicInteger> entry : stats.entrySet()) |
| log.info(" " + entry.getKey() + '=' + entry.getValue().get()); |
| } |
| |
| /** |
| * Dump difference between two ByteBuffers to log. |
| * |
| * @param buf1 Buffer 1. |
| * @param buf2 Buffer 2. |
| */ |
| private void dumpDiff(ByteBuffer buf1, ByteBuffer buf2) { |
| if (dumpedDiffBytes > DUMP_DIFF_BYTES_LIMIT) |
| return; |
| |
| log.error(">>> Diff:"); |
| |
| for (int i = 0; i < Math.min(buf1.remaining(), buf2.remaining()); i++) { |
| byte b1 = buf1.get(buf1.position() + i); |
| byte b2 = buf2.get(buf2.position() + i); |
| |
| if (b1 != b2) |
| log.error(String.format(" 0x%04X: %02X %02X", i, b1, b2)); |
| |
| dumpedDiffBytes++; |
| } |
| |
| if (buf1.remaining() < buf2.remaining()) { |
| for (int i = buf1.remaining(); i < buf2.remaining(); i++) |
| log.error(String.format(" 0x%04X: %02X", i, buf2.get(buf2.position() + i))); |
| |
| dumpedDiffBytes++; |
| } |
| else if (buf1.remaining() > buf2.remaining()) { |
| for (int i = buf2.remaining(); i < buf1.remaining(); i++) |
| log.error(String.format(" 0x%04X: %02X", i, buf1.get(buf1.position() + i))); |
| |
| dumpedDiffBytes++; |
| } |
| } |
| |
| /** |
| * Dump page change history to log. |
| * |
| * @param page Page. |
| */ |
| private void dumpHistory(DirectMemoryPage page) { |
| log.error(">>> Change history:"); |
| |
| for (WALRecord record : page.changeHistory()) |
| log.error(" " + record); |
| } |
| |
| /** |
| * Dump diff between allocated and tracked page counts. |
| */ |
| private void dumpPagesCountDiff() throws IgniteCheckedException { |
| Map<Integer, Long> pagesByGroups = pages.keySet().stream().collect( |
| Collectors.groupingBy(FullPageId::groupId, Collectors.counting())); |
| |
| IgnitePageStoreManager pageStoreMgr = gridCtx.cache().context().pageStore(); |
| |
| for (Map.Entry<Integer, Long> groupPages : pagesByGroups.entrySet()) { |
| int grpId = groupPages.getKey(); |
| long grpPagesAllocated = pageStoreMgr.pagesAllocated(grpId); |
| |
| if (grpPagesAllocated != groupPages.getValue()) { |
| log.error(">>> Page count for groupId " + grpId + ": allocated=" + grpPagesAllocated + |
| ", tracked=" + groupPages.getValue()); |
| |
| Map<Integer, Long> pagesByParts = pages.keySet().stream().filter(id -> id.groupId() == grpId) |
| .collect(Collectors.groupingBy(id -> PageIdUtils.partId(id.pageId()), Collectors.counting())); |
| |
| for (Map.Entry<Integer, Long> partPages : pagesByParts.entrySet()) { |
| long partPagesAllocated = pageStoreMgr.pages(grpId, partPages.getKey()); |
| |
| if (partPagesAllocated != partPages.getValue()) { |
| log.error(">>>> Page count for partId " + partPages.getKey() + ": allocated=" + |
| partPagesAllocated + ", tracked=" + partPages.getValue()); |
| } |
| } |
| |
| int partCnt = gridCtx.cache().cacheGroup(grpId).config().getAffinity().partitions(); |
| |
| for (int partId = 0; partId < partCnt; partId++) { |
| if (pageStoreMgr.exists(grpId, partId) && !pagesByParts.keySet().contains(partId)) { |
| log.error(">>>> Page count for partId " + partId + ": allocated=" + |
| pageStoreMgr.pages(grpId, partId) + ", tracked=0"); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class DirectMemoryPage { |
| /** Page slot. */ |
| private final DirectMemoryPageSlot slot; |
| |
| /** Change history. */ |
| private final List<WALRecord> changeHist = new LinkedList<>(); |
| |
| /** Full page id. */ |
| private final FullPageId fullPageId; |
| |
| /** |
| * @param slot Memory page slot. |
| */ |
| private DirectMemoryPage(DirectMemoryPageSlot slot, FullPageId fullPageId) { |
| this.slot = slot; |
| this.fullPageId = fullPageId; |
| } |
| |
| /** |
| * Lock page. |
| */ |
| public void lock() throws IgniteCheckedException { |
| slot.lock(); |
| |
| if (slot.owningPage() != this) { |
| slot.unlock(); |
| |
| throw new IgniteCheckedException("Memory slot owning page changed, can't access page buffer."); |
| } |
| } |
| |
| /** |
| * Unlock page. |
| */ |
| public void unlock() { |
| slot.unlock(); |
| } |
| |
| /** |
| * @return Page address. |
| */ |
| public long address() { |
| return slot.address(); |
| } |
| |
| /** |
| * Change history. |
| */ |
| @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") |
| public List<WALRecord> changeHistory() { |
| return changeHist; |
| } |
| |
| /** |
| * @return Full page id. |
| */ |
| public FullPageId fullPageId() { |
| return fullPageId; |
| } |
| |
| /** |
| * @return Memory page slot. |
| */ |
| public DirectMemoryPageSlot slot() { |
| return slot; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class DirectMemoryPageSlot { |
| /** Page slot address. */ |
| private final long addr; |
| |
| /** Page slot index. */ |
| private final int idx; |
| |
| /** Page lock. */ |
| private final Lock lock = new ReentrantLock(); |
| |
| /** Owning page. */ |
| private DirectMemoryPage owningPage; |
| |
| /** |
| * @param addr Page address. |
| * @param idx Page slot index |
| */ |
| private DirectMemoryPageSlot(long addr, int idx) { |
| this.addr = addr; |
| this.idx = idx; |
| } |
| |
| /** |
| * Lock page slot. |
| */ |
| @SuppressWarnings("LockAcquiredButNotSafelyReleased") |
| public void lock() { |
| lock.lock(); |
| } |
| |
| /** |
| * Unlock page slot. |
| */ |
| public void unlock() { |
| lock.unlock(); |
| } |
| |
| /** |
| * @return Page slot address. |
| */ |
| public long address() { |
| return addr; |
| } |
| |
| /** |
| * @return Page slot index. |
| */ |
| public int index() { |
| return idx; |
| } |
| |
| /** |
| * @return Owning page. |
| */ |
| public DirectMemoryPage owningPage() { |
| return owningPage; |
| } |
| |
| /** |
| * @param owningPage Owning page. |
| */ |
| public void owningPage(DirectMemoryPage owningPage) { |
| this.owningPage = owningPage; |
| } |
| } |
| } |