Merge branch '4.19'
diff --git a/api/src/main/java/com/cloud/event/EventTypes.java b/api/src/main/java/com/cloud/event/EventTypes.java
index ea70b61..d4235cb 100644
--- a/api/src/main/java/com/cloud/event/EventTypes.java
+++ b/api/src/main/java/com/cloud/event/EventTypes.java
@@ -451,6 +451,7 @@
     public static final String EVENT_ENABLE_PRIMARY_STORAGE = "ENABLE.PS";
     public static final String EVENT_DISABLE_PRIMARY_STORAGE = "DISABLE.PS";
     public static final String EVENT_SYNC_STORAGE_POOL = "SYNC.STORAGE.POOL";
+    public static final String EVENT_CHANGE_STORAGE_POOL_SCOPE = "CHANGE.STORAGE.POOL.SCOPE";
 
     // VPN
     public static final String EVENT_REMOTE_ACCESS_VPN_CREATE = "VPN.REMOTE.ACCESS.CREATE";
@@ -1002,6 +1003,7 @@
         // Primary storage pool
         entityEventDetails.put(EVENT_ENABLE_PRIMARY_STORAGE, StoragePool.class);
         entityEventDetails.put(EVENT_DISABLE_PRIMARY_STORAGE, StoragePool.class);
+        entityEventDetails.put(EVENT_CHANGE_STORAGE_POOL_SCOPE, StoragePool.class);
 
         // VPN
         entityEventDetails.put(EVENT_REMOTE_ACCESS_VPN_CREATE, RemoteAccessVpn.class);
diff --git a/api/src/main/java/com/cloud/storage/StorageService.java b/api/src/main/java/com/cloud/storage/StorageService.java
index 77800d8..1ce335b 100644
--- a/api/src/main/java/com/cloud/storage/StorageService.java
+++ b/api/src/main/java/com/cloud/storage/StorageService.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 
 import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaintenanceCmd;
+import org.apache.cloudstack.api.command.admin.storage.ChangeStoragePoolScopeCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateSecondaryStagingStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeleteImageStoreCmd;
@@ -35,6 +36,7 @@
 import com.cloud.exception.DiscoveryException;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.exception.PermissionDeniedException;
 import com.cloud.exception.ResourceInUseException;
 import com.cloud.exception.ResourceUnavailableException;
 import org.apache.cloudstack.api.command.admin.storage.heuristics.CreateSecondaryStorageSelectorCmd;
@@ -130,4 +132,6 @@
     boolean deleteObjectStore(DeleteObjectStoragePoolCmd cmd);
 
     ObjectStore updateObjectStore(Long id, UpdateObjectStoragePoolCmd cmd);
+
+    void changeStoragePoolScope(ChangeStoragePoolScopeCmd cmd) throws IllegalArgumentException, InvalidParameterValueException, PermissionDeniedException;
 }
diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index a2bcd10..6db1ed0 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -450,6 +450,7 @@
     public static final String STORAGE_POLICY = "storagepolicy";
     public static final String STORAGE_MOTION_ENABLED = "storagemotionenabled";
     public static final String STORAGE_CAPABILITIES = "storagecapabilities";
+    public static final String STORAGE_CUSTOM_STATS = "storagecustomstats";
     public static final String SUBNET = "subnet";
     public static final String OWNER = "owner";
     public static final String SWAP_OWNER = "swapowner";
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ChangeStoragePoolScopeCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ChangeStoragePoolScopeCmd.java
new file mode 100644
index 0000000..d3b6a07
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ChangeStoragePoolScopeCmd.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api.command.admin.storage;
+
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiCommandResourceType;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseAsyncCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.response.ClusterResponse;
+import org.apache.cloudstack.api.response.StoragePoolResponse;
+import org.apache.cloudstack.api.response.SuccessResponse;
+import org.apache.cloudstack.context.CallContext;
+
+import com.cloud.event.EventTypes;
+import com.cloud.storage.StoragePool;
+
+@APICommand(name = "changeStoragePoolScope", description = "Changes the scope of a storage pool when the pool is in Disabled state." +
+        "This feature is officially tested and supported for Hypervisors: KVM and VMware, Protocols: NFS and Ceph, and Storage Provider: DefaultPrimary. " +
+        "There might be extra steps involved to make this work for other hypervisors and storage options.",
+        responseObject = SuccessResponse.class, since= "4.19.1", requestHasSensitiveInfo = false, responseHasSensitiveInfo = false)
+public class ChangeStoragePoolScopeCmd extends BaseAsyncCmd {
+
+    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = StoragePoolResponse.class, required = true, description = "the Id of the storage pool")
+    private Long id;
+
+    @Parameter(name = ApiConstants.SCOPE, type = CommandType.STRING, required = true, description = "the scope of the storage: cluster or zone")
+    private String scope;
+
+    @Parameter(name = ApiConstants.CLUSTER_ID, type = CommandType.UUID, entityType = ClusterResponse.class, description = "the Id of the cluster to use if scope is being set to Cluster")
+    private Long clusterId;
+
+    @Override
+    public ApiCommandResourceType getApiResourceType() {
+        return ApiCommandResourceType.StoragePool;
+    }
+
+    @Override
+    public Long getApiResourceId() {
+        return getId();
+    }
+
+    public String getEventType() {
+        return EventTypes.EVENT_CHANGE_STORAGE_POOL_SCOPE;
+    }
+
+    @Override
+    public String getEventDescription() {
+        String description = "Change storage pool scope. Storage pool Id: ";
+        StoragePool pool = _entityMgr.findById(StoragePool.class, getId());
+        if (pool != null) {
+            description += pool.getUuid();
+        } else {
+            description += getId();
+        }
+        description += " to " + getScope();
+        return description;
+    }
+
+    @Override
+    public void execute() {
+        _storageService.changeStoragePoolScope(this);
+        SuccessResponse response = new SuccessResponse(getCommandName());
+        this.setResponseObject(response);
+    }
+
+    @Override
+    public long getEntityOwnerId() {
+        return CallContext.current().getCallingAccountId();
+    }
+
+    public Long getId() {
+        return id;
+    }
+
+    public String getScope() {
+        return scope;
+    }
+
+    public Long getClusterId() {
+        return clusterId;
+    }
+}
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
index 293ed31..57a8793 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
@@ -72,7 +72,8 @@
     @Parameter(name = ApiConstants.HOST_ID, type = CommandType.UUID, entityType = HostResponse.class, description = "host ID of the storage pools")
     private Long hostId;
 
-
+    @Parameter(name = ApiConstants.STORAGE_CUSTOM_STATS, type = CommandType.BOOLEAN, description = "If true, lists the custom stats of the storage pool", since = "4.18.1")
+    private Boolean customStats;
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -129,6 +130,10 @@
         this.scope = scope;
     }
 
+    public Boolean getCustomStats() {
+        return customStats != null && customStats;
+    }
+
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ListAffectedVmsForStorageScopeChangeCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ListAffectedVmsForStorageScopeChangeCmd.java
new file mode 100644
index 0000000..d586a81
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/ListAffectedVmsForStorageScopeChangeCmd.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.api.command.admin.vm;
+
+import org.apache.cloudstack.acl.RoleType;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseListCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.response.ClusterResponse;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.api.response.StoragePoolResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
+
+import com.cloud.vm.VirtualMachine;
+
+@APICommand(name = "listAffectedVmsForStorageScopeChange",
+        description = "List user and system VMs that need to be stopped and destroyed respectively for changing the scope of the storage pool from Zone to Cluster.",
+        responseObject = VirtualMachineResponse.class,
+        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.19.1",
+        authorized = {RoleType.Admin})
+public class ListAffectedVmsForStorageScopeChangeCmd extends BaseListCmd {
+
+    @Parameter(name = ApiConstants.CLUSTER_ID,
+            type = CommandType.UUID,
+            entityType = ClusterResponse.class,
+            required = true,
+            description = "the Id of the cluster the scope of the storage pool is being changed to")
+    private Long clusterIdForScopeChange;
+
+    @Parameter(name = ApiConstants.STORAGE_ID,
+            type = CommandType.UUID,
+            entityType = StoragePoolResponse.class,
+            required = true,
+            description = "the Id of the storage pool on which change scope operation is being done")
+    private Long storageId;
+
+    /////////////////////////////////////////////////////
+    /////////////////// Accessors ///////////////////////
+    /////////////////////////////////////////////////////
+
+    public Long getClusterIdForScopeChange() {
+        return clusterIdForScopeChange;
+    }
+
+    public Long getStorageId() {
+        return storageId;
+    }
+
+    /////////////////////////////////////////////////////
+    /////////////// API Implementation///////////////////
+    /////////////////////////////////////////////////////
+
+    @Override
+    public void execute() {
+        ListResponse<VirtualMachineResponse> response = _queryService.listAffectedVmsForStorageScopeChange(this);
+        response.setResponseName(getCommandName());
+        response.setObjectName(VirtualMachine.class.getSimpleName().toLowerCase());
+        setResponseObject(response);
+    }
+}
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
index f514c81..9e7f515 100644
--- a/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
+++ b/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
@@ -97,6 +97,10 @@
     @Param(description = "total min IOPS currently in use by volumes")
     private Long allocatedIops;
 
+    @SerializedName(ApiConstants.STORAGE_CUSTOM_STATS)
+    @Param(description = "the storage pool custom stats", since = "4.18.1")
+    private Map<String, String> customStats;
+
     @SerializedName("tags")
     @Param(description = "the tags for the storage pool")
     private String tags;
@@ -304,6 +308,14 @@
        this.allocatedIops = allocatedIops;
     }
 
+    public Map<String, String> getCustomStats() {
+        return customStats;
+    }
+
+    public void setCustomStats(Map<String, String> customStats) {
+        this.customStats = customStats;
+    }
+
     public String getTags() {
         return tags;
     }
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/VirtualMachineResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/VirtualMachineResponse.java
new file mode 100644
index 0000000..7d67629
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/response/VirtualMachineResponse.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.response;
+
+import org.apache.cloudstack.api.BaseResponse;
+import org.apache.cloudstack.api.EntityReference;
+
+import com.cloud.serializer.Param;
+import com.cloud.vm.VirtualMachine;
+import com.google.gson.annotations.SerializedName;
+
+@EntityReference(value = VirtualMachine.class)
+public class VirtualMachineResponse extends BaseResponse {
+    @SerializedName("id")
+    @Param(description = "the ID of the VM")
+    private String id;
+
+    @SerializedName("type")
+    @Param(description = "the type of VM")
+    private String type;
+
+    @SerializedName("name")
+    @Param(description = "the name of the VM")
+    private String name;
+
+    @SerializedName("clusterid")
+    @Param(description = "the cluster ID for the VM")
+    private String clusterId;
+
+    @SerializedName("clustername")
+    @Param(description = "the cluster name for the VM")
+    private String clusterName;
+
+    @SerializedName("hostid")
+    @Param(description = "the host ID for the VM")
+    private String hostId;
+
+    @SerializedName("hostname")
+    @Param(description = "the hostname for the VM")
+    private String hostName;
+
+    @Override
+    public String getObjectId() {
+        return this.getId();
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getVmType() {
+        return type;
+    }
+
+    public void setVmType(String type) {
+        this.type = type;
+    }
+
+    public String getVmName() {
+        return name;
+    }
+
+    public void setVmName(String name) {
+        this.name = name;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getHostId() {
+        return hostId;
+    }
+
+    public void setHostId(String hostId) {
+        this.hostId = hostId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+}
diff --git a/api/src/main/java/org/apache/cloudstack/query/QueryService.java b/api/src/main/java/org/apache/cloudstack/query/QueryService.java
index 4c53314..c93e43d 100644
--- a/api/src/main/java/org/apache/cloudstack/query/QueryService.java
+++ b/api/src/main/java/org/apache/cloudstack/query/QueryService.java
@@ -52,6 +52,7 @@
 import org.apache.cloudstack.api.command.user.snapshot.ListSnapshotsCmd;
 import org.apache.cloudstack.api.command.user.tag.ListTagsCmd;
 import org.apache.cloudstack.api.command.user.template.ListTemplatesCmd;
+import org.apache.cloudstack.api.command.admin.vm.ListAffectedVmsForStorageScopeChangeCmd;
 import org.apache.cloudstack.api.command.user.vm.ListVMsCmd;
 import org.apache.cloudstack.api.command.user.vmgroup.ListVMGroupsCmd;
 import org.apache.cloudstack.api.command.user.volume.ListResourceDetailsCmd;
@@ -89,6 +90,7 @@
 import org.apache.cloudstack.api.response.TemplateResponse;
 import org.apache.cloudstack.api.response.UserResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
 import org.apache.cloudstack.api.response.VolumeResponse;
 import org.apache.cloudstack.api.response.ZoneResponse;
 import org.apache.cloudstack.framework.config.ConfigKey;
@@ -140,6 +142,8 @@
 
     ListResponse<UserVmResponse> searchForUserVMs(ListVMsCmd cmd);
 
+    ListResponse<VirtualMachineResponse> listAffectedVmsForStorageScopeChange(ListAffectedVmsForStorageScopeChangeCmd cmd);
+
     ListResponse<SecurityGroupResponse> searchForSecurityGroups(ListSecurityGroupsCmd cmd);
 
     ListResponse<DomainRouterResponse> searchForRouters(ListRoutersCmd cmd);
diff --git a/core/src/main/java/com/cloud/agent/api/PrepareStorageClientAnswer.java b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientAnswer.java
new file mode 100644
index 0000000..85afb92
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientAnswer.java
@@ -0,0 +1,43 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+import java.util.Map;
+
+public class PrepareStorageClientAnswer extends Answer {
+    Map<String, String> detailsMap;
+
+    public PrepareStorageClientAnswer() {
+        super();
+    }
+
+    public PrepareStorageClientAnswer(Command command, boolean success, Map<String, String> detailsMap) {
+        super(command, success, "");
+        this.detailsMap = detailsMap;
+    }
+
+    public PrepareStorageClientAnswer(Command command, boolean success, String details) {
+        super(command, success, details);
+    }
+
+    public Map<String, String> getDetailsMap() {
+        return detailsMap;
+    }
+}
diff --git a/core/src/main/java/com/cloud/agent/api/PrepareStorageClientCommand.java b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientCommand.java
new file mode 100644
index 0000000..8dea9c1
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientCommand.java
@@ -0,0 +1,56 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+import java.util.Map;
+
+import com.cloud.storage.Storage.StoragePoolType;
+
+public class PrepareStorageClientCommand extends Command {
+    private StoragePoolType poolType;
+    private String poolUuid;
+    private Map<String, String> details;
+
+    public PrepareStorageClientCommand() {
+    }
+
+    public PrepareStorageClientCommand(StoragePoolType poolType, String poolUuid, Map<String, String> details) {
+        this.poolType = poolType;
+        this.poolUuid = poolUuid;
+        this.details = details;
+    }
+
+    @Override
+    public boolean executeInSequence() {
+        return false;
+    }
+
+    public StoragePoolType getPoolType() {
+        return poolType;
+    }
+
+    public String getPoolUuid() {
+        return poolUuid;
+    }
+
+    public Map<String, String> getDetails() {
+        return details;
+    }
+}
diff --git a/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientAnswer.java b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientAnswer.java
new file mode 100644
index 0000000..1280293
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientAnswer.java
@@ -0,0 +1,34 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+public class UnprepareStorageClientAnswer extends Answer {
+    public UnprepareStorageClientAnswer() {
+        super();
+    }
+
+    public UnprepareStorageClientAnswer(Command command, boolean success) {
+        super(command, success, "");
+    }
+
+    public UnprepareStorageClientAnswer(Command command, boolean success, String details) {
+        super(command, success, details);
+    }
+}
diff --git a/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientCommand.java b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientCommand.java
new file mode 100644
index 0000000..bebd30c
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientCommand.java
@@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+import com.cloud.storage.Storage.StoragePoolType;
+
+public class UnprepareStorageClientCommand extends Command {
+    private StoragePoolType poolType;
+    private String poolUuid;
+
+    public UnprepareStorageClientCommand() {
+    }
+
+    public UnprepareStorageClientCommand(StoragePoolType poolType, String poolUuid) {
+        this.poolType = poolType;
+        this.poolUuid = poolUuid;
+    }
+
+    @Override
+    public boolean executeInSequence() {
+        return false;
+    }
+
+    public StoragePoolType getPoolType() {
+        return poolType;
+    }
+
+    public String getPoolUuid() {
+        return poolUuid;
+    }
+}
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
index dbe67e6..d52c656 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cloudstack.engine.subsystem.api.storage;
 
+import java.util.Map;
+
 import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
 import org.apache.cloudstack.storage.command.CommandResult;
 
@@ -88,6 +90,22 @@
 
     /**
      * intended for managed storage
+     * returns true if the storage can provide its custom stats
+     */
+    default boolean poolProvidesCustomStorageStats() {
+        return false;
+    }
+
+    /**
+     * intended for managed storage
+     * returns the custom stats if the storage can provide them
+     */
+    default Map<String, String> getCustomStorageStats(StoragePool pool) {
+        return null;
+    }
+
+    /**
+     * intended for managed storage
      * returns the total capacity and used size in bytes
      */
     Pair<Long, Long> getStorageStats(StoragePool storagePool);
@@ -111,6 +129,14 @@
     boolean canHostAccessStoragePool(Host host, StoragePool pool);
 
     /**
+     * intended for managed storage
+     * returns true if the host can prepare storage client to provide access the storage pool
+     */
+    default boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool) {
+        return false;
+    }
+
+    /**
      * Used by storage pools which want to keep VMs' information
      * @return true if additional VM info is needed (intended for storage pools).
      */
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreLifeCycle.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreLifeCycle.java
index fcbc19c..54f3c63 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreLifeCycle.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreLifeCycle.java
@@ -20,6 +20,7 @@
 
 import java.util.Map;
 
+import com.cloud.hypervisor.Hypervisor;
 import com.cloud.storage.StoragePool;
 
 public interface PrimaryDataStoreLifeCycle extends DataStoreLifeCycle {
@@ -29,4 +30,6 @@
     void updateStoragePool(StoragePool storagePool, Map<String, String> details);
     void enableStoragePool(DataStore store);
     void disableStoragePool(DataStore store);
+    void changeStoragePoolScopeToZone(DataStore store, ClusterScope clusterScope, Hypervisor.HypervisorType hypervisorType);
+    void changeStoragePoolScopeToCluster(DataStore store, ClusterScope clusterScope, Hypervisor.HypervisorType hypervisorType);
 }
diff --git a/engine/components-api/src/main/java/com/cloud/resource/ResourceManager.java b/engine/components-api/src/main/java/com/cloud/resource/ResourceManager.java
index 91197de..b2ae8b8 100755
--- a/engine/components-api/src/main/java/com/cloud/resource/ResourceManager.java
+++ b/engine/components-api/src/main/java/com/cloud/resource/ResourceManager.java
@@ -134,6 +134,10 @@
 
     public List<HostVO> listAllHostsInAllZonesByType(Type type);
 
+    public List<HostVO> listAllHostsInOneZoneNotInClusterByHypervisor(final HypervisorType type, long dcId, long clusterId);
+
+    public List<HostVO> listAllHostsInOneZoneNotInClusterByHypervisors(List<HypervisorType> types, long dcId, long clusterId);
+
     public List<HypervisorType> listAvailHypervisorInZone(Long hostId, Long zoneId);
 
     public HostVO findHostByGuid(String guid);
diff --git a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
index 779c08e..c3909bc 100644
--- a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
+++ b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
@@ -118,7 +118,7 @@
             "storage.pool.disk.wait",
             "Storage",
             "60",
-            "Timeout (in secs) for the storage pool disk (of managed pool) to become available in the host. Currently only supported for PowerFlex.",
+            "Timeout (in secs) for the storage pool disk (of managed pool) to become available in the host. Currently supported for PowerFlex only.",
             true,
             ConfigKey.Scope.StoragePool,
             null);
@@ -127,7 +127,7 @@
             "storage.pool.client.timeout",
             "Storage",
             "60",
-            "Timeout (in secs) for the storage pool client connection timeout (for managed pools). Currently only supported for PowerFlex.",
+            "Timeout (in secs) for the API client connection timeout of storage pool (for managed pools). Currently supported for PowerFlex only.",
             false,
             ConfigKey.Scope.StoragePool,
             null);
@@ -136,11 +136,20 @@
             "storage.pool.client.max.connections",
             "Storage",
             "100",
-            "Maximum connections for the storage pool client (for managed pools). Currently only supported for PowerFlex.",
+            "Maximum connections for the API client of storage pool (for managed pools). Currently supported for PowerFlex only.",
             false,
             ConfigKey.Scope.StoragePool,
             null);
 
