| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import re |
| import base64 |
| import collections |
| |
| from libcloud.utils.py3 import b, httplib |
| from libcloud.common.base import JsonResponse, ConnectionUserAndKey, KeyCertificateConnection |
| from libcloud.common.types import InvalidCredsError |
| from libcloud.compute.base import StorageVolume |
| from libcloud.container.base import Container, ContainerImage, ContainerDriver |
| from libcloud.container.types import ContainerState |
| from libcloud.common.exceptions import BaseHTTPError |
| from libcloud.container.providers import Provider |
| |
| try: |
| import simplejson as json |
| except Exception: |
| import json |
| |
| |
# Acceptable success strings coming from the LXD API
LXD_API_SUCCESS_STATUS = ["Success"]
# Names of the container state actions known to this module
LXD_API_STATE_ACTIONS = ["stop", "start", "restart", "freeze", "unfreeze"]
# Names of the image source types known to this module
LXD_API_IMAGE_SOURCE_TYPE = ["image", "migration", "copy", "none"]

# The wording used by LXD (in the ``type`` field of a response)
# to indicate that an error occurred for a request
LXD_ERROR_STATUS_RESP = "error"
| |
| |
| # helpers |
def strip_http_prefix(host):
    """
    Remove a leading ``http://`` or ``https://`` scheme from a host string.

    :param host: Host name, optionally prefixed with an HTTP(S) scheme
    :type host: ``str``

    :return: The host without the scheme prefix; unchanged when no
        known prefix is present
    :rtype: ``str``
    """
    for prefix in ("http://", "https://"):
        if host.startswith(prefix):
            # Slice the prefix off. ``str.strip(prefix)`` must not be used
            # here: it treats its argument as a *set of characters* and
            # would also eat matching characters of the host itself
            # (e.g. "http://test.com" -> "est.com").
            host = host[len(prefix):]
    return host
| |
| |
def _check_allowed_suffix(path, allowed, label):
    """
    Raise InvalidCredsError unless the text after the last "." of ``path``
    is contained in ``allowed``; ``label`` names the file kind in the message.
    """
    suffix = path.split(".")[-1]
    if suffix not in allowed:
        raise InvalidCredsError(
            "Valid {} files are: {}, you provided: {}".format(label, allowed, suffix)
        )


def _check_is_file(path, message):
    """
    Raise InvalidCredsError(message) unless ``path`` (after ``~`` expansion)
    points to an existing regular file.
    """
    # os.path.isfile() already returns False for paths that do not exist
    if not os.path.isfile(os.path.expanduser(path)):
        raise InvalidCredsError(message)


def check_certificates(key_file, cert_file, **kwargs):
    """
    Basic checks for the provided certificates in LXDtlsConnection

    :param key_file: Path to the private key file
    :type key_file: ``str``

    :param cert_file: Path to the certificate file
    :type cert_file: ``str``

    :param kwargs: Optional ``key_files_allowed`` / ``cert_files_allowed``
        collections of accepted file suffixes (text after the last dot)

    :raises InvalidCredsError: if either path is ``None`` or empty, has a
        suffix that is not allowed, or does not point to an existing file
    """

    # there is no point attempting to connect if either is missing
    # (``None`` or an empty string)
    if key_file in (None, "") or cert_file in (None, ""):
        raise InvalidCredsError(
            "TLS Connection requires specification " "of a key file and a certificate file"
        )

    # suffix checks are only performed when the caller supplies a whitelist
    if "key_files_allowed" in kwargs:
        _check_allowed_suffix(key_file, kwargs["key_files_allowed"], "key")

    if "cert_files_allowed" in kwargs:
        _check_allowed_suffix(cert_file, kwargs["cert_files_allowed"], "certification")

    # if all these are good check the paths
    _check_is_file(
        key_file,
        "You need a key file to authenticate with " "LXD tls. This can be found in the server.",
    )
    _check_is_file(
        cert_file,
        "You need a certificate file to "
        "authenticate with LXD tls. "
        "This can be found in the server.",
    )
| |
| |
def assert_response(response_dict, status_code):
    """
    Basic checks that the response is of the type
    the client is expecting

    :param response_dict: Parsed LXD response; must contain the
        ``type`` and ``status_code`` keys
    :type response_dict: ``dict``

    :param status_code: The LXD status code the caller expects
    :type status_code: ``int``

    :raises LXDAPIException: if the response type is an error or the
        status code differs from ``status_code``
    """

    # if the type of the response is an error
    if response_dict["type"] == LXD_ERROR_STATUS_RESP:
        raise LXDAPIException(message="response type is error", response_dict=response_dict)

    # anything else apart from the status_code given should be treated as error
    actual_code = response_dict["status_code"]
    if actual_code != status_code:
        # Build the message on one line: a backslash continuation inside
        # the literal would embed the next line's indentation in the text.
        msg = "Status code should be {} but is {}".format(status_code, actual_code)
        raise LXDAPIException(message=msg, response_dict=response_dict)
| |
| |
class LXDAPIException(Exception):
    """
    Error raised by this driver when the LXD API
    answers with some kind of failure.

    The instance keeps the human readable ``message``, the parsed
    ``response_dict`` (if any) and a free-form ``type`` string.
    """

    def __init__(self, message="Unknown Error Occurred", response_dict=None, error_type=""):
        # initialise the base exception first so ``args`` carries the message
        super().__init__(message)

        self.type = error_type
        self.response_dict = response_dict
        self.message = message

    def __str__(self):
        # the textual form is exactly the stored message
        return self.message
| |
| |
class LXDStoragePool:
    """
    Plain value object describing an LXD storage pool
    https://lxd.readthedocs.io/en/latest/storage/
    """

    def __init__(self, name, driver, used_by, config, managed):
        # name: the name of the storage pool
        # driver: the driver (or type of storage pool), e.g. 'zfs' or 'btrfs'
        self.name, self.driver = name, driver

        # used_by: API endpoints (/1.0/containers/<name>) of the
        #          containers that are using this storage pool
        # config: dictionary with information about the pool,
        #         e.g. size, source (path), volume.size, etc.
        self.used_by, self.config = used_by, config

        # managed: boolean telling whether LXD manages the pool or not
        self.managed = managed
| |
| |
class LXDNetwork:
    """
    Plain value object describing an LXD network
    """

    @classmethod
    def build_from_response(cls, metadata):
        """
        Create an ``LXDNetwork`` from the ``metadata`` dictionary of an
        LXD response. Keys that are absent yield ``None`` attributes; the
        ``name`` entry is used for both ``name`` and ``id``.
        """
        # (attribute on the object, key in the metadata dictionary)
        attribute_sources = (
            ("name", "name"),
            ("id", "name"),
            ("description", "description"),
            ("type", "type"),
            ("config", "config"),
            ("status", "status"),
            ("locations", "locations"),
            ("managed", "managed"),
            ("used_by", "used_by"),
        )
        network = LXDNetwork()
        for attribute, key in attribute_sources:
            setattr(network, attribute, metadata.get(key, None))
        return network

    def __init__(self):
        # every descriptive field starts out unset
        self.name = self.id = self.description = None
        self.type = self.config = self.status = None
        self.locations = self.managed = self.used_by = None
        self.extra = {}
| |
| |
class LXDServerInfo:
    """
    Wraps the response from /1.0
    """

    @classmethod
    def build_from_response(cls, metadata):
        """
        Create an ``LXDServerInfo`` from the ``metadata`` dictionary of a
        response; keys that are absent yield ``None`` attributes.
        """
        info = LXDServerInfo()
        for field in (
            "api_extensions",
            "api_status",
            "api_version",
            "auth",
            "config",
            "environment",
            "public",
        ):
            setattr(info, field, metadata.get(field, None))
        return info

    def __init__(self):
        # api_extensions: list of API extensions added after
        #                 the API was marked stable
        # api_status: API implementation status
        #             (one of development, stable or deprecated)
        # api_version: the API version as a string
        self.api_extensions = self.api_status = self.api_version = None

        # auth: authentication state,
        #       one of "guest", "untrusted" or "trusted"
        self.auth = self.config = None

        # environment: various information about the host (OS, kernel, ...)
        self.environment = self.public = None

    def __str__(self):
        # plain concatenation of the string form of every field, in order
        parts = (
            self.api_extensions,
            self.api_status,
            self.api_version,
            self.auth,
            self.config,
            self.environment,
            self.public,
        )
        return "".join(str(part) for part in parts)
