# blob: ba517e0f6e38e3aaf1b63964e71313f82c3a0369 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module which contains common Kubernetes related code.
"""
from typing import Optional
import os
import base64
import warnings
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import b
from libcloud.common.base import JsonResponse, ConnectionUserAndKey
from libcloud.common.base import KeyCertificateConnection, ConnectionKey
from libcloud.common.types import InvalidCredsError
# Names exported by ``from <this module> import *``.
# NOTE(review): KubernetesResponse is defined in this module but is not
# listed here - confirm whether that omission is intentional.
__all__ = [
    "KubernetesException",
    "KubernetesBasicAuthConnection",
    "KubernetesTLSAuthConnection",
    "KubernetesTokenAuthConnection",
    "KubernetesDriverMixin",
    "VALID_RESPONSE_CODES",
]
# HTTP status codes which are treated as a successful API response
# (OK, ACCEPTED, CREATED and NO_CONTENT, in that order).
VALID_RESPONSE_CODES = [httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT]
class KubernetesException(Exception):
    """
    Error carrying a status ``code`` and a ``message``.

    Both values are exposed as attributes and, in that order, as ``args``.
    """

    def __init__(self, code, message):
        # The base initialiser stores ``(code, message)`` as ``self.args``.
        super(KubernetesException, self).__init__(code, message)
        self.message = message
        self.code = code

    def __str__(self):
        return f"{self.code} {self.message}"

    def __repr__(self):
        return f"KubernetesException {self.code} {self.message}"
class KubernetesResponse(JsonResponse):
    """
    Response class used by all Kubernetes connection classes in this module.
    """

    # Own copy of the module-level list (OK, ACCEPTED, CREATED, NO_CONTENT) so
    # that the class attribute is an independent list object.
    valid_response_codes = list(VALID_RESPONSE_CODES)

    def success(self):
        """
        Return ``True`` when the response status is one of
        ``valid_response_codes``.
        """
        return self.status in self.valid_response_codes

    def parse_error(self):
        """
        Return the raw response body for a failed request.

        :raises InvalidCredsError: When the response status is 401.
        """
        if self.status != httplib.UNAUTHORIZED:
            return self.body
        raise InvalidCredsError("Invalid credentials")
class KubernetesTLSAuthConnection(KeyCertificateConnection):
    """
    Connection class which authenticates with a TLS client key / certificate
    pair.
    """

    responseCls = KubernetesResponse
    timeout = 60

    def __init__(
        self,
        key,
        secure=True,
        host="localhost",
        port="6443",
        key_file=None,
        cert_file=None,
        **kwargs,
    ):
        """
        :param key: Accepted for signature compatibility; not used here.

        :param secure: Whether to use HTTPS or HTTP.
        :type secure: ``bool``

        :param host: Hostname used for connections.
        :type host: ``str``

        :param port: Port used for connections.

        :param key_file: Path to the PEM key file. When given, both this
                         file and ``cert_file`` must exist on disk.
        :type key_file: ``str``

        :param cert_file: Path to the PEM certificate file.
        :type cert_file: ``str``

        :param kwargs: Accepted and ignored; the remaining parent arguments
                       (``url``, ``proxy_url``, ``timeout``, ``backoff``,
                       ``retry_delay``) are always passed as ``None``.

        :raises InvalidCredsError: When ``key_file`` is given and either the
                                   key or the certificate path is missing or
                                   is not a regular file.
        """
        super(KubernetesTLSAuthConnection, self).__init__(
            key_file=key_file,
            cert_file=cert_file,
            secure=secure,
            host=host,
            port=port,
            url=None,
            proxy_url=None,
            timeout=None,
            backoff=None,
            retry_delay=None,
        )

        if key_file:
            # The paths are validated after "~" expansion, but the values
            # stored on the instance are the ones supplied by the caller.
            self.key_file = self._require_pem_file(key_file, "key")
            self.cert_file = self._require_pem_file(cert_file, "certificate")

    @staticmethod
    def _require_pem_file(path, kind):
        """
        Return ``path`` unchanged if it points to an existing regular file.

        :param path: Path to check ("~" is expanded for the check only).
        :param kind: Word used in the error message ("key" / "certificate").

        :raises InvalidCredsError: When ``path`` is empty / ``None`` or does
                                   not point to a regular file.
        """
        # An empty / None path is rejected up front: os.path.expanduser()
        # would otherwise raise TypeError instead of a credentials error.
        if not path or not os.path.isfile(os.path.expanduser(path)):
            raise InvalidCredsError(
                "You need an %s PEM file to authenticate "
                "via tls. For more info please visit:"
                "https://kubernetes.io/docs/concepts/"
                "cluster-administration/certificates/" % kind
            )
        return path

    def add_default_headers(self, headers):
        """
        Default the ``Content-Type`` header to JSON when the caller did not
        set one, and return the (mutated) ``headers`` mapping.
        """
        if "Content-Type" not in headers:
            headers["Content-Type"] = "application/json"
        return headers
class KubernetesTokenAuthConnection(ConnectionKey):
    """
    Connection class which authenticates with a bearer token stored in
    ``self.key``.
    """

    responseCls = KubernetesResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Default ``Content-Type`` to JSON and attach the bearer token.

        :raises ValueError: When no token is available in ``self.key``.
        """
        headers.setdefault("Content-Type", "application/json")

        token = self.key
        if not token:
            raise ValueError("Please provide a valid token in the key param")

        headers["Authorization"] = "Bearer " + token
        return headers
