blob: 57955095e19ab67eeea7627a743aa9c0bf2dd8fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.pagememory;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.freelist.FreeListImpl;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.reuse.ReuseList;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
import org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link AbstractPageMemoryTableStorage} for the persistent case.
*/
public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
/** Format string for a free list's name; the arguments are the table ID and the partition ID. */
private static final String FREE_LIST_NAME = "PersistentFreeList_%d_%d";
/** Storage engine instance. */
private final PersistentPageMemoryStorageEngine engine;
/** Data region instance. */
private final PersistentPageMemoryDataRegion dataRegion;
/** Executor that is passed to every partition storage created by this table storage. */
private final ExecutorService destructionExecutor;
/** Page eviction tracker passed to the created free lists and partition storages; always the no-op instance. */
private final PageEvictionTracker pageEvictionTracker = PageEvictionTrackerNoOp.INSTANCE;
/**
 * Creates a table storage for the persistent case.
 *
 * @param tableDescriptor Table descriptor.
 * @param indexDescriptorSupplier Index descriptor supplier.
 * @param engine Storage engine instance.
 * @param dataRegion Data region for the table.
 * @param destructionExecutor Executor that is passed on to the partition storages created by this table storage.
 */
PersistentPageMemoryTableStorage(
        StorageTableDescriptor tableDescriptor,
        StorageIndexDescriptorSupplier indexDescriptorSupplier,
        PersistentPageMemoryStorageEngine engine,
        PersistentPageMemoryDataRegion dataRegion,
        ExecutorService destructionExecutor
) {
    super(tableDescriptor, indexDescriptorSupplier);

    this.destructionExecutor = destructionExecutor;
    this.dataRegion = dataRegion;
    this.engine = engine;
}
/**
 * Returns the storage engine instance that was supplied at construction.
 */
public PersistentPageMemoryStorageEngine engine() {
    return this.engine;
}
/**
 * Returns the data region that was supplied at construction.
 */
@Override
public PersistentPageMemoryDataRegion dataRegion() {
    return this.dataRegion;
}
/**
 * Always returns {@code false}: this class is the table storage implementation for the persistent case.
 */
@Override
public boolean isVolatile() {
    return false;
}
/**
 * Notifies the data region's page memory that the group identified by this table's ID has been destroyed.
 */
@Override
protected void finishDestruction() {
    PersistentPageMemory pageMemory = dataRegion.pageMemory();

    pageMemory.onGroupDestroyed(getTableId());
}
/**
 * Creates a partition storage: obtains the partition meta first and then, while holding the checkpoint read lock, builds the
 * partition's free list, version chain tree, index meta tree and GC queue.
 *
 * @param partitionId Partition ID.
 */
@Override
public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
    PartitionMeta partitionMeta = getOrCreatePartitionMetaOnCreatePartition(createGroupPartitionId(partitionId));

    return inCheckpointLock(() -> {
        PersistentPageMemory persistentPageMemory = dataRegion.pageMemory();

        // The free list is built first because it serves as the reuse list of the structures created after it.
        FreeListImpl partitionFreeList = createFreeList(partitionId, persistentPageMemory, partitionMeta);

        VersionChainTree versionChains = createVersionChainTree(partitionId, partitionFreeList, persistentPageMemory, partitionMeta);

        IndexMetaTree indexMetas = createIndexMetaTree(partitionId, partitionFreeList, persistentPageMemory, partitionMeta);

        GcQueue garbageQueue = createGcQueue(partitionId, partitionFreeList, persistentPageMemory, partitionMeta);

        return new PersistentPageMemoryMvPartitionStorage(
                this,
                partitionId,
                partitionMeta,
                partitionFreeList,
                pageEvictionTracker,
                versionChains,
                indexMetas,
                garbageQueue,
                destructionExecutor
        );
    });
}
/**
 * Returns the ID of the last started checkpoint, or {@code null} if no checkpoint has been started yet.
 */
public @Nullable UUID lastCheckpointId() {
    CheckpointProgress progress = dataRegion.checkpointManager().lastCheckpointProgress();

    if (progress == null) {
        return null;
    }

    return progress.id();
}
/**
 * Builds the {@link FreeListImpl} of a partition, allocating its root page first if the partition meta does not reference one yet.
 *
 * @param partId Partition ID.
 * @param pageMemory Persistent page memory instance.
 * @param meta Partition metadata.
 * @throws StorageException If failed.
 */