| |
| |
# Record returned by ``ex_execute_cmd_on_container``: the operation id
# (``uuid``), the websocket secrets (``secret_0`` .. ``secret_2`` and
# ``control``) and the ``output`` / ``result`` of the command.
LXDContainerExecuteResult = collections.namedtuple(
    "LXDContainerExecuteResult",
    "uuid secret_0 secret_1 secret_2 control output result",
)
| |
| |
class LXDResponse(JsonResponse):
    """
    Response class used by the LXD connections: decodes JSON bodies and
    treats a fixed set of HTTP status codes as success.
    """

    valid_response_codes = [
        httplib.OK,
        httplib.ACCEPTED,
        httplib.CREATED,
        httplib.NO_CONTENT,
    ]

    def parse_body(self):
        """
        Decode the response body.

        An empty body is returned untouched unless
        ``parse_zero_length_body`` is set. A body whose content type is
        JSON (or empty) is decoded; any other content type is returned raw.

        :raises Exception: if the body cannot be decoded as JSON
        """
        body_is_empty = len(self.body) == 0
        if body_is_empty and not self.parse_zero_length_body:
            return self.body

        try:
            content_type = self.headers.get("content-type", "application/json")
            if content_type not in ("application/json", ""):
                # not JSON: hand the raw body back
                return self.body

            chunked_image_pull = (
                self.headers.get("transfer-encoding") == "chunked"
                and "fromImage" in self.request.url
            )
            if chunked_image_pull:
                # one JSON document per line
                lines = self.body.strip().replace("\r", "").split("\n")
                return [json.loads(line) for line in lines]

            return json.loads(self.body)
        except ValueError:
            # try to surface an error text embedded in the undecodable body
            match = re.search('Error: (.+?)"', self.body)
            if match:
                raise Exception(match.group(1))
            raise Exception(
                "ConnectionError: Failed to parse JSON response " "(body=%s)" % (self.body)
            )

    def parse_error(self):
        """
        Return the raw body of a failed response.

        :raises InvalidCredsError: if the HTTP status is 401
        """
        if self.status == 401:
            raise InvalidCredsError("Invalid credentials")
        return self.body

    def success(self):
        """Tell whether the HTTP status is one of ``valid_response_codes``."""
        return self.status in self.valid_response_codes
| |
| |
class LXDConnection(ConnectionUserAndKey):
    """
    Connection used when no TLS key/certificate pair is given; it can
    authenticate with HTTP basic auth.
    """

    responseCls = LXDResponse
    timeout = 60

    def add_default_headers(self, headers):
        """
        Add parameters that are necessary for every request
        If user and password are specified, include a basic http auth
        header
        """
        headers["Content-Type"] = "application/json"
        if not (self.user_id and self.key):
            return headers

        credentials = b("{}:{}".format(self.user_id, self.key))
        encoded = base64.b64encode(credentials).decode("utf-8")
        headers["Authorization"] = "Basic %s" % (encoded)
        return headers
| |
| |
class LXDtlsConnection(KeyCertificateConnection):
    """
    Connection authenticating with a TLS key/certificate pair.

    The files are validated before the base connection is initialised,
    either by the supplied ``certificate_validator`` or by
    ``check_certificates``. ``key``, ``secret`` and ``ca_cert`` are
    accepted but not used by this constructor.
    """

    responseCls = LXDResponse

    def __init__(
        self,
        key,
        secret,
        secure=True,
        host="localhost",
        port=8443,
        ca_cert="",
        key_file=None,
        cert_file=None,
        certificate_validator=None,
        **kwargs,
    ):
        if certificate_validator is None:
            # default validation; extra keyword arguments (e.g. the lists
            # of allowed suffixes) are forwarded to it
            check_certificates(key_file=key_file, cert_file=cert_file, **kwargs)
        else:
            certificate_validator(key_file=key_file, cert_file=cert_file)

        base_arguments = {
            "key_file": key_file,
            "cert_file": cert_file,
            "secure": secure,
            "host": host,
            "port": port,
            "url": None,
            "proxy_url": None,
            "timeout": None,
            "backoff": None,
            "retry_delay": None,
        }
        super().__init__(**base_arguments)

        self.key_file = key_file
        self.cert_file = cert_file

    def add_default_headers(self, headers):
        """Mark every request as carrying a JSON body."""
        headers["Content-Type"] = "application/json"
        return headers
| |
| |
| class LXDContainerDriver(ContainerDriver): |
| """ |
| Driver for LXD REST API of LXC containers |
| https://lxd.readthedocs.io/en/stable-2.0/rest-api/ |
| https://github.com/lxc/lxd/blob/master/doc/rest-api.md |
| """ |
| |
| type = Provider.LXD |
| name = "LXD" |
| website = "https://linuxcontainers.org/" |
| connectionCls = LXDConnection |
| |
| # LXD supports clustering but still the functionality |
| # is not implemented yet on our side |
| supports_clusters = False |
| version = "1.0" |
| default_time_out = 30 |
| |
| # default configuration when creating a container |
| # if the architecture is not specified |
| # by the client code then the underlying |
| # host architecture should be picked up by |
| # LXC. |
| default_architecture = "" |
| default_profiles = "default" |
| |
| # An ephemeral container means that it |
| # will be destroyed once it is stopped |
| default_ephemeral = False |
| |
def __init__(
    self,
    key="",
    secret="",
    secure=False,
    host="localhost",
    port=8443,
    key_file=None,
    cert_file=None,
    ca_cert=None,
    certificate_validator=check_certificates,
):
    """
    Set up the driver and query the server for its API version.

    :param key: User name forwarded to the base driver
    :type key: ``str``

    :param secret: Password forwarded to the base driver
    :type secret: ``str``

    :param secure: Whether to use a secure connection; forced to ``True``
        when a key file is given or ``host`` starts with ``https://``
    :type secure: ``bool``

    :param host: Host name, optionally prefixed with ``http(s)://``
    :type host: ``str``

    :param port: Port of the LXD API
    :type port: ``int``

    :param key_file: Path of the private key for TLS authentication
    :type key_file: ``str``

    :param cert_file: Path of the certificate for TLS authentication
    :type cert_file: ``str``

    :param ca_cert: CA certificate used to verify the server; when not
        given the server certificate is not verified
    :type ca_cert: ``str``

    :param certificate_validator: Callable stored on the driver as
        ``certificate_validator`` when TLS authentication is used

    :raises LXDAPIException: if ``key_file`` is given without ``cert_file``
    """

    tls_requested = bool(key_file)

    # LXD tls authentication -
    # we pass two files, a key_file with the
    # private key and cert_file with the certificate;
    # libcloud will handle them through LibcloudHTTPSConnection
    if tls_requested and not cert_file:
        raise LXDAPIException(
            message="Need both private key and" " certificate files for " "tls authentication"
        )

    if tls_requested:
        self.connectionCls = LXDtlsConnection
        self.key_file = key_file
        self.cert_file = cert_file
        self.certificate_validator = certificate_validator
        secure = True

    if host.startswith("https://"):
        secure = True

    host = strip_http_prefix(host=host)

    super().__init__(
        key=key,
        secret=secret,
        secure=secure,
        host=host,
        port=port,
        key_file=key_file,
        cert_file=cert_file,
    )

    # ``False`` means: do not verify the SSL certificate
    self.connection.connection.ca_cert = ca_cert or False

    self.connection.secure = secure
    self.connection.host = host
    self.connection.port = port
    self.version = self._get_api_version()