+    ConfigKey<Integer> STORAGE_POOL_CONNECTED_CLIENTS_LIMIT = new ConfigKey<>(Integer.class,
+            "storage.pool.connected.clients.limit",
+            "Storage",
+            "-1",
+            "Maximum connected storage pool clients supported for the storage (for managed pools), <= 0 for unlimited (default: -1). Currently supported for PowerFlex only.",
+            true,
+            ConfigKey.Scope.StoragePool,
+            null);
+
     ConfigKey<String> STORAGE_POOL_IO_POLICY = new ConfigKey<>(String.class,
             "kvm.storage.pool.io.policy",
             "Storage",
@@ -252,6 +261,10 @@
 
     boolean canPoolProvideStorageStats(StoragePool pool);
 
+    boolean poolProvidesCustomStorageStats(StoragePool pool);
+
+    Map<String, String> getCustomStorageStats(StoragePool pool);
+
     /**
      * Checks if a host has running VMs that are using its local storage pool.
      * @return true if local storage is active on the host
@@ -288,6 +301,8 @@
 
     boolean canHostAccessStoragePool(Host host, StoragePool pool);
 
+    boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool);
+
     Host getHost(long hostId);
 
     Host updateSecondaryStorage(long secStorageId, String newUrl);
diff --git a/engine/schema/src/main/java/com/cloud/capacity/CapacityVO.java b/engine/schema/src/main/java/com/cloud/capacity/CapacityVO.java
index 132fd3f..cd62935 100644
--- a/engine/schema/src/main/java/com/cloud/capacity/CapacityVO.java
+++ b/engine/schema/src/main/java/com/cloud/capacity/CapacityVO.java
@@ -135,8 +135,8 @@
         return podId;
     }
 
-    public void setPodId(long podId) {
-        this.podId = new Long(podId);
+    public void setPodId(Long podId) {
+        this.podId = podId;
     }
 
     @Override
@@ -144,8 +144,8 @@
         return clusterId;
     }
 
-    public void setClusterId(long clusterId) {
-        this.clusterId = new Long(clusterId);
+    public void setClusterId(Long clusterId) {
+        this.clusterId = clusterId;
     }
 
     @Override
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDao.java b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDao.java
index b099a6d..62ef5b7 100644
--- a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDao.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDao.java
@@ -41,4 +41,6 @@
     public void deleteStoragePoolHostDetails(long hostId, long poolId);
 
     List<StoragePoolHostVO> listByHostId(long hostId);
+
+    Pair<List<StoragePoolHostVO>, Integer> listByPoolIdNotInCluster(long clusterId, long poolId);
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDaoImpl.java b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDaoImpl.java
index 9e7bdca..987a42f 100644
--- a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolHostDaoImpl.java
@@ -23,12 +23,18 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+
 import org.springframework.stereotype.Component;
 
+import com.cloud.host.HostVO;
 import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
 import com.cloud.storage.StoragePoolHostVO;
 import com.cloud.utils.Pair;
 import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.JoinBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.TransactionLegacy;
@@ -40,6 +46,11 @@
     protected final SearchBuilder<StoragePoolHostVO> HostSearch;
     protected final SearchBuilder<StoragePoolHostVO> PoolHostSearch;
 
+    protected SearchBuilder<StoragePoolHostVO> poolNotInClusterSearch;
+
+    @Inject
+    HostDao hostDao;
+
     protected static final String HOST_FOR_POOL_SEARCH = "SELECT * FROM storage_pool_host_ref ph,  host h where  ph.host_id = h.id and ph.pool_id=? and h.status=? ";
 
     protected static final String HOSTS_FOR_POOLS_SEARCH = "SELECT DISTINCT(ph.host_id) FROM storage_pool_host_ref ph, host h WHERE ph.host_id = h.id AND h.status = 'Up' AND resource_state = 'Enabled' AND ph.pool_id IN (?)";
@@ -68,6 +79,15 @@
 
     }
 
+    @PostConstruct
+    public void init(){
+        poolNotInClusterSearch = createSearchBuilder();
+        poolNotInClusterSearch.and("poolId", poolNotInClusterSearch.entity().getPoolId(), SearchCriteria.Op.EQ);
+        SearchBuilder<HostVO> hostSearch = hostDao.createSearchBuilder();
+        poolNotInClusterSearch.join("hostSearch", hostSearch, hostSearch.entity().getId(), poolNotInClusterSearch.entity().getHostId(), JoinBuilder.JoinType.INNER);
+        hostSearch.and("clusterId", hostSearch.entity().getClusterId(), SearchCriteria.Op.NEQ);
+    }
+
     @Override
     public List<StoragePoolHostVO> listByPoolId(long id) {
         SearchCriteria<StoragePoolHostVO> sc = PoolSearch.create();
@@ -194,4 +214,12 @@
         remove(sc);
         txn.commit();
     }
+
+    @Override
+    public Pair<List<StoragePoolHostVO>, Integer> listByPoolIdNotInCluster(long clusterId, long poolId) {
+        SearchCriteria<StoragePoolHostVO> sc = poolNotInClusterSearch.create();
+        sc.setParameters("poolId", poolId);
+        sc.setJoinParameters("hostSearch", "clusterId", clusterId);
+        return searchAndCount(sc, null);
+    }
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/VolumeDaoImpl.java b/engine/schema/src/main/java/com/cloud/storage/dao/VolumeDaoImpl.java
index 16004ac..0c4d707 100644
--- a/engine/schema/src/main/java/com/cloud/storage/dao/VolumeDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/VolumeDaoImpl.java
@@ -77,10 +77,11 @@
     protected GenericSearchBuilder<VolumeVO, SumCount> primaryStorageSearch2;
     protected GenericSearchBuilder<VolumeVO, SumCount> secondaryStorageSearch;
     private final SearchBuilder<VolumeVO> poolAndPathSearch;
+
     @Inject
     ReservationDao reservationDao;
     @Inject
-    ResourceTagDao _tagsDao;
+    ResourceTagDao tagsDao;
 
     // need to account for zone-wide primary storage where storage_pool has
     // null-value pod and cluster, where hypervisor information is stored in
@@ -503,7 +504,6 @@
         poolAndPathSearch.and("poolId", poolAndPathSearch.entity().getPoolId(), Op.EQ);
         poolAndPathSearch.and("path", poolAndPathSearch.entity().getPath(), Op.EQ);
         poolAndPathSearch.done();
-
     }
 
     @Override
@@ -741,7 +741,7 @@
         logger.debug(String.format("Removing volume %s from DB", id));
         VolumeVO entry = findById(id);
         if (entry != null) {
-            _tagsDao.removeByIdAndType(id, ResourceObjectType.Volume);
+            tagsDao.removeByIdAndType(id, ResourceObjectType.Volume);
         }
         boolean result = super.remove(id);
 
@@ -764,7 +764,7 @@
             destVol.setInstanceId(instanceId);
             update(srcVolId, srcVol);
             update(destVolId, destVol);
-            _tagsDao.updateResourceId(srcVolId, destVolId, ResourceObjectType.Volume);
+            tagsDao.updateResourceId(srcVolId, destVolId, ResourceObjectType.Volume);
         } catch (Exception e) {
             throw new CloudRuntimeException("Unable to persist the sequence number for this host");
         }
diff --git a/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDao.java
index ac25eac..52bc5aa 100755
--- a/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDao.java
+++ b/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDao.java
@@ -168,4 +168,6 @@
 
     List<VMInstanceVO> searchRemovedByRemoveDate(final Date startDate, final Date endDate, final Long batchSize,
              List<Long> skippedVmIds);
+
+    Pair<List<VMInstanceVO>, Integer> listByVmsNotInClusterUsingPool(long clusterId, long poolId);
 }
diff --git a/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDaoImpl.java
index 78eb08a..8ab3bad 100755
--- a/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDaoImpl.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
@@ -35,6 +36,8 @@
 import com.cloud.host.dao.HostDao;
 import com.cloud.hypervisor.Hypervisor;
 import com.cloud.server.ResourceTag.ResourceObjectType;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.VolumeDao;
 import com.cloud.tags.dao.ResourceTagDao;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.Pair;
@@ -97,11 +100,16 @@
     protected SearchBuilder<VMInstanceVO> NotMigratingSearch;
     protected SearchBuilder<VMInstanceVO> BackupSearch;
     protected SearchBuilder<VMInstanceVO> LastHostAndStatesSearch;
+    protected SearchBuilder<VMInstanceVO> VmsNotInClusterUsingPool;
 
     @Inject
-    ResourceTagDao _tagsDao;
+    ResourceTagDao tagsDao;
     @Inject
-    NicDao _nicDao;
+    NicDao nicDao;
+    @Inject
+    VolumeDao volumeDao;
+    @Inject
+    HostDao hostDao;
 
     protected Attribute _updateTimeAttr;
 
@@ -278,7 +286,7 @@
         _updateTimeAttr = _allAttributes.get("updateTime");
         assert _updateTimeAttr != null : "Couldn't get this updateTime attribute";
 
-        SearchBuilder<NicVO> nicSearch = _nicDao.createSearchBuilder();
+        SearchBuilder<NicVO> nicSearch = nicDao.createSearchBuilder();
         nicSearch.and("networkId", nicSearch.entity().getNetworkId(), SearchCriteria.Op.EQ);
         nicSearch.and("removedNic", nicSearch.entity().getRemoved(), SearchCriteria.Op.NULL);
 
@@ -307,6 +315,16 @@
         LastHostAndStatesSearch.and("states", LastHostAndStatesSearch.entity().getState(), Op.IN);
         LastHostAndStatesSearch.done();
 
+        VmsNotInClusterUsingPool = createSearchBuilder();
+        SearchBuilder<VolumeVO> volumeSearch = volumeDao.createSearchBuilder();
+        volumeSearch.and("poolId", volumeSearch.entity().getPoolId(), Op.EQ);
+        volumeSearch.and("removed", volumeSearch.entity().getRemoved(), Op.NULL);
+        VmsNotInClusterUsingPool.join("volumeSearch", volumeSearch, volumeSearch.entity().getInstanceId(), VmsNotInClusterUsingPool.entity().getId(), JoinType.INNER);
+        SearchBuilder<HostVO> hostSearch2 = hostDao.createSearchBuilder();
+        hostSearch2.and("clusterId", hostSearch2.entity().getClusterId(), SearchCriteria.Op.NEQ);
+        VmsNotInClusterUsingPool.join("hostSearch2", hostSearch2, hostSearch2.entity().getId(), VmsNotInClusterUsingPool.entity().getHostId(), JoinType.INNER);
+        VmsNotInClusterUsingPool.and("vmStates", VmsNotInClusterUsingPool.entity().getState(), Op.IN);
+        VmsNotInClusterUsingPool.done();
     }
 
     @Override
@@ -836,7 +854,7 @@
     public List<VMInstanceVO> listNonRemovedVmsByTypeAndNetwork(long networkId, VirtualMachine.Type... types) {
         if (NetworkTypeSearch == null) {
 
-            SearchBuilder<NicVO> nicSearch = _nicDao.createSearchBuilder();
+            SearchBuilder<NicVO> nicSearch = nicDao.createSearchBuilder();
             nicSearch.and("networkId", nicSearch.entity().getNetworkId(), SearchCriteria.Op.EQ);
 
             NetworkTypeSearch = createSearchBuilder();
@@ -873,7 +891,7 @@
         txn.start();
         VMInstanceVO vm = findById(id);
         if (vm != null && vm.getType() == Type.User) {
-            _tagsDao.removeByIdAndType(id, ResourceObjectType.UserVm);
+            tagsDao.removeByIdAndType(id, ResourceObjectType.UserVm);
         }
         boolean result = super.remove(id);
         txn.commit();
@@ -1040,4 +1058,14 @@
         Filter filter = new Filter(VMInstanceVO.class, "id", true, 0L, batchSize);
         return searchIncludingRemoved(sc, filter, null, false);
     }
+
+    public Pair<List<VMInstanceVO>, Integer> listByVmsNotInClusterUsingPool(long clusterId, long poolId) {
+        SearchCriteria<VMInstanceVO> sc = VmsNotInClusterUsingPool.create();
+        sc.setParameters("vmStates", State.Starting, State.Running, State.Stopping, State.Migrating, State.Restoring);
+        sc.setJoinParameters("volumeSearch", "poolId", poolId);
+        sc.setJoinParameters("hostSearch2", "clusterId", clusterId);
+        List<VMInstanceVO> vms = search(sc, null);
+        List<VMInstanceVO> uniqueVms = vms.stream().distinct().collect(Collectors.toList());
+        return new Pair<>(uniqueVms, uniqueVms.size());
+    }
 }
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java
index 8044a2d..e4c2693 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java
@@ -32,6 +32,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.springframework.stereotype.Component;
 
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
@@ -43,17 +44,20 @@
 import com.cloud.capacity.Capacity;
 import com.cloud.capacity.CapacityVO;
 import com.cloud.capacity.dao.CapacityDao;
-import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.ScopeType;
+import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePoolHostVO;
 import com.cloud.storage.StoragePoolStatus;
-import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.dao.StoragePoolHostDao;
 import com.cloud.utils.crypt.DBEncryptionUtil;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 
 @Component
@@ -266,4 +270,48 @@
         return true;
     }
 
+    public void switchToZone(DataStore store, HypervisorType hypervisorType) {
+        StoragePoolVO pool = dataStoreDao.findById(store.getId());
+        CapacityVO capacity = _capacityDao.findByHostIdType(store.getId(), Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED);
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                pool.setScope(ScopeType.ZONE);
+                pool.setPodId(null);
+                pool.setClusterId(null);
+                pool.setHypervisor(hypervisorType);
+                dataStoreDao.update(pool.getId(), pool);
+
+                capacity.setPodId(null);
+                capacity.setClusterId(null);
+                _capacityDao.update(capacity.getId(), capacity);
+            }
+        });
+        logger.debug("Scope of storage pool id=" + pool.getId() + " is changed to zone");
+    }
+
+    public void switchToCluster(DataStore store, ClusterScope clusterScope) {
+        List<StoragePoolHostVO> hostPoolRecords = storagePoolHostDao.listByPoolIdNotInCluster(clusterScope.getScopeId(), store.getId()).first();
+        StoragePoolVO pool = dataStoreDao.findById(store.getId());
+        CapacityVO capacity = _capacityDao.findByHostIdType(store.getId(), Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED);
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                if (hostPoolRecords != null) {
+                    for (StoragePoolHostVO host : hostPoolRecords) {
+                        storagePoolHostDao.deleteStoragePoolHostDetails(host.getHostId(), host.getPoolId());
+                    }
+                }
+                pool.setScope(ScopeType.CLUSTER);
+                pool.setPodId(clusterScope.getPodId());
+                pool.setClusterId(clusterScope.getScopeId());
+                dataStoreDao.update(pool.getId(), pool);
+
+                capacity.setPodId(clusterScope.getPodId());
+                capacity.setClusterId(clusterScope.getScopeId());
+                _capacityDao.update(capacity.getId(), capacity);
+            }
+        });
+        logger.debug("Scope of storage pool id=" + pool.getId() + " is changed to cluster id=" + clusterScope.getScopeId());
+    }
 }
diff --git a/engine/storage/src/test/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelperTest.java b/engine/storage/src/test/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelperTest.java
new file mode 100644
index 0000000..3927b43
--- /dev/null
+++ b/engine/storage/src/test/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelperTest.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.volume.datastore;
+
+import java.util.List;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.capacity.Capacity;
+import com.cloud.capacity.CapacityVO;
+import com.cloud.capacity.dao.CapacityDao;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.storage.ScopeType;
+import com.cloud.storage.Storage;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.utils.Pair;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PrimaryDataStoreHelperTest {
+
+    @Mock
+    private PrimaryDataStoreDao dataStoreDao;
+
+    @Mock
+    private CapacityDao capacityDao;
+
+    @Mock
+    private StoragePoolHostDao storagePoolHostDao;
+
+    @Spy
+    @InjectMocks
+    PrimaryDataStoreHelper dataStoreHelper;
+
+    private static final Long ZONE_ID = 1L;
+    private static final Long CLUSTER_ID = 2L;
+    private static final Long POD_ID = 3L;
+    private static final Long POOL_ID = 4L;
+    private static final Short capacityType = 0;
+    private static final Float usedPercentage = 0.0f;
+
+    @Test
+    public void testSwitchToZone() {
+        StoragePoolVO pool = new StoragePoolVO(POOL_ID, null, null, Storage.StoragePoolType.NetworkFilesystem, ZONE_ID, POD_ID, 0L, 0L, null, 0, null);
+        pool.setClusterId(CLUSTER_ID);
+        pool.setScope(ScopeType.CLUSTER);
+        CapacityVO capacity = new CapacityVO(ZONE_ID, POD_ID, CLUSTER_ID, capacityType, usedPercentage);
+
+        Mockito.when(dataStoreDao.findById(pool.getId())).thenReturn(pool);
+        Mockito.when(capacityDao.findByHostIdType(pool.getId(), Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED)).thenReturn(capacity);
+        DataStore storeMock = Mockito.mock(DataStore.class);
+        Mockito.when(storeMock.getId()).thenReturn(POOL_ID);
+
+        dataStoreHelper.switchToZone(storeMock, HypervisorType.KVM);
+
+        Assert.assertEquals(pool.getScope(), ScopeType.ZONE);
+        Assert.assertEquals(pool.getPodId(), null);
+        Assert.assertEquals(pool.getClusterId(), null);
+        Assert.assertEquals(pool.getHypervisor(), HypervisorType.KVM);
+        Assert.assertEquals(capacity.getPodId(), null);
+        Assert.assertEquals(capacity.getClusterId(), null);
+    }
+
+    @Test
+    public void testSwitchToCluster() {
+        StoragePoolVO pool = new StoragePoolVO(POOL_ID, null, null, Storage.StoragePoolType.NetworkFilesystem, ZONE_ID, null, 0L, 0L, null, 0, null);
+        pool.setScope(ScopeType.ZONE);
+        CapacityVO capacity = new CapacityVO(ZONE_ID, null, null, capacityType, usedPercentage);
+        ClusterScope clusterScope = new ClusterScope(CLUSTER_ID, POD_ID, ZONE_ID);
+
+        Pair<List<StoragePoolHostVO>, Integer> hostPoolRecords = new Pair<>(null, 0);
+        Mockito.when(storagePoolHostDao.listByPoolIdNotInCluster(CLUSTER_ID, POOL_ID)).thenReturn(hostPoolRecords);
+        Mockito.when(dataStoreDao.findById(pool.getId())).thenReturn(pool);
+        Mockito.when(capacityDao.findByHostIdType(pool.getId(), Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED)).thenReturn(capacity);
+        DataStore storeMock = Mockito.mock(DataStore.class);
+        Mockito.when(storeMock.getId()).thenReturn(POOL_ID);
+
+        dataStoreHelper.switchToCluster(storeMock, clusterScope);
+
+        Mockito.verify(storagePoolHostDao, Mockito.never()).deleteStoragePoolHostDetails(Mockito.anyLong(), Mockito.anyLong());
+
+        Assert.assertEquals(pool.getScope(), ScopeType.CLUSTER);
+        Assert.assertEquals(pool.getPodId(), POD_ID);
+        Assert.assertEquals(pool.getClusterId(), CLUSTER_ID);
+        Assert.assertEquals(capacity.getPodId(), POD_ID);
+        Assert.assertEquals(capacity.getClusterId(), CLUSTER_ID);
+    }
+}
diff --git a/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/BasePrimaryDataStoreLifeCycleImpl.java b/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/BasePrimaryDataStoreLifeCycleImpl.java
new file mode 100644
index 0000000..adc74a7
--- /dev/null
+++ b/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/BasePrimaryDataStoreLifeCycleImpl.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.lifecycle;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.storage.volume.datastore.PrimaryDataStoreHelper;
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.DeleteStoragePoolCommand;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.resource.ResourceManager;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePool;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.utils.Pair;
+
+public class BasePrimaryDataStoreLifeCycleImpl {
+    private static final Logger s_logger = Logger.getLogger(BasePrimaryDataStoreLifeCycleImpl.class);
+    @Inject
+    AgentManager agentMgr;
+    @Inject
+    protected ResourceManager resourceMgr;
+    @Inject
+    StorageManager storageMgr;
+    @Inject
+    PrimaryDataStoreHelper dataStoreHelper;
+    @Inject
+    protected HostDao hostDao;
+    @Inject
+    protected StoragePoolHostDao storagePoolHostDao;
+
+    private List<HostVO> getPoolHostsList(ClusterScope clusterScope, HypervisorType hypervisorType) {
+        List<HostVO> hosts;
+        if (hypervisorType != null) {
+             hosts = resourceMgr
+                    .listAllHostsInOneZoneNotInClusterByHypervisor(hypervisorType, clusterScope.getZoneId(), clusterScope.getScopeId());
+        } else {
+            List<HypervisorType> hypervisorTypes = Arrays.asList(HypervisorType.KVM, HypervisorType.VMware);
+            hosts = resourceMgr
+                    .listAllHostsInOneZoneNotInClusterByHypervisors(hypervisorTypes, clusterScope.getZoneId(), clusterScope.getScopeId());
+        }
+        return hosts;
+    }
+
+    public void changeStoragePoolScopeToZone(DataStore store, ClusterScope clusterScope, HypervisorType hypervisorType) {
+        List<HostVO> hosts = getPoolHostsList(clusterScope, hypervisorType);
+        s_logger.debug("Changing scope of the storage pool to Zone");
+        if (hosts != null) {
+            for (HostVO host : hosts) {
+                try {
+                    storageMgr.connectHostToSharedPool(host.getId(), store.getId());
+                } catch (Exception e) {
+                    s_logger.warn("Unable to establish a connection between " + host + " and " + store, e);
+                }
+            }
+        }
+        dataStoreHelper.switchToZone(store, hypervisorType);
+    }
+
+    public void changeStoragePoolScopeToCluster(DataStore store, ClusterScope clusterScope, HypervisorType hypervisorType) {
+        Pair<List<StoragePoolHostVO>, Integer> hostPoolRecords = storagePoolHostDao.listByPoolIdNotInCluster(clusterScope.getScopeId(), store.getId());
+        s_logger.debug("Changing scope of the storage pool to Cluster");
+        if (hostPoolRecords.second() > 0) {
+            StoragePool pool = (StoragePool) store;
+            for (StoragePoolHostVO host : hostPoolRecords.first()) {
+                DeleteStoragePoolCommand deleteCmd = new DeleteStoragePoolCommand(pool);
+                final Answer answer = agentMgr.easySend(host.getHostId(), deleteCmd);
+
+                if (answer != null) {
+                    if (!answer.getResult()) {
+                        s_logger.debug("Failed to delete storage pool: " + answer.getResult());
+                    } else if (HypervisorType.KVM != hypervisorType) {
+                        break;
+                    }
+                }
+            }
+        }
+        dataStoreHelper.switchToCluster(store, clusterScope);
+    }
+}
diff --git a/engine/storage/volume/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/BasePrimaryDataStoreLifeCycleImplTest.java b/engine/storage/volume/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/BasePrimaryDataStoreLifeCycleImplTest.java
new file mode 100644
index 0000000..355eb07
--- /dev/null
+++ b/engine/storage/volume/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/BasePrimaryDataStoreLifeCycleImplTest.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.lifecycle;
+
+import static org.mockito.ArgumentMatchers.eq;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
+import org.apache.cloudstack.storage.datastore.PrimaryDataStoreImpl;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.volume.datastore.PrimaryDataStoreHelper;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.DeleteStoragePoolCommand;
+import com.cloud.host.HostVO;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.resource.ResourceManager;
+import com.cloud.storage.Storage;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.utils.Pair;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BasePrimaryDataStoreLifeCycleImplTest {
+
+    @Mock
+    private StoragePoolHostDao storagePoolHostDao;
+
+    @Mock
+    private PrimaryDataStoreHelper dataStoreHelper;
+
+    @Mock
+    private AgentManager agentManager;
+
+    @Mock
+    private ResourceManager resourceManager;
+
+    @Mock
+    private StorageManager storageManager;
+
+    @Spy
+    @InjectMocks
+    private BasePrimaryDataStoreLifeCycleImpl dataStoreLifeCycle;
+
+    private static final Long POOL_ID = 1L;
+    private static final Long CLUSTER_ID = 2L;
+    private static final Long POD_ID = 3L;
+    private static final Long ZONE_ID = 4L;
+    private static final Long HOST_ID = 5L;
+
+    private static ClusterScope clusterScope;
+    private static PrimaryDataStoreImpl store;
+
+
+    @BeforeClass
+    public static void init() {
+        clusterScope = new ClusterScope(CLUSTER_ID, POD_ID, ZONE_ID);
+        StoragePoolVO pool = new StoragePoolVO(POOL_ID, null, null, Storage.StoragePoolType.NetworkFilesystem, 0L, 0L, 0L, 0L, null, 0, null);
+        store = new PrimaryDataStoreImpl();
+        store.configure(pool, null, null);
+    }
+
+    @Test
+    public void testChangeStoragePoolScopeToZone() throws Exception {
+        Mockito.when(resourceManager.listAllHostsInOneZoneNotInClusterByHypervisor(HypervisorType.KVM, ZONE_ID, CLUSTER_ID)).thenReturn(null);
+
+        dataStoreLifeCycle.changeStoragePoolScopeToZone(store, clusterScope, HypervisorType.KVM);
+
+        Mockito.verify(dataStoreHelper, Mockito.times(1)).switchToZone(store, HypervisorType.KVM);
+
+        HostVO host = new HostVO(null);
+        ReflectionTestUtils.setField(host, "id", HOST_ID);
+        List<HypervisorType> hypervisorTypes = Arrays.asList(HypervisorType.KVM, HypervisorType.VMware);
+        Mockito.when(resourceManager.listAllHostsInOneZoneNotInClusterByHypervisors(hypervisorTypes, ZONE_ID, CLUSTER_ID)).thenReturn(Arrays.asList(host));
+        Mockito.when(storageManager.connectHostToSharedPool(HOST_ID, POOL_ID)).thenReturn(true);
+
+        dataStoreLifeCycle.changeStoragePoolScopeToZone(store, clusterScope, null);
+
+        Mockito.verify(dataStoreHelper, Mockito.times(1)).switchToZone(store, null);
+    }
+
+    @Test
+    public void testChangeStoragePoolScopeToCluster() {
+        Pair<List<StoragePoolHostVO>, Integer> hostPoolRecords = new Pair<>(null, 0);
+        Mockito.when(storagePoolHostDao.listByPoolIdNotInCluster(CLUSTER_ID, POOL_ID)).thenReturn(hostPoolRecords);
+        Mockito.doNothing().when(dataStoreHelper).switchToCluster(store, clusterScope);
+
+        dataStoreLifeCycle.changeStoragePoolScopeToCluster(store, clusterScope, HypervisorType.KVM);
+
+        hostPoolRecords.set(Arrays.asList(new StoragePoolHostVO(POOL_ID, HOST_ID, null)), 1);
+        Answer answer = new Answer(null, false, null);
+        Mockito.when(storagePoolHostDao.listByPoolIdNotInCluster(CLUSTER_ID, POOL_ID)).thenReturn(hostPoolRecords);
+        Mockito.when(agentManager.easySend(eq(HOST_ID), Mockito.any(DeleteStoragePoolCommand.class))).thenReturn(answer);
+
+        dataStoreLifeCycle.changeStoragePoolScopeToCluster(store, clusterScope, HypervisorType.KVM);
+
+        Mockito.verify(dataStoreHelper, Mockito.times(2)).switchToCluster(store, clusterScope);
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapper.java
new file mode 100644
index 0000000..79afd46
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapper.java
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.PrepareStorageClientAnswer;
+import com.cloud.agent.api.PrepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.Ternary;
+
+@ResourceWrapper(handles = PrepareStorageClientCommand.class)
+public class LibvirtPrepareStorageClientCommandWrapper extends CommandWrapper<PrepareStorageClientCommand, Answer, LibvirtComputingResource> {
+
+    private static final Logger s_logger = Logger.getLogger(LibvirtPrepareStorageClientCommandWrapper.class);
+
+    @Override
+    public Answer execute(PrepareStorageClientCommand cmd, LibvirtComputingResource libvirtComputingResource) {
+        final KVMStoragePoolManager storagePoolMgr = libvirtComputingResource.getStoragePoolMgr();
+        Ternary<Boolean, Map<String, String>, String> prepareStorageClientResult = storagePoolMgr.prepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid(), cmd.getDetails());
+        if (!prepareStorageClientResult.first()) {
+            String msg = prepareStorageClientResult.third();
+            s_logger.debug("Unable to prepare storage client, due to: " + msg);
+            return new PrepareStorageClientAnswer(cmd, false, msg);
+        }
+        Map<String, String> details = prepareStorageClientResult.second();
+        return new PrepareStorageClientAnswer(cmd, true, details);
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapper.java
new file mode 100644
index 0000000..f98782f
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapper.java
@@ -0,0 +1,49 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.UnprepareStorageClientAnswer;
+import com.cloud.agent.api.UnprepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.Pair;
+
+@ResourceWrapper(handles = UnprepareStorageClientCommand.class)
+public class LibvirtUnprepareStorageClientCommandWrapper extends CommandWrapper<UnprepareStorageClientCommand, Answer, LibvirtComputingResource> {
+
+    private static final Logger s_logger = Logger.getLogger(LibvirtUnprepareStorageClientCommandWrapper.class);
+
+    @Override
+    public Answer execute(UnprepareStorageClientCommand cmd, LibvirtComputingResource libvirtComputingResource) {
+        final KVMStoragePoolManager storagePoolMgr = libvirtComputingResource.getStoragePoolMgr();
+        Pair<Boolean, String> unprepareStorageClientResult = storagePoolMgr.unprepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid());
+        if (!unprepareStorageClientResult.first()) {
+            String msg = unprepareStorageClientResult.second();
+            s_logger.debug("Couldn't unprepare storage client, due to: " + msg);
+            return new UnprepareStorageClientAnswer(cmd, false, msg);
+        }
+        return new UnprepareStorageClientAnswer(cmd, true);
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
index 27f70b7..3c8026c 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
@@ -45,6 +45,8 @@
 import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageLayer;
 import com.cloud.storage.Volume;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VirtualMachine;
 
@@ -475,4 +477,13 @@
         return adaptor.createTemplateFromDirectDownloadFile(templateFilePath, destTemplatePath, destPool, format, timeout);
     }
 
+    public Ternary<Boolean, Map<String, String>, String> prepareStorageClient(StoragePoolType type, String uuid, Map<String, String> details) {
+        StorageAdaptor adaptor = getStorageAdaptor(type);
+        return adaptor.prepareStorageClient(type, uuid, details);
+    }
+
+    public Pair<Boolean, String> unprepareStorageClient(StoragePoolType type, String uuid) {
+        StorageAdaptor adaptor = getStorageAdaptor(type);
+        return adaptor.unprepareStorageClient(type, uuid);
+    }
 }
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
index 1b66067..b33f494 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
 import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
 import org.apache.cloudstack.utils.cryptsetup.CryptSetup;
 import org.apache.cloudstack.utils.cryptsetup.CryptSetupException;
@@ -43,6 +44,8 @@
 
 import com.cloud.storage.Storage;
 import com.cloud.storage.StorageManager;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.script.OutputInterpreter;
 import com.cloud.utils.script.Script;
@@ -564,6 +567,67 @@
         qemu.resize(options, objects, usableSizeBytes);
     }
 
+    public Ternary<Boolean, Map<String, String>, String> prepareStorageClient(Storage.StoragePoolType type, String uuid, Map<String, String> details) {
+        if (!ScaleIOUtil.isSDCServiceInstalled()) {
+            logger.debug("SDC service not installed on host, preparing the SDC client not possible");
+            return new Ternary<>(false, null, "SDC service not installed on host");
+        }
+
+        if (!ScaleIOUtil.isSDCServiceEnabled()) {
+            logger.debug("SDC service not enabled on host, enabling it");
+            if (!ScaleIOUtil.enableSDCService()) {
+                return new Ternary<>(false, null, "SDC service not enabled on host");
+            }
+        }
+
+        if (!ScaleIOUtil.isSDCServiceActive()) {
+            if (!ScaleIOUtil.startSDCService()) {
+                return new Ternary<>(false, null, "Couldn't start SDC service on host");
+            }
+        } else if (!ScaleIOUtil.restartSDCService()) {
+            return new Ternary<>(false, null, "Couldn't restart SDC service on host");
+        }
+
+        return new Ternary<>( true, getSDCDetails(details), "Prepared client successfully");
+    }
+
+    public Pair<Boolean, String> unprepareStorageClient(Storage.StoragePoolType type, String uuid) {
+        if (!ScaleIOUtil.isSDCServiceInstalled()) {
+            logger.debug("SDC service not installed on host, no need to unprepare the SDC client");
+            return new Pair<>(true, "SDC service not installed on host, no need to unprepare the SDC client");
+        }
+
+        if (!ScaleIOUtil.isSDCServiceEnabled()) {
+            logger.debug("SDC service not enabled on host, no need to unprepare the SDC client");
+            return new Pair<>(true, "SDC service not enabled on host, no need to unprepare the SDC client");
+        }
+
+        if (!ScaleIOUtil.stopSDCService()) {
+            return new Pair<>(false, "Couldn't stop SDC service on host");
+        }
+
+        return new Pair<>(true, "Unprepared SDC client successfully");
+    }
+
+    private Map<String, String> getSDCDetails(Map<String, String> details) {
+        Map<String, String> sdcDetails = new HashMap<String, String>();
+        if (details == null || !details.containsKey(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID))  {
+            return sdcDetails;
+        }
+
+        String storageSystemId = details.get(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID);
+        String sdcId = ScaleIOUtil.getSdcId(storageSystemId);
+        if (sdcId != null) {
+            sdcDetails.put(ScaleIOGatewayClient.SDC_ID, sdcId);
+        } else {
+            String sdcGuId = ScaleIOUtil.getSdcGuid();
+            if (sdcGuId != null) {
+                sdcDetails.put(ScaleIOGatewayClient.SDC_GUID, sdcGuId);
+            }
+        }
+        return sdcDetails;
+    }
+
     /**
      * Calculates usable size from raw size, assuming qcow2 requires 192k/1GB for metadata
      * We also remove 128MiB for encryption/fragmentation/safety factor.
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
index 6d58dcc..9a27d44 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
@@ -16,6 +16,7 @@
 // under the License.
 package com.cloud.hypervisor.kvm.storage;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -23,6 +24,8 @@
 
 import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.StoragePoolType;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
 
 public interface StorageAdaptor {
 
@@ -114,4 +117,25 @@
     default boolean supportsPhysicalDiskCopy(StoragePoolType type) {
         return StoragePoolType.PowerFlex == type;
     }
+
+    /**
+     * Prepares the storage client.
+     * @param type type of the storage pool
+     * @param uuid uuid of the storage pool
+     * @param details any details of the storage pool that are required for client preparation
+     * @return status, client details, & message in case failed
+     */
+    default Ternary<Boolean, Map<String, String>, String> prepareStorageClient(StoragePoolType type, String uuid, Map<String, String> details) {
+        return new Ternary<>(true, new HashMap<>(), "");
+    }
+
+    /**
+     * Unprepares the storage client.
+     * @param type type of the storage pool
+     * @param uuid uuid of the storage pool
+     * @return status, & message in case failed
+     */
+    default Pair<Boolean, String> unprepareStorageClient(StoragePoolType type, String uuid) {
+        return new Pair<>(true, "");
+    }
 }
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapperTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapperTest.java
new file mode 100644
index 0000000..e7dffee
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapperTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.agent.api.PrepareStorageClientAnswer;
+import com.cloud.agent.api.PrepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.storage.Storage;
+import com.cloud.utils.Ternary;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LibvirtPrepareStorageClientCommandWrapperTest {
+
+    @Spy
+    LibvirtPrepareStorageClientCommandWrapper libvirtPrepareStorageClientCommandWrapperSpy = Mockito.spy(LibvirtPrepareStorageClientCommandWrapper.class);
+
+    @Mock
+    LibvirtComputingResource libvirtComputingResourceMock;
+
+    private final static String poolUuid = "345fc603-2d7e-47d2-b719-a0110b3732e6";
+    private final static String systemId = "218ce1797566a00f";
+    private final static String sdcId = "301b852c00000003";
+
+    @Test
+    public void testPrepareStorageClientSuccess() {
+        Map<String, String> details = new HashMap<>();
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+        PrepareStorageClientCommand cmd = Mockito.mock(PrepareStorageClientCommand.class);
+        Mockito.when(cmd.getPoolType()).thenReturn(Storage.StoragePoolType.PowerFlex);
+        Mockito.when(cmd.getPoolUuid()).thenReturn(poolUuid);
+        Mockito.when(cmd.getDetails()).thenReturn(details);
+
+        KVMStoragePoolManager storagePoolMgr = Mockito.mock(KVMStoragePoolManager.class);
+        Mockito.when(libvirtComputingResourceMock.getStoragePoolMgr()).thenReturn(storagePoolMgr);
+        details.put(ScaleIOGatewayClient.SDC_ID, sdcId);
+        Mockito.when(storagePoolMgr.prepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid(), cmd.getDetails())).thenReturn(new Ternary<>(true, details, ""));
+
+        PrepareStorageClientAnswer result = (PrepareStorageClientAnswer) libvirtPrepareStorageClientCommandWrapperSpy.execute(cmd, libvirtComputingResourceMock);
+
+        Assert.assertTrue(result.getResult());
+        Assert.assertEquals(sdcId, result.getDetailsMap().get(ScaleIOGatewayClient.SDC_ID));
+    }
+
+    @Test
+    public void testPrepareStorageClientFailure() {
+        Map<String, String> details = new HashMap<>();
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+        PrepareStorageClientCommand cmd = Mockito.mock(PrepareStorageClientCommand.class);
+        Mockito.when(cmd.getPoolType()).thenReturn(Storage.StoragePoolType.PowerFlex);
+        Mockito.when(cmd.getPoolUuid()).thenReturn(poolUuid);
+        Mockito.when(cmd.getDetails()).thenReturn(details);
+
+        KVMStoragePoolManager storagePoolMgr = Mockito.mock(KVMStoragePoolManager.class);
+        Mockito.when(libvirtComputingResourceMock.getStoragePoolMgr()).thenReturn(storagePoolMgr);
+        Mockito.when(storagePoolMgr.prepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid(), cmd.getDetails())).thenReturn(new Ternary<>(false, new HashMap<>() , "Prepare storage client failed"));
+
+        PrepareStorageClientAnswer result = (PrepareStorageClientAnswer) libvirtPrepareStorageClientCommandWrapperSpy.execute(cmd, libvirtComputingResourceMock);
+
+        Assert.assertFalse(result.getResult());
+        Assert.assertEquals("Prepare storage client failed", result.getDetails());
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapperTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapperTest.java
new file mode 100644
index 0000000..7409b28
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapperTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.agent.api.UnprepareStorageClientAnswer;
+import com.cloud.agent.api.UnprepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.storage.Storage;
+import com.cloud.utils.Pair;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LibvirtUnprepareStorageClientCommandWrapperTest {
+
+    @Spy
+    LibvirtUnprepareStorageClientCommandWrapper libvirtUnprepareStorageClientCommandWrapperSpy = Mockito.spy(LibvirtUnprepareStorageClientCommandWrapper.class);
+
+    @Mock
+    LibvirtComputingResource libvirtComputingResourceMock;
+
+    private final static String poolUuid = "345fc603-2d7e-47d2-b719-a0110b3732e6";
+
+    @Test
+    public void testUnprepareStorageClientSuccess() {
+        UnprepareStorageClientCommand cmd = Mockito.mock(UnprepareStorageClientCommand.class);
+        Mockito.when(cmd.getPoolType()).thenReturn(Storage.StoragePoolType.PowerFlex);
+        Mockito.when(cmd.getPoolUuid()).thenReturn(poolUuid);
+
+        KVMStoragePoolManager storagePoolMgr = Mockito.mock(KVMStoragePoolManager.class);
+        Mockito.when(libvirtComputingResourceMock.getStoragePoolMgr()).thenReturn(storagePoolMgr);
+        Mockito.when(storagePoolMgr.unprepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid())).thenReturn(new Pair<>(true, ""));
+
+        UnprepareStorageClientAnswer result = (UnprepareStorageClientAnswer) libvirtUnprepareStorageClientCommandWrapperSpy.execute(cmd, libvirtComputingResourceMock);
+
+        Assert.assertTrue(result.getResult());
+    }
+
+    @Test
+    public void testUnprepareStorageClientFailure() {
+        UnprepareStorageClientCommand cmd = Mockito.mock(UnprepareStorageClientCommand.class);
+        Mockito.when(cmd.getPoolType()).thenReturn(Storage.StoragePoolType.PowerFlex);
+        Mockito.when(cmd.getPoolUuid()).thenReturn(poolUuid);
+
+        KVMStoragePoolManager storagePoolMgr = Mockito.mock(KVMStoragePoolManager.class);
+        Mockito.when(libvirtComputingResourceMock.getStoragePoolMgr()).thenReturn(storagePoolMgr);
+        Mockito.when(storagePoolMgr.unprepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid())).thenReturn(new Pair<>(false, "Unprepare storage client failed"));
+
+        UnprepareStorageClientAnswer result = (UnprepareStorageClientAnswer) libvirtUnprepareStorageClientCommandWrapperSpy.execute(cmd, libvirtComputingResourceMock);
+
+        Assert.assertFalse(result.getResult());
+        Assert.assertEquals("Unprepare storage client failed", result.getDetails());
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
index 25fab1a..07aea0c 100644
--- a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
@@ -17,13 +17,50 @@
 
 package com.cloud.hypervisor.kvm.storage;
 
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
+import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import com.cloud.storage.Storage;
+import com.cloud.storage.StorageLayer;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
+import com.cloud.utils.script.Script;
+
 @RunWith(MockitoJUnitRunner.class)
 public class ScaleIOStorageAdaptorTest {
+
+    @Mock
+    StorageLayer storageLayer;
+    ScaleIOStorageAdaptor scaleIOStorageAdaptor;
+
+    private final static String poolUuid = "345fc603-2d7e-47d2-b719-a0110b3732e6";
+    private static MockedStatic<Script> mockedScript;
+
+    @Before
+    public void setUp() {
+        mockedScript = Mockito.mockStatic(Script.class);
+        scaleIOStorageAdaptor = Mockito.spy(ScaleIOStorageAdaptor.class);
+    }
+
+    @After
+    public void tearDown() {
+        mockedScript.close();
+    }
+
     @Test
     public void getUsableBytesFromRawBytesTest() {
         Assert.assertEquals("Overhead calculated for 8Gi size", 8454111232L, ScaleIOStorageAdaptor.getUsableBytesFromRawBytes(8L << 30));
@@ -31,4 +68,158 @@
         Assert.assertEquals("Overhead calculated for 500Gi size", 536636342272L, ScaleIOStorageAdaptor.getUsableBytesFromRawBytes(500L << 30));
         Assert.assertEquals("Unsupported small size", 0, ScaleIOStorageAdaptor.getUsableBytesFromRawBytes(1L));
     }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotInstalled() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(4);
+
+        Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, new HashMap<>());
+
+        Assert.assertFalse(result.first());
+        Assert.assertNull(result.second());
+        Assert.assertEquals("SDC service not installed on host", result.third());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotEnabled() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(1);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl enable scini"))).thenReturn(1);
+
+        Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, new HashMap<>());
+
+        Assert.assertFalse(result.first());
+        Assert.assertNull(result.second());
+        Assert.assertEquals("SDC service not enabled on host", result.third());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotRestarted() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-active scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl restart scini"))).thenReturn(1);
+
+        Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, new HashMap<>());
+
+        Assert.assertFalse(result.first());
+        Assert.assertNull(result.second());
+        Assert.assertEquals("Couldn't restart SDC service on host", result.third());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceRestarted() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-active scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl restart scini"))).thenReturn(0);
+
+        Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, new HashMap<>());
+
+        Assert.assertTrue(result.first());
+        Assert.assertNotNull(result.second());
+        Assert.assertTrue(result.second().isEmpty());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotStarted() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-active scini"))).thenReturn(1);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl start scini"))).thenReturn(1);
+
+        Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, new HashMap<>());
+
+        Assert.assertFalse(result.first());
+        Assert.assertNull(result.second());
+        Assert.assertEquals("Couldn't start SDC service on host", result.third());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceStartedReturnSDCId() {
+        Map<String, String> details = new HashMap<>();
+        String systemId = "218ce1797566a00f";
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+
+        try (MockedStatic<ScaleIOUtil> ignored = Mockito.mockStatic(ScaleIOUtil.class)) {
+            when(ScaleIOUtil.isSDCServiceInstalled()).thenReturn(true);
+            when(ScaleIOUtil.isSDCServiceEnabled()).thenReturn(true);
+            when(ScaleIOUtil.isSDCServiceActive()).thenReturn(false);
+            when(ScaleIOUtil.startSDCService()).thenReturn(true);
+            String sdcId = "301b852c00000003";
+            when(ScaleIOUtil.getSdcId(systemId)).thenReturn(sdcId);
+
+            Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, details);
+
+            Assert.assertTrue(result.first());
+            Assert.assertNotNull(result.second());
+            Assert.assertEquals(sdcId, result.second().get(ScaleIOGatewayClient.SDC_ID));
+        }
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceStartedReturnSDCGuid() {
+        Map<String, String> details = new HashMap<>();
+        String systemId = "218ce1797566a00f";
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+
+        String sdcGuid = "B0E3BFB8-C20B-43BF-93C8-13339E85AA50";
+        try (MockedStatic<ScaleIOUtil> ignored = Mockito.mockStatic(ScaleIOUtil.class)) {
+            when(ScaleIOUtil.isSDCServiceInstalled()).thenReturn(true);
+            when(ScaleIOUtil.isSDCServiceEnabled()).thenReturn(true);
+            when(ScaleIOUtil.isSDCServiceActive()).thenReturn(false);
+            when(ScaleIOUtil.startSDCService()).thenReturn(true);
+            when(ScaleIOUtil.getSdcId(systemId)).thenReturn(null);
+            when(ScaleIOUtil.getSdcGuid()).thenReturn(sdcGuid);
+
+            Ternary<Boolean, Map<String, String>, String> result = scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, details);
+            Assert.assertTrue(result.first());
+            Assert.assertNotNull(result.second());
+            Assert.assertEquals(sdcGuid, result.second().get(ScaleIOGatewayClient.SDC_GUID));
+        }
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceNotInstalled() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(4);
+
+        Pair<Boolean, String> result = scaleIOStorageAdaptor.unprepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid);
+
+        Assert.assertTrue(result.first());
+        Assert.assertEquals("SDC service not installed on host, no need to unprepare the SDC client", result.second());
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceNotEnabled() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(1);
+
+        Pair<Boolean, String> result = scaleIOStorageAdaptor.unprepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid);
+
+        Assert.assertTrue(result.first());
+        Assert.assertEquals("SDC service not enabled on host, no need to unprepare the SDC client", result.second());
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceNotStopped() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl stop scini"))).thenReturn(1);
+
+        Pair<Boolean, String> result = scaleIOStorageAdaptor.unprepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid);
+
+        Assert.assertFalse(result.first());
+        Assert.assertEquals("Couldn't stop SDC service on host", result.second());
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceStopped() {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl status scini"))).thenReturn(3);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl is-enabled scini"))).thenReturn(0);
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl stop scini"))).thenReturn(0);
+
+        Pair<Boolean, String> result = scaleIOStorageAdaptor.unprepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid);
+
+        Assert.assertTrue(result.first());
+    }
 }
