server: Cleanup allocated snapshots / vm snapshots, and update pending ones to Error on MS start (#8452)

* Remove allocated snapshots / vm snapshots on start

* Check and Cleanup snapshots / vm snapshots on MS start

* rebase fixes

* Update volume state (from Snapshotting) on MS start when its snapshot job not finished and snapshot in Creating state
diff --git a/api/src/main/java/com/cloud/vm/snapshot/VMSnapshotService.java b/api/src/main/java/com/cloud/vm/snapshot/VMSnapshotService.java
index 84a56aa..754e463 100644
--- a/api/src/main/java/com/cloud/vm/snapshot/VMSnapshotService.java
+++ b/api/src/main/java/com/cloud/vm/snapshot/VMSnapshotService.java
@@ -19,6 +19,7 @@
 
 import java.util.List;
 
+import com.cloud.utils.fsm.NoTransitionException;
 import org.apache.cloudstack.api.command.user.vmsnapshot.ListVMSnapshotCmd;
 
 import com.cloud.exception.ConcurrentOperationException;
@@ -53,4 +54,6 @@
      * @param id vm id
      */
     boolean deleteVMSnapshotsFromDB(Long vmId, boolean unmanage);
+
+    void updateOperationFailed(VMSnapshot vmSnapshot) throws NoTransitionException;
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java
index 2a2db4a..d318707 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import com.cloud.storage.DataStoreRole;
+import com.cloud.utils.fsm.NoTransitionException;
 
 public interface SnapshotDataFactory {
     SnapshotInfo getSnapshot(long snapshotId, DataStore store);
@@ -42,4 +43,6 @@
     List<SnapshotInfo> listSnapshotOnCache(long snapshotId);
 
     SnapshotInfo getReadySnapshotOnCache(long snapshotId);
+
+    void updateOperationFailed(long snapshotId) throws NoTransitionException;
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java
index 6372aa3..2939342 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.cloudstack.engine.subsystem.api.storage;
 
+import com.cloud.utils.fsm.NoTransitionException;
 import com.cloud.vm.snapshot.VMSnapshot;
 
 public interface VMSnapshotStrategy {
@@ -44,4 +45,6 @@
      * @return true if vm snapshot removed from DB, false if not.
      */
     boolean deleteVMSnapshotFromDB(VMSnapshot vmSnapshot, boolean unmanage);
+
+    void updateOperationFailed(VMSnapshot vmSnapshot) throws NoTransitionException;
 }
diff --git a/engine/components-api/src/main/java/com/cloud/vm/snapshot/VMSnapshotManager.java b/engine/components-api/src/main/java/com/cloud/vm/snapshot/VMSnapshotManager.java
index a01d4ee..997b413 100644
--- a/engine/components-api/src/main/java/com/cloud/vm/snapshot/VMSnapshotManager.java
+++ b/engine/components-api/src/main/java/com/cloud/vm/snapshot/VMSnapshotManager.java
@@ -54,5 +54,4 @@
     boolean hasActiveVMSnapshotTasks(Long vmId);
 
     RestoreVMSnapshotCommand createRestoreCommand(UserVmVO userVm, List<VMSnapshotVO> vmSnapshotVOs);
-
 }
diff --git a/engine/schema/src/main/java/com/cloud/vm/snapshot/dao/VMSnapshotDao.java b/engine/schema/src/main/java/com/cloud/vm/snapshot/dao/VMSnapshotDao.java
index 0143aaa..bbdbfb5 100644
--- a/engine/schema/src/main/java/com/cloud/vm/snapshot/dao/VMSnapshotDao.java
+++ b/engine/schema/src/main/java/com/cloud/vm/snapshot/dao/VMSnapshotDao.java
@@ -38,6 +38,8 @@
     VMSnapshotVO findByName(Long vmId, String name);
 
     List<VMSnapshotVO> listByAccountId(Long accountId);
+
     List<VMSnapshotVO> searchByVms(List<Long> vmIds);
+
     List<VMSnapshotVO> searchRemovedByVms(List<Long> vmIds, Long batchSize);
 }
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java
index 4d8919c..cd314b0 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java
@@ -23,11 +23,16 @@
 
 import javax.inject.Inject;
 
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.Volume;
+import com.cloud.utils.fsm.NoTransitionException;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 import org.apache.commons.collections.CollectionUtils;
@@ -73,7 +78,7 @@
         for (SnapshotDataStoreVO snapshotDataStoreVO : allSnapshotsFromVolumeAndDataStore) {
             DataStore store = storeMgr.getDataStore(snapshotDataStoreVO.getDataStoreId(), role);
             SnapshotVO snapshot = snapshotDao.findById(snapshotDataStoreVO.getSnapshotId());
-            if (snapshot == null){ //snapshot may have been removed;
+            if (snapshot == null) { //snapshot may have been removed;
                 continue;
             }
             SnapshotObject info = SnapshotObject.getSnapshotObject(snapshot, store);
@@ -107,8 +112,6 @@
         return infos;
     }
 
-
-
     @Override
     public SnapshotInfo getSnapshot(long snapshotId, long storeId, DataStoreRole role) {
         SnapshotVO snapshot = snapshotDao.findById(snapshotId);
@@ -202,4 +205,17 @@
         return snapObjs;
     }
 
+    @Override
+    public void updateOperationFailed(long snapshotId) throws NoTransitionException {
+        List<SnapshotDataStoreVO> snapshotStoreRefs = snapshotStoreDao.findBySnapshotId(snapshotId);
+        for (SnapshotDataStoreVO snapshotStoreRef : snapshotStoreRefs) {
+            SnapshotInfo snapshotInfo = getSnapshot(snapshotStoreRef.getSnapshotId(), snapshotStoreRef.getDataStoreId(), snapshotStoreRef.getRole());
+            if (snapshotInfo != null) {
+                VolumeInfo volumeInfo = snapshotInfo.getBaseVolume();
+                volumeInfo.stateTransit(Volume.Event.OperationFailed);
+                ((SnapshotObject)snapshotInfo).processEvent(Snapshot.Event.OperationFailed);
+                snapshotInfo.processEvent(ObjectInDataStoreStateMachine.Event.OperationFailed);
+            }
+        }
+    }
 }
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java
index 09f569e..1801c87 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java
@@ -481,4 +481,14 @@
         }
         return StrategyPriority.DEFAULT;
     }