| |
def build_operation_websocket_url(self, uuid, w_secret):
    """
    Build the ``wss://`` URL of the websocket that belongs to an operation.

    :param uuid: The operation id
    :type uuid: ``str``

    :param w_secret: The websocket secret
    :type w_secret: ``str``

    :rtype: ``str``
    """
    conn = self.connection
    url_parts = (conn.host, conn.port, self.version, uuid, w_secret)
    return "wss://%s:%s/%s/operations/%s/" "websocket?secret=%s" % url_parts
| |
def ex_get_api_endpoints(self):
    """
    Description: List of supported APIs
    Authentication: guest
    Operation: sync
    Return: list of supported API endpoint URLs

    :raises LXDAPIException: if the server does not answer with
        status code 200
    """
    body = self.connection.request("/").parse_body()
    assert_response(response_dict=body, status_code=200)
    return body["metadata"]
| |
def ex_get_server_configuration(self):
    """
    Description: Server configuration and environment information
    Authentication: guest, untrusted or trusted
    Operation: sync
    Return: Object representing server state

    The returned configuration depends on whether the connection
    is trusted or not

    :raises LXDAPIException: if the server does not answer with
        status code 200

    :rtype: :class:`.LXDServerInfo`
    """
    endpoint = "/%s" % (self.version)
    body = self.connection.request(endpoint).parse_body()
    assert_response(response_dict=body, status_code=200)
    return LXDServerInfo.build_from_response(metadata=body["metadata"])
| |
def deploy_container(
    self,
    name,
    image,
    cluster=None,
    parameters=None,
    start=True,
    ex_architecture=default_architecture,
    ex_profiles=None,
    ex_ephemeral=default_ephemeral,
    ex_config=None,
    ex_devices=None,
    ex_instance_type=None,
    ex_timeout=default_time_out,
):
    """
    Create a new container
    Authentication: trusted
    Operation: async

    :param name: The name of the new container.
        64 chars max, ASCII, no slash, no colon and no comma
    :type name: ``str``

    :param image: The container image to deploy. Replaced by the freshly
        installed image when ``parameters`` requests a pull
    :type image: :class:`.ContainerImage`

    :param cluster: The cluster to deploy to, None is default.
        Not used by this method
    :type cluster: :class:`.ContainerCluster`

    :param parameters: Container Image parameters as a JSON string.
        It should represent the ``source`` dictionary expected by the
        LXD API call. For more information on how this parameter should
        be structured see
        https://github.com/lxc/lxd/blob/master/doc/rest-api.md
    :type parameters: ``str``

    :param start: Start the container on deployment. True is the default
    :type start: ``bool``

    :param ex_architecture: string e.g. x86_64
    :type ex_architecture: ``str``

    :param ex_profiles: List of profiles
    :type ex_profiles: ``list``

    :param ex_ephemeral: Whether to destroy the container on shutdown
    :type ex_ephemeral: ``bool``

    :param ex_config: Config override e.g. {"limits.cpu": "2"}
    :type ex_config: ``dict``

    :param ex_devices: optional list of devices the container should have
    :type ex_devices: ``dict``

    :param ex_instance_type: An optional instance type
        to use as basis for limits e.g. "c2.micro"
    :type ex_instance_type: ``str``

    :param ex_timeout: Timeout
    :type ex_timeout: ``int``

    :raises LXDAPIException: if a requested image pull fails

    :rtype: :class:`libcloud.container.base.Container`
    """

    must_pull_image = False
    if parameters:
        parameters = json.loads(parameters)
        must_pull_image = parameters["source"].get("mode", None) == "pull"

    if must_pull_image:
        # "pull" means the image must be downloaded first
        try:
            image = self.install_image(path=None, ex_timeout=ex_timeout, **parameters)
        except Exception as e:
            raise LXDAPIException(
                message="Deploying container failed: Image could not be installed. %r" % e
            )

        # once the image is installed the container is created from the
        # local image, referenced by its fingerprint
        fingerprint = image.extra["fingerprint"]
        parameters = {"source": {"type": "image", "fingerprint": fingerprint}}

    container_settings = LXDContainerDriver._fix_cont_params(
        architecture=ex_architecture,
        profiles=ex_profiles,
        ephemeral=ex_ephemeral,
        config=ex_config,
        devices=ex_devices,
        instance_type=ex_instance_type,
    )

    deployed = self._deploy_container_from_image(
        name=name,
        image=image,
        parameters=parameters,
        cont_params=container_settings,
        timeout=ex_timeout,
    )

    if start:
        deployed.start()

    return deployed
| |
def get_container(self, id, ex_get_ip_addr=True):
    """
    Get a container by ID

    :param id: The ID of the container to get
    :type id: ``str``

    :param ex_get_ip_addr: Indicates whether ip addresses
        should also be included. This requires an extra GET request
    :type ex_get_ip_addr: ``bool``

    :raises LXDAPIException: if the server does not answer a request
        with status code 200

    :rtype: :class:`libcloud.container.base.Container`
    """
    req = "/{}/containers/{}".format(self.version, id)
    response = self.connection.request(req)
    result_dict = response.parse_body()
    assert_response(response_dict=result_dict, status_code=200)

    metadata = result_dict["metadata"]

    ips = []
    if ex_get_ip_addr:
        req = "/{}/containers/{}/state".format(self.version, id)
        ip_response = self.connection.request(req)

        ip_result_dict = ip_response.parse_body()
        assert_response(response_dict=ip_result_dict, status_code=200)

        # The state may carry no network information at all, or no
        # "eth0" interface; neither case is an error, the container
        # simply has no addresses to report.
        network = ip_result_dict["metadata"].get("network") or {}
        eth0 = network.get("eth0") or {}

        # the list of addresses
        ips = [item["address"] for item in eth0.get("addresses") or []]

    metadata.update({"ips": ips})
    return self._to_container(metadata=metadata)
