blob: 873ddb5d80bf9c67099724eeff85b90504736807 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.engine.orchestration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.api.response.MigrationResponse;
import org.apache.cloudstack.engine.orchestration.service.StorageOrchestrationService;
import org.apache.cloudstack.framework.async.AsyncCallFuture;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.math3.stat.descriptive.moment.Mean;
import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import org.apache.log4j.Logger;
public class StorageOrchestrator extends ManagerBase implements StorageOrchestrationService, Configurable {
private static final Logger s_logger = Logger.getLogger(StorageOrchestrator.class);
SnapshotDataStoreDao snapshotDataStoreDao;
SnapshotDao snapshotDao;
SnapshotDataFactory snapshotFactory;
DataStoreManager dataStoreManager;
StatsCollector statsCollector;
public StorageService storageService;
ConfigurationDao configDao;
private SecondaryStorageService secStgSrv;
TemplateDataStoreDao templateDataStoreDao;
VolumeDataStoreDao volumeDataStoreDao;
DataMigrationUtility migrationHelper;
ConfigKey<Double> ImageStoreImbalanceThreshold = new ConfigKey<>("Advanced", Double.class,
"The storage imbalance threshold that is compared with the standard deviation percentage for a storage utilization metric. " +
"The value is a percentage in decimal format.",
true, ConfigKey.Scope.Global);
Integer numConcurrentCopyTasksPerSSVM = 2;
public String getConfigComponentName() {
return StorageOrchestrationService.class.getName();
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[]{ImageStoreImbalanceThreshold};
static class MigrateBlockingQueue<T> extends ArrayBlockingQueue<T> {
MigrateBlockingQueue(int size) {
public boolean offer(T task) {
try {
} catch (InterruptedException e) {
throw new RuntimeException(e);
return true;
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
numConcurrentCopyTasksPerSSVM = StorageManager.SecStorageMaxMigrateSessions.value();
return true;
public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatastores, MigrationPolicy migrationPolicy) {
List<DataObject> files = new LinkedList<>();
boolean success = true;
String message = null;
migrationHelper.checkIfCompleteMigrationPossible(migrationPolicy, srcDataStoreId);
DataStore srcDatastore = dataStoreManager.getDataStore(srcDataStoreId, DataStoreRole.Image);
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains = new HashMap<>();
Map<DataObject, Pair<List<TemplateInfo>, Long>> childTemplates = new HashMap<>();
files = migrationHelper.getSortedValidSourcesList(srcDatastore, snapshotChains, childTemplates);
if (files.isEmpty()) {
return new MigrationResponse(String.format("No files in Image store: %s to migrate", srcDatastore.getId()), migrationPolicy.toString(), true);
Map<Long, Pair<Long, Long>> storageCapacities = new Hashtable<>();
for (Long storeId : destDatastores) {
storageCapacities.put(storeId, new Pair<>(null, null));
storageCapacities.put(srcDataStoreId, new Pair<>(null, null));
if (migrationPolicy == MigrationPolicy.COMPLETE) {
s_logger.debug(String.format("Setting source image store: %s to read-only", srcDatastore.getId()));
storageService.updateImageStoreStatus(srcDataStoreId, true);
storageCapacities = getStorageCapacities(storageCapacities, srcDataStoreId);
double meanstddev = getStandardDeviation(storageCapacities);
double threshold = ImageStoreImbalanceThreshold.value();
MigrationResponse response = null;
ThreadPoolExecutor executor = new ThreadPoolExecutor(numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM, 30,
TimeUnit.MINUTES, new MigrateBlockingQueue<>(numConcurrentCopyTasksPerSSVM));
Date start = new Date();
if (meanstddev < threshold && migrationPolicy == MigrationPolicy.BALANCE) {
s_logger.debug("mean std deviation of the image stores is below threshold, no migration required");
response = new MigrationResponse("Migration not required as system seems balanced", migrationPolicy.toString(), true);
return response;
int skipped = 0;
List<Future<AsyncCallFuture<DataObjectResult>>> futures = new ArrayList<>();
while (true) {
DataObject chosenFileForMigration = null;
if (files.size() > 0) {
chosenFileForMigration = files.remove(0);
storageCapacities = getStorageCapacities(storageCapacities, srcDataStoreId);
List<Long> orderedDS = migrationHelper.sortDataStores(storageCapacities);
Long destDatastoreId = orderedDS.get(0);
if (chosenFileForMigration == null || destDatastoreId == null || (destDatastoreId == srcDatastore.getId() && migrationPolicy == MigrationPolicy.BALANCE) ) {
Pair<String, Boolean> result = migrateCompleted(destDatastoreId, srcDatastore, files, migrationPolicy, skipped);
message = result.first();
success = result.second();
if (migrationPolicy == MigrationPolicy.COMPLETE && destDatastoreId == srcDatastore.getId()) {
destDatastoreId = orderedDS.get(1);
if (chosenFileForMigration.getPhysicalSize() > storageCapacities.get(destDatastoreId).first()) {
s_logger.debug(String.format("%s: %s too large to be migrated to %s", chosenFileForMigration.getType().name() , chosenFileForMigration.getUuid(), destDatastoreId));
skipped += 1;
if (shouldMigrate(chosenFileForMigration, srcDatastore.getId(), destDatastoreId, storageCapacities, snapshotChains, childTemplates, migrationPolicy)) {
storageCapacities = migrateAway(chosenFileForMigration, storageCapacities, snapshotChains, childTemplates, srcDatastore, destDatastoreId, executor, futures);
} else {
if (migrationPolicy == MigrationPolicy.BALANCE) {
message = "Complete migration failed. Please set the source Image store to read-write mode if you want to continue using it";
success = false;
Date end = new Date();
handleSnapshotMigration(srcDataStoreId, start, end, migrationPolicy, futures, storageCapacities, executor);
return handleResponse(futures, migrationPolicy, message, success);
public MigrationResponse migrateResources(Long srcImgStoreId, Long destImgStoreId, List<Long> templateIdList,
List<Long> snapshotIdList) {
List<DataObject> files;
boolean success = true;
String message = null;
DataStore srcDatastore = dataStoreManager.getDataStore(srcImgStoreId, DataStoreRole.Image);
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains = new HashMap<>();
Map<DataObject, Pair<List<TemplateInfo>, Long>> childTemplates = new HashMap<>();
List<TemplateDataStoreVO> templates = templateDataStoreDao.listByStoreIdAndTemplateIds(srcImgStoreId, templateIdList);
List<SnapshotDataStoreVO> snapshots = snapshotDataStoreDao.listByStoreAndSnapshotIds(srcImgStoreId, DataStoreRole.Image, snapshotIdList);
if (!migrationHelper.filesReadyToMigrate(srcImgStoreId, templates, snapshots, Collections.emptyList())) {
throw new CloudRuntimeException("Migration failed as there are data objects which are not Ready - i.e, they may be in Migrating, creating, copying, etc. states");
files = migrationHelper.getSortedValidSourcesList(srcDatastore, snapshotChains, childTemplates, templates, snapshots);
if (files.isEmpty()) {
return new MigrationResponse(String.format("No files in Image store: %s to migrate", srcDatastore.getUuid()), null, true);
Map<Long, Pair<Long, Long>> storageCapacities = new Hashtable<>();
storageCapacities.put(srcImgStoreId, new Pair<>(null, null));
storageCapacities.put(destImgStoreId, new Pair<>(null, null));
storageCapacities = getStorageCapacities(storageCapacities, srcImgStoreId);
storageCapacities = getStorageCapacities(storageCapacities, destImgStoreId);
ThreadPoolExecutor executor = new ThreadPoolExecutor(numConcurrentCopyTasksPerSSVM, numConcurrentCopyTasksPerSSVM, 30,
TimeUnit.MINUTES, new MigrateBlockingQueue<>(numConcurrentCopyTasksPerSSVM));
List<Future<AsyncCallFuture<DataObjectResult>>> futures = new ArrayList<>();
Date start = new Date();
while (true) {
DataObject chosenFileForMigration = null;
if (!files.isEmpty()) {
chosenFileForMigration = files.remove(0);
if (chosenFileForMigration == null) {
message = "Migration completed";
if (chosenFileForMigration.getPhysicalSize() > storageCapacities.get(destImgStoreId).first()) {
s_logger.debug(String.format("%s: %s too large to be migrated to %s", chosenFileForMigration.getType().name(), chosenFileForMigration.getUuid(), destImgStoreId));
if (storageCapacityBelowThreshold(storageCapacities, destImgStoreId)) {
storageCapacities = migrateAway(chosenFileForMigration, storageCapacities, snapshotChains, childTemplates, srcDatastore, destImgStoreId, executor, futures);
} else {
message = "Migration failed. Destination store doesn't have enough capacity for migration";
success = false;
Date end = new Date();
// Migrate any new child snapshots if created during migration
List<Long> migratedSnapshotIdList = snapshotChains.keySet().stream().map(DataObject::getId).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(migratedSnapshotIdList)) {
List<SnapshotDataStoreVO> snaps = snapshotDataStoreDao.findSnapshots(srcImgStoreId, start, end);
snaps.forEach(snap -> {
SnapshotInfo snapshotInfo = snapshotFactory.getSnapshot(snap.getSnapshotId(), snap.getDataStoreId(), DataStoreRole.Image);
SnapshotInfo parentSnapshot = snapshotInfo.getParent();
if (snapshotInfo.getDataStore().getId() == srcImgStoreId && parentSnapshot != null && migratedSnapshotIdList.contains(parentSnapshot.getSnapshotId())) {
futures.add(executor.submit(new MigrateDataTask(snapshotInfo, srcDatastore, dataStoreManager.getDataStore(destImgStoreId, DataStoreRole.Image))));
return handleResponse(futures, null, message, success);
protected Pair<String, Boolean> migrateCompleted(Long destDatastoreId, DataStore srcDatastore, List<DataObject> files, MigrationPolicy migrationPolicy, int skipped) {
String message = "";
boolean success = true;
if (destDatastoreId == srcDatastore.getId() && !files.isEmpty()) {
if (migrationPolicy == MigrationPolicy.BALANCE) {
s_logger.debug("Migration completed : data stores have been balanced ");
if (destDatastoreId == srcDatastore.getId()) {
message = "Seems like source datastore has more free capacity than the destination(s)";
message += "Image stores have been attempted to be balanced";
success = true;
} else {
message = "Files not completely migrated from "+ srcDatastore.getId() + ". Datastore (source): " + srcDatastore.getId() + "has equal or more free space than destination."+
" If you want to continue using the Image Store, please change the read-only status using 'update imagestore' command";
success = false;
} else {
message = "Migration completed";
if (migrationPolicy == MigrationPolicy.COMPLETE && skipped > 0) {
message += ". Not all data objects were migrated. Some were probably skipped due to lack of storage capacity.";
success = false;
return new Pair<String, Boolean>(message, success);
protected Map<Long, Pair<Long, Long>> migrateAway(
DataObject chosenFileForMigration,
Map<Long, Pair<Long, Long>> storageCapacities,
Pair<List<SnapshotInfo>, Long>> snapshotChains,
Map<DataObject, Pair<List<TemplateInfo>, Long>> templateChains,
DataStore srcDatastore,
Long destDatastoreId,
ThreadPoolExecutor executor,
List<Future<AsyncCallFuture<DataObjectResult>>> futures) {
Long fileSize = migrationHelper.getFileSize(chosenFileForMigration, snapshotChains, templateChains);
storageCapacities = assumeMigrate(storageCapacities, srcDatastore.getId(), destDatastoreId, fileSize);
long activeSsvms = migrationHelper.activeSSVMCount(srcDatastore);
long totalJobs = activeSsvms * numConcurrentCopyTasksPerSSVM;
// Increase thread pool size with increase in number of SSVMs
if ( totalJobs > executor.getCorePoolSize()) {
executor.setMaximumPoolSize((int) (totalJobs));
executor.setCorePoolSize((int) (totalJobs));
MigrateDataTask task = new MigrateDataTask(chosenFileForMigration, srcDatastore, dataStoreManager.getDataStore(destDatastoreId, DataStoreRole.Image));
if (chosenFileForMigration instanceof SnapshotInfo ) {
if (chosenFileForMigration instanceof TemplateInfo) {
s_logger.debug(String.format("Migration of %s: %s is initiated. ", chosenFileForMigration.getType().name(), chosenFileForMigration.getUuid()));
return storageCapacities;
private MigrationResponse handleResponse(List<Future<AsyncCallFuture<DataObjectResult>>> futures, MigrationPolicy migrationPolicy, String message, boolean success) {
int successCount = 0;
for (Future<AsyncCallFuture<DataObjectResult>> future : futures) {
try {
AsyncCallFuture<DataObjectResult> res = future.get();
if (res.get().isSuccess()) {
} catch ( InterruptedException | ExecutionException e) {
s_logger.warn("Failed to get result");
message += ". successful migrations: "+successCount;
String policy = migrationPolicy != null ? migrationPolicy.toString() : null;
return new MigrationResponse(message, policy, success);
private void handleSnapshotMigration(Long srcDataStoreId, Date start, Date end, MigrationPolicy policy,
List<Future<AsyncCallFuture<DataObjectResult>>> futures, Map<Long, Pair<Long, Long>> storageCapacities, ThreadPoolExecutor executor) {
DataStore srcDatastore = dataStoreManager.getDataStore(srcDataStoreId, DataStoreRole.Image);
List<SnapshotDataStoreVO> snaps = snapshotDataStoreDao.findSnapshots(srcDataStoreId, start, end);
if (!snaps.isEmpty()) {
for (SnapshotDataStoreVO snap : snaps) {
SnapshotVO snapshotVO = snapshotDao.findById(snap.getSnapshotId());
SnapshotInfo snapshotInfo = snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), snap.getDataStoreId(), DataStoreRole.Image);
SnapshotInfo parentSnapshot = snapshotInfo.getParent();
if (parentSnapshot == null && policy == MigrationPolicy.COMPLETE) {
List<Long> dstores = migrationHelper.sortDataStores(storageCapacities);
Long storeId = dstores.get(0);
if (storeId.equals(srcDataStoreId)) {
storeId = dstores.get(1);
DataStore datastore = dataStoreManager.getDataStore(storeId, DataStoreRole.Image);
futures.add(executor.submit(new MigrateDataTask(snapshotInfo, srcDatastore, datastore)));
if (parentSnapshot != null) {
DataStore parentDS = dataStoreManager.getDataStore(parentSnapshot.getDataStore().getId(), DataStoreRole.Image);
if (parentDS.getId() != snapshotInfo.getDataStore().getId()) {
futures.add(executor.submit(new MigrateDataTask(snapshotInfo, srcDatastore, parentDS)));
private Map<Long, Pair<Long, Long>> getStorageCapacities(Map<Long, Pair<Long, Long>> storageCapacities, Long srcDataStoreId) {
Map<Long, Pair<Long, Long>> capacities = new Hashtable<>();
for (Long storeId : storageCapacities.keySet()) {
StorageStats stats = statsCollector.getStorageStats(storeId);
if (stats != null) {
if (storageCapacities.get(storeId) == null || storageCapacities.get(storeId).first() == null || storageCapacities.get(storeId).second() == null) {
capacities.put(storeId, new Pair<>(stats.getCapacityBytes() - stats.getByteUsed(), stats.getCapacityBytes()));
} else {
long totalCapacity = stats.getCapacityBytes();
Long freeCapacity = totalCapacity - stats.getByteUsed();
if (storeId.equals(srcDataStoreId) || freeCapacity < storageCapacities.get(storeId).first()) {
capacities.put(storeId, new Pair<>(freeCapacity, totalCapacity));
} else {
capacities.put(storeId, storageCapacities.get(storeId));
} else {
throw new CloudRuntimeException("Stats Collector hasn't yet collected metrics from the Image store, kindly try again later");
return capacities;
* @param storageCapacities Map comprising the metrics(free and total capacities) of the images stores considered
* @return mean standard deviation
private double getStandardDeviation(Map<Long, Pair<Long, Long>> storageCapacities) {
double[] freeCapacities = storageCapacities.values().stream().mapToDouble(x -> ((double) x.first() / x.second())).toArray();
double mean = calculateStorageMean(freeCapacities);
return (calculateStorageStandardDeviation(freeCapacities, mean) / mean);
* @param storageCapacities Map comprising the metrics(free and total capacities) of the images stores considered
* @param srcDsId source image store ID from where data is to be migrated
* @param destDsId destination image store ID to where data is to be migrated
* @param fileSize size of the data object to be migrated so as to recompute the storage metrics
* @return a map - Key: Datastore ID ; Value: Pair<Free Capacity , Total Capacity>
private Map<Long, Pair<Long, Long>> assumeMigrate(Map<Long, Pair<Long, Long>> storageCapacities, Long srcDsId, Long destDsId, Long fileSize) {
Map<Long, Pair<Long, Long>> modifiedCapacities = new Hashtable<>();
Pair<Long, Long> srcDSMetrics = storageCapacities.get(srcDsId);
Pair<Long, Long> destDSMetrics = storageCapacities.get(destDsId);
modifiedCapacities.put(srcDsId, new Pair<>(srcDSMetrics.first() + fileSize, srcDSMetrics.second()));
modifiedCapacities.put(destDsId, new Pair<>(destDSMetrics.first() - fileSize, destDSMetrics.second()));
return modifiedCapacities;
* This function determines if migration should in fact take place or not :
* - For Balanced migration - the mean standard deviation is calculated before and after (supposed) migration
* and a decision is made if migration is afterall beneficial
* - For Complete migration - We check if the destination image store has sufficient capacity i.e., below the threshold of (90%)
* and then proceed with the migration
* @param chosenFile file for migration
* @param srcDatastoreId source image store ID from where data is to be migrated
* @param destDatastoreId destination image store ID to where data is to be migrated
* @param storageCapacities Map comprising the metrics(free and total capacities) of the images stores considered
* @param snapshotChains Map containing details of chain of snapshots and their cumulative size
* @param migrationPolicy determines whether a "Balance" or "Complete" migration operation is to be performed
* @return
private boolean shouldMigrate(
DataObject chosenFile,
Long srcDatastoreId,
Long destDatastoreId,
Map<Long, Pair<Long, Long>> storageCapacities,
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains,
Map<DataObject, Pair<List<TemplateInfo>, Long>> templateChains,
MigrationPolicy migrationPolicy) {
if (migrationPolicy == MigrationPolicy.BALANCE) {
double meanStdDevCurrent = getStandardDeviation(storageCapacities);
Long fileSize = migrationHelper.getFileSize(chosenFile, snapshotChains, templateChains);
Map<Long, Pair<Long, Long>> proposedCapacities = assumeMigrate(storageCapacities, srcDatastoreId, destDatastoreId, fileSize);
double meanStdDevAfter = getStandardDeviation(proposedCapacities);
if (meanStdDevAfter > meanStdDevCurrent) {
s_logger.debug("migrating the file doesn't prove to be beneficial, skipping migration");
return false;
Double threshold = ImageStoreImbalanceThreshold.value();
if (meanStdDevCurrent > threshold && storageCapacityBelowThreshold(storageCapacities, destDatastoreId)) {
return true;
} else {
if (storageCapacityBelowThreshold(storageCapacities, destDatastoreId)) {
return true;
return false;
private boolean storageCapacityBelowThreshold(Map<Long, Pair<Long, Long>> storageCapacities, Long destStoreId) {
Pair<Long, Long> imageStoreCapacity = storageCapacities.get(destStoreId);
long usedCapacity = imageStoreCapacity.second() - imageStoreCapacity.first();
if (imageStoreCapacity != null && (usedCapacity / (imageStoreCapacity.second() * 1.0)) <= CapacityManager.SecondaryStorageCapacityThreshold.value()) {
s_logger.debug("image store: " + destStoreId + " has sufficient capacity to proceed with migration of file");
return true;
s_logger.debug("Image store capacity threshold exceeded, migration not possible");
return false;
private double calculateStorageMean(double[] storageMetrics) {
return new Mean().evaluate(storageMetrics);
private double calculateStorageStandardDeviation(double[] metricValues, double mean) {
StandardDeviation standardDeviation = new StandardDeviation(false);
return standardDeviation.evaluate(metricValues, mean);
private class MigrateDataTask implements Callable<AsyncCallFuture<DataObjectResult>> {
private DataObject file;
private DataStore srcDataStore;
private DataStore destDataStore;
private Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain;
private Map<DataObject, Pair<List<TemplateInfo>, Long>> templateChain;
public MigrateDataTask(DataObject file, DataStore srcDataStore, DataStore destDataStore) {
this.file = file;
this.srcDataStore = srcDataStore;
this.destDataStore = destDataStore;
public void setSnapshotChains(Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain) {
this.snapshotChain = snapshotChain;
public Map<DataObject, Pair<List<SnapshotInfo>, Long>> getSnapshotChain() {
return snapshotChain;
public Map<DataObject, Pair<List<TemplateInfo>, Long>> getTemplateChain() {
return templateChain;
public void setTemplateChain(Map<DataObject, Pair<List<TemplateInfo>, Long>> templateChain) {
this.templateChain = templateChain;
public DataObject getFile() {
return file;
public AsyncCallFuture<DataObjectResult> call() throws Exception {
return secStgSrv.migrateData(file, srcDataStore, destDataStore, snapshotChain, templateChain);