blob: ff6b74ac6797aa0b947d5d99b04a428ddf47ca5d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Dict, List
from xml.etree import ElementTree as ET # noqa
from libcloud.common.base import XmlResponse, ConnectionUserAndKey
# Host name of the DurableDNS API; DurableConnection in this module uses it
# as its ``host``.
API_HOST = "durabledns.com"
def _schema_builder(urn_nid, method, attributes):
    """
    Build the XML body element used to do an API request.

    The result is a ``soap:Body`` element containing a single
    ``urn:<urn_nid>:<method>`` element, which in turn contains one empty
    ``urn:<urn_nid>:<attribute>`` child per requested attribute, in the
    order given.

    :param urn_nid: API urn namespace id.
    :type urn_nid: ``str``

    :param method: API method.
    :type method: ``str``

    :param attributes: List of attributes to include.
    :type attributes: ``list`` of ``str``

    :return: The ``soap:Body`` element.
    :rtype: :class:`Element`
    """

    def qualified(name):
        # Every tag below the body shares the same "urn:<urn_nid>:" prefix.
        return "urn:{}:{}".format(urn_nid, name)

    body_attrib = {"xmlns:m": "https://durabledns.com/services/dns/%s" % method}
    body = ET.Element("soap:Body", body_attrib)
    request = ET.SubElement(body, qualified(method))

    # Attributes specification
    for name in attributes:
        ET.SubElement(request, qualified(name))

    return body
# Attribute groups shared by several API calls. They are tuples so that each
# entry below unpacks them into its own, independent list.
_AUTH_ATTRIBUTES = ("apiuser", "apikey")
_ZONE_ATTRIBUTES = (*_AUTH_ATTRIBUTES, "zonename")
_ZONE_SETTING_ATTRIBUTES = (
    "ns",
    "mbox",
    "refresh",
    "retry",
    "expire",
    "minimum",
    "ttl",
    "xfer",
    "update_acl",
)

# One entry per API operation. The keys of every entry ("urn_nid", "method",
# "attributes") match the parameter names of _schema_builder.
SCHEMA_BUILDER_MAP = {
    "list_zones": {
        "urn_nid": "listZoneswsdl",
        "method": "listZones",
        "attributes": [*_AUTH_ATTRIBUTES],
    },
    "list_records": {
        "urn_nid": "listRecordswsdl",
        "method": "listRecords",
        "attributes": [*_ZONE_ATTRIBUTES],
    },
    "get_zone": {
        "urn_nid": "getZonewsdl",
        "method": "getZone",
        "attributes": [*_ZONE_ATTRIBUTES],
    },
    "get_record": {
        "urn_nid": "getRecordwsdl",
        "method": "getRecord",
        "attributes": [*_ZONE_ATTRIBUTES, "recordid"],
    },
    "create_zone": {
        "urn_nid": "createZonewsdl",
        "method": "createZone",
        "attributes": [*_ZONE_ATTRIBUTES, *_ZONE_SETTING_ATTRIBUTES],
    },
    "create_record": {
        "urn_nid": "createRecordwsdl",
        "method": "createRecord",
        "attributes": [
            *_ZONE_ATTRIBUTES,
            "name",
            "type",
            "data",
            "aux",
            "ttl",
            "ddns_enabled",
        ],
    },
    "update_zone": {
        "urn_nid": "updateZonewsdl",
        "method": "updateZone",
        "attributes": [*_ZONE_ATTRIBUTES, *_ZONE_SETTING_ATTRIBUTES],
    },
    "update_record": {
        "urn_nid": "updateRecordwsdl",
        "method": "updateRecord",
        "attributes": [
            *_ZONE_ATTRIBUTES,
            "id",
            "name",
            "aux",
            "data",
            "ttl",
            "ddns_enabled",
        ],
    },
    "delete_zone": {
        "urn_nid": "deleteZonewsdl",
        "method": "deleteZone",
        "attributes": [*_ZONE_ATTRIBUTES],
    },
    "delete_record": {
        "urn_nid": "deleteRecordwsdl",
        "method": "deleteRecord",
        "attributes": [*_ZONE_ATTRIBUTES, "id"],
    },
}
class DurableDNSException(Exception):
    """
    Error reported for a DurableDNS API response.

    :param code: Error code; exposed as ``code`` and as ``args[0]``.
    :param message: Error message; exposed as ``message`` and as ``args[1]``.
    """

    def __init__(self, code, message):
        # The base initializer stores ``(code, message)`` as ``self.args``.
        super().__init__(code, message)
        self.message = message
        self.code = code

    def __repr__(self):
        """Return the class name followed by the code and the message."""
        return "DurableDNSException {} {}".format(self.code, self.message)

    def __str__(self):
        """Return the code and the message separated by one space."""
        return "{} {}".format(self.code, self.message)