| |
| def start_container( |
| self, container, ex_timeout=default_time_out, ex_force=True, ex_stateful=True |
| ): |
| """ |
| Start a container |
| |
| :param container: The container to start |
| :type container: :class:`libcloud.container.base.Container` |
| |
| :param ex_timeout: Time to wait for the operation to complete |
| :type ex_timeout: ``int`` |
| |
| :param ex_force: |
| :type ex_force: ``boolean`` |
| |
| :param ex_stateful: |
| :type ex_stateful: ``boolean`` |
| |
| :rtype: :class:`libcloud.container.base.Container` |
| """ |
| return self._do_container_action( |
| container=container, |
| action="start", |
| timeout=ex_timeout, |
| force=ex_force, |
| stateful=ex_stateful, |
| ) |
| |
| def stop_container( |
| self, container, ex_timeout=default_time_out, ex_force=True, ex_stateful=True |
| ): |
| """ |
| Stop the given container |
| |
| :param container: The container to be stopped |
| :type container: :class:`libcloud.container.base.Container` |
| |
| :param ex_timeout: Time to wait for the operation to complete |
| :type ex_timeout: ``int`` |
| |
| :param ex_force: |
| :type ex_force: ``boolean`` |
| |
| :param ex_stateful: |
| :type ex_stateful: ``boolean`` |
| |
| :rtype: :class:`libcloud.container.base.Container |
| """ |
| return self._do_container_action( |
| container=container, |
| action="stop", |
| timeout=ex_timeout, |
| force=ex_force, |
| stateful=ex_stateful, |
| ) |
| |
| def restart_container( |
| self, container, ex_timeout=default_time_out, ex_force=True, ex_stateful=True |
| ): |
| """ |
| Restart a deployed container |
| |
| :param container: The container to restart |
| :type container: :class:`.Container` |
| |
| :param ex_timeout: Time to wait for the operation to complete |
| :type ex_timeout: ``int`` |
| |
| :param ex_force: |
| :type ex_force: ``boolean`` |
| |
| :param ex_stateful: |
| :type ex_stateful: ``boolean`` |
| |
| :rtype: :class:`libcloud.container.base.Container |
| """ |
| return self._do_container_action( |
| container=container, |
| action="restart", |
| timeout=ex_timeout, |
| force=ex_force, |
| stateful=ex_stateful, |
| ) |
| |
| def ex_freeze_container(self, container, ex_timeout=default_time_out): |
| |
| """ |
| Set the given container into a freeze state |
| |
| :param container: The container to restart |
| :type container: :class:`.Container` |
| |
| :param ex_timeout: Time to wait for the operation to complete |
| :type ex_timeout: ``int`` |
| |
| :rtype :class: `libcloud.container.base.Container |
| """ |
| |
| return self._do_container_action( |
| container=container, |
| action="freeze", |
| timeout=ex_timeout, |
| force=True, |
| stateful=True, |
| ) |
| |
| def ex_unfreeze_container(self, container, ex_timeout=default_time_out): |
| |
| """ |
| Set the given container into unfreeze state |
| |
| :param container: The container to restart |
| :type container: :class:`.Container` |
| |
| :param ex_timeout: Time to wait for the operation to complete |
| :type ex_timeout: ``int`` |
| |
| :rtype :class: `libcloud.container.base.Container |
| """ |
| |
| return self._do_container_action( |
| container=container, |
| action="unfreeze", |
| timeout=ex_timeout, |
| force=True, |
| stateful=True, |
| ) |
| |
def destroy_container(self, container, ex_timeout=default_time_out):
    """
    Destroy a deployed container.

    The deletion is requested and then waited for. If the server rejects
    the DELETE request (per the original note, e.g. because the container
    is running) an exception is raised.

    :param container: The container to destroy
    :type container: :class:`.Container`

    :param ex_timeout: Time to wait for the operation to complete
    :type ex_timeout: ``int``

    :raises LXDAPIException: if the server reports an error or an
        unexpected status code

    :return: A placeholder container in the ``TERMINATED`` state
    :rtype: :class:`libcloud.container.base.Container`
    """

    # Return: background operation or standard error
    req = "/{}/containers/{}".format(self.version, container.name)

    try:
        response = self.connection.request(req, method="DELETE")

        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=100)
    except BaseHTTPError as err:
        # handle the case where the container is running
        raise self._get_lxd_api_exception_for_error(err)

    # wait until the timeout... but until getting here the operation
    # may have finished already
    operation_id = response_dict["metadata"]["id"]
    req = "/{}/operations/{}/wait?timeout={}".format(self.version, operation_id, ex_timeout)

    try:
        response = self.connection.request(req)
    except BaseHTTPError as err:
        lxd_exception = self._get_lxd_api_exception_for_error(err)
        # if not found assume the operation completed
        if lxd_exception.message != "not found":
            raise lxd_exception
    else:
        # Only validate the answer of the wait call itself. When the
        # operation was "not found" there is no such answer, and checking
        # the earlier DELETE response (status code 100) against 200 would
        # always fail.
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

    # return a dummy container
    return Container(
        driver=self,
        name=container.name,
        id=container.name,
        state=ContainerState.TERMINATED,
        image=None,
        ip_addresses=[],
        extra=None,
    )
| |
def ex_execute_cmd_on_container(self, cont_id, command, **config):
    """
    Description: run a remote command
    Operation: async

    Return: Depends on the configuration

    if wait-for-websocket=true and interactive=false
    returns a LXDContainerExecuteResult with:
        uuid=uuid,
        secret_0=fds["0"],
        secret_1=fds["1"],
        secret_2=fds["2"],
        control=fds["control"],
        output={}, result=None

    if wait-for-websocket=true and interactive=true
    returns a LXDContainerExecuteResult with:
        uuid=uuid,
        secret_0=fds["0"],
        secret_1=None,
        secret_2=None,
        control=fds["control"],
        output={}, result=None

    if interactive=false and record-output=true
    returns a LXDContainerExecuteResult with:
        uuid=uuid,
        secret_0=None,
        secret_1=None,
        secret_2=None,
        control=None,
        output=output, result=result

    if none of the above it assumes that the command has
    been executed and returns LXDContainerExecuteResult with:
        uuid=uuid,
        secret_0=None,
        secret_1=None,
        secret_2=None,
        control=None,
        output={}, result=result

    in all the above uuid is the operation id

    :param cont_id: The container name to run the commands
    :type cont_id: ``str``

    :param command: a list of strings indicating the commands
        and their arguments e.g: ["/bin/bash ls -l"]
    :type command: ``list``

    :param config: Dict with extra arguments.

        For example:

        width:  Initial width of the terminal default 80
        height: Initial height of the terminal default 25
        user:   User to run the command as default 1000
        group: Group to run the command as default 1000
        cwd: Current working directory default /tmp

        wait-for-websocket: Whether to wait for a connection
        before starting the process. Default False

        record-output: Whether to store stdout and stderr
        (only valid with wait-for-websocket=false)
        (requires API extension container_exec_recording). Default False

        interactive: Whether to allocate a pts device
        instead of PIPEs. Default true

    :type config: ``dict``

    :raises LXDAPIException: if the server does not answer with
        status code 100

    :rtype: :class:`.LXDContainerExecuteResult`
    """

    exec_config = LXDContainerDriver._create_exec_configuration({"command": command}, **config)
    data = json.dumps(exec_config)
    req = "/{}/containers/{}/exec".format(self.version, cont_id)

    # Return: background operation +
    # optional websocket information or standard error
    response = self.connection.request(req, method="POST", data=data)

    response_dict = response.parse_body()
    assert_response(response_dict=response_dict, status_code=100)

    operation = response_dict["metadata"]
    operation_metadata = operation["metadata"]
    uuid = operation["id"]

    wait_for_websocket = exec_config["wait-for-websocket"] is True
    interactive = exec_config["interactive"]

    # The websocket secrets ("fds") are read only inside the branches that
    # return them: an execution that does not wait for a websocket has no
    # use for them, so their absence must not fail the call.
    if wait_for_websocket and interactive is False:
        fds = operation_metadata["fds"]
        return LXDContainerExecuteResult(
            uuid=uuid,
            secret_0=fds["0"],
            secret_1=fds["1"],
            secret_2=fds["2"],
            control=fds["control"],
            output={},
            result=None,
        )

    if wait_for_websocket and interactive is True:
        fds = operation_metadata["fds"]
        return LXDContainerExecuteResult(
            uuid=uuid,
            secret_0=fds["0"],
            secret_1=None,
            secret_2=None,
            control=fds["control"],
            output={},
            result=None,
        )

    if interactive is False and exec_config["record-output"] is True:
        return LXDContainerExecuteResult(
            uuid=uuid,
            secret_0=None,
            secret_1=None,
            secret_2=None,
            control=None,
            output=operation_metadata["output"],
            result=operation_metadata["result"],
        )

    # none of the above: assume the command has been executed
    return LXDContainerExecuteResult(
        uuid=uuid,
        secret_0=None,
        secret_1=None,
        secret_2=None,
        control=None,
        output={},
        result=operation_metadata["result"],
    )
| |
def list_containers(self, image=None, cluster=None, ex_detailed=True):
    """
    List the deployed containers

    :param image: Image attached to the returned containers when
        ``ex_detailed`` is False; no filtering is performed here
    :type image: :class:`.ContainerImage`

    :param cluster: Not used by this method
    :type cluster: :class:`.ContainerCluster`

    :param ex_detailed: Flag indicating whether detail info
        of the containers is required. This will cause a
        GET request for every container present in the
        host. Default is True
    :type ex_detailed: ``bool``

    :raises LXDAPIException: if the server does not answer with
        status code 200

    :rtype: ``list`` of :class:`libcloud.container.base.Container`
    """

    listing = self.connection.request("/%s/containers" % self.version).parse_body()
    assert_response(response_dict=listing, status_code=200)

    containers = []
    for endpoint in listing["metadata"]:
        # each entry is an endpoint URL whose last segment is the name
        container_id = endpoint.split("/")[-1]

        if ex_detailed:
            containers.append(self.get_container(id=container_id))
            continue

        # without details only a stub with an unknown state is built
        containers.append(
            Container(
                driver=self,
                name=container_id,
                state=ContainerState.UNKNOWN,
                id=container_id,
                image=image,
                ip_addresses=[],
                extra={},
            )
        )

    return containers
