// blob: 7f0b52948098a9418a5ccc3ef7ce4a9a83e4b920 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.nc;
import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
import java.io.IOException;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.cloud.CloudIOManager;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.NodeProperties;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
import org.apache.asterix.common.context.GlobalVirtualBufferCache;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategyFactory;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.RMIClientFactory;
import org.apache.asterix.metadata.RMIServerFactory;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
import org.apache.asterix.replication.management.ReplicationChannel;
import org.apache.asterix.replication.management.ReplicationManager;
import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
import org.apache.asterix.runtime.utils.NoOpCoordinationService;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import org.apache.hyracks.storage.common.file.FileMapManager;
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
import org.apache.hyracks.util.NetworkUtil;
import org.apache.hyracks.util.cache.CacheManager;
import org.apache.hyracks.util.cache.ICacheManager;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Node-controller side implementation of {@link INcApplicationContext}.
 * <p>
 * The constructor only captures configuration (the various {@code *Properties} objects) and a few
 * stateless helpers. All storage, transaction, replication and library components are created later
 * by {@link #initialize}, so most getters return {@code null} until that method has completed.
 */
public class NCAppRuntimeContext implements INcApplicationContext {
    private static final Logger LOGGER = LogManager.getLogger();

    private ILSMMergePolicyFactory metadataMergePolicyFactory;
    private final INCServiceContext ncServiceContext;
    private final IResourceIdFactory resourceIdFactory;
    private final CompilerProperties compilerProperties;
    private final ExternalProperties externalProperties;
    private final MetadataProperties metadataProperties;
    private final StorageProperties storageProperties;
    private final TransactionProperties txnProperties;
    private final ActiveProperties activeProperties;
    private final BuildProperties buildProperties;
    private final ReplicationProperties replicationProperties;
    private final MessagingProperties messagingProperties;
    private final NodeProperties nodeProperties;
    private ExecutorService threadExecutor;
    private IDatasetLifecycleManager datasetLifecycleManager;
    private IBufferCache bufferCache;
    private IVirtualBufferCache virtualBufferCache;
    private ITransactionSubsystem txnSubsystem;
    // Non-null only while MetadataNode.INSTANCE is exported over RMI; guarded by "this".
    private IMetadataNode metadataNodeStub;
    private ILSMIOOperationScheduler lsmIOScheduler;
    private PersistentLocalResourceRepository localResourceRepository;
    // The IO manager handed out by the service context.
    private IIOManager ioManager;
    // Same instance as ioManager, or a CloudIOManager wrapping it when this is a cloud deployment.
    private IIOManager persistenceIOManager;
    private boolean isShuttingdown;
    private ActiveManager activeManager;
    // The two replication fields stay null when replication is disabled.
    private IReplicationChannel replicationChannel;
    private IReplicationManager replicationManager;
    private ExternalLibraryManager libraryManager;
    private final NCExtensionManager ncExtensionManager;
    private final IStorageComponentProvider componentProvider;
    private final IPersistedResourceRegistry persistedResourceRegistry;
    // Lazily created in getHcc() / getResultSet(); written only while holding the lock on "this".
    private volatile HyracksConnection hcc;
    private volatile ResultSet resultSet;
    private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
    private IReplicaManager replicaManager;
    private IReceptionist receptionist;
    private final ICacheManager cacheManager;
    private IConfigValidator configValidator;
    private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;

    /**
     * Captures the configuration and the stateless helpers; no runtime component is started here.
     *
     * @param ncServiceContext  the hosting node controller's service context
     * @param extensionManager  the extension manager returned by {@link #getExtensionManager()}
     * @param propertiesFactory source of every {@code *Properties} object exposed by this context
     */
    public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
            IPropertiesFactory propertiesFactory) {
        this.ncServiceContext = ncServiceContext;
        compilerProperties = propertiesFactory.newCompilerProperties();
        externalProperties = propertiesFactory.newExternalProperties();
        metadataProperties = propertiesFactory.newMetadataProperties();
        storageProperties = propertiesFactory.newStorageProperties();
        txnProperties = propertiesFactory.newTransactionProperties();
        activeProperties = propertiesFactory.newActiveProperties();
        buildProperties = propertiesFactory.newBuildProperties();
        replicationProperties = propertiesFactory.newReplicationProperties();
        messagingProperties = propertiesFactory.newMessagingProperties();
        nodeProperties = propertiesFactory.newNodeProperties();
        ncExtensionManager = extensionManager;
        componentProvider = new StorageComponentProvider();
        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
        persistedResourceRegistry = ncServiceContext.getPersistedResourceRegistry();
        cacheManager = new CacheManager();
    }

    /**
     * Creates every runtime component of this node and registers the life-cycle aware ones with the
     * service context's life-cycle component manager.
     * <p>
     * The statement order is significant: {@code this} is handed to several components while it is
     * still being populated, so fields are assigned before the components that are given this
     * context are constructed.
     *
     * @param recoveryManagerFactory     passed to the transaction subsystem
     * @param receptionistFactory        creates the {@link IReceptionist}
     * @param configValidatorFactory     creates the {@link IConfigValidator}
     * @param replicationStrategyFactory passed to the replication manager when replication is enabled
     * @param initialRun                 when {@code true}, existing storage data is deleted
     * @throws IOException if one of the underlying components fails with an I/O error
     */
    @Override
    public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
            IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
            boolean initialRun) throws IOException {
        ioManager = getServiceContext().getIoManager();
        if (isCloudDeployment()) {
            persistenceIOManager = new CloudIOManager((IOManager) ioManager);
        } else {
            persistenceIOManager = ioManager;
        }
        int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
        threadExecutor =
                MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
        IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
                storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
        lsmIOScheduler = createIoScheduler(storageProperties);
        metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
        indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(persistenceIOManager);
        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                new PersistentLocalResourceRepositoryFactory(persistenceIOManager, indexCheckpointManagerProvider,
                        persistedResourceRegistry);
        localResourceRepository =
                (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
        configValidator = configValidatorFactory.create();
        txnSubsystem = new TransactionSubsystem(this, recoveryManagerFactory);
        IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
        SystemState systemState = recoveryMgr.getSystemState();
        boolean resetStorageData = initialRun || systemState == SystemState.PERMANENT_DATA_LOSS;
        if (resetStorageData) {
            //delete any storage data before the resource factory is initialized
            LOGGER.log(Level.WARN, "Deleting the storage dir. initialRun = {}, systemState = {}", initialRun,
                    systemState);
            localResourceRepository.deleteStorageData();
        }
        int maxScheduledFlushes = storageProperties.getMaxScheduledFlushes();
        if (maxScheduledFlushes <= 0) {
            // Not configured: fall back to the number of IO devices.
            maxScheduledFlushes = ioManager.getIODevices().size();
            LOGGER.info("The value of maxScheduledFlushes is not provided. Setting maxScheduledFlushes = {}.",
                    maxScheduledFlushes);
        }
        virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties, maxScheduledFlushes);
        // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
        // the metadata bootstrap task
        ((ILifeCycleComponent) virtualBufferCache).start();
        datasetLifecycleManager =
                new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                        virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
        final String nodeId = getServiceContext().getNodeId();
        final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
        replicaManager = new ReplicaManager(this, nodePartitions);
        isShuttingdown = false;
        activeManager = new ActiveManager(threadExecutor, nodeId, activeProperties.getMemoryComponentGlobalBudget(),
                compilerProperties.getFrameSize(), this.ncServiceContext);
        receptionist = receptionistFactory.create();
        if (replicationProperties.isReplicationEnabled()) {
            LOGGER.info("Replication is enabled");
            replicationManager = new ReplicationManager(this, replicationStrategyFactory, replicationProperties);
            //pass replication manager to replication required object
            //LogManager to replicate logs
            txnSubsystem.getLogManager().setReplicationManager(replicationManager);
            //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
            localResourceRepository.setReplicationManager(replicationManager);
            //initialize replication channel
            replicationChannel = new ReplicationChannel(this);
            bufferCache = new BufferCache(persistenceIOManager, prs, pcp, new FileMapManager(),
                    storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen,
                    getServiceContext().getThreadFactory(), replicationManager);
        } else {
            bufferCache = new BufferCache(persistenceIOManager, prs, pcp, new FileMapManager(),
                    storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen,
                    getServiceContext().getThreadFactory());
        }
        NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
        FileReference appDir =
                ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
        libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, ioManager);
        libraryManager.initialize(resetStorageData);
        registerLifeCycleComponents();
        diskWriteRateLimiterProvider = new DiskWriteRateLimiterProvider();
    }

    /**
     * Registers the components created by {@link #initialize} with the life-cycle component manager.
     */
    private void registerLifeCycleComponents() {
        /*
         * The order of registration is important. The buffer cache must registered before recovery and transaction
         * managers. Notes: registered components are stopped in reversed order
         */
        ILifeCycleComponentManager lccm = getServiceContext().getLifeCycleComponentManager();
        lccm.register((ILifeCycleComponent) virtualBufferCache);
        lccm.register((ILifeCycleComponent) bufferCache);
        /*
         * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager
         * to process any logs that might be generated during stopping these components
         */
        lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
        /*
         * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
         * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
         */
        if (replicationManager != null) {
            lccm.register(replicationManager);
        }
        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
        /*
         * Stopping indexLifecycleManager will flush and close all datasets.
         */
        lccm.register((ILifeCycleComponent) datasetLifecycleManager);
        lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
        lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
        lccm.register(txnSubsystem.getCheckpointManager());
        lccm.register(libraryManager);
    }

    @Override
    public boolean isShuttingdown() {
        return isShuttingdown;
    }

    @Override
    public void setShuttingdown(boolean isShuttingdown) {
        this.isShuttingdown = isShuttingdown;
    }

    /**
     * Shuts down the active manager and, if the metadata node is currently exported, unexports it.
     */
    @Override
    public synchronized void preStop() throws Exception {
        activeManager.shutdown();
        if (metadataNodeStub != null) {
            unexportMetadataNodeStub();
        }
    }

    /** Intentionally a no-op. */
    @Override
    public void deinitialize() throws HyracksDataException {
    }

    // ---------------------------------------------------------------------------------------------
    // Component accessors. Everything not assigned in the constructor is null until initialize().
    // ---------------------------------------------------------------------------------------------

    @Override
    public IBufferCache getBufferCache() {
        return bufferCache;
    }

    @Override
    public IVirtualBufferCache getVirtualBufferCache() {
        return virtualBufferCache;
    }

    @Override
    public ITransactionSubsystem getTransactionSubsystem() {
        return txnSubsystem;
    }

    @Override
    public IDatasetLifecycleManager getDatasetLifecycleManager() {
        return datasetLifecycleManager;
    }

    @Override
    public ILSMIOOperationScheduler getLSMIOScheduler() {
        return lsmIOScheduler;
    }

    @Override
    public ILocalResourceRepository getLocalResourceRepository() {
        return localResourceRepository;
    }

    @Override
    public IResourceIdFactory getResourceIdFactory() {
        return resourceIdFactory;
    }

    /** Returns the IO manager obtained from the service context. */
    @Override
    public IIOManager getIoManager() {
        return ioManager;
    }

    /**
     * Returns the IO manager used for persisted storage: a {@link CloudIOManager} wrapping
     * {@link #getIoManager()} in a cloud deployment, otherwise the very same instance.
     */
    @Override
    public IIOManager getPersistenceIoManager() {
        return persistenceIOManager;
    }

    @Override
    public StorageProperties getStorageProperties() {
        return storageProperties;
    }

    @Override
    public TransactionProperties getTransactionProperties() {
        return txnProperties;
    }

    @Override
    public CompilerProperties getCompilerProperties() {
        return compilerProperties;
    }

    @Override
    public MetadataProperties getMetadataProperties() {
        return metadataProperties;
    }

    @Override
    public ExternalProperties getExternalProperties() {
        return externalProperties;
    }

    @Override
    public ActiveProperties getActiveProperties() {
        return activeProperties;
    }

    @Override
    public BuildProperties getBuildProperties() {
        return buildProperties;
    }

    @Override
    public MessagingProperties getMessagingProperties() {
        return messagingProperties;
    }

    @Override
    public NodeProperties getNodeProperties() {
        return nodeProperties;
    }

    @Override
    public ExecutorService getThreadExecutor() {
        return threadExecutor;
    }

    @Override
    public ILSMMergePolicyFactory getMetadataMergePolicyFactory() {
        return metadataMergePolicyFactory;
    }

    @Override
    public ActiveManager getActiveManager() {
        return activeManager;
    }

    @Override
    public ReplicationProperties getReplicationProperties() {
        return replicationProperties;
    }

    /** Returns the replication channel, or {@code null} when replication is disabled. */
    @Override
    public IReplicationChannel getReplicationChannel() {
        return replicationChannel;
    }

    /** Returns the replication manager, or {@code null} when replication is disabled. */
    @Override
    public IReplicationManager getReplicationManager() {
        return replicationManager;
    }

    @Override
    public ILibraryManager getLibraryManager() {
        return libraryManager;
    }

    /**
     * Initializes the metadata node on this NC and bootstraps the metadata universe.
     *
     * @param newUniverse passed through to {@link MetadataBootstrap#startUniverse}
     * @param partitionId the partition passed to the metadata node
     * @throws IllegalStateException if the service context has no distributed state
     */
    @Override
    public void initializeMetadata(boolean newUniverse, int partitionId) throws Exception {
        LOGGER.info("Bootstrapping metadata");
        MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(),
                ncExtensionManager.getMetadataExtensions(), partitionId);
        //noinspection unchecked
        ConcurrentHashMap<CcId, IAsterixStateProxy> proxyMap =
                (ConcurrentHashMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState();
        if (proxyMap == null) {
            throw new IllegalStateException("Metadata node cannot access distributed state");
        }
        // This is a special case, we just give the metadataNode directly.
        // This way we can delay the registration of the metadataNode until
        // it is completely initialized.
        MetadataManager.initialize(proxyMap.values(), MetadataNode.INSTANCE);
        MetadataBootstrap.startUniverse(getServiceContext(), newUniverse);
        MetadataBootstrap.startDDLRecovery();
        ncExtensionManager.initializeMetadata(getServiceContext());
        LOGGER.info("Metadata node bound");
    }

    /**
     * Exports {@link MetadataNode#INSTANCE} over RMI on the configured metadata port. Does nothing
     * if a stub already exists.
     */
    @Override
    public synchronized void exportMetadataNodeStub() throws RemoteException {
        if (metadataNodeStub != null) {
            return;
        }
        final INetworkSecurityManager networkSecurityManager =
                ncServiceContext.getControllerService().getNetworkSecurityManager();
        final int metadataPort = getMetadataProperties().getMetadataPort();
        // clients need to have the client factory on their classpath- to enable older clients, only use
        // our client socket factory when SSL is enabled
        if (networkSecurityManager.getConfiguration().isSslEnabled()) {
            final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
            final RMIClientFactory clientSocketFactory =
                    new RMIClientFactory(networkSecurityManager.getConfiguration());
            metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
                    metadataPort, clientSocketFactory, serverSocketFactory);
        } else {
            metadataNodeStub =
                    (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataPort);
        }
    }

    /**
     * Unexports the metadata node (non-forcibly) if it is currently exported and clears the stub.
     */
    @Override
    public synchronized void unexportMetadataNodeStub() throws RemoteException {
        if (metadataNodeStub != null) {
            // force == false: the RMI runtime refuses to unexport while calls are pending or in
            // progress and reports that through the return value, so surface it in the log.
            final boolean unexported = UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
            if (!unexported) {
                LOGGER.warn("Metadata node could not be unexported; it may still have pending or in-progress calls");
            }
        }
        metadataNodeStub = null;
    }

    /**
     * Publishes the exported metadata node stub to the state proxy of the given cluster controller.
     *
     * @param ccId the cluster controller whose state proxy receives the stub
     * @throws IllegalStateException if the metadata node is not exported, or if no state proxy is
     *                               available for {@code ccId}
     */
    @Override
    public synchronized void bindMetadataNodeStub(CcId ccId) throws RemoteException {
        if (metadataNodeStub == null) {
            throw new IllegalStateException("Metadata node not exported");
        }
        //noinspection unchecked
        final ConcurrentMap<CcId, IAsterixStateProxy> proxyMap =
                (ConcurrentMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState();
        final IAsterixStateProxy proxy = proxyMap == null ? null : proxyMap.get(ccId);
        if (proxy == null) {
            throw new IllegalStateException("No state proxy available for " + ccId + "; cannot bind metadata node");
        }
        proxy.setMetadataNode(metadataNodeStub);
    }

    @Override
    public NCExtensionManager getExtensionManager() {
        return ncExtensionManager;
    }

    @Override
    public IStorageComponentProvider getStorageComponentProvider() {
        return componentProvider;
    }

    @Override
    public INCServiceContext getServiceContext() {
        return ncServiceContext;
    }

    /**
     * Returns a connection to the primary cluster controller, creating or re-creating it when
     * there is none or the cached one reports that it is no longer connected.
     * <p>
     * The cached connection is checked without locking first and re-checked under the lock on
     * {@code this} before being replaced. Replacing the connection also closes and clears the
     * cached result set, so that {@link #getResultSet()} builds a new one on the new connection.
     *
     * @throws HyracksDataException wrapping any failure while establishing the connection
     */
    @Override
    public IHyracksClientConnection getHcc() throws HyracksDataException {
        HyracksConnection hc = hcc;
        if (hc == null || !hc.isConnected()) {
            synchronized (this) {
                hc = hcc;
                if (hc == null || !hc.isConnected()) {
                    try {
                        ResultSet rs = resultSet;
                        resultSet = null;
                        NetworkUtil.closeQuietly(rs);
                        NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
                        // TODO(mblow): multicc
                        CcId primaryCcId = ncSrv.getPrimaryCcId();
                        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo();
                        NetworkUtil.closeQuietly(hc);
                        hcc = hc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort(),
                                ncSrv.getNetworkSecurityManager().getSocketChannelFactory());
                    } catch (Exception e) {
                        throw HyracksDataException.create(e);
                    }
                }
            }
        }
        return hc;
    }

    /**
     * Returns the cached result set, creating it on first use (or after {@link #getHcc()} has
     * discarded it) on top of the current cluster-controller connection.
     *
     * @throws HyracksDataException wrapping any failure while creating the result set
     */
    @Override
    public IResultSet getResultSet() throws HyracksDataException {
        ResultSet rs = resultSet;
        if (rs == null) {
            synchronized (this) {
                rs = resultSet;
                if (rs == null) {
                    try {
                        resultSet = rs = ResultReader.createResultSet(getHcc(), ncServiceContext.getControllerService(),
                                compilerProperties);
                    } catch (Exception e) {
                        throw HyracksDataException.create(e);
                    }
                }
            }
        }
        return rs;
    }

    @Override
    public IReplicaManager getReplicaManager() {
        return replicaManager;
    }

    @Override
    public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
        return indexCheckpointManagerProvider;
    }

    /** Always returns {@link NoOpCoordinationService#INSTANCE}. */
    @Override
    public ICoordinationService getCoordinationService() {
        return NoOpCoordinationService.INSTANCE;
    }

    /**
     * Returns the larger of the metadata manager's max txn id (taken as 0 when the metadata manager
     * has not been initialized) and the transaction manager's max txn id.
     *
     * @throws IllegalStateException if called before {@link #initialize} created the txn subsystem
     */
    @Override
    public long getMaxTxnId() {
        if (txnSubsystem == null) {
            throw new IllegalStateException("cannot determine max txn id before txnSubsystem is initialized!");
        }
        return Math.max(MetadataManager.INSTANCE == null ? 0 : MetadataManager.INSTANCE.getMaxTxnId(),
                txnSubsystem.getTransactionManager().getMaxTxnId());
    }

    @Override
    public IPersistedResourceRegistry getPersistedResourceRegistry() {
        return persistedResourceRegistry;
    }

    @Override
    public IReceptionist getReceptionist() {
        return receptionist;
    }

    @Override
    public ICacheManager getCacheManager() {
        return cacheManager;
    }

    @Override
    public IConfigValidator getConfigValidator() {
        return configValidator;
    }

    /**
     * Builds the LSM I/O operation scheduler selected by the storage configuration.
     * <p>
     * The asynchronous scheduler is used when its factory name matches the configured name
     * (case-insensitively); every other value yields the greedy scheduler, with a warning when the
     * name is not the greedy scheduler's own. Requires {@code ioManager} to be set already, because
     * the number of IO devices is used as the partition count for the concurrency limits.
     *
     * @param properties the storage properties to read the scheduler name and limits from
     * @return the newly created scheduler
     */
    private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties) {
        final String schedulerName = properties.getIoScheduler();
        final int numPartitions = ioManager.getIODevices().size();
        final int maxConcurrentFlushes = properties.geMaxConcurrentFlushes(numPartitions);
        final int maxScheduledMerges = properties.getMaxScheduledMerges(numPartitions);
        final int maxConcurrentMerges = properties.getMaxConcurrentMerges(numPartitions);
        if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
            return AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
                    HaltCallback.INSTANCE, maxConcurrentFlushes, maxScheduledMerges, maxConcurrentMerges);
        }
        if (!GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
            LOGGER.log(Level.WARN, "Unknown storage I/O scheduler: {}; defaulting to greedy I/O scheduler.",
                    schedulerName);
        }
        return GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
                HaltCallback.INSTANCE, maxConcurrentFlushes, maxScheduledMerges, maxConcurrentMerges);
    }

    @Override
    public IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider() {
        return diskWriteRateLimiterProvider;
    }

    /** Returns the value of the {@code CLOUD_DEPLOYMENT} option in the application configuration. */
    @Override
    public boolean isCloudDeployment() {
        return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
    }
}