# blob: 631680bb7506add12f8ca8c19339056d59798db3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
from libcloud.test import MockHttp
from libcloud.utils.py3 import b, httplib, urlparse, parse_qsl
from libcloud.common.types import MalformedResponseError
from libcloud.common.cloudstack import CloudStackConnection
try:
import simplejson as json
except ImportError:
import json
# Number of "still pending" replies the /async/delayed mock handler hands out
# before it reports success.  test_async_request_delayed sets it and
# CloudStackMockHttp._async_delayed decrements it once per pending reply.
async_delay = 0
class CloudStackMockDriver:
    """Minimal stand-in for a driver, attached to the connection under test.

    Only plain class attributes are defined; the tests overwrite ``path`` on
    the instance to choose which mock handler answers a request.
    """

    # NOTE(review): not referenced elsewhere in this file; presumably read by
    # the connection code -- confirm against CloudStackConnection.
    name = "fake"
    async_poll_frequency = 0

    # Host passed to CloudStackConnection in the test setUp.
    host = "nonexistent."

    # Default request path; individual tests replace it (e.g. "/sync").
    path = "/path"

    # Not read in this file: the tests use the module-level ``async_delay``.
    async_delay = 0
class CloudStackCommonTest(unittest.TestCase):
    """Tests for CloudStackConnection request handling and request signing.

    Requests are answered by CloudStackMockHttp.  Each test sets
    ``self.driver.path`` to the path matching the mock handler it wants
    (``/async/fail`` for ``_async_fail`` and so on).
    NOTE(review): the path-to-handler dispatch is presumably implemented by
    MockHttp, which is outside this file -- confirm there.
    """

    def setUp(self):
        """Build a connection that uses the mock HTTP class and a mock driver."""
        # Class-level assignment: it affects every CloudStackConnection
        # created afterwards, not only the one built below.
        CloudStackConnection.conn_class = CloudStackMockHttp
        self.connection = CloudStackConnection(
            "apikey", "secret", host=CloudStackMockDriver.host
        )
        # NOTE(review): presumably keeps async polling from sleeping between
        # attempts -- confirm against CloudStackConnection.
        self.connection.poll_interval = 0.0
        self.driver = self.connection.driver = CloudStackMockDriver()

    def test_sync_request_bad_response(self):
        """A reply without the expected response key raises MalformedResponseError."""
        self.driver.path = "/bad/response"
        with self.assertRaises(MalformedResponseError):
            self.connection._sync_request("fake")

    def test_sync_request(self):
        """A well-formed synchronous reply is accepted without raising."""
        self.driver.path = "/sync"
        self.connection._sync_request("fake")

    def test_async_request_successful(self):
        """A finished async job yields the job result dictionary."""
        self.driver.path = "/async/success"
        result = self.connection._async_request("fake")
        self.assertEqual(result, {"fake": "result"})

    def test_async_request_unsuccessful(self):
        """A failed async job raises with the error text sent by the mock."""
        self.driver.path = "/async/fail"
        # The concrete exception type is not visible in this file, so only the
        # message is pinned, exactly as the original try/except did.
        with self.assertRaises(Exception) as caught:
            self.connection._async_request("fake")
        self.assertEqual(CloudStackMockHttp.ERROR_TEXT, str(caught.exception))

    def test_async_request_delayed(self):
        """Polling continues while the job is reported as pending.

        The mock answers "pending" while the module-level ``async_delay`` is
        non-zero and decrements it each time, so a final value of 0 shows
        that both pending replies were consumed before the request returned.
        """
        global async_delay
        self.driver.path = "/async/delayed"
        async_delay = 2
        self.connection._async_request("fake")
        self.assertEqual(async_delay, 0)

    def test_signature_algorithm(self):
        """_make_signature produces the known signature for fixed credentials."""
        # (request parameters, expected signature) pairs.
        cases = [
            ({"command": "listVirtualMachines"}, "z/a9Y7J52u48VpqIgiwaGUMCso0="),
            (
                {
                    "command": "deployVirtualMachine",
                    "name": "fred",
                    "displayname": "George",
                    "serviceofferingid": 5,
                    "templateid": 17,
                    "zoneid": 23,
                    "networkids": 42,
                },
                "gHTo7mYmadZ+zluKHzlEKb1i/QU=",
            ),
            (
                {
                    "command": "deployVirtualMachine",
                    "name": "fred",
                    "displayname": "George+Ringo",
                    "serviceofferingid": 5,
                    "templateid": 17,
                    "zoneid": 23,
                    "networkids": 42,
                },
                "tAgfrreI1ZvWlWLClD3gu4+aKv4=",
            ),
        ]
        connection = CloudStackConnection("fnord", "abracadabra")
        for request_params, expected_signature in cases:
            params = connection.add_default_params(request_params)
            self.assertEqual(
                connection._make_signature(params), b(expected_signature)
            )
