NAS backup provider: Support restore from backup to volumes on Ceph storage pool(s), and take backup for stopped instances with volumes on Ceph storage pool(s) (#11684)

Co-authored-by: Abhisar Sinha <63767682+abh1sar@users.noreply.github.com>
diff --git a/api/src/main/java/org/apache/cloudstack/backup/BackupManager.java b/api/src/main/java/org/apache/cloudstack/backup/BackupManager.java
index 37d2161..f1f0c3c 100644
--- a/api/src/main/java/org/apache/cloudstack/backup/BackupManager.java
+++ b/api/src/main/java/org/apache/cloudstack/backup/BackupManager.java
@@ -55,7 +55,7 @@
     ConfigKey<String> BackupProviderPlugin = new ConfigKey<>("Advanced", String.class,
             "backup.framework.provider.plugin",
             "dummy",
-            "The backup and recovery provider plugin.", true, ConfigKey.Scope.Zone, BackupFrameworkEnabled.key());
+            "The backup and recovery provider plugin. Valid plugin values: dummy, veeam, networker and nas", true, ConfigKey.Scope.Zone, BackupFrameworkEnabled.key());
 
     ConfigKey<Long> BackupSyncPollingInterval = new ConfigKey<>("Advanced", Long.class,
             "backup.framework.sync.interval",
diff --git a/core/src/main/java/org/apache/cloudstack/backup/RestoreBackupCommand.java b/core/src/main/java/org/apache/cloudstack/backup/RestoreBackupCommand.java
index 453b236..9cbb87d 100644
--- a/core/src/main/java/org/apache/cloudstack/backup/RestoreBackupCommand.java
+++ b/core/src/main/java/org/apache/cloudstack/backup/RestoreBackupCommand.java
@@ -22,6 +22,7 @@
 import com.cloud.agent.api.Command;
 import com.cloud.agent.api.LogLevel;
 import com.cloud.vm.VirtualMachine;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 
 import java.util.List;
 
@@ -31,6 +32,7 @@
     private String backupRepoType;
     private String backupRepoAddress;
     private List<String> backupVolumesUUIDs;
+    private List<PrimaryDataStoreTO> restoreVolumePools;
     private List<String> restoreVolumePaths;
     private String diskType;
     private Boolean vmExists;
@@ -74,6 +76,14 @@
         this.backupRepoAddress = backupRepoAddress;
     }
 
+    public List<PrimaryDataStoreTO> getRestoreVolumePools() {
+        return restoreVolumePools;
+    }
+
+    public void setRestoreVolumePools(List<PrimaryDataStoreTO> restoreVolumePools) {
+        this.restoreVolumePools = restoreVolumePools;
+    }
+
     public List<String> getRestoreVolumePaths() {
         return restoreVolumePaths;
     }
diff --git a/core/src/main/java/org/apache/cloudstack/backup/TakeBackupCommand.java b/core/src/main/java/org/apache/cloudstack/backup/TakeBackupCommand.java
index ecebd57..5402b6b 100644
--- a/core/src/main/java/org/apache/cloudstack/backup/TakeBackupCommand.java
+++ b/core/src/main/java/org/apache/cloudstack/backup/TakeBackupCommand.java
@@ -21,6 +21,7 @@
 
 import com.cloud.agent.api.Command;
 import com.cloud.agent.api.LogLevel;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 
 import java.util.List;
 
@@ -29,6 +30,7 @@
     private String backupPath;
     private String backupRepoType;
     private String backupRepoAddress;
+    private List<PrimaryDataStoreTO> volumePools;
     private List<String> volumePaths;
     private Boolean quiesce;
     @LogLevel(LogLevel.Log4jLevel.Off)
@@ -80,6 +82,14 @@
         this.mountOptions = mountOptions;
     }
 
+    public List<PrimaryDataStoreTO> getVolumePools() {
+        return volumePools;
+    }
+
+    public void setVolumePools(List<PrimaryDataStoreTO> volumePools) {
+        this.volumePools = volumePools;
+    }
+
     public List<String> getVolumePaths() {
         return volumePaths;
     }
diff --git a/plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java b/plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java
index 9cd2f20..3813cac 100644
--- a/plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java
+++ b/plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java
@@ -26,9 +26,9 @@
 import com.cloud.hypervisor.Hypervisor;
 import com.cloud.offering.DiskOffering;
 import com.cloud.resource.ResourceManager;