diff --git a/plugins/storage/volume/adaptive/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/AdaptiveDataStoreLifeCycleImpl.java b/plugins/storage/volume/adaptive/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/AdaptiveDataStoreLifeCycleImpl.java
index c9e4798..86c3bfa 100644
--- a/plugins/storage/volume/adaptive/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/AdaptiveDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/adaptive/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/AdaptiveDataStoreLifeCycleImpl.java
@@ -63,7 +63,7 @@
 /**
  * Manages the lifecycle of a Managed Data Store in CloudStack
  */
-public class AdaptiveDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
+public class AdaptiveDataStoreLifeCycleImpl extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     @Inject
     private PrimaryDataStoreDao _storagePoolDao;
     protected Logger logger = LogManager.getLogger(getClass());
diff --git a/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java
index 7324bb6..01058d7 100644
--- a/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java
@@ -66,7 +66,7 @@
 import com.cloud.storage.dao.StoragePoolHostDao;
 import com.cloud.utils.exception.CloudRuntimeException;
 
-public class ElastistorPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+public class ElastistorPrimaryDataStoreLifeCycle extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
index ca1487d..3944cdf 100644
--- a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
@@ -20,11 +20,11 @@
 
 import com.cloud.agent.api.StoragePoolInfo;
 import com.cloud.capacity.CapacityManager;
+import com.cloud.dc.ClusterDetailsDao;
 import com.cloud.dc.ClusterVO;
 import com.cloud.dc.DataCenterVO;
-import com.cloud.dc.dao.DataCenterDao;
-import com.cloud.dc.ClusterDetailsDao;
 import com.cloud.dc.dao.ClusterDao;
+import com.cloud.dc.dao.DataCenterDao;
 import com.cloud.host.Host;
 import com.cloud.host.HostVO;
 import com.cloud.host.dao.HostDao;
@@ -43,10 +43,10 @@
 import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreLifeCycle;
 import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreParameters;
 import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
-import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.util.DateraUtil;
@@ -59,7 +59,7 @@
 import java.util.List;
 import java.util.Map;
 
-public class DateraPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+public class DateraPrimaryDataStoreLifeCycle extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject
@@ -396,6 +396,15 @@
         dataStoreHelper.disable(dataStore);
     }
 
+    @Override
+    public void changeStoragePoolScopeToZone(DataStore store, ClusterScope clusterScope, HypervisorType hypervisorType) {
+        /*
+         * We need to attach all VMware, Xenserver and KVM hosts in the zone.
+         * So pass hypervisorType as null.
+         */
+        super.changeStoragePoolScopeToZone(store, clusterScope, null);
+    }
+
     private HypervisorType getHypervisorTypeForCluster(long clusterId) {
         ClusterVO cluster = _clusterDao.findById(clusterId);
 
diff --git a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
index 3b0add2..9600f06 100644
--- a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
@@ -73,7 +73,7 @@
 import java.util.Map;
 import java.util.UUID;
 
-public class CloudStackPrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
+public class CloudStackPrimaryDataStoreLifeCycleImpl extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
     @Inject
     protected ResourceManager _resourceMgr;
diff --git a/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java b/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java
index b33fa17..7b3f8db 100644
--- a/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java
@@ -54,7 +54,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
 
-public class LinstorPrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
+public class LinstorPrimaryDataStoreLifeCycleImpl extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject
diff --git a/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java
index c1d3668..57cd4de 100644
--- a/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java
@@ -30,6 +30,7 @@
 import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreLifeCycle;
 import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreParameters;
 import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
+import org.apache.cloudstack.storage.datastore.lifecycle.BasePrimaryDataStoreLifeCycleImpl;
 import org.apache.cloudstack.storage.datastore.util.NexentaUtil;
 import org.apache.cloudstack.storage.volume.datastore.PrimaryDataStoreHelper;
 import org.apache.logging.log4j.Logger;
@@ -46,6 +47,7 @@
 import com.cloud.storage.StoragePoolAutomation;
 
 public class NexentaPrimaryDataStoreLifeCycle
+        extends BasePrimaryDataStoreLifeCycleImpl
         implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
@@ -178,6 +180,15 @@
     }
 
     @Override
+    public void changeStoragePoolScopeToZone(DataStore store, ClusterScope clusterScope, Hypervisor.HypervisorType hypervisorType) {
+        /*
+         * We need to attach all VMware, Xenserver and KVM hosts in the zone.
+         * So pass hypervisorType as null.
+         */
+        super.changeStoragePoolScopeToZone(store, clusterScope, null);
+    }
+
+    @Override
     public boolean deleteDataStore(DataStore store) {
         return dataStoreHelper.deletePrimaryDataStore(store);
     }
diff --git a/plugins/storage/volume/sample/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SamplePrimaryDataStoreLifeCycleImpl.java b/plugins/storage/volume/sample/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SamplePrimaryDataStoreLifeCycleImpl.java
index 3a0ce83..24d2772 100644
--- a/plugins/storage/volume/sample/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SamplePrimaryDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/sample/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SamplePrimaryDataStoreLifeCycleImpl.java
@@ -146,4 +146,12 @@
     @Override
     public void disableStoragePool(DataStore store) {
     }
+
+    @Override
+    public void changeStoragePoolScopeToZone(DataStore store, ClusterScope clusterScope, HypervisorType hypervisorType) {
+    }
+
+    @Override
+    public void changeStoragePoolScopeToCluster(DataStore store, ClusterScope clusterScope, HypervisorType hypervisorType) {
+    }
 }
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
index 73b69bd..fd2b93b 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
@@ -79,6 +79,7 @@
     VolumeStatistics getVolumeStatistics(String volumeId);
     String getSystemId(String protectionDomainId);
     List<Volume> listVolumesInStoragePool(String poolId);
+    List<Volume> listVolumesMappedToSdc(String sdcId);
 
     // SDC APIs
     List<Sdc> listSdcs();
@@ -86,6 +87,7 @@
     String getSdcIdByGuid(String sdcGuid);
     Sdc getSdcByIp(String ipAddress);
     Sdc getConnectedSdcByIp(String ipAddress);
+    int getConnectedSdcsCount();
     boolean haveConnectedSdcs();
     boolean isSdcConnected(String sdcId);
     boolean isSdcConnectedByIP(String ipAddress);
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
index 32c717b..62581ca 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
@@ -1004,6 +1004,17 @@
         return new ArrayList<>();
     }
 
+    @Override
+    public List<Volume> listVolumesMappedToSdc(String sdcId) {
+        Preconditions.checkArgument(StringUtils.isNotEmpty(sdcId), "SDC id cannot be null");
+
+        Volume[] volumes = get("/instances/Sdc::" + sdcId + "/relationships/Volume", Volume[].class);
+        if (volumes != null) {
+            return Arrays.asList(volumes);
+        }
+        return new ArrayList<>();
+    }
+
     ///////////////////////////////////////////////
     //////////////// SDC APIs /////////////////////
     ///////////////////////////////////////////////
@@ -1063,6 +1074,21 @@
     }
 
     @Override
+    public int getConnectedSdcsCount() {
+        List<Sdc> sdcs = listSdcs();
+        int connectedSdcsCount = 0;
+        if(sdcs != null) {
+            for (Sdc sdc : sdcs) {
+                if (MDM_CONNECTED_STATE.equalsIgnoreCase(sdc.getMdmConnectionState())) {
+                    connectedSdcsCount++;
+                }
+            }
+        }
+
+        return connectedSdcsCount;
+    }
+
+    @Override
     public boolean haveConnectedSdcs() {
         List<Sdc> sdcs = listSdcs();
         if(sdcs != null) {
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
index 9ddfd5b..8044e78 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
@@ -56,6 +56,8 @@
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.datastore.manager.ScaleIOSDCManager;
+import org.apache.cloudstack.storage.datastore.manager.ScaleIOSDCManagerImpl;
 import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
 import org.apache.cloudstack.storage.to.SnapshotObjectTO;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
@@ -100,6 +102,7 @@
 import com.cloud.storage.dao.VolumeDetailsDao;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
@@ -142,9 +145,10 @@
     private VolumeService volumeService;
     @Inject
     private VolumeOrchestrationService volumeMgr;
+    private ScaleIOSDCManager sdcManager;
 
     public ScaleIOPrimaryDataStoreDriver() {
-
+        sdcManager = new ScaleIOSDCManagerImpl();
     }
 
     public ScaleIOGatewayClient getScaleIOClient(final Long storagePoolId) throws Exception {
@@ -152,7 +156,8 @@
     }
 
     private boolean setVolumeLimitsOnSDC(VolumeVO volume, Host host, DataStore dataStore, Long iopsLimit, Long bandwidthLimitInKbps) throws Exception {
-        final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
+        sdcManager = ComponentContext.inject(sdcManager);
+        final String sdcId = sdcManager.prepareSDC(host, dataStore);
         if (StringUtils.isBlank(sdcId)) {
             alertHostSdcDisconnection(host);
             throw new CloudRuntimeException("Unable to grant access to volume: " + volume.getId() + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
@@ -188,6 +193,13 @@
     @Override
     public boolean grantAccess(DataObject dataObject, Host host, DataStore dataStore) {
         try {
+            sdcManager = ComponentContext.inject(sdcManager);
+            final String sdcId = sdcManager.prepareSDC(host, dataStore);
+            if (StringUtils.isBlank(sdcId)) {
+                alertHostSdcDisconnection(host);
+                throw new CloudRuntimeException(String.format("Unable to grant access to %s: %s, no Sdc connected with host ip: %s", dataObject.getType(), dataObject.getId(), host.getPrivateIpAddress()));
+            }
+
             if (DataObjectType.VOLUME.equals(dataObject.getType())) {
                 final VolumeVO volume = volumeDao.findById(dataObject.getId());
                 logger.debug("Granting access for PowerFlex volume: " + volume.getPath());
@@ -195,25 +207,11 @@
             } else if (DataObjectType.TEMPLATE.equals(dataObject.getType())) {
                 final VMTemplateStoragePoolVO templatePoolRef = vmTemplatePoolDao.findByPoolTemplate(dataStore.getId(), dataObject.getId(), null);
                 logger.debug("Granting access for PowerFlex template volume: " + templatePoolRef.getInstallPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    alertHostSdcDisconnection(host);
-                    throw new CloudRuntimeException("Unable to grant access to template: " + dataObject.getId() + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
-                }
-
                 final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
                 return client.mapVolumeToSdc(ScaleIOUtil.getVolumePath(templatePoolRef.getInstallPath()), sdcId);
             } else if (DataObjectType.SNAPSHOT.equals(dataObject.getType())) {
                 SnapshotInfo snapshot = (SnapshotInfo) dataObject;
                 logger.debug("Granting access for PowerFlex volume snapshot: " + snapshot.getPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    alertHostSdcDisconnection(host);
-                    throw new CloudRuntimeException("Unable to grant access to snapshot: " + dataObject.getId() + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
-                }
-
                 final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
                 return client.mapVolumeToSdc(ScaleIOUtil.getVolumePath(snapshot.getPath()), sdcId);
             }
@@ -237,40 +235,29 @@
         }
 
         try {
+            final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
+            if (StringUtils.isBlank(sdcId)) {
+                logger.warn(String.format("Unable to revoke access for %s: %s, no Sdc connected with host ip: %s", dataObject.getType(), dataObject.getId(), host.getPrivateIpAddress()));
+                return;
+            }
+            final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
             if (DataObjectType.VOLUME.equals(dataObject.getType())) {
                 final VolumeVO volume = volumeDao.findById(dataObject.getId());
                 logger.debug("Revoking access for PowerFlex volume: " + volume.getPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    throw new CloudRuntimeException("Unable to revoke access for volume: " + dataObject.getId() + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
-                }
-
-                final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
                 client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(volume.getPath()), sdcId);
             } else if (DataObjectType.TEMPLATE.equals(dataObject.getType())) {
                 final VMTemplateStoragePoolVO templatePoolRef = vmTemplatePoolDao.findByPoolTemplate(dataStore.getId(), dataObject.getId(), null);
                 logger.debug("Revoking access for PowerFlex template volume: " + templatePoolRef.getInstallPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    throw new CloudRuntimeException("Unable to revoke access for template: " + dataObject.getId() + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
-                }
-
-                final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
                 client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(templatePoolRef.getInstallPath()), sdcId);
             } else if (DataObjectType.SNAPSHOT.equals(dataObject.getType())) {
                 SnapshotInfo snapshot = (SnapshotInfo) dataObject;
                 logger.debug("Revoking access for PowerFlex volume snapshot: " + snapshot.getPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    throw new CloudRuntimeException("Unable to revoke access for snapshot: " + dataObject.getId() + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
-                }
-
-                final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
                 client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(snapshot.getPath()), sdcId);
             }
+            if (client.listVolumesMappedToSdc(sdcId).isEmpty()) {
+                sdcManager = ComponentContext.inject(sdcManager);
+                sdcManager.stopSDC(host, dataStore);
+            }
         } catch (Exception e) {
             logger.warn("Failed to revoke access due to: " + e.getMessage(), e);
         }
@@ -287,11 +274,16 @@
 
             final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
             if (StringUtils.isBlank(sdcId)) {
-                throw new CloudRuntimeException("Unable to revoke access for volume: " + volumePath + ", no Sdc connected with host ip: " + host.getPrivateIpAddress());
+                logger.warn(String.format("Unable to revoke access for volume: %s, no Sdc connected with host ip: %s", volumePath, host.getPrivateIpAddress()));
+                return;
             }
 
             final ScaleIOGatewayClient client = getScaleIOClient(dataStore.getId());
             client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(volumePath), sdcId);
+            if (client.listVolumesMappedToSdc(sdcId).isEmpty()) {
+                sdcManager = ComponentContext.inject(sdcManager);
+                sdcManager.stopSDC(host, dataStore);
+            }
         } catch (Exception e) {
             logger.warn("Failed to revoke access due to: " + e.getMessage(), e);
         }
@@ -1375,6 +1367,28 @@
     }
 
     @Override
+    public boolean poolProvidesCustomStorageStats() {
+        return true;
+    }
+
+    @Override
+    public Map<String, String> getCustomStorageStats(StoragePool pool) {
+        Preconditions.checkArgument(pool != null, "pool cannot be null");
+        Map<String, String> customStats = new HashMap<>();
+
+        try {
+            final ScaleIOGatewayClient client = getScaleIOClient(pool.getId());
+            int connectedSdcsCount = client.getConnectedSdcsCount();
+            customStats.put(ScaleIOUtil.CONNECTED_SDC_COUNT_STAT, String.valueOf(connectedSdcsCount));
+        } catch (Exception e) {
+            String errMsg = "Unable to get custom storage stats for the pool: " + pool.getId() + " due to " + e.getMessage();
+            logger.error(errMsg);
+        }
+
+        return customStats;
+    }
+
+    @Override
     public Pair<Long, Long> getStorageStats(StoragePool storagePool) {
         Preconditions.checkArgument(storagePool != null, "storagePool cannot be null");
 
@@ -1386,7 +1400,7 @@
                 Long usedBytes = poolStatistics.getNetUsedCapacityInBytes();
                 return new Pair<Long, Long>(capacityBytes, usedBytes);
             }
-        }  catch (Exception e) {
+        } catch (Exception e) {
             String errMsg = "Unable to get storage stats for the pool: " + storagePool.getId() + " due to " + e.getMessage();
             logger.warn(errMsg);
             throw new CloudRuntimeException(errMsg, e);
@@ -1441,6 +1455,16 @@
         }
     }
 
+    @Override
+    public boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool) {
+        if (host == null || pool == null) {
+            return false;
+        }
+
+        sdcManager = ComponentContext.inject(sdcManager);
+        return sdcManager.areSDCConnectionsWithinLimit(pool.getId());
+    }
+
     private void alertHostSdcDisconnection(Host host) {
         if (host == null) {
             return;
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
index a1186ae..d37d0f1 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
@@ -75,7 +75,7 @@
 import com.cloud.utils.crypt.DBEncryptionUtil;
 import com.cloud.utils.exception.CloudRuntimeException;
 
-public class ScaleIOPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+public class ScaleIOPrimaryDataStoreLifeCycle extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject
@@ -261,8 +261,6 @@
             throw new CloudRuntimeException("Unsupported hypervisor type: " + cluster.getHypervisorType().toString());
         }
 
-        checkConnectedSdcs(dataStore.getId());
-
         PrimaryDataStoreInfo primaryDataStoreInfo = (PrimaryDataStoreInfo) dataStore;
         List<HostVO> hostsInCluster = resourceManager.listAllUpAndEnabledHosts(Host.Type.Routing, primaryDataStoreInfo.getClusterId(),
                 primaryDataStoreInfo.getPodId(), primaryDataStoreInfo.getDataCenterId());
@@ -279,14 +277,12 @@
                     poolHosts.add(host);
                 }
             } catch (Exception e) {
-                logger.warn("Unable to establish a connection between " + host + " and " + primaryDataStoreInfo, e);
+                logger.warn("Unable to establish a connection between host: " + host + " and pool: " + dataStore + "on the cluster: " + primaryDataStoreInfo.getClusterId(), e);
             }
         }
 
         if (poolHosts.isEmpty()) {
             logger.warn("No host can access storage pool '" + primaryDataStoreInfo + "' on cluster '" + primaryDataStoreInfo.getClusterId() + "'.");
-            primaryDataStoreDao.expunge(primaryDataStoreInfo.getId());
-            throw new CloudRuntimeException("Failed to create storage pool in the cluster: " + primaryDataStoreInfo.getClusterId() + " as it is not accessible to hosts");
         }
 
         dataStoreHelper.attachCluster(dataStore);
