| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License.You may obtain a copy of the License at |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import sys |
| import unittest |
| from mock import MagicMock |
| |
| |
| from libcloud.test import MockHttp |
| from libcloud.utils.py3 import httplib |
| from libcloud.dns.drivers.zonomi import ZonomiDNSDriver |
| from libcloud.test.secrets import DNS_PARAMS_ZONOMI |
| from libcloud.test.file_fixtures import DNSFileFixtures |
| from libcloud.dns.types import RecordType |
| from libcloud.dns.types import ZoneDoesNotExistError, ZoneAlreadyExistsError |
| from libcloud.dns.types import RecordDoesNotExistError |
| from libcloud.dns.types import RecordAlreadyExistsError |
| from libcloud.dns.base import Zone, Record |
| |
| |
| class ZonomiTests(unittest.TestCase): |
| def setUp(self): |
| ZonomiDNSDriver.connectionCls.conn_class = ZonomiMockHttp |
| ZonomiMockHttp.type = None |
| self.driver = ZonomiDNSDriver(*DNS_PARAMS_ZONOMI) |
| self.test_zone = Zone( |
| id="zone.com", |
| domain="zone.com", |
| driver=self.driver, |
| type="master", |
| ttl=None, |
| extra={}, |
| ) |
| self.test_record = Record( |
| id="record.zone.com", |
| name="record.zone.com", |
| data="127.0.0.1", |
| type="A", |
| zone=self.test_zone, |
| driver=self, |
| extra={}, |
| ) |
| |
| def test_list_record_types(self): |
| record_types = self.driver.list_record_types() |
| self.assertEqual(len(record_types), 3) |
| self.assertTrue(RecordType.A in record_types) |
| self.assertTrue(RecordType.MX in record_types) |
| self.assertTrue(RecordType.TXT in record_types) |
| |
| def test_list_zones_empty(self): |
| ZonomiMockHttp.type = "EMPTY_ZONES_LIST" |
| zones = self.driver.list_zones() |
| |
| self.assertEqual(zones, []) |
| |
| def test_list_zones_success(self): |
| zones = self.driver.list_zones() |
| |
| self.assertEqual(len(zones), 3) |
| |
| zone = zones[0] |
| self.assertEqual(zone.id, "thegamertest.com") |
| self.assertEqual(zone.domain, "thegamertest.com") |
| self.assertEqual(zone.type, "master") |
| self.assertIsNone(zone.ttl) |
| self.assertEqual(zone.driver, self.driver) |
| |
| second_zone = zones[1] |
| self.assertEqual(second_zone.id, "lonelygamer.com") |
| self.assertEqual(second_zone.domain, "lonelygamer.com") |
| self.assertEqual(second_zone.type, "master") |
| self.assertIsNone(second_zone.ttl) |
| self.assertEqual(second_zone.driver, self.driver) |
| |
| third_zone = zones[2] |
| self.assertEqual(third_zone.id, "gamertest.com") |
| self.assertEqual(third_zone.domain, "gamertest.com") |
| self.assertEqual(third_zone.type, "master") |
| self.assertIsNone(third_zone.ttl) |
| self.assertEqual(third_zone.driver, self.driver) |
| |
| def test_get_zone_GET_ZONE_DOES_NOT_EXIST(self): |
| ZonomiMockHttp.type = "GET_ZONE_DOES_NOT_EXIST" |
| try: |
| self.driver.get_zone("testzone.com") |
| except ZoneDoesNotExistError as e: |
| self.assertEqual(e.zone_id, "testzone.com") |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_get_zone_GET_ZONE_SUCCESS(self): |
| ZonomiMockHttp.type = "GET_ZONE_SUCCESS" |
| zone = self.driver.get_zone(zone_id="gamertest.com") |
| |
| self.assertEqual(zone.id, "gamertest.com") |
| self.assertEqual(zone.domain, "gamertest.com") |
| self.assertEqual(zone.type, "master") |
| self.assertIsNone(zone.ttl) |
| self.assertEqual(zone.driver, self.driver) |
| |
| def test_delete_zone_DELETE_ZONE_DOES_NOT_EXIST(self): |
| ZonomiMockHttp.type = "DELETE_ZONE_DOES_NOT_EXIST" |
| try: |
| self.driver.delete_zone(zone=self.test_zone) |
| except ZoneDoesNotExistError as e: |
| self.assertEqual(e.zone_id, self.test_zone.id) |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_delete_zone_delete_zone_success(self): |
| ZonomiMockHttp.type = "DELETE_ZONE_SUCCESS" |
| status = self.driver.delete_zone(zone=self.test_zone) |
| |
| self.assertEqual(status, True) |
| |
| def test_create_zone_already_exists(self): |
| ZonomiMockHttp.type = "CREATE_ZONE_ALREADY_EXISTS" |
| try: |
| self.driver.create_zone(domain="gamertest.com") |
| except ZoneAlreadyExistsError as e: |
| self.assertEqual(e.zone_id, "gamertest.com") |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_create_zone_create_zone_success(self): |
| ZonomiMockHttp.type = "CREATE_ZONE_SUCCESS" |
| |
| zone = self.driver.create_zone(domain="myzone.com") |
| |
| self.assertEqual(zone.id, "myzone.com") |
| self.assertEqual(zone.domain, "myzone.com") |
| self.assertEqual(zone.type, "master") |
| self.assertIsNone(zone.ttl) |
| |
| def test_list_records_empty_list(self): |
| ZonomiMockHttp.type = "LIST_RECORDS_EMPTY_LIST" |
| pass |
| |
| def test_list_records_success(self): |
| ZonomiMockHttp.type = "LIST_RECORDS_SUCCESS" |
| records = self.driver.list_records(zone=self.test_zone) |
| |
| self.assertEqual(len(records), 4) |
| |
| record = records[0] |
| self.assertEqual(record.id, "zone.com") |
| self.assertEqual(record.type, "SOA") |
| self.assertEqual(record.data, "ns1.zonomi.com. soacontact.zonomi.com. 13") |
| self.assertEqual(record.name, "zone.com") |
| self.assertEqual(record.zone, self.test_zone) |
| |
| second_record = records[1] |
| self.assertEqual(second_record.id, "zone.com") |
| self.assertEqual(second_record.name, "zone.com") |
| self.assertEqual(second_record.type, "NS") |
| self.assertEqual(second_record.data, "ns1.zonomi.com") |
| self.assertEqual(second_record.zone, self.test_zone) |
| |
| third_record = records[2] |
| self.assertEqual(third_record.id, "oltjano") |
| self.assertEqual(third_record.name, "oltjano") |
| self.assertEqual(third_record.type, "A") |
| self.assertEqual(third_record.data, "127.0.0.1") |
| self.assertEqual(third_record.zone, self.test_zone) |
| |
| fourth_record = records[3] |
| self.assertEqual(fourth_record.id, "zone.com") |
| self.assertEqual(fourth_record.name, "zone.com") |
| self.assertEqual(fourth_record.type, "NS") |
| self.assertEqual(fourth_record.data, "ns5.zonomi.com") |
| self.assertEqual(fourth_record.zone, self.test_zone) |
| |
| def test_get_record_does_not_exist(self): |
| ZonomiMockHttp.type = "GET_RECORD_DOES_NOT_EXIST" |
| zone = Zone( |
| id="zone.com", |
| domain="zone.com", |
| type="master", |
| ttl=None, |
| driver=self.driver, |
| ) |
| self.driver.get_zone = MagicMock(return_value=zone) |
| record_id = "nonexistent" |
| try: |
| self.driver.get_record(record_id=record_id, zone_id="zone.com") |
| except RecordDoesNotExistError as e: |
| self.assertEqual(e.record_id, record_id) |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_get_record_success(self): |
| ZonomiMockHttp.type = "GET_RECORD_SUCCESS" |
| zone = Zone( |
| id="zone.com", |
| domain="zone.com", |
| type="master", |
| ttl=None, |
| driver=self.driver, |
| ) |
| self.driver.get_zone = MagicMock(return_value=zone) |
| record = self.driver.get_record(record_id="oltjano", zone_id="zone.com") |
| |
| self.assertEqual(record.id, "oltjano") |
| self.assertEqual(record.name, "oltjano") |
| self.assertEqual(record.type, "A") |
| self.assertEqual(record.data, "127.0.0.1") |
| |
| def test_delete_record_does_not_exist(self): |
| ZonomiMockHttp.type = "DELETE_RECORD_DOES_NOT_EXIST" |
| record = self.test_record |
| try: |
| self.driver.delete_record(record=record) |
| except RecordDoesNotExistError as e: |
| self.assertEqual(e.record_id, record.id) |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_delete_record_success(self): |
| ZonomiMockHttp.type = "DELETE_RECORD_SUCCESS" |
| record = self.test_record |
| status = self.driver.delete_record(record=record) |
| |
| self.assertEqual(status, True) |
| |
| def test_create_record_already_exists(self): |
| zone = self.test_zone |
| ZonomiMockHttp.type = "CREATE_RECORD_ALREADY_EXISTS" |
| try: |
| self.driver.create_record( |
| name="createrecord", type="A", data="127.0.0.1", zone=zone, extra={} |
| ) |
| except RecordAlreadyExistsError as e: |
| self.assertEqual(e.record_id, "createrecord") |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_create_record_success(self): |
| ZonomiMockHttp.type = "CREATE_RECORD_SUCCESS" |
| zone = self.test_zone |
| record = self.driver.create_record( |
| name="createrecord", zone=zone, type="A", data="127.0.0.1", extra={} |
| ) |
| |
| self.assertEqual(record.id, "createrecord") |
| self.assertEqual(record.name, "createrecord") |
| self.assertEqual(record.type, "A") |
| self.assertEqual(record.data, "127.0.0.1") |
| self.assertEqual(record.zone, zone) |
| |
| def test_convert_to_slave(self): |
| zone = self.test_zone |
| result = self.driver.ex_convert_to_secondary(zone, "1.2.3.4") |
| self.assertTrue(result) |
| |
| def test_convert_to_slave_couldnt_convert(self): |
| zone = self.test_zone |
| ZonomiMockHttp.type = "COULDNT_CONVERT" |
| try: |
| self.driver.ex_convert_to_secondary(zone, "1.2.3.4") |
| except ZoneDoesNotExistError as e: |
| self.assertEqual(e.zone_id, "zone.com") |
| else: |
| self.fail("Exception was not thrown.") |
| |
| def test_convert_to_master(self): |
| zone = self.test_zone |
| result = self.driver.ex_convert_to_master(zone) |
| self.assertTrue(result) |
| |
| def test_convert_to_master_couldnt_convert(self): |
| zone = self.test_zone |
| ZonomiMockHttp.type = "COULDNT_CONVERT" |
| try: |
| self.driver.ex_convert_to_master(zone) |
| except ZoneDoesNotExistError as e: |
| self.assertEqual(e.zone_id, "zone.com") |
| else: |
| self.fail("Exception was not thrown.") |
| |
| |
| class ZonomiMockHttp(MockHttp): |
| fixtures = DNSFileFixtures("zonomi") |
| |
| def _app_dns_dyndns_jsp_EMPTY_ZONES_LIST(self, method, url, body, headers): |
| body = self.fixtures.load("empty_zones_list.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp(self, method, url, body, headers): |
| body = self.fixtures.load("list_zones.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_GET_ZONE_DOES_NOT_EXIST(self, method, url, body, headers): |
| body = self.fixtures.load("list_zones.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_GET_ZONE_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("list_zones.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_DELETE_ZONE_DOES_NOT_EXIST( |
| self, method, url, body, headers |
| ): |
| body = self.fixtures.load("delete_zone_does_not_exist.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_DELETE_ZONE_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("delete_zone.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_addzone_jsp_CREATE_ZONE_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("create_zone.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_addzone_jsp_CREATE_ZONE_ALREADY_EXISTS( |
| self, method, url, body, headers |
| ): |
| body = self.fixtures.load("create_zone_already_exists.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_LIST_RECORDS_EMPTY_LIST(self, method, url, body, headers): |
| body = self.fixtures.load("list_records_empty_list.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_LIST_RECORDS_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("list_records.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_DELETE_RECORD_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("delete_record.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_DELETE_RECORD_DOES_NOT_EXIST( |
| self, method, url, body, headers |
| ): |
| body = self.fixtures.load("delete_record_does_not_exist.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_CREATE_RECORD_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("create_record.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_CREATE_RECORD_ALREADY_EXISTS( |
| self, method, url, body, headers |
| ): |
| body = self.fixtures.load("create_record_already_exists.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_GET_RECORD_SUCCESS(self, method, url, body, headers): |
| body = self.fixtures.load("list_records.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_dyndns_jsp_GET_RECORD_DOES_NOT_EXIST(self, method, url, body, headers): |
| body = self.fixtures.load("list_records.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_converttosecondary_jsp(self, method, url, body, headers): |
| body = self.fixtures.load("converted_to_slave.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_converttosecondary_jsp_COULDNT_CONVERT( |
| self, method, url, body, headers |
| ): |
| body = self.fixtures.load("couldnt_convert.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_converttomaster_jsp(self, method, url, body, headers): |
| body = self.fixtures.load("converted_to_master.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _app_dns_converttomaster_jsp_COULDNT_CONVERT(self, method, url, body, headers): |
| body = self.fixtures.load("couldnt_convert.xml") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(unittest.main()) |