private FreeListImpl createFreeList(
        int partId,
        PersistentPageMemory pageMemory,
        PartitionMeta meta
) throws StorageException {
    try {
        // A zero root page ID is treated as "no root page allocated yet".
        boolean initNew = meta.freeListRootPageId() == 0;

        if (initNew) {
            long newRootPageId = pageMemory.allocatePage(getTableId(), partId, FLAG_AUX);

            meta.freeListRootPageId(lastCheckpointId(), newRootPageId);
        }

        return new FreeListImpl(
                getTableId(),
                partId,
                String.format(FREE_LIST_NAME, getTableId(), partId),
                dataRegion.pageMemory(),
                null,
                PageLockListenerNoOp.INSTANCE,
                meta.freeListRootPageId(),
                initNew,
                dataRegion.pageListCacheLimit(),
                pageEvictionTracker,
                IoStatisticsHolderNoOp.INSTANCE
        );
    } catch (IgniteInternalCheckedException e) {
        throw new StorageException("Error creating free list: [tableId={}, partitionId={}]", e, getTableId(), partId);
    }
}
/**
 * Builds the {@link VersionChainTree} of a partition, allocating its root page first if the partition meta does not reference one
 * yet.
 *
 * @param partId Partition ID.
 * @param reuseList Reuse list.
 * @param pageMemory Persistent page memory instance.
 * @param meta Partition metadata.
 * @throws StorageException If failed.
 */
private VersionChainTree createVersionChainTree(
        int partId,
        ReuseList reuseList,
        PersistentPageMemory pageMemory,
        PartitionMeta meta
) throws StorageException {
    try {
        // A zero root page ID is treated as "no root page allocated yet".
        boolean initNew = meta.versionChainTreeRootPageId() == 0;

        if (initNew) {
            long newRootPageId = pageMemory.allocatePage(getTableId(), partId, FLAG_AUX);

            meta.versionChainTreeRootPageId(lastCheckpointId(), newRootPageId);
        }

        return new VersionChainTree(
                getTableId(),
                Integer.toString(getTableId()),
                partId,
                dataRegion.pageMemory(),
                PageLockListenerNoOp.INSTANCE,
                new AtomicLong(),
                meta.versionChainTreeRootPageId(),
                reuseList,
                initNew
        );
    } catch (IgniteInternalCheckedException e) {
        throw new StorageException("Error creating VersionChainTree: [tableId={}, partitionId={}]", e, getTableId(), partId);
    }
}
/**
 * Returns new {@link IndexMetaTree} instance for partition.
 *
 * <p>If the partition meta does not reference an index meta tree root page yet (the stored ID is zero), a new page is allocated,
 * recorded in the meta and the tree is initialized as a new one.</p>
 *
 * @param partitionId Partition ID.
 * @param reuseList Reuse list.
 * @param pageMemory Persistent page memory instance.
 * @param meta Partition metadata.
 * @throws StorageException If failed.
 */
private IndexMetaTree createIndexMetaTree(
        int partitionId,
        ReuseList reuseList,
        PersistentPageMemory pageMemory,
        PartitionMeta meta
) throws StorageException {
    try {
        // A zero root page ID is treated as "no root page allocated yet".
        boolean initNew = meta.indexTreeMetaPageId() == 0;

        if (initNew) {
            long rootPageId = pageMemory.allocatePage(getTableId(), partitionId, FLAG_AUX);

            meta.indexTreeMetaPageId(lastCheckpointId(), rootPageId);
        }

        return new IndexMetaTree(
                getTableId(),
                Integer.toString(getTableId()),
                partitionId,
                dataRegion.pageMemory(),
                PageLockListenerNoOp.INSTANCE,
                new AtomicLong(),
                meta.indexTreeMetaPageId(),
                reuseList,
                initNew
        );
    } catch (IgniteInternalCheckedException e) {
        throw new StorageException("Error creating IndexMetaTree: [tableId={}, partitionId={}]", e, getTableId(), partitionId);
    }
}
/**
 * Returns new {@link GcQueue} instance for partition.
 *
 * <p>If the partition meta does not reference a GC queue meta page yet (the stored ID is zero), a new page is allocated, recorded
 * in the meta and the queue is initialized as a new one.</p>
 *
 * @param partitionId Partition ID.
 * @param reuseList Reuse list.
 * @param pageMemory Persistent page memory instance.
 * @param meta Partition metadata.
 * @throws StorageException If failed.
 */