class KubernetesBasicAuthConnection(ConnectionUserAndKey):
    """
    Connection class which authenticates using HTTP basic auth.
    """

    responseCls = KubernetesResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Add the headers every request needs.

        ``Content-Type`` defaults to JSON. A basic auth ``Authorization``
        header is added only when both ``user_id`` and ``key`` are set.
        """
        headers.setdefault("Content-Type", "application/json")

        if not (self.user_id and self.key):
            return headers

        credentials = b("%s:%s" % (self.user_id, self.key))
        encoded = base64.b64encode(credentials).decode("utf-8")
        headers["Authorization"] = "Basic %s" % (encoded)
        return headers
class KubernetesDriverMixin(object):
    """
    Base driver class to be used with various Kubernetes drivers.

    NOTE: This base class can be used in different APIs such as container and
    compute one.
    """

    def __init__(
        self,
        key=None,
        secret=None,
        secure=False,
        host="localhost",
        port=4243,
        key_file=None,
        cert_file=None,
        ca_cert=None,
        ex_token_bearer_auth=False,
    ):
        """
        :param    key: API key / username, or the bearer token when
                       ``ex_token_bearer_auth`` is ``True``.
        :type     key: ``str``

        :param    secret: Secret password to be used.
        :type     secret: ``str``

        :param    secure: Whether to use HTTPS or HTTP. Forced to ``True``
                          when token bearer auth or key file auth is used,
                          or when ``host`` starts with ``https://``.
        :type     secure: ``bool``

        :param    host: Override hostname used for connections. A leading
                        ``http://`` / ``https://`` prefix is removed.
        :type     host: ``str``

        :param    port: Override port used for connections.
        :type     port: ``int``

        :param    key_file: Path to the key file used to authenticate (when
                            using key file auth).
        :type     key_file: ``str``

        :param    cert_file: Path to the cert file used to authenticate (when
                             using key file auth).
        :type     cert_file: ``str``

        :param    ca_cert: Value assigned to the underlying connection's
                           ``ca_cert`` attribute. When not supplied, a
                           warning is emitted and ``ca_cert`` is set to
                           ``False``.

        :param    ex_token_bearer_auth: True to use token bearer auth.
        :type     ex_token_bearer_auth: ``bool``

        :raises ValueError: When token bearer auth is requested without a
                            ``key``, or when only one of ``key_file`` /
                            ``cert_file`` is supplied.

        :return: ``None``
        """
        if ex_token_bearer_auth:
            self.connectionCls = KubernetesTokenAuthConnection

            if not key:
                msg = 'The token must be a string provided via "key" argument'
                raise ValueError(msg)

            secure = True

        if key_file or cert_file:
            # Certificate based auth is used
            if not (key_file and cert_file):
                raise ValueError("Both key and certificate files are needed")

        if key_file:
            self.connectionCls = KubernetesTLSAuthConnection
            # Stored on the instance so that _ex_connection_class_kwargs()
            # can include them in the kwargs it returns.
            self.key_file = key_file
            self.cert_file = cert_file
            secure = True

        if host and host.startswith("https://"):
            secure = True

        host = self._santize_host(host=host)

        super(KubernetesDriverMixin, self).__init__(
            key=key,
            secret=secret,
            secure=secure,
            host=host,
            port=port,
            key_file=key_file,
            cert_file=cert_file,
        )

        if ca_cert:
            self.connection.connection.ca_cert = ca_cert
        else:
            # do not verify SSL certificate
            warnings.warn(
                "Kubernetes has its own CA, since you didn't supply "
                "a CA certificate be aware that SSL verification "
                "will be disabled for this session."
            )
            self.connection.connection.ca_cert = False

        # Re-apply the values computed above on the created connection.
        self.connection.secure = secure
        self.connection.host = host

        if port is not None:
            self.connection.port = port

    def _ex_connection_class_kwargs(self):
        """
        Return the ``key_file`` / ``cert_file`` keyword arguments, including
        each one only if the corresponding attribute exists on the instance.
        """
        kwargs = {}

        if hasattr(self, "key_file"):
            kwargs["key_file"] = self.key_file

        if hasattr(self, "cert_file"):
            kwargs["cert_file"] = self.cert_file

        return kwargs

    def _santize_host(self, host=None):
        # type: (Optional[str]) -> Optional[str]
        """
        Sanitize "host" argument and remove any protocol prefix (if
        specified).

        :param host: Hostname, optionally prefixed with ``http://`` or
                     ``https://``.

        :return: ``None`` for an empty / ``None`` host, otherwise the host
                 with at most one leading protocol prefix removed.
        """
        if not host:
            return None

        for prefix in ("http://", "https://"):
            if host.startswith(prefix):
                # Slice instead of str.lstrip(): lstrip() treats its argument
                # as a set of characters and would also eat leading "h", "t",
                # "p", "s", ":" and "/" characters of the hostname itself
                # (e.g. "https://test.com" -> "est.com").
                return host[len(prefix):]

        return host