blob: a5d56a2573b80dc872cd8bf2a46e5b149875a3a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.pagememory;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.fileio.AsyncFileIoFactory;
import org.apache.ignite.internal.fileio.FileIoFactory;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
import org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileView;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.jetbrains.annotations.Nullable;
/**
 * Storage engine implementation based on {@link PageMemory} for persistent case.
 *
 * <p>{@link #start()} creates and starts the file page store manager, the partition meta manager, the checkpoint manager,
 * one data region per persistent page memory storage profile, and the executor used for partition destruction.
 * {@link #stop()} shuts those components down.
 */
public class PersistentPageMemoryStorageEngine implements StorageEngine {
    /** Engine name. */
    public static final String ENGINE_NAME = "aipersist";

    /**
     * Maximum "work units" that are allowed to be used during {@link BplusTree} destruction.
     *
     * @see BplusTree#startGradualDestruction
     */
    public static final int MAX_DESTRUCTION_WORK_UNITS = 1_000;

    private static final IgniteLogger LOG = Loggers.forClass(PersistentPageMemoryStorageEngine.class);

    /** Ignite instance name, passed to the managers and used to name the destruction threads. */
    private final String igniteInstanceName;

    /** Storage engine configuration (page size, checkpoint settings). */
    private final PersistentPageMemoryStorageEngineConfiguration engineConfig;

    /** Storage configuration that holds the storage profiles. */
    private final StorageConfiguration storageConfiguration;

    /** IO registry. */
    private final PageIoRegistry ioRegistry;

    /** Storage path. */
    private final Path storagePath;

    /** Long JVM pause detector, may be {@code null}. */
    @Nullable
    private final LongJvmPauseDetector longJvmPauseDetector;

    /** Started data regions, keyed by storage profile name. */
    private final Map<String, PersistentPageMemoryDataRegion> regions = new ConcurrentHashMap<>();

    /** File page store manager, {@code null} until {@link #start()} is called. */
    @Nullable
    private volatile FilePageStoreManager filePageStoreManager;

    /** Partition meta manager, {@code null} until {@link #start()} is called. */
    @Nullable
    private volatile PartitionMetaManager partitionMetaManager;

    /** Checkpoint manager, {@code null} until {@link #start()} is called. */
    @Nullable
    private volatile CheckpointManager checkpointManager;

    /** Executor handed to table storages for partition destruction, assigned at the end of {@link #start()}. */
    private volatile ExecutorService destructionExecutor;

    /** Failure processor that is used to handle critical errors. */
    private final FailureProcessor failureProcessor;

    /** Write-ahead log synchronizer. */
    private final LogSyncer logSyncer;

    /**
     * Constructor.
     *
     * @param igniteInstanceName Ignite instance name.
     * @param engineConfig PageMemory storage engine configuration.
     * @param storageConfiguration Storage configuration, the source of storage profiles.
     * @param ioRegistry IO registry.
     * @param storagePath Storage path.
     * @param longJvmPauseDetector Long JVM pause detector.
     * @param failureProcessor Failure processor that is used to handle critical errors.
     * @param logSyncer Write-ahead log synchronizer.
     */
    public PersistentPageMemoryStorageEngine(
            String igniteInstanceName,
            PersistentPageMemoryStorageEngineConfiguration engineConfig,
            StorageConfiguration storageConfiguration,
            PageIoRegistry ioRegistry,
            Path storagePath,
            @Nullable LongJvmPauseDetector longJvmPauseDetector,
            FailureProcessor failureProcessor,
            LogSyncer logSyncer
    ) {
        this.igniteInstanceName = igniteInstanceName;
        this.engineConfig = engineConfig;
        this.storageConfiguration = storageConfiguration;
        this.ioRegistry = ioRegistry;
        this.storagePath = storagePath;
        this.longJvmPauseDetector = longJvmPauseDetector;
        this.failureProcessor = failureProcessor;
        this.logSyncer = logSyncer;
    }

    /**
     * Returns a storage engine configuration.
     */
    public PersistentPageMemoryStorageEngineConfiguration configuration() {
        return engineConfig;
    }

    /** Returns the engine name, {@link #ENGINE_NAME}. */
    @Override
    public String name() {
        return ENGINE_NAME;
    }

