| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| """ Test cases for Testing Hypervisor Capabilities |
| """ |
| from nose.plugins.attrib import attr |
| from marvin.cloudstackTestCase import cloudstackTestCase |
| from marvin.lib.utils import (cleanup_resources, |
| validateList) |
| from marvin.lib.base import (Account, |
| ServiceOffering, |
| DiskOffering, |
| VirtualMachine, |
| Volume, |
| Host, |
| VmSnapshot) |
| from marvin.lib.common import (get_domain, |
| get_zone, |
| get_template, |
| list_virtual_machines, |
| list_ssvms, |
| list_routers) |
| from marvin.lib.decoratorGenerators import skipTestIf |
| |
| |
| from marvin.cloudstackAPI import (updateHypervisorCapabilities, |
| listHypervisorCapabilities) |
| |
| from marvin.codes import PASS |
| |
| |
| class TestHypervisorCapabilities(cloudstackTestCase): |
| |
| @classmethod |
| def setUpClass(cls): |
| testClient = super(TestHypervisorCapabilities, cls).getClsTestClient() |
| cls.apiclient = testClient.getApiClient() |
| cls.services = testClient.getParsedTestDataConfig() |
| cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests()) |
| cls.domain = get_domain(cls.apiclient) |
| |
| cls.hypervisor = cls.testClient.getHypervisorInfo() |
| # Get Zone, Domain and templates |
| |
| cls.notSupported = True |
| cls._cleanup = [] |
| cls.hosts = Host.list(cls.apiclient, zoneid=cls.zone.id, type='Routing') |
| if isinstance(cls.hosts, list) and len(cls.hosts) > 0: |
| cls.host = cls.hosts[0] |
| cls.notSupported = False |
| |
| cls.hypervisorversion = "default" |
| if hasattr(cls.host, 'hypervisorversion'): |
| cls.hypervisorversion = cls.host.hypervisorversion |
| |
| if cls.notSupported == False: |
| cls.notSupported = True |
| cmdList = listHypervisorCapabilities.listHypervisorCapabilitiesCmd() |
| cmdList.hypervisor = cls.hypervisor |
| capabilities = cls.apiclient.listHypervisorCapabilities(cmdList) |
| for capability in capabilities: |
| if capability.hypervisorversion == cls.hypervisorversion: |
| cls.hostCapability = capability |
| cls.notSupported = False |
| break |
| |
| if cls.notSupported == True: |
| cls.hypervisorversion = "default" |
| for capability in capabilities: |
| if capability.hypervisorversion == cls.hypervisorversion: |
| cls.hostCapability = capability |
| cls.notSupported = False |
| break |
| |
| if cls.notSupported == False: |
| cls.template = get_template( |
| cls.apiclient, |
| cls.zone.id, |
| cls.services["ostype"]) |
| |
| # Create an account |
| cls.account = Account.create( |
| cls.apiclient, |
| cls.services["account"], |
| domainid=cls.domain.id |
| ) |
| cls._cleanup.append(cls.account) |
| |
| # Create user api client of the account |
| cls.userapiclient = testClient.getUserApiClient( |
| UserName=cls.account.name, |
| DomainName=cls.account.domain |
| ) |
| # Create Service offering |
| cls.service_offering = ServiceOffering.create( |
| cls.apiclient, |
| cls.services["service_offering"], |
| hosttags="host1" |
| ) |
| cls._cleanup.append(cls.service_offering) |
| |
| cls.disk_offering = DiskOffering.create( |
| cls.apiclient, |
| cls.services["disk_offering"] |
| ) |
| cls._cleanup.append(cls.disk_offering) |
| |
| return |
| |
| @classmethod |
| def tearDownClass(cls): |
| super(TestHypervisorCapabilities, cls).tearDownClass() |
| |
| def setUp(self): |
| self.cleanup = [] |
| |
| self.apiclient = self.testClient.getApiClient() |
| self.dbclient = self.testClient.getDbConnection() |
| |
| def tearDown(self): |
| try: |
| if self.notSupported == False: |
| Host.update(self.apiclient, id=self.host.id, hosttags="") |
| self.updateHostHypervisorCapability(self.hostCapability.id, |
| self.hostCapability.maxdatavolumeslimit, |
| self.hostCapability.vmsnapshotenabled) |
| |
| super(TestHypervisorCapabilities, self).tearDown() |
| except Exception as e: |
| raise Exception("Warning: Exception during cleanup : %s" % e) |
| return |
| |
| @skipTestIf("notSupported") |
| @attr(tags=["advanced", "basic"], required_hardware="false") |
| def test_01_check_hypervisor_max_data_volume_limit(self): |
| """ Test hypervisor maxdatavolumeslimit effect |
| |
| # 1. Set maxdatavolumeslimit to 1 for hypervisor |
| # 2. List capabilities and verify value |
| # 3. Deploy a VM and attach a volume to it |
| # 4. Try attach another volume, it should fail |
| # 5. Set maxdatavolumeslimit to 32 for hypervisor |
| # 6. Try attach second volume, it should succeed |
| """ |
| self.updateHostHypervisorCapability(self.hostCapability.id, 1) |
| capabilities = self.listHostHypervisorCapabilities(self.hostCapability.id) |
| self.assertTrue(isinstance(capabilities, list), "listHypervisorCapabilities response not a valid list") |
| self.assertEqual(len(capabilities), 1, "listHypervisorCapabilities response not valid") |
| self.assertEqual(capabilities[0].maxdatavolumeslimit, |
| 1, |
| "listHypervisorCapabilities response maxdatavolumeslimit value not 1") |
| |
| Host.update(self.apiclient, id=self.host.id, hosttags="host1") |
| volume_created1 = Volume.create( |
| self.userapiclient, |
| self.services["volume"], |
| zoneid=self.zone.id, |
| diskofferingid=self.disk_offering.id |
| ) |
| self.cleanup.append(volume_created1) |
| volume_created2 = Volume.create( |
| self.userapiclient, |
| self.services["volume"], |
| zoneid=self.zone.id, |
| diskofferingid=self.disk_offering.id |
| ) |
| self.cleanup.append(volume_created2) |
| vm = VirtualMachine.create( |
| self.userapiclient, |
| self.services["small"], |
| templateid=self.template.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| serviceofferingid=self.service_offering.id, |
| zoneid=self.zone.id |
| ) |
| self.cleanup.append(vm) |
| |
| vm.attach_volume( |
| self.userapiclient, |
| volume_created1 |
| ) |
| |
| try: |
| vm.attach_volume( |
| self.userapiclient, |
| volume_created2 |
| ) |
| vm.detach_volume(self.userapiclient, volume_created1) |
| vm.detach_volume(self.userapiclient, volume_created2) |
| self.fail("Successful to attach 2 DATA disks when max DATA disk limit was set to 1") |
| except Exception as e: |
| self.debug("Failed to attach 2nd DATA disk when max DATA disk limit was set to 1: %s" % e) |
| |
| self.updateHostHypervisorCapability(self.hostCapability.id, 32) |
| |
| vm.attach_volume( |
| self.userapiclient, |
| volume_created2 |
| ) |
| vm.stop(self.userapiclient, forced=True) |
| vm.detach_volume(self.userapiclient, volume_created1) |
| vm.detach_volume(self.userapiclient, volume_created2) |
| |
| @skipTestIf("notSupported") |
| @attr(tags=["advanced", "basic"], required_hardware="false") |
| def test_02_check_hypervisor_vm_snapshot(self): |
| """ Test hypervisor vmsnapshotenabled effect |
| |
| # 1. Set vmsnapshotenabled to false for hypervisor |
| # 2. List capabilities and verify value |
| # 3. Deploy a VM |
| # 4. Try VM snapshot, it should fail |
| # 5. Set vmsnapshotenabled to true for hypervisor |
| # 6. Try VM snapshot again, it should succeed |
| """ |
| if self.hypervisor == "KVM": |
| self.skipTest("Skipping test: Reason - VM Snapshot of running VM is not supported for KVM") |
| self.updateHostHypervisorCapability(self.hostCapability.id, None, False) |
| capabilities = self.listHostHypervisorCapabilities(self.hostCapability.id) |
| self.assertTrue(isinstance(capabilities, list), "listHypervisorCapabilities response not a valid list") |
| self.assertEqual(len(capabilities), 1, "listHypervisorCapabilities response not valid") |
| self.assertEqual(capabilities[0].vmsnapshotenabled, |
| False, |
| "listHypervisorCapabilities response vmsnapshotenabled value not False") |
| |
| Host.update(self.apiclient, id=self.host.id, hosttags="host1") |
| vm = VirtualMachine.create( |
| self.userapiclient, |
| self.services["small"], |
| templateid=self.template.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| serviceofferingid=self.service_offering.id, |
| zoneid=self.zone.id |
| ) |
| self.cleanup.append(vm) |
| |
| try: |
| fail_snapshot = VmSnapshot.create( |
| self.userapiclient, |
| vmid=vm.id, |
| snapshotmemory="false", |
| name="Test Snapshot", |
| description="Test Snapshot Desc" |
| ) |
| self.cleanup.append(fail_snapshot) |
| self.fail("Successful to take VM snapshot even when vmsnapshotenabled was set to False") |
| except Exception as e: |
| self.debug("Failed to take VM snapshot even vmsnapshotenabled was set to False: %s" % e) |
| |
| self.updateHostHypervisorCapability(self.hostCapability.id, None, True) |
| |
| vm_snapshot = VmSnapshot.create( |
| self.userapiclient, |
| vmid=vm.id, |
| snapshotmemory="false", |
| name="Test Snapshot", |
| description="Test Snapshot Desc" |
| ) |
| self.cleanup.append(vm_snapshot) |
| |
| def updateHostHypervisorCapability(self, id, maxDataVolumes, vmSnapshotEnabled=None): |
| cmd = updateHypervisorCapabilities.updateHypervisorCapabilitiesCmd() |
| cmd.id = id |
| if maxDataVolumes != None: |
| cmd.maxdatavolumeslimit = maxDataVolumes |
| if vmSnapshotEnabled != None: |
| cmd.vmsnapshotenabled = vmSnapshotEnabled |
| self.apiclient.updateHypervisorCapabilities(cmd) |
| |
| def listHostHypervisorCapabilities(self, id, hypervisor=None): |
| cmd = listHypervisorCapabilities.listHypervisorCapabilitiesCmd() |
| cmd.id = id |
| return self.apiclient.listHypervisorCapabilities(cmd) |