@@ -304,8 +300,6 @@
             throw new CloudRuntimeException("Unsupported hypervisor type: " + hypervisorType.toString());
         }
 
-        checkConnectedSdcs(dataStore.getId());
-
         logger.debug("Attaching the pool to each of the hosts in the zone: " + scope.getScopeId());
         List<HostVO> hosts = resourceManager.listAllUpAndEnabledHostsInOneZoneByHypervisor(hypervisorType, scope.getScopeId());
         List<HostVO> poolHosts = new ArrayList<HostVO>();
@@ -315,35 +309,17 @@
                     poolHosts.add(host);
                 }
             } catch (Exception e) {
-                logger.warn("Unable to establish a connection between " + host + " and " + dataStore, e);
+                logger.warn("Unable to establish a connection between host: " + host + " and pool: " + dataStore + "in the zone: " + scope.getScopeId(), e);
             }
         }
         if (poolHosts.isEmpty()) {
-            logger.warn("No host can access storage pool " + dataStore + " in this zone.");
-            primaryDataStoreDao.expunge(dataStore.getId());
-            throw new CloudRuntimeException("Failed to create storage pool as it is not accessible to hosts.");
+            logger.warn("No host can access storage pool " + dataStore + " in the zone: " + scope.getScopeId());
         }
 
         dataStoreHelper.attachZone(dataStore);
         return true;
     }
 