| |
def ex_get_image(self, fingerprint):
    """
    Returns a container image from the given image fingerprint

    :param fingerprint: image fingerprint
    :type fingerprint: ``str``

    :raises LXDAPIException: if the server does not answer with
        status code 200

    :rtype: :class:`.ContainerImage`
    """
    endpoint = "/{}/images/{}".format(self.version, fingerprint)

    # parse the LXDResponse into a dictionary
    image_info = self.connection.request(endpoint).parse_body()
    assert_response(response_dict=image_info, status_code=200)

    return self._to_image(metadata=image_info["metadata"])
| |
def install_image(self, path, ex_timeout=default_time_out, **ex_img_data):
    """
    Install a container image from a remote path. Note that the
    path currently is not used. Image data should be provided
    through the keyword arguments; the ``source`` entry describes
    the image. Creating an image in LXD is an asynchronous operation

    :param path: Path to the container image (not used)
    :type path: ``str``

    :param ex_timeout: Time to wait before signaling timeout
    :type ex_timeout: ``int``

    :param ex_img_data: Dictionary describing the image data
    :type ex_img_data: ``dict``

    :raises LXDAPIException: if no image data is given, the server
        reports an error, or the image cannot be found afterwards

    :rtype: :class:`.ContainerImage`
    """

    if not ex_img_data:
        raise LXDAPIException(
            message="Install an image for LXD requires " "specification of image_data"
        )

    source = ex_img_data["source"]

    payload = json.dumps(
        {
            "public": source.get("public", True),
            "auto_update": source.get("auto_update", False),
            "aliases": [source.get("aliases", {})],
            "source": {"type": "url", "mode": "pull", "url": source["url"]},
        }
    )

    # Return: background operation or standard error
    response = self.connection.request(
        "/%s/images" % (self.version), method="POST", data=payload
    )
    response_dict = response.parse_body()

    # a background operation is expected
    # to be returned status_code = 100 --> Operation created
    assert_response(response_dict=response_dict, status_code=100)

    try:
        # wait until the timeout... but until getting here the operation
        # may have finished already
        operation_id = response_dict["metadata"]["id"]
        wait_url = "/{}/operations/{}/wait?timeout={}".format(
            self.version, operation_id, ex_timeout
        )
        self.connection.request(wait_url)
    except BaseHTTPError as err:
        lxd_exception = self._get_lxd_api_exception_for_error(err)
        # if not found assume the operation completed
        if lxd_exception.message != "not found":
            raise lxd_exception

    # let's see if the image exists: look it up by its alias name,
    # or by the last segment of the source URL when no alias was named
    config = json.loads(payload)
    aliases = config["aliases"]
    if len(aliases) != 0 and "name" in aliases[0]:
        image_alias = aliases[0]["name"]
    else:
        image_alias = config["source"]["url"].split("/")[-1]

    has, fingerprint = self.ex_has_image(alias=image_alias)
    if not has:
        raise LXDAPIException(message="Image %s was " "not installed " % image_alias)

    return self.ex_get_image(fingerprint=fingerprint)
| |
def list_images(self):
    """
    Return every image listed by the LXD host.

    Requests ``/<version>/images`` and resolves each returned entry to a
    full image through :meth:`ex_get_image`, using the last ``/``
    separated segment of the entry as the fingerprint.

    :rtype: ``list`` of :class:`.ContainerImage`
    """
    listing = self.connection.request("/%s/images" % (self.version))

    # decode the response body into a dictionary and validate it
    body = listing.parse_body()
    assert_response(response_dict=body, status_code=200)

    # one extra lookup per listed entry to obtain the complete image
    return [
        self.ex_get_image(fingerprint=entry.split("/")[-1])
        for entry in body["metadata"]
    ]
| |
def ex_has_image(self, alias):
    """
    Helper function. Tells whether an image with the given alias
    exists on the host and, if so, returns its fingerprint.

    :param alias: the image alias
    :type alias: ``str``

    :raises: :class:`LXDAPIException` if the lookup fails for any reason
        other than the alias not being found

    :return: ``(True, <target of the alias>)`` when the alias exists,
        ``(False, -1)`` when the host answers "not found"
    :rtype: ``tuple`` :: (``boolean``, ``str``)
    """
    try:
        response = self.connection.request(
            "/{}/images/aliases/{}".format(self.version, alias)
        )
        alias_info = response.object["metadata"]
        return True, alias_info.get("target")
    except BaseHTTPError as err:
        lxd_exception = self._get_lxd_api_exception_for_error(err)
        if lxd_exception.message != "not found":
            raise lxd_exception
        return False, -1
    except Exception as err:
        # Non-HTTP errors carry no JSON payload, so they must not go through
        # _get_lxd_api_exception_for_error (which parses ``err.message``);
        # wrap them directly and keep the original error as the cause.
        raise LXDAPIException(message=str(err)) from err
| |
def ex_list_storage_pools(self, detailed=True):
    """
    Returns the list of storage pools currently defined on the host

    Description: list of storage pools
    Authentication: trusted
    Operation: sync

    :param detailed: when true each pool is fetched individually through
        :meth:`ex_get_storage_pool`; otherwise a minimal pool is built from
        the pool name alone, every other field being passed as ``None``
    :type detailed: ``bool``

    :rtype: list of StoragePool items
    """
    listing = self.connection.request("/%s/storage-pools" % self.version)

    body = listing.parse_body()
    assert_response(response_dict=body, status_code=200)

    result = []
    for entry in body["metadata"]:
        # the pool name is the last path segment of the listed entry
        pool_name = entry.split("/")[-1]

        if detailed:
            result.append(self.ex_get_storage_pool(id=pool_name))
            continue

        # no extra request: only the name is known at this point
        minimal = {
            "name": pool_name,
            "driver": None,
            "used_by": None,
            "config": None,
            "managed": None,
        }
        result.append(self._to_storage_pool(minimal))

    return result
| |
def ex_get_storage_pool(self, id):
    """
    Returns information about a single storage pool

    :param id: the name of the storage pool
    :type id: ``str``

    :raises: :class:`LXDAPIException` if the response carries no metadata

    :rtype: :class: StoragePool
    """
    pool_url = "/{}/storage-pools/{}".format(self.version, id)
    body = self.connection.request(pool_url).parse_body()
    assert_response(response_dict=body, status_code=200)

    pool_data = body["metadata"]
    if pool_data:
        return self._to_storage_pool(data=pool_data)

    # a valid answer without metadata cannot be turned into a pool
    raise LXDAPIException(message="Storage pool with name {} has no data".format(id))
| |
def ex_create_storage_pool(self, definition):
    """
    Create a storage pool from the given definition.

    Implements POST /1.0/storage-pools

    The `definition` dictionary is serialised to JSON and sent as is.
    An example for the zfs driver is:

    {
        "config": {
            "size": "10GB"
        },
        "driver": "zfs",
        "name": "pool1"
    }

    Note that **all** fields in the `definition` parameter are strings.
    Note that size has to be at least 64MB in order to create the pool

    For further details on the storage pool types see:
    https://lxd.readthedocs.io/en/latest/storage/

    Once the request is accepted the pool is read back by the ``name``
    given in the definition and returned.

    :param definition: the fields to pass to the LXD API endpoint;
        must contain the key ``name``
    :type definition: dict

    :raises: :class:`LXDAPIException` if no definition is given

    :returns: the newly created storage pool
    :rtype: :class:`StoragePool`
    """
    if not definition:
        raise LXDAPIException("Cannot create a storage pool  without a definition")

    # Return: standard return value or standard error
    creation = self.connection.request(
        "/%s/storage-pools" % self.version,
        method="POST",
        data=json.dumps(definition),
    )
    assert_response(response_dict=creation.parse_body(), status_code=200)

    # read the pool back so the caller gets what the host actually stored
    return self.ex_get_storage_pool(id=definition["name"])