private GcQueue createGcQueue(
        int partitionId,
        ReuseList reuseList,
        PersistentPageMemory pageMemory,
        PartitionMeta meta
) throws StorageException {
    try {
        // A zero meta page ID is treated as "no page allocated yet".
        boolean initNew = meta.gcQueueMetaPageId() == 0;

        if (initNew) {
            long rootPageId = pageMemory.allocatePage(getTableId(), partitionId, FLAG_AUX);

            meta.gcQueueMetaPageId(lastCheckpointId(), rootPageId);
        }

        return new GcQueue(
                getTableId(),
                Integer.toString(getTableId()),
                partitionId,
                dataRegion.pageMemory(),
                PageLockListenerNoOp.INSTANCE,
                new AtomicLong(),
                meta.gcQueueMetaPageId(),
                reuseList,
                initNew
        );
    } catch (IgniteInternalCheckedException e) {
        // The message names the structure that is actually being created here.
        throw new StorageException("Error creating GcQueue: [tableId={}, partitionId={}]", e, getTableId(), partitionId);
    }
}
/**
 * Destroys a partition: closes the partition storage's resources if this call moved it to the destroyed state, then removes the
 * partition physically.
 *
 * @param mvPartitionStorage Partition storage to destroy.
 * @return Future of the physical partition destruction.
 */
@Override
CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
    // Closing (not destroying) the partition storage and its indexes is sufficient: once the data region, checkpointer and
    // compactor have been prepared for the removal, the partition file and its delta files are simply deleted.
    if (mvPartitionStorage.transitionToDestroyedState()) {
        mvPartitionStorage.closeResources();
    }

    GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());

    return destroyPartitionPhysically(groupPartitionId);
}
/**
 * Clears a partition: removes it physically and, once that has completed, obtains the partition meta again, rebuilds the
 * partition's data structures under the checkpoint read lock and hands them over to the given partition storage.
 *
 * @param mvPartitionStorage Partition storage to clear; it is cast to {@link PersistentPageMemoryMvPartitionStorage}.
 * @return Future that completes after the partition storage has received the new data structures.
 */
@Override
CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
    GroupPartitionId partitionKey = createGroupPartitionId(mvPartitionStorage.partitionId());

    CompletableFuture<Void> destroyFuture = destroyPartitionPhysically(partitionKey);

    return destroyFuture.thenAccept(unused -> {
        PersistentPageMemory persistentPageMemory = dataRegion.pageMemory();

        int partId = partitionKey.getPartitionId();

        PartitionMeta freshMeta = getOrCreatePartitionMetaOnCreatePartition(partitionKey);

        inCheckpointLock(() -> {
            // The free list is built first because it serves as the reuse list of the structures created after it.
            FreeListImpl partitionFreeList = createFreeList(partId, persistentPageMemory, freshMeta);

            VersionChainTree versionChains = createVersionChainTree(partId, partitionFreeList, persistentPageMemory, freshMeta);

            IndexMetaTree indexMetas = createIndexMetaTree(partId, partitionFreeList, persistentPageMemory, freshMeta);

            GcQueue garbageQueue = createGcQueue(partId, partitionFreeList, persistentPageMemory, freshMeta);

            PersistentPageMemoryMvPartitionStorage persistentStorage = (PersistentPageMemoryMvPartitionStorage) mvPartitionStorage;

            persistentStorage.updateDataStructures(freshMeta, partitionFreeList, versionChains, indexMetas, garbageQueue);

            return null;
        });
    });
}
/**
 * Removes a partition physically: marks its file page store for destruction, invalidates the partition in the page memory, waits
 * for the checkpoint manager's partition destruction callback, removes the partition meta and finally destroys the partition via
 * the file page store manager.
 *
 * @param groupPartitionId Partition of the group.
 * @return Future of the partition destruction.
 */
private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
    FilePageStore store = dataRegion.filePageStoreManager().getStore(groupPartitionId);

    store.markToDestroy();

    dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());

    CompletableFuture<Void> metaRemovedFuture = dataRegion.checkpointManager()
            .onPartitionDestruction(groupPartitionId)
            .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId));

    return metaRemovedFuture.thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
}
/**
 * Creates a {@link GroupPartitionId} from this table's ID and the given partition ID.
 *
 * @param partitionId Partition ID.
 */
private GroupPartitionId createGroupPartitionId(int partitionId) {
    int tableId = getTableId();

    return new GroupPartitionId(tableId, partitionId);
}
/**
 * Runs the supplier while holding the checkpoint read lock and returns its result; the lock is released even if the supplier
 * throws.
 *
 * @param supplier Action to execute under the lock.
 * @param <V> Type of the result.
 * @return Value produced by the supplier.
 */
