blob: 31562ff87e49bdeaa255657b09b7459fb9fd7f0b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlparse
from libcloud.utils.py3 import parse_qsl
from libcloud.compute.drivers.cloudstack import CloudStackNodeDriver
try:
import simplejson as json
except ImportError:
import json
from libcloud.compute.drivers.cloudstack import CloudStackNodeDriver
from libcloud.compute.types import DeploymentError, LibcloudError
from libcloud.compute.base import Node, NodeImage, NodeSize, NodeLocation
from libcloud.test import unittest
from libcloud.test import MockHttpTestCase
from libcloud.test.compute import TestCaseMixin
from libcloud.test.file_fixtures import ComputeFileFixtures
class CloudStackNodeDriverTest(unittest.TestCase, TestCaseMixin):
def setUp(self):
CloudStackNodeDriver.connectionCls.conn_classes = \
(None, CloudStackMockHttp)
self.driver = CloudStackNodeDriver('apikey', 'secret',
path='/test/path',
host='api.dummy.com')
self.driver.path = '/test/path'
self.driver.type = -1
CloudStackMockHttp.fixture_tag = 'default'
self.driver.connection.poll_interval = 0.0
def test_create_node_immediate_failure(self):
size = self.driver.list_sizes()[0]
image = self.driver.list_images()[0]
CloudStackMockHttp.fixture_tag = 'deployfail'
try:
node = self.driver.create_node(name='node-name',
image=image,
size=size)
except:
return
self.assertTrue(False)
def test_create_node_delayed_failure(self):
size = self.driver.list_sizes()[0]
image = self.driver.list_images()[0]
CloudStackMockHttp.fixture_tag = 'deployfail2'
try:
node = self.driver.create_node(name='node-name',
image=image,
size=size)
except:
return
self.assertTrue(False)
def test_create_node_default_location_success(self):
size = self.driver.list_sizes()[0]
image = self.driver.list_images()[0]
default_location = self.driver.list_locations()[0]
node = self.driver.create_node(name='fred',
image=image,
size=size)
self.assertEqual(node.name, 'fred')
self.assertEqual(node.public_ips, [])
self.assertEqual(node.private_ips, ['1.1.1.2'])
self.assertEqual(node.extra['zoneid'], default_location.id)
def test_list_images_no_images_available(self):
CloudStackMockHttp.fixture_tag = 'notemplates'
images = self.driver.list_images()
self.assertEquals(0, len(images))
def test_list_images(self):
_, fixture = CloudStackMockHttp()._load_fixture(
'listTemplates_default.json')
templates = fixture['listtemplatesresponse']['template']
images = self.driver.list_images()
for i, image in enumerate(images):
# NodeImage expects id to be a string,
# the CloudStack fixture has an int
tid = str(templates[i]['id'])
tname = templates[i]['name']
self.assertIsInstance(image.driver, CloudStackNodeDriver)
self.assertEquals(image.id, tid)
self.assertEquals(image.name, tname)
def test_ex_list_disk_offerings(self):
diskOfferings = self.driver.ex_list_disk_offerings()
self.assertEquals(1, len(diskOfferings))
diskOffering, = diskOfferings
self.assertEquals('Disk offer 1', diskOffering.name)
self.assertEquals(10, diskOffering.size)
def test_ex_list_networks(self):
_, fixture = CloudStackMockHttp()._load_fixture(
'listNetworks_default.json')
fixture_networks = fixture['listnetworksresponse']['network']
networks = self.driver.ex_list_networks()
for i, network in enumerate(networks):
self.assertEquals(network.id, fixture_networks[i]['id'])
self.assertEquals(
network.displaytext, fixture_networks[i]['displaytext'])
self.assertEquals(network.name, fixture_networks[i]['name'])
self.assertEquals(
network.networkofferingid,
fixture_networks[i]['networkofferingid'])
self.assertEquals(network.zoneid, fixture_networks[i]['zoneid'])
def test_create_volume(self):
volumeName = 'vol-0'
location = self.driver.list_locations()[0]
volume = self.driver.create_volume(10, volumeName, location)
self.assertEquals(volumeName, volume.name)
self.assertEquals(10, volume.size)
def test_create_volume_no_noncustomized_offering_with_size(self):
"""If the sizes of disk offerings are not configurable and there
are no disk offerings with the requested size, an exception should
be thrown."""
location = self.driver.list_locations()[0]
self.assertRaises(
LibcloudError,
self.driver.create_volume,
'vol-0', location, 11)
def test_create_volume_with_custom_disk_size_offering(self):
CloudStackMockHttp.fixture_tag = 'withcustomdisksize'
volumeName = 'vol-0'
location = self.driver.list_locations()[0]
volume = self.driver.create_volume(10, volumeName, location)
self.assertEquals(volumeName, volume.name)
def test_attach_volume(self):
node = self.driver.list_nodes()[0]
volumeName = 'vol-0'
location = self.driver.list_locations()[0]
volume = self.driver.create_volume(10, volumeName, location)
attachReturnVal = self.driver.attach_volume(volume, node)
self.assertTrue(attachReturnVal)
def test_list_nodes(self):
node = self.driver.list_nodes()[0]
self.assertEquals('test', node.name)
def test_list_locations(self):
location = self.driver.list_locations()[0]
self.assertEquals('Sydney', location.name)
def test_start_node(self):
node = self.driver.list_nodes()[0]
res = node.ex_start()
self.assertEquals('Starting', res)
def test_stop_node(self):
node = self.driver.list_nodes()[0]
res = node.ex_stop()
self.assertEquals('Stopped', res)
def test_list_keypairs(self):
keypairs = self.driver.ex_list_keypairs()
fingerprint = '00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:' + \
'00:00:00:00:00'
self.assertEqual(keypairs[0]['name'], 'cs-keypair')
self.assertEqual(keypairs[0]['fingerprint'], fingerprint)
def test_create_keypair(self):
self.assertRaises(
LibcloudError,
self.driver.ex_create_keypair,
'cs-keypair')
def test_delete_keypair(self):
res = self.driver.ex_delete_keypair('cs-keypair')
self.assertTrue(res)
def test_list_security_groups(self):
groups = self.driver.ex_list_security_groups()
self.assertEqual(groups[0]['name'], 'default')
def test_create_security_group(self):
group = self.driver.ex_create_security_group(name='MySG')
self.assertEqual(group['name'], 'MySG')
def test_delete_security_group(self):
res = self.driver.ex_delete_security_group(name='MySG')
self.assertTrue(res)
def test_authorize_security_group_ingress(self):
res = self.driver.ex_authorize_security_group_ingress('MySG',
'TCP',
'22',
'22',
'0.0.0.0/0')
self.assertTrue(res)
class CloudStackMockHttp(MockHttpTestCase):
fixtures = ComputeFileFixtures('cloudstack')
fixture_tag = 'default'
def _load_fixture(self, fixture):
body = self.fixtures.load(fixture)
return body, json.loads(body)
def _test_path(self, method, url, body, headers):
url = urlparse.urlparse(url)
query = dict(parse_qsl(url.query))
self.assertTrue('apiKey' in query)
self.assertTrue('command' in query)
self.assertTrue('response' in query)
self.assertTrue('signature' in query)
self.assertTrue(query['response'] == 'json')
del query['apiKey']
del query['response']
del query['signature']
command = query.pop('command')
if hasattr(self, '_cmd_' + command):
return getattr(self, '_cmd_' + command)(**query)
else:
fixture = command + '_' + self.fixture_tag + '.json'
body, obj = self._load_fixture(fixture)
return (httplib.OK, body, obj, httplib.responses[httplib.OK])
def _cmd_queryAsyncJobResult(self, jobid):
fixture = 'queryAsyncJobResult' + '_' + str(jobid) + '.json'
body, obj = self._load_fixture(fixture)
return (httplib.OK, body, obj, httplib.responses[httplib.OK])
if __name__ == '__main__':
sys.exit(unittest.main())