# blob: e0545060088ed3c0d6128f1b963f3a881b830522 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
from mock import MagicMock
from libcloud.test import MockHttp
from libcloud.utils.py3 import httplib
from libcloud.dns.drivers.zonomi import ZonomiDNSDriver
from libcloud.test.secrets import DNS_PARAMS_ZONOMI
from libcloud.test.file_fixtures import DNSFileFixtures
from libcloud.dns.types import RecordType
from libcloud.dns.types import ZoneDoesNotExistError, ZoneAlreadyExistsError
from libcloud.dns.types import RecordDoesNotExistError
from libcloud.dns.types import RecordAlreadyExistsError
from libcloud.dns.base import Zone, Record
class ZonomiTests(unittest.TestCase):
    """Unit tests for ZonomiDNSDriver, answered by canned XML fixtures.

    Each test selects a scenario by assigning ``ZonomiMockHttp.type``; the
    mock connection then serves the fixture registered for that scenario.
    """

    def setUp(self):
        # Route driver traffic through the mock connection and reset the
        # scenario selector so one test's choice cannot leak into the next.
        ZonomiDNSDriver.connectionCls.conn_class = ZonomiMockHttp
        ZonomiMockHttp.type = None
        self.driver = ZonomiDNSDriver(*DNS_PARAMS_ZONOMI)
        self.test_zone = Zone(
            id="zone.com",
            domain="zone.com",
            driver=self.driver,
            type="master",
            ttl=None,
            extra={},
        )
        self.test_record = Record(
            id="record.zone.com",
            name="record.zone.com",
            data="127.0.0.1",
            type="A",
            zone=self.test_zone,
            # The record belongs to the DNS driver, not to the TestCase.
            driver=self.driver,
            extra={},
        )

    def _stub_get_zone(self):
        # Replace get_zone on this test's driver instance with a stub that
        # returns a fixed zone. Presumably get_record resolves ``zone_id``
        # through get_zone; stubbing keeps that lookup off the mock HTTP layer.
        zone = Zone(
            id="zone.com",
            domain="zone.com",
            type="master",
            ttl=None,
            driver=self.driver,
        )
        self.driver.get_zone = MagicMock(return_value=zone)
        return zone

    def test_list_record_types(self):
        record_types = self.driver.list_record_types()
        self.assertEqual(len(record_types), 3)
        for expected_type in (RecordType.A, RecordType.MX, RecordType.TXT):
            self.assertIn(expected_type, record_types)

    def test_list_zones_empty(self):
        ZonomiMockHttp.type = "EMPTY_ZONES_LIST"
        zones = self.driver.list_zones()
        self.assertEqual(zones, [])

    def test_list_zones_success(self):
        zones = self.driver.list_zones()
        # Expected zones, in the order the driver returns them.
        expected_domains = ["thegamertest.com", "lonelygamer.com", "gamertest.com"]
        self.assertEqual(len(zones), len(expected_domains))
        for zone, domain in zip(zones, expected_domains):
            self.assertEqual(zone.id, domain)
            self.assertEqual(zone.domain, domain)
            self.assertEqual(zone.type, "master")
            self.assertIsNone(zone.ttl)
            self.assertEqual(zone.driver, self.driver)

    def test_get_zone_GET_ZONE_DOES_NOT_EXIST(self):
        ZonomiMockHttp.type = "GET_ZONE_DOES_NOT_EXIST"
        with self.assertRaises(ZoneDoesNotExistError) as ctx:
            self.driver.get_zone("testzone.com")
        self.assertEqual(ctx.exception.zone_id, "testzone.com")

    def test_get_zone_GET_ZONE_SUCCESS(self):
        ZonomiMockHttp.type = "GET_ZONE_SUCCESS"
        zone = self.driver.get_zone(zone_id="gamertest.com")
        self.assertEqual(zone.id, "gamertest.com")
        self.assertEqual(zone.domain, "gamertest.com")
        self.assertEqual(zone.type, "master")
        self.assertIsNone(zone.ttl)
        self.assertEqual(zone.driver, self.driver)

    def test_delete_zone_DELETE_ZONE_DOES_NOT_EXIST(self):
        ZonomiMockHttp.type = "DELETE_ZONE_DOES_NOT_EXIST"
        with self.assertRaises(ZoneDoesNotExistError) as ctx:
            self.driver.delete_zone(zone=self.test_zone)
        self.assertEqual(ctx.exception.zone_id, self.test_zone.id)

    def test_delete_zone_delete_zone_success(self):
        ZonomiMockHttp.type = "DELETE_ZONE_SUCCESS"
        status = self.driver.delete_zone(zone=self.test_zone)
        self.assertEqual(status, True)

    def test_create_zone_already_exists(self):
        ZonomiMockHttp.type = "CREATE_ZONE_ALREADY_EXISTS"
        with self.assertRaises(ZoneAlreadyExistsError) as ctx:
            self.driver.create_zone(domain="gamertest.com")
        self.assertEqual(ctx.exception.zone_id, "gamertest.com")

    def test_create_zone_create_zone_success(self):
        ZonomiMockHttp.type = "CREATE_ZONE_SUCCESS"
        zone = self.driver.create_zone(domain="myzone.com")
        self.assertEqual(zone.id, "myzone.com")
        self.assertEqual(zone.domain, "myzone.com")
        self.assertEqual(zone.type, "master")
        self.assertIsNone(zone.ttl)

    def test_list_records_empty_list(self):
        # TODO(review): this test only selects the scenario and asserts
        # nothing. Presumably it should call
        # self.driver.list_records(zone=self.test_zone) and expect an empty
        # list; confirm against the list_records_empty_list.xml fixture
        # before enabling that assertion.
        ZonomiMockHttp.type = "LIST_RECORDS_EMPTY_LIST"

    def test_list_records_success(self):
        ZonomiMockHttp.type = "LIST_RECORDS_SUCCESS"
        records = self.driver.list_records(zone=self.test_zone)
        # (id, name, type, data) for each record, in returned order.
        expected_records = [
            ("zone.com", "zone.com", "SOA", "ns1.zonomi.com. soacontact.zonomi.com. 13"),
            ("zone.com", "zone.com", "NS", "ns1.zonomi.com"),
            ("oltjano", "oltjano", "A", "127.0.0.1"),
            ("zone.com", "zone.com", "NS", "ns5.zonomi.com"),
        ]
        self.assertEqual(len(records), len(expected_records))
        for record, expected in zip(records, expected_records):
            record_id, name, record_type, data = expected
            self.assertEqual(record.id, record_id)
            self.assertEqual(record.name, name)
            self.assertEqual(record.type, record_type)
            self.assertEqual(record.data, data)
            self.assertEqual(record.zone, self.test_zone)

    def test_get_record_does_not_exist(self):
        ZonomiMockHttp.type = "GET_RECORD_DOES_NOT_EXIST"
        self._stub_get_zone()
        record_id = "nonexistent"
        with self.assertRaises(RecordDoesNotExistError) as ctx:
            self.driver.get_record(record_id=record_id, zone_id="zone.com")
        self.assertEqual(ctx.exception.record_id, record_id)

    def test_get_record_success(self):
        ZonomiMockHttp.type = "GET_RECORD_SUCCESS"
        self._stub_get_zone()
        record = self.driver.get_record(record_id="oltjano", zone_id="zone.com")
        self.assertEqual(record.id, "oltjano")
        self.assertEqual(record.name, "oltjano")
        self.assertEqual(record.type, "A")
        self.assertEqual(record.data, "127.0.0.1")

    def test_delete_record_does_not_exist(self):
        ZonomiMockHttp.type = "DELETE_RECORD_DOES_NOT_EXIST"
        record = self.test_record
        with self.assertRaises(RecordDoesNotExistError) as ctx:
            self.driver.delete_record(record=record)
        self.assertEqual(ctx.exception.record_id, record.id)

    def test_delete_record_success(self):
        ZonomiMockHttp.type = "DELETE_RECORD_SUCCESS"
        status = self.driver.delete_record(record=self.test_record)
        self.assertEqual(status, True)

    def test_create_record_already_exists(self):
        ZonomiMockHttp.type = "CREATE_RECORD_ALREADY_EXISTS"
        with self.assertRaises(RecordAlreadyExistsError) as ctx:
            self.driver.create_record(
                name="createrecord",
                type="A",
                data="127.0.0.1",
                zone=self.test_zone,
                extra={},
            )
        self.assertEqual(ctx.exception.record_id, "createrecord")

    def test_create_record_success(self):
        ZonomiMockHttp.type = "CREATE_RECORD_SUCCESS"
        zone = self.test_zone
        record = self.driver.create_record(
            name="createrecord", zone=zone, type="A", data="127.0.0.1", extra={}
        )
        self.assertEqual(record.id, "createrecord")
        self.assertEqual(record.name, "createrecord")
        self.assertEqual(record.type, "A")
        self.assertEqual(record.data, "127.0.0.1")
        self.assertEqual(record.zone, zone)

    def test_convert_to_slave(self):
        result = self.driver.ex_convert_to_secondary(self.test_zone, "1.2.3.4")
        self.assertTrue(result)

    def test_convert_to_slave_couldnt_convert(self):
        ZonomiMockHttp.type = "COULDNT_CONVERT"
        with self.assertRaises(ZoneDoesNotExistError) as ctx:
            self.driver.ex_convert_to_secondary(self.test_zone, "1.2.3.4")
        self.assertEqual(ctx.exception.zone_id, "zone.com")

    def test_convert_to_master(self):
        result = self.driver.ex_convert_to_master(self.test_zone)
        self.assertTrue(result)

    def test_convert_to_master_couldnt_convert(self):
        ZonomiMockHttp.type = "COULDNT_CONVERT"
        with self.assertRaises(ZoneDoesNotExistError) as ctx:
            self.driver.ex_convert_to_master(self.test_zone)
        self.assertEqual(ctx.exception.zone_id, "zone.com")