-    private void checkConnectedSdcs(Long dataStoreId) {
-        boolean haveConnectedSdcs = false;
-        try {
-            ScaleIOGatewayClient client = ScaleIOGatewayClientConnectionPool.getInstance().getClient(dataStoreId, storagePoolDetailsDao);
-            haveConnectedSdcs = client.haveConnectedSdcs();
-        } catch (NoSuchAlgorithmException | KeyManagementException | URISyntaxException e) {
-            logger.error(String.format("Failed to create storage pool for datastore: %s", dataStoreId), e);
-            throw new CloudRuntimeException(String.format("Failed to establish connection with PowerFlex Gateway to create storage pool for datastore: %s", dataStoreId));
-        }
-
-        if (!haveConnectedSdcs) {
-            logger.debug(String.format("No connected SDCs found for the PowerFlex storage pool of datastore: %s", dataStoreId));
-            throw new CloudRuntimeException(String.format("Failed to create storage pool as connected SDCs not found for datastore: %s", dataStoreId));
-        }
-    }
-
     @Override
     public boolean maintain(DataStore store) {
         storagePoolAutomation.maintain(store);
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManager.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManager.java
new file mode 100644
index 0000000..696643c
--- /dev/null
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManager.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.manager;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+
+import com.cloud.host.Host;
+
+public interface ScaleIOSDCManager {
+    /**
+     * Checks SDC connections limit.
+     * @param storagePoolId the storage pool id
+     * @return true if SDC connections are within limit
+     */
+    boolean areSDCConnectionsWithinLimit(Long storagePoolId);
+
+    /**
+     * Prepares/starts the SDC on the host.
+     * @param host the host
+     * @param dataStore the datastore
+     * @return SDC Id of the host
+     */
+    String prepareSDC(Host host, DataStore dataStore);
+
+    /**
+     * Stops the SDC on the host.
+     * @param host the host
+     * @param dataStore the datastore
+     * @return true if SDC stopped on the host
+     */
+    boolean stopSDC(Host host, DataStore dataStore);
+}
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManagerImpl.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManagerImpl.java
new file mode 100644
index 0000000..b121a1d
--- /dev/null
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManagerImpl.java
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.manager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStore;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClientConnectionPool;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.PrepareStorageClientAnswer;
+import com.cloud.agent.api.PrepareStorageClientCommand;
+import com.cloud.agent.api.UnprepareStorageClientCommand;
+import com.cloud.configuration.Config;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Host;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+@Component
+public class ScaleIOSDCManagerImpl implements ScaleIOSDCManager {
+    private static final Logger LOGGER = Logger.getLogger(ScaleIOSDCManagerImpl.class);
+
+    @Inject
+    AgentManager agentManager;
+    @Inject
+    StoragePoolHostDao storagePoolHostDao;
+    @Inject
+    StoragePoolDetailsDao storagePoolDetailsDao;
+    @Inject
+    ConfigurationDao configDao;
+
+    private static final String POWERFLEX_SDC_HOSTID_SYSTEMID_LOCK_FORMAT = "PowerFlexSDC-HostId:%s-SystemId:%s";
+    private static final String POWERFLEX_SDC_SYSTEMID_LOCK_FORMAT = "PowerFlexSDC-SystemId:%s";
+
+    public ScaleIOSDCManagerImpl() {
+
+    }
+
+    @Override
+    public boolean areSDCConnectionsWithinLimit(Long storagePoolId) {
+        try {
+            int connectedClientsLimit = StorageManager.STORAGE_POOL_CONNECTED_CLIENTS_LIMIT.valueIn(storagePoolId);
+            if (connectedClientsLimit <= 0) {
+                return true;
+            }
+
+            int connectedSdcsCount = getScaleIOClient(storagePoolId).getConnectedSdcsCount();
+            if (connectedSdcsCount < connectedClientsLimit) {
+                LOGGER.debug(String.format("Current connected SDCs count: %d - SDC connections are within the limit (%d) on PowerFlex Storage with pool id: %d", connectedSdcsCount, connectedClientsLimit, storagePoolId));
+                return true;
+            }
+            LOGGER.debug(String.format("Current connected SDCs count: %d - SDC connections limit (%d) reached on PowerFlex Storage with pool id: %d", connectedSdcsCount, connectedClientsLimit, storagePoolId));
+            return false;
+        } catch (Exception e) {
+            String errMsg = "Unable to check SDC connections for the PowerFlex storage pool with id: " + storagePoolId + " due to " + e.getMessage();
+            LOGGER.warn(errMsg, e);
+            return false;
+        }
+    }
+
+    @Override
+    public String prepareSDC(Host host, DataStore dataStore) {
+        String systemId = storagePoolDetailsDao.findDetail(dataStore.getId(), ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID).getValue();
+        if (systemId == null) {
+            throw new CloudRuntimeException("Unable to prepare SDC, failed to get the system id for PowerFlex storage pool: " + dataStore.getName());
+        }
+
+        GlobalLock hostIdStorageSystemIdLock = null;
+        GlobalLock storageSystemIdLock = null;
+        try {
+            String hostIdStorageSystemIdLockString = String.format(POWERFLEX_SDC_HOSTID_SYSTEMID_LOCK_FORMAT, host.getId(), systemId);
+            hostIdStorageSystemIdLock = GlobalLock.getInternLock(hostIdStorageSystemIdLockString);
+            if (hostIdStorageSystemIdLock == null) {
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't get global lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            int storagePoolMaxWaitSeconds = NumbersUtil.parseInt(configDao.getValue(Config.StoragePoolMaxWaitSeconds.key()), 3600);
+            if (!hostIdStorageSystemIdLock.lock(storagePoolMaxWaitSeconds)) {
+                LOGGER.debug("Unable to prepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            long poolId = dataStore.getId();
+            long hostId = host.getId();
+            String sdcId = getConnectedSdc(poolId, hostId);
+            if (StringUtils.isNotBlank(sdcId)) {
+                LOGGER.debug(String.format("SDC %s already connected for the pool: %d on host: %d, no need to prepare/start it", sdcId, poolId, hostId));
+                return sdcId;
+            }
+
+            String storageSystemIdLockString = String.format(POWERFLEX_SDC_SYSTEMID_LOCK_FORMAT, systemId);
+            storageSystemIdLock = GlobalLock.getInternLock(storageSystemIdLockString);
+            if (storageSystemIdLock == null) {
+                LOGGER.error("Unable to prepare SDC, couldn't get global lock on: " + storageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't get global lock on " + storageSystemIdLockString);
+            }
+
+            if (!storageSystemIdLock.lock(storagePoolMaxWaitSeconds)) {
+                LOGGER.error("Unable to prepare SDC, couldn't lock on " + storageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't lock on " + storageSystemIdLockString);
+            }
+
+            if (!areSDCConnectionsWithinLimit(poolId)) {
+                String errorMsg = String.format("Unable to check SDC connections or the connections limit reached for Powerflex storage (System ID: %s)", systemId);
+                LOGGER.error(errorMsg);
+                throw new CloudRuntimeException(errorMsg);
+            }
+
+            sdcId = prepareSDCOnHost(host, dataStore, systemId);
+            StoragePoolHostVO storagePoolHost = storagePoolHostDao.findByPoolHost(poolId, hostId);
+
+            if (StringUtils.isBlank(sdcId)) {
+                if (storagePoolHost != null) {
+                    storagePoolHostDao.deleteStoragePoolHostDetails(hostId, poolId);
+                }
+            } else {
+                if (storagePoolHost == null) {
+                    storagePoolHost = new StoragePoolHostVO(poolId, hostId, sdcId);
+                    storagePoolHostDao.persist(storagePoolHost);
+                } else {
+                    storagePoolHost.setLocalPath(sdcId);
+                    storagePoolHostDao.update(storagePoolHost.getId(), storagePoolHost);
+                }
+            }
+
+            int waitTimeInSecs = 15; // Wait for 15 secs (usual tests with SDC service start took 10-15 secs)
+            if (hostSdcConnected(sdcId, poolId, waitTimeInSecs)) {
+                return sdcId;
+            }
+            return null;
+        } finally {
+            if (storageSystemIdLock != null) {
+                storageSystemIdLock.unlock();
+                storageSystemIdLock.releaseRef();
+            }
+            if (hostIdStorageSystemIdLock != null) {
+                hostIdStorageSystemIdLock.unlock();
+                hostIdStorageSystemIdLock.releaseRef();
+            }
+        }
+    }
+
+    private String prepareSDCOnHost(Host host, DataStore dataStore, String systemId) {
+        LOGGER.debug(String.format("Preparing SDC on the host %s (%s)", host.getId(), host.getName()));
+        Map<String,String> details = new HashMap<>();
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+        PrepareStorageClientCommand cmd = new PrepareStorageClientCommand(((PrimaryDataStore) dataStore).getPoolType(), dataStore.getUuid(), details);
+        int timeoutSeconds = 60;
+        cmd.setWait(timeoutSeconds);
+
+        PrepareStorageClientAnswer prepareStorageClientAnswer;
+        try {
+            prepareStorageClientAnswer = (PrepareStorageClientAnswer) agentManager.send(host.getId(), cmd);
+        } catch (AgentUnavailableException | OperationTimedoutException e) {
+            String err = String.format("Failed to prepare SDC on the host %s, due to: %s", host.getName(), e.getMessage());
+            LOGGER.error(err);
+            throw new CloudRuntimeException(err);
+        }
+
+        if (prepareStorageClientAnswer == null) {
+            String err = String.format("Unable to prepare SDC on the host %s", host.getName());
+            LOGGER.error(err);
+            throw new CloudRuntimeException(err);
+        }
+
+        if (!prepareStorageClientAnswer.getResult()) {
+            String err = String.format("Unable to prepare SDC on the host %s, due to: %s", host.getName(), prepareStorageClientAnswer.getDetails());
+            LOGGER.error(err);
+            throw new CloudRuntimeException(err);
+        }
+
+        Map<String,String> poolDetails = prepareStorageClientAnswer.getDetailsMap();
+        if (MapUtils.isEmpty(poolDetails)) {
+            LOGGER.warn(String.format("PowerFlex storage SDC details not found on the host: %s, try (re)install SDC and restart agent", host.getId()));
+            return null;
+        }
+
+        String sdcId = null;
+        if (poolDetails.containsKey(ScaleIOGatewayClient.SDC_ID)) {
+            sdcId = poolDetails.get(ScaleIOGatewayClient.SDC_ID);
+        } else if (poolDetails.containsKey(ScaleIOGatewayClient.SDC_GUID)) {
+            String sdcGuid = poolDetails.get(ScaleIOGatewayClient.SDC_GUID);
+            sdcId = getHostSdcId(sdcGuid, dataStore.getId());
+        }
+
+        if (StringUtils.isBlank(sdcId)) {
+            LOGGER.warn(String.format("Couldn't retrieve PowerFlex storage SDC details from the host: %s, try (re)install SDC and restart agent", host.getId()));
+            return null;
+        }
+
+        return sdcId;
+    }
+
+    @Override
+    public boolean stopSDC(Host host, DataStore dataStore) {
+        String systemId = storagePoolDetailsDao.findDetail(dataStore.getId(), ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID).getValue();
+        if (systemId == null) {
+            throw new CloudRuntimeException("Unable to unprepare SDC, failed to get the system id for PowerFlex storage pool: " + dataStore.getName());
+        }
+
+        GlobalLock lock = null;
+        try {
+            String hostIdStorageSystemIdLockString = String.format(POWERFLEX_SDC_HOSTID_SYSTEMID_LOCK_FORMAT, host.getId(), systemId);
+            lock = GlobalLock.getInternLock(hostIdStorageSystemIdLockString);
+            if (lock == null) {
+                throw new CloudRuntimeException("Unable to unprepare SDC, couldn't get global lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            int storagePoolMaxWaitSeconds = NumbersUtil.parseInt(configDao.getValue(Config.StoragePoolMaxWaitSeconds.key()), 3600);
+            if (!lock.lock(storagePoolMaxWaitSeconds)) {
+                LOGGER.debug("Unable to unprepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to unprepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            long poolId = dataStore.getId();
+            long hostId = host.getId();
+            String sdcId = getConnectedSdc(poolId, hostId);
+            if (StringUtils.isBlank(sdcId)) {
+                LOGGER.debug("SDC not connected, no need to unprepare it");
+                return true;
+            }
+
+            return unprepareSDCOnHost(host, dataStore);
+        } finally {
+            if (lock != null) {
+                lock.unlock();
+                lock.releaseRef();
+            }
+        }
+    }
+
+    private boolean unprepareSDCOnHost(Host host, DataStore dataStore) {
+        LOGGER.debug(String.format("Unpreparing SDC on the host %s (%s)", host.getId(), host.getName()));
+        UnprepareStorageClientCommand cmd = new UnprepareStorageClientCommand(((PrimaryDataStore) dataStore).getPoolType(), dataStore.getUuid());
+        int timeoutSeconds = 60;
+        cmd.setWait(timeoutSeconds);
+
+        Answer unprepareStorageClientAnswer;
+        try {
+            unprepareStorageClientAnswer = agentManager.send(host.getId(), cmd);
+        } catch (AgentUnavailableException | OperationTimedoutException e) {
+            String err = String.format("Failed to unprepare SDC on the host %s due to: %s", host.getName(), e.getMessage());
+            LOGGER.error(err);
+            return false;
+        }
+
+        if (!unprepareStorageClientAnswer.getResult()) {
+            String err = String.format("Unable to unprepare SDC on the the host %s due to: %s", host.getName(), unprepareStorageClientAnswer.getDetails());
+            LOGGER.error(err);
+            return false;
+        }
+        return true;
+    }
+
+    private String getHostSdcId(String sdcGuid, long poolId) {
+        try {
+            LOGGER.debug(String.format("Try to get host SDC Id for pool: %s, with SDC guid %s", poolId, sdcGuid));
+            ScaleIOGatewayClient client = getScaleIOClient(poolId);
+            return client.getSdcIdByGuid(sdcGuid);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Failed to get host SDC Id for pool: %s", poolId), e);
+            throw new CloudRuntimeException(String.format("Failed to establish connection with PowerFlex Gateway to get host SDC Id for pool: %s", poolId));
+        }
+    }
+
+    private String getConnectedSdc(long poolId, long hostId) {
+        try {
+            StoragePoolHostVO poolHostVO = storagePoolHostDao.findByPoolHost(poolId, hostId);
+            if (poolHostVO == null) {
+                return null;
+            }
+
+            final ScaleIOGatewayClient client = getScaleIOClient(poolId);
+            if (client.isSdcConnected(poolHostVO.getLocalPath())) {
+                return poolHostVO.getLocalPath();
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get connected SDC for the host: " + hostId + " and storage pool: " + poolId + " due to " + e.getMessage(), e);
+        }
+
+        return null;
+    }
+
+    private boolean hostSdcConnected(String sdcId, long poolId, int waitTimeInSecs) {
+        LOGGER.debug(String.format("Waiting (for %d secs) for the SDC %s of the pool id: %d to connect", waitTimeInSecs, sdcId, poolId));
+        int timeBetweenTries = 1000; // Try more frequently (every sec) and return early if connected
+        while (waitTimeInSecs > 0) {
+            if (isHostSdcConnected(sdcId, poolId)) {
+                return true;
+            }
+            waitTimeInSecs--;
+            try {
+                Thread.sleep(timeBetweenTries);
+            } catch (Exception ignore) {
+            }
+        }
+        return isHostSdcConnected(sdcId, poolId);
+    }
+
+    private boolean isHostSdcConnected(String sdcId, long poolId) {
+        try {
+            final ScaleIOGatewayClient client = getScaleIOClient(poolId);
+            return client.isSdcConnected(sdcId);
+        } catch (Exception e) {
+            LOGGER.error("Failed to check host SDC connection", e);
+            throw new CloudRuntimeException("Failed to establish connection with PowerFlex Gateway to check host SDC connection");
+        }
+    }
+
+    private ScaleIOGatewayClient getScaleIOClient(final Long storagePoolId) throws Exception {
+        return ScaleIOGatewayClientConnectionPool.getInstance().getClient(storagePoolId, storagePoolDetailsDao);
+    }
+}
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
index c20f1f0..737cc81 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
@@ -70,12 +70,33 @@
     public boolean hostConnect(long hostId, long poolId) {
         HostVO host = _hostDao.findById(hostId);
         if (host == null) {
-            logger.error("Failed to add host by HostListener as host was not found with id : " + hostId);
+            logger.error("Failed to connect host by HostListener as host was not found with id : " + hostId);
             return false;
         }
 
         StoragePool storagePool = (StoragePool)_dataStoreMgr.getDataStore(poolId, DataStoreRole.Primary);
+        StoragePoolHostVO storagePoolHost = _storagePoolHostDao.findByPoolHost(poolId, hostId);
+        String sdcId = getSdcIdOfHost(host, storagePool);
+        if (StringUtils.isBlank(sdcId)) {
+            if (storagePoolHost != null) {
+                _storagePoolHostDao.deleteStoragePoolHostDetails(hostId, poolId);
+            }
+        } else {
+            if (storagePoolHost == null) {
+                storagePoolHost = new StoragePoolHostVO(poolId, hostId, sdcId);
+                _storagePoolHostDao.persist(storagePoolHost);
+            } else {
+                storagePoolHost.setLocalPath(sdcId);
+                _storagePoolHostDao.update(storagePoolHost.getId(), storagePoolHost);
+            }
+            logger.info("Connection established between storage pool: " + storagePool + " and host: " + hostId);
+        }
+        return true;
+    }
 
+    private String getSdcIdOfHost(HostVO host, StoragePool storagePool) {
+        long hostId = host.getId();
+        long poolId = storagePool.getId();
         String systemId = _storagePoolDetailsDao.findDetail(poolId, ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID).getValue();
         if (systemId == null) {
             throw new CloudRuntimeException("Failed to get the system id for PowerFlex storage pool " + storagePool.getName());
@@ -87,10 +108,10 @@
         ModifyStoragePoolAnswer answer  = sendModifyStoragePoolCommand(cmd, storagePool, hostId);
         Map<String,String> poolDetails = answer.getPoolInfo().getDetails();
         if (MapUtils.isEmpty(poolDetails)) {
-            String msg = "SDC details not found on the host: " + hostId + ", (re)install SDC and restart agent";
+            String msg = "PowerFlex storage SDC details not found on the host: " + hostId + ", (re)install SDC and restart agent";
             logger.warn(msg);
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "SDC not found on host: " + host.getUuid(), msg);
-            return false;
+            return null;
         }
 
         String sdcId = null;
@@ -102,30 +123,13 @@
         }
 
         if (StringUtils.isBlank(sdcId)) {
-            String msg = "Couldn't retrieve SDC details from the host: " + hostId + ", (re)install SDC and restart agent";
+            String msg = "Couldn't retrieve PowerFlex storage SDC details from the host: " + hostId + ", (re)install SDC and restart agent";
             logger.warn(msg);
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "SDC details not found on host: " + host.getUuid(), msg);
-            return false;
+            return null;
         }
 
-        if (!isHostSdcConnected(sdcId, poolId)) {
-            logger.warn("SDC not connected on the host: " + hostId);
-            String msg = "SDC not connected on the host: " + hostId + ", reconnect the SDC to MDM and restart agent";
-            _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "SDC disconnected on host: " + host.getUuid(), msg);
-            return false;
-        }
-
-        StoragePoolHostVO storagePoolHost = _storagePoolHostDao.findByPoolHost(poolId, hostId);
-        if (storagePoolHost == null) {
-            storagePoolHost = new StoragePoolHostVO(poolId, hostId, sdcId);
-            _storagePoolHostDao.persist(storagePoolHost);
-        } else {
-            storagePoolHost.setLocalPath(sdcId);
-            _storagePoolHostDao.update(storagePoolHost.getId(), storagePoolHost);
-        }
-
-        logger.info("Connection established between storage pool: " + storagePool + " and host: " + hostId);
-        return true;
+        return sdcId;
     }
 
     private String getHostSdcId(String sdcGuid, long poolId) {
@@ -139,16 +143,6 @@
         }
     }
 
-    private boolean isHostSdcConnected(String sdcId, long poolId) {
-        try {
-            ScaleIOGatewayClient client = ScaleIOGatewayClientConnectionPool.getInstance().getClient(poolId, _storagePoolDetailsDao);
-            return client.isSdcConnected(sdcId);
-        } catch (NoSuchAlgorithmException | KeyManagementException | URISyntaxException e) {
-            logger.error("Failed to check host sdc connection", e);
-            throw new CloudRuntimeException("Failed to establish connection with PowerFlex Gateway to check host sdc connection");
-        }
-    }
-
     private ModifyStoragePoolAnswer sendModifyStoragePoolCommand(ModifyStoragePoolCommand cmd, StoragePool storagePool, long hostId) {
         Answer answer = _agentMgr.easySend(hostId, cmd);
 
@@ -157,15 +151,15 @@
         }
 
         if (!answer.getResult()) {
-            String msg = "Unable to attach storage pool " + storagePool.getId() + " to host " + hostId;
+            String msg = "Unable to attach  PowerFlex storage pool " + storagePool.getId() + " to host " + hostId;
 
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, storagePool.getDataCenterId(), storagePool.getPodId(), msg, msg);
 
-            throw new CloudRuntimeException("Unable to establish a connection from agent to storage pool " + storagePool.getId() + " due to " + answer.getDetails() +
+            throw new CloudRuntimeException("Unable to establish a connection from agent to  PowerFlex storage pool " + storagePool.getId() + " due to " + answer.getDetails() +
                     " (" + storagePool.getId() + ")");
         }
 
-        assert (answer instanceof ModifyStoragePoolAnswer) : "ModifyStoragePoolAnswer expected ; Pool = " + storagePool.getId() + " Host = " + hostId;
+        assert (answer instanceof ModifyStoragePoolAnswer) : "ModifyStoragePoolAnswer expected ; PowerFlex Storage Pool = " + storagePool.getId() + " Host = " + hostId;
 
         return (ModifyStoragePoolAnswer) answer;
     }
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java
index a2e0129..4bb8df9 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java
@@ -50,6 +50,16 @@
 
     private static final String RESCAN_CMD = "drv_cfg --rescan";
 
+    private static final String SDC_SERVICE_STATUS_CMD = "systemctl status scini";
+    private static final String SDC_SERVICE_START_CMD = "systemctl start scini";
+    private static final String SDC_SERVICE_STOP_CMD = "systemctl stop scini";
+    private static final String SDC_SERVICE_RESTART_CMD = "systemctl restart scini";
+
+    private static final String SDC_SERVICE_IS_ACTIVE_CMD = "systemctl is-active scini";
+    private static final String SDC_SERVICE_IS_ENABLED_CMD = "systemctl is-enabled scini";
+    private static final String SDC_SERVICE_ENABLE_CMD = "systemctl enable scini";
+
+    public static final String CONNECTED_SDC_COUNT_STAT = "ConnectedSDCCount";
     /**
      * Cmd for querying volumes in SDC
      * Sample output for cmd: drv_cfg --query_vols:
@@ -183,4 +193,39 @@
 
         return String.format("%s:%s", volumePath, volumeName);
     }
+
+    public static boolean isSDCServiceInstalled() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_STATUS_CMD);
+        return exitValue != 4;
+    }
+
+    public static boolean isSDCServiceActive() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_IS_ACTIVE_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean isSDCServiceEnabled() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_IS_ENABLED_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean enableSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_ENABLE_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean startSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_START_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean stopSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_STOP_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean restartSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_RESTART_CMD);
+        return exitValue == 0;
+    }
 }
diff --git a/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml b/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml
index 8b86e21..55e74cd 100755
--- a/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml
+++ b/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml
@@ -32,4 +32,6 @@
     <bean id="scaleioDataStoreProvider"
           class="org.apache.cloudstack.storage.datastore.provider.ScaleIOPrimaryDatastoreProvider" />
 
+    <bean id="scaleioSDCManager" class="org.apache.cloudstack.storage.datastore.manager.ScaleIOSDCManagerImpl" />
+
 </beans>
diff --git a/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java b/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java
index 6a6a600..52dcad5 100644
--- a/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java
+++ b/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
@@ -130,9 +131,9 @@
         ScaleIOGatewayClientImpl client = mock(ScaleIOGatewayClientImpl.class);
         ScaleIOGatewayClientConnectionPool pool = mock(ScaleIOGatewayClientConnectionPool.class);
         scaleIOGatewayClientConnectionPoolMocked.when(() -> ScaleIOGatewayClientConnectionPool.getInstance()).thenReturn(pool);
-        when(pool.getClient(1L, storagePoolDetailsDao)).thenReturn(client);
+        lenient().when(pool.getClient(1L, storagePoolDetailsDao)).thenReturn(client);
 
-        when(client.haveConnectedSdcs()).thenReturn(true);
+        lenient().when(client.haveConnectedSdcs()).thenReturn(true);
 
         final ZoneScope scope = new ZoneScope(1L);
 
diff --git a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java
index df15aa3..5080a41 100644
--- a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java
@@ -64,7 +64,7 @@
 
 import com.google.common.base.Preconditions;
 
-public class SolidFirePrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+public class SolidFirePrimaryDataStoreLifeCycle extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject private CapacityManager _capacityMgr;
@@ -388,4 +388,13 @@
     public void disableStoragePool(DataStore dataStore) {
         _dataStoreHelper.disable(dataStore);
     }
+
+    @Override
+    public void changeStoragePoolScopeToZone(DataStore store, ClusterScope clusterScope, HypervisorType hypervisorType) {
+        /*
+         * We need to attach all VMware, Xenserver and KVM hosts in the zone.
+         * So pass hypervisorType as null.
+         */
+        super.changeStoragePoolScopeToZone(store, clusterScope, null);
+    }
 }
diff --git a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java
index e32fef5..9742bfb 100644
--- a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java
@@ -73,7 +73,7 @@
 import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.exception.CloudRuntimeException;
 
-public class SolidFireSharedPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+public class SolidFireSharedPrimaryDataStoreLifeCycle extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject private AccountDao accountDao;
diff --git a/plugins/storage/volume/storpool/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/StorPoolPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/storpool/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/StorPoolPrimaryDataStoreLifeCycle.java
index 339ee62..e13b7f7 100644
--- a/plugins/storage/volume/storpool/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/StorPoolPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/storpool/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/StorPoolPrimaryDataStoreLifeCycle.java
@@ -61,7 +61,7 @@
 import com.cloud.storage.dao.VMTemplatePoolDao;
 import com.cloud.utils.exception.CloudRuntimeException;
 
-public class StorPoolPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+public class StorPoolPrimaryDataStoreLifeCycle extends BasePrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLifeCycle {
     protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject
diff --git a/server/src/main/java/com/cloud/api/ApiDBUtils.java b/server/src/main/java/com/cloud/api/ApiDBUtils.java
index 3269bb8..7d6a127 100644
--- a/server/src/main/java/com/cloud/api/ApiDBUtils.java
+++ b/server/src/main/java/com/cloud/api/ApiDBUtils.java
@@ -2021,8 +2021,8 @@
         return s_volJoinDao.newVolumeView(vr);
     }
 
-    public static StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO vr) {
-        return s_poolJoinDao.newStoragePoolResponse(vr);
+    public static StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO vr, boolean customStats) {
+        return s_poolJoinDao.newStoragePoolResponse(vr, customStats);
     }
 
     public static StorageTagResponse newStorageTagResponse(StoragePoolTagVO vr) {
diff --git a/server/src/main/java/com/cloud/api/ApiResponseHelper.java b/server/src/main/java/com/cloud/api/ApiResponseHelper.java
index 064338b..7b341b0 100644
--- a/server/src/main/java/com/cloud/api/ApiResponseHelper.java
+++ b/server/src/main/java/com/cloud/api/ApiResponseHelper.java
@@ -1452,7 +1452,7 @@
     @Override
     public StoragePoolResponse createStoragePoolResponse(StoragePool pool) {
         List<StoragePoolJoinVO> viewPools = ApiDBUtils.newStoragePoolView(pool);
-        List<StoragePoolResponse> listPools = ViewResponseHelper.createStoragePoolResponse(viewPools.toArray(new StoragePoolJoinVO[viewPools.size()]));
+        List<StoragePoolResponse> listPools = ViewResponseHelper.createStoragePoolResponse(false, viewPools.toArray(new StoragePoolJoinVO[viewPools.size()]));
         assert listPools != null && listPools.size() == 1 : "There should be one storage pool returned";
         return listPools.get(0);
     }
diff --git a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
index 00eacfd..a39299d 100644
--- a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
+++ b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
@@ -71,6 +71,7 @@
 import org.apache.cloudstack.api.command.admin.storage.heuristics.ListSecondaryStorageSelectorsCmd;
 import org.apache.cloudstack.api.command.admin.template.ListTemplatesCmdByAdmin;
 import org.apache.cloudstack.api.command.admin.user.ListUsersCmd;
+import org.apache.cloudstack.api.command.admin.vm.ListAffectedVmsForStorageScopeChangeCmd;
 import org.apache.cloudstack.api.command.admin.zone.ListZonesCmdByAdmin;
 import org.apache.cloudstack.api.command.user.account.ListAccountsCmd;
 import org.apache.cloudstack.api.command.user.account.ListProjectAccountsCmd;
@@ -128,6 +129,7 @@
 import org.apache.cloudstack.api.response.TemplateResponse;
 import org.apache.cloudstack.api.response.UserResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
 import org.apache.cloudstack.api.response.VolumeResponse;
 import org.apache.cloudstack.api.response.ZoneResponse;
 import org.apache.cloudstack.backup.BackupOfferingVO;
@@ -213,8 +215,10 @@
 import com.cloud.api.query.vo.VolumeJoinVO;
 import com.cloud.cluster.ManagementServerHostVO;
 import com.cloud.cluster.dao.ManagementServerHostDao;
+import com.cloud.dc.ClusterVO;
 import com.cloud.dc.DataCenter;
 import com.cloud.dc.DedicatedResourceVO;
+import com.cloud.dc.dao.ClusterDao;
 import com.cloud.dc.dao.DedicatedResourceDao;
 import com.cloud.domain.Domain;
 import com.cloud.domain.DomainVO;
@@ -594,6 +598,10 @@
     @Inject
     private StoragePoolHostDao storagePoolHostDao;
 
+    @Inject
+    private ClusterDao clusterDao;
+
+
     private SearchCriteria<ServiceOfferingJoinVO> getMinimumCpuServiceOfferingJoinSearchCriteria(int cpu) {
         SearchCriteria<ServiceOfferingJoinVO> sc = _srvOfferingJoinDao.createSearchCriteria();
         SearchCriteria<ServiceOfferingJoinVO> sc1 = _srvOfferingJoinDao.createSearchCriteria();
@@ -1148,6 +1156,58 @@
         return response;
     }
 
+    @Override
+    public ListResponse<VirtualMachineResponse> listAffectedVmsForStorageScopeChange(ListAffectedVmsForStorageScopeChangeCmd cmd) {
+        Long poolId = cmd.getStorageId();
+        StoragePoolVO pool = storagePoolDao.findById(poolId);
+        if (pool == null) {
+            throw new IllegalArgumentException("Unable to find storage pool with ID: " + poolId);
+        }
+
+        ListResponse<VirtualMachineResponse> response = new ListResponse<>();
+        List<VirtualMachineResponse> responsesList = new ArrayList<>();
+        if (pool.getScope() != ScopeType.ZONE) {
+            response.setResponses(responsesList, 0);
+            return response;
+        }
+
+        Pair<List<VMInstanceVO>, Integer> vms = _vmInstanceDao.listByVmsNotInClusterUsingPool(cmd.getClusterIdForScopeChange(), poolId);
+        for (VMInstanceVO vm : vms.first()) {
+            VirtualMachineResponse resp = new VirtualMachineResponse();
+            resp.setObjectName(VirtualMachine.class.getSimpleName().toLowerCase());
+            resp.setId(vm.getUuid());
+            resp.setVmType(vm.getType().toString());
+
+            UserVmJoinVO userVM = null;
+            if (!vm.getType().isUsedBySystem()) {
+                userVM = _userVmJoinDao.findById(vm.getId());
+            }
+            if (userVM != null) {
+                if (userVM.getDisplayName() != null) {
+                    resp.setVmName(userVM.getDisplayName());
+                } else {
+                    resp.setVmName(userVM.getName());
+                }
+            } else {
+                resp.setVmName(vm.getInstanceName());
+            }
+
+            HostVO host = hostDao.findById(vm.getHostId());
+            if (host != null) {
+                resp.setHostId(host.getUuid());
+                resp.setHostName(host.getName());
+                ClusterVO cluster = clusterDao.findById(host.getClusterId());
+                if (cluster != null) {
+                    resp.setClusterId(cluster.getUuid());
+                    resp.setClusterName(cluster.getName());
+                }
+            }
+            responsesList.add(resp);
+        }
+        response.setResponses(responsesList, vms.second());
+        return response;
+    }
+
     private Object getObjectPossibleMethodValue(Object obj, String methodName) {
         Object result = null;
 
@@ -2971,7 +3031,7 @@
     public ListResponse<StoragePoolResponse> searchForStoragePools(ListStoragePoolsCmd cmd) {
         Pair<List<StoragePoolJoinVO>, Integer> result = (ScopeType.HOST.name().equalsIgnoreCase(cmd.getScope()) && cmd.getHostId() != null) ?
                 searchForLocalStorages(cmd) : searchForStoragePoolsInternal(cmd);
-        return createStoragesPoolResponse(result);
+        return createStoragesPoolResponse(result, cmd.getCustomStats());
     }
 
     private Pair<List<StoragePoolJoinVO>, Integer> searchForLocalStorages(ListStoragePoolsCmd cmd) {
@@ -2999,10 +3059,10 @@
         }
     }
 
-    private ListResponse<StoragePoolResponse> createStoragesPoolResponse(Pair<List<StoragePoolJoinVO>, Integer> storagePools) {
+    private ListResponse<StoragePoolResponse> createStoragesPoolResponse(Pair<List<StoragePoolJoinVO>, Integer> storagePools, boolean getCustomStats) {
         ListResponse<StoragePoolResponse> response = new ListResponse<>();
 
-        List<StoragePoolResponse> poolResponses = ViewResponseHelper.createStoragePoolResponse(storagePools.first().toArray(new StoragePoolJoinVO[storagePools.first().size()]));
+        List<StoragePoolResponse> poolResponses = ViewResponseHelper.createStoragePoolResponse(getCustomStats, storagePools.first().toArray(new StoragePoolJoinVO[storagePools.first().size()]));
         Map<String, Long> poolUuidToIdMap = storagePools.first().stream().collect(Collectors.toMap(StoragePoolJoinVO::getUuid, StoragePoolJoinVO::getId, (a, b) -> a));
         for (StoragePoolResponse poolResponse : poolResponses) {
             DataStore store = dataStoreManager.getPrimaryDataStore(poolResponse.getId());
diff --git a/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java b/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
index 06bfbad..db650bf 100644
--- a/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
+++ b/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
@@ -313,14 +313,14 @@
         return new ArrayList<VolumeResponse>(vrDataList.values());
     }
 
-    public static List<StoragePoolResponse> createStoragePoolResponse(StoragePoolJoinVO... pools) {
+    public static List<StoragePoolResponse> createStoragePoolResponse(boolean customStats, StoragePoolJoinVO... pools) {
         LinkedHashMap<Long, StoragePoolResponse> vrDataList = new LinkedHashMap<>();
         // Initialise the vrdatalist with the input data
         for (StoragePoolJoinVO vr : pools) {
             StoragePoolResponse vrData = vrDataList.get(vr.getId());
             if (vrData == null) {
                 // first time encountering this vm
-                vrData = ApiDBUtils.newStoragePoolResponse(vr);
+                vrData = ApiDBUtils.newStoragePoolResponse(vr, customStats);
             } else {
                 // update tags
                 vrData = ApiDBUtils.fillStoragePoolDetails(vrData, vr);
diff --git a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
index 26ee3f0..6e0b594 100644
--- a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
+++ b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
@@ -28,7 +28,7 @@
 
 public interface StoragePoolJoinDao extends GenericDao<StoragePoolJoinVO, Long> {
 
-    StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO host);
+    StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO host, boolean customStats);
 
     StoragePoolResponse setStoragePoolResponse(StoragePoolResponse response, StoragePoolJoinVO host);
 
diff --git a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
index 14de5ff..e5c1164 100644
--- a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
@@ -50,6 +50,9 @@
 import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.Map;
 
 @Component
 public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Long> implements StoragePoolJoinDao {
@@ -100,7 +103,7 @@
     }
 
     @Override
-    public StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO pool) {
+    public StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO pool, boolean customStats) {
         StoragePool storagePool = storagePoolDao.findById(pool.getId());
         StoragePoolResponse poolResponse = new StoragePoolResponse();
         poolResponse.setId(pool.getUuid());
@@ -147,6 +150,13 @@
             PrimaryDataStoreDriver driver = (PrimaryDataStoreDriver) store.getDriver();
             long usedIops = driver.getUsedIops(storagePool);
             poolResponse.setAllocatedIops(usedIops);
+
+            if (customStats && driver.poolProvidesCustomStorageStats()) {
+                Map<String, String> storageCustomStats = driver.getCustomStorageStats(storagePool);
+                if (MapUtils.isNotEmpty(storageCustomStats)) {
+                    poolResponse.setCustomStats(storageCustomStats);
+                }
+            }
         }
 
         // TODO: StatsCollector does not persist data
diff --git a/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java b/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
index 71977e7..dc302ba 100644
--- a/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
+++ b/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
@@ -1652,6 +1652,15 @@
         }
 
         logger.debug("Host: " + host.getId() + (hostCanAccessSPool ? " can" : " cannot") + " access pool: " + pool.getId());
+        if (!hostCanAccessSPool) {
+            if (_storageMgr.canHostPrepareStoragePoolAccess(host, pool)) {
+                logger.debug("Host: " + host.getId() + " can prepare access to pool: " + pool.getId());
+                hostCanAccessSPool = true;
+            } else {
+                logger.debug("Host: " + host.getId() + " cannot prepare access to pool: " + pool.getId());
+            }
+        }
+
         return hostCanAccessSPool;
     }
 
diff --git a/server/src/main/java/com/cloud/network/IpAddressManagerImpl.java b/server/src/main/java/com/cloud/network/IpAddressManagerImpl.java
index 6902068..770c919 100644
--- a/server/src/main/java/com/cloud/network/IpAddressManagerImpl.java
+++ b/server/src/main/java/com/cloud/network/IpAddressManagerImpl.java
@@ -717,50 +717,59 @@
     @Override
     @DB
     public boolean disassociatePublicIpAddress(long addrId, long userId, Account caller) {
-
         boolean success = true;
-        IPAddressVO ipToBeDisassociated = _ipAddressDao.findById(addrId);
 
-        PublicIpQuarantine publicIpQuarantine = null;
-        // Cleanup all ip address resources - PF/LB/Static nat rules
-        if (!cleanupIpResources(addrId, userId, caller)) {
-            success = false;
-            logger.warn("Failed to release resources for ip address id=" + addrId);
-        }
+        try {
+            IPAddressVO ipToBeDisassociated = _ipAddressDao.acquireInLockTable(addrId);
 
-        IPAddressVO ip = markIpAsUnavailable(addrId);
-        if (ip == null) {
-            return true;
-        }
+            if (ipToBeDisassociated == null) {
+                logger.error(String.format("Unable to acquire lock on public IP %s.", addrId));
+                throw new CloudRuntimeException("Unable to acquire lock on public IP.");
+            }
 
-        if (logger.isDebugEnabled()) {
-            logger.debug("Releasing ip id=" + addrId + "; sourceNat = " + ip.isSourceNat());
-        }
+            PublicIpQuarantine publicIpQuarantine = null;
+            // Cleanup all ip address resources - PF/LB/Static nat rules
+            if (!cleanupIpResources(addrId, userId, caller)) {
+                success = false;
+                logger.warn("Failed to release resources for ip address id=" + addrId);
+            }
 
-        if (ip.getAssociatedWithNetworkId() != null) {
-            Network network = _networksDao.findById(ip.getAssociatedWithNetworkId());
-            try {
-                if (!applyIpAssociations(network, rulesContinueOnErrFlag)) {
-                    logger.warn("Unable to apply ip address associations for " + network);
-                    success = false;
+            IPAddressVO ip = markIpAsUnavailable(addrId);
+            if (ip == null) {
+                return true;
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Releasing ip id=" + addrId + "; sourceNat = " + ip.isSourceNat());
+            }
+
+            if (ip.getAssociatedWithNetworkId() != null) {
+                Network network = _networksDao.findById(ip.getAssociatedWithNetworkId());
+                try {
+                    if (!applyIpAssociations(network, rulesContinueOnErrFlag)) {
+                        logger.warn("Unable to apply ip address associations for " + network);
+                        success = false;
+                    }
+                } catch (ResourceUnavailableException e) {
+                    throw new CloudRuntimeException("We should never get to here because we used true when applyIpAssociations", e);
                 }
-            } catch (ResourceUnavailableException e) {
-                throw new CloudRuntimeException("We should never get to here because we used true when applyIpAssociations", e);
+            } else if (ip.getState() == State.Releasing) {
+                publicIpQuarantine = addPublicIpAddressToQuarantine(ipToBeDisassociated, caller.getDomainId());
+                _ipAddressDao.unassignIpAddress(ip.getId());
             }
-        } else if (ip.getState() == State.Releasing) {
-            publicIpQuarantine = addPublicIpAddressToQuarantine(ipToBeDisassociated, caller.getDomainId());
-            _ipAddressDao.unassignIpAddress(ip.getId());
-        }
 
-        annotationDao.removeByEntityType(AnnotationService.EntityType.PUBLIC_IP_ADDRESS.name(), ip.getUuid());
+            annotationDao.removeByEntityType(AnnotationService.EntityType.PUBLIC_IP_ADDRESS.name(), ip.getUuid());
 
-        if (success) {
-            if (ip.isPortable()) {
-                releasePortableIpAddress(addrId);
+            if (success) {
+                if (ip.isPortable()) {
+                    releasePortableIpAddress(addrId);
+                }
+                logger.debug("Released a public ip id=" + addrId);
+            } else if (publicIpQuarantine != null) {
+                removePublicIpAddressFromQuarantine(publicIpQuarantine.getId(), "Public IP address removed from quarantine as there was an error while disassociating it.");
             }
-            logger.debug("Released a public ip id=" + addrId);
-        } else if (publicIpQuarantine != null) {
-            removePublicIpAddressFromQuarantine(publicIpQuarantine.getId(), "Public IP address removed from quarantine as there was an error while disassociating it.");
+        } finally {
+            _ipAddressDao.releaseFromLockTable(addrId);
         }
 
         return success;
diff --git a/server/src/main/java/com/cloud/network/NetworkModelImpl.java b/server/src/main/java/com/cloud/network/NetworkModelImpl.java
index 11c3042..aadce94 100644
--- a/server/src/main/java/com/cloud/network/NetworkModelImpl.java
+++ b/server/src/main/java/com/cloud/network/NetworkModelImpl.java
@@ -1598,6 +1598,10 @@
         }
 
         NetworkVO network = _networksDao.findById(networkId);
+        if (network == null) {
+            throw new CloudRuntimeException("Could not find network associated with public IP.");
+        }
+
         NetworkOfferingVO offering = _networkOfferingDao.findById(network.getNetworkOfferingId());
         if (offering.getGuestType() != GuestType.Isolated) {
             return true;
diff --git a/server/src/main/java/com/cloud/network/firewall/FirewallManagerImpl.java b/server/src/main/java/com/cloud/network/firewall/FirewallManagerImpl.java
index e9a9352..0a4ef98 100644
--- a/server/src/main/java/com/cloud/network/firewall/FirewallManagerImpl.java
+++ b/server/src/main/java/com/cloud/network/firewall/FirewallManagerImpl.java
@@ -202,57 +202,54 @@
         return createFirewallRule(sourceIpAddressId, caller, rule.getXid(), rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(),
             rule.getSourceCidrList(), null, rule.getIcmpCode(), rule.getIcmpType(), null, rule.getType(), rule.getNetworkId(), rule.getTrafficType(), rule.isDisplay());
     }
+
     //Destination CIDR capability is currently implemented for egress rules only. For others, the field is passed as null.
     @DB
-    protected FirewallRule createFirewallRule(final Long ipAddrId, Account caller, final String xId, final Integer portStart, final Integer portEnd,
-        final String protocol, final List<String> sourceCidrList, final List<String> destCidrList, final Integer icmpCode, final Integer icmpType, final Long relatedRuleId,
- final FirewallRule.FirewallRuleType type,
-            final Long networkId, final FirewallRule.TrafficType trafficType, final Boolean forDisplay) throws NetworkRuleConflictException {
-
+    protected FirewallRule createFirewallRule(final Long ipAddrId, Account caller, final String xId, final Integer portStart, final Integer portEnd, final String protocol,
+                                              final List<String> sourceCidrList, final List<String> destCidrList, final Integer icmpCode, final Integer icmpType, final Long relatedRuleId,
+                                              final FirewallRule.FirewallRuleType type, final Long networkId, final FirewallRule.TrafficType trafficType, final Boolean forDisplay) throws NetworkRuleConflictException {
         IPAddressVO ipAddress = null;
-        if (ipAddrId != null) {
-            // this for ingress firewall rule, for egress id is null
-             ipAddress = _ipAddressDao.findById(ipAddrId);
-        // Validate ip address
-        if (ipAddress == null && type == FirewallRule.FirewallRuleType.User) {
-                throw new InvalidParameterValueException("Unable to create firewall rule; " + "couldn't locate IP address by id in the system");
-        }
-        _networkModel.checkIpForService(ipAddress, Service.Firewall, null);
-        }
+        try {
+            // Validate ip address
+            if (ipAddrId != null) {
+                // this for ingress firewall rule, for egress id is null
+                ipAddress = _ipAddressDao.acquireInLockTable(ipAddrId);
+                if (ipAddress == null) {
+                    throw new InvalidParameterValueException("Unable to create firewall rule; " + "couldn't locate IP address by id in the system");
+                }
+                _networkModel.checkIpForService(ipAddress, Service.Firewall, null);
+            }
 
-        validateFirewallRule(caller, ipAddress, portStart, portEnd, protocol, Purpose.Firewall, type, networkId, trafficType);
+            validateFirewallRule(caller, ipAddress, portStart, portEnd, protocol, Purpose.Firewall, type, networkId, trafficType);
 
-        // icmp code and icmp type can't be passed in for any other protocol rather than icmp
-        if (!protocol.equalsIgnoreCase(NetUtils.ICMP_PROTO) && (icmpCode != null || icmpType != null)) {
-            throw new InvalidParameterValueException("Can specify icmpCode and icmpType for ICMP protocol only");
-        }
+            // icmp code and icmp type can't be passed in for any other protocol rather than icmp
+            if (!protocol.equalsIgnoreCase(NetUtils.ICMP_PROTO) && (icmpCode != null || icmpType != null)) {
+                throw new InvalidParameterValueException("Can specify icmpCode and icmpType for ICMP protocol only");
+            }
 
-        if (protocol.equalsIgnoreCase(NetUtils.ICMP_PROTO) && (portStart != null || portEnd != null)) {
-            throw new InvalidParameterValueException("Can't specify start/end port when protocol is ICMP");
-        }
+            if (protocol.equalsIgnoreCase(NetUtils.ICMP_PROTO) && (portStart != null || portEnd != null)) {
+                throw new InvalidParameterValueException("Can't specify start/end port when protocol is ICMP");
+            }
 
-        Long accountId = null;
-        Long domainId = null;
+            Long accountId = null;
+            Long domainId = null;
 
-        if (ipAddress != null) {
-            //Ingress firewall rule
-            accountId = ipAddress.getAllocatedToAccountId();
-            domainId = ipAddress.getAllocatedInDomainId();
-        } else if (networkId != null) {
-            //egress firewall rule
+            if (ipAddress != null) {
+                //Ingress firewall rule
+                accountId = ipAddress.getAllocatedToAccountId();
+                domainId = ipAddress.getAllocatedInDomainId();
+            } else if (networkId != null) {
+                //egress firewall rule
                 Network network = _networkModel.getNetwork(networkId);
                 accountId = network.getAccountId();
                 domainId = network.getDomainId();
-        }
+            }
 
-        final Long accountIdFinal = accountId;
-        final Long domainIdFinal = domainId;
-        return Transaction.execute(new TransactionCallbackWithException<FirewallRuleVO, NetworkRuleConflictException>() {
-            @Override
-            public FirewallRuleVO doInTransaction(TransactionStatus status) throws NetworkRuleConflictException {
-                FirewallRuleVO newRule =
-                        new FirewallRuleVO(xId, ipAddrId, portStart, portEnd, protocol.toLowerCase(), networkId, accountIdFinal, domainIdFinal, Purpose.Firewall,
-                                sourceCidrList, destCidrList, icmpCode, icmpType, relatedRuleId, trafficType);
+            final Long accountIdFinal = accountId;
+            final Long domainIdFinal = domainId;
+            return Transaction.execute((TransactionCallbackWithException<FirewallRuleVO, NetworkRuleConflictException>) status -> {
+                FirewallRuleVO newRule = new FirewallRuleVO(xId, ipAddrId, portStart, portEnd, protocol.toLowerCase(), networkId, accountIdFinal, domainIdFinal, Purpose.Firewall,
+                        sourceCidrList, destCidrList, icmpCode, icmpType, relatedRuleId, trafficType);
                 newRule.setType(type);
                 if (forDisplay != null) {
                     newRule.setDisplay(forDisplay);
@@ -269,8 +266,12 @@
                 CallContext.current().putContextParameter(FirewallRule.class, newRule.getId());
 
                 return newRule;
+            });
+        } finally {
+            if (ipAddrId != null) {
+                _ipAddressDao.releaseFromLockTable(ipAddrId);
             }
-        });
+        }
     }
 
     @Override
@@ -676,9 +677,19 @@
     }
 
     @Override
+    @DB
     public boolean applyIngressFirewallRules(long ipId, Account caller) throws ResourceUnavailableException {
-        List<FirewallRuleVO> rules = _firewallDao.listByIpAndPurpose(ipId, Purpose.Firewall);
-        return applyFirewallRules(rules, false, caller);
+        try {
+            IPAddressVO ipAddress = _ipAddressDao.acquireInLockTable(ipId);
+            if (ipAddress == null) {
+                logger.error(String.format("Unable to acquire lock for public IP [%s].", ipId));
+                throw new CloudRuntimeException("Unable to acquire lock for public IP.");
+            }
+            List<FirewallRuleVO> rules = _firewallDao.listByIpAndPurpose(ipId, Purpose.Firewall);
+            return applyFirewallRules(rules, false, caller);
+        } finally {
+            _ipAddressDao.releaseFromLockTable(ipId);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/com/cloud/network/lb/LoadBalancingRulesManagerImpl.java b/server/src/main/java/com/cloud/network/lb/LoadBalancingRulesManagerImpl.java
index 844c3c1..6edb9c4 100644
--- a/server/src/main/java/com/cloud/network/lb/LoadBalancingRulesManagerImpl.java
+++ b/server/src/main/java/com/cloud/network/lb/LoadBalancingRulesManagerImpl.java
@@ -1812,13 +1812,12 @@
         }
         return cidr;
     }
+
     @DB
     @Override
-    public LoadBalancer createPublicLoadBalancer(final String xId, final String name, final String description, final int srcPort, final int destPort,
- final long sourceIpId,
-            final String protocol, final String algorithm, final boolean openFirewall, final CallContext caller, final String lbProtocol, final Boolean forDisplay, String cidrList)
-            throws NetworkRuleConflictException {
-
+    public LoadBalancer createPublicLoadBalancer(final String xId, final String name, final String description, final int srcPort, final int destPort, final long sourceIpId,
+                                                 final String protocol, final String algorithm, final boolean openFirewall, final CallContext caller, final String lbProtocol,
+                                                 final Boolean forDisplay, String cidrList) throws NetworkRuleConflictException {
         if (!NetUtils.isValidPort(destPort)) {
             throw new InvalidParameterValueException("privatePort is an invalid value: " + destPort);
         }
@@ -1827,55 +1826,41 @@
             throw new InvalidParameterValueException("Invalid algorithm: " + algorithm);
         }
 
-        final IPAddressVO ipAddr = _ipAddressDao.findById(sourceIpId);
-        // make sure ip address exists
-        if (ipAddr == null || !ipAddr.readyToUse()) {
-            InvalidParameterValueException ex = new InvalidParameterValueException("Unable to create load balancer rule, invalid IP address id specified");
-            if (ipAddr == null) {
-                ex.addProxyObject(String.valueOf(sourceIpId), "sourceIpId");
-            } else {
+        try {
+            final IPAddressVO ipAddr = _ipAddressDao.acquireInLockTable(sourceIpId);
+
+            // make sure ip address exists
+            if (ipAddr == null || !ipAddr.readyToUse()) {
+                InvalidParameterValueException ex = new InvalidParameterValueException("Unable to create load balancer rule, invalid IP address id specified");
+                if (ipAddr == null) {
+                    ex.addProxyObject(String.valueOf(sourceIpId), "sourceIpId");
+                } else {
+                    ex.addProxyObject(ipAddr.getUuid(), "sourceIpId");
+                }
+                throw ex;
+            } else if (ipAddr.isOneToOneNat()) {
+                InvalidParameterValueException ex = new InvalidParameterValueException("Unable to create load balancer rule; specified sourceip id has static nat enabled");
                 ex.addProxyObject(ipAddr.getUuid(), "sourceIpId");
+                throw ex;
             }
-            throw ex;
-        } else if (ipAddr.isOneToOneNat()) {
-            InvalidParameterValueException ex = new InvalidParameterValueException("Unable to create load balancer rule; specified sourceip id has static nat enabled");
-            ex.addProxyObject(ipAddr.getUuid(), "sourceIpId");
-            throw ex;
-        }
 
-        _accountMgr.checkAccess(caller.getCallingAccount(), null, true, ipAddr);
+            _accountMgr.checkAccess(caller.getCallingAccount(), null, true, ipAddr);
 
-        final Long networkId = ipAddr.getAssociatedWithNetworkId();
-        if (networkId == null) {
-            InvalidParameterValueException ex =
-                new InvalidParameterValueException("Unable to create load balancer rule ; specified sourceip id is not associated with any network");
-            ex.addProxyObject(ipAddr.getUuid(), "sourceIpId");
-            throw ex;
-        }
+            final Long networkId = ipAddr.getAssociatedWithNetworkId();
+            if (networkId == null) {
+                InvalidParameterValueException ex =
+                        new InvalidParameterValueException("Unable to create load balancer rule ; specified sourceip id is not associated with any network");
+                ex.addProxyObject(ipAddr.getUuid(), "sourceIpId");
+                throw ex;
+            }
 
-        // verify that lb service is supported by the network
-        isLbServiceSupportedInNetwork(networkId, Scheme.Public);
+            // verify that lb service is supported by the network
+            isLbServiceSupportedInNetwork(networkId, Scheme.Public);
 
-        _firewallMgr.validateFirewallRule(caller.getCallingAccount(), ipAddr, srcPort, srcPort, protocol, Purpose.LoadBalancing, FirewallRuleType.User, networkId, null);
+            _firewallMgr.validateFirewallRule(caller.getCallingAccount(), ipAddr, srcPort, srcPort, protocol, Purpose.LoadBalancing, FirewallRuleType.User, networkId, null);
 
-        LoadBalancerVO newRule =
-            new LoadBalancerVO(xId, name, description, sourceIpId, srcPort, destPort, algorithm, networkId, ipAddr.getAllocatedToAccountId(),
-                ipAddr.getAllocatedInDomainId(), lbProtocol, cidrList);
-
-        // verify rule is supported by Lb provider of the network
-        Ip sourceIp = getSourceIp(newRule);
-        LoadBalancingRule loadBalancing =
-            new LoadBalancingRule(newRule, new ArrayList<LbDestination>(), new ArrayList<LbStickinessPolicy>(), new ArrayList<LbHealthCheckPolicy>(), sourceIp, null,
-                lbProtocol);
-        if (!validateLbRule(loadBalancing)) {
-            throw new InvalidParameterValueException("LB service provider cannot support this rule");
-        }
-
-        return Transaction.execute(new TransactionCallbackWithException<LoadBalancerVO, NetworkRuleConflictException>() {
-            @Override
-            public LoadBalancerVO doInTransaction(TransactionStatus status) throws NetworkRuleConflictException {
-                LoadBalancerVO newRule =
-                    new LoadBalancerVO(xId, name, description, sourceIpId, srcPort, destPort, algorithm, networkId, ipAddr.getAllocatedToAccountId(),
+            return Transaction.execute((TransactionCallbackWithException<LoadBalancerVO, NetworkRuleConflictException>) status -> {
+                LoadBalancerVO newRule = new LoadBalancerVO(xId, name, description, sourceIpId, srcPort, destPort, algorithm, networkId, ipAddr.getAllocatedToAccountId(),
                         ipAddr.getAllocatedInDomainId(), lbProtocol, cidrList);
 
                 if (forDisplay != null) {
@@ -1884,9 +1869,7 @@
 
                 // verify rule is supported by Lb provider of the network
                 Ip sourceIp = getSourceIp(newRule);
-                LoadBalancingRule loadBalancing =
-                    new LoadBalancingRule(newRule, new ArrayList<LbDestination>(), new ArrayList<LbStickinessPolicy>(), new ArrayList<LbHealthCheckPolicy>(), sourceIp,
-                        null, lbProtocol);
+                LoadBalancingRule loadBalancing = new LoadBalancingRule(newRule, new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), sourceIp, null, lbProtocol);
                 if (!validateLbRule(loadBalancing)) {
                     throw new InvalidParameterValueException("LB service provider cannot support this rule");
                 }
@@ -1906,10 +1889,10 @@
                         throw new CloudRuntimeException("Unable to update the state to add for " + newRule);
                     }
                     logger.debug("Load balancer " + newRule.getId() + " for Ip address id=" + sourceIpId + ", public port " + srcPort + ", private port " + destPort +
-                        " is added successfully.");
+                            " is added successfully.");
                     CallContext.current().setEventDetails("Load balancer Id: " + newRule.getId());
                     UsageEventUtils.publishUsageEvent(EventTypes.EVENT_LOAD_BALANCER_CREATE, ipAddr.getAllocatedToAccountId(), ipAddr.getDataCenterId(), newRule.getId(),
-                        null, LoadBalancingRule.class.getName(), newRule.getUuid());
+                            null, LoadBalancingRule.class.getName(), newRule.getUuid());
 
                     return newRule;
                 } catch (Exception e) {
@@ -1924,9 +1907,10 @@
                         removeLBRule(newRule);
                     }
                 }
-            }
-        });
-
+            });
+        } finally {
+            _ipAddressDao.releaseFromLockTable(sourceIpId);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/com/cloud/network/rules/RulesManagerImpl.java b/server/src/main/java/com/cloud/network/rules/RulesManagerImpl.java
index 15d1db4..55f7609 100644
--- a/server/src/main/java/com/cloud/network/rules/RulesManagerImpl.java
+++ b/server/src/main/java/com/cloud/network/rules/RulesManagerImpl.java
@@ -205,124 +205,122 @@
 
         final Long ipAddrId = rule.getSourceIpAddressId();
 
-        IPAddressVO ipAddress = _ipAddressDao.findById(ipAddrId);
-
-        // Validate ip address
-        if (ipAddress == null) {
-            throw new InvalidParameterValueException("Unable to create port forwarding rule; ip id=" + ipAddrId + " doesn't exist in the system");
-        } else if (ipAddress.isOneToOneNat()) {
-            throw new InvalidParameterValueException("Unable to create port forwarding rule; ip id=" + ipAddrId + " has static nat enabled");
-        }
-
-        final Long networkId = rule.getNetworkId();
-        Network network = _networkModel.getNetwork(networkId);
-        //associate ip address to network (if needed)
-        boolean performedIpAssoc = false;
-        Nic guestNic;
-        if (ipAddress.getAssociatedWithNetworkId() == null) {
-            boolean assignToVpcNtwk = network.getVpcId() != null && ipAddress.getVpcId() != null && ipAddress.getVpcId().longValue() == network.getVpcId();
-            if (assignToVpcNtwk) {
-                _networkModel.checkIpForService(ipAddress, Service.PortForwarding, networkId);
-
-                logger.debug("The ip is not associated with the VPC network id=" + networkId + ", so assigning");
-                try {
-                    ipAddress = _ipAddrMgr.associateIPToGuestNetwork(ipAddrId, networkId, false);
-                    performedIpAssoc = true;
-                } catch (Exception ex) {
-                    throw new CloudRuntimeException("Failed to associate ip to VPC network as " + "a part of port forwarding rule creation");
-                }
-            }
-        } else {
-            _networkModel.checkIpForService(ipAddress, Service.PortForwarding, null);
-        }
-
-        if (ipAddress.getAssociatedWithNetworkId() == null) {
-            throw new InvalidParameterValueException("Ip address " + ipAddress + " is not assigned to the network " + network);
-        }
-
         try {
-            _firewallMgr.validateFirewallRule(caller, ipAddress, rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(), Purpose.PortForwarding,
-                FirewallRuleType.User, networkId, rule.getTrafficType());
+            IPAddressVO ipAddress = _ipAddressDao.acquireInLockTable(ipAddrId);
 
-            final Long accountId = ipAddress.getAllocatedToAccountId();
-            final Long domainId = ipAddress.getAllocatedInDomainId();
-
-            // start port can't be bigger than end port
-            if (rule.getDestinationPortStart() > rule.getDestinationPortEnd()) {
-                throw new InvalidParameterValueException("Start port can't be bigger than end port");
+            // Validate ip address
+            if (ipAddress == null) {
+                throw new InvalidParameterValueException("Unable to create port forwarding rule; ip id=" + ipAddrId + " doesn't exist in the system");
+            } else if (ipAddress.isOneToOneNat()) {
+                throw new InvalidParameterValueException("Unable to create port forwarding rule; ip id=" + ipAddrId + " has static nat enabled");
             }
 
-            // check that the port ranges are of equal size
-            if ((rule.getDestinationPortEnd() - rule.getDestinationPortStart()) != (rule.getSourcePortEnd() - rule.getSourcePortStart())) {
-                throw new InvalidParameterValueException("Source port and destination port ranges should be of equal sizes.");
-            }
+            final Long networkId = rule.getNetworkId();
+            Network network = _networkModel.getNetwork(networkId);
+            //associate ip address to network (if needed)
+            boolean performedIpAssoc = false;
+            Nic guestNic;
+            if (ipAddress.getAssociatedWithNetworkId() == null) {
+                boolean assignToVpcNtwk = network.getVpcId() != null && ipAddress.getVpcId() != null && ipAddress.getVpcId().longValue() == network.getVpcId();
+                if (assignToVpcNtwk) {
+                    _networkModel.checkIpForService(ipAddress, Service.PortForwarding, networkId);
 
-            // validate user VM exists
-            UserVm vm = _vmDao.findById(vmId);
-            if (vm == null) {
-                throw new InvalidParameterValueException("Unable to create port forwarding rule on address " + ipAddress + ", invalid virtual machine id specified (" +
-                    vmId + ").");
-            } else if (vm.getState() == VirtualMachine.State.Destroyed || vm.getState() == VirtualMachine.State.Expunging) {
-                throw new InvalidParameterValueException("Invalid user vm: " + vm.getId());
-            }
-
-            // Verify that vm has nic in the network
-            Ip dstIp = rule.getDestinationIpAddress();
-            guestNic = _networkModel.getNicInNetwork(vmId, networkId);
-            if (guestNic == null || guestNic.getIPv4Address() == null) {
-                throw new InvalidParameterValueException("Vm doesn't belong to network associated with ipAddress");
-            } else {
-                dstIp = new Ip(guestNic.getIPv4Address());
-            }
-
-            if (vmIp != null) {
-                //vm ip is passed so it can be primary or secondary ip addreess.
-                if (!dstIp.equals(vmIp)) {
-                    //the vm ip is secondary ip to the nic.
-                    // is vmIp is secondary ip or not
-                    NicSecondaryIp secondaryIp = _nicSecondaryDao.findByIp4AddressAndNicId(vmIp.toString(), guestNic.getId());
-                    if (secondaryIp == null) {
-                        throw new InvalidParameterValueException("IP Address is not in the VM nic's network ");
+                    logger.debug("The ip is not associated with the VPC network id=" + networkId + ", so assigning");
+                    try {
+                        ipAddress = _ipAddrMgr.associateIPToGuestNetwork(ipAddrId, networkId, false);
+                        performedIpAssoc = true;
+                    } catch (Exception ex) {
+                        throw new CloudRuntimeException("Failed to associate ip to VPC network as " + "a part of port forwarding rule creation");
                     }
-                    dstIp = vmIp;
                 }
+            } else {
+                _networkModel.checkIpForService(ipAddress, Service.PortForwarding, null);
             }
 
-            //if start port and end port are passed in, and they are not equal to each other, perform the validation
-            boolean validatePortRange = false;
-            if (rule.getSourcePortStart().intValue() != rule.getSourcePortEnd().intValue() || rule.getDestinationPortStart() != rule.getDestinationPortEnd()) {
-                validatePortRange = true;
+            if (ipAddress.getAssociatedWithNetworkId() == null) {
+                throw new InvalidParameterValueException("Ip address " + ipAddress + " is not assigned to the network " + network);
             }
 
-            if (validatePortRange) {
-                //source start port and source dest port should be the same. The same applies to dest ports
-                if (rule.getSourcePortStart().intValue() != rule.getDestinationPortStart()) {
-                    throw new InvalidParameterValueException("Private port start should be equal to public port start");
+            try {
+                _firewallMgr.validateFirewallRule(caller, ipAddress, rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(), Purpose.PortForwarding,
+                        FirewallRuleType.User, networkId, rule.getTrafficType());
+
+                final Long accountId = ipAddress.getAllocatedToAccountId();
+                final Long domainId = ipAddress.getAllocatedInDomainId();
+
+                // start port can't be bigger than end port
+                if (rule.getDestinationPortStart() > rule.getDestinationPortEnd()) {
+                    throw new InvalidParameterValueException("Start port can't be bigger than end port");
                 }
 
-                if (rule.getSourcePortEnd().intValue() != rule.getDestinationPortEnd()) {
-                    throw new InvalidParameterValueException("Private port end should be equal to public port end");
+                // check that the port ranges are of equal size
+                if ((rule.getDestinationPortEnd() - rule.getDestinationPortStart()) != (rule.getSourcePortEnd() - rule.getSourcePortStart())) {
+                    throw new InvalidParameterValueException("Source port and destination port ranges should be of equal sizes.");
                 }
-            }
 
-            final Ip dstIpFinal = dstIp;
-            final IPAddressVO ipAddressFinal = ipAddress;
-            return Transaction.execute(new TransactionCallbackWithException<PortForwardingRuleVO, NetworkRuleConflictException>() {
-                @Override
-                public PortForwardingRuleVO doInTransaction(TransactionStatus status) throws NetworkRuleConflictException {
+                // validate user VM exists
+                UserVm vm = _vmDao.findById(vmId);
+                if (vm == null) {
+                    throw new InvalidParameterValueException("Unable to create port forwarding rule on address " + ipAddress + ", invalid virtual machine id specified (" +
+                            vmId + ").");
+                } else if (vm.getState() == VirtualMachine.State.Destroyed || vm.getState() == VirtualMachine.State.Expunging) {
+                    throw new InvalidParameterValueException("Invalid user vm: " + vm.getId());
+                }
+
+                // Verify that vm has nic in the network
+                Ip dstIp = rule.getDestinationIpAddress();
+                guestNic = _networkModel.getNicInNetwork(vmId, networkId);
+                if (guestNic == null || guestNic.getIPv4Address() == null) {
+                    throw new InvalidParameterValueException("Vm doesn't belong to network associated with ipAddress");
+                } else {
+                    dstIp = new Ip(guestNic.getIPv4Address());
+                }
+
+                if (vmIp != null) {
+                    //vm ip is passed so it can be primary or secondary ip addreess.
+                    if (!dstIp.equals(vmIp)) {
+                        //the vm ip is secondary ip to the nic.
+                        // is vmIp is secondary ip or not
+                        NicSecondaryIp secondaryIp = _nicSecondaryDao.findByIp4AddressAndNicId(vmIp.toString(), guestNic.getId());
+                        if (secondaryIp == null) {
+                            throw new InvalidParameterValueException("IP Address is not in the VM nic's network ");
+                        }
+                        dstIp = vmIp;
+                    }
+                }
+
+                //if start port and end port are passed in, and they are not equal to each other, perform the validation
+                boolean validatePortRange = false;
+                if (rule.getSourcePortStart().intValue() != rule.getSourcePortEnd().intValue() || rule.getDestinationPortStart() != rule.getDestinationPortEnd()) {
+                    validatePortRange = true;
+                }
+
+                if (validatePortRange) {
+                    //source start port and source dest port should be the same. The same applies to dest ports
+                    if (rule.getSourcePortStart().intValue() != rule.getDestinationPortStart()) {
+                        throw new InvalidParameterValueException("Private port start should be equal to public port start");
+                    }
+
+                    if (rule.getSourcePortEnd().intValue() != rule.getDestinationPortEnd()) {
+                        throw new InvalidParameterValueException("Private port end should be equal to public port end");
+                    }
+                }
+
+                final Ip dstIpFinal = dstIp;
+                final IPAddressVO ipAddressFinal = ipAddress;
+                return Transaction.execute((TransactionCallbackWithException<PortForwardingRuleVO, NetworkRuleConflictException>) status -> {
                     PortForwardingRuleVO newRule =
-                        new PortForwardingRuleVO(rule.getXid(), rule.getSourceIpAddressId(), rule.getSourcePortStart(), rule.getSourcePortEnd(), dstIpFinal,
-                            rule.getDestinationPortStart(), rule.getDestinationPortEnd(), rule.getProtocol().toLowerCase(), networkId, accountId, domainId, vmId);
+                            new PortForwardingRuleVO(rule.getXid(), rule.getSourceIpAddressId(), rule.getSourcePortStart(), rule.getSourcePortEnd(), dstIpFinal,
+                                    rule.getDestinationPortStart(), rule.getDestinationPortEnd(), rule.getProtocol().toLowerCase(), networkId, accountId, domainId, vmId);
 
                     if (forDisplay != null) {
                         newRule.setDisplay(forDisplay);
                     }
                     newRule = _portForwardingDao.persist(newRule);
-
                     // create firewallRule for 0.0.0.0/0 cidr
                     if (openFirewall) {
                         _firewallMgr.createRuleForAllCidrs(ipAddrId, caller, rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(), null, null,
-                            newRule.getId(), networkId);
+                                newRule.getId(), networkId);
                     }
 
                     try {
@@ -332,7 +330,7 @@
                         }
                         CallContext.current().setEventDetails("Rule Id: " + newRule.getId());
                         UsageEventUtils.publishUsageEvent(EventTypes.EVENT_NET_RULE_ADD, newRule.getAccountId(), ipAddressFinal.getDataCenterId(), newRule.getId(), null,
-                            PortForwardingRule.class.getName(), newRule.getUuid());
+                                PortForwardingRule.class.getName(), newRule.getUuid());
                         return newRule;
                     } catch (Exception e) {
                         if (newRule != null) {
@@ -347,16 +345,17 @@
 
                         throw new CloudRuntimeException("Unable to add rule for the ip id=" + ipAddrId, e);
                     }
+                });
+            } finally {
+                // release ip address if ipassoc was perfored
+                if (performedIpAssoc) {
+                    //if the rule is the last one for the ip address assigned to VPC, unassign it from the network
+                    IpAddress ip = _ipAddressDao.findById(ipAddress.getId());
+                    _vpcMgr.unassignIPFromVpcNetwork(ip.getId(), networkId);
                 }
-            });
-
-        } finally {
-            // release ip address if ipassoc was perfored
-            if (performedIpAssoc) {
-                //if the rule is the last one for the ip address assigned to VPC, unassign it from the network
-                IpAddress ip = _ipAddressDao.findById(ipAddress.getId());
-                _vpcMgr.unassignIPFromVpcNetwork(ip.getId(), networkId);
             }
+        } finally {
+            _ipAddressDao.releaseFromLockTable(ipAddrId);
         }
     }
 
@@ -368,46 +367,44 @@
 
         final Long ipAddrId = rule.getSourceIpAddressId();
 
-        IPAddressVO ipAddress = _ipAddressDao.findById(ipAddrId);
+        try {
+            IPAddressVO ipAddress = _ipAddressDao.acquireInLockTable(ipAddrId);
 
-        // Validate ip address
-        if (ipAddress == null) {
-            throw new InvalidParameterValueException("Unable to create static nat rule; ip id=" + ipAddrId + " doesn't exist in the system");
-        } else if (ipAddress.isSourceNat() || !ipAddress.isOneToOneNat() || ipAddress.getAssociatedWithVmId() == null) {
-            throw new NetworkRuleConflictException("Can't do static nat on ip address: " + ipAddress.getAddress());
-        }
+            // Validate ip address
+            if (ipAddress == null) {
+                throw new InvalidParameterValueException("Unable to create static nat rule; ip id=" + ipAddrId + " doesn't exist in the system");
+            } else if (ipAddress.isSourceNat() || !ipAddress.isOneToOneNat() || ipAddress.getAssociatedWithVmId() == null) {
+                throw new NetworkRuleConflictException("Can't do static nat on ip address: " + ipAddress.getAddress());
+            }
 
-        _firewallMgr.validateFirewallRule(caller, ipAddress, rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(), Purpose.StaticNat,
-            FirewallRuleType.User, null, rule.getTrafficType());
+            _firewallMgr.validateFirewallRule(caller, ipAddress, rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(), Purpose.StaticNat,
+                    FirewallRuleType.User, null, rule.getTrafficType());
 
-        final Long networkId = ipAddress.getAssociatedWithNetworkId();
-        final Long accountId = ipAddress.getAllocatedToAccountId();
-        final Long domainId = ipAddress.getAllocatedInDomainId();
+            final Long networkId = ipAddress.getAssociatedWithNetworkId();
+            final Long accountId = ipAddress.getAllocatedToAccountId();
+            final Long domainId = ipAddress.getAllocatedInDomainId();
 
-        _networkModel.checkIpForService(ipAddress, Service.StaticNat, null);
+            _networkModel.checkIpForService(ipAddress, Service.StaticNat, null);
 
-        Network network = _networkModel.getNetwork(networkId);
-        NetworkOffering off = _entityMgr.findById(NetworkOffering.class, network.getNetworkOfferingId());
-        if (off.isElasticIp()) {
-            throw new InvalidParameterValueException("Can't create ip forwarding rules for the network where elasticIP service is enabled");
-        }
+            Network network = _networkModel.getNetwork(networkId);
+            NetworkOffering off = _entityMgr.findById(NetworkOffering.class, network.getNetworkOfferingId());
+            if (off.isElasticIp()) {
+                throw new InvalidParameterValueException("Can't create ip forwarding rules for the network where elasticIP service is enabled");
+            }
 
-        //String dstIp = _networkModel.getIpInNetwork(ipAddress.getAssociatedWithVmId(), networkId);
-        final String dstIp = ipAddress.getVmIp();
-        return Transaction.execute(new TransactionCallbackWithException<StaticNatRule, NetworkRuleConflictException>() {
-            @Override
-            public StaticNatRule doInTransaction(TransactionStatus status) throws NetworkRuleConflictException {
-
+            //String dstIp = _networkModel.getIpInNetwork(ipAddress.getAssociatedWithVmId(), networkId);
+            final String dstIp = ipAddress.getVmIp();
+            return Transaction.execute((TransactionCallbackWithException<StaticNatRule, NetworkRuleConflictException>) status -> {
                 FirewallRuleVO newRule =
-                    new FirewallRuleVO(rule.getXid(), rule.getSourceIpAddressId(), rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol().toLowerCase(),
-                        networkId, accountId, domainId, rule.getPurpose(), null, null, null, null, null);
+                        new FirewallRuleVO(rule.getXid(), rule.getSourceIpAddressId(), rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol().toLowerCase(),
+                                networkId, accountId, domainId, rule.getPurpose(), null, null, null, null, null);
 
                 newRule = _firewallDao.persist(newRule);
 
                 // create firewallRule for 0.0.0.0/0 cidr
                 if (openFirewall) {
                     _firewallMgr.createRuleForAllCidrs(ipAddrId, caller, rule.getSourcePortStart(), rule.getSourcePortEnd(), rule.getProtocol(), null, null,
-                        newRule.getId(), networkId);
+                            newRule.getId(), networkId);
                 }
 
                 try {
@@ -417,11 +414,9 @@
                     }
                     CallContext.current().setEventDetails("Rule Id: " + newRule.getId());
                     UsageEventUtils.publishUsageEvent(EventTypes.EVENT_NET_RULE_ADD, newRule.getAccountId(), 0, newRule.getId(), null, FirewallRule.class.getName(),
-                        newRule.getUuid());
+                            newRule.getUuid());
 
-                    StaticNatRule staticNatRule = new StaticNatRuleImpl(newRule, dstIp);
-
-                    return staticNatRule;
+                    return new StaticNatRuleImpl(newRule, dstIp);
                 } catch (Exception e) {
                     if (newRule != null) {
                         // no need to apply the rule as it wasn't programmed on the backend yet
@@ -434,9 +429,10 @@
                     }
                     throw new CloudRuntimeException("Unable to add static nat rule for the ip id=" + newRule.getSourceIpAddressId(), e);
                 }
-            }
-        });
-
+            });
+        } finally {
+            _ipAddressDao.releaseFromLockTable(ipAddrId);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/com/cloud/network/vpn/RemoteAccessVpnManagerImpl.java b/server/src/main/java/com/cloud/network/vpn/RemoteAccessVpnManagerImpl.java
index 6fdf549..6cef834 100644
--- a/server/src/main/java/com/cloud/network/vpn/RemoteAccessVpnManagerImpl.java
+++ b/server/src/main/java/com/cloud/network/vpn/RemoteAccessVpnManagerImpl.java
@@ -153,6 +153,7 @@
         return vpns;
     }
 
+
     @Override
     @DB
     public RemoteAccessVpn createRemoteAccessVpn(final long publicIpId, String ipRange, boolean openFirewall, final Boolean forDisplay) throws NetworkRuleConflictException {
@@ -170,92 +171,97 @@
             throw new InvalidParameterValueException("The Ip address is not ready to be used yet: " + ipAddr.getAddress());
         }
 
-        IPAddressVO ipAddress = _ipAddressDao.findById(publicIpId);
+        try {
+            IPAddressVO ipAddress = _ipAddressDao.acquireInLockTable(publicIpId);
 
-        Long networkId = ipAddress.getAssociatedWithNetworkId();
-        if (networkId != null) {
-            _networkMgr.checkIpForService(ipAddress, Service.Vpn, null);
-        }
-
-        final Long vpcId = ipAddress.getVpcId();
-        if (vpcId != null && ipAddress.isSourceNat()) {
-            assert networkId == null;
-            openFirewall = false;
-        }
-
-        final boolean openFirewallFinal = openFirewall;
-
-        if (networkId == null && vpcId == null) {
-            throw new InvalidParameterValueException("Unable to create remote access vpn for the ipAddress: " + ipAddr.getAddress().addr() +
-                    " as ip is not associated with any network or VPC");
-        }
-
-        RemoteAccessVpnVO vpnVO = _remoteAccessVpnDao.findByPublicIpAddress(publicIpId);
-
-        if (vpnVO != null) {
-            if (vpnVO.getState() == RemoteAccessVpn.State.Added) {
-                return vpnVO;
+            if (ipAddress == null) {
+                logger.error(String.format("Unable to acquire lock on public IP %s.", publicIpId));
+                throw new CloudRuntimeException("Unable to acquire lock on public IP.");
             }
 
-            throw new InvalidParameterValueException(String.format("A remote Access VPN already exists for the public IP address [%s].", ipAddr.getAddress().toString()));
-        }
+            Long networkId = ipAddress.getAssociatedWithNetworkId();
+            if (networkId != null) {
+                _networkMgr.checkIpForService(ipAddress, Service.Vpn, null);
+            }
 
-        if (ipRange == null) {
-            ipRange = RemoteAccessVpnClientIpRange.valueIn(ipAddr.getAccountId());
-        }
+            final Long vpcId = ipAddress.getVpcId();
+            if (vpcId != null && ipAddress.isSourceNat()) {
+                assert networkId == null;
+                openFirewall = false;
+            }
 
-        validateIpRange(ipRange, InvalidParameterValueException.class);
+            final boolean openFirewallFinal = openFirewall;
 
-        String[] range = ipRange.split("-");
+            if (networkId == null && vpcId == null) {
+                throw new InvalidParameterValueException("Unable to create remote access vpn for the ipAddress: " + ipAddr.getAddress().addr() +
+                        " as ip is not associated with any network or VPC");
+            }
 
-        Pair<String, Integer> cidr = null;
+            RemoteAccessVpnVO vpnVO = _remoteAccessVpnDao.findByPublicIpAddress(publicIpId);
 
-        if (networkId != null) {
-            long ipAddressOwner = ipAddr.getAccountId();
-            vpnVO = _remoteAccessVpnDao.findByAccountAndNetwork(ipAddressOwner, networkId);
             if (vpnVO != null) {
                 if (vpnVO.getState() == RemoteAccessVpn.State.Added) {
                     return vpnVO;
                 }
 
-                throw new InvalidParameterValueException(String.format("A remote access VPN already exists for the account [%s].", ipAddressOwner));
+                throw new InvalidParameterValueException(String.format("A remote Access VPN already exists for the public IP address [%s].", ipAddr.getAddress().toString()));
             }
-            Network network = _networkMgr.getNetwork(networkId);
-            if (!_networkMgr.areServicesSupportedInNetwork(network.getId(), Service.Vpn)) {
-                throw new InvalidParameterValueException("Vpn service is not supported in network id=" + ipAddr.getAssociatedWithNetworkId());
+
+            if (ipRange == null) {
+                ipRange = RemoteAccessVpnClientIpRange.valueIn(ipAddr.getAccountId());
             }
-            cidr = NetUtils.getCidr(network.getCidr());
-        } else {
-            Vpc vpc = _vpcDao.findById(vpcId);
-            cidr = NetUtils.getCidr(vpc.getCidr());
-        }
 
-        String[] guestIpRange = NetUtils.getIpRangeFromCidr(cidr.first(), cidr.second());
-        if (NetUtils.ipRangesOverlap(range[0], range[1], guestIpRange[0], guestIpRange[1])) {
-            throw new InvalidParameterValueException("Invalid ip range: " + ipRange + " overlaps with guest ip range " + guestIpRange[0] + "-" + guestIpRange[1]);
-        }
+            validateIpRange(ipRange, InvalidParameterValueException.class);
 
-        long startIp = NetUtils.ip2Long(range[0]);
-        final String newIpRange = NetUtils.long2Ip(++startIp) + "-" + range[1];
-        final String sharedSecret = PasswordGenerator.generatePresharedKey(_pskLength);
+            String[] range = ipRange.split("-");
 
-        return Transaction.execute(new TransactionCallbackWithException<RemoteAccessVpn, NetworkRuleConflictException>() {
-            @Override
-            public RemoteAccessVpn doInTransaction(TransactionStatus status) throws NetworkRuleConflictException {
+            Pair<String, Integer> cidr = null;
+
+            if (networkId != null) {
+                long ipAddressOwner = ipAddr.getAccountId();
+                vpnVO = _remoteAccessVpnDao.findByAccountAndNetwork(ipAddressOwner, networkId);
+                if (vpnVO != null) {
+                    if (vpnVO.getState() == RemoteAccessVpn.State.Added) {
+                        return vpnVO;
+                    }
+
+                    throw new InvalidParameterValueException(String.format("A remote access VPN already exists for the account [%s].", ipAddressOwner));
+                }
+                Network network = _networkMgr.getNetwork(networkId);
+                if (!_networkMgr.areServicesSupportedInNetwork(network.getId(), Service.Vpn)) {
+                    throw new InvalidParameterValueException("Vpn service is not supported in network id=" + ipAddr.getAssociatedWithNetworkId());
+                }
+                cidr = NetUtils.getCidr(network.getCidr());
+            } else {
+                Vpc vpc = _vpcDao.findById(vpcId);
+                cidr = NetUtils.getCidr(vpc.getCidr());
+            }
+
+            String[] guestIpRange = NetUtils.getIpRangeFromCidr(cidr.first(), cidr.second());
+            if (NetUtils.ipRangesOverlap(range[0], range[1], guestIpRange[0], guestIpRange[1])) {
+                throw new InvalidParameterValueException("Invalid ip range: " + ipRange + " overlaps with guest ip range " + guestIpRange[0] + "-" + guestIpRange[1]);
+            }
+
+            long startIp = NetUtils.ip2Long(range[0]);
+            final String newIpRange = NetUtils.long2Ip(++startIp) + "-" + range[1];
+            final String sharedSecret = PasswordGenerator.generatePresharedKey(_pskLength);
+
+            return Transaction.execute((TransactionCallbackWithException<RemoteAccessVpn, NetworkRuleConflictException>) status -> {
                 if (vpcId == null) {
                     _rulesMgr.reservePorts(ipAddr, NetUtils.UDP_PROTO, Purpose.Vpn, openFirewallFinal, caller, NetUtils.VPN_PORT, NetUtils.VPN_L2TP_PORT,
-                        NetUtils.VPN_NATT_PORT);
+                            NetUtils.VPN_NATT_PORT);
                 }
-                RemoteAccessVpnVO vpnVO =
-                    new RemoteAccessVpnVO(ipAddr.getAccountId(), ipAddr.getDomainId(), ipAddr.getAssociatedWithNetworkId(), publicIpId, vpcId, range[0], newIpRange,
-                        sharedSecret);
+                RemoteAccessVpnVO remoteAccessVpnVO = new RemoteAccessVpnVO(ipAddr.getAccountId(), ipAddr.getDomainId(), ipAddr.getAssociatedWithNetworkId(),
+                        publicIpId, vpcId, range[0], newIpRange, sharedSecret);
 
                 if (forDisplay != null) {
-                    vpnVO.setDisplay(forDisplay);
+                    remoteAccessVpnVO.setDisplay(forDisplay);
                 }
-                return _remoteAccessVpnDao.persist(vpnVO);
-            }
-        });
+                return _remoteAccessVpnDao.persist(remoteAccessVpnVO);
+            });
+        } finally {
+            _ipAddressDao.releaseFromLockTable(publicIpId);
+        }
     }
 
     private void validateRemoteAccessVpnConfiguration() throws ConfigurationException {
diff --git a/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java b/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java
index adbc833..fb73ac5 100755
--- a/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java
+++ b/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java
@@ -3410,6 +3410,26 @@
     }
 
     @Override
+    public List<HostVO> listAllHostsInOneZoneNotInClusterByHypervisor(final HypervisorType type, final long dcId, final long clusterId) {
+        final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
+        sc.and(sc.entity().getHypervisorType(), Op.EQ, type);
+        sc.and(sc.entity().getDataCenterId(), Op.EQ, dcId);
+        sc.and(sc.entity().getClusterId(), Op.NEQ, clusterId);
+        sc.and(sc.entity().getStatus(), Op.EQ, Status.Up);
+        return sc.list();
+    }
+
+    @Override
+    public List<HostVO> listAllHostsInOneZoneNotInClusterByHypervisors(List<HypervisorType> types, final long dcId, final long clusterId) {
+        final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
+        sc.and(sc.entity().getHypervisorType(), Op.IN, types);
+        sc.and(sc.entity().getDataCenterId(), Op.EQ, dcId);
+        sc.and(sc.entity().getClusterId(), Op.NEQ, clusterId);
+        sc.and(sc.entity().getStatus(), Op.EQ, Status.Up);
+        return sc.list();
+    }
+
+    @Override
     public boolean isGPUDeviceAvailable(final long hostId, final String groupName, final String vgpuType) {
         if(!listAvailableGPUDevice(hostId, groupName, vgpuType).isEmpty()) {
             return true;
diff --git a/server/src/main/java/com/cloud/server/ManagementServerImpl.java b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
index 0adcd83..a485cfe 100644
--- a/server/src/main/java/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
@@ -211,6 +211,7 @@
 import org.apache.cloudstack.api.command.admin.storage.AddImageStoreS3CMD;
 import org.apache.cloudstack.api.command.admin.storage.AddObjectStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaintenanceCmd;
+import org.apache.cloudstack.api.command.admin.storage.ChangeStoragePoolScopeCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateSecondaryStagingStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeleteImageStoreCmd;
@@ -523,6 +524,7 @@
 import org.apache.cloudstack.api.command.user.vm.AddNicToVMCmd;
 import org.apache.cloudstack.api.command.user.vm.DeployVMCmd;
 import org.apache.cloudstack.api.command.user.vm.DestroyVMCmd;
+import org.apache.cloudstack.api.command.admin.vm.ListAffectedVmsForStorageScopeChangeCmd;
 import org.apache.cloudstack.api.command.user.vm.GetVMPasswordCmd;
 import org.apache.cloudstack.api.command.user.vm.ListNicsCmd;
 import org.apache.cloudstack.api.command.user.vm.ListVMsCmd;
@@ -3566,6 +3568,7 @@
         cmdList.add(UpgradeRouterCmd.class);
         cmdList.add(AddSwiftCmd.class);
         cmdList.add(CancelPrimaryStorageMaintenanceCmd.class);
+        cmdList.add(ChangeStoragePoolScopeCmd.class);
         cmdList.add(CreateStoragePoolCmd.class);
         cmdList.add(DeletePoolCmd.class);
         cmdList.add(ListSwiftsCmd.class);
@@ -4003,6 +4006,7 @@
         cmdList.add(CreateSecondaryStorageSelectorCmd.class);
         cmdList.add(UpdateSecondaryStorageSelectorCmd.class);
         cmdList.add(RemoveSecondaryStorageSelectorCmd.class);
+        cmdList.add(ListAffectedVmsForStorageScopeChangeCmd.class);
 
 
         // Out-of-band management APIs for admins
diff --git a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
index aa5c90e..aa6b75b 100644
--- a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
@@ -54,6 +54,7 @@
 import org.apache.cloudstack.annotation.dao.AnnotationDao;
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaintenanceCmd;
+import org.apache.cloudstack.api.command.admin.storage.ChangeStoragePoolScopeCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateSecondaryStagingStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeleteImageStoreCmd;
@@ -256,6 +257,7 @@
 import com.cloud.vm.DiskProfile;
 import com.cloud.vm.UserVmManager;
 import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachine;
 import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.VMInstanceDao;
 import com.google.common.collect.Sets;
@@ -410,6 +412,9 @@
 
     private final Map<String, HypervisorHostListener> hostListeners = new HashMap<>();
 
+    private final Set<HypervisorType> zoneWidePoolSupportedHypervisorTypes = Sets.newHashSet(HypervisorType.KVM, HypervisorType.VMware,
+            HypervisorType.Hyperv, HypervisorType.LXC, HypervisorType.Any, HypervisorType.Simulator);
+
     private static final String NFS_MOUNT_OPTIONS_INCORRECT = "An incorrect mount option was specified";
 
     public boolean share(VMInstanceVO vm, List<VolumeVO> vols, HostVO host, boolean cancelPreviousShare) throws StorageUnavailableException {
@@ -570,6 +575,31 @@
     }
 
     @Override
+    public boolean poolProvidesCustomStorageStats(StoragePool pool) {
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
+        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
+        return storeDriver instanceof PrimaryDataStoreDriver && ((PrimaryDataStoreDriver)storeDriver).poolProvidesCustomStorageStats();
+    }
+
+    @Override
+    public Map<String, String> getCustomStorageStats(StoragePool pool) {
+        if (pool == null) {
+            return null;
+        }
+
+        if (!pool.isManaged()) {
+            return null;
+        }
+
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
+        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
+        if (storeDriver instanceof PrimaryDataStoreDriver) {
+            return ((PrimaryDataStoreDriver)storeDriver).getCustomStorageStats(pool);
+        }
+        return null;
+    }
+
+    @Override
     public Answer getVolumeStats(StoragePool pool, Command cmd) {
         DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
         DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
@@ -938,9 +968,7 @@
                 throw new InvalidParameterValueException("Missing parameter hypervisor. Hypervisor type is required to create zone wide primary storage.");
             }
 
-            Set<HypervisorType> supportedHypervisorTypes = Sets.newHashSet(HypervisorType.KVM, HypervisorType.VMware,
-                    HypervisorType.Hyperv, HypervisorType.LXC, HypervisorType.Any, HypervisorType.Simulator);
-            if (!supportedHypervisorTypes.contains(hypervisorType)) {
+            if (!zoneWidePoolSupportedHypervisorTypes.contains(hypervisorType)) {
                 throw new InvalidParameterValueException("Zone wide storage pool is not supported for hypervisor type " + hypervisor);
             }
         } else {
@@ -1220,6 +1248,115 @@
         return (PrimaryDataStoreInfo)_dataStoreMgr.getDataStore(pool.getId(), DataStoreRole.Primary);
     }
 
+    private void changeStoragePoolScopeToZone(StoragePoolVO primaryStorage) {
+        /*
+         * For cluster wide primary storage the hypervisor type might not be set.
+         * So, get it from the clusterVO.
+         */
+        Long clusterId = primaryStorage.getClusterId();
+        ClusterVO clusterVO = _clusterDao.findById(clusterId);
+        HypervisorType hypervisorType = clusterVO.getHypervisorType();
+        if (!zoneWidePoolSupportedHypervisorTypes.contains(hypervisorType)) {
+            throw new InvalidParameterValueException("Primary storage scope change to Zone is not supported for hypervisor type " + hypervisorType);
+        }
+
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(primaryStorage.getStorageProviderName());
+        PrimaryDataStoreLifeCycle lifeCycle = (PrimaryDataStoreLifeCycle) storeProvider.getDataStoreLifeCycle();
+
+        DataStore primaryStore = _dataStoreMgr.getPrimaryDataStore(primaryStorage.getId());
+        ClusterScope clusterScope = new ClusterScope(primaryStorage.getClusterId(), null, primaryStorage.getDataCenterId());
+
+        lifeCycle.changeStoragePoolScopeToZone(primaryStore, clusterScope, hypervisorType);
+    }
+
+    private void changeStoragePoolScopeToCluster(StoragePoolVO primaryStorage, Long clusterId) {
+        if (clusterId == null) {
+            throw new InvalidParameterValueException("Cluster ID not provided");
+        }
+        ClusterVO clusterVO = _clusterDao.findById(clusterId);
+        if (clusterVO == null) {
+            throw new InvalidParameterValueException("Unable to find cluster by id " + clusterId);
+        }
+        if (clusterVO.getAllocationState().equals(Grouping.AllocationState.Disabled)) {
+            throw new PermissionDeniedException("Cannot perform this operation, Cluster is currently disabled: " + clusterId);
+        }
+
+        List<VirtualMachine.State> states = Arrays.asList(State.Starting, State.Running, State.Stopping, State.Migrating, State.Restoring);
+
+        Long id = primaryStorage.getId();
+        Pair<List<VMInstanceVO>, Integer> vmsNotInClusterUsingPool = _vmInstanceDao.listByVmsNotInClusterUsingPool(clusterId, id);
+        if (vmsNotInClusterUsingPool.second() != 0) {
+            throw new CloudRuntimeException(String.format("Cannot change scope of the storage pool [%s] to cluster [%s] " +
+                    "as there are %s VMs with volumes in this pool that are running on other clusters. " +
+                    "All such User VMs must be stopped and System VMs must be destroyed before proceeding. " +
+                    "Please use the API listAffectedVmsForStorageScopeChange to get the list.",
+                    primaryStorage.getName(), clusterVO.getName(), vmsNotInClusterUsingPool.second()));
+        }
+
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(primaryStorage.getStorageProviderName());
+        PrimaryDataStoreLifeCycle lifeCycle = (PrimaryDataStoreLifeCycle) storeProvider.getDataStoreLifeCycle();
+
+        DataStore primaryStore = _dataStoreMgr.getPrimaryDataStore(id);
+        ClusterScope clusterScope = new ClusterScope(clusterId, clusterVO.getPodId(), primaryStorage.getDataCenterId());
+
+        lifeCycle.changeStoragePoolScopeToCluster(primaryStore, clusterScope, primaryStorage.getHypervisor());
+    }
+
+    @Override
+    @ActionEvent(eventType = EventTypes.EVENT_CHANGE_STORAGE_POOL_SCOPE, eventDescription = "changing storage pool scope")
+    public void changeStoragePoolScope(ChangeStoragePoolScopeCmd cmd) throws IllegalArgumentException, InvalidParameterValueException, PermissionDeniedException {
+        Long id = cmd.getId();
+
+        Long accountId = cmd.getEntityOwnerId();
+        if (!_accountMgr.isRootAdmin(accountId)) {
+            throw new PermissionDeniedException("Only root admin can perform this operation");
+        }
+
+        ScopeType newScope = EnumUtils.getEnumIgnoreCase(ScopeType.class, cmd.getScope());
+        if (newScope != ScopeType.ZONE && newScope != ScopeType.CLUSTER) {
+            throw new InvalidParameterValueException("Invalid scope " + cmd.getScope() + "for Primary storage");
+        }
+
+        StoragePoolVO primaryStorage = _storagePoolDao.findById(id);
+        if (primaryStorage == null) {
+            throw new IllegalArgumentException("Unable to find storage pool with ID: " + id);
+        }
+
+        String eventDetails = String.format(" Storage pool Id: %s to %s",primaryStorage.getUuid(), newScope);
+        CallContext.current().setEventDetails(eventDetails);
+
+        ScopeType currentScope = primaryStorage.getScope();
+        if (currentScope.equals(newScope)) {
+            throw new InvalidParameterValueException("New scope must be different than the current scope");
+        }
+
+        if (currentScope != ScopeType.ZONE && currentScope != ScopeType.CLUSTER) {
+            throw new InvalidParameterValueException("This operation is supported only for Primary storages having scope "
+                    + ScopeType.CLUSTER + " or " + ScopeType.ZONE);
+        }
+
+        if (!primaryStorage.getStatus().equals(StoragePoolStatus.Disabled)) {
+            throw new InvalidParameterValueException("Scope of the Primary storage with id "
+                    + primaryStorage.getUuid() +
+                    " cannot be changed, as it is not in the Disabled state");
+        }
+
+        Long zoneId = primaryStorage.getDataCenterId();
+        DataCenterVO zone = _dcDao.findById(zoneId);
+        if (zone == null) {
+            throw new InvalidParameterValueException("Unable to find zone by id " + zoneId);
+        }
+        if (zone.getAllocationState().equals(Grouping.AllocationState.Disabled)) {
+            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zoneId);
+        }
+
+        if (newScope.equals(ScopeType.ZONE)) {
+            changeStoragePoolScopeToZone(primaryStorage);
+        } else {
+            changeStoragePoolScopeToCluster(primaryStorage, cmd.getClusterId());
+        }
+    }
+
     @Override
     public void removeStoragePoolFromCluster(long hostId, String iScsiName, StoragePool storagePool) {
         final Map<String, String> details = new HashMap<>();
@@ -2653,6 +2790,21 @@
     }
 
     @Override
+    public boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool) {
+        if (host == null || pool == null) {
+            return false;
+        }
+
+        if (!pool.isManaged()) {
+            return true;
+        }
+
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
+        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
+        return storeDriver instanceof PrimaryDataStoreDriver && ((PrimaryDataStoreDriver)storeDriver).canHostPrepareStoragePoolAccess(host, pool);
+    }
+
+    @Override
     @DB
     public Host getHost(long hostId) {
         return _hostDao.findById(hostId);
@@ -3863,6 +4015,7 @@
                 STORAGE_POOL_DISK_WAIT,
                 STORAGE_POOL_CLIENT_TIMEOUT,
                 STORAGE_POOL_CLIENT_MAX_CONNECTIONS,
+                STORAGE_POOL_CONNECTED_CLIENTS_LIMIT,
                 STORAGE_POOL_IO_POLICY,
                 PRIMARY_STORAGE_DOWNLOAD_WAIT,
                 SecStorageMaxMigrateSessions,
diff --git a/server/src/test/java/com/cloud/api/query/QueryManagerImplTest.java b/server/src/test/java/com/cloud/api/query/QueryManagerImplTest.java
index 91fd691..f5de105 100644
--- a/server/src/test/java/com/cloud/api/query/QueryManagerImplTest.java
+++ b/server/src/test/java/com/cloud/api/query/QueryManagerImplTest.java
@@ -18,19 +18,26 @@
 package com.cloud.api.query;
 
 import com.cloud.api.query.dao.TemplateJoinDao;
+import com.cloud.api.query.dao.UserVmJoinDao;
 import com.cloud.api.query.vo.EventJoinVO;
 import com.cloud.api.query.vo.TemplateJoinVO;
+import com.cloud.api.query.vo.UserVmJoinVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.dao.ClusterDao;
 import com.cloud.event.EventVO;
 import com.cloud.event.dao.EventDao;
 import com.cloud.event.dao.EventJoinDao;
 import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.exception.PermissionDeniedException;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
 import com.cloud.network.Network;
 import com.cloud.network.VNF;
 import com.cloud.network.dao.NetworkVO;
 import com.cloud.server.ResourceTag;
 import com.cloud.storage.BucketVO;
 import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.ScopeType;
 import com.cloud.storage.dao.BucketDao;
 import com.cloud.storage.dao.VMTemplateDao;
 import com.cloud.user.Account;
@@ -43,10 +50,14 @@
 import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.VMInstanceDao;
+
 import org.apache.cloudstack.acl.SecurityChecker;
 import org.apache.cloudstack.api.ApiCommandResourceType;
 import org.apache.cloudstack.api.command.admin.storage.ListObjectStoragePoolsCmd;
+import org.apache.cloudstack.api.command.admin.vm.ListAffectedVmsForStorageScopeChangeCmd;
 import org.apache.cloudstack.api.command.user.bucket.ListBucketsCmd;
 import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
 import org.apache.cloudstack.api.command.user.resource.ListDetailOptionsCmd;
@@ -54,10 +65,13 @@
 import org.apache.cloudstack.api.response.EventResponse;
 import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.api.response.ObjectStoreResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.storage.datastore.db.ObjectStoreDao;
 import org.apache.cloudstack.storage.datastore.db.ObjectStoreVO;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,6 +82,7 @@
 import org.mockito.Mockito;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -116,10 +131,25 @@
     ObjectStoreDao objectStoreDao;
 
     @Mock
+    VMInstanceDao vmInstanceDao;
+
+    @Mock
+    PrimaryDataStoreDao storagePoolDao;
+
+    @Mock
+    HostDao hostDao;
+
+    @Mock
+    ClusterDao clusterDao;
+
+    @Mock
     BucketDao bucketDao;
     @Mock
     VMTemplateDao templateDao;
 
+    @Mock
+    UserVmJoinDao userVmJoinDao;
+
     private AccountVO account;
     private UserVO user;
 
@@ -406,4 +436,45 @@
         result = queryManager.getHostTagsFromTemplateForServiceOfferingsListing(account, templateId);
         Assert.assertTrue(CollectionUtils.isNotEmpty(result));
     }
+
+    public void testListAffectedVmsForScopeChange() {
+        Long clusterId = 1L;
+        Long poolId = 2L;
+        Long hostId = 3L;
+        Long vmId = 4L;
+        String vmName = "VM1";
+
+        ListAffectedVmsForStorageScopeChangeCmd cmd = new ListAffectedVmsForStorageScopeChangeCmd();
+        ReflectionTestUtils.setField(cmd, "clusterIdForScopeChange", clusterId);
+        ReflectionTestUtils.setField(cmd, "storageId", poolId);
+
+        StoragePoolVO pool = Mockito.mock(StoragePoolVO.class);
+        Mockito.when(pool.getScope()).thenReturn(ScopeType.CLUSTER);
+        Mockito.when(storagePoolDao.findById(poolId)).thenReturn(pool);
+        ListResponse<VirtualMachineResponse> response = queryManager.listAffectedVmsForStorageScopeChange(cmd);
+        Assert.assertEquals(response.getResponses().size(), 0);
+
+        VMInstanceVO instance = Mockito.mock(VMInstanceVO.class);
+        UserVmJoinVO userVM = Mockito.mock(UserVmJoinVO.class);
+        String instanceUuid = String.valueOf(UUID.randomUUID());
+        Pair<List<VMInstanceVO>, Integer> vms = new Pair<>(List.of(instance), 1);
+        HostVO host = Mockito.mock(HostVO.class);
+        ClusterVO cluster = Mockito.mock(ClusterVO.class);
+
+        Mockito.when(pool.getScope()).thenReturn(ScopeType.ZONE);
+        Mockito.when(instance.getUuid()).thenReturn(instanceUuid);
+        Mockito.when(instance.getType()).thenReturn(VirtualMachine.Type.Instance);
+        Mockito.when(instance.getHostId()).thenReturn(hostId);
+        Mockito.when(instance.getId()).thenReturn(vmId);
+        Mockito.when(userVM.getDisplayName()).thenReturn(vmName);
+        Mockito.when(vmInstanceDao.listByVmsNotInClusterUsingPool(clusterId, poolId)).thenReturn(vms);
+        Mockito.when(userVmJoinDao.findById(vmId)).thenReturn(userVM);
+        Mockito.when(hostDao.findById(hostId)).thenReturn(host);
+        Mockito.when(host.getClusterId()).thenReturn(clusterId);
+        Mockito.when(clusterDao.findById(clusterId)).thenReturn(cluster);
+
+        response = queryManager.listAffectedVmsForStorageScopeChange(cmd);
+        Assert.assertEquals(response.getResponses().get(0).getId(), instanceUuid);
+        Assert.assertEquals(response.getResponses().get(0).getName(), vmName);
+    }
 }
diff --git a/server/src/test/java/com/cloud/resource/MockResourceManagerImpl.java b/server/src/test/java/com/cloud/resource/MockResourceManagerImpl.java
index f9c07ef..6aae7a0 100755
--- a/server/src/test/java/com/cloud/resource/MockResourceManagerImpl.java
+++ b/server/src/test/java/com/cloud/resource/MockResourceManagerImpl.java
@@ -431,6 +431,17 @@
         return null;
     }
 
+    @Override
+    public List<HostVO> listAllHostsInOneZoneNotInClusterByHypervisor(HypervisorType type, long dcId, long clusterId) {
+        return null;
+    }
+
+    @Override
+    public List<HostVO> listAllHostsInOneZoneNotInClusterByHypervisors(List<HypervisorType> types, long dcId, long clusterId) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
     /* (non-Javadoc)
      * @see com.cloud.resource.ResourceManager#listAvailHypervisorInZone(java.lang.Long, java.lang.Long)
      */
diff --git a/server/src/test/java/com/cloud/storage/StorageManagerImplTest.java b/server/src/test/java/com/cloud/storage/StorageManagerImplTest.java
index 3092822..4c89312 100644
--- a/server/src/test/java/com/cloud/storage/StorageManagerImplTest.java
+++ b/server/src/test/java/com/cloud/storage/StorageManagerImplTest.java
@@ -21,17 +21,22 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.cloudstack.api.command.admin.storage.ChangeStoragePoolScopeCmd;
 import com.cloud.agent.api.StoragePoolInfo;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.DataCenter;
 import com.cloud.dc.DataCenterVO;
+import com.cloud.dc.dao.ClusterDao;
 import com.cloud.dc.dao.DataCenterDao;
 import com.cloud.exception.ConnectionException;
 import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.exception.PermissionDeniedException;
 import com.cloud.host.Host;
-import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.storage.dao.VolumeDao;
-import com.cloud.user.AccountManager;
+import com.cloud.user.AccountManagerImpl;
 import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.dao.VMInstanceDao;
 
@@ -54,6 +59,7 @@
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Command;
@@ -93,10 +99,13 @@
     @Mock
     DataCenterDao dataCenterDao;
     @Mock
-    AccountManager accountManager;
+    AccountManagerImpl accountMgr;
     @Mock
     StoragePoolDetailsDao storagePoolDetailsDao;
 
+    @Mock
+    ClusterDao clusterDao;
+
     @Spy
     @InjectMocks
     private StorageManagerImpl storageManagerImpl;
@@ -506,11 +515,74 @@
                 .update(StorageManager.DataStoreDownloadFollowRedirects.key(),StorageManager.DataStoreDownloadFollowRedirects.defaultValue());
     }
 
+    private ChangeStoragePoolScopeCmd mockChangeStoragePooolScopeCmd(String newScope) {
+        ChangeStoragePoolScopeCmd cmd = new ChangeStoragePoolScopeCmd();
+        ReflectionTestUtils.setField(cmd, "id", 1L);
+        ReflectionTestUtils.setField(cmd, "clusterId", 1L);
+        ReflectionTestUtils.setField(cmd, "scope", newScope);
+        return cmd;
+    }
+
+    private StoragePoolVO mockStoragePoolVOForChangeStoragePoolScope(ScopeType currentScope, StoragePoolStatus status) {
+        StoragePoolVO primaryStorage = new StoragePoolVO();
+        primaryStorage.setId(1L);
+        primaryStorage.setDataCenterId(1L);
+        primaryStorage.setClusterId(1L);
+        primaryStorage.setStatus(StoragePoolStatus.Disabled);
+        primaryStorage.setScope(currentScope);
+        primaryStorage.setStatus(status);
+        return primaryStorage;
+    }
+
+    private void prepareTestChangeStoragePoolScope(ScopeType currentScope, StoragePoolStatus status) {
+        final DataCenterVO zone = new DataCenterVO(1L, null, null, null, null, null, null, null, null, null, DataCenter.NetworkType.Advanced, null, null);
+        StoragePoolVO primaryStorage = mockStoragePoolVOForChangeStoragePoolScope(currentScope, status);
+
+        Mockito.when(accountMgr.isRootAdmin(Mockito.any())).thenReturn(true);
+        Mockito.when(dataCenterDao.findById(1L)).thenReturn(zone);
+        Mockito.when(storagePoolDao.findById(1L)).thenReturn(primaryStorage);
+    }
+
+    @Test(expected = InvalidParameterValueException.class)
+    public void testChangeStoragePoolScopeNotDisabledException() {
+        prepareTestChangeStoragePoolScope(ScopeType.CLUSTER, StoragePoolStatus.Initialized);
+
+        ChangeStoragePoolScopeCmd cmd = mockChangeStoragePooolScopeCmd("ZONE");
+        storageManagerImpl.changeStoragePoolScope(cmd);
+    }
+
+    @Test(expected = InvalidParameterValueException.class)
+    public void testChangeStoragePoolScopeToZoneHypervisorNotSupported() {
+        prepareTestChangeStoragePoolScope(ScopeType.CLUSTER, StoragePoolStatus.Disabled);
+
+        final ClusterVO cluster = new ClusterVO();
+        cluster.setHypervisorType(String.valueOf(HypervisorType.XenServer));
+        Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+
+        ChangeStoragePoolScopeCmd cmd = mockChangeStoragePooolScopeCmd("ZONE");
+        storageManagerImpl.changeStoragePoolScope(cmd);
+    }
+
+    @Test(expected = CloudRuntimeException.class)
+    public void testChangeStoragePoolScopeToClusterVolumesPresentException() {
+        prepareTestChangeStoragePoolScope(ScopeType.ZONE, StoragePoolStatus.Disabled);
+
+        final ClusterVO cluster = new ClusterVO();
+        Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+
+        VMInstanceVO instance = Mockito.mock(VMInstanceVO.class);
+        Pair<List<VMInstanceVO>, Integer> vms = new Pair<>(List.of(instance), 1);
+        Mockito.when(vmInstanceDao.listByVmsNotInClusterUsingPool(1L, 1L)).thenReturn(vms);
+
+        ChangeStoragePoolScopeCmd cmd = mockChangeStoragePooolScopeCmd("CLUSTER");
+        storageManagerImpl.changeStoragePoolScope(cmd);
+    }
+
     @Test
     public void testCheckNFSMountOptionsForCreateNoNFSMountOptions() {
         Map<String, String> details = new HashMap<>();
         try {
-            storageManagerImpl.checkNFSMountOptionsForCreate(details, Hypervisor.HypervisorType.XenServer, "");
+            storageManagerImpl.checkNFSMountOptionsForCreate(details, HypervisorType.XenServer, "");
         } catch (Exception e) {
             Assert.fail();
         }
@@ -521,8 +593,8 @@
         Map<String, String> details = new HashMap<>();
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, "vers=4.1");
         InvalidParameterValueException exception = Assert.assertThrows(InvalidParameterValueException.class,
-                () -> storageManagerImpl.checkNFSMountOptionsForCreate(details, Hypervisor.HypervisorType.XenServer, ""));
-        Assert.assertEquals(exception.getMessage(), "NFS options can not be set for the hypervisor type " + Hypervisor.HypervisorType.XenServer);
+                () -> storageManagerImpl.checkNFSMountOptionsForCreate(details, HypervisorType.XenServer, ""));
+        Assert.assertEquals(exception.getMessage(), "NFS options can not be set for the hypervisor type " + HypervisorType.XenServer);
     }
 
     @Test
@@ -530,7 +602,7 @@
         Map<String, String> details = new HashMap<>();
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, "vers=4.1");
         InvalidParameterValueException exception = Assert.assertThrows(InvalidParameterValueException.class,
-                () -> storageManagerImpl.checkNFSMountOptionsForCreate(details, Hypervisor.HypervisorType.KVM, ""));
+                () -> storageManagerImpl.checkNFSMountOptionsForCreate(details, HypervisorType.KVM, ""));
         Assert.assertEquals(exception.getMessage(), "NFS options can only be set on pool type " + Storage.StoragePoolType.NetworkFilesystem);
     }
 
@@ -552,7 +624,7 @@
         StoragePoolVO pool = new StoragePoolVO();
         Long accountId = 1L;
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, "vers=4.1");
-        Mockito.when(accountManager.isRootAdmin(accountId)).thenReturn(false);
+        Mockito.when(accountMgr.isRootAdmin(accountId)).thenReturn(false);
         PermissionDeniedException exception = Assert.assertThrows(PermissionDeniedException.class,
                 () -> storageManagerImpl.checkNFSMountOptionsForUpdate(details, pool, accountId));
         Assert.assertEquals(exception.getMessage(), "Only root admin can modify nfs options");
@@ -564,11 +636,11 @@
         StoragePoolVO pool = new StoragePoolVO();
         Long accountId = 1L;
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, "vers=4.1");
-        Mockito.when(accountManager.isRootAdmin(accountId)).thenReturn(true);
-        pool.setHypervisor(Hypervisor.HypervisorType.XenServer);
+        Mockito.when(accountMgr.isRootAdmin(accountId)).thenReturn(true);
+        pool.setHypervisor(HypervisorType.XenServer);
         InvalidParameterValueException exception = Assert.assertThrows(InvalidParameterValueException.class,
                 () -> storageManagerImpl.checkNFSMountOptionsForUpdate(details, pool, accountId));
-        Assert.assertEquals(exception.getMessage(), "NFS options can only be set for the hypervisor type " + Hypervisor.HypervisorType.KVM);
+        Assert.assertEquals(exception.getMessage(), "NFS options can only be set for the hypervisor type " + HypervisorType.KVM);
     }
 
     @Test
@@ -577,8 +649,8 @@
         StoragePoolVO pool = new StoragePoolVO();
         Long accountId = 1L;
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, "vers=4.1");
-        Mockito.when(accountManager.isRootAdmin(accountId)).thenReturn(true);
-        pool.setHypervisor(Hypervisor.HypervisorType.KVM);
+        Mockito.when(accountMgr.isRootAdmin(accountId)).thenReturn(true);
+        pool.setHypervisor(HypervisorType.KVM);
         pool.setPoolType(Storage.StoragePoolType.FiberChannel);
         InvalidParameterValueException exception = Assert.assertThrows(InvalidParameterValueException.class,
                 () -> storageManagerImpl.checkNFSMountOptionsForUpdate(details, pool, accountId));
@@ -591,8 +663,8 @@
         StoragePoolVO pool = new StoragePoolVO();
         Long accountId = 1L;
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, "vers=4.1");
-        Mockito.when(accountManager.isRootAdmin(accountId)).thenReturn(true);
-        pool.setHypervisor(Hypervisor.HypervisorType.KVM);
+        Mockito.when(accountMgr.isRootAdmin(accountId)).thenReturn(true);
+        pool.setHypervisor(HypervisorType.KVM);
         pool.setPoolType(Storage.StoragePoolType.NetworkFilesystem);
         pool.setStatus(StoragePoolStatus.Up);
         InvalidParameterValueException exception = Assert.assertThrows(InvalidParameterValueException.class,
@@ -605,7 +677,7 @@
         String nfsMountOpts = "vers=4.1, nconnect=4,vers=4.2";
         Map<String, String> details = new HashMap<>();
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, nfsMountOpts);
-        storageManagerImpl.checkNFSMountOptionsForCreate(details, Hypervisor.HypervisorType.KVM, "nfs");
+        storageManagerImpl.checkNFSMountOptionsForCreate(details, HypervisorType.KVM, "nfs");
     }
 
     @Test(expected = InvalidParameterValueException.class)