| |
def ex_delete_storage_pool(self, id):
    """Delete the storage pool with the given name.

    Implements DELETE /1.0/storage-pools/<id>

    Deleting a storage pool may fail if it is being used. See the LXD
    documentation for further details.

    :param id: the name of the storage pool to delete
    :type id: ``str``
    """
    # Return: standard return value or standard error
    pool_url = "/{}/storage-pools/{}".format(self.version, id)
    deletion = self.connection.request(pool_url, method="DELETE")

    assert_response(response_dict=deletion.parse_body(), status_code=200)
| |
def ex_list_storage_pool_volumes(self, pool_id, detailed=True):
    """
    Description: list of storage volumes
    associated with the given storage pool

    Each listed entry is split on ``/``; its last segment is used as the
    volume name and the one before it as the volume type.

    :param pool_id: the id of the storage pool to query
    :param detailed: boolean flag.
    If True one extra API call per volume is made to fill in the
    details of the storage volume; otherwise the volume is built from
    the name and type only

    Authentication: trusted
    Operation: sync
    Return: list of storage volumes that
    currently exist on a given storage pool

    :rtype: A list of :class: StorageVolume
    """
    listing_url = "/{}/storage-pools/{}/volumes".format(self.version, pool_id)
    body = self.connection.request(listing_url).parse_body()
    assert_response(response_dict=body, status_code=200)

    result = []
    for entry in body["metadata"]:
        segments = entry.split("/")
        vol_name = segments[-1]
        vol_type = segments[-2]

        if detailed:
            result.append(
                self.ex_get_storage_pool_volume(pool_id=pool_id, type=vol_type, name=vol_name)
            )
            continue

        # only what the listing itself reveals is filled in
        stub = {
            "config": {"size": None},
            "name": vol_name,
            "type": vol_type,
            "used_by": None,
        }
        result.append(self._to_storage_volume(pool_id=pool_id, metadata=stub))

    return result
| |
def ex_get_storage_pool_volume(self, pool_id, type, name):
    """
    Description: information about a storage volume
    of a given type on a storage pool
    Introduced: with API extension storage
    Authentication: trusted
    Operation: sync

    :param pool_id: the id of the storage pool holding the volume
    :param type: the type of the volume
    :param name: the name of the volume

    Return: A StorageVolume representing a storage volume
    """
    volume_url = "/{}/storage-pools/{}/volumes/{}/{}".format(
        self.version, pool_id, type, name
    )
    body = self.connection.request(volume_url).parse_body()
    assert_response(response_dict=body, status_code=200)

    volume_data = body["metadata"]
    return self._to_storage_volume(pool_id=pool_id, metadata=volume_data)
| |
def ex_get_volume_by_name(self, name, vol_type="custom"):
    """
    Returns a storage volume that has the given name and type.

    The function loops over all the storage pools available and
    picks the first volume, from the first storage pool, that
    matches the given name and type. Every pool is listed in detail
    (one request per volume), thus this function can be
    quite expensive.

    :param name: The name of the volume to look for
    :type name: str

    :param vol_type: The type of the volume, default is custom.
        Pass ``None`` to accept a volume of any type.
    :type vol_type: str

    :return: A StorageVolume representing a storage volume or
        ``None`` if no pool holds a matching volume
    """
    req = "/%s/storage-pools" % self.version
    response_dict = self.connection.request(req).parse_body()
    assert_response(response_dict=response_dict, status_code=200)

    for pool in response_dict["metadata"]:
        # the pool id is the last path segment of the listed entry
        pool_id = pool.split("/")[-1]

        for vol in self.ex_list_storage_pool_volumes(pool_id=pool_id):
            if vol.name != name:
                continue

            # honour the requested type; it is stored under extra["type"]
            # by _to_storage_volume
            if vol_type is not None and (vol.extra or {}).get("type") != vol_type:
                continue

            return vol

    return None
| |
def create_volume(self, pool_id, definition, **kwargs):
    """
    Create a new storage volume on a given storage pool
    Operation: sync or async (when copying an existing volume)

    The caller's ``definition`` is left untouched: the request body is
    built from a copy in which ``size_type`` is removed and
    ``config["size"]`` is replaced by the size in bytes, as a string.

    :param pool_id: the id of the storage pool to create the volume on
    :type pool_id: ``str``

    :param definition: description of the volume; must contain ``name``,
        ``type`` and ``config`` with a ``size`` entry. The optional
        ``size_type`` entry gives the unit of ``size`` ("GB" or "MB")
        and defaults to "GB"
    :type definition: ``dict``

    :raises: :class:`LXDAPIException` if no definition is given

    :return: A StorageVolume representing a storage volume
    """
    if not definition:
        raise LXDAPIException("Cannot create a storage volume without a definition")

    # shallow copies are enough: only top-level keys and config["size"]
    # are rewritten
    payload = dict(definition)
    size_type = payload.pop("size_type", "GB")

    config = dict(payload["config"])
    config["size"] = str(LXDContainerDriver._to_bytes(config["size"], size_type=size_type))
    payload["config"] = config

    data = json.dumps(payload)

    # Return: standard return value or standard error
    req = "/{}/storage-pools/{}/volumes".format(self.version, pool_id)
    response = self.connection.request(req, method="POST", data=data)

    response_dict = response.parse_body()
    assert_response(response_dict=response_dict, status_code=200)

    return self.ex_get_storage_pool_volume(
        pool_id=pool_id, type=payload["type"], name=payload["name"]
    )
| |
def attach_volume(
    self, container_id, volume_id, pool_id, name, path, ex_timeout=default_time_out
):
    """
    Attach the volume with id volume_id
    to the container with id container_id

    The container configuration is read, a disk device is added under
    ``name`` (devices already present are kept) and the result is
    written back with a PUT request. The resulting background operation
    is then waited for.

    :param container_id: the id of the container to attach the volume to
    :param volume_id: the id of the volume, used as the device source
    :param pool_id: the id of the storage pool holding the volume
    :param name: the name of the new device
    :param path: the path of the device inside the container
    :param ex_timeout: time to wait for the operation before timeout

    :raises: :class:`LXDAPIException` if waiting for the operation fails
        with anything other than "not found"

    :return: the container, read again after the update
    """
    container = self.get_container(id=container_id)

    # copy so that the fetched container object is not modified
    config = dict(container.extra)

    # expand the devices of the container instead of replacing them
    devices = dict(config.get("devices") or {})
    devices[name] = {"path": path, "type": "disk", "source": volume_id, "pool": pool_id}
    config["devices"] = devices

    data = json.dumps(config)

    req = "/{}/containers/{}".format(self.version, container_id)
    response = self.connection.request(req, method="PUT", data=data)

    response_dict = response.parse_body()

    # a background operation is expected
    # to be returned status_code = 100 --> Operation created
    assert_response(response_dict=response_dict, status_code=100)

    try:
        # wait until the timeout...but until getting here the operation
        # may have finished already
        oid = response_dict["metadata"]["id"]
        req = "/{}/operations/{}/wait?timeout={}".format(self.version, oid, ex_timeout)
        response = self.connection.request(req)
    except BaseHTTPError as err:
        lxd_exception = self._get_lxd_api_exception_for_error(err)
        # if not found assume the operation completed
        if lxd_exception.message != "not found":
            raise lxd_exception
    else:
        # only a real answer from the wait call can be checked; after a
        # "not found" the only response at hand is the status-100 one
        response_dict = response.parse_body()
        assert_response(response_dict=response_dict, status_code=200)

    return self.get_container(id=container_id, ex_get_ip_addr=True)
| |
def ex_replace_storage_volume_config(self, pool_id, type, name, definition):
    """
    Replace the storage volume information

    The definition is sent as JSON with a PUT request to the volume
    endpoint and the volume is read back afterwards.

    :param pool_id: the id of the storage pool holding the volume
    :param type: the type of the volume
    :param name: the name of the volume
    :param definition: the new volume information

    :raises: :class:`LXDAPIException` if no definition is given

    :return: the volume as returned by :meth:`ex_get_storage_pool_volume`
    """
    if not definition:
        raise LXDAPIException("Cannot create a storage volume without a definition")

    volume_url = "/{}/storage-pools/{}/volumes/{}/{}".format(
        self.version, pool_id, type, name
    )
    payload = json.dumps(definition)
    replacement = self.connection.request(volume_url, method="PUT", data=payload)

    assert_response(response_dict=replacement.parse_body(), status_code=200)
    return self.ex_get_storage_pool_volume(pool_id=pool_id, type=type, name=name)