class ZonomiMockHttp(MockHttp):
    """Mock HTTP connection that serves Zonomi API responses from fixtures.

    Every handler returns HTTP 200 with the body of one XML fixture file.
    Handler names follow the pattern ``_<request path>[_<scenario>]``, where
    the scenario suffix matches the value tests assign to
    ``ZonomiMockHttp.type``. NOTE(review): the name-based dispatch itself
    lives in MockHttp, outside this file.
    """

    fixtures = DNSFileFixtures("zonomi")

    def _fixture_response(self, fixture_name):
        # Build the (status, body, headers, reason) tuple shared by all
        # handlers: a 200 OK whose body is the named fixture file.
        body = self.fixtures.load(fixture_name)
        return (httplib.OK, body, {}, httplib.responses[httplib.OK])

    # --- /app/dns/dyndns.jsp: zones -------------------------------------

    def _app_dns_dyndns_jsp_EMPTY_ZONES_LIST(self, method, url, body, headers):
        return self._fixture_response("empty_zones_list.xml")

    def _app_dns_dyndns_jsp(self, method, url, body, headers):
        return self._fixture_response("list_zones.xml")

    def _app_dns_dyndns_jsp_GET_ZONE_DOES_NOT_EXIST(self, method, url, body, headers):
        return self._fixture_response("list_zones.xml")

    def _app_dns_dyndns_jsp_GET_ZONE_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("list_zones.xml")

    def _app_dns_dyndns_jsp_DELETE_ZONE_DOES_NOT_EXIST(
        self, method, url, body, headers
    ):
        return self._fixture_response("delete_zone_does_not_exist.xml")

    def _app_dns_dyndns_jsp_DELETE_ZONE_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("delete_zone.xml")

    # --- /app/dns/addzone.jsp -------------------------------------------

    def _app_dns_addzone_jsp_CREATE_ZONE_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("create_zone.xml")

    def _app_dns_addzone_jsp_CREATE_ZONE_ALREADY_EXISTS(
        self, method, url, body, headers
    ):
        return self._fixture_response("create_zone_already_exists.xml")

    # --- /app/dns/dyndns.jsp: records -----------------------------------

    def _app_dns_dyndns_jsp_LIST_RECORDS_EMPTY_LIST(self, method, url, body, headers):
        return self._fixture_response("list_records_empty_list.xml")

    def _app_dns_dyndns_jsp_LIST_RECORDS_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("list_records.xml")

    def _app_dns_dyndns_jsp_DELETE_RECORD_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("delete_record.xml")

    def _app_dns_dyndns_jsp_DELETE_RECORD_DOES_NOT_EXIST(
        self, method, url, body, headers
    ):
        return self._fixture_response("delete_record_does_not_exist.xml")

    def _app_dns_dyndns_jsp_CREATE_RECORD_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("create_record.xml")

    def _app_dns_dyndns_jsp_CREATE_RECORD_ALREADY_EXISTS(
        self, method, url, body, headers
    ):
        return self._fixture_response("create_record_already_exists.xml")

    def _app_dns_dyndns_jsp_GET_RECORD_SUCCESS(self, method, url, body, headers):
        return self._fixture_response("list_records.xml")

    def _app_dns_dyndns_jsp_GET_RECORD_DOES_NOT_EXIST(self, method, url, body, headers):
        return self._fixture_response("list_records.xml")

    # --- /app/dns/converttosecondary.jsp and converttomaster.jsp --------

    def _app_dns_converttosecondary_jsp(self, method, url, body, headers):
        return self._fixture_response("converted_to_slave.xml")

    def _app_dns_converttosecondary_jsp_COULDNT_CONVERT(
        self, method, url, body, headers
    ):
        return self._fixture_response("couldnt_convert.xml")

    def _app_dns_converttomaster_jsp(self, method, url, body, headers):
        return self._fixture_response("converted_to_master.xml")

    def _app_dns_converttomaster_jsp_COULDNT_CONVERT(self, method, url, body, headers):
        return self._fixture_response("couldnt_convert.xml")
if __name__ == "__main__":
    # Run this module's tests when the file is executed directly, and hand
    # whatever unittest.main() returns to sys.exit as the process status.
    exit_status = unittest.main()
    sys.exit(exit_status)