blob: 5edf05d1a5c041111f5a8ada84f987202c90719d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.Command.State;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.PingAnswer;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.storage.MigrateVolumeCommand;
import com.cloud.agent.api.to.DataObjectType;
import com.cloud.agent.api.to.DataTO;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.storage.Volume;
import com.cloud.storage.VolumeApiService;
import com.cloud.storage.VolumeVO;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.utils.Pair;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.command.dao.ReconcileCommandDao;
import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.command.CommandResult;
import org.apache.cloudstack.storage.command.CopyCommand;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
import org.apache.cloudstack.storage.volume.VolumeOnStorageTO;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@Component
public class ReconcileCommandServiceImpl extends ManagerBase implements ReconcileCommandService, Configurable {
final static long ManagementServerId = ManagementServerNode.getManagementServerId();
final static int GracePeriod = 10 * 60; // 10 minutes
private boolean _reconcileCommandsEnabled = false;
private ScheduledExecutorService reconcileCommandsExecutor;
private ExecutorService reconcileCommandTaskExecutor;
CompletionService<ReconcileCommandResult> completionService;
@Inject
ReconcileCommandDao reconcileCommandDao;
@Inject
ManagementServerHostDao managementServerHostDao;
@Inject
HostDao hostDao;
@Inject
AgentManager agentManager;
@Inject
VMInstanceDao vmInstanceDao;
@Inject
VolumeDao volumeDao;
@Inject
EndPointSelector endPointSelector;
@Inject
DataStoreManager dataStoreManager;
@Inject
VolumeDataStoreDao volumeDataStoreDao;
@Inject
VolumeOrchestrationService volumeManager;
@Inject
private VolumeApiService volumeApiService;
@Inject
private AccountManager accountManager;
@Override
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
_reconcileCommandsEnabled = ReconcileCommandsEnabled.value();
if (_reconcileCommandsEnabled) {
// create thread pool and blocking queue
final int workersCount = ReconcileCommandsWorkers.value();
reconcileCommandTaskExecutor = Executors.newFixedThreadPool(workersCount, new NamedThreadFactory("Reconcile-Command-Task-Executor"));
final BlockingQueue<Future<ReconcileCommandResult>> queue = new LinkedBlockingQueue<>(workersCount);
completionService = new ExecutorCompletionService<>(reconcileCommandTaskExecutor, queue);
reconcileCommandsExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Reconcile-Commands-Worker"));
reconcileCommandsExecutor.scheduleWithFixedDelay(new ReconcileCommandsWorker(),
ReconcileCommandsInterval.value(), ReconcileCommandsInterval.value(), TimeUnit.SECONDS);
}
return true;
}
@Override
public boolean stop() {
if (reconcileCommandsExecutor != null) {
reconcileCommandsExecutor.shutdownNow();
}
if (reconcileCommandTaskExecutor != null) {
reconcileCommandTaskExecutor.shutdownNow();
}
if (_reconcileCommandsEnabled) {
reconcileCommandDao.updateCommandsToInterruptedByManagementServerId(ManagementServerId);
}
return true;
}
@Override
public String getConfigComponentName() {
return ReconcileCommandService.class.getName();
}
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[]{ ReconcileCommandsEnabled, ReconcileCommandsInterval, ReconcileCommandsWorkers, ReconcileCommandsMaxAttempts };
}
@Override
public void persistReconcileCommands(Long hostId, Long requestSequence, Command[] commands) {
if (!_reconcileCommandsEnabled) {
return;
}
HostVO host = hostDao.findById(hostId);
if (host == null || !SupportedHypervisorTypes.contains(host.getHypervisorType())) {
return;
}
for (Command cmd : commands) {
if (cmd.isReconcile()) {
persistReconcileCommand(hostId, requestSequence, cmd);
}
}
}
private void persistReconcileCommand(Long hostId, Long requestSequence, Command cmd) {
ReconcileCommandVO reconcileCommandVO = new ReconcileCommandVO();
reconcileCommandVO.setManagementServerId(ManagementServerId);
reconcileCommandVO.setCommandInfo(CommandInfo.GSON.toJson(cmd));
reconcileCommandVO.setCommandName(cmd.toString());
reconcileCommandVO.setCreated(new Date());
reconcileCommandVO.setUpdated(new Date());
reconcileCommandVO.setStateByManagement(State.CREATED);
reconcileCommandVO.setHostId(hostId);
reconcileCommandVO.setRequestSequence(requestSequence);
if (cmd instanceof CopyCommand) {
CopyCommand copyCmd = (CopyCommand)cmd;
DataTO srcData = copyCmd.getSrcTO();
if (srcData != null && srcData.getDataStore() instanceof PrimaryDataStoreTO) {
reconcileCommandVO.setResourceType(ApiCommandResourceType.Volume);
reconcileCommandVO.setResourceId(srcData.getId());
}
} else if (cmd instanceof MigrateCommand) {
MigrateCommand migrateCommand = (MigrateCommand)cmd;
reconcileCommandVO.setResourceType(ApiCommandResourceType.VirtualMachine);
reconcileCommandVO.setResourceId(migrateCommand.getVirtualMachine().getId());
} else if (cmd instanceof MigrateVolumeCommand) {
MigrateVolumeCommand migrateVolumeCommand = (MigrateVolumeCommand)cmd;
reconcileCommandVO.setResourceType(ApiCommandResourceType.Volume);
reconcileCommandVO.setResourceId(migrateVolumeCommand.getVolumeId());
}
reconcileCommandDao.persist(reconcileCommandVO);
}
@Override
public boolean updateReconcileCommand(long requestSeq, Command command, Answer answer, State newStateByManagement, State newStateByAgent) {
String commandKey = getCommandKey(requestSeq, command);
logger.debug(String.format("Updating reconcile command %s with answer %s and new states %s-%s", commandKey, answer, newStateByManagement, newStateByAgent));
ReconcileCommandVO reconcileCommandVO = reconcileCommandDao.findCommand(requestSeq, command.toString());
if (reconcileCommandVO == null) {
logger.debug(String.format("Skipped updating reconcile command %s due to no record is found in DB", commandKey));
return false;
}
boolean updated = false;
if (newStateByManagement != null) {
if (State.RECONCILE_RETRY.equals(newStateByManagement)) {
if (State.RECONCILING.equals(reconcileCommandVO.getStateByManagement())) {
reconcileCommandVO.setStateByManagement(newStateByManagement);
updated = true;
} else {
logger.debug(String.format("Skipping the update of state by management of command %s from %s to %s", commandKey, reconcileCommandVO.getStateByManagement(), newStateByManagement));
}
} else if (!newStateByManagement.equals(reconcileCommandVO.getStateByManagement())) {
reconcileCommandVO.setStateByManagement(newStateByManagement);
updated = true;
}
if (State.RECONCILE_FAILED.equals(newStateByManagement)) {
reconcileCommandVO.setRetryCount(reconcileCommandVO.getRetryCount() + 1);
updated = true;
}
if (ManagementServerId != ManagementServerNode.getManagementServerId()) {
reconcileCommandVO.setManagementServerId(ManagementServerId);
updated = true;
}
}
if (newStateByAgent != null) {
if (!newStateByAgent.equals(reconcileCommandVO.getStateByAgent())) {
reconcileCommandVO.setStateByAgent(newStateByAgent);
updated = true;
}
}
String commandInfo = CommandInfo.GSON.toJson(command);
if (!commandInfo.equals(reconcileCommandVO.getCommandInfo())) {
reconcileCommandVO.setCommandInfo(commandInfo);
updated = true;
}
if (answer != null && (reconcileCommandVO.getAnswerName() == null || answer instanceof ReconcileAnswer
|| reconcileCommandVO.getAnswerName().equals(answer.toString()))) {
reconcileCommandVO.setAnswerName(answer.toString());
reconcileCommandVO.setAnswerInfo(CommandInfo.GSON.toJson(answer));
updated = true;
}
if (updated) {
reconcileCommandVO.setUpdated(new Date());
reconcileCommandDao.update(reconcileCommandVO.getId(), reconcileCommandVO);
}
return true;
}
private String getCommandKey(long requestSeq, Command command) {
return requestSeq + "-" + command;
}
private class ReconcileCommandsWorker extends ManagedContextRunnable {
@Override
protected void runInContext() {
GlobalLock gcLock = GlobalLock.getInternLock("Reconcile.Commands.Lock");
try {
if (gcLock.lock(3)) {
try {
reallyRun();
} finally {
gcLock.unlock();
}
}
} finally {
gcLock.releaseRef();
}
}
private List<ReconcileCommandVO> getReconcileCommands() {
ManagementServerHostVO msHost = managementServerHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
if (msHost == null || msHost.getMsid() != ManagementServerId) {
return new ArrayList<>();
}
return reconcileCommandDao.listByState(State.INTERRUPTED, State.TIMED_OUT, State.RECONCILE_RETRY, State.RECONCILING, State.RECONCILE_FAILED, State.CREATED);
}
public void reallyRun() {
List<ReconcileCommandVO> reconcileCommands = getReconcileCommands();
logger.debug(String.format("Reconciling %s command(s) ...", reconcileCommands.size()));
for (ReconcileCommandVO reconcileCommand : reconcileCommands) {
ReconcileCommandTask task = new ReconcileCommandTask(reconcileCommand);
completionService.submit(task);
}
for (int i = 0; i < reconcileCommands.size(); i++) {
try {
Future<ReconcileCommandResult> future = completionService.take();
ReconcileCommandResult result = future.get();
long requestSequence = result.getRequestSequence();
Command command = result.getCommand();
ReconcileAnswer answer = result.getAnswer();
String commandKey = getCommandKey(requestSequence, command);
if (result.isFailed()) {
throw new CloudRuntimeException(String.format("Failed to reconcile command %s due to: %s", commandKey, result.getResult()));
}
if (answer != null && answer.getResult()) {
logger.debug(String.format("Command %s has been reconciled with answer %s", commandKey, answer));
if (result.isReconciled()) {
if (processReconcileAnswer(requestSequence, command, answer)) {
updateReconcileCommand(requestSequence, result.getCommand(), answer, State.RECONCILED, null);
reconcileCommandDao.removeCommand(requestSequence, result.getCommand().toString(), null);
} else {
updateReconcileCommand(requestSequence, result.getCommand(), answer, State.RECONCILE_RETRY, null);
}
} else {
updateReconcileCommand(requestSequence, result.getCommand(), answer, State.RECONCILE_FAILED, null);
}
} else if (result.isReconciled()) {
logger.info(String.format("Command %s is reconciled but answer is null, skipping the reconciliation", commandKey));
updateReconcileCommand(requestSequence, result.getCommand(), answer, State.RECONCILE_SKIPPED, null);
reconcileCommandDao.removeCommand(requestSequence, result.getCommand().toString(), null);
} else {
logger.info(String.format("Command %s is not reconciled, will retry", commandKey));
updateReconcileCommand(requestSequence, result.getCommand(), answer, State.RECONCILE_RETRY, null);
}
} catch (InterruptedException | ExecutionException e) {
logger.error(String.format("Failed to reconcile command due to: %s", e.getMessage()), e);
throw new CloudRuntimeException("Failed to reconcile command");
}
}
}
}
public static class ReconcileCommandResult extends CommandResult {
long requestSequence;
Command command;
ReconcileAnswer answer;
boolean isReconciled;
public ReconcileCommandResult(long requestSequence, Command command, ReconcileAnswer answer, boolean isReconciled) {
super();
this.requestSequence = requestSequence;
this.command = command;
this.answer = answer;
this.isReconciled = isReconciled;
}
public long getRequestSequence() {
return requestSequence;
}
public Command getCommand() {
return command;
}
public ReconcileAnswer getAnswer() {
return answer;
}
public boolean isReconciled() {
return isReconciled;
}
}
protected class ReconcileCommandTask implements Callable<ReconcileCommandResult> {
long requestSequence;
Command command;
State stateByManagement;
State stateByAgent;
Long hostId;
Long retryCount;
ReconcileCommandVO reconcileCommand;
public ReconcileCommandTask(ReconcileCommandVO reconcileCommand) {
this.requestSequence = reconcileCommand.getRequestSequence();
this.stateByManagement = reconcileCommand.getStateByManagement();
this.stateByAgent = reconcileCommand.getStateByAgent();
this.hostId = reconcileCommand.getHostId();
this.retryCount = reconcileCommand.getRetryCount();
this.reconcileCommand = reconcileCommand;
this.command = ReconcileCommandUtils.parseCommandInfo(reconcileCommand.getCommandName(), reconcileCommand.getCommandInfo());
}
@Override
public ReconcileCommandResult call() {
String commandKey = getCommandKey(requestSequence, command);
HostVO host = hostDao.findByIdIncludingRemoved(hostId);
assert host != null;
if (!SupportedHypervisorTypes.contains(host.getHypervisorType())) {
return new ReconcileCommandResult(requestSequence, command, null, false);
}
logger.debug(String.format("Reconciling command %s with state %s-%s", commandKey, stateByManagement, stateByAgent));
if (State.TIMED_OUT.equals(stateByManagement)) {
logger.debug(String.format("The command %s timed out on management server. Reconciling ...", commandKey));
return reconcile(reconcileCommand);
} else if (Arrays.asList(State.INTERRUPTED, State.RECONCILE_RETRY).contains(stateByManagement)) {
logger.debug(String.format("The command %s is %s on management server. Reconciling ...", commandKey, stateByManagement));
return reconcile(reconcileCommand);
} else if (State.RECONCILING.equals(stateByManagement)) {
Date now = new Date();
if (reconcileCommand.getUpdated() != null && reconcileCommand.getUpdated().getTime() > now.getTime() - GracePeriod * 1000) {
logger.debug(String.format("The command %s is being reconciled, skipping and wait for next run", commandKey));
} else {
logger.debug(String.format("The command %s is %s, the state seems out of date, updating to RECONCILE_READY", commandKey, stateByManagement));
reconcileCommand = reconcileCommandDao.findById(reconcileCommand.getId());
reconcileCommand.setStateByManagement(State.RECONCILE_RETRY);
reconcileCommandDao.update(reconcileCommand.getId(), reconcileCommand);
}
} else if (State.RECONCILE_FAILED.equals(stateByManagement)) {
if (retryCount != null && retryCount <= ReconcileCommandsMaxAttempts.value()) {
logger.debug(String.format("The command %s has been reconciled %s times, retrying", commandKey, retryCount));
return reconcile(reconcileCommand);
} else {
logger.debug(String.format("The command %s has been reconciled %s times, skipping", commandKey, retryCount));
return new ReconcileCommandResult(requestSequence, command, null, true);
}
} else if (State.RECONCILED.equals(stateByManagement)) {
logger.debug(String.format("The command %s has been reconciled, skipping", commandKey));
} else if (stateByAgent == null) { // state by management is CREATED
logger.debug(String.format("Skipping the reconciliation of command %s, because the state by agent is null", commandKey));
} else if (Arrays.asList(State.STARTED, State.PROCESSING, State.PROCESSING_IN_BACKEND).contains(stateByAgent)) {
if (host.getRemoved() == null && Status.Up.equals(host.getStatus())) {
logger.debug(String.format("Skipping the reconciliation of command %s, because the host %s is Up, the command may be still in processing", commandKey, host));
} else if (host.getRemoved() != null) {
logger.debug(String.format("The host %s has been removed on %s, Reconciling command %s ...", host, host.getRemoved(), commandKey));
return reconcile(reconcileCommand);
} else {
logger.debug(String.format("The host %s is in %s state, Reconciling command %s ...", host, host.getStatus(), commandKey));
return reconcile(reconcileCommand);
}
} else if (Arrays.asList(State.COMPLETED, State.FAILED).contains(stateByAgent)) {
Date now = new Date();
if (reconcileCommand.getUpdated() != null && reconcileCommand.getUpdated().getTime() > now.getTime() - GracePeriod * 1000) {
logger.debug(String.format("The command %s is %s on host %s, it seems the answer is not processed by any management server. Skipping ...", commandKey, stateByAgent, host));
} else {
logger.debug(String.format("The command %s is %s on host %s, it seems the answer is not processed by any management server. Reconciling ...", commandKey, stateByAgent, host));
return reconcile(reconcileCommand);
}
} else if (Arrays.asList(State.INTERRUPTED, State.DANGLED_IN_BACKEND).contains(stateByAgent)) {
logger.debug(String.format("The command %s is %s on host %s, the cloudstack agent might has been restarted. Reconciling ...", commandKey, stateByAgent, host));
return reconcile(reconcileCommand);
}
return new ReconcileCommandResult(requestSequence, command, null, false);
}
}
protected ReconcileCommandResult reconcile(ReconcileCommandVO reconcileCommandVO) {
Command command = ReconcileCommandUtils.parseCommandInfo(reconcileCommandVO.getCommandName(), reconcileCommandVO.getCommandInfo());
if (!preReconcileCheck(command)) {
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
if (command instanceof MigrateCommand) {
updateReconcileCommand(reconcileCommandVO.getRequestSequence(), command, null, State.RECONCILING, null);
return reconcile(reconcileCommandVO, (MigrateCommand) command);
} else if (command instanceof CopyCommand) {
updateReconcileCommand(reconcileCommandVO.getRequestSequence(), command, null, State.RECONCILING, null);
return reconcile(reconcileCommandVO, (CopyCommand) command);
} else if (command instanceof MigrateVolumeCommand) {
updateReconcileCommand(reconcileCommandVO.getRequestSequence(), command, null, State.RECONCILING, null);
return reconcile(reconcileCommandVO, (MigrateVolumeCommand) command);
} else {
logger.error(String.format("Unsupported reconcile command %s ", command));
}
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
boolean preReconcileCheck(Command command) {
if (command instanceof MigrateCommand) {
MigrateCommand migrateCommand = (MigrateCommand) command;
Long vmId = migrateCommand.getVirtualMachine().getId();
VMInstanceVO vm = vmInstanceDao.findById(vmId);
if (vm == null) {
logger.debug(String.format("Skipping reconciliation of command %s as vm %s has been removed", command, vm));
return false;
}
if (MapUtils.isEmpty(migrateCommand.getMigrateStorage())) {
logger.debug(String.format("Skipping reconciliation of command %s as the migration does not migrate volumes of vm %s", command, vm));
return false;
}
List<VolumeVO> volumes = volumeDao.findByInstance(vmId);
for (VolumeVO volume : volumes) {
if (Volume.State.Migrating.equals(volume.getState())) {
return true;
}
}
logger.debug(String.format("Skipping reconciliation of command %s as the vm %s does not have volume in Migrating state", command, vm));
return false;
} else if (command instanceof CopyCommand) {
DataTO srcData = ((CopyCommand) command).getSrcTO();
DataTO destData = ((CopyCommand) command).getDestTO();
if (srcData != null && srcData.getDataStore() instanceof PrimaryDataStoreTO) {
VolumeVO volumeVO = volumeDao.findById(srcData.getId());
if (volumeVO == null || !Arrays.asList(Volume.State.Migrating, Volume.State.Ready).contains(volumeVO.getState())) {
logger.debug(String.format("Skipping reconciliation of command %s as source volume %s is removed or not Migrating or Ready", command, volumeVO));
return false;
}
}
if (destData != null && destData.getDataStore() instanceof PrimaryDataStoreTO) {
VolumeVO volumeVO = volumeDao.findById(destData.getId());
if (volumeVO == null || !Arrays.asList(Volume.State.Migrating, Volume.State.Ready, Volume.State.Creating).contains(volumeVO.getState())) {
logger.debug(String.format("Skipping reconciliation of command %s as destination volume %s is removed or not Migrating or Ready or Creating", command, volumeVO));
return false;
}
}
} else if (command instanceof MigrateVolumeCommand) {
DataTO srcData = ((MigrateVolumeCommand) command).getSrcData();
DataTO destData = ((MigrateVolumeCommand) command).getDestData();
if (srcData == null || destData == null) {
logger.debug(String.format("Skipping reconciliation of command %s as the source volume (%s) or destination volume (%s) is NULL", command, srcData, destData));
return false;
}
if (srcData.getId() != destData.getId()) {
logger.debug(String.format("Skipping reconciliation of command %s as the source volume (id: %s) and destination volume (id: %s) have different ID", command, srcData.getId(), destData.getId()));
return false;
}
VolumeVO volumeVO = volumeDao.findById(srcData.getId());
if (volumeVO == null) {
logger.debug(String.format("Skipping reconciliation of command %s as the volume (id: %s) has been removed", command, srcData.getId()));
return false;
} else if (!Volume.State.Migrating.equals(volumeVO.getState())) {
logger.debug(String.format("Skipping reconciliation of command %s as the volume %s (state: %s) is not Migrating state", command, volumeVO, volumeVO.getState()));
return false;
}
if (!volumeVO.getPath().equals(destData.getPath())) {
logger.debug(String.format("Skipping reconciliation of command %s as the volume path (%s) is not same as destination volume path (%s)", command, volumeVO.getPath(), destData.getPath()));
return false;
}
}
return true;
}
private ReconcileCommandResult reconcile(ReconcileCommandVO reconcileCommandVO, MigrateCommand command) {
Long vmId = command.getVirtualMachine().getId();
VMInstanceVO vm = vmInstanceDao.findById(vmId);
if (vm == null) {
logger.debug(String.format("VM (id: %s) has been removed", vmId));
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
if (!VirtualMachine.State.Running.equals(vm.getState())) {
logger.debug(String.format("VM %s (state: %s) is not Running state, wait for next run", vm, vm.getState()));
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, false);
}
ReconcileMigrateAnswer reconcileMigrateAnswer = new ReconcileMigrateAnswer();
reconcileMigrateAnswer.setResourceType(ApiCommandResourceType.VirtualMachine);
reconcileMigrateAnswer.setResourceId(vmId);
Long hostId = vm.getHostId();
HostVO sourceHost = hostDao.findById(hostId);
if (sourceHost != null && sourceHost.getStatus() == Status.Up) {
ReconcileMigrateCommand reconcileMigrateCommand = new ReconcileMigrateCommand(command.getVmName());
Answer reconcileAnswer = agentManager.easySend(sourceHost.getId(), reconcileMigrateCommand);
reconcileMigrateAnswer.setHostId(sourceHost.getId());
if (reconcileAnswer instanceof ReconcileMigrateAnswer) {
reconcileMigrateAnswer.setVmState(((ReconcileMigrateAnswer) reconcileAnswer).getVmState());
reconcileMigrateAnswer.setVmDisks(((ReconcileMigrateAnswer) reconcileAnswer).getVmDisks());
}
}
boolean isReconciled = (reconcileMigrateAnswer.getVmState() != null);
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, reconcileMigrateAnswer, isReconciled);
}
private EndPoint getEndpoint(DataTO srcData, DataTO destData) {
EndPoint endPoint = null;
if (srcData.getDataStore() instanceof PrimaryDataStoreTO) {
PrimaryDataStoreTO srcDataStore = (PrimaryDataStoreTO) srcData.getDataStore();
if (srcDataStore.isManaged()) {
return null;
}
DataStore store = dataStoreManager.getPrimaryDataStore(srcDataStore.getId());
endPoint = endPointSelector.select(store);
} else if (destData != null && destData.getDataStore() instanceof PrimaryDataStoreTO) {
PrimaryDataStoreTO destDataStore = (PrimaryDataStoreTO) destData.getDataStore();
if (destDataStore.isManaged()) {
return null;
}
DataStore store = dataStoreManager.getPrimaryDataStore(destDataStore.getId());
endPoint = endPointSelector.select(store);
}
return endPoint;
}
private ReconcileCommandResult reconcile(ReconcileCommandVO reconcileCommandVO, CopyCommand command) {
DataTO srcData = command.getSrcTO();
DataTO destData = command.getDestTO();
if (srcData == null || destData == null) {
logger.debug(String.format("Unable to reconcile command %s with srcData %s and destData %s", command, srcData, destData));
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, false);
}
Long hostId = reconcileCommandVO.getHostId();
HostVO host = hostDao.findById(hostId);
if (host == null || !Status.Up.equals(host.getStatus())) {
EndPoint endPoint = getEndpoint(srcData, destData);
if (endPoint == null) {
logger.debug(String.format("Unable to reconcile command %s with srcData %s and destData %s as endpoint is null", command, srcData, destData));
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, false);
}
host = hostDao.findById(endPoint.getId());
}
// Send reconcileCommand to the host
logger.info(String.format("Reconciling command %s via host %s", command, host));
ReconcileCopyCommand reconcileCommand = new ReconcileCopyCommand(srcData, destData, command.getOptions(), command.getOptions2());
Answer reconcileAnswer = agentManager.easySend(host.getId(), reconcileCommand);
if (!(reconcileAnswer instanceof ReconcileAnswer)) {
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, (ReconcileAnswer) reconcileAnswer, true);
}
private ReconcileCommandResult reconcile(ReconcileCommandVO reconcileCommandVO, MigrateVolumeCommand command) {
DataTO srcData = command.getSrcData();
DataTO destData = command.getDestData();
if (srcData == null || destData == null) {
logger.debug(String.format("Unable to reconcile command %s with srcData %s and destData %s", command, srcData, destData));
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
VolumeVO volume = volumeDao.findById(srcData.getId());
if (volume == null) {
logger.debug(String.format("Unable to reconcile command %s with removed volume (id: %s)", command, srcData.getId()));
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
Long hostId = reconcileCommandVO.getHostId();
HostVO host = hostDao.findById(hostId);
if (host == null || !Status.Up.equals(host.getStatus())) {
EndPoint endPoint = getEndpoint(srcData, destData);
if (endPoint == null) {
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, false);
}
host = hostDao.findById(endPoint.getId());
}
// Send reconcileCommand to the host
logger.info(String.format("Reconciling command %s via host %s", command, host));
ReconcileMigrateVolumeCommand reconcileCommand = new ReconcileMigrateVolumeCommand(srcData, destData);
if (volume.getInstanceId() != null) {
VMInstanceVO vmInstance = vmInstanceDao.findById(volume.getInstanceId());
if (vmInstance != null) {
reconcileCommand.setVmName(vmInstance.getInstanceName());
}
}
Answer reconcileAnswer = agentManager.easySend(host.getId(), reconcileCommand);
if (!(reconcileAnswer instanceof ReconcileAnswer)) {
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, null, true);
}
return new ReconcileCommandResult(reconcileCommandVO.getRequestSequence(), command, (ReconcileAnswer) reconcileAnswer, true);
}
@Override
public void processCommand(Command pingCommand, Answer pingAnswer) {
if (pingCommand instanceof PingCommand && pingAnswer instanceof PingAnswer) {
CommandInfo[] commandInfos = ((PingCommand) pingCommand).getCommandInfos();
for (CommandInfo commandInfo : commandInfos) {
processCommandInfo(commandInfo, (PingAnswer) pingAnswer);
}
}
}
private void processCommandInfo(CommandInfo commandInfo, PingAnswer pingAnswer) {
Command parsedCommand = ReconcileCommandUtils.parseCommandInfo(commandInfo);
Answer parsedAnswer = ReconcileCommandUtils.parseAnswerFromCommandInfo(commandInfo);
if (parsedCommand != null && parsedCommand.isReconcile()) {
if (updateReconcileCommand(commandInfo.getRequestSeq(), parsedCommand, parsedAnswer, null, commandInfo.getState())) {
pingAnswer.addReconcileCommand(getCommandKey(commandInfo.getRequestSeq(), parsedCommand));
}
}
}
@Override
public void processAnswers(long requestSeq, Command[] commands, Answer[] answers) {
if (commands.length != answers.length) {
logger.error(String.format("Incorrect number of commands (%s) and answers (%s)", commands.length, answers.length));
}
for (int i = 0; i < answers.length; i++) {
Command command = commands[i];
Answer answer = answers[i];
if (command.isReconcile() && answer.getResult()) {
reconcileCommandDao.removeCommand(requestSeq, command.toString(), State.COMPLETED);
}
}
}
@Override
public void updateReconcileCommandToInterruptedByManagementServerId(long managementServerId) {
logger.debug("Updating reconcile command to interrupted by management server id " + managementServerId);
reconcileCommandDao.updateCommandsToInterruptedByManagementServerId(managementServerId);
}
@Override
public void updateReconcileCommandToInterruptedByHostId(long hostId) {
logger.debug("Updating reconcile command to interrupted by host id " + hostId);
reconcileCommandDao.updateCommandsToInterruptedByHostId(hostId);
}
private boolean processReconcileAnswer(long requestSequence, Command cmd, ReconcileAnswer reconcileAnswer) {
if (cmd instanceof MigrateCommand && reconcileAnswer instanceof ReconcileMigrateAnswer) {
MigrateCommand command = (MigrateCommand) cmd;
ReconcileMigrateAnswer answer = (ReconcileMigrateAnswer) reconcileAnswer;
return processReconcileMigrateAnswer(command, answer);
} else if (cmd instanceof CopyCommand && reconcileAnswer instanceof ReconcileCopyAnswer) {
CopyCommand command = (CopyCommand) cmd;
ReconcileCopyAnswer answer = (ReconcileCopyAnswer) reconcileAnswer;
return processReconcileCopyAnswer(requestSequence, command, answer);
} else if (cmd instanceof MigrateVolumeCommand && reconcileAnswer instanceof ReconcileMigrateVolumeAnswer) {
MigrateVolumeCommand command = (MigrateVolumeCommand) cmd;
ReconcileMigrateVolumeAnswer answer = (ReconcileMigrateVolumeAnswer) reconcileAnswer;
return processReconcileMigrateVolumeAnswer(requestSequence, command, answer);
}
return true;
}
private boolean processReconcileMigrateAnswer(MigrateCommand command, ReconcileMigrateAnswer reconcileAnswer) {
Long vmId = command.getVirtualMachine().getId();
VMInstanceVO vm = vmInstanceDao.findById(vmId);
if (vm == null) {
logger.debug(String.format("VM (id: %s) has been removed", vmId));
return true;
}
if (!VirtualMachine.State.Running.equals(vm.getState())) {
logger.debug(String.format("VM %s (state: %s) is not Running state", vm, vm.getState()));
return true;
}
Map<String, MigrateCommand.MigrateDiskInfo> migrateDiskInfoMap = command.getMigrateStorage();
if (MapUtils.isEmpty(migrateDiskInfoMap)) {
return true;
}
if (!VirtualMachine.State.Running.equals(reconcileAnswer.getVmState())) {
logger.debug(String.format("VM %s is %s state on the host, will retry", vm, vm.getState()));
return false;
}
List<String> diskPaths = reconcileAnswer.getVmDisks();
logger.debug(String.format("The disks attached to the VM %s after live vm migration are: %s", vm, diskPaths));
List<VolumeVO> volumes = volumeDao.findByInstance(vmId);
for (VolumeVO volumeVO : volumes) {
if (Volume.State.Migrating.equals(volumeVO.getState())) {
logger.debug(String.format("Reconciling vm %s with volume %s in Migrating state", vm, volumeVO));
logger.debug(String.format("Searching for volumes with last_id = %s", volumeVO.getId()));
VolumeVO destVolume = volumeDao.findByLastIdAndState(volumeVO.getId(), Volume.State.Migrating);
if (destVolume != null) {
logger.debug(String.format("Found destination volume with last_id = %s: %s", volumeVO.getId(), destVolume));
Boolean isVolumeMigrated = isVolumeMigrated(diskPaths, volumeVO.getPath(), destVolume.getPath());
if (isVolumeMigrated == null) {
logger.debug(String.format("Unable to determine if source volume %s has been migrated to destination volume %s", volumeVO, destVolume));
continue;
}
if (isVolumeMigrated) {
logger.debug(String.format("Adding destination volume %s to vm %s as part of reconciliation of command %s and answer %s", destVolume, vm, command, reconcileAnswer));
destVolume.setState(Volume.State.Ready);
destVolume.setInstanceId(vmId);
volumeDao.update(destVolume.getId(), destVolume);
logger.debug(String.format("Removing volume %s from vm %s as part of reconciliation of command %s and answer %s", volumeVO, vm, command, reconcileAnswer));
volumeVO.setState(Volume.State.Destroy);
volumeVO.setVolumeType(Volume.Type.DATADISK);
volumeVO.setInstanceId(null);
volumeDao.update(volumeVO.getId(), volumeVO);
} else {
logger.debug(String.format("Removing destination volume %s from vm %s as part of reconciliation of command %s and answer %s", destVolume, vm, command, reconcileAnswer));
destVolume.setState(Volume.State.Destroy);
destVolume.setVolumeType(Volume.Type.DATADISK);
destVolume.setInstanceId(null);
volumeDao.update(destVolume.getId(), destVolume);
logger.debug(String.format("Adding volume %s to vm %s as part of reconciliation of command %s and answer %s", volumeVO, vm, command, reconcileAnswer));
volumeVO.setState(Volume.State.Ready);
volumeVO.setInstanceId(vmId);
volumeDao.update(volumeVO.getId(), volumeVO);
}
}
}
}
volumes = volumeDao.findByInstance(vmId);
logger.debug(String.format("The disks of volumes attached to the VM %s after reconciliation of successful vm migration are: %s", vm, volumes.stream().map(VolumeVO::getPath).collect(Collectors.toList())));
return true;
}
private Boolean isVolumeMigrated(List<String> diskPaths, String sourceVolumePath, String destVolumePath) {
if (StringUtils.isAnyBlank(sourceVolumePath, destVolumePath)) {
return null;
}
for (String diskPath : diskPaths) {
if (diskPath.contains(sourceVolumePath) && !diskPath.contains(destVolumePath)) {
return false; // Not migrated
} else if (!diskPath.contains(sourceVolumePath) && diskPath.contains(destVolumePath)) {
return true; // Migrated
}
}
return null; // Unable to determine
}
private boolean processReconcileCopyAnswer(long requestSequence, CopyCommand command, ReconcileCopyAnswer reconcileAnswer) {
DataTO srcData = command.getSrcTO();
DataTO destData = command.getDestTO();
final Long srcStoreId = srcData.getDataStore() instanceof PrimaryDataStoreTO ? ((PrimaryDataStoreTO) srcData.getDataStore()).getId() : null;
final Long destStoreId = destData.getDataStore() instanceof PrimaryDataStoreTO ? ((PrimaryDataStoreTO) destData.getDataStore()).getId() : null;
VolumeVO sourceVolume = srcData.getObjectType().equals(DataObjectType.VOLUME) && srcData.getDataStore() instanceof PrimaryDataStoreTO ? volumeDao.findByIdIncludingRemoved(srcData.getId()) : null;
VolumeVO destVolume = destData.getObjectType().equals(DataObjectType.VOLUME) && destData.getDataStore() instanceof PrimaryDataStoreTO ? volumeDao.findByIdIncludingRemoved(destData.getId()) : null;
if (reconcileAnswer.isSkipped()) {
logger.debug(String.format("The reconcile command for source volume (id: %s) to destination volume (id: %s) is ignored because it is skipped, due to reason: %s", srcData.getId(), destData.getId(), reconcileAnswer.getReason()));
processVolumesIfReconcileCopyCommandIsSkipped(srcStoreId, sourceVolume, destStoreId, destVolume, command, reconcileAnswer);
return true;
}
if (!reconcileAnswer.getResult()) {
logger.debug(String.format("The reconcile command for source volume (id: %s) to destination volume (id: %s) is ignored because the result is false, due to %s", srcData.getId(), destData.getId(), reconcileAnswer.getDetails()));
return false;
}
ReconcileCommandVO reconcileCommandVO = reconcileCommandDao.findCommand(requestSequence, command.toString());
if (reconcileCommandVO == null) {
logger.debug(String.format("The reconcile command for source volume (id: %s) to destination volume (id: %s) is not found in database, ignoring", srcData.getId(), destData.getId()));
return true;
}
ReconcileCopyAnswer previousReconcileAnswer = null;
if (destVolume != null) {
if (reconcileCommandVO.getAnswerName() == null) {
logger.debug(String.format("The reconcile command for source volume (id: %s) to destination volume (id: %s) does not have previous answer in database, ignoring this time", srcData.getId(), destData.getId()));
return false;
}
Answer previousAnswer = ReconcileCommandUtils.parseAnswerFromAnswerInfo(reconcileCommandVO.getAnswerName(), reconcileCommandVO.getAnswerInfo());
if (!(previousAnswer instanceof ReconcileCopyAnswer)) {
logger.debug(String.format("The reconcile command for source volume (id: %s) to destination volume (id: %s) does not have previous reconcileAnswer in database, ignoring this time", srcData.getId(), destData.getId()));
return false;
}
previousReconcileAnswer = (ReconcileCopyAnswer) previousAnswer;
}
VolumeOnStorageTO volumeOnSource = reconcileAnswer.getVolumeOnSource();
VolumeOnStorageTO volumeOnDestination = reconcileAnswer.getVolumeOnDestination();
Pair<Volume.State, Volume.State> statePair = getVolumeStateOnSourceAndDestination(srcData, destData, volumeOnSource, volumeOnDestination, previousReconcileAnswer);
Volume.State sourceVolumeState = statePair.first();
Volume.State destVolumeState = statePair.second();
logger.debug(String.format("Processing volume (id: %s, state: %s) on source store and volume (id: %s, state: %s) on destination store", srcData.getId(), sourceVolumeState, destData.getId(), destVolumeState));
if (sourceVolume != null && destVolume != null) {
// copy from primary to primary (offline volume migration)
return processReconcileCopyAnswerFromPrimaryToPrimary(srcStoreId, sourceVolume, sourceVolumeState, destStoreId, destVolume, destVolumeState, volumeOnDestination);
} else if (sourceVolume == null && destVolume != null) {
// copy from secondary to primary
return processReconcileCopyAnswerFromImageCacheToPrimary(destStoreId, destVolume, destVolumeState, volumeOnDestination, command, reconcileAnswer);
} else if (sourceVolume != null && sourceVolumeState != null) {
// copy from primary to secondary
return processReconcileCopyAnswerFromPrimaryToSecondary(srcStoreId, sourceVolume, sourceVolumeState, command, reconcileAnswer);
}
return false;
}
private boolean processVolumesIfReconcileCopyCommandIsSkipped(Long srcStoreId, VolumeVO sourceVolume, Long destStoreId, VolumeVO destVolume,
CopyCommand command, ReconcileCopyAnswer reconcileAnswer) {
if (sourceVolume == null && destVolume != null) {
// copy from secondary to primary
return processReconcileCopyAnswerFromImageCacheToPrimary(destStoreId, destVolume, Volume.State.Destroy, null, command, reconcileAnswer);
} else if (sourceVolume != null && destVolume == null) {
// copy from primary to secondary
return processReconcileCopyAnswerFromPrimaryToSecondary(srcStoreId, sourceVolume, Volume.State.Ready, command, reconcileAnswer);
}
return false;
}
private boolean processReconcileCopyAnswerFromPrimaryToPrimary(Long srcStoreId, VolumeVO sourceVolume, Volume.State sourceVolumeState,
Long destStoreId, VolumeVO destVolume, Volume.State destVolumeState, VolumeOnStorageTO volumeOnDestination) {
boolean isSourceMigrating = sourceVolume != null && sourceVolume.getRemoved() == null && sourceVolume.getState().equals(Volume.State.Migrating);
if (Volume.State.Ready.equals(sourceVolumeState)) {
if (isSourceMigrating && srcStoreId != null && srcStoreId.equals(sourceVolume.getPoolId()) && destVolumeState != null) {
logger.debug(String.format("Updating source volume %s to %s state", sourceVolume, Volume.State.Ready));
sourceVolume.setState(Volume.State.Ready);
sourceVolume.setUpdated(new Date());
volumeDao.update(sourceVolume.getId(), sourceVolume);
}
if (Volume.State.Creating.equals(destVolume.getState()) && destStoreId != null && destStoreId.equals(destVolume.getPoolId()) && destVolumeState != null) {
logger.debug(String.format("Updating destination volume %s to %s state", destVolume, destVolumeState));
destVolume.setState(destVolumeState);
destVolume.setUpdated(new Date());
volumeDao.update(destVolume.getId(), destVolume);
}
return true;
} else if (Volume.State.Ready.equals(destVolumeState)) {
if (Volume.State.Creating.equals(destVolume.getState()) && destStoreId != null && destStoreId.equals(destVolume.getPoolId())) {
logger.debug(String.format("Updating destination volume %s to %s state", destVolume, Volume.State.Ready));
destVolume.setState(Volume.State.Ready);
destVolume.setUpdated(new Date());
destVolume.setInstanceId(sourceVolume.getInstanceId());
destVolume.setPath(volumeOnDestination.getPath()); // Update path of destination volume
destVolume.set_iScsiName(volumeOnDestination.getPath());
volumeDao.update(destVolume.getId(), destVolume);
}
if (isSourceMigrating && srcStoreId != null && srcStoreId.equals(sourceVolume.getPoolId()) && sourceVolumeState != null) {
logger.debug(String.format("Updating source volume %s to %s state", sourceVolume, sourceVolumeState));
sourceVolume.setState(sourceVolumeState);
sourceVolume.setUpdated(new Date());
sourceVolume.setInstanceId(null);
volumeDao.update(sourceVolume.getId(), sourceVolume);
}
return true;
}
return false;
}
private boolean processReconcileCopyAnswerFromImageCacheToPrimary(Long destStoreId, VolumeVO destVolume, Volume.State destVolumeState, VolumeOnStorageTO volumeOnDestination,
CopyCommand command, ReconcileCopyAnswer reconcileAnswer) {
if (destVolume.getRemoved() == null && Volume.State.Creating.equals(destVolume.getState()) && destStoreId != null && destStoreId.equals(destVolume.getPoolId())) {
Long lastVolumeId = destVolume.getLastId();
logger.debug(String.format("Searching for last volume with id = %s", destVolume.getLastId()));
VolumeVO lastVolume = volumeDao.findById(lastVolumeId);
if (lastVolume != null && Arrays.asList(Volume.State.Migrating, Volume.State.Ready).contains(lastVolume.getState())
&& Volume.State.Destroy.equals(destVolumeState)) {
destVolume.setState(destVolumeState);
if (volumeOnDestination != null) {
destVolume.setPath(volumeOnDestination.getPath()); // Update path of destination volume
}
volumeDao.update(destVolume.getId(), destVolume);
if (Volume.State.Migrating.equals(lastVolume.getState())) {
lastVolume.setState(Volume.State.Ready); // Update last volume to Ready
volumeDao.update(lastVolume.getId(), lastVolume);
}
// remove record from volume_store_ref with Copying state
VolumeDataStoreVO volumeDataStoreVO = volumeDataStoreDao.findByVolume(lastVolume.getId());
if (volumeDataStoreVO != null && volumeDataStoreVO.getState().equals(ObjectInDataStoreStateMachine.State.Copying)) {
logger.debug(String.format("Removing record (id: %s) for volume %s from volume_store_ref as part of reconciliation of command %s and answer %s", volumeDataStoreVO.getId(), lastVolume, command, reconcileAnswer));
volumeDataStoreDao.remove(volumeDataStoreVO.getId());
}
}
return true;
}
return false;
}
private boolean processReconcileCopyAnswerFromPrimaryToSecondary(Long srcStoreId, VolumeVO sourceVolume, Volume.State sourceVolumeState,
CopyCommand command, ReconcileCopyAnswer reconcileAnswer) {
boolean isSourceMigrating = sourceVolume != null && sourceVolume.getRemoved() == null && sourceVolume.getState().equals(Volume.State.Migrating);
if (isSourceMigrating && srcStoreId != null && srcStoreId.equals(sourceVolume.getPoolId())) {
logger.debug(String.format("Updating source volume %s to %s state", sourceVolume, sourceVolumeState));
sourceVolume.setState(sourceVolumeState); // Update source volume state
volumeDao.update(sourceVolume.getId(), sourceVolume);
// remove record from volume_store_ref with Creating state
VolumeDataStoreVO volumeDataStoreVO = volumeDataStoreDao.findByVolume(sourceVolume.getId());
if (volumeDataStoreVO != null && volumeDataStoreVO.getState().equals(ObjectInDataStoreStateMachine.State.Creating)) {
logger.debug(String.format("Removing record (id: %s) for volume %s from volume_store_ref as part of reconciliation of command %s and answer %s", volumeDataStoreVO.getId(), sourceVolume, command, reconcileAnswer));
volumeDataStoreDao.remove(volumeDataStoreVO.getId());
}
}
logger.debug(String.format("Searching for volumes with last_id = %s", sourceVolume.getId()));
VolumeVO newVolume = volumeDao.findByLastIdAndState(sourceVolume.getId(), Volume.State.Creating);
if (newVolume != null) {
logger.debug(String.format("Removing volume %s as part of reconciliation of command %s and answer %s", newVolume, command, reconcileAnswer));
newVolume.setState(Volume.State.Destroy);
newVolume.setVolumeType(Volume.Type.DATADISK);
newVolume.setInstanceId(null);
newVolume.setRemoved(new Date());
volumeDao.update(newVolume.getId(), newVolume);
}
return true;
}
private boolean processReconcileMigrateVolumeAnswer(long requestSequence, MigrateVolumeCommand command, ReconcileMigrateVolumeAnswer reconcileAnswer) {
DataTO srcData = command.getSrcData();
DataTO destData = command.getDestData();
if (srcData == null || destData == null) {
logger.debug(String.format("The source (%s) and destination (%s) of MigrateCommand must be non-empty", srcData, destData));
return true;
}
if (srcData.getId() != destData.getId()) {
logger.debug(String.format("The source volume (id: %s) and destination volume (id: %s) of MigrateCommand must be same ID", srcData.getId(), destData.getId()));
return true;
}
VolumeVO sourceVolume = volumeDao.findByIdIncludingRemoved(srcData.getId());
if (sourceVolume == null || sourceVolume.getRemoved() != null) {
logger.debug(String.format("Volume (id: %s) has been removed in CloudStack", srcData.getId()));
return true;
}
if (!sourceVolume.getState().equals(Volume.State.Migrating)) {
logger.debug(String.format("Volume %s (state: %s) is not in Migrating state", sourceVolume, sourceVolume.getState()));
return true;
}
if (!(srcData.getDataStore() instanceof PrimaryDataStoreTO) && (destData.getDataStore() instanceof PrimaryDataStoreTO)) {
logger.debug(String.format("The source (role: %s) and destination (role: %s) of MigrateCommand must be Primary", srcData.getDataStore().getRole(), destData.getDataStore().getRole()));
return true;
}
ReconcileCommandVO reconcileCommandVO = reconcileCommandDao.findCommand(requestSequence, command.toString());
if (reconcileCommandVO == null) {
logger.debug(String.format("The reconcile command for migrating volume %s is not found in database, ignoring", sourceVolume));
return true;
}
if (reconcileCommandVO.getAnswerName() == null) {
logger.debug(String.format("The reconcile command for migrating volume %s does not have previous answer in database, ignoring this time", sourceVolume));
return false;
}
Answer previousAnswer = ReconcileCommandUtils.parseAnswerFromAnswerInfo(reconcileCommandVO.getAnswerName(), reconcileCommandVO.getAnswerInfo());
if (!(previousAnswer instanceof ReconcileMigrateVolumeAnswer)) {
logger.debug(String.format("The reconcile command for for migrating volume %s does not have previous reconcileAnswer in database, ignoring this time", sourceVolume));
return false;
}
List<String> diskPaths = reconcileAnswer.getVmDiskPaths();
logger.debug(String.format("The disks attached to the VM %s after live volume migration are: %s", reconcileAnswer.getVmName(), diskPaths));
PrimaryDataStoreTO srcDataStore = (PrimaryDataStoreTO) srcData.getDataStore();
PrimaryDataStoreTO destDataStore = (PrimaryDataStoreTO) destData.getDataStore();
VolumeOnStorageTO volumeOnSource = reconcileAnswer.getVolumeOnSource();
VolumeOnStorageTO volumeOnDestination = reconcileAnswer.getVolumeOnDestination();
ReconcileMigrateVolumeAnswer previousReconcileAnswer = (ReconcileMigrateVolumeAnswer) previousAnswer;
Pair<Volume.State, Volume.State> statePair = getVolumeStateOnSourceAndDestination(srcData, destData, volumeOnSource, volumeOnDestination, previousReconcileAnswer);
Volume.State sourceVolumeState = statePair.first();
Volume.State destVolumeState = statePair.second();
logger.debug(String.format("Processing volume (id: %s, state: %s) on source pool and volume (id: %s, state: %s) on destination pool", srcData.getId(), sourceVolumeState, destData.getId(), destVolumeState));
if (Volume.State.Ready.equals(sourceVolumeState)) {
updateVolumeAndDestroyOldVolume(sourceVolume, srcData, srcDataStore, destData, destDataStore, volumeOnDestination);
return true;
} else if (Volume.State.Ready.equals(destVolumeState)) {
updateVolumeAndDestroyOldVolume(sourceVolume, destData, destDataStore, srcData, srcDataStore, volumeOnSource);
return true;
} else if (CollectionUtils.isNotEmpty(diskPaths)) {
// VM is Running
if (!Volume.State.Migrating.equals(destVolumeState) && volumeOnDestination != null) {
for (String diskPath : diskPaths) {
if (diskPath.equals(volumeOnDestination.getFullPath())) {
logger.debug(String.format("The VM %s is running with the volume %s on destination pool %s, the volume has been migrated", reconcileAnswer.getVmName(), volumeOnDestination.getFullPath(), destData.getDataStore()));
updateVolumeAndDestroyOldVolume(sourceVolume, destData, destDataStore, srcData, srcDataStore, volumeOnSource);
return true;
} else if (diskPath.equals(volumeOnSource.getFullPath())) {
logger.debug(String.format("The VM %s is running with the volume %s on source pool %s, the volume has not been migrated", reconcileAnswer.getVmName(), volumeOnSource.getFullPath(), srcData.getDataStore()));
updateVolumeAndDestroyOldVolume(sourceVolume, srcData, srcDataStore, destData, destDataStore, volumeOnDestination);
return true;
}
}
}
}
return false;
}
private void updateVolumeAndDestroyOldVolume(VolumeVO sourceVolume, DataTO srcData, PrimaryDataStoreTO srcDataStore, DataTO destData, PrimaryDataStoreTO destDataStore, VolumeOnStorageTO volumeToBeDeleted) {
logger.debug(String.format("Updating volume %s to %s state", sourceVolume, Volume.State.Ready));
sourceVolume.setState(Volume.State.Ready);
sourceVolume.setPoolId(srcDataStore.getId()); // restore pool_id and update path
sourceVolume.setPoolType(srcDataStore.getPoolType());
sourceVolume.setPath(srcData.getPath());
sourceVolume.set_iScsiName(srcData.getPath());
sourceVolume.setUpdated(new Date());
volumeDao.update(sourceVolume.getId(), sourceVolume);
if (volumeToBeDeleted != null) {
logger.debug(String.format("Creating a dummy volume from %s on pool %s", sourceVolume, destDataStore.getId()));
Volume newVol = volumeManager.allocateDuplicateVolume(sourceVolume, null, null);
VolumeVO newVolume = (VolumeVO) newVol;
newVolume.setInstanceId(null);
newVolume.setPoolId(destDataStore.getId());
newVolume.setPoolType(destDataStore.getPoolType());
newVolume.setState(Volume.State.Creating);
newVolume.setPath(destData.getPath());
newVolume.set_iScsiName(destData.getPath());
volumeDao.update(newVolume.getId(), newVolume);
logger.debug(String.format("Deleting the dummy volume %s on pool %s", newVolume, destDataStore.getId()));
volumeApiService.destroyVolume(newVolume.getId(), accountManager.getAccount(Account.ACCOUNT_ID_SYSTEM), true, true);
}
}
private Pair<Volume.State, Volume.State> getVolumeStateOnSourceAndDestination(DataTO srcData, DataTO destData, VolumeOnStorageTO volumeOnSource, VolumeOnStorageTO volumeOnDestination, ReconcileVolumeAnswer previousReconcileAnswer) {
final Long srcStoreId = srcData.getDataStore() instanceof PrimaryDataStoreTO ? ((PrimaryDataStoreTO) srcData.getDataStore()).getId() : null;
final Long destStoreId = destData.getDataStore() instanceof PrimaryDataStoreTO ? ((PrimaryDataStoreTO) destData.getDataStore()).getId() : null;
VolumeOnStorageTO previousVolumeOnDestination = previousReconcileAnswer != null ? previousReconcileAnswer.getVolumeOnDestination() : null;
if (volumeOnSource != null) {
if (volumeOnDestination == null) {
if (volumeOnSource.getPath() != null) {
logger.debug(String.format("Volume (id :%s) exist on source (id: %s) and volume (id: %s) does not exist on destination (id: %s), updating state to Ready on source pool", srcData.getId(), srcStoreId, destData.getId(), destStoreId));
return new Pair<>(Volume.State.Ready, null);
} else {
logger.debug(String.format("Volume (id :%s) cannot be found on source (id: %s) and volume (id: %s) does not exist on destination (id: %s), updating state to Ready on source pool", srcData.getId(), srcStoreId, destData.getId(), destStoreId));
return new Pair<>(Volume.State.Destroy, null);
}
}
if (volumeOnDestination.getPath() == null) {
logger.debug(String.format("Volume (id :%s) exist on source (id: %s) and volume (id: %s) cannot be found on destination (id: %s), updating state to Ready on source pool", srcData.getId(), srcStoreId, destData.getId(), destStoreId));
return new Pair<>(Volume.State.Ready, Volume.State.Destroy);
}
boolean isDestinationVolumeChanged = (previousVolumeOnDestination != null && volumeOnDestination.getSize() != previousVolumeOnDestination.getSize());
if (isDestinationVolumeChanged) {
logger.debug(String.format("Volume (id :%s) on destination (id: %s) is still being updated, skipping", destData.getId(), destStoreId));
return new Pair<>(Volume.State.Migrating, Volume.State.Migrating);
} else if (destData.getId() == srcData.getId()) {
logger.debug(String.format("Volume (id :%s) on destination (id: %s) is not updated, cannot determine the state on source and destination pool", destData.getId(), destStoreId));
return new Pair<>(null, null);
} else {
logger.debug(String.format("Volume (id :%s) on destination (id: %s) is not updated, updating state to Ready on source pool and Destroy on destination pool", destData.getId(), destStoreId));
return new Pair<>(Volume.State.Ready, Volume.State.Destroy);
}
} else if (volumeOnDestination != null) {
boolean isDestinationVolumeChanged = (previousVolumeOnDestination != null && volumeOnDestination.getSize() != previousVolumeOnDestination.getSize());
if (isDestinationVolumeChanged) {
logger.debug(String.format("Volume (id :%s) on destination (id: %s) is still being updated, skipping", destData.getId(), destStoreId));
return new Pair<>(srcStoreId != null ? Volume.State.Migrating : null, Volume.State.Migrating);
} else if (srcStoreId != null) {
// from primary to primary
logger.debug(String.format("Volume (id: %s) does not exist on source (id: %s) but volume (id: %s) exist on destination (id: %s), updating state to Ready on destination pool", srcData.getId(), srcStoreId, destData.getId(), destStoreId));
return new Pair<>(Volume.State.Destroy, Volume.State.Ready);
} else {
// from secondary to primary
logger.debug(String.format("Volume (id: %s) exist on destination (id: %s), however it is copied from secondary, updating state to Destroy on destination pool", destData.getId(), destStoreId));
return new Pair<>(null, Volume.State.Destroy);
}
}
return new Pair<>(null, null);
}
@Override
public boolean isReconcileResourceNeeded(long resourceId, ApiCommandResourceType resourceType) {
return !reconcileCommandDao.listByResourceIdAndTypeAndStates(resourceId, resourceType,
State.INTERRUPTED, State.TIMED_OUT, State.RECONCILE_RETRY, State.RECONCILING, State.RECONCILE_FAILED, State.CREATED)
.isEmpty();
}
}