@@ -614,11 +686,11 @@
         Map<String, String> details = new HashMap<>();
         details.put(ApiConstants.NFS_MOUNT_OPTIONS, nfsMountOpts);
         StoragePoolVO pool = new StoragePoolVO();
-        pool.setHypervisor(Hypervisor.HypervisorType.KVM);
+        pool.setHypervisor(HypervisorType.KVM);
         pool.setPoolType(Storage.StoragePoolType.NetworkFilesystem);
         pool.setStatus(StoragePoolStatus.Maintenance);
         Long accountId = 1L;
-        Mockito.when(accountManager.isRootAdmin(accountId)).thenReturn(true);
+        Mockito.when(accountMgr.isRootAdmin(accountId)).thenReturn(true);
         storageManagerImpl.checkNFSMountOptionsForUpdate(details, pool, accountId);
     }
 
diff --git a/test/integration/smoke/test_primary_storage_scope.py b/test/integration/smoke/test_primary_storage_scope.py
new file mode 100644
index 0000000..e85a06b
--- /dev/null
+++ b/test/integration/smoke/test_primary_storage_scope.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+""" BVT tests for Primary Storage
+"""
+
+# Import System modules
+# Import Local Modules
+from marvin.cloudstackTestCase import *
+from marvin.lib.base import (Host, StoragePool, Cluster, updateStoragePool, changeStoragePoolScope)
+from marvin.lib.common import (get_zone, get_pod, list_clusters)
+from marvin.lib.utils import cleanup_resources
+from nose.plugins.attrib import attr
+
+class TestPrimaryStorageScope(cloudstackTestCase):
+
+    def setUp(self):
+
+        self.apiclient = self.testClient.getApiClient()
+        self.dbclient = self.testClient.getDbConnection()
+        self.services = self.testClient.getParsedTestDataConfig()
+        self._cleanup = []
+        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
+        self.pod = get_pod(self.apiclient, self.zone.id)
+        self.debug("here")
+        self.debug(self.services)
+        self.cluster1 = list_clusters(self.apiclient)[0]
+        self.debug("here1")
+        self.debug(self.cluster1)
+        self.cluster = {
+            'clustername': 'C0_testScope',
+            'clustertype': 'CloudManaged'
+        }
+        self.cluster2 = Cluster.create(self.apiclient,
+                                       self.cluster,
+                                       zoneid=self.zone.id,
+                                       podid=self.pod.id,
+                                       hypervisor=self.cluster1.hypervisortype
+                                       )
+        self._cleanup.append(self.cluster2)
+        self.storage = StoragePool.create(self.apiclient,
+                                          self.services["nfs"],
+                                          scope = 'ZONE',
+                                          zoneid=self.zone.id,
+                                          hypervisor=self.cluster1.hypervisortype
+                                          )
+        self._cleanup.append(self.storage)
+        self.debug("Created storage pool %s in zone scope", self.storage.id)
+        return
+
+    def tearDown(self):
+        try:
+            cleanup_resources(self.apiclient, self._cleanup)
+        except Exception as e:
+            raise Exception("Warning: Exception during cleanup : %s" % e)
+        return
+
+    @attr(tags=["advanced", "advancedns", "smoke", "basic", "sg"], required_hardware="true")
+    def test_01_primary_storage_scope_change(self):
+        """Test primary storage pool scope change
+        """
+
+        # Disable storage pool
+        cmd = updateStoragePool.updateStoragePoolCmd()
+        cmd.id = self.storage.id
+        cmd.enabled = False
+        self.apiclient.updateStoragePool(cmd)
+
+        self.debug("Disabled storage pool : %s" % self.storage.id)
+
+        # Change storage pool scope to Cluster2
+        cmd = changeStoragePoolScope.changeStoragePoolScopeCmd()
+        cmd.id = self.storage.id
+        cmd.scope = "CLUSTER"
+        cmd.clusterid = self.cluster2.id
+        self.apiclient.changeStoragePoolScope(cmd)
+
+        self.debug("Changed scope of storage pool %s to cluster" % self.storage.id)
+
+        pool_id = self.dbclient.execute("select id from storage_pool where uuid=\"" + self.storage.id + "\"")[0][0]
+        host1 = Host.list(self.apiclient, clusterid=self.cluster1.id, listall=True)[0]
+        host1_id = self.dbclient.execute("select id from host where uuid=\"" + host1.id + "\"")[0][0]
+
+        pool_row = self.dbclient.execute("select cluster_id, pod_id, scope from storage_pool where id=" + str(pool_id))[0]
+        capacity_row = self.dbclient.execute("select cluster_id, pod_id from op_host_capacity where capacity_type=3 and host_id=" + str(pool_id))[0]
+        pool_host_rows = self.dbclient.execute("select id from storage_pool_host_ref where host_id=" + str(host1_id) + " and pool_id=" + str(pool_id))
+
+        self.assertIsNotNone(
+            pool_row[0],
+            "Cluster id should not be NULL for cluster scope"
+        )
+        self.assertIsNotNone(
+            pool_row[1],
+            "Pod id should not be NULL for cluster scope"
+        )
+        self.assertEqual(
+            pool_row[2],
+            "CLUSTER",
+            "Storage pool scope not changed to Cluster"
+        )
+        self.assertIsNotNone(
+            capacity_row[0],
+            "Cluster id should not be NULL in the op_host_capacity table"
+        )
+        self.assertIsNotNone(
+            capacity_row[1],
+            "Pod id set should not be NULL in the op_host_capacity table"
+        )
+        self.assertEqual(
+            len(pool_host_rows),
+            0,
+            "Storage pool not removed from the storage_pool_host_ref table for host on another cluster"
+        )
+
+        # Change storage pool scope to Zone
+        cmd = changeStoragePoolScope.changeStoragePoolScopeCmd()
+        cmd.id = self.storage.id
+        cmd.scope = "ZONE"
+        self.apiclient.changeStoragePoolScope(cmd)
+
+        self.debug("Changed scope of storage pool %s to zone" % self.storage.id)
+
+        pool_row = self.dbclient.execute("select cluster_id, pod_id, scope from storage_pool where id=" + str(pool_id))[0]
+        capacity_row = self.dbclient.execute("select cluster_id, pod_id from op_host_capacity where capacity_type=3 and host_id=" + str(pool_id))[0]
+        pool_host_rows = self.dbclient.execute("select id from storage_pool_host_ref where host_id=" + str(host1_id) + " and pool_id=" + str(pool_id))
+
+        self.assertIsNone(
+            pool_row[0],
+            "Cluster id not set to NULL for zone scope"
+        )
+        self.assertIsNone(
+            pool_row[1],
+            "Pod id not set to NULL for zone scope"
+        )
+        self.assertEqual(
+            pool_row[2],
+            "ZONE",
+            "Storage pool scope not changed to ZONE"
+        )
+        self.assertIsNone(
+            capacity_row[0],
+            "Cluster id not set to NULL in the op_host_capacity table"
+        )
+        self.assertIsNone(
+            capacity_row[1],
+            "Pod id not set to NULL in the op_host_capacity table"
+        )
+        self.assertEqual(
+            len(pool_host_rows),
+            1,
+            "Storage pool not added to the storage_pool_host_ref table for host on another cluster"
+        )
+
+        # Enable storage pool
+        cmd = updateStoragePool.updateStoragePoolCmd()
+        cmd.id = self.storage.id
+        cmd.enabled = True
+        response = self.apiclient.updateStoragePool(cmd)
+        self.assertEqual(
+            response.state,
+            "Up",
+            "Storage pool couldn't be enabled"
+        )
diff --git a/tools/apidoc/gen_toc.py b/tools/apidoc/gen_toc.py
index 73e81ff..872acc2 100644
--- a/tools/apidoc/gen_toc.py
+++ b/tools/apidoc/gen_toc.py
@@ -145,6 +145,7 @@
     'StorageMaintenance': 'Storage Pool',
     'StoragePool': 'Storage Pool',
     'StorageProvider': 'Storage Pool',
