blob: bbe635448871cc997c6d1587dbf0a18bb4483691 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Maintainer: Jed Smith <jed@linode.com>
# Based upon code written by Alex Polvi <polvi@cloudkick.com>
#
import sys
import unittest
from libcloud.test import MockHttp
from libcloud.utils.py3 import httplib
from libcloud.compute.base import Node, StorageVolume, NodeAuthSSHKey, NodeAuthPassword
from libcloud.test.compute import TestCaseMixin
from libcloud.test.file_fixtures import ComputeFileFixtures
from libcloud.compute.drivers.linode import LinodeNodeDriver
class LinodeTest(unittest.TestCase, TestCaseMixin):
# The Linode test suite
def setUp(self):
LinodeNodeDriver.connectionCls.conn_class = LinodeMockHttp
LinodeMockHttp.use_param = "api_action"
self.driver = LinodeNodeDriver("foo", api_version="3.0")
def test_list_nodes(self):
nodes = self.driver.list_nodes()
self.assertEqual(len(nodes), 1)
node = nodes[0]
self.assertEqual(node.id, "8098")
self.assertEqual(node.name, "api-node3")
self.assertEqual(node.extra["PLANID"], "2")
self.assertTrue("75.127.96.245" in node.public_ips)
self.assertEqual(node.private_ips, [])
def test_reboot_node(self):
# An exception would indicate failure
node = self.driver.list_nodes()[0]
self.driver.reboot_node(node)
def test_destroy_node(self):
# An exception would indicate failure
node = self.driver.list_nodes()[0]
self.driver.destroy_node(node)
def test_create_node_password_auth(self):
# Will exception on failure
self.driver.create_node(
name="Test",
location=self.driver.list_locations()[0],
size=self.driver.list_sizes()[0],
image=self.driver.list_images()[6],
auth=NodeAuthPassword("test123"),
)
def test_create_node_ssh_key_auth(self):
# Will exception on failure
node = self.driver.create_node(
name="Test",
location=self.driver.list_locations()[0],
size=self.driver.list_sizes()[0],
image=self.driver.list_images()[6],
auth=NodeAuthSSHKey("foo"),
)
self.assertTrue(isinstance(node, Node))
def test_list_sizes(self):
sizes = self.driver.list_sizes()
self.assertEqual(len(sizes), 9)
for size in sizes:
self.assertEqual(size.ram, int(size.name.split(" ")[1]))
def test_list_images(self):
images = self.driver.list_images()
self.assertEqual(len(images), 30)
def test_create_node_response(self):
# should return a node object
node = self.driver.create_node(
name="node-name",
location=self.driver.list_locations()[0],
size=self.driver.list_sizes()[0],
image=self.driver.list_images()[0],
auth=NodeAuthPassword("foobar"),
)
self.assertTrue(isinstance(node, Node))
def test_destroy_volume(self):
# Will exception on failure
node = self.driver.list_nodes()[0]
volume = StorageVolume(
id=55648,
name="test",
size=1024,
driver=self.driver,
extra={"LINODEID": node.id},
)
self.driver.destroy_volume(volume)
def test_ex_create_volume(self):
# should return a StorageVolume object
node = self.driver.list_nodes()[0]
volume = self.driver.ex_create_volume(
size=4096, name="Another test image", node=node, fs_type="ext4"
)
self.assertTrue(isinstance(volume, StorageVolume))
def test_ex_list_volumes(self):
# should return list of StorageVolume objects
node = self.driver.list_nodes()[0]
volumes = self.driver.ex_list_volumes(node=node)
self.assertTrue(isinstance(volumes, list))
self.assertTrue(isinstance(volumes[0], StorageVolume))
self.assertEqual(len(volumes), 2)
class LinodeMockHttp(MockHttp):
fixtures = ComputeFileFixtures("linode")
def _avail_datacenters(self, method, url, body, headers):
body = self.fixtures.load("_avail_datacenters.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _avail_linodeplans(self, method, url, body, headers):
body = self.fixtures.load("_avail_linodeplans.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _avail_distributions(self, method, url, body, headers):
body = self.fixtures.load("_avail_distributions.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_create(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.create","DATA":{"LinodeID":8098}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_disk_create(self, method, url, body, headers):
body = (
'{"ERRORARRAY":[],"ACTION":"linode.disk.create","DATA":{"JobID":1298,"DiskID":55647}}'
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_disk_delete(self, method, url, body, headers):
body = (
'{"ERRORARRAY":[],"ACTION":"linode.disk.delete","DATA":{"JobID":1298,"DiskID":55648}}'
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_disk_createfromdistribution(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.disk.createFromDistribution","DATA":{"JobID":1298,"DiskID":55647}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_disk_list(self, method, url, body, headers):
body = self.fixtures.load("_linode_disk_list.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_delete(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.delete","DATA":{"LinodeID":8098}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_update(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.update","DATA":{"LinodeID":8098}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_reboot(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.reboot","DATA":{"JobID":1305}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _avail_kernels(self, method, url, body, headers):
body = self.fixtures.load("_avail_kernels.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_boot(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.boot","DATA":{"JobID":1300}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_config_create(self, method, url, body, headers):
body = '{"ERRORARRAY":[],"ACTION":"linode.config.create","DATA":{"ConfigID":31239}}'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_list(self, method, url, body, headers):
body = self.fixtures.load("_linode_list.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _linode_ip_list(self, method, url, body, headers):
body = self.fixtures.load("_linode_ip_list.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _batch(self, method, url, body, headers):
body = self.fixtures.load("_batch.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
if __name__ == "__main__":
sys.exit(unittest.main())