# blob: 8762fb241a61e4f4185aa5c632bd698ec18d95aa [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from libcloud.test import MockHttp, unittest
from libcloud.utils.py3 import httplib
from libcloud.test.secrets import CONTAINER_PARAMS_KUBERNETES
from libcloud.container.base import ContainerImage
from libcloud.test.file_fixtures import ContainerFileFixtures
from libcloud.test.common.test_kubernetes import KubernetesAuthTestCaseMixin
from libcloud.container.drivers.kubernetes import (
KubernetesContainerDriver,
to_n_cpus,
to_cpu_str,
to_n_bytes,
sum_resources,
to_memory_str,
)
class KubernetesContainerDriverTestCase(unittest.TestCase, KubernetesAuthTestCaseMixin):
    """Tests for KubernetesContainerDriver run against KubernetesMockHttp.

    The driver-level tests issue requests that are answered by the mock HTTP
    class; the remaining tests cover the CPU/memory quantity helpers imported
    from the driver module (to_n_bytes, to_memory_str, to_cpu_str, to_n_cpus,
    sum_resources).
    """

    # Driver class under test.
    # NOTE(review): presumably also consumed by KubernetesAuthTestCaseMixin,
    # which is defined outside this file -- confirm.
    driver_cls = KubernetesContainerDriver

    # Container id the container tests expect from the mocked pod listing.
    CONTAINER_ID = "docker://3c48b5cda79bce4c8866f02a3b96a024edb8f660d10e7d1755e9ced49ef47b36"

    def setUp(self):
        """Install the mock connection class, reset its state and build a driver."""
        KubernetesContainerDriver.connectionCls.conn_class = KubernetesMockHttp
        KubernetesMockHttp.type = None
        KubernetesMockHttp.use_param = "a"
        self.driver = KubernetesContainerDriver(*CONTAINER_PARAMS_KUBERNETES)

    def test_list_containers(self):
        """list_containers() yields exactly one container, named hello-world."""
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 1)
        self.assertEqual(containers[0].id, self.CONTAINER_ID)
        self.assertEqual(containers[0].name, "hello-world")

    def test_deploy_container(self):
        """deploy_container() returns a container carrying the requested name."""
        image = ContainerImage(
            id=None, name="hello-world", path=None, driver=self.driver, version=None
        )
        container = self.driver.deploy_container("hello-world", image=image)
        self.assertEqual(container.name, "hello-world")

    def test_get_container(self):
        """get_container() returns the container whose id was requested."""
        container = self.driver.get_container(self.CONTAINER_ID)
        # assertEqual rather than a bare ``assert``: it is not stripped under
        # ``python -O`` and matches the rest of this test case.
        self.assertEqual(container.id, self.CONTAINER_ID)

    def test_list_namespaces(self):
        """list_namespaces() yields two namespaces, the first being 'default'."""
        namespaces = self.driver.list_namespaces()
        self.assertEqual(len(namespaces), 2)
        self.assertEqual(namespaces[0].id, "default")
        self.assertEqual(namespaces[0].name, "default")

    def test_get_namespace(self):
        """get_namespace('default') returns a namespace with that id and name."""
        namespace = self.driver.get_namespace("default")
        self.assertEqual(namespace.id, "default")
        self.assertEqual(namespace.name, "default")

    def test_create_namespace(self):
        """create_namespace('test') returns a namespace with that id and name."""
        namespace = self.driver.create_namespace("test")
        self.assertEqual(namespace.id, "test")
        self.assertEqual(namespace.name, "test")

    def test_delete_namespace(self):
        """delete_namespace() on a fetched namespace returns a truthy result."""
        namespace = self.driver.get_namespace("default")
        result = self.driver.delete_namespace(namespace)
        self.assertTrue(result)

    def test_list_pods(self):
        """ex_list_pods() yields exactly one pod, named hello-world."""
        pods = self.driver.ex_list_pods()
        self.assertEqual(len(pods), 1)
        self.assertEqual(pods[0].id, "1fad5411-b9af-11e5-8701-0050568157ec")
        self.assertEqual(pods[0].name, "hello-world")

    def test_destroy_pod(self):
        """ex_destroy_pod('default', 'default') returns a truthy result."""
        result = self.driver.ex_destroy_pod("default", "default")
        self.assertTrue(result)

    def test_list_nodes(self):
        """ex_list_nodes() yields exactly one node, named 127.0.0.1."""
        nodes = self.driver.ex_list_nodes()
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].id, "45949cbb-b99d-11e5-8d53-0050568157ec")
        self.assertEqual(nodes[0].name, "127.0.0.1")

    def test_destroy_node(self):
        """ex_destroy_node('127.0.0.1') returns a truthy result."""
        result = self.driver.ex_destroy_node("127.0.0.1")
        self.assertTrue(result)

    def test_get_version(self):
        """ex_get_version() returns the version string served by the mock."""
        version = self.driver.ex_get_version()
        self.assertEqual(version, "v1.20.8-gke.900")

    def test_list_nodes_metrics(self):
        """ex_list_nodes_metrics() yields one entry with the expected node name."""
        nodes_metrics = self.driver.ex_list_nodes_metrics()
        self.assertEqual(len(nodes_metrics), 1)
        self.assertEqual(
            nodes_metrics[0]["metadata"]["name"],
            "gke-cluster-3-default-pool-76fd57f7-l83v",
        )

    def test_list_pods_metrics(self):
        """ex_list_pods_metrics() yields ten entries; the first three names are checked."""
        pods_metrics = self.driver.ex_list_pods_metrics()
        self.assertEqual(len(pods_metrics), 10)
        self.assertEqual(pods_metrics[0]["metadata"]["name"], "gke-metrics-agent-sfjzj")
        self.assertEqual(
            pods_metrics[1]["metadata"]["name"],
            "stackdriver-metadata-agent-cluster-level-849ff68b6d-fphxl",
        )
        self.assertEqual(pods_metrics[2]["metadata"]["name"], "event-exporter-gke-67986489c8-g47rz")

    def test_list_services(self):
        """ex_list_services() yields four entries; the first three names are checked."""
        services = self.driver.ex_list_services()
        self.assertEqual(len(services), 4)
        self.assertEqual(services[0]["metadata"]["name"], "kubernetes")
        self.assertEqual(services[1]["metadata"]["name"], "default-http-backend")
        self.assertEqual(services[2]["metadata"]["name"], "kube-dns")

    def test_list_deployments(self):
        """ex_list_deployments() yields seven deployments with typed attributes."""
        deployments = self.driver.ex_list_deployments()
        self.assertEqual(len(deployments), 7)

        first = deployments[0]
        self.assertEqual(first.id, "aea45586-9a4a-4a01-805c-719f431316c0")
        self.assertEqual(first.name, "event-exporter-gke")
        self.assertEqual(first.namespace, "kube-system")

        # Every deployment must expose an int replica count and a dict selector.
        for item in deployments:
            self.assertIsInstance(item.replicas, int)
            self.assertIsInstance(item.selector, dict)

    def test_to_n_bytes(self):
        """to_n_bytes() converts quantity strings with K/M/G and Ki/Mi/Gi suffixes."""
        cases = (
            ("0", 0),
            ("1000Ki", 1_024_000),
            ("100K", 100_000),
            ("512Mi", 536_870_912),
            ("900M", 900_000_000),
            ("10Gi", 10_737_418_240),
            ("10G", 10_000_000_000),
        )
        for memory, expected in cases:
            # subTest labels the failing input and lets the other cases still run.
            with self.subTest(memory=memory):
                self.assertEqual(to_n_bytes(memory), expected)

    def test_to_memory_str(self):
        """to_memory_str() renders byte counts as the expected quantity strings."""
        cases = (
            (0, "0K"),
            (1_024_000, "1000Ki"),
            (100_000, "100K"),
            (536_870_912, "512Mi"),
            (900_000_000, "900M"),
            (10_737_418_240, "10Gi"),
            (10_000_000_000, "10G"),
        )
        for memory, expected in cases:
            with self.subTest(memory=memory):
                self.assertEqual(to_memory_str(memory), expected)

    def test_to_cpu_str(self):
        """to_cpu_str() renders CPU counts with m/u/n suffixes (plain "0" for zero)."""
        cases = (
            (0, "0"),
            (0.5, "500m"),
            (2, "2000m"),
            (0.000001, "1u"),
            (0.0005, "500u"),
            (0.000000001, "1n"),
            (0.0000005, "500n"),
        )
        for cpu, expected in cases:
            with self.subTest(cpu=cpu):
                self.assertEqual(to_cpu_str(cpu), expected)

    def test_to_n_cpus(self):
        """to_n_cpus() parses CPU strings that are bare or carry an m/u/n suffix."""
        cases = (
            ("0m", 0),
            ("2", 2),
            ("500m", 0.5),
            ("2000m", 2),
            ("1u", 0.000001),
            ("500u", 0.0005),
            ("1n", 0.000000001),
            ("500n", 0.0000005),
        )
        for cpu, expected in cases:
            with self.subTest(cpu=cpu):
                self.assertEqual(to_n_cpus(cpu), expected)

    def test_sum_resources(self):
        """sum_resources() adds cpu and memory across two or three resource dicts."""
        resource_1 = {"cpu": "1", "memory": "1000Mi"}
        resource_2 = {"cpu": "2", "memory": "2000Mi"}
        self.assertDictEqual(
            sum_resources(resource_1, resource_2),
            {"cpu": "3000m", "memory": "3000Mi"},
        )

        # Mixed units: 1500m CPU and 1Gi (= 1024Mi) of memory.
        resource_3 = {"cpu": "1500m", "memory": "1Gi"}
        self.assertDictEqual(
            sum_resources(resource_1, resource_2, resource_3),
            {"cpu": "4500m", "memory": "4024Mi"},
        )