+    'StorageScope' : 'Storage Pool',
     'updateStorageCapabilities' : 'Storage Pool',
     'SecurityGroup': 'Security Group',
     'SSH': 'SSH',
diff --git a/ui/public/locales/en.json b/ui/public/locales/en.json
index 6d1b529..67b36d7 100644
--- a/ui/public/locales/en.json
+++ b/ui/public/locales/en.json
@@ -62,6 +62,7 @@
 "label.action.change.password": "Change password",
 "label.action.clear.webhook.deliveries": "Clear deliveries",
 "label.action.delete.webhook.deliveries": "Delete deliveries",
+"label.action.change.primary.storage.scope": "Change primary storage scope",
 "label.action.configure.stickiness": "Stickiness",
 "label.action.copy.iso": "Copy ISO",
 "label.action.copy.snapshot": "Copy Snapshot",
@@ -2546,6 +2547,8 @@
 "message.action.manage.cluster": "Please confirm that you want to manage the cluster.",
 "message.action.patch.router": "Please confirm that you want to live patch the router. <br> This operation is equivalent updating the router packages and restarting the Network without cleanup.",
 "message.action.patch.systemvm": "Please confirm that you want to patch the System VM.",
+"message.action.primary.storage.scope.cluster": "Please confirm that you want to change the scope from zone to the specified cluster.<br>This operation will update the database and disconnect the storage pool from all hosts that were previously connected to the primary storage and are not part of the specified cluster.",
+"message.action.primary.storage.scope.zone": "Please confirm that you want to change the scope from cluster to zone.<br>This operation will update the database and connect the storage pool to all hosts of the zone running the same hypervisor as set on the storage pool.",
 "message.action.primarystorage.enable.maintenance.mode": "Warning: placing the primary storage into maintenance mode will cause all Instances using volumes from it to be stopped.  Do you want to continue?",
 "message.action.reboot.instance": "Please confirm that you want to reboot this Instance.",
 "message.action.reboot.router": "All services provided by this virtual router will be interrupted. Please confirm that you want to reboot this router.",
