# blob: fc1c558d70fc1d82cda7da498542f90988698e65 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Tests for importVolume and unmanageVolume APIs
"""
# Import Local Modules
from marvin.cloudstackAPI import unmanageVolume, listVolumesForImport, importVolume
from marvin.cloudstackTestCase import cloudstackTestCase, unittest
from marvin.codes import FAILED
from marvin.lib.base import (Account,
Domain,
Volume,
ServiceOffering,
DiskOffering,
VirtualMachine)
from marvin.lib.common import (get_domain,
get_zone,
get_suitable_test_template,
list_volumes,
list_storage_pools)
# Import System modules
from nose.plugins.attrib import attr
# Flag read by nose's multiprocess plugin: declares that this module's
# fixtures may be shared between worker processes instead of being re-run
# in each of them.
_multiprocess_shared_ = True
class TestImportAndUnmanageVolumes(cloudstackTestCase):
    """Exercise the unmanageVolume, listVolumesForImport and importVolume APIs.

    The class fixture creates a service offering, a disk offering, a domain,
    an admin account in that domain and one virtual machine, which is
    force-stopped right after deployment.  The whole class is skipped when
    the hypervisor under test is not KVM.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared offerings, domain, account and VM for the tests.

        Raises:
            unittest.SkipTest: if the configured hypervisor is not KVM.
            AssertionError: if no suitable test template can be found.
        """
        testClient = super(TestImportAndUnmanageVolumes, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.testdata = testClient.getParsedTestDataConfig()
        cls.services = testClient.getParsedTestDataConfig()
        cls.hypervisor = testClient.getHypervisorInfo()

        if cls.hypervisor.lower() != "kvm":
            raise unittest.SkipTest("This is only available for KVM")

        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient)
        cls._cleanup = []

        try:
            cls.service_offering = ServiceOffering.create(
                cls.apiclient,
                cls.services["service_offerings"]["tiny"]
            )
            cls._cleanup.append(cls.service_offering)

            template = get_suitable_test_template(
                cls.apiclient,
                cls.zone.id,
                cls.services["ostype"],
                cls.hypervisor
            )
            if template == FAILED:
                # Explicit raise instead of `assert False`, which would be
                # stripped when Python runs with -O.
                raise AssertionError(
                    "get_suitable_test_template() failed to return template")

            cls.services["virtual_machine"]["zoneid"] = cls.zone.id
            cls.services["mode"] = cls.zone.networktype

            cls.disk_offering = DiskOffering.create(
                cls.apiclient,
                cls.services["disk_offering"]
            )
            cls._cleanup.append(cls.disk_offering)

            cls.test_domain = Domain.create(
                cls.apiclient,
                cls.services["domain"]
            )
            cls._cleanup.append(cls.test_domain)

            cls.test_account = Account.create(
                cls.apiclient,
                cls.services["account"],
                admin=True,
                domainid=cls.test_domain.id
            )
            cls._cleanup.append(cls.test_account)

            # Create VM
            cls.virtual_machine = VirtualMachine.create(
                cls.apiclient,
                cls.services["virtual_machine"],
                templateid=template.id,
                accountid=cls.test_account.name,
                domainid=cls.test_account.domainid,
                serviceofferingid=cls.service_offering.id,
                mode=cls.services["mode"]
            )
            cls._cleanup.append(cls.virtual_machine)
            cls.virtual_machine.stop(cls.apiclient, forced=True)
        except Exception:
            # unittest does not invoke tearDownClass when setUpClass raises,
            # so release whatever has been registered in cls._cleanup so far
            # before propagating the original failure.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the base test case."""
        super(TestImportAndUnmanageVolumes, cls).tearDownClass()

    def _assert_non_empty_list(self, response, api_name):
        """Fail unless ``response`` is a list holding at least one element.

        ``api_name`` is only used to build the failure message.
        """
        self.assertIsInstance(
            response, list,
            "%s response should return a valid list" % api_name
        )
        self.assertTrue(
            len(response) > 0,
            "%s response should return a non-empty list" % api_name
        )

    @attr(tags=['advanced', 'basic', 'sg'], required_hardware=False)
    def test_01_detach_unmanage_import_volume(self):
        """Test attach/detach/unmanage/import volume
        """
        # Locate the storage pool backing the VM's ROOT volume; the test is
        # skipped when that pool is of type PowerFlex.
        root_volumes = list_volumes(
            self.apiclient,
            virtualmachineid=self.virtual_machine.id,
            type='ROOT',
            listall=True
        )
        self._assert_non_empty_list(root_volumes, "listVolumes")
        root_volume = root_volumes[0]

        root_pools = list_storage_pools(
            self.apiclient,
            id=root_volume.storageid
        )
        self._assert_non_empty_list(root_pools, "listStoragePools")
        root_pool = root_pools[0]
        if root_pool.type.lower() == "powerflex":
            self.skipTest("This test is not supported for storage pool type %s on hypervisor KVM" % root_pool.type)

        # Create DATA volume
        data_volume = Volume.create(
            self.apiclient,
            self.testdata["volume"],
            zoneid=self.zone.id,
            account=self.test_account.name,
            domainid=self.test_account.domainid,
            diskofferingid=self.disk_offering.id
        )

        # Attach and Detach volume
        try:
            self.virtual_machine.attach_volume(self.apiclient, data_volume)
        except Exception as e:
            self.fail("Attach volume failed with Exception: %s" % e)
        self.virtual_machine.detach_volume(self.apiclient, data_volume)

        # List volume by id; the listed entry supplies the storageid and
        # path used by the unmanage/import calls below.
        listed = Volume.list(self.apiclient, id=data_volume.id)
        self._assert_non_empty_list(listed, "listVolumes")
        data_volume = listed[0]

        # Unmanage volume
        cmd = unmanageVolume.unmanageVolumeCmd()
        cmd.id = data_volume.id
        self.apiclient.unmanageVolume(cmd)

        # List volumes available for import on the same storage pool
        cmd = listVolumesForImport.listVolumesForImportCmd()
        cmd.storageid = data_volume.storageid
        volumes_for_import = self.apiclient.listVolumesForImport(cmd)
        self.assertIsInstance(
            volumes_for_import, list,
            "Check listVolumesForImport response returns a valid list"
        )

        # Import volume
        # NOTE(review): the imported volume is not registered for cleanup
        # here - confirm whether it needs to be deleted explicitly.
        cmd = importVolume.importVolumeCmd()
        cmd.storageid = data_volume.storageid
        cmd.path = data_volume.path
        self.apiclient.importVolume(cmd)

        # List volume by name (the lookup uses the volume path as the name)
        imported = Volume.list(
            self.apiclient,
            storageid=data_volume.storageid,
            name=data_volume.path
        )
        self._assert_non_empty_list(imported, "listVolumes")