+import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.ScopeType;
 import com.cloud.storage.Storage;
-import com.cloud.storage.StoragePoolHostVO;
 import com.cloud.storage.Volume;
 import com.cloud.storage.VolumeApiServiceImpl;
 import com.cloud.storage.VolumeVO;
@@ -49,10 +49,13 @@
 
 import org.apache.cloudstack.backup.dao.BackupDao;
 import org.apache.cloudstack.backup.dao.BackupRepositoryDao;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
 
@@ -107,6 +110,9 @@
     private PrimaryDataStoreDao primaryDataStoreDao;
 
     @Inject
+    DataStoreManager dataStoreMgr;
+
+    @Inject
     private AgentManager agentManager;
 
     @Inject
@@ -203,8 +209,9 @@
         if (VirtualMachine.State.Stopped.equals(vm.getState())) {
             List<VolumeVO> vmVolumes = volumeDao.findByInstance(vm.getId());
             vmVolumes.sort(Comparator.comparing(Volume::getDeviceId));
-            List<String> volumePaths = getVolumePaths(vmVolumes);
-            command.setVolumePaths(volumePaths);
+            Pair<List<PrimaryDataStoreTO>, List<String>> volumePoolsAndPaths = getVolumePoolsAndPaths(vmVolumes);
+            command.setVolumePools(volumePoolsAndPaths.first());
+            command.setVolumePaths(volumePoolsAndPaths.second());
         }
 
         BackupAnswer answer;
@@ -303,7 +310,9 @@
         restoreCommand.setMountOptions(backupRepository.getMountOptions());
         restoreCommand.setVmName(vm.getName());
         restoreCommand.setBackupVolumesUUIDs(backedVolumesUUIDs);
-        restoreCommand.setRestoreVolumePaths(getVolumePaths(restoreVolumes));
+        Pair<List<PrimaryDataStoreTO>, List<String>> volumePoolsAndPaths = getVolumePoolsAndPaths(restoreVolumes);
+        restoreCommand.setRestoreVolumePools(volumePoolsAndPaths.first());
+        restoreCommand.setRestoreVolumePaths(volumePoolsAndPaths.second());
         restoreCommand.setVmExists(vm.getRemoved() == null);
         restoreCommand.setVmState(vm.getState());
         restoreCommand.setMountTimeout(NASBackupRestoreMountTimeout.value());
@@ -319,31 +328,42 @@
         return new Pair<>(answer.getResult(), answer.getDetails());
     }
 