@@ -2665,6 +2668,8 @@
 "message.change.offering.for.volume.failed": "Change offering for the volume failed",
 "message.change.offering.for.volume.processing": "Changing offering for the volume...",
 "message.change.password": "Please change your password.",
+"message.change.scope.failed": "Scope change failed",
+"message.change.scope.processing": "Scope change in progress",
 "message.cluster.dedicated": "Cluster Dedicated",
 "message.cluster.dedication.released": "Cluster dedication released.",
 "message.config.health.monitor.failed": "Configure Health Monitor failed",
@@ -3275,6 +3280,7 @@
 "message.success.change.offering": "Successfully changed offering",
 "message.success.change.password": "Successfully changed password for User",
 "message.success.clear.webhook.deliveries": "Successfully cleared webhook deliveries",
+"message.success.change.scope": "Successfully changed scope for storage pool",
 "message.success.config.backup.schedule": "Successfully configured Instance backup schedule",
 "message.success.config.health.monitor": "Successfully Configure Health Monitor",
 "message.success.config.sticky.policy": "Successfully configured sticky policy",
@@ -3464,6 +3470,7 @@
 "message.volumes.unmanaged": "Volumes not controlled by CloudStack.",
 "message.vr.alert.upon.network.offering.creation.l2": "As virtual routers are not created for L2 Networks, the compute offering will not be used.",
 "message.vr.alert.upon.network.offering.creation.others": "As none of the obligatory services for creating a virtual router (VPN, DHCP, DNS, Firewall, LB, UserData, SourceNat, StaticNat, PortForwarding) are enabled, the virtual router will not be created and the compute offering will not be used.",
+"message.warn.change.primary.storage.scope": "This feature is tested and supported for the following configurations:<br>KVM - NFS/Ceph - DefaultPrimary<br>VMware - NFS - DefaultPrimary<br>*There might be extra steps involved to make it work for other configurations.",
 "message.warn.filetype": "jpg, jpeg, png, bmp and svg are the only supported image formats.",
 "message.warn.importing.instance.without.nic": "WARNING: This Instance is being imported without NICs and many Network resources will not be available. Consider creating a NIC via vCenter before importing or as soon as the Instance is imported.",
 "message.warn.zone.mtu.update": "Please note that this limit won't affect pre-existing Network’s MTU settings",
diff --git a/ui/src/components/view/InfoCard.vue b/ui/src/components/view/InfoCard.vue
index 45dda5b..267a759 100644
--- a/ui/src/components/view/InfoCard.vue
+++ b/ui/src/components/view/InfoCard.vue
@@ -527,7 +527,7 @@
           </span>
         </div>
         <div class="resource-detail-item" v-if="resource.templateid">
-          <div class="resource-detail-item__label">{{ $t('label.templatename') }}</div>
+          <div class="resource-detail-item__label">{{ resource.templateformat === 'ISO'? $t('label.iso') : $t('label.templatename') }}</div>
           <div class="resource-detail-item__details">
             <resource-icon v-if="resource.icon" :image="getImage(resource.icon.base64image)" size="1x" style="margin-right: 5px"/>
             <SaveOutlined v-else />
@@ -535,7 +535,7 @@
           </div>
         </div>
         <div class="resource-detail-item" v-if="resource.isoid">
-          <div class="resource-detail-item__label">{{ $t('label.iso') }}</div>
+          <div class="resource-detail-item__label">{{ $t('label.isoname') }}</div>
           <div class="resource-detail-item__details">
             <resource-icon v-if="resource.icon" :image="getImage(resource.icon.base64image)" size="1x" style="margin-right: 5px"/>
             <UsbOutlined v-else />
diff --git a/ui/src/config/section/infra/primaryStorages.js b/ui/src/config/section/infra/primaryStorages.js
index 2433336..c4932b2 100644
--- a/ui/src/config/section/infra/primaryStorages.js
+++ b/ui/src/config/section/infra/primaryStorages.js
@@ -136,6 +136,26 @@
       show: (record) => { return ['Maintenance', 'PrepareForMaintenance', 'ErrorInMaintenance'].includes(record.state) }
     },
     {
+      api: 'changeStoragePoolScope',
+      icon: 'swap-outlined',
+      label: 'label.action.change.primary.storage.scope',
+      dataView: true,
+      popup: true,
+      show: (record) => {
+        return (record.state === 'Disabled' &&
+          (record.scope === 'CLUSTER' ||
+           record.scope === 'ZONE') &&
+          (record.hypervisor === 'KVM' ||
+           record.hypervisor === 'VMware' ||
+           record.hypervisor === 'HyperV' ||
+           record.hypervisor === 'LXC' ||
+           record.hypervisor === 'Any' ||
+           record.hypervisor === 'Simulator')
+        )
+      },
+      component: shallowRef(defineAsyncComponent(() => import('@/views/infra/ChangeStoragePoolScope.vue')))
+    },
+    {
       api: 'deleteStoragePool',
       icon: 'delete-outlined',
       label: 'label.action.delete.primary.storage',
diff --git a/ui/src/core/lazy_lib/icons_use.js b/ui/src/core/lazy_lib/icons_use.js
index 4a43b19..f3b2491 100644
--- a/ui/src/core/lazy_lib/icons_use.js
+++ b/ui/src/core/lazy_lib/icons_use.js
@@ -21,6 +21,7 @@
   ApiOutlined,
   AppstoreOutlined,
   ArrowDownOutlined,
+  ArrowRightOutlined,
   ArrowUpOutlined,
   ArrowsAltOutlined,
   AuditOutlined,
@@ -185,6 +186,7 @@
     app.component('ApiOutlined', ApiOutlined)
     app.component('AppstoreOutlined', AppstoreOutlined)
     app.component('ArrowDownOutlined', ArrowDownOutlined)
+    app.component('ArrowRightOutlined', ArrowRightOutlined)
     app.component('ArrowUpOutlined', ArrowUpOutlined)
     app.component('ArrowsAltOutlined', ArrowsAltOutlined)
     app.component('AuditOutlined', AuditOutlined)
diff --git a/ui/src/views/infra/ChangeStoragePoolScope.vue b/ui/src/views/infra/ChangeStoragePoolScope.vue
new file mode 100644
index 0000000..1e1c142
--- /dev/null
+++ b/ui/src/views/infra/ChangeStoragePoolScope.vue
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+<template>
+  <a-spin :spinning="loading">
+    <div class="form-layout" v-ctrl-enter="handleSubmitForm">
+      <div class="form">
+        <a-form
+          :ref="formRef"
+          :model="form"
+          :rules="rules"
+          layout="vertical"
+          @submit="handleSubmitForm">
+          <a-alert type="warning">
+            <template #message>
+              <span
+                v-html="(resource.scope=='ZONE' ? $t('message.action.primary.storage.scope.cluster') : $t('message.action.primary.storage.scope.zone')) +
+                        '<br><br>' + $t('message.warn.change.primary.storage.scope')"></span>
+            </template>
+          </a-alert>
+          <p></p>
+          <a-form-item name="clusterid" ref="clusterid" v-if="resource.scope=='ZONE'">
+            <template #label>
+              <tooltip-label :title="$t('label.clustername')" :tooltip="placeholder.clusterid"/>
+            </template>
+            <a-select
+              v-model:value="form.clusterid"
+              :placeholder="placeholder.clusterid"
+              showSearch
+              optionFilterProp="label"
+              :filterOption="(input, option) => {
+                return option.label.toLowerCase().indexOf(input.toLowerCase()) >= 0
+              }"
+              @change="handleChangeCluster">
+              <a-select-option
+                v-for="cluster in clustersList"
+                :value="cluster.id"
+                :key="cluster.id"
+                :label="cluster.name">
+                {{ cluster.name }}
+              </a-select-option>
+            </a-select>
+          </a-form-item>
+
+          <div :span="24" class="action-button">
+            <a-button @click="closeAction">{{ $t('label.cancel') }}</a-button>
+            <a-button @click="handleSubmitForm" ref="submit" type="primary">{{ $t('label.ok') }}</a-button>
+          </div>
+        </a-form>
+      </div>
+    </div>
+  </a-spin>
+</template>
+
+<script>
+import { ref, reactive, toRaw } from 'vue'
+import { api } from '@/api'
+import { mixinForm } from '@/utils/mixin'
+import DedicateDomain from '../../components/view/DedicateDomain'
+import ResourceIcon from '@/components/view/ResourceIcon'
+import TooltipLabel from '@/components/widgets/TooltipLabel'
+
+export default {
+  name: 'ChangeStoragePoolScope',
+  mixins: [mixinForm],
+  components: {
+    DedicateDomain,
+    ResourceIcon,
+    TooltipLabel
+  },
+  props: {
+    resource: {
+      type: Object,
+      required: true
+    }
+  },
+  data () {
+    return {
+      loading: false,
+      clustersList: [],
+      selectedCluster: null,
+      placeholder: {
+        clusterid: null
+      }
+    }
+  },
+  created () {
+    this.initForm()
+    this.fetchData()
+  },
+  methods: {
+    initForm () {
+      this.formRef = ref()
+      this.form = reactive({ })
+      this.rules = reactive({
+        clusterid: [{ required: true, message: this.$t('message.error.select') }]
+      })
+    },
+    fetchData () {
+      this.fetchClusters(this.resource.zoneid)
+    },
+    fetchClusters (zoneId) {
+      this.form.clusterid = null
+      this.clustersList = []
+      if (!zoneId) return
+      this.zoneId = zoneId
+      this.loading = true
+      api('listClusters', { zoneid: zoneId }).then(response => {
+        this.clustersList = response.listclustersresponse.cluster || []
+        this.form.clusterid = this.clustersList[0].id || null
+        if (this.form.clusterid) {
+          this.handleChangeCluster(this.form.clusterid)
+        }
+      }).catch(error => {
+        this.$notifyError(error)
+        this.clustersList = []
+        this.form.clusterid = null
+      }).finally(() => {
+        this.loading = false
+      })
+    },
+    handleChangeCluster (value) {
+      this.form.clusterid = value
+      this.selectedCluster = this.clustersList.find(i => i.id === this.form.clusterid)
+    },
+    handleSubmitForm () {
+      if (this.loading) return
+      this.formRef.value.validate().then(() => {
+        const formRaw = toRaw(this.form)
+        const values = this.handleRemoveFields(formRaw)
+
+        this.args = {}
+        if (this.resource.scope === 'ZONE') {
+          this.args = {
+            id: this.resource.id,
+            scope: 'CLUSTER',
+            clusterid: values.clusterid
+          }
+        } else {
+          this.args = {
+            id: this.resource.id,
+            scope: 'ZONE'
+          }
+        }
+
+        this.changeStoragePoolScope(this.args)
+      }).catch(error => {
+        this.formRef.value.scrollToField(error.errorFields[0].name)
+      })
+    },
+    closeAction () {
+      this.$emit('close-action')
+    },
+    changeStoragePoolScope (args) {
+      api('changeStoragePoolScope', args).then(json => {
+        this.$pollJob({
+          jobId: json.changestoragepoolscoperesponse.jobid,
+          title: this.$t('message.success.change.scope'),
+          description: args.name,
+          successMessage: this.$t('message.success.change.scope'),
+          successMethod: (result) => {
+            this.closeAction()
+          },
+          errorMessage: this.$t('message.change.scope.failed'),
+          loadingMessage: this.$t('message.change.scope.processing'),
+          catchMessage: this.$t('error.fetching.async.job.result')
+        })
+        this.closeAction()
+      }).catch(error => {
+        this.$notifyError(error)
+      }).finally(() => {
+        this.loading = false
+      })
+    }
+
+  }
+}
+</script>
+
+<style lang="scss">
+  .form {
+    &__label {
+      margin-bottom: 5px;
+
+      .required {
+        margin-left: 10px;
+      }
+    }
+    &__item {
+      margin-bottom: 20px;
+    }
+    .ant-select {
+      width: 85vw;
+      @media (min-width: 760px) {
+        width: 400px;
+      }
+    }
+  }
+
+  .required {
+    color: #ff0000;
+    &-label {
+      display: none;
+      &--error {
+        display: block;
+      }
+    }
+  }
+</style>