class KubernetesMockHttp(MockHttp):
    """Mock HTTP backend answering Kubernetes API calls from canned fixtures.

    Each handler receives ``(method, url, body, headers)`` and returns a
    ``(status, body, headers, reason)`` tuple with status 200.  Handler names
    appear to mirror the request path (e.g. ``/api/v1/pods`` ->
    ``_api_v1_pods``); the dispatch itself lives in MockHttp, outside this
    file.  Any HTTP method a handler does not list raises
    ``AssertionError("Unsupported method")``.
    """

    fixtures = ContainerFileFixtures("kubernetes")

    def _fixture_response(self, method, fixtures_by_method):
        """Build the 200 response registered for *method*.

        :param method: HTTP method of the incoming request (e.g. ``"GET"``).
        :param fixtures_by_method: maps each supported HTTP method to the
            fixture file name to load as the body, or to ``None`` for a
            response without a body.
        :return: ``(httplib.OK, body, {}, reason)`` tuple.
        :raises AssertionError: if *method* is not a key of
            *fixtures_by_method*.
        """
        if method not in fixtures_by_method:
            raise AssertionError("Unsupported method")
        fixture_name = fixtures_by_method[method]
        body = None if fixture_name is None else self.fixtures.load(fixture_name)
        return (httplib.OK, body, {}, httplib.responses[httplib.OK])

    def _api_v1_pods(self, method, url, body, headers):
        """GET: pod listing."""
        return self._fixture_response(method, {"GET": "_api_v1_pods.json"})

    def _api_v1_namespaces(self, method, url, body, headers):
        """GET: namespace listing; POST: the 'test' namespace fixture."""
        return self._fixture_response(
            method,
            {
                "GET": "_api_v1_namespaces.json",
                "POST": "_api_v1_namespaces_test.json",
            },
        )

    def _api_v1_namespaces_default(self, method, url, body, headers):
        """GET: the 'default' namespace; DELETE: its deletion response."""
        return self._fixture_response(
            method,
            {
                "GET": "_api_v1_namespaces_default.json",
                "DELETE": "_api_v1_namespaces_default_DELETE.json",
            },
        )

    def _api_v1_namespaces_default_pods(self, method, url, body, headers):
        """GET: pods of the 'default' namespace; POST: the pod creation response."""
        return self._fixture_response(
            method,
            {
                "GET": "_api_v1_namespaces_default_pods.json",
                "POST": "_api_v1_namespaces_default_pods_POST.json",
            },
        )

    def _api_v1_namespaces_default_pods_default(self, method, url, body, headers):
        """DELETE: empty-bodied success response."""
        return self._fixture_response(method, {"DELETE": None})

    def _api_v1_nodes(self, method, url, body, headers):
        """GET: node listing."""
        return self._fixture_response(method, {"GET": "_api_v1_nodes.json"})

    def _api_v1_nodes_127_0_0_1(self, method, url, body, headers):
        """DELETE: empty-bodied success response."""
        return self._fixture_response(method, {"DELETE": None})

    def _api_v1_services(self, method, url, body, headers):
        """GET: service listing."""
        return self._fixture_response(method, {"GET": "_api_v1_services.json"})

    def _version(self, method, url, body, headers):
        """GET: version document."""
        return self._fixture_response(method, {"GET": "_version.json"})

    def _apis_metrics_k8s_io_v1beta1_nodes(self, method, url, body, headers):
        """GET: node metrics listing."""
        return self._fixture_response(method, {"GET": "_apis_metrics_k8s_io_v1beta1_nodes.json"})

    def _apis_metrics_k8s_io_v1beta1_pods(self, method, url, body, headers):
        """GET: pod metrics listing."""
        return self._fixture_response(method, {"GET": "_apis_metrics_k8s_io_v1beta1_pods.json"})

    def _apis_apps_v1_deployments(self, method, url, body, headers):
        """GET: deployment listing.

        Now rejects non-GET methods like every other handler in this class,
        so an unexpected verb fails loudly instead of receiving the listing.
        """
        return self._fixture_response(method, {"GET": "_apis_apps_v1_deployments.json"})
if __name__ == "__main__":
    # Run this module's tests when executed directly and hand the outcome
    # back to the shell as the process exit status.
    exit_status = unittest.main()
    sys.exit(exit_status)