+
+    @Override
+    public void updateOperationFailed(VMSnapshot vmSnapshot) throws NoTransitionException {
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, VMSnapshot.Event.OperationFailed);
+        } catch (NoTransitionException e) {
+            logger.debug("Failed to change vm snapshot state with event OperationFailed");
+            throw e;
+        }
+    }
 }
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java
index 1ec6e20..8afbcab 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java
@@ -479,6 +479,16 @@
         return vmSnapshotDao.remove(vmSnapshot.getId());
     }
 
+    @Override
+    public void updateOperationFailed(VMSnapshot vmSnapshot) throws NoTransitionException {
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, VMSnapshot.Event.OperationFailed);
+        } catch (NoTransitionException e) {
+            logger.debug("Failed to change vm snapshot state with event OperationFailed");
+            throw e;
+        }
+    }
+
     private void publishUsageEvent(String type, VMSnapshot vmSnapshot, UserVm userVm, VolumeObjectTO volumeTo) {
         VolumeVO volume = volumeDao.findById(volumeTo.getId());
         Long diskOfferingId = volume.getDiskOfferingId();
diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 47bf27b..80140b0 100644
--- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -35,6 +35,11 @@
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
+import com.cloud.storage.SnapshotVO;
+import com.cloud.vm.snapshot.VMSnapshot;
+import com.cloud.vm.snapshot.VMSnapshotService;
+import com.cloud.vm.snapshot.VMSnapshotVO;
+import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 import org.apache.cloudstack.api.ApiCommandResourceType;
 import org.apache.cloudstack.api.ApiErrorCode;
 import org.apache.cloudstack.context.CallContext;
@@ -153,11 +158,15 @@
     @Inject
     private SnapshotDao _snapshotDao;
     @Inject
+    private VMSnapshotDao _vmSnapshotDao;
+    @Inject
     private SnapshotService snapshotSrv;
     @Inject
     private SnapshotDataFactory snapshotFactory;
     @Inject
     private SnapshotDetailsDao _snapshotDetailsDao;
+    @Inject
+    private VMSnapshotService _vmSnapshotService;
 
     @Inject
     private VolumeDataFactory volFactory;
@@ -1149,6 +1158,10 @@
                     return cleanupVirtualMachine(job.getInstanceId());
                 case Network:
                     return cleanupNetwork(job.getInstanceId());
+                case Snapshot:
+                    return cleanupSnapshot(job.getInstanceId());
+                case VmSnapshot:
+                    return cleanupVmSnapshot(job.getInstanceId());
             }
         } catch (Exception e) {
             logger.warn("Error while cleaning up resource: [" + job.getInstanceType().toString()  + "] with Id: " + job.getInstanceId(), e);
@@ -1187,7 +1200,7 @@
         return true;
     }
 