class DurableResponse(XmlResponse):
    """
    Response of a DurableDNS API call.

    The body is parsed when the object is created: ``objects`` holds the
    dictionaries extracted from the response and ``errors`` the error
    dictionaries (keys ``ERRORMESSAGE`` and ``ERRORCODE``). When at least one
    error is found, a :class:`DurableDNSException` built from the first one
    is raised by the constructor.
    """

    errors = []  # type: List[Dict]
    objects = []  # type: List[Dict]

    # Opening tags that declare the ``ns1`` prefix with an empty namespace.
    # ``[^<>]+`` keeps every match inside a single tag, so several such tags
    # on the same line are found one by one.
    _EMPTY_NS1_TAG = re.compile(r'<ns1:[^<>]+ xmlns:ns1="">')

    def __init__(self, response, connection):
        """
        Parse the response and raise on the first reported error.

        :raises DurableDNSException: If the parsed body contains an error.
        """
        super().__init__(response=response, connection=connection)
        self.objects, self.errors = self.parse_body_and_error()
        if self.errors:
            raise self._make_excp(self.errors[0])

    def parse_body_and_error(self):
        """
        Parse the response body into result and error dictionaries.

        The first child of the first child of the parsed document is taken as
        the method response; its tag selects how the content is read.

        :return: ``(objects, errors)``. ``errors`` holds dictionaries with
                 the keys ``ERRORMESSAGE`` and ``ERRORCODE``.
        :rtype: ``tuple`` of (``list`` of ``dict``, ``list`` of ``dict``)
        """
        objects = []
        errors = []
        xml_obj = self.parse_body()
        # pylint: disable=no-member
        envelop_body = list(xml_obj)[0]
        method_resp = list(envelop_body)[0]
        tag = method_resp.tag

        # handle errors: a fault is reported with the response status as code
        if "Fault" in tag:
            faults = [child for child in method_resp if child.tag == "faultstring"]
            # A fault without a usable fault string still becomes an error
            # instead of crashing with IndexError / AttributeError.
            message = faults[0].text.strip() if faults and faults[0].text else ""
            errors.append({"ERRORMESSAGE": message, "ERRORCODE": self.status})

        # parse response from listZonesResponse: one object per element, whose
        # id is the text of the element's first child
        if "listZonesResponse" in tag:
            answer = list(method_resp)[0]
            for element in answer:
                objects.append({"id": list(element)[0].text})

        # parse response from listRecordsResponse: only the id is kept
        if "listRecordsResponse" in tag:
            answer = list(method_resp)[0]
            for element in answer:
                record = {}
                for child in element:
                    if child.tag == "id":
                        record["id"] = child.text.strip()
                objects.append(record)

        # parse response from getZoneResponse
        if "getZoneResponse" in tag:
            zone = {}
            extra = {}
            for child in method_resp:
                if child.tag == "origin":
                    # The origin is used both as the id and as the domain.
                    zone["id"] = child.text.strip()
                    zone["domain"] = child.text.strip()
                elif child.tag == "ttl":
                    zone["ttl"] = int(child.text.strip())
                elif child.tag in ("retry", "expire", "minimum"):
                    extra[child.tag] = int(child.text.strip())
                else:
                    extra[child.tag] = child.text.strip() if child.text else ""
            # Attached once, after the loop, so that "extra" is present no
            # matter which child elements the response contained.
            zone["extra"] = extra
            objects.append(zone)

        # parse response from getRecordResponse
        if "getRecordResponse" in tag:
            record = {}
            for child in method_resp:
                # Elements without text are skipped.
                if child.tag in ("id", "name", "type", "data", "aux", "ttl") and child.text:
                    record[child.tag] = child.text.strip()
            if not record:
                # Nothing usable came back: report the record as missing.
                errors.append({"ERRORMESSAGE": "Record does not exist", "ERRORCODE": 404})
            objects.append(record)

        # parse response from createZoneResponse
        if "createZoneResponse" in tag:
            answer = list(method_resp)[0]
            created = {}
            if answer.tag == "return" and answer.text:
                created["id"] = answer.text.strip()
            objects.append(created)

        # catch Record does not exists error when deleting record
        if "deleteRecordResponse" in tag:
            answer = list(method_resp)[0]
            message = (answer.text or "").strip()
            if "Record does not exists" in message:
                errors.append({"ERRORMESSAGE": message, "ERRORCODE": self.status})

        # parse response in createRecordResponse
        if "createRecordResponse" in tag:
            answer = list(method_resp)[0]
            objects.append({"id": answer.text.strip()})

        return (objects, errors)

    def parse_body(self):
        """
        Clean up the raw body, then parse it with the parent class.
        """
        # A problem arises in the API response because there are undeclared
        # xml namespaces. In order to fix that at the moment, we use the
        # _fix_response method to clean up since we won't always have the
        # lxml library.
        self._fix_response()
        body = super().parse_body()
        return body

    def success(self):
        """
        Used to determine if the request was successful.

        :return: ``True`` when no error was collected.
        :rtype: ``bool``
        """
        return not self.errors

    def _make_excp(self, error):
        """
        Build the exception for an error dictionary.

        :param error: Dictionary with ``ERRORCODE`` and ``ERRORMESSAGE``.
        :type error: ``dict``

        :rtype: :class:`DurableDNSException`
        """
        return DurableDNSException(error["ERRORCODE"], error["ERRORMESSAGE"])

    def _fix_response(self):
        """
        Rewrite ``self.body`` so that ``<ns1:NAME ... xmlns:ns1="">`` tags
        become ``<NAME>`` and their ``</ns1:NAME>`` closing tags become
        ``</NAME>``.
        """
        # dict.fromkeys drops repeated matches while keeping their order;
        # str.replace below already rewrites every occurrence at once.
        for item in dict.fromkeys(self._EMPTY_NS1_TAG.findall(self.body)):
            qualified_name = item.split(" ")[0][1:]  # e.g. "ns1:NAME"
            local_name = qualified_name.split(":")[1]
            self.body = self.body.replace(item, "<" + local_name + ">")
            self.body = self.body.replace(
                "</" + qualified_name + ">", "</" + local_name + ">"
            )
class DurableConnection(ConnectionUserAndKey):
    """
    Connection class for the DurableDNS API.

    Requests go to ``API_HOST`` and responses are wrapped in
    :class:`DurableResponse`.
    """

    host = API_HOST
    responseCls = DurableResponse

    def add_default_params(self, params):
        """
        Add the ``user_id`` and ``key`` of this connection to ``params``.

        :param params: Request parameters; modified in place.
        :return: The same ``params`` object.
        """
        for name, value in (("user_id", self.user_id), ("key", self.key)):
            params[name] = value
        return params

    def add_default_headers(self, headers):
        """
        Add the content headers sent with every request to ``headers``.

        :param headers: Request headers; modified in place.
        :return: The same ``headers`` object.
        """
        defaults = (
            ("Content-Type", "text/xml"),
            ("Content-Encoding", "gzip; charset=ISO-8859-1"),
        )
        for name, value in defaults:
            headers[name] = value
        return headers