    /**
     * Starts the engine: the file page store manager, the partition meta manager, the checkpoint manager, a data region for
     * every persistent page memory storage profile, and finally the partition destruction executor.
     *
     * @throws StorageException If the file page store manager or the checkpoint manager fails to start.
     */
    @Override
    public void start() throws StorageException {
        int pageSize = engineConfig.pageSize().value();

        try {
            FileIoFactory fileIoFactory = engineConfig.checkpoint().useAsyncFileIoFactory().value()
                    ? new AsyncFileIoFactory()
                    : new RandomAccessFileIoFactory();

            filePageStoreManager = new FilePageStoreManager(
                    igniteInstanceName,
                    storagePath,
                    fileIoFactory,
                    pageSize,
                    failureProcessor
            );

            filePageStoreManager.start();
        } catch (IgniteInternalCheckedException e) {
            throw new StorageException("Error starting file page store manager", e);
        }

        partitionMetaManager = new PartitionMetaManager(ioRegistry, pageSize);

        try {
            checkpointManager = new CheckpointManager(
                    igniteInstanceName,
                    null,
                    longJvmPauseDetector,
                    failureProcessor,
                    engineConfig.checkpoint(),
                    filePageStoreManager,
                    partitionMetaManager,
                    regions.values(),
                    ioRegistry,
                    logSyncer,
                    pageSize
            );

            checkpointManager.start();
        } catch (IgniteInternalCheckedException e) {
            throw new StorageException("Error starting checkpoint manager", e);
        }

        // TODO: IGNITE-17066 Add handling deleting/updating data regions configuration
        storageConfiguration.profiles().value().stream()
                .filter(profile -> profile instanceof PersistentPageMemoryProfileView)
                .forEach(profile -> addDataRegion(profile.name()));

        // With an unbounded work queue a ThreadPoolExecutor never creates more than corePoolSize threads (and exactly one
        // thread when corePoolSize is 0), so maximumPoolSize alone has no effect. To really get up to one thread per
        // available processor, core and maximum sizes are made equal; core thread timeout keeps the original property
        // that no idle threads are retained once the keep-alive time elapses.
        int destructionThreads = Runtime.getRuntime().availableProcessors();

        // TODO: remove this executor, see https://issues.apache.org/jira/browse/IGNITE-21683
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                destructionThreads,
                destructionThreads,
                100,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                NamedThreadFactory.create(igniteInstanceName, "persistent-mv-partition-destruction", LOG)
        );

        executor.allowCoreThreadTimeOut(true);

        destructionExecutor = executor;
    }

    /**
     * Stops the engine components in this order: destruction executor (waiting up to 30 seconds for termination),
     * checkpoint manager, file page store manager, then all data regions. Components that were never created are passed
     * to {@code closeAll} as {@code null}.
     *
     * @throws StorageException If stopping any of the components fails.
     */
    @Override
    public void stop() throws StorageException {
        try {
            Stream<AutoCloseable> closeRegions = regions.values().stream().map(region -> region::stop);

            // Read the volatile fields once so that the null checks and the captured references agree.
            ExecutorService destructionExecutor = this.destructionExecutor;
            CheckpointManager checkpointManager = this.checkpointManager;
            FilePageStoreManager filePageStoreManager = this.filePageStoreManager;

            Stream<AutoCloseable> resources = Stream.of(
                    destructionExecutor == null
                            ? null
                            : (AutoCloseable) () -> shutdownAndAwaitTermination(destructionExecutor, 30, TimeUnit.SECONDS),
                    checkpointManager == null ? null : (AutoCloseable) checkpointManager::stop,
                    filePageStoreManager == null ? null : (AutoCloseable) filePageStoreManager::stop
            );

            closeAll(Stream.concat(resources, closeRegions));
        } catch (Exception e) {
            throw new StorageException("Error when stopping components", e);
        }
    }

    /** Returns {@code false}: this engine is persistent. */
    @Override
    public boolean isVolatile() {
        return false;
    }

    /**
     * Creates a table storage backed by the data region whose name equals the table's storage profile.
     *
     * @param tableDescriptor Table descriptor, provides the table ID and the storage profile name.
     * @param indexDescriptorSupplier Index descriptor supplier.
     * @return New table storage.
     * @throws StorageException Declared by the interface; not thrown directly by this method.
     */
    @Override
    public PersistentPageMemoryTableStorage createMvTable(
            StorageTableDescriptor tableDescriptor,
            StorageIndexDescriptorSupplier indexDescriptorSupplier
    ) throws StorageException {
        PersistentPageMemoryDataRegion dataRegion = regions.get(tableDescriptor.getStorageProfile());

        assert dataRegion != null : "tableId=" + tableDescriptor.getId() + ", dataRegion=" + tableDescriptor.getStorageProfile();

        return new PersistentPageMemoryTableStorage(tableDescriptor, indexDescriptorSupplier, this, dataRegion, destructionExecutor);
    }

    /**
     * Destroys the page store group of the given table if it exists.
     *
     * @param tableId Table ID.
     * @throws StorageException If the group could not be destroyed because of an I/O error.
     */
    @Override
    public void dropMvTable(int tableId) {
        FilePageStoreManager filePageStoreManager = this.filePageStoreManager;

        assert filePageStoreManager != null : "Component has not started";

        try {
            filePageStoreManager.destroyGroupIfExists(tableId);
        } catch (IOException e) {
            throw new StorageException("Failed to destroy table directory: {}", e, tableId);
        }
    }

    /**
     * Returns checkpoint manager, {@code null} if engine not started.
     */
    public @Nullable CheckpointManager checkpointManager() {
        return checkpointManager;
    }

    /**
     * Creates, starts and adds a new data region to the engine.
     *
     * @param name Data region name.
     */
    private void addDataRegion(String name) {
        PersistentPageMemoryProfileConfiguration storageProfileConfiguration =
                (PersistentPageMemoryProfileConfiguration) storageConfiguration.profiles().get(name);

        int pageSize = engineConfig.pageSize().value();

        PersistentPageMemoryDataRegion dataRegion = new PersistentPageMemoryDataRegion(
                storageProfileConfiguration,
                ioRegistry,
                filePageStoreManager,
                partitionMetaManager,
                checkpointManager,
                pageSize
        );

        dataRegion.start();

        regions.put(name, dataRegion);
    }
}