private <V> V inCheckpointLock(Supplier<V> supplier) {
    CheckpointTimeoutLock lock = dataRegion.checkpointManager().checkpointTimeoutLock();

    lock.checkpointReadLock();

    try {
        return supplier.get();
    } finally {
        lock.checkpointReadUnlock();
    }
}
/**
 * Returns the partition meta of the given partition, reading it from the partition file or creating it if nothing is registered
 * for the partition yet.
 *
 * <p>If a file page store is already registered for the partition, the meta registered for it is returned as is. Otherwise the
 * file page store and the meta are read or created, wired together and registered in the data region's managers.</p>
 *
 * <p>Safe to use without a checkpointReadLock as we read the meta directly without using {@link PageMemory}.</p>
 *
 * @param groupPartitionId Partition of the group.
 * @return Partition meta.
 * @throws StorageException If reading or creating the file page store or the partition meta fails.
 */
private PartitionMeta getOrCreatePartitionMetaOnCreatePartition(GroupPartitionId groupPartitionId) {
// Temporary page-sized buffer handed to the store and meta read-or-create calls; released in the finally block below.
ByteBuffer buffer = allocateBuffer(dataRegion.pageMemory().pageSize());
try {
FilePageStore filePageStore = dataRegion.filePageStoreManager().getStore(groupPartitionId);
// TODO: IGNITE-20983 This shouldn't happen, we should read the page store and its meta again
if (filePageStore != null) {
// A store is already registered: reuse the meta registered alongside it.
PartitionMeta partitionMeta = dataRegion.partitionMetaManager().getMeta(groupPartitionId);
assert partitionMeta != null : groupPartitionId;
return partitionMeta;
}
filePageStore = readOrCreateAndInitFilePageStore(groupPartitionId, buffer);
// The buffer has already been handed to the store call above, so it is rewound before being reused for the meta.
PartitionMeta partitionMeta = readOrCreatePartitionMeta(groupPartitionId, filePageStore, buffer.rewind());
// Pass the page count recorded in the meta to the store.
filePageStore.pages(partitionMeta.pageCount());
// On every page allocation reported by the store, increment the page count in the meta; the callback asserts that the
// calling thread holds the checkpoint lock.
filePageStore.setPageAllocationListener(pageIdx -> {
assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
partitionMeta.incrementPageCount(lastCheckpointId());
});
// Register the store and the meta in the data region's managers.
dataRegion.filePageStoreManager().addStore(groupPartitionId, filePageStore);
dataRegion.partitionMetaManager().addMeta(groupPartitionId, partitionMeta);
// If the store reports delta files, ask the checkpoint manager to trigger compaction.
if (filePageStore.deltaFileCount() > 0) {
dataRegion.checkpointManager().triggerCompaction();
}
return partitionMeta;
} finally {
freeBuffer(buffer);
}
}
/**
 * Reads or creates the file page store of a partition via the file page store manager and calls {@code ensure()} on it.
 *
 * @param groupPartitionId Partition of the group.
 * @param buffer Buffer that is passed to the file page store manager.
 * @return File page store of the partition.
 * @throws StorageException If reading, creating or ensuring the file page store fails.
 */
private FilePageStore readOrCreateAndInitFilePageStore(GroupPartitionId groupPartitionId, ByteBuffer buffer) throws StorageException {
    try {
        FilePageStore store = dataRegion.filePageStoreManager().readOrCreateStore(groupPartitionId, buffer);

        assert !store.isMarkedToDestroy() : IgniteStringFormatter.format(
                "Should not be marked for deletion: [tableId={}, partitionId={}]",
                groupPartitionId.getGroupId(),
                groupPartitionId.getPartitionId()
        );

        store.ensure();

        return store;
    } catch (IgniteInternalCheckedException e) {
        throw new StorageException(
                "Error read and initializing file page store: [tableId={}, partitionId={}]",
                e,
                groupPartitionId.getGroupId(),
                groupPartitionId.getPartitionId()
        );
    }
}
/**
 * Reads or creates the meta of a partition via the partition meta manager, passing it the ID of the last started checkpoint.
 *
 * @param groupPartitionId Partition of the group.
 * @param filePageStore File page store of the partition.
 * @param buffer Buffer that is passed to the partition meta manager.
 * @return Partition meta.
 * @throws StorageException If reading or creating the meta fails.
 */
private PartitionMeta readOrCreatePartitionMeta(
        GroupPartitionId groupPartitionId,
        FilePageStore filePageStore,
        ByteBuffer buffer
) throws StorageException {
    try {
        PartitionMeta partitionMeta = dataRegion.partitionMetaManager()
                .readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore, buffer);

        return partitionMeta;
    } catch (IgniteInternalCheckedException e) {
        throw new StorageException(
                "Error reading or creating partition meta information: [tableId={}, partitionId={}]",
                e,
                getTableId(),
                groupPartitionId.getPartitionId()
        );
    }
}
}