blob: 9b814460a737779ddede1bd4ea1789b748e8adc6 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common settings and connection objects for DigitalOcean Cloud
"""
from libcloud.utils.py3 import httplib, parse_qs, urlparse
from libcloud.common.base import BaseDriver, JsonResponse, ConnectionKey
from libcloud.common.types import LibcloudError, InvalidCredsError
# Public names exported by a star-import of this module.
__all__ = [
    "DigitalOcean_v2_Response",
    "DigitalOcean_v2_Connection",
    "DigitalOceanBaseDriver",
]
class DigitalOcean_v1_Error(LibcloudError):
    """
    Raised when a caller attempts to use version 1 of the DigitalOcean API.

    Version 1 is no longer supported; the default message points the
    caller at the v2 driver and the upstream sunsetting notice.
    """

    def __init__(
        self,
        value=(
            "Driver no longer supported: Version 1 of the "
            "DigitalOcean API reached end of life on November 9, "
            "2015. Use the v2 driver. Please visit: "
            "https://developers.digitalocean.com/documentation/changelog/api-v1/sunsetting-api-v1/"
        ),  # noqa: E501
        driver=None,
    ):
        """
        :param value: Error message; defaults to the end-of-life notice.

        :param driver: Driver the error is associated with, if any. It is
                       forwarded unchanged to the parent constructor.
        """
        super().__init__(value, driver=driver)
class DigitalOcean_v2_Response(JsonResponse):
    """
    Response class for version 2 of the DigitalOcean API.
    """

    # HTTP status codes that ``success()`` treats as a successful call.
    valid_response_codes = [
        httplib.OK,
        httplib.ACCEPTED,
        httplib.CREATED,
        httplib.NO_CONTENT,
    ]

    def parse_error(self):
        """
        Turn an error response into an exception or an error value.

        :return: ``"<message> (code: <status>)"`` when the parsed body is a
                 mapping with a ``message`` key, otherwise the parsed body
                 itself.

        :raises InvalidCredsError: when the status is 401 (Unauthorized).
                 The exception carries the body's ``message`` when present
                 and the parsed body otherwise.
        """
        body = self.parse_body()
        # The body is not guaranteed to be a mapping with a "message" key
        # (it may be empty or have another JSON shape). Check first so a
        # 401 always surfaces as InvalidCredsError rather than as a
        # KeyError/TypeError raised while building the error.
        has_message = isinstance(body, dict) and "message" in body

        if self.status == httplib.UNAUTHORIZED:
            raise InvalidCredsError(body["message"] if has_message else body)

        if has_message:
            return "{} (code: {})".format(body["message"], self.status)
        return body

    def success(self):
        """
        Tell whether the response status is one of ``valid_response_codes``.

        :rtype: ``bool``
        """
        return self.status in self.valid_response_codes
class DigitalOcean_v2_Connection(ConnectionKey):
    """
    Connection class used by the DigitalOcean API v2 drivers.
    """

    host = "api.digitalocean.com"
    responseCls = DigitalOcean_v2_Response

    def add_default_headers(self, headers):
        """
        Set the headers that accompany every request.

        A bearer ``Authorization`` header built from ``self.key`` and a
        JSON ``Content-Type`` header are written into ``headers``.

        :param headers: Header mapping, updated in place.

        :return: The same ``headers`` object.
        """
        headers.update(
            {
                "Authorization": "Bearer %s" % (self.key),
                "Content-Type": "application/json",
            }
        )
        return headers

    def add_default_params(self, params):
        """
        Set the query parameters that accompany every request.

        ``per_page`` is taken from the driver's ``ex_per_page`` so that
        fewer paginated requests are needed to fetch a full listing.

        :param params: Parameter mapping, updated in place.

        :return: The same ``params`` object.
        """
        # pylint: disable=maybe-no-member
        per_page = self.driver.ex_per_page
        params["per_page"] = per_page
        return params
class DigitalOceanConnection(DigitalOcean_v2_Connection):
    """
    Connection class for the DigitalOcean driver.

    Adds nothing of its own; all behaviour comes from
    ``DigitalOcean_v2_Connection``.
    """
class DigitalOceanResponse(DigitalOcean_v2_Response):
    """
    Response class for the DigitalOcean driver.

    Adds nothing of its own; all behaviour comes from
    ``DigitalOcean_v2_Response``.
    """
class DigitalOceanBaseDriver(BaseDriver):
    """
    DigitalOcean BaseDriver

    Instantiating this class directly selects an implementation from
    ``api_version``; subclasses are instantiated as themselves. The
    ``ex_*`` methods and ``_paginated_request`` defined here are
    placeholders that raise ``NotImplementedError``.
    """

    name = "DigitalOcean"
    website = "https://www.digitalocean.com"

    def __new__(cls, key, secret=None, api_version="v2", **kwargs):
        """
        Create the driver instance, choosing the concrete class when this
        base class is instantiated directly.

        :param key: API key (not used for class selection).
        :param secret: Passing any value other than ``None`` is treated as
                       a request for the unsupported v1 API.
        :param api_version: ``"v2"`` selects ``DigitalOcean_v2_BaseDriver``.
        :type api_version: ``str``

        :raises DigitalOcean_v1_Error: for ``api_version="v1"`` or when a
                                       ``secret`` is supplied.
        :raises NotImplementedError: for any other unknown ``api_version``.
        """
        if cls is DigitalOceanBaseDriver:
            if api_version == "v1" or secret is not None:
                raise DigitalOcean_v1_Error()
            elif api_version == "v2":
                cls = DigitalOcean_v2_BaseDriver
            else:
                raise NotImplementedError("Unsupported API version: %s" % (api_version))
        # Extra keyword arguments (e.g. ``ex_per_page``) are meant for
        # ``__init__``, which Python calls with the original arguments.
        # They must not be forwarded here: ``object.__new__`` raises
        # TypeError when given extra arguments by a class that overrides
        # ``__new__``.
        # NOTE(review): assumes no class further up the MRO defines a
        # ``__new__`` that consumes these keywords - confirm.
        return super().__new__(cls)

    def ex_account_info(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError("ex_account_info not implemented for this driver")

    def ex_list_events(self):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError("ex_list_events not implemented for this driver")

    def ex_get_event(self, event_id):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError("ex_get_event not implemented for this driver")

    def _paginated_request(self, url, obj):
        """Placeholder; always raises ``NotImplementedError``."""
        # The message names the method as it is actually spelled.
        raise NotImplementedError("_paginated_request not implemented for this driver")
class DigitalOcean_v2_BaseDriver(DigitalOceanBaseDriver):
    """
    DigitalOcean BaseDriver using v2 of the API.

    Supports `ex_per_page` ``int`` value keyword parameter to adjust per page
    requests against the API.
    """

    connectionCls = DigitalOcean_v2_Connection

    def __init__(
        self,
        key,
        secret=None,
        secure=True,
        host=None,
        port=None,
        api_version=None,
        region=None,
        ex_per_page=200,
        **kwargs,
    ):
        """
        :param key: API key, forwarded to the parent constructor.

        :param ex_per_page: Number of items requested per page (default 200).
        :type ex_per_page: ``int``

        NOTE(review): ``secret``, ``secure``, ``host``, ``port``,
        ``api_version`` and ``region`` are accepted but not forwarded to
        the parent constructor; only ``key`` and ``**kwargs`` are. Confirm
        this is intentional.
        """
        self.ex_per_page = ex_per_page
        super().__init__(key, **kwargs)

    def ex_account_info(self):
        """
        Return the ``account`` object from ``/v2/account``.
        """
        return self.connection.request("/v2/account").object["account"]

    def ex_list_events(self):
        """
        Return all ``actions`` objects from ``/v2/actions``, across pages.

        :rtype: ``list``
        """
        return self._paginated_request("/v2/actions", "actions")

    def ex_get_event(self, event_id):
        """
        Get an event object

        :param event_id: Event id (required)
        :type event_id: ``str``

        :return: The ``action`` object from the API response.
        """
        response = self.connection.request("/v2/actions/%s" % event_id, params={})
        return response.object["action"]

    def _paginated_request(self, url, obj):
        """
        Perform multiple calls in order to have a full list of elements when
        the API responses are paginated.

        :param url: API endpoint
        :type url: ``str``

        :param obj: Result object key
        :type obj: ``str``

        :return: ``list`` of API response objects
        :rtype: ``list``

        :raises KeyError: when a response does not contain ``obj``.
        """
        first_page = self.connection.request(url)
        values = first_page.object[obj]

        # Only the page-count lookup may legitimately fail with KeyError
        # (meaning "no further pages"). Keeping the try this narrow stops a
        # KeyError from a later page being mistaken for "no pages", which
        # would silently return a partial list.
        try:
            last_url = first_page.object["links"]["pages"]["last"]
            query_string = urlparse.urlparse(last_url).query
            last_page = int(parse_qs(query_string)["page"][0])
        except KeyError:  # No pages.
            return values

        # Page 1 is already in ``values``; fetch the remaining pages.
        for page in range(2, last_page + 1):
            response = self.connection.request(url, params={"page": page})
            values.extend(response.object[obj])
        return values