-    private boolean cleanupNetwork(final long networkId) throws Exception {
+    private boolean cleanupNetwork(final long networkId) {
         NetworkVO networkVO = networkDao.findById(networkId);
         if (networkVO == null) {
             logger.warn("Network not found. Skip Cleanup. NetworkId: " + networkId);
@@ -1206,6 +1219,46 @@
         return true;
     }
 
+    private boolean cleanupSnapshot(final long snapshotId) {
+        SnapshotVO snapshotVO = _snapshotDao.findById(snapshotId);
+        if (snapshotVO == null) {
+            logger.warn("Snapshot not found. Skip Cleanup. SnapshotId: " + snapshotId);
+            return true;
+        }
+        if (Snapshot.State.Allocated.equals(snapshotVO.getState())) {
+            _snapshotDao.remove(snapshotId);
+        }
+        if (Snapshot.State.Creating.equals(snapshotVO.getState())) {
+            try {
+                snapshotFactory.updateOperationFailed(snapshotId);
+            } catch (NoTransitionException e) {
+                snapshotVO.setState(Snapshot.State.Error);
+                _snapshotDao.update(snapshotVO.getId(), snapshotVO);
+            }
+        }
+        return true;
+    }
+
+    private boolean cleanupVmSnapshot(final long vmSnapshotId) {
+        VMSnapshotVO vmSnapshotVO = _vmSnapshotDao.findById(vmSnapshotId);
+        if (vmSnapshotVO == null) {
+            logger.warn("VM Snapshot not found. Skip Cleanup. VMSnapshotId: " + vmSnapshotId);
+            return true;
+        }
+        if (VMSnapshot.State.Allocated.equals(vmSnapshotVO.getState())) {
+            _vmSnapshotDao.remove(vmSnapshotId);
+        }
+        if (VMSnapshot.State.Creating.equals(vmSnapshotVO.getState())) {
+            try {
+                _vmSnapshotService.updateOperationFailed(vmSnapshotVO);
+            } catch (NoTransitionException e) {
+                vmSnapshotVO.setState(VMSnapshot.State.Error);
+                _vmSnapshotDao.update(vmSnapshotVO.getId(), vmSnapshotVO);
+            }
+        }
+        return true;
+    }
+
     private void cleanupFailedVolumesCreatedFromSnapshots(final long volumeId) {
         try {
             VolumeDetailVO volumeDetail = _volumeDetailsDao.findDetail(volumeId, VolumeService.SNAPSHOT_ID);
diff --git a/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index 907182e..26f7603 100644
--- a/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -28,6 +28,7 @@
 import javax.naming.ConfigurationException;
 
 import com.cloud.storage.snapshot.SnapshotManager;
+import com.cloud.utils.fsm.NoTransitionException;
 import org.apache.cloudstack.annotation.AnnotationService;
 import org.apache.cloudstack.annotation.dao.AnnotationDao;
 import org.apache.cloudstack.api.ApiConstants;
@@ -1388,6 +1389,12 @@
     }
 
     @Override
+    public void updateOperationFailed(VMSnapshot vmSnapshot) throws NoTransitionException {
+        VMSnapshotStrategy strategy = findVMSnapshotStrategy(vmSnapshot);
+        strategy.updateOperationFailed(vmSnapshot);
+    }
+
+    @Override
     public String getConfigComponentName() {
         return VMSnapshotManager.class.getSimpleName();
     }