class CloudStackMockHttp(MockHttp, unittest.TestCase):
    """Canned CloudStack API replies used by CloudStackCommonTest.

    Handler names mirror the paths the tests assign to the driver
    (``/async/fail`` -> ``_async_fail``).
    NOTE(review): the path-to-method dispatch is presumably done by MockHttp,
    which is not visible in this file -- confirm there.
    The unittest.TestCase base is what provides the assert* methods the
    handlers call on ``self``.
    """

    ERROR_TEXT = "ERROR TEXT"
    # Job id returned when an async command starts; polls must send it back.
    JOB_ID = "42"

    def _response(self, status, result, response):
        """Return the reply tuple for a handler.

        The tuple is (status, JSON-encoded ``result``, empty dict, ``response``);
        callers in this class pass the HTTP reason phrase as ``response``.
        """
        return (status, json.dumps(result), {}, response)

    def _check_request(self, url):
        """Assert the URL carries the mandatory query parameters.

        Returns the query string parsed into a dict so handlers can inspect it.
        """
        parsed = urlparse.urlparse(url)
        query = dict(parse_qsl(parsed.query))
        # assertIn / assertEqual report the offending key or value on failure.
        for required in ("apiKey", "command", "response", "signature"):
            self.assertIn(required, query)
        self.assertEqual(query["response"], "json")
        return query

    def _command_reply(self, query, payload):
        """Wrap ``payload`` under "<command>response" and return a 200 reply."""
        result = {query["command"].lower() + "response": payload}
        return self._response(httplib.OK, result, httplib.responses[httplib.OK])

    def _is_job_poll(self, query):
        """Tell whether the request polls an async job.

        For a poll, also asserts that it asks about the job id handed out
        when the job was started.
        """
        if query["command"].lower() != "queryasyncjobresult":
            return False
        self.assertEqual(query["jobid"], self.JOB_ID)
        return True

    def _bad_response(self, method, url, body, headers):
        """Reply with a body that is not wrapped under "<command>response"."""
        self._check_request(url)
        result = {"success": True}
        return self._response(httplib.OK, result, httplib.responses[httplib.OK])

    def _sync(self, method, url, body, headers):
        """Reply to a synchronous command with an empty response object."""
        query = self._check_request(url)
        return self._command_reply(query, {})

    def _async_success(self, method, url, body, headers):
        """Start a job, then report it finished (jobstatus 1) when polled."""
        query = self._check_request(url)
        if self._is_job_poll(query):
            payload = {"jobstatus": 1, "jobresult": {"fake": "result"}}
        else:
            payload = {"jobid": self.JOB_ID}
        return self._command_reply(query, payload)

    def _async_fail(self, method, url, body, headers):
        """Start a job, then report jobstatus 2 with ERROR_TEXT when polled."""
        query = self._check_request(url)
        if self._is_job_poll(query):
            payload = {
                "jobstatus": 2,
                "jobresult": {"errortext": self.ERROR_TEXT},
            }
        else:
            payload = {"jobid": self.JOB_ID}
        return self._command_reply(query, payload)

    def _async_delayed(self, method, url, body, headers):
        """Start a job, then report it pending until ``async_delay`` reaches 0.

        Each pending reply (jobstatus 0) decrements the module-level
        ``async_delay``; once it is 0 the poll reports jobstatus 1.
        """
        global async_delay
        query = self._check_request(url)
        if not self._is_job_poll(query):
            payload = {"jobid": self.JOB_ID}
        elif async_delay == 0:
            payload = {"jobstatus": 1, "jobresult": {"fake": "result"}}
        else:
            payload = {"jobstatus": 0}
            async_delay -= 1
        return self._command_reply(query, payload)
if __name__ == "__main__":
    # Run this module's tests when the file is executed directly.  With its
    # default arguments unittest.main() exits the process itself; the explicit
    # sys.exit() is kept so the behaviour matches the original one-liner.
    program = unittest.main()
    sys.exit(program)