blob: 35655be47f6694c56759261e3c0a05711574330b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import sys
import unittest
from libcloud.test import MockHttp
from libcloud.dns.types import RecordType, ZoneDoesNotExistError, RecordDoesNotExistError
from libcloud.utils.py3 import httplib
from libcloud.test.secrets import DNS_PARAMS_WORLDWIDEDNS
from libcloud.test.file_fixtures import DNSFileFixtures
from libcloud.common.worldwidedns import InvalidDomainName, NonExistentDomain
from libcloud.dns.drivers.worldwidedns import WorldWideDNSError, WorldWideDNSDriver
class WorldWideDNSTests(unittest.TestCase):
def setUp(self):
WorldWideDNSDriver.connectionCls.conn_class = WorldWideDNSMockHttp
WorldWideDNSMockHttp.type = None
self.driver = WorldWideDNSDriver(*DNS_PARAMS_WORLDWIDEDNS)
def assertHasKeys(self, dictionary, keys):
for key in keys:
self.assertTrue(key in dictionary, 'key "%s" not in dictionary' % (key))
def test_list_record_types(self):
record_types = self.driver.list_record_types()
self.assertEqual(len(record_types), 6)
self.assertTrue(RecordType.A in record_types)
self.assertTrue(RecordType.CNAME in record_types)
self.assertTrue(RecordType.MX in record_types)
self.assertTrue(RecordType.TXT in record_types)
self.assertTrue(RecordType.SRV in record_types)
self.assertTrue(RecordType.NS in record_types)
def test_list_zones_success(self):
zones = self.driver.list_zones()
self.assertEqual(len(zones), 1)
zone = zones[0]
self.assertEqual(zone.id, "niteowebsponsoredthisone.com")
self.assertEqual(zone.type, "master")
self.assertEqual(zone.domain, "niteowebsponsoredthisone.com")
self.assertEqual(zone.ttl, "43200")
self.assertHasKeys(
zone.extra,
[
"HOSTMASTER",
"REFRESH",
"RETRY",
"EXPIRE",
"SECURE",
"S1",
"T1",
"D1",
"S2",
"T2",
"D2",
"S3",
"T3",
"D3",
],
)
def test_list_records_success(self):
zone = self.driver.list_zones()[0]
records = self.driver.list_records(zone=zone)
self.assertEqual(len(records), 3)
www = records[0]
self.assertEqual(www.id, "1")
self.assertEqual(www.name, "www")
self.assertEqual(www.type, RecordType.A)
self.assertEqual(www.data, "0.0.0.0")
self.assertEqual(www.extra, {})
def test_list_records_zone_does_not_exist(self):
WorldWideDNSMockHttp.type = "ZONE_DOES_NOT_EXIST"
try:
zone = self.driver.list_zones()[0]
self.driver.list_records(zone=zone)
except NonExistentDomain as e:
self.assertEqual(e.code, 405)
else:
self.fail("Exception was not thrown")
def test_get_zone_success(self):
zone = self.driver.get_zone(zone_id="niteowebsponsoredthisone.com")
self.assertEqual(zone.id, "niteowebsponsoredthisone.com")
self.assertEqual(zone.type, "master")
self.assertEqual(zone.domain, "niteowebsponsoredthisone.com")
self.assertEqual(zone.ttl, "43200")
self.assertHasKeys(
zone.extra,
[
"HOSTMASTER",
"REFRESH",
"RETRY",
"EXPIRE",
"SECURE",
"S1",
"T1",
"D1",
"S2",
"T2",
"D2",
"S3",
"T3",
"D3",
],
)
def test_get_zone_does_not_exist(self):
WorldWideDNSMockHttp.type = "GET_ZONE_DOES_NOT_EXIST"
try:
self.driver.get_zone(zone_id="unexistentzone")
except ZoneDoesNotExistError as e:
self.assertEqual(e.zone_id, "unexistentzone")
else:
self.fail("Exception was not thrown")
def test_get_record_success(self):
record = self.driver.get_record(zone_id="niteowebsponsoredthisone.com", record_id="1")
self.assertEqual(record.id, "1")
self.assertEqual(record.name, "www")
self.assertEqual(record.type, RecordType.A)
self.assertEqual(record.data, "0.0.0.0")
self.assertEqual(record.extra, {})
def test_get_record_zone_does_not_exist(self):
try:
self.driver.get_record(zone_id="unexistentzone", record_id="3585100")
except ZoneDoesNotExistError as e:
self.assertEqual(e.zone_id, "unexistentzone")
else:
self.fail("Exception was not thrown")
def test_get_record_record_does_not_exist(self):
try:
self.driver.get_record(zone_id="niteowebsponsoredthisone.com", record_id="3585100")
except RecordDoesNotExistError as e:
self.assertEqual(e.record_id, "3585100")
else:
self.fail("Exception was not thrown")
def test_create_zone_success(self):
zone = self.driver.create_zone(domain="niteowebsponsoredthisone.com", type="master")
self.assertEqual(zone.id, "niteowebsponsoredthisone.com")
self.assertEqual(zone.domain, "niteowebsponsoredthisone.com")
self.assertEqual(zone.ttl, "43200")
self.assertEqual(zone.type, "master")
def test_create_zone_validaton_error(self):
WorldWideDNSMockHttp.type = "VALIDATION_ERROR"
try:
self.driver.create_zone(domain="foo.%.com", type="master", ttl=None, extra=None)
except InvalidDomainName as e:
self.assertEqual(e.code, 410)
else:
self.fail("Exception was not thrown")
def test_update_zone_success(self):
zone = self.driver.list_zones()[0]
WorldWideDNSMockHttp.type = "UPDATE_ZONE"
updated_zone = self.driver.update_zone(
zone=zone,
domain="niteowebsponsoredthisone.com", # noqa
ttl=3800,
extra={"HOSTMASTER": "mail.niteowebsponsoredthisone.com"},
) # noqa
self.assertEqual(zone.extra["HOSTMASTER"], "hostmaster.niteowebsponsoredthisone.com")
self.assertEqual(updated_zone.id, zone.id)
self.assertEqual(updated_zone.domain, "niteowebsponsoredthisone.com")
self.assertEqual(updated_zone.type, zone.type)
self.assertEqual(updated_zone.ttl, "3800")
self.assertEqual(updated_zone.extra["HOSTMASTER"], "mail.niteowebsponsoredthisone.com")
self.assertEqual(updated_zone.extra["REFRESH"], zone.extra["REFRESH"])
self.assertEqual(updated_zone.extra["RETRY"], zone.extra["RETRY"])
self.assertEqual(updated_zone.extra["EXPIRE"], zone.extra["EXPIRE"])
self.assertEqual(updated_zone.extra["SECURE"], zone.extra["SECURE"])
self.assertEqual(updated_zone.extra["S1"], zone.extra["S1"])
self.assertEqual(updated_zone.extra["T1"], zone.extra["T1"])
self.assertEqual(updated_zone.extra["D1"], zone.extra["D1"])
self.assertEqual(updated_zone.extra["S2"], zone.extra["S2"])
self.assertEqual(updated_zone.extra["T2"], zone.extra["T2"])
self.assertEqual(updated_zone.extra["D2"], zone.extra["D2"])
self.assertEqual(updated_zone.extra["S3"], zone.extra["S3"])
self.assertEqual(updated_zone.extra["T3"], zone.extra["T3"])
self.assertEqual(updated_zone.extra["D3"], zone.extra["D3"])
def test_create_record_success(self):
zone = self.driver.list_zones()[0]
WorldWideDNSMockHttp.type = "CREATE_RECORD"
record = self.driver.create_record(
name="domain4",
zone=zone,
type=RecordType.A,
data="0.0.0.4",
extra={"entry": 4},
)
self.assertEqual(record.id, "4")
self.assertEqual(record.name, "domain4")
self.assertNotEqual(record.zone.extra.get("S4"), zone.extra.get("S4"))
self.assertNotEqual(record.zone.extra.get("D4"), zone.extra.get("D4"))
self.assertEqual(record.type, RecordType.A)
self.assertEqual(record.data, "0.0.0.4")
def test_create_record_finding_entry(self):
zone = self.driver.list_zones()[0]
WorldWideDNSMockHttp.type = "CREATE_RECORD"
record = self.driver.create_record(
name="domain4", zone=zone, type=RecordType.A, data="0.0.0.4"
)
WorldWideDNSMockHttp.type = "CREATE_SECOND_RECORD"
zone = record.zone
record2 = self.driver.create_record(
name="domain1", zone=zone, type=RecordType.A, data="0.0.0.1"
)
self.assertEqual(record.id, "4")
self.assertEqual(record2.id, "5")
def test_create_record_max_entry_reached(self):
zone = self.driver.list_zones()[0]
WorldWideDNSMockHttp.type = "CREATE_RECORD_MAX_ENTRIES"
record = self.driver.create_record(
name="domain40", zone=zone, type=RecordType.A, data="0.0.0.40"
)
WorldWideDNSMockHttp.type = "CREATE_RECORD"
zone = record.zone
try:
self.driver.create_record(
name="domain41", zone=zone, type=RecordType.A, data="0.0.0.41"
)
except WorldWideDNSError as e:
self.assertEqual(e.value, "All record entries are full")
else:
self.fail("Exception was not thrown")
def test_create_record_max_entry_reached_give_entry(self):
WorldWideDNSMockHttp.type = "CREATE_RECORD_MAX_ENTRIES"
zone = self.driver.list_zones()[0]
record = self.driver.get_record(zone.id, "23")
self.assertEqual(record.id, "23")
self.assertEqual(record.name, "domain23")
self.assertEqual(record.type, "A")
self.assertEqual(record.data, "0.0.0.23")
# No matter if we have all entries full, if we choose a specific
# entry, the record will be replaced with the new one.
WorldWideDNSMockHttp.type = "CREATE_RECORD_MAX_ENTRIES_WITH_ENTRY"
record = self.driver.create_record(
name="domain23b",
zone=zone,
type=RecordType.A,
data="0.0.0.41",
extra={"entry": 23},
)
zone = record.zone
self.assertEqual(record.id, "23")
self.assertEqual(record.name, "domain23b")
self.assertEqual(record.type, "A")
self.assertEqual(record.data, "0.0.0.41")
def test_update_record_success(self):
zone = self.driver.list_zones()[0]
record = self.driver.get_record(zone.id, "1")
WorldWideDNSMockHttp.type = "UPDATE_RECORD"
record = self.driver.update_record(
record=record,
name="domain1",
type=RecordType.A,
data="0.0.0.1",
extra={"entry": 1},
)
self.assertEqual(record.id, "1")
self.assertEqual(record.name, "domain1")
self.assertNotEqual(record.zone.extra.get("S1"), zone.extra.get("S1"))
self.assertNotEqual(record.zone.extra.get("D1"), zone.extra.get("D1"))
self.assertEqual(record.type, RecordType.A)
self.assertEqual(record.data, "0.0.0.1")
def test_delete_zone_success(self):
zone = self.driver.list_zones()[0]
status = self.driver.delete_zone(zone=zone)
self.assertTrue(status)
def test_delete_zone_does_not_exist(self):
zone = self.driver.list_zones()[0]
WorldWideDNSMockHttp.type = "ZONE_DOES_NOT_EXIST"
try:
self.driver.delete_zone(zone=zone)
except NonExistentDomain as e:
self.assertEqual(e.code, 405)
else:
self.fail("Exception was not thrown")
def test_delete_record_success(self):
zone = self.driver.list_zones()[0]
records = self.driver.list_records(zone=zone)
self.assertEqual(len(records), 3)
record = records[1]
WorldWideDNSMockHttp.type = "DELETE_RECORD"
status = self.driver.delete_record(record=record)
self.assertTrue(status)
zone = self.driver.list_zones()[0]
records = self.driver.list_records(zone=zone)
self.assertEqual(len(records), 2)
class WorldWideDNSMockHttp(MockHttp):
fixtures = DNSFileFixtures("worldwidedns")
def _api_dns_list_asp(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_ZONE_DOES_NOT_EXIST(self, method, url, body, headers):
return (httplib.OK, "405", {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_GET_ZONE_DOES_NOT_EXIST(self, method, url, body, headers):
return (httplib.OK, "", {}, httplib.responses[httplib.OK])
def _api_dns_new_domain_asp(self, method, url, body, headers):
return (httplib.OK, "200", {}, httplib.responses[httplib.OK])
def _api_dns_new_domain_asp_VALIDATION_ERROR(self, method, url, body, headers):
return (httplib.OK, "410", {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_UPDATE_ZONE(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_UPDATE_ZONE(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_UPDATE_ZONE(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp_UPDATE_ZONE")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_CREATE_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_CREATE_SECOND_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_CREATE_RECORD(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_CREATE_SECOND_RECORD(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_CREATE_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp_CREATE_RECORD")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_CREATE_SECOND_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp_CREATE_SECOND_RECORD")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_CREATE_RECORD_MAX_ENTRIES(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp_CREATE_RECORD_MAX_ENTRIES")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_CREATE_RECORD_MAX_ENTRIES(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_CREATE_RECORD_MAX_ENTRIES(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_CREATE_RECORD_MAX_ENTRIES_WITH_ENTRY(
self, method, url, body, headers
):
body = self.fixtures.load("_api_dns_modify_asp_CREATE_RECORD_MAX_ENTRIES_WITH_ENTRY")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_CREATE_RECORD_MAX_ENTRIES_WITH_ENTRY(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_CREATE_RECORD_MAX_ENTRIES_WITH_ENTRY(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_UPDATE_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_UPDATE_RECORD(self, method, url, body, headers):
return (httplib.OK, "211\r\n212\r\n213", {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_UPDATE_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp_UPDATE_RECORD")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_delete_domain_asp(self, method, url, body, headers):
return (httplib.OK, "200", {}, httplib.responses[httplib.OK])
def _api_dns_delete_domain_asp_ZONE_DOES_NOT_EXIST(self, method, url, body, headers):
return (httplib.OK, "405", {}, httplib.responses[httplib.OK])
def _api_dns_list_asp_DELETE_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_dns_modify_asp_DELETE_RECORD(self, method, url, body, headers):
return (httplib.OK, "200", {}, httplib.responses[httplib.OK])
def _api_dns_list_domain_asp_DELETE_RECORD(self, method, url, body, headers):
body = self.fixtures.load("api_dns_list_domain_asp_DELETE_RECORD")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
if __name__ == "__main__":
sys.exit(unittest.main())