blob: a0c1fbbd26c1e192f74caa4a546cd7bf5bd3d163 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.cloud;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.cloud.CloudCachePolicy;
import org.apache.asterix.common.cloud.IPartitionBootstrapper;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
import org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
import org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
/**
 * Assembles the cloud-storage related components of a node from {@link CloudProperties}.
 * <p>
 * A cloud IO manager is always created. The remaining components (physical drive, lock notifier,
 * page allocator, default read context and the disk cache monitoring service) are the cloud
 * disk-cache implementations only when the configured cache policy is
 * {@link CloudCachePolicy#SELECTIVE}; otherwise their default / no-op counterparts are used.
 */
public final class CloudConfigurator {
    private final CloudProperties cloudProperties;
    private final IOManager localIoManager;
    private final AbstractCloudIOManager cloudIOManager;
    private final IPhysicalDrive physicalDrive;
    private final IDiskResourceCacheLockNotifier lockNotifier;
    private final IDiskCachedPageAllocator pageAllocator;
    private final IBufferCacheReadContext defaultContext;
    private final boolean diskCacheManagerRequired;
    private final long diskCacheMonitoringInterval;

    /**
     * Builds all policy-dependent components up front.
     *
     * @param cloudProperties cloud configuration to read the cache policy and storage settings from
     * @param ioManager       the local IO manager; it is cast to {@link IOManager}
     * @param nsPathResolver  namespace path resolver handed to the cloud IO manager
     * @throws HyracksDataException if creating the cloud IO manager or the physical drive fails
     */
    private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
            INamespacePathResolver nsPathResolver) throws HyracksDataException {
        this.cloudProperties = cloudProperties;
        this.localIoManager = (IOManager) ioManager;
        // The disk cache manager is needed only under the SELECTIVE cache policy
        final boolean selectivePolicy = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
        this.diskCacheManagerRequired = selectivePolicy;
        this.cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver);
        this.physicalDrive = createPhysicalDrive(selectivePolicy, cloudProperties, ioManager);
        this.lockNotifier = createLockNotifier(selectivePolicy);
        this.pageAllocator = createPageAllocator(selectivePolicy);
        this.defaultContext = createDefaultBufferCachePageOpContext(selectivePolicy, physicalDrive);
        this.diskCacheMonitoringInterval = cloudProperties.getStorageDiskMonitorInterval();
    }

    /**
     * Static factory for {@link CloudConfigurator}.
     *
     * @param cloudProperties cloud configuration
     * @param ioManager       the local IO manager
     * @param nsPathResolver  namespace path resolver
     * @return a fully initialized configurator
     * @throws HyracksDataException if any of the underlying components cannot be created
     */
    public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
            INamespacePathResolver nsPathResolver) throws HyracksDataException {
        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver);
    }

    /**
     * @return the cloud IO manager, exposed as a partition bootstrapper
     */
    public IPartitionBootstrapper getPartitionBootstrapper() {
        return cloudIOManager;
    }

    /**
     * @return the cloud IO manager
     */
    public IIOManager getCloudIoManager() {
        return cloudIOManager;
    }

    /**
     * @return the lock notifier chosen for the configured cache policy
     */
    public IDiskResourceCacheLockNotifier getLockNotifier() {
        return lockNotifier;
    }

    /**
     * @return the page allocator chosen for the configured cache policy
     */
    public IDiskCachedPageAllocator getPageAllocator() {
        return pageAllocator;
    }

    /**
     * @return the default buffer cache read context chosen for the configured cache policy
     */
    public IBufferCacheReadContext getDefaultContext() {
        return defaultContext;
    }

    /**
     * Creates the disk cache monitoring service.
     * <p>
     * When the disk cache manager is not required, the shared no-op service is returned. Otherwise a
     * {@link DiskCacheSweeperThread} and a {@link CloudDiskCacheMonitoringAndPrefetchingService} are
     * created on a shared fixed-size thread pool, and the sweeper thread is registered as the space
     * maker of the local IO manager.
     *
     * @param serviceContext service context providing the application configuration
     * @param bufferCache    the buffer cache; it is cast to {@link BufferCache} when the disk cache manager is required
     * @param fileInfoMap    map of file ids to their buffered file handles, passed to the sweeper thread
     * @return the monitoring service matching the configured cache policy
     */
    public IDiskCacheMonitoringService createDiskCacheMonitoringService(INCServiceContext serviceContext,
            IBufferCache bufferCache, Map<Integer, BufferedFileHandle> fileInfoMap) {
        if (!diskCacheManagerRequired) {
            return NoOpDiskCacheMonitoringService.INSTANCE;
        }

        final CloudDiskResourceCacheLockNotifier cloudLockNotifier = (CloudDiskResourceCacheLockNotifier) lockNotifier;
        final BufferCache concreteBufferCache = (BufferCache) bufferCache;

        final int deviceCount = localIoManager.getIODevices().size();
        final IApplicationConfig config = serviceContext.getAppConfig();
        final int workersPerPartition = config.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
        final int configuredQueueSize = config.getInt(NCConfig.Option.IO_QUEUE_SIZE);
        final int sweepThreadCount = workersPerPartition * deviceCount;
        // The queue must be able to hold at least one entry per sweep thread
        final int sweepQueueCapacity = Math.max(sweepThreadCount, configuredQueueSize);
        final long inactiveDurationThreshold = cloudProperties.getStorageIndexInactiveDurationThreshold();

        // One extra thread is reserved for the sweeper (monitor) thread itself
        final ExecutorService threadPool = Executors.newFixedThreadPool(sweepThreadCount + 1);
        final DiskCacheSweeperThread sweeper = new DiskCacheSweeperThread(threadPool, diskCacheMonitoringInterval,
                cloudLockNotifier, cloudIOManager, sweepThreadCount, sweepQueueCapacity, physicalDrive,
                concreteBufferCache, fileInfoMap, inactiveDurationThreshold);
        final IDiskCacheMonitoringService monitoringService =
                new CloudDiskCacheMonitoringAndPrefetchingService(threadPool, physicalDrive, sweeper);

        localIoManager.setSpaceMaker(sweeper);
        return monitoringService;
    }

    /**
     * Creates the cloud IO manager matching the configured cache policy: an
     * {@link EagerCloudIOManager} for {@link CloudCachePolicy#EAGER}, and a {@link LazyCloudIOManager}
     * for every other policy (flagged as selective when the policy is {@link CloudCachePolicy#SELECTIVE}).
     *
     * @param ioManager       the local IO manager; it is cast to {@link IOManager}
     * @param cloudProperties cloud configuration
     * @param nsPathResolver  namespace path resolver
     * @return the cloud IO manager
     * @throws HyracksDataException if the IO manager cannot be created
     */
    public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
            INamespacePathResolver nsPathResolver) throws HyracksDataException {
        final IOManager local = (IOManager) ioManager;
        final CloudCachePolicy cachePolicy = cloudProperties.getCloudCachePolicy();
        if (cachePolicy != CloudCachePolicy.EAGER) {
            final boolean isSelective = cachePolicy == CloudCachePolicy.SELECTIVE;
            return new LazyCloudIOManager(local, cloudProperties, nsPathResolver, isSelective);
        }
        return new EagerCloudIOManager(local, cloudProperties, nsPathResolver);
    }

    /**
     * Returns a {@link PhysicalDrive} configured from the storage properties when the disk cache
     * manager is required, and the shared {@link DummyPhysicalDrive} otherwise.
     */
    private static IPhysicalDrive createPhysicalDrive(boolean required, CloudProperties properties,
            IIOManager ioManager) throws HyracksDataException {
        if (!required) {
            return DummyPhysicalDrive.INSTANCE;
        }
        final double allocationPercentage = properties.getStorageAllocationPercentage();
        final double sweepThreshold = properties.getStorageSweepThresholdPercentage();
        final long debugSweepThresholdSize = properties.getStorageDebugSweepThresholdSize();
        return new PhysicalDrive(ioManager.getIODevices(), sweepThreshold, allocationPercentage,
                debugSweepThresholdSize);
    }

    /**
     * Returns a {@link CloudDiskResourceCacheLockNotifier} for the metadata partition when the disk
     * cache manager is required, and the shared no-op notifier otherwise.
     */
    private static IDiskResourceCacheLockNotifier createLockNotifier(boolean required) {
        return required ? new CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION)
                : NoOpDiskResourceCacheLockNotifier.INSTANCE;
    }

    /**
     * Returns the cloud disk-cached page allocator when the disk cache manager is required, and the
     * default allocator otherwise.
     */
    private static IDiskCachedPageAllocator createPageAllocator(boolean required) {
        return required ? CloudDiskCachedPageAllocator.INSTANCE : DefaultDiskCachedPageAllocator.INSTANCE;
    }

    /**
     * Returns a {@link DefaultCloudReadContext} bound to the given drive when the disk cache manager
     * is required, and the default buffer cache read context otherwise.
     */
    private static IBufferCacheReadContext createDefaultBufferCachePageOpContext(boolean required,
            IPhysicalDrive drive) {
        return required ? new DefaultCloudReadContext(drive) : DefaultBufferCacheReadContextProvider.DEFAULT;
    }
}