-    private List<String> getVolumePaths(List<VolumeVO> volumes) {
+    private Pair<List<PrimaryDataStoreTO>, List<String>> getVolumePoolsAndPaths(List<VolumeVO> volumes) {
+        List<PrimaryDataStoreTO> volumePools = new ArrayList<>();
         List<String> volumePaths = new ArrayList<>();
         for (VolumeVO volume : volumes) {
             StoragePoolVO storagePool = primaryDataStoreDao.findById(volume.getPoolId());
             if (Objects.isNull(storagePool)) {
                 throw new CloudRuntimeException("Unable to find storage pool associated to the volume");
             }
-            String volumePathPrefix;
-            if (ScopeType.HOST.equals(storagePool.getScope())) {
-                volumePathPrefix = storagePool.getPath();
-            } else if (Storage.StoragePoolType.SharedMountPoint.equals(storagePool.getPoolType())) {
-                volumePathPrefix = storagePool.getPath();
-            } else {
-                volumePathPrefix = String.format("/mnt/%s", storagePool.getUuid());
-            }
+
+            DataStore dataStore = dataStoreMgr.getDataStore(storagePool.getId(), DataStoreRole.Primary);
+            volumePools.add(dataStore != null ? (PrimaryDataStoreTO)dataStore.getTO() : null);
+
+            String volumePathPrefix = getVolumePathPrefix(storagePool);
             volumePaths.add(String.format("%s/%s", volumePathPrefix, volume.getPath()));
         }
-        return volumePaths;
+        return new Pair<>(volumePools, volumePaths);
+    }
+
+    private String getVolumePathPrefix(StoragePoolVO storagePool) {
+        String volumePathPrefix;
+        if (ScopeType.HOST.equals(storagePool.getScope()) ||
+                Storage.StoragePoolType.SharedMountPoint.equals(storagePool.getPoolType()) ||
+                Storage.StoragePoolType.RBD.equals(storagePool.getPoolType())) {
+            volumePathPrefix = storagePool.getPath();
+        } else {
+            // Should be Storage.StoragePoolType.NetworkFilesystem
+            volumePathPrefix = String.format("/mnt/%s", storagePool.getUuid());
+        }
+        return volumePathPrefix;
     }
 
     @Override
     public Pair<Boolean, String> restoreBackedUpVolume(Backup backup, Backup.VolumeInfo backupVolumeInfo, String hostIp, String dataStoreUuid, Pair<String, VirtualMachine.State> vmNameAndState) {
         final VolumeVO volume = volumeDao.findByUuid(backupVolumeInfo.getUuid());
         final DiskOffering diskOffering = diskOfferingDao.findByUuid(backupVolumeInfo.getDiskOfferingId());
-        final StoragePoolHostVO dataStore = storagePoolHostDao.findByUuid(dataStoreUuid);
+        final StoragePoolVO pool = primaryDataStoreDao.findByUuid(dataStoreUuid);
         final HostVO hostVO = hostDao.findByIp(hostIp);
 
         LOG.debug("Restoring vm volume {} from backup {} on the NAS Backup Provider", backupVolumeInfo, backup);
@@ -360,19 +380,26 @@
         restoredVolume.setUuid(volumeUUID);
         restoredVolume.setRemoved(null);
         restoredVolume.setDisplayVolume(true);
-        restoredVolume.setPoolId(dataStore.getPoolId());
+        restoredVolume.setPoolId(pool.getId());
+        restoredVolume.setPoolType(pool.getPoolType());
         restoredVolume.setPath(restoredVolume.getUuid());
         restoredVolume.setState(Volume.State.Copying);
-        restoredVolume.setFormat(Storage.ImageFormat.QCOW2);
         restoredVolume.setSize(backupVolumeInfo.getSize());
         restoredVolume.setDiskOfferingId(diskOffering.getId());
+        if (pool.getPoolType() != Storage.StoragePoolType.RBD) {
+            restoredVolume.setFormat(Storage.ImageFormat.QCOW2);
+        } else {
+            restoredVolume.setFormat(Storage.ImageFormat.RAW);
+        }
 
         RestoreBackupCommand restoreCommand = new RestoreBackupCommand();
         restoreCommand.setBackupPath(backup.getExternalId());
         restoreCommand.setBackupRepoType(backupRepository.getType());
         restoreCommand.setBackupRepoAddress(backupRepository.getAddress());
         restoreCommand.setVmName(vmNameAndState.first());
-        restoreCommand.setRestoreVolumePaths(Collections.singletonList(String.format("%s/%s", dataStore.getLocalPath(), volumeUUID)));
+        restoreCommand.setRestoreVolumePaths(Collections.singletonList(String.format("%s/%s", getVolumePathPrefix(pool), volumeUUID)));
+        DataStore dataStore = dataStoreMgr.getDataStore(pool.getId(), DataStoreRole.Primary);
+        restoreCommand.setRestoreVolumePools(Collections.singletonList(dataStore != null ? (PrimaryDataStoreTO)dataStore.getTO() : null));
         restoreCommand.setDiskType(backupVolumeInfo.getType().name().toLowerCase(Locale.ROOT));
         restoreCommand.setMountOptions(backupRepository.getMountOptions());
         restoreCommand.setVmExists(null);
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapper.java
index 243cf2e..fd94013 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapper.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapper.java
@@ -21,15 +21,25 @@
 
 import com.cloud.agent.api.Answer;
 import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMPhysicalDisk;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
 import com.cloud.resource.CommandWrapper;
 import com.cloud.resource.ResourceWrapper;
+import com.cloud.storage.Storage;
 import com.cloud.utils.Pair;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.script.Script;
 import com.cloud.vm.VirtualMachine;
 import org.apache.cloudstack.backup.BackupAnswer;
 import org.apache.cloudstack.backup.RestoreBackupCommand;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
+import org.apache.cloudstack.utils.qemu.QemuImg;
+import org.apache.cloudstack.utils.qemu.QemuImgException;
+import org.apache.cloudstack.utils.qemu.QemuImgFile;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.libvirt.LibvirtException;
 
 import java.io.File;
 import java.io.IOException;
@@ -45,7 +55,8 @@
     private static final String MOUNT_COMMAND = "sudo mount -t %s %s %s";
     private static final String UMOUNT_COMMAND = "sudo umount %s";
     private static final String FILE_PATH_PLACEHOLDER = "%s/%s";
-    private static final String ATTACH_DISK_COMMAND = " virsh attach-disk %s %s %s --driver qemu --subdriver qcow2 --cache none";
+    private static final String ATTACH_QCOW2_DISK_COMMAND = " virsh attach-disk %s %s %s --driver qemu --subdriver qcow2 --cache none";
+    private static final String ATTACH_RBD_DISK_XML_COMMAND = " virsh attach-device %s /dev/stdin <<EOF%sEOF";
     private static final String CURRRENT_DEVICE = "virsh domblklist --domain %s | tail -n 3 | head -n 1 | awk '{print $1}'";
     private static final String RSYNC_COMMAND = "rsync -az %s %s";
 
@@ -59,23 +70,27 @@
         Boolean vmExists = command.isVmExists();
         String diskType = command.getDiskType();
         List<String> backedVolumeUUIDs = command.getBackupVolumesUUIDs();
+        List<PrimaryDataStoreTO> restoreVolumePools = command.getRestoreVolumePools();
         List<String> restoreVolumePaths = command.getRestoreVolumePaths();
         String restoreVolumeUuid = command.getRestoreVolumeUUID();
         Integer mountTimeout = command.getMountTimeout() * 1000;
+        int timeout = command.getWait();
+        KVMStoragePoolManager storagePoolMgr = serverResource.getStoragePoolMgr();
 
         String newVolumeId = null;
         try {
             String mountDirectory = mountBackupDirectory(backupRepoAddress, backupRepoType, mountOptions, mountTimeout);
             if (Objects.isNull(vmExists)) {
+                PrimaryDataStoreTO volumePool = restoreVolumePools.get(0);
                 String volumePath = restoreVolumePaths.get(0);
                 int lastIndex = volumePath.lastIndexOf("/");
                 newVolumeId = volumePath.substring(lastIndex + 1);
-                restoreVolume(backupPath, volumePath, diskType, restoreVolumeUuid,
-                        new Pair<>(vmName, command.getVmState()), mountDirectory);
+                restoreVolume(storagePoolMgr, backupPath, volumePool, volumePath, diskType, restoreVolumeUuid,
+                        new Pair<>(vmName, command.getVmState()), mountDirectory, timeout);
             } else if (Boolean.TRUE.equals(vmExists)) {
-                restoreVolumesOfExistingVM(restoreVolumePaths, backedVolumeUUIDs, backupPath, mountDirectory);
+                restoreVolumesOfExistingVM(storagePoolMgr, restoreVolumePools, restoreVolumePaths, backedVolumeUUIDs, backupPath, mountDirectory, timeout);
             } else {
-                restoreVolumesOfDestroyedVMs(restoreVolumePaths, vmName, backupPath, mountDirectory);
+                restoreVolumesOfDestroyedVMs(storagePoolMgr, restoreVolumePools, restoreVolumePaths, vmName, backupPath, mountDirectory, timeout);
             }
         } catch (CloudRuntimeException e) {
             String errorMessage = e.getMessage() != null ? e.getMessage() : "";
@@ -94,17 +109,18 @@
         }
     }
 
-    private void restoreVolumesOfExistingVM(List<String> restoreVolumePaths, List<String> backedVolumesUUIDs,
-                                            String backupPath, String mountDirectory) {
+    private void restoreVolumesOfExistingVM(KVMStoragePoolManager storagePoolMgr, List<PrimaryDataStoreTO> restoreVolumePools, List<String> restoreVolumePaths, List<String> backedVolumesUUIDs,
+                                            String backupPath, String mountDirectory, int timeout) {
         String diskType = "root";
         try {
             for (int idx = 0; idx < restoreVolumePaths.size(); idx++) {
+                PrimaryDataStoreTO restoreVolumePool = restoreVolumePools.get(idx);
                 String restoreVolumePath = restoreVolumePaths.get(idx);
                 String backupVolumeUuid = backedVolumesUUIDs.get(idx);
                 Pair<String, String> bkpPathAndVolUuid = getBackupPath(mountDirectory, null, backupPath, diskType, backupVolumeUuid);
                 diskType = "datadisk";
                 verifyBackupFile(bkpPathAndVolUuid.first(), bkpPathAndVolUuid.second());
-                if (!replaceVolumeWithBackup(restoreVolumePath, bkpPathAndVolUuid.first())) {
+                if (!replaceVolumeWithBackup(storagePoolMgr, restoreVolumePool, restoreVolumePath, bkpPathAndVolUuid.first(), timeout)) {
                     throw new CloudRuntimeException(String.format("Unable to restore contents from the backup volume [%s].", bkpPathAndVolUuid.second()));
                 }
             }
@@ -114,15 +130,16 @@
         }
     }
 
-    private void restoreVolumesOfDestroyedVMs(List<String> volumePaths, String vmName, String backupPath, String mountDirectory) {
+    private void restoreVolumesOfDestroyedVMs(KVMStoragePoolManager storagePoolMgr, List<PrimaryDataStoreTO> volumePools, List<String> volumePaths, String vmName, String backupPath, String mountDirectory, int timeout) {
         String diskType = "root";
         try {
             for (int i = 0; i < volumePaths.size(); i++) {
+                PrimaryDataStoreTO volumePool = volumePools.get(i);
                 String volumePath = volumePaths.get(i);
                 Pair<String, String> bkpPathAndVolUuid = getBackupPath(mountDirectory, volumePath, backupPath, diskType, null);
                 diskType = "datadisk";
                 verifyBackupFile(bkpPathAndVolUuid.first(), bkpPathAndVolUuid.second());
-                if (!replaceVolumeWithBackup(volumePath, bkpPathAndVolUuid.first())) {
+                if (!replaceVolumeWithBackup(storagePoolMgr, volumePool, volumePath, bkpPathAndVolUuid.first(), timeout)) {
                     throw new CloudRuntimeException(String.format("Unable to restore contents from the backup volume [%s].", bkpPathAndVolUuid.second()));
                 }
             }
@@ -132,17 +149,17 @@
         }
     }
 
-    private void restoreVolume(String backupPath, String volumePath, String diskType, String volumeUUID,
-                               Pair<String, VirtualMachine.State> vmNameAndState, String mountDirectory) {
+    private void restoreVolume(KVMStoragePoolManager storagePoolMgr, String backupPath, PrimaryDataStoreTO volumePool, String volumePath, String diskType, String volumeUUID,
+                               Pair<String, VirtualMachine.State> vmNameAndState, String mountDirectory, int timeout) {
         Pair<String, String> bkpPathAndVolUuid;
         try {
             bkpPathAndVolUuid = getBackupPath(mountDirectory, volumePath, backupPath, diskType, volumeUUID);
             verifyBackupFile(bkpPathAndVolUuid.first(), bkpPathAndVolUuid.second());
-            if (!replaceVolumeWithBackup(volumePath, bkpPathAndVolUuid.first())) {
+            if (!replaceVolumeWithBackup(storagePoolMgr, volumePool, volumePath, bkpPathAndVolUuid.first(), timeout, true)) {
                 throw new CloudRuntimeException(String.format("Unable to restore contents from the backup volume [%s].", bkpPathAndVolUuid.second()));
             }
             if (VirtualMachine.State.Running.equals(vmNameAndState.second())) {
-                if (!attachVolumeToVm(vmNameAndState.first(), volumePath)) {
+                if (!attachVolumeToVm(storagePoolMgr, vmNameAndState.first(), volumePool, volumePath)) {
                     throw new CloudRuntimeException(String.format("Failed to attach volume to VM: %s", vmNameAndState.first()));
                 }
             }
@@ -220,14 +237,63 @@
         return exitValue == 0;
     }
 
-    private boolean replaceVolumeWithBackup(String volumePath, String backupPath) {
-        int exitValue = Script.runSimpleBashScriptForExitValue(String.format(RSYNC_COMMAND, backupPath, volumePath));
-        return exitValue == 0;
+    private boolean replaceVolumeWithBackup(KVMStoragePoolManager storagePoolMgr, PrimaryDataStoreTO volumePool, String volumePath, String backupPath, int timeout) {
+        return replaceVolumeWithBackup(storagePoolMgr, volumePool, volumePath, backupPath, timeout, false);
     }
 
-    private boolean attachVolumeToVm(String vmName, String volumePath) {
+    private boolean replaceVolumeWithBackup(KVMStoragePoolManager storagePoolMgr, PrimaryDataStoreTO volumePool, String volumePath, String backupPath, int timeout, boolean createTargetVolume) {
+        if (volumePool.getPoolType() != Storage.StoragePoolType.RBD) {
+            int exitValue = Script.runSimpleBashScriptForExitValue(String.format(RSYNC_COMMAND, backupPath, volumePath));
+            return exitValue == 0;
+        }
+
+        return replaceRbdVolumeWithBackup(storagePoolMgr, volumePool, volumePath, backupPath, timeout, createTargetVolume);
+    }
+
+    private boolean replaceRbdVolumeWithBackup(KVMStoragePoolManager storagePoolMgr, PrimaryDataStoreTO volumePool, String volumePath, String backupPath, int timeout, boolean createTargetVolume) {
+        KVMStoragePool volumeStoragePool = storagePoolMgr.getStoragePool(volumePool.getPoolType(), volumePool.getUuid());
+        QemuImg qemu;
+        try {
+            qemu = new QemuImg(timeout * 1000, true, false);
+            if (!createTargetVolume) {
+                KVMPhysicalDisk rdbDisk = volumeStoragePool.getPhysicalDisk(volumePath);
+                logger.debug("Restoring RBD volume: {}", rdbDisk.toString());
+                qemu.setSkipTargetVolumeCreation(true);
+            }
+        } catch (LibvirtException ex) {
+            throw new CloudRuntimeException("Failed to create qemu-img command to restore RBD volume with backup", ex);
+        }
+
+        QemuImgFile srcBackupFile = null;
+        QemuImgFile destVolumeFile = null;
+        try {
+            srcBackupFile = new QemuImgFile(backupPath, QemuImg.PhysicalDiskFormat.QCOW2);
+            String rbdDestVolumeFile = KVMPhysicalDisk.RBDStringBuilder(volumeStoragePool, volumePath);
+            destVolumeFile = new QemuImgFile(rbdDestVolumeFile, QemuImg.PhysicalDiskFormat.RAW);
+
+            logger.debug("Starting convert backup  {} to RBD volume  {}", backupPath, volumePath);
+            qemu.convert(srcBackupFile, destVolumeFile);
+            logger.debug("Successfully converted backup {} to RBD volume  {}", backupPath, volumePath);
+        } catch (QemuImgException | LibvirtException e) {
+            String srcFilename = srcBackupFile != null ? srcBackupFile.getFileName() : null;
+            String destFilename = destVolumeFile != null ? destVolumeFile.getFileName() : null;
+            logger.error("Failed to convert backup {} to volume {}, the error was: {}", srcFilename, destFilename, e.getMessage());
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean attachVolumeToVm(KVMStoragePoolManager storagePoolMgr, String vmName, PrimaryDataStoreTO volumePool, String volumePath) {
         String deviceToAttachDiskTo = getDeviceToAttachDisk(vmName);
-        int exitValue = Script.runSimpleBashScriptForExitValue(String.format(ATTACH_DISK_COMMAND, vmName, volumePath, deviceToAttachDiskTo));
+        int exitValue;
+        if (volumePool.getPoolType() != Storage.StoragePoolType.RBD) {
+            exitValue = Script.runSimpleBashScriptForExitValue(String.format(ATTACH_QCOW2_DISK_COMMAND, vmName, volumePath, deviceToAttachDiskTo));
+        } else {
+            String xmlForRbdDisk = getXmlForRbdDisk(storagePoolMgr, volumePool, volumePath, deviceToAttachDiskTo);
+            logger.debug("RBD disk xml to attach: {}", xmlForRbdDisk);
+            exitValue = Script.runSimpleBashScriptForExitValue(String.format(ATTACH_RBD_DISK_XML_COMMAND, vmName, xmlForRbdDisk));
+        }
         return exitValue == 0;
     }
 
@@ -237,4 +303,42 @@
         char incrementedChar = (char) (lastChar + 1);
         return currentDevice.substring(0, currentDevice.length() - 1) + incrementedChar;
     }
+
+    private String getXmlForRbdDisk(KVMStoragePoolManager storagePoolMgr, PrimaryDataStoreTO volumePool, String volumePath, String deviceToAttachDiskTo) {
+        StringBuilder diskBuilder = new StringBuilder();
+        diskBuilder.append("\n<disk ");
+        diskBuilder.append(" device='disk'");
+        diskBuilder.append(" type='network'");
+        diskBuilder.append(">\n");
+
+        diskBuilder.append("<source ");
+        diskBuilder.append(" protocol='rbd'");
+        diskBuilder.append(" name='" + volumePath + "'");
+        diskBuilder.append(">\n");
+        for (String sourceHost : volumePool.getHost().split(",")) {
+            diskBuilder.append("<host name='");
+            diskBuilder.append(sourceHost.replace("[", "").replace("]", ""));
+            if (volumePool.getPort() != 0) {
+                diskBuilder.append("' port='");
+                diskBuilder.append(volumePool.getPort());
+            }
+            diskBuilder.append("'/>\n");
+        }
+        diskBuilder.append("</source>\n");
+        String authUserName = null;
+        final KVMStoragePool primaryPool = storagePoolMgr.getStoragePool(volumePool.getPoolType(), volumePool.getUuid());
+        if (primaryPool != null) {
+            authUserName = primaryPool.getAuthUserName();
+        }
+        if (StringUtils.isNotBlank(authUserName)) {
+            diskBuilder.append("<auth username='" + authUserName + "'>\n");
+            diskBuilder.append("<secret type='ceph' uuid='" + volumePool.getUuid() + "'/>\n");
+            diskBuilder.append("</auth>\n");
+        }
+        diskBuilder.append("<target dev='" + deviceToAttachDiskTo + "'");
+        diskBuilder.append(" bus='virtio'");
+        diskBuilder.append("/>\n");
+        diskBuilder.append("</disk>\n");
+        return diskBuilder.toString();
+    }
 }
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtTakeBackupCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtTakeBackupCommandWrapper.java
index c7a6708..11fa605 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtTakeBackupCommandWrapper.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtTakeBackupCommandWrapper.java
@@ -22,12 +22,17 @@
 import com.amazonaws.util.CollectionUtils;
 import com.cloud.agent.api.Answer;
 import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMPhysicalDisk;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
 import com.cloud.resource.CommandWrapper;
 import com.cloud.resource.ResourceWrapper;
+import com.cloud.storage.Storage;
 import com.cloud.utils.Pair;
 import com.cloud.utils.script.Script;
 import org.apache.cloudstack.backup.BackupAnswer;
 import org.apache.cloudstack.backup.TakeBackupCommand;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,7 +49,24 @@
         final String backupRepoType = command.getBackupRepoType();
         final String backupRepoAddress = command.getBackupRepoAddress();
         final String mountOptions = command.getMountOptions();
-        final List<String> diskPaths = command.getVolumePaths();
+        List<PrimaryDataStoreTO> volumePools = command.getVolumePools();
+        final List<String> volumePaths = command.getVolumePaths();
+        KVMStoragePoolManager storagePoolMgr = libvirtComputingResource.getStoragePoolMgr();
+
+        List<String> diskPaths = new ArrayList<>();
+        if (Objects.nonNull(volumePaths)) {
+            for (int idx = 0; idx < volumePaths.size(); idx++) {
+                PrimaryDataStoreTO volumePool = volumePools.get(idx);
+                String volumePath = volumePaths.get(idx);
+                if (volumePool.getPoolType() != Storage.StoragePoolType.RBD) {
+                    diskPaths.add(volumePath);
+                } else {
+                    KVMStoragePool volumeStoragePool = storagePoolMgr.getStoragePool(volumePool.getPoolType(), volumePool.getUuid());
+                    String rbdDestVolumeFile = KVMPhysicalDisk.RBDStringBuilder(volumeStoragePool, volumePath);
+                    diskPaths.add(rbdDestVolumeFile);
+                }
+            }
+        }
 
         List<String[]> commands = new ArrayList<>();
         commands.add(new String[]{
@@ -56,7 +78,7 @@
                 "-m", Objects.nonNull(mountOptions) ? mountOptions : "",
                 "-p", backupPath,
                 "-q", command.getQuiesce() != null && command.getQuiesce() ? "true" : "false",
-                "-d", (Objects.nonNull(diskPaths) && !diskPaths.isEmpty()) ? String.join(",", diskPaths) : ""
+                "-d", diskPaths.isEmpty() ? "" : String.join(",", diskPaths)
         });
 
         Pair<Integer, String> result = Script.executePipedCommands(commands, libvirtComputingResource.getCmdsTimeout());
diff --git a/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java b/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java
index beba085..0a8ea27 100644
--- a/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java
+++ b/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java
@@ -61,6 +61,7 @@
     private String cloudQemuImgPath = "cloud-qemu-img";
     private int timeout;
     private boolean skipZero = false;
+    private boolean skipTargetVolumeCreation = false;
     private boolean noCache = false;
     private long version;
 
@@ -435,6 +436,8 @@
             // with target-is-zero we skip zeros in 1M chunks for compatibility
             script.add("-S");
             script.add("1M");
+        } else if (skipTargetVolumeCreation) {
+            script.add("-n");
         }
 
         script.add("-O");
@@ -881,6 +884,10 @@
         this.skipZero = skipZero;
     }
 
+    public void setSkipTargetVolumeCreation(boolean skipTargetVolumeCreation) {
+        this.skipTargetVolumeCreation = skipTargetVolumeCreation;
+    }
+
     public boolean supportsImageFormat(QemuImg.PhysicalDiskFormat format) {
         final Script s = new Script(_qemuImgPath, timeout);
         s.add("--help");
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapperTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapperTest.java
index d120abd..7bcd0bf 100644
--- a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapperTest.java
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRestoreBackupCommandWrapperTest.java
@@ -22,6 +22,7 @@
 import com.cloud.vm.VirtualMachine;
 import org.apache.cloudstack.backup.BackupAnswer;
 import org.apache.cloudstack.backup.RestoreBackupCommand;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,6 +66,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(null);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -105,6 +108,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(true);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getBackupVolumesUUIDs()).thenReturn(Arrays.asList("volume-123"));
         when(command.getMountTimeout()).thenReturn(30);
@@ -141,6 +146,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(false);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getMountTimeout()).thenReturn(30);
 
@@ -176,6 +183,8 @@
         when(command.getMountOptions()).thenReturn("username=user,password=pass");
         when(command.isVmExists()).thenReturn(null);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -215,6 +224,8 @@
         lenient().when(command.getMountOptions()).thenReturn("rw");
         lenient().when(command.isVmExists()).thenReturn(null);
         lenient().when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         lenient().when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         lenient().when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         lenient().when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -249,6 +260,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(null);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -293,6 +306,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(null);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -339,6 +354,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(null);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -387,6 +404,8 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(null);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -439,6 +458,8 @@
         lenient().when(command.getMountOptions()).thenReturn("rw");
         lenient().when(command.isVmExists()).thenReturn(null);
         lenient().when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(primaryDataStore));
         lenient().when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList("/var/lib/libvirt/images/volume-123"));
         lenient().when(command.getRestoreVolumeUUID()).thenReturn("volume-123");
         lenient().when(command.getVmState()).thenReturn(VirtualMachine.State.Running);
@@ -467,6 +488,12 @@
         when(command.getMountOptions()).thenReturn("rw");
         when(command.isVmExists()).thenReturn(true);
         when(command.getDiskType()).thenReturn("root");
+        PrimaryDataStoreTO primaryDataStore1 = Mockito.mock(PrimaryDataStoreTO.class);
+        PrimaryDataStoreTO primaryDataStore2 = Mockito.mock(PrimaryDataStoreTO.class);
+        when(command.getRestoreVolumePools()).thenReturn(Arrays.asList(
+                primaryDataStore1,
+                primaryDataStore2
+        ));
         when(command.getRestoreVolumePaths()).thenReturn(Arrays.asList(
                 "/var/lib/libvirt/images/volume-123",
                 "/var/lib/libvirt/images/volume-456"
diff --git a/scripts/vm/hypervisor/kvm/nasbackup.sh b/scripts/vm/hypervisor/kvm/nasbackup.sh
index 588c379..e298006 100755
--- a/scripts/vm/hypervisor/kvm/nasbackup.sh
+++ b/scripts/vm/hypervisor/kvm/nasbackup.sh
@@ -165,7 +165,14 @@
 
   name="root"
   for disk in $DISK_PATHS; do
-    volUuid="${disk##*/}"
+    if [[ "$disk" == rbd:* ]]; then
+      # disk for rbd => rbd:<pool>/<uuid>:mon_host=<monitor_host>...
+      # sample: rbd:cloudstack/53d5c355-d726-4d3e-9422-046a503a0b12:mon_host=10.0.1.2...
+      beforeUuid="${disk#*/}"     # Remove up to first slash after rbd:
+      volUuid="${beforeUuid%%:*}" # Remove everything after colon to get the uuid
+    else
+      volUuid="${disk##*/}"
+    fi
     output="$dest/$name.$volUuid.qcow2"
     if ! qemu-img convert -O qcow2 "$disk" "$output" > "$logFile" 2> >(cat >&2); then
       echo "qemu-img convert failed for $disk $output"