| |
def ex_delete_storage_pool_volume(self, pool_id, type, name):
    """
    Delete a storage volume of a given type on a given storage pool

    :param pool_id: the id of the storage pool holding the volume
    :type pool_id: ``str``

    :param type: the type of the volume
    :type type: ``str``

    :param name: the name of the volume
    :type name: ``str``

    :raises: :class:`LXDAPIException` built from the HTTP error when
        the request fails

    :return: ``True`` once the volume has been deleted
    """
    volume_url = "/{}/storage-pools/{}/volumes/{}/{}".format(
        self.version, pool_id, type, name
    )

    try:
        deletion = self.connection.request(volume_url, method="DELETE")
        assert_response(response_dict=deletion.parse_body(), status_code=200)
    except BaseHTTPError as err:
        # surface HTTP failures as driver level exceptions
        raise self._get_lxd_api_exception_for_error(err)

    return True
| |
def ex_list_networks(self):
    """
    Returns the list of networks.
    Implements GET /1.0/networks
    Authentication: trusted
    Operation: sync

    Every listed entry is resolved through :meth:`ex_get_network`,
    which costs one additional request per network.

    :rtype: list of LXDNetwork objects
    """
    listing = self.connection.request("/%s/networks" % (self.version))

    body = listing.parse_body()
    assert_response(response_dict=body, status_code=200)

    # the network name is the last path segment of each listed entry
    return [
        self.ex_get_network(name=entry.split("/")[-1])
        for entry in body["metadata"]
    ]
| |
def ex_get_network(self, name):
    """
    Returns the LXD network with the given name.
    Implements GET /1.0/networks/<name>

    Authentication: trusted
    Operation: sync

    :param name: The name of the network to return
    :type name: str

    :rtype: LXDNetwork
    """
    network_url = "/{}/networks/{}".format(self.version, name)
    body = self.connection.request(network_url).parse_body()
    assert_response(response_dict=body, status_code=200)

    network_data = body["metadata"]
    return LXDNetwork.build_from_response(network_data)
| |
def ex_create_network(self, name, **kwargs):
    """
    Create a new network with the given name
    and the specified configuration

    Authentication: trusted
    Operation: sync

    :param name: The name of the new network
    :type name: str

    :param kwargs: further fields of the network; they are sent to the
        host together with the name

    :return: the new network, read back through :meth:`ex_get_network`
    """
    # the request body is the configuration plus the network name
    kwargs["name"] = name
    payload = json.dumps(kwargs)

    # Return: standard return value or standard error
    creation = self.connection.request(
        "/%s/networks" % self.version, method="POST", data=payload
    )
    assert_response(response_dict=creation.parse_body(), status_code=200)

    return self.ex_get_network(name=name)
| |
def ex_delete_network(self, name):
    """
    Delete the network with the given name
    Authentication: trusted
    Operation: sync

    :param name: The network name to delete
    :type name: str

    :return: True if the network was successfully deleted
    """
    network_url = "/{}/networks/{}".format(self.version, name)
    deletion = self.connection.request(network_url, method="DELETE")
    assert_response(response_dict=deletion.parse_body(), status_code=200)

    return True
| |
def _to_container(self, metadata):
    """
    Returns a Container instance built from the given metadata

    The status "Running" maps to ``ContainerState.RUNNING``, "Frozen" to
    ``ContainerState.PAUSED`` and any other status to
    ``ContainerState.STOPPED``. The metadata dictionary itself becomes
    the ``extra`` attribute of the container.

    :param metadata: dictionary with the container metadata; the keys
        ``name``, ``status``, ``config`` and ``ips`` are read
    :type metadata: ``dict``

    :rtype: :class:`libcloud.container.base.Container`
    """
    container_name = metadata["name"]

    known_states = {
        "Running": ContainerState.RUNNING,
        "Frozen": ContainerState.PAUSED,
    }
    container_state = known_states.get(metadata["status"], ContainerState.STOPPED)

    container_config = metadata["config"]
    base_image = container_config.get("volatile.base_image", None)
    image_version = container_config.get("image.version", None)
    addresses = metadata["ips"]

    # the base image fingerprint doubles as the image name
    source_image = ContainerImage(
        id=base_image,
        name=base_image,
        path=None,
        version=image_version,
        driver=self,
        extra=None,
    )

    return Container(
        driver=self,
        name=container_name,
        id=container_name,
        state=container_state,
        image=source_image,
        ip_addresses=addresses,
        extra=metadata,
    )
| |
def _do_container_action(self, container, action, timeout, force, stateful):
    """
    Change the container state by performing the given action.
    The action may be either stop, start, restart, freeze or unfreeze

    Only ``action`` and ``timeout`` are sent to the host; ``force`` and
    ``stateful`` are accepted but not forwarded.

    :param container: the container to act upon
    :param action: one of the actions in ``LXD_API_STATE_ACTIONS``
    :param timeout: timeout sent with the action; when falsy the driver
        default is used while waiting for the operation
    :param force: not used
    :param stateful: not used

    :raises: ``ValueError`` if the action is not supported

    :return: the container read again after the action, or a placeholder
        container in state ``TERMINATED`` when a running ephemeral
        container has been stopped
    """
    if action not in LXD_API_STATE_ACTIONS:
        raise ValueError("Invalid action specified")

    # remember the state the container had before the action
    previous_state = container.state

    payload = json.dumps({"action": action, "timeout": timeout})

    # checkout this for stateful:
    # https://discuss.linuxcontainers.org/t/error-in-live-migration/1928
    # looks like we are getting "err":"Unable to perform
    # container live migration. CRIU isn't installed"
    # in the response when stateful is True so remove it for now
    state_url = "/{}/containers/{}/state".format(self.version, container.name)
    response = self.connection.request(state_url, method="PUT", data=payload)

    body = response.parse_body()

    # a background operation is expected to
    # be returned status_code = 100 --> Operation created
    assert_response(response_dict=body, status_code=100)

    wait_timeout = timeout if timeout else LXDContainerDriver.default_time_out

    try:
        operation_id = body["metadata"]["id"]
        wait_url = "/{}/operations/{}/wait?timeout={}".format(
            self.version, operation_id, wait_timeout
        )
        response = self.connection.request(wait_url)
    except BaseHTTPError as err:
        api_error = self._get_lxd_api_exception_for_error(err)
        # if not found assume the operation completed
        if api_error.message != "not found":
            raise api_error

    # stopping a running ephemeral container removes it from the host
    removed = (
        previous_state == ContainerState.RUNNING
        and container.extra["ephemeral"]
        and action == "stop"
    )
    if not removed:
        return self.get_container(id=container.name)

    # return a dummy container otherwise we get 404 error
    return Container(
        driver=self,
        name=container.name,
        id=container.name,
        state=ContainerState.TERMINATED,
        image=None,
        ip_addresses=[],
        extra=None,
    )
| |
def _to_image(self, metadata):
    """
    Returns a container image built from the given metadata

    The image name is the name of the first alias when the image has
    aliases; otherwise the ``description`` property, falling back to
    the fingerprint. The version is the alias of ``update_source``.

    :param metadata: the image metadata, also stored as ``extra``
    :type metadata: ``dict``

    :rtype: :class:`.ContainerImage`
    """
    image_id = metadata.get("fingerprint")

    alias_list = metadata.get("aliases", [])
    if not alias_list:
        image_name = metadata.get("properties", {}).get("description") or image_id
    else:
        image_name = metadata.get("aliases")[0].get("name")

    update_source = metadata.get("update_source", {})

    return ContainerImage(
        id=image_id,
        name=image_name,
        path=None,
        version=update_source.get("alias"),
        driver=self,
        extra=metadata,
    )
| |
def _to_storage_pool(self, data):
    """
    Given a dictionary with the storage pool configuration
    it returns a StoragePool object

    :param data: the storage pool configuration; the keys ``name``,
        ``driver``, ``used_by`` and ``config`` are required, ``managed``
        is optional
    :type data: ``dict``

    :return: :class: .StoragePool
    """
    return LXDStoragePool(
        name=data["name"],
        driver=data["driver"],
        used_by=data["used_by"],
        # the pool's own configuration, not the literal key name
        config=data["config"],
        # a missing or ``None`` flag is reported as "not managed"
        managed=bool(data.get("managed", False)),
    )
| |
def _deploy_container_from_image(
    self, name, image, parameters, cont_params, timeout=default_time_out
):
    """
    Deploy a new container from the given image

    The request body starts as the container name plus a source of
    type "none"; the ``source`` entry of ``parameters`` (if any) is
    merged into the source and ``cont_params`` is merged into the body.

    :param name: the name of the container
    :param image: .ContainerImage (not read by this method)

    :param parameters: dictionary describing the source attribute
    :type parameters ``dict``

    :param cont_params: dictionary describing the container configuration
    :type cont_params: dict

    :param timeout: Time to wait for the operation before timeout
    :type timeout: int

    :raises: :class:`LXDAPIException` if ``cont_params`` is ``None`` or
        the source type is not one of ``LXD_API_IMAGE_SOURCE_TYPE``

    :rtype: :class: .Container
    """
    if cont_params is None:
        raise LXDAPIException(message="cont_params must be a valid dict")

    # container without a pre-populated rootfs
    # see https://github.com/lxc/lxd/blob/master/doc/rest-api.md
    # can be "image", "migration", "copy" or "none"
    source = {"type": "none"}
    if parameters:
        source.update(parameters["source"])

    if source["type"] not in LXD_API_IMAGE_SOURCE_TYPE:
        raise LXDAPIException(
            message="source type must in " + str(LXD_API_IMAGE_SOURCE_TYPE)
        )

    payload = {"name": name, "source": source}
    # add also the other container parameters
    payload.update(cont_params)

    # Return: background operation or standard error
    creation = self.connection.request(
        "/%s/containers" % self.version, method="POST", data=json.dumps(payload)
    )
    body = creation.parse_body()

    # a background operation is expected to
    # be returned status_code = 100 --> Operation created
    assert_response(response_dict=body, status_code=100)

    # make sure we don't wait indefinitely
    # until the operation is done
    wait_timeout = timeout if timeout else LXDContainerDriver.default_time_out

    try:
        # wait until the timeout...but until getting here the operation
        # may have finished already
        operation_id = body["metadata"]["id"]
        wait_url = "/{}/operations/{}/wait?timeout={}".format(
            self.version, operation_id, wait_timeout
        )
        self.connection.request(wait_url)
    except BaseHTTPError as err:
        api_error = self._get_lxd_api_exception_for_error(err)
        # if not found assume the operation completed
        if api_error.message != "not found":
            raise api_error

    return self.get_container(id=name)
| |
def _to_storage_volume(self, pool_id, metadata):
    """
    Returns a StorageVolume object built from the given metadata

    The ``size`` entry of the volume configuration (in bytes) is
    converted to gigabytes and reported as the volume size; a missing
    or ``None`` size is reported as 0. The remaining configuration is
    stored under ``extra["config"]``. The given metadata is not
    modified.

    :param pool_id: the id of the storage pool holding the volume
    :param metadata: dict representing the volume; the keys ``name``,
        ``type``, ``used_by`` and ``config`` are required
    :rtype: StorageVolume
    """
    # copy so that taking "size" out does not alter the caller's data
    config = dict(metadata["config"] or {})

    # volumes listed without details carry size=None, which cannot be
    # converted to a number
    raw_size = config.pop("size", None)
    size = 0 if raw_size is None else LXDContainerDriver._to_gb(raw_size)

    extra = {
        "pool_id": pool_id,
        "type": metadata["type"],
        "used_by": metadata["used_by"],
        "config": config,
    }

    return StorageVolume(
        id=metadata["name"],
        name=metadata["name"],
        driver=self,
        size=size,
        extra=extra,
    )
| |
def _get_api_version(self):
    """
    Return the LXD API version declared on the driver class
    """
    driver_cls = LXDContainerDriver
    return driver_cls.version
| |
def _ex_connection_class_kwargs(self):
    """
    Return extra connection keyword arguments which are passed to the
    Connection class constructor.

    When the driver has both a ``key_file`` and a ``cert_file``
    attribute these are returned together with the certificate
    validator; otherwise the parent class decides.
    """
    has_client_cert = all(hasattr(self, attr) for attr in ("key_file", "cert_file"))
    if not has_client_cert:
        return super()._ex_connection_class_kwargs()

    return {
        "key_file": self.key_file,
        "cert_file": self.cert_file,
        "certificate_validator": self.certificate_validator,
    }
| |
@staticmethod
def _create_exec_configuration(input, **config):
    """
    Prepares the input parameters for the execution API call

    Options present in ``config`` are copied into ``input``. ``width``
    and ``height`` are converted to ``int`` and default to 80 and 25;
    ``wait-for-websocket`` defaults to ``False``. The other options
    (``environment``, ``user``, ``group``, ``cwd``, ``record-output``,
    ``interactive``) are copied only when given.

    :param input: the dictionary to fill in; it is modified in place
    :type input: ``dict``

    :return: the same ``input`` dictionary
    """

    def copy_if_given(*option_names):
        # optional settings are forwarded untouched
        for option in option_names:
            if option in config:
                input[option] = config[option]

    copy_if_given("environment")

    input["width"] = int(config["width"]) if "width" in config else 80
    input["height"] = int(config["height"]) if "height" in config else 25

    copy_if_given("user", "group", "cwd")

    input["wait-for-websocket"] = config.get("wait-for-websocket", False)

    copy_if_given("record-output", "interactive")

    return input
| |
@staticmethod
def _fix_cont_params(architecture, profiles, ephemeral, config, devices, instance_type):
    """
    Returns a dict with the container parameters

    ``profiles`` and ``ephemeral`` are always present in the result,
    falling back to the driver defaults when ``None``; every other
    parameter is included only when it is not ``None``.
    """
    cont_params = {}

    if architecture is not None:
        cont_params["architecture"] = architecture

    # these two always get a value
    cont_params["profiles"] = (
        profiles if profiles is not None else [LXDContainerDriver.default_profiles]
    )
    cont_params["ephemeral"] = (
        ephemeral if ephemeral is not None else LXDContainerDriver.default_ephemeral
    )

    # the remaining parameters are optional
    optional = (
        ("config", config),
        ("devices", devices),
        ("instance_type", instance_type),
    )
    for key, value in optional:
        if value is not None:
            cont_params[key] = value

    return cont_params
| |
def _get_lxd_api_exception_for_error(self, error):
    """
    Build an :class:`LXDAPIException` from an error whose ``message``
    attribute holds a JSON document.

    The ``error`` entry of the document becomes the exception message
    and the ``type`` entry (empty string when missing) its error type.
    """
    payload = json.loads(error.message)
    return LXDAPIException(
        message=payload.get("error"),
        error_type=payload.get("type", ""),
    )
| |
@staticmethod
def _to_gb(size):
    """
    Convert the given size in bytes to whole gigabytes (10**9 bytes),
    rounding down

    :param size: in bytes, anything accepted by ``int``
    :return: int representing the gigabytes
    """
    bytes_per_gb = 10**9
    whole_gb, _ = divmod(int(size), bytes_per_gb)
    return whole_gb
| |
| @staticmethod |
| def _to_bytes(size, size_type="GB"): |
| """ |
| convert the given size in GB to bytes |
| :param size: in GBs |
| :return: int representing bytes |
| """ |
| size = int(size) |
| if size_type == "GB": |
| return size * 10**9 |
| elif size_type == "MB": |
| return size * 10**6 |