blob: 84df59e2f659cfe4c14c0362f4598b42c74ee376 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.container.location.kubernetes;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import com.google.common.net.HostAndPort;
import io.fabric8.kubernetes.api.model.*;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.LocationSpec;
import org.apache.brooklyn.api.location.MachineLocation;
import org.apache.brooklyn.api.location.MachineProvisioningLocation;
import org.apache.brooklyn.api.location.PortRange;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.container.entity.docker.DockerContainer;
import org.apache.brooklyn.container.entity.kubernetes.KubernetesPod;
import org.apache.brooklyn.container.entity.kubernetes.KubernetesResource;
import org.apache.brooklyn.container.location.docker.DockerJcloudsLocation;
import org.apache.brooklyn.container.location.kubernetes.machine.KubernetesEmptyMachineLocation;
import org.apache.brooklyn.container.location.kubernetes.machine.KubernetesMachineLocation;
import org.apache.brooklyn.container.location.kubernetes.machine.KubernetesSshMachineLocation;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.location.AbstractLocation;
import org.apache.brooklyn.core.location.LocationConfigKeys;
import org.apache.brooklyn.core.location.PortRanges;
import org.apache.brooklyn.core.location.access.PortForwardManager;
import org.apache.brooklyn.core.location.access.PortForwardManagerLocationResolver;
import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
import org.apache.brooklyn.core.network.AbstractOnNetworkEnricher;
import org.apache.brooklyn.core.network.OnPublicNetworkEnricher;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
import org.apache.brooklyn.util.core.internal.ssh.SshTool;
import org.apache.brooklyn.util.core.text.TemplateProcessor;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.ReferenceWithError;
import org.apache.brooklyn.util.net.Networking;
import org.apache.brooklyn.util.repeat.Repeater;
import org.apache.brooklyn.util.stream.Streams;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
public class KubernetesLocation extends AbstractLocation implements MachineProvisioningLocation<KubernetesMachineLocation>, KubernetesLocationConfig {
/*
* TODO
*
* - Ignores config such as 'user' and 'password', just uses 'loginUser'
* and 'loginUser.password' for connecting to the container.
* - Does not create a user, so behaves differently from things that use
* JcloudsLocation.
* - Does not use ssh keys only passwords.
* - The 'brooklyncentral/*' images use root which is discouraged.
* - Error handling needs revisited. For example, if provisioning fails then
* it waits for five minutes and then fails without a reason why.
* e.g. try launching a container with an incorrect image name.
*/
public static final String NODE_PORT = "NodePort";
public static final String IMMUTABLE_CONTAINER_KEY = "immutable-container";
public static final String SSHABLE_CONTAINER = "sshable-container";
public static final String BROOKLYN_ENTITY_ID = "brooklyn.apache.org/entity-id";
public static final String BROOKLYN_APPLICATION_ID = "brooklyn.apache.org/application-id";
public static final String KUBERNETES_DOCKERCFG = "kubernetes.io/dockercfg";
public static final String PHASE_AVAILABLE = "Available";
public static final String PHASE_TERMINATING = "Terminating";
public static final String PHASE_ACTIVE = "Active";
/**
* The regex for the image descriptions that support us injecting login credentials.
*/
public static final List<String> IMAGE_DESCRIPTION_REGEXES_REQUIRING_INJECTED_LOGIN_CREDS = ImmutableList.of(
"brooklyncentral/centos.*",
"brooklyncentral/ubuntu.*");
/**
* The environment variable for injecting login credentials.
*/
public static final String BROOKLYN_ROOT_PASSWORD = "BROOKLYN_ROOT_PASSWORD";
private static final Logger LOG = LoggerFactory.getLogger(KubernetesLocation.class);
public static final String ADDRESS_KEY = "address";
private ConfigBag currentConfig;
public KubernetesLocation() {
super();
}
public KubernetesLocation(Map<?, ?> properties) {
super(properties);
}
public KubernetesClient getClient() {
if(currentConfig != null) {
return getClient(currentConfig);
}
return getClient(MutableMap.of());
}
public KubernetesClient getClient(Map<?, ?> flags) {
ConfigBag conf = (flags == null || flags.isEmpty())
? config().getBag()
: ConfigBag.newInstanceExtending(config().getBag(), flags);
return getClient(conf);
}
public KubernetesClient getClient(ConfigBag config) {
currentConfig = config;
KubernetesClientRegistry registry = getConfig(KUBERNETES_CLIENT_REGISTRY);
KubernetesClient client = registry.getKubernetesClient(ResolvingConfigBag.newInstanceExtending(getManagementContext(), config));
return client;
}
@Override
public KubernetesMachineLocation obtain(Map<?, ?> flags) {
ConfigBag setupRaw = ConfigBag.newInstanceExtending(config().getBag(), flags);
ConfigBag setup = ResolvingConfigBag.newInstanceExtending(getManagementContext(), setupRaw);
Entity entity = validateCallerContext(setup);
if (isKubernetesResource(entity)) {
return createKubernetesResourceLocation(entity, setup);
} else {
return createKubernetesContainerLocation(entity, setup);
}
}
@Override
public void release(KubernetesMachineLocation machine) {
Entity entity = validateCallerContext(machine);
if (isKubernetesResource(entity)) {
deleteKubernetesResourceLocation(entity);
} else {
deleteKubernetesContainerLocation(entity, machine);
}
getPortForwardManager().forgetPortMappings(machine);
removeChild(machine);
}
protected void deleteKubernetesContainerLocation(Entity entity, MachineLocation machine) {
final String namespace = entity.sensors().get(KubernetesPod.KUBERNETES_NAMESPACE);
final String deployment = entity.sensors().get(KubernetesPod.KUBERNETES_DEPLOYMENT);
final String service = entity.sensors().get(KubernetesPod.KUBERNETES_SERVICE);
undeploy(namespace, deployment);
try(KubernetesClient client = getClient()) {
client.services().inNamespace(namespace).withName(service).delete();
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
return client.services().inNamespace(namespace).withName(service).get() == null;
}
@Override
public String getFailureMessage() {
return "No service with namespace=" + namespace + ", serviceName=" + service;
}
};
waitForExitCondition(exitCondition);
Boolean delete = machine.config().get(DELETE_EMPTY_NAMESPACE);
if (Boolean.TRUE.equals(delete)) {
deleteEmptyNamespace(namespace);
}
}
}
protected void deleteKubernetesResourceLocation(Entity entity) {
final String namespace = entity.sensors().get(KubernetesPod.KUBERNETES_NAMESPACE);
final String resourceType = entity.sensors().get(KubernetesResource.RESOURCE_TYPE);
final String resourceName = entity.sensors().get(KubernetesResource.RESOURCE_NAME);
if (handleResourceDelete(resourceType, resourceName, namespace).isEmpty()) {
LOG.warn("Resource {}: {} not deleted", resourceName, resourceType);
}
}
protected List<StatusDetails> handleResourceDelete(String resourceType, String resourceName, String namespace) {
try (KubernetesClient client = getClient()){
switch (resourceType) {
case KubernetesResource.DEPLOYMENT:
return client.apps().deployments().inNamespace(namespace).withName(resourceName).delete();
case KubernetesResource.REPLICA_SET:
return client.apps().replicaSets().inNamespace(namespace).withName(resourceName).delete();
case KubernetesResource.CONFIG_MAP:
return client.configMaps().inNamespace(namespace).withName(resourceName).delete();
case KubernetesResource.PERSISTENT_VOLUME:
return client.persistentVolumes().withName(resourceName).delete();
case KubernetesResource.SECRET:
return client.secrets().inNamespace(namespace).withName(resourceName).delete();
case KubernetesResource.SERVICE:
return client.services().inNamespace(namespace).withName(resourceName).delete();
case KubernetesResource.REPLICATION_CONTROLLER:
return client.replicationControllers().inNamespace(namespace).withName(resourceName).delete();
case KubernetesResource.NAMESPACE:
return client.namespaces().withName(resourceName).delete();
}
} catch (KubernetesClientException kce) {
LOG.warn("Error deleting resource {}: {}", resourceName, kce);
}
return Collections.emptyList();
}
protected void undeploy(final String namespace, final String deployment) {
try (KubernetesClient client = getClient()) {
client.apps().deployments().inNamespace(namespace).withName(deployment).delete();
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
return client.apps().deployments().inNamespace(namespace).withName(deployment).get() == null;
}
@Override
public String getFailureMessage() {
return "No deployment with namespace=" + namespace + ", deployment=" + deployment;
}
};
waitForExitCondition(exitCondition);
}
}
protected synchronized void deleteEmptyNamespace(final String name) {
if (!name.equals("default") && isNamespaceEmpty(name)) {
try (KubernetesClient client = getClient()) {
if (client.namespaces().withName(name).get() != null &&
!client.namespaces().withName(name).get().getStatus().getPhase().equals(PHASE_TERMINATING)) {
client.namespaces().withName(name).delete();
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
return client.namespaces().withName(name).get() == null;
}
@Override
public String getFailureMessage() {
return "Namespace " + name + " still present";
}
};
waitForExitCondition(exitCondition);
}
}
}
}
protected boolean isNamespaceEmpty(String name) {
try (KubernetesClient client = getClient()) {
return client.apps().deployments().inNamespace(name).list().getItems().isEmpty() &&
client.services().inNamespace(name).list().getItems().isEmpty() &&
client.secrets().inNamespace(name).list().getItems().isEmpty();
}
}
@Override
public Map<String, Object> getProvisioningFlags(Collection<String> tags) {
return null;
}
protected KubernetesMachineLocation createKubernetesResourceLocation(Entity entity, ConfigBag setup) {
String resourceUri = entity.config().get(KubernetesResource.RESOURCE_FILE);
InputStream resource = ResourceUtils.create(entity).getResourceFromUrl(resourceUri);
String templateContents = Streams.readFullyString(resource);
String processedContents = TemplateProcessor.processTemplateContents("k8s location template", templateContents, (EntityInternal) entity, setup.getAllConfig());
InputStream processedResource = Streams.newInputStreamWithContents(processedContents);
try (KubernetesClient clientUnnamespaced = getClient()) {
Namespace namespaceFromConfig = null;
Boolean shouldCreate = setup.get(CREATE_NAMESPACE);
try {
namespaceFromConfig = createOrGetNamespace(lookup(NAMESPACE, entity, setup), shouldCreate);
} catch (Exception e) {
// unable to create
if (Boolean.TRUE.equals(shouldCreate)) {
throw Exceptions.propagate(e);
}
LOG.debug("Unable to create/get namespace; ignoring, but may fail subsequently: " + e, e);
}
final KubernetesClient client =
namespaceFromConfig!=null ?
((DefaultKubernetesClient)clientUnnamespaced).inNamespace(namespaceFromConfig.getMetadata().getName())
: clientUnnamespaced;
final List<HasMetadata> result = client.load(processedResource).createOrReplace();
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
if (result.isEmpty()) {
return false;
}
HasMetadata check = client.resource(result.get(0)).inNamespace(result.get(0).getMetadata().getNamespace()).get();
return check != null;
}
@Override
public String getFailureMessage() {
return "Cannot find created resources";
}
};
waitForExitCondition(exitCondition);
// TODO only returns info on the first item :|
HasMetadata metadata = result.get(0);
String resourceType = metadata.getKind();
String resourceName = metadata.getMetadata().getName();
String namespace = metadata.getMetadata().getNamespace();
LOG.debug("Resource {} (type {}) deployed to {}", resourceName, resourceType, namespace);
entity.sensors().set(KubernetesPod.KUBERNETES_NAMESPACE, namespace);
entity.sensors().set(KubernetesResource.RESOURCE_NAME, resourceName);
entity.sensors().set(KubernetesResource.RESOURCE_TYPE, resourceType);
LocationSpec<? extends KubernetesMachineLocation> locationSpec = LocationSpec.create(KubernetesSshMachineLocation.class);
if (!findResourceAddress(locationSpec, entity, metadata, resourceType, resourceName, namespace)) {
LOG.info("Resource {} with type {} has no associated address", resourceName, resourceType);
locationSpec = LocationSpec.create(KubernetesEmptyMachineLocation.class);
}
locationSpec.configure(CALLER_CONTEXT, setup.get(CALLER_CONTEXT))
.configure(KubernetesMachineLocation.KUBERNETES_NAMESPACE, namespace)
.configure(KubernetesMachineLocation.KUBERNETES_RESOURCE_NAME, resourceName)
.configure(KubernetesMachineLocation.KUBERNETES_RESOURCE_TYPE, resourceType);
KubernetesMachineLocation machine = getManagementContext().getLocationManager().createLocation(locationSpec);
addChild(machine);
if (resourceType.equals(KubernetesResource.SERVICE) && machine instanceof KubernetesSshMachineLocation) {
Service service = getService(namespace, resourceName, client);
registerPortMappings((KubernetesSshMachineLocation) machine, entity, service);
}
return machine;
}
}
protected boolean findResourceAddress(LocationSpec<? extends KubernetesMachineLocation> locationSpec, Entity entity,
HasMetadata metadata, String resourceType, String resourceName, String namespace) {
if (resourceType.equals(KubernetesResource.DEPLOYMENT) || resourceType.equals(KubernetesResource.REPLICATION_CONTROLLER)
|| resourceType.equals(KubernetesResource.POD)) {
Map<String, String> labels = MutableMap.of();
if (resourceType.equals(KubernetesResource.DEPLOYMENT)) {
Deployment deployment = (Deployment) metadata;
labels = deployment.getSpec().getTemplate().getMetadata().getLabels();
} else if (resourceType.equals(KubernetesResource.REPLICATION_CONTROLLER)) {
ReplicationController replicationController = (ReplicationController) metadata;
labels = replicationController.getSpec().getTemplate().getMetadata().getLabels();
}
Pod pod = resourceType.equals(KubernetesResource.POD) ? getPod(namespace, resourceName) : getPod(namespace, labels);
entity.sensors().set(KubernetesPod.KUBERNETES_POD, pod.getMetadata().getName());
InetAddress node = Networking.getInetAddressWithFixedName(pod.getSpec().getNodeName());
String podAddress = pod.getStatus().getPodIP();
locationSpec.configure(ADDRESS_KEY, node);
locationSpec.configure(SshMachineLocation.PRIVATE_ADDRESSES, ImmutableSet.of(podAddress));
return true;
} else if (resourceType.equals(KubernetesResource.SERVICE)) {
Endpoints endpoints;
try (KubernetesClient client = getClient()) {
getService(namespace, resourceName, client);
endpoints = client.endpoints().inNamespace(namespace).withName(resourceName).get();
}
Set<String> privateIps = Sets.newLinkedHashSet();
Set<String> podNames = Sets.newLinkedHashSet();
for (EndpointSubset subset : endpoints.getSubsets()) {
for (EndpointAddress address : subset.getAddresses()) {
String podName = address.getTargetRef().getName();
podNames.add(podName);
String privateIp = address.getIp();
privateIps.add(privateIp);
}
}
locationSpec.configure(SshMachineLocation.PRIVATE_ADDRESSES, ImmutableSet.copyOf(privateIps));
if (!podNames.isEmpty()) {
// Use the first pod name from the list; warn when multiple pods are referenced
String podName = Iterables.get(podNames, 0);
if (podNames.size() > 1) {
LOG.warn("Multiple pods referenced by service {} in namespace {}, using {}: {}",
resourceName, namespace, podName, Iterables.toString(podNames));
}
try {
Pod pod = getPod(namespace, podName);
entity.sensors().set(KubernetesPod.KUBERNETES_POD, podName);
InetAddress node = Networking.getInetAddressWithFixedName(pod.getSpec().getNodeName());
locationSpec.configure(ADDRESS_KEY, node);
} catch (KubernetesClientException kce) {
LOG.warn("Cannot find pod {} in namespace {} for service {}", podName, namespace, resourceName);
}
}
return true;
} else {
return false;
}
}
protected KubernetesMachineLocation createKubernetesContainerLocation(Entity entity, ConfigBag setup) {
String deploymentName = lookup(KubernetesPod.DEPLOYMENT, entity, setup, entity.getId());
Integer replicas = lookup(KubernetesPod.REPLICAS, entity, setup);
List<String> volumes = lookup(KubernetesPod.PERSISTENT_VOLUMES, entity, setup);
Map<String, String> secrets = lookup(KubernetesPod.SECRETS, entity, setup);
Map<String, String> limits = lookup(KubernetesPod.LIMITS, entity, setup);
Boolean privileged = lookup(KubernetesPod.PRIVILEGED, entity, setup);
String imageName = findImageName(entity, setup);
Iterable<Integer> inboundPorts = findInboundPorts(entity, setup);
Map<String, String> env = findEnvironmentVariables(entity, setup, imageName);
Map<String, String> metadata = findMetadata(entity, setup, deploymentName);
if (volumes != null) {
createPersistentVolumes(volumes);
}
Namespace namespace = createOrGetNamespace(lookup(NAMESPACE, entity, setup), setup.get(CREATE_NAMESPACE));
if (secrets != null) {
createSecrets(namespace.getMetadata().getName(), secrets);
}
Container container = buildContainer(namespace.getMetadata().getName(), metadata, deploymentName, imageName, inboundPorts, env, limits, privileged);
deploy(namespace.getMetadata().getName(), entity, metadata, deploymentName, container, replicas, secrets);
Service service = exposeService(namespace.getMetadata().getName(), metadata, deploymentName, inboundPorts);
Pod pod = getPod(namespace.getMetadata().getName(), metadata);
entity.sensors().set(KubernetesPod.KUBERNETES_NAMESPACE, namespace.getMetadata().getName());
entity.sensors().set(KubernetesPod.KUBERNETES_DEPLOYMENT, deploymentName);
entity.sensors().set(KubernetesPod.KUBERNETES_POD, pod.getMetadata().getName());
entity.sensors().set(KubernetesPod.KUBERNETES_SERVICE, service.getMetadata().getName());
LocationSpec<KubernetesSshMachineLocation> locationSpec = prepareSshableLocationSpec(entity, setup, service, pod)
.configure(KubernetesMachineLocation.KUBERNETES_NAMESPACE, namespace.getMetadata().getName())
.configure(KubernetesMachineLocation.KUBERNETES_RESOURCE_NAME, deploymentName)
.configure(KubernetesMachineLocation.KUBERNETES_RESOURCE_TYPE, getContainerResourceType());
KubernetesSshMachineLocation machine = getManagementContext().getLocationManager().createLocation(locationSpec);
addChild(machine);
registerPortMappings(machine, entity, service);
if (!isDockerContainer(entity)) {
waitForSshable(machine, Duration.FIVE_MINUTES);
}
return machine;
}
protected String getContainerResourceType() {
return KubernetesResource.DEPLOYMENT;
}
protected void waitForSshable(final SshMachineLocation machine, Duration timeout) {
Callable<Boolean> checker = () -> {
int exitstatus = machine.execScript(
ImmutableMap.of( // TODO investigate why SSH connection does not time out with this config
SshTool.PROP_CONNECT_TIMEOUT.getName(), Duration.TEN_SECONDS.toMilliseconds(),
SshTool.PROP_SESSION_TIMEOUT.getName(), Duration.TEN_SECONDS.toMilliseconds(),
SshTool.PROP_SSH_TRIES_TIMEOUT.getName(), Duration.TEN_SECONDS.toMilliseconds(),
SshTool.PROP_SSH_TRIES.getName(), 1),
"check-sshable",
ImmutableList.of("true"));
return (exitstatus == 0);
};
Stopwatch stopwatch = Stopwatch.createStarted();
ReferenceWithError<Boolean> reachable = Repeater.create("reachable")
.threaded()
.backoff(Duration.FIVE_SECONDS, 2, Duration.TEN_SECONDS) // Exponential backoff, to 10 seconds
.until(checker)
.limitTimeTo(timeout)
.runKeepingError();
if (!reachable.getWithoutError()) {
throw new IllegalStateException("Connection failed for " + machine.getSshHostAndPort() + " after waiting " + stopwatch.elapsed(TimeUnit.SECONDS), reachable.getError());
} else {
LOG.debug("Connection succeeded for {} after {}", machine.getSshHostAndPort(), stopwatch.elapsed(TimeUnit.SECONDS));
}
}
protected void registerPortMappings(KubernetesSshMachineLocation machine, Entity entity, Service service) {
PortForwardManager portForwardManager = getPortForwardManager();
List<ServicePort> ports = service.getSpec().getPorts();
String publicHostText = machine.getSshHostAndPort().getHost();
LOG.debug("Recording port-mappings for container {} of {}: {}", machine, this, ports);
for (ServicePort port : ports) {
String protocol = port.getProtocol();
Integer targetPort = port.getTargetPort().getIntVal();
if (!"TCP".equalsIgnoreCase(protocol)) {
LOG.debug("Ignoring port mapping {} for {} because only TCP is currently supported", port, machine);
} else if (targetPort == null) {
LOG.debug("Ignoring port mapping {} for {} because targetPort.intValue is null", port, machine);
} else if (port.getNodePort() == null) {
LOG.debug("Ignoring port mapping {} to {} because port.getNodePort() is null", targetPort, machine);
} else {
portForwardManager.associate(publicHostText, HostAndPort.fromParts(publicHostText, port.getNodePort()), machine, targetPort);
AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("kubernetes." + Strings.maybeNonBlank(port.getName()).or(targetPort.toString()) + ".port");
entity.sensors().set(sensor, targetPort);
}
}
entity.enrichers().add(EnricherSpec.create(OnPublicNetworkEnricher.class)
.configure(AbstractOnNetworkEnricher.MAP_MATCHING, "kubernetes.[a-zA-Z0-9][a-zA-Z0-9-_]*.port"));
}
private PortForwardManager getPortForwardManager() {
return (PortForwardManager) getManagementContext().getLocationRegistry()
.getLocationManaged(PortForwardManagerLocationResolver.PFM_GLOBAL_SPEC);
}
protected synchronized Namespace createOrGetNamespace(final String name, Boolean create) {
try (KubernetesClient client = getClient()) {
Namespace namespace = client.namespaces().withName(name).get();
ExitCondition namespaceReady = new ExitCondition() {
@Override
public Boolean call() {
Namespace actualNamespace = client.namespaces().withName(name).get();
return actualNamespace != null && actualNamespace.getStatus().getPhase().equals(PHASE_ACTIVE);
}
@Override
public String getFailureMessage() {
Namespace actualNamespace = client.namespaces().withName(name).get();
return "Namespace for " + name + " " + (actualNamespace == null ? "absent" : " status " + actualNamespace.getStatus());
}
};
if (namespace != null) {
LOG.debug("Found namespace {}, returning it.", namespace);
} else if (create) {
namespace = client.namespaces().create(new NamespaceBuilder().withNewMetadata().withName(name).addToLabels("name", name).endMetadata().build());
LOG.debug("Created namespace {}.", namespace);
} else {
throw new IllegalStateException("Namespace " + name + " does not exist and namespace.create is not set");
}
waitForExitCondition(namespaceReady);
return client.namespaces().withName(name).get();
}
}
protected Pod getPod(final String namespace, final String name) {
try (KubernetesClient client = getClient()) {
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
Pod result = client.pods().inNamespace(namespace).withName(name).get();
return result != null && result.getStatus().getPodIP() != null;
}
@Override
public String getFailureMessage() {
return "Cannot find pod with name: " + name;
}
};
waitForExitCondition(exitCondition);
return client.pods().inNamespace(namespace).withName(name).get();
}
}
protected Pod getPod(final String namespace, final Map<String, String> metadata) {
try (KubernetesClient client = getClient()) {
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
PodList result = client.pods().inNamespace(namespace).withLabels(metadata).list();
return !result.getItems().isEmpty() && result.getItems().get(0).getStatus().getPodIP() != null;
}
@Override
public String getFailureMessage() {
return "Cannot find pod with metadata: " + Joiner.on(" ").withKeyValueSeparator("=").join(metadata);
}
};
waitForExitCondition(exitCondition);
PodList result = client.pods().inNamespace(namespace).withLabels(metadata).list();
return result.getItems().get(0);
}
}
protected void createSecrets(String namespace, Map<String, String> secrets) {
for (Map.Entry<String, String> nameAuthEntry : secrets.entrySet()) {
createSecret(namespace, nameAuthEntry.getKey(), nameAuthEntry.getValue());
}
}
protected Secret createSecret(final String namespace, final String secretName, String auth) {
try (KubernetesClient client = getClient()) {
Secret secret = client.secrets().inNamespace(namespace).withName(secretName).get();
if (secret != null) return secret;
String json = String.format("{\"https://index.docker.io/v1/\":{\"auth\":\"%s\"}}", auth);
String base64encoded = BaseEncoding.base64().encode(json.getBytes(Charset.defaultCharset()));
secret = new SecretBuilder()
.withNewMetadata()
.withName(secretName)
.endMetadata()
.withType(KUBERNETES_DOCKERCFG)
.withData(ImmutableMap.of(".dockercfg", base64encoded))
.build();
try {
client.secrets().inNamespace(namespace).create(secret);
} catch (KubernetesClientException e) {
if (e.getCode() == 500 && e.getMessage().contains("Message: resourceVersion may not be set on objects to be created")) {
// ignore exception as per https://github.com/fabric8io/kubernetes-client/issues/451
} else {
throw Throwables.propagate(e);
}
}
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
return client.secrets().inNamespace(namespace).withName(secretName).get() != null;
}
@Override
public String getFailureMessage() {
return "Absent namespace=" + namespace + ", secretName=" + secretName;
}
};
waitForExitCondition(exitCondition);
return client.secrets().inNamespace(namespace).withName(secretName).get();
}
}
protected Container buildContainer(String namespace, Map<String, String> metadata, String deploymentName, String imageName, Iterable<Integer> inboundPorts, Map<String, ?> env, Map<String, String> limits, boolean privileged) {
List<ContainerPort> containerPorts = Lists.newArrayList();
inboundPorts.forEach(inboundPort -> containerPorts.add(new ContainerPortBuilder().withContainerPort(inboundPort).build()));
List<EnvVar> envVars = Lists.newArrayList();
env.forEach((key,value) -> envVars.add(new EnvVarBuilder().withName(key).withValue(value.toString()).build()));
ContainerBuilder containerBuilder = new ContainerBuilder()
.withName(deploymentName)
.withImage(imageName)
.addToPorts(Iterables.toArray(containerPorts, ContainerPort.class))
.addToEnv(Iterables.toArray(envVars, EnvVar.class))
.withNewSecurityContext()
.withPrivileged(privileged)
.endSecurityContext();
if (limits != null) {
for (Map.Entry<String, String> nameValueEntry : limits.entrySet()) {
ResourceRequirements resourceRequirements = new ResourceRequirementsBuilder().addToLimits(nameValueEntry.getKey(),
new QuantityBuilder().withAmount(nameValueEntry.getValue()).build()).build();
containerBuilder.withResources(resourceRequirements);
}
}
LOG.debug("Built container {} to be deployed in namespace {} with metadata {}.", containerBuilder.build(), namespace, metadata);
return containerBuilder.build();
}
protected void deploy(final String namespace, Entity entity, Map<String, String> metadata, final String deploymentName,
Container container, final Integer replicas, Map<String, String> secrets) {
PodTemplateSpecBuilder podTemplateSpecBuilder = new PodTemplateSpecBuilder()
.withNewMetadata()
.addToLabels("name", deploymentName)
.addToLabels(metadata)
.endMetadata()
.withNewSpec()
.addToContainers(container)
.endSpec();
if (secrets != null) {
for (String secretName : secrets.keySet()) {
podTemplateSpecBuilder.withNewSpec()
.addToContainers(container)
.addNewImagePullSecret(secretName)
.endSpec();
}
}
PodTemplateSpec template = podTemplateSpecBuilder.build();
LabelSelectorBuilder labelSelectorBuilder = new LabelSelectorBuilder();
labelSelectorBuilder.addToMatchLabels(metadata);
LabelSelector labelSelector = labelSelectorBuilder.build();
Deployment deployment = new DeploymentBuilder()
.withNewMetadata()
.withName(deploymentName)
.addToAnnotations(BROOKLYN_ENTITY_ID, entity.getId())
.addToAnnotations(BROOKLYN_APPLICATION_ID, entity.getApplicationId())
.endMetadata()
.withNewSpec()
.withSelector(labelSelector)
.withReplicas(replicas)
.withTemplate(template)
.endSpec()
.build();
try (KubernetesClient client = getClient()) {
client.apps().deployments().inNamespace(namespace).create(deployment);
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
Deployment dep = client.apps().deployments().inNamespace(namespace).withName(deploymentName).get();
DeploymentStatus status = (dep == null) ? null : dep.getStatus();
Integer replicas = (status == null) ? null : status.getAvailableReplicas();
return replicas != null;
}
@Override
public String getFailureMessage() {
Deployment dep = client.apps().deployments().inNamespace(namespace).withName(deploymentName).get();
DeploymentStatus status = (dep == null) ? null : dep.getStatus();
return "Namespace=" + namespace + "; deploymentName= " + deploymentName + "; Deployment=" + dep
+ "; status=" + status
+ "; availableReplicas=" + (status == null ? "null" : status.getAvailableReplicas());
}
};
waitForExitCondition(exitCondition);
}
LOG.debug("Deployed deployment {} in namespace {}.", deployment, namespace);
}
protected Service exposeService(String namespace, Map<String, String> metadata, String serviceName, Iterable<Integer> inboundPorts) {
List<ServicePort> servicePorts = Lists.newArrayList();
for (Integer inboundPort : inboundPorts) {
servicePorts.add(new ServicePortBuilder().withName(Integer.toString(inboundPort)).withPort(inboundPort).build());
}
Service service = new ServiceBuilder().withNewMetadata().withName(serviceName).endMetadata()
.withNewSpec()
.addToSelector(metadata)
.addToPorts(Iterables.toArray(servicePorts, ServicePort.class))
.withType(NODE_PORT)
.endSpec()
.build();
try (KubernetesClient client = getClient()) {
client.services().inNamespace(namespace).create(service);
service = getService(namespace, serviceName, client);
LOG.debug("Exposed service {} in namespace {}.", service, namespace);
return service;
}
}
protected Service getService(final String namespace, final String serviceName, KubernetesClient client) {
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
Service svc = client.services().inNamespace(namespace).withName(serviceName).get();
if (svc == null || svc.getStatus() == null) {
return false;
}
Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(serviceName).get();
if (endpoints == null || endpoints.getSubsets().isEmpty()) {
return false;
}
for (EndpointSubset subset : endpoints.getSubsets()) {
if (!subset.getNotReadyAddresses().isEmpty()) {
return false;
}
}
return true;
}
@Override
public String getFailureMessage() {
Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(serviceName).get();
return "Service endpoints in " + namespace + " for serviceName= " + serviceName + " not ready: " + endpoints;
}
};
waitForExitCondition(exitCondition);
return client.services().inNamespace(namespace).withName(serviceName).get();
}
protected LocationSpec<KubernetesSshMachineLocation> prepareSshableLocationSpec(Entity entity, ConfigBag setup, Service service, Pod pod) {
InetAddress node = Networking.getInetAddressWithFixedName(pod.getSpec().getNodeName());
String podAddress = pod.getStatus().getPodIP();
LocationSpec<KubernetesSshMachineLocation> locationSpec = LocationSpec.create(KubernetesSshMachineLocation.class)
.configure(ADDRESS_KEY, node)
.configure(SshMachineLocation.PRIVATE_ADDRESSES, ImmutableSet.of(podAddress))
.configure(CALLER_CONTEXT, setup.get(CALLER_CONTEXT));
if (!isDockerContainer(entity)) {
Optional<ServicePort> sshPort = Iterables.tryFind(service.getSpec().getPorts(), input -> "TCP".equalsIgnoreCase(input.getProtocol()) && input.getPort() == 22);
Optional<Integer> sshPortNumber;
if (sshPort.isPresent()) {
sshPortNumber = Optional.of(sshPort.get().getNodePort());
} else {
LOG.warn("No port-mapping found to ssh port 22, for container {}", service);
sshPortNumber = Optional.absent();
}
locationSpec.configure(CloudLocationConfig.USER, setup.get(KubernetesLocationConfig.LOGIN_USER))
.configure(SshMachineLocation.PASSWORD, setup.get(KubernetesLocationConfig.LOGIN_USER_PASSWORD))
.configureIfNotNull(SshMachineLocation.SSH_PORT, sshPortNumber.orNull())
.configure(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION, true)
.configure(BrooklynConfigKeys.ONBOX_BASE_DIR, "/tmp");
}
return locationSpec;
}
protected void createPersistentVolumes(List<String> volumes) {
for (final String persistentVolume : volumes) {
PersistentVolume volume = new PersistentVolumeBuilder()
.withNewMetadata()
.withName(persistentVolume)
.withLabels(ImmutableMap.of("type", "local")) // TODO make it configurable
.endMetadata()
.withNewSpec()
.addToCapacity("storage", new QuantityBuilder().withAmount("20").build()) // TODO make it configurable
.addToAccessModes("ReadWriteOnce") // TODO make it configurable
.withNewHostPath().withPath("/tmp/pv-1").endHostPath() // TODO make it configurable
.endSpec()
.build();
try (KubernetesClient client = getClient()) {
client.persistentVolumes().create(volume);
ExitCondition exitCondition = new ExitCondition() {
@Override
public Boolean call() {
PersistentVolume pv = client.persistentVolumes().withName(persistentVolume).get();
return pv != null && pv.getStatus() != null
&& pv.getStatus().getPhase().equals(PHASE_AVAILABLE);
}
@Override
public String getFailureMessage() {
PersistentVolume pv = client.persistentVolumes().withName(persistentVolume).get();
return "PersistentVolume for " + persistentVolume + " " + (pv == null ? "absent" : "pv=" + pv);
}
};
waitForExitCondition(exitCondition);
}
}
}
protected Entity validateCallerContext(ConfigBag setup) {
// Lookup entity flags
Object callerContext = setup.get(LocationConfigKeys.CALLER_CONTEXT);
if (callerContext instanceof Entity) {
return (Entity) callerContext;
} else {
throw new IllegalStateException("Invalid caller context: " + callerContext);
}
}
protected Entity validateCallerContext(MachineLocation machine) {
// Lookup entity flags
Object callerContext = machine.config().get(LocationConfigKeys.CALLER_CONTEXT);
if (callerContext instanceof Entity) {
return (Entity) callerContext;
} else {
throw new IllegalStateException("Invalid caller context: " + callerContext);
}
}
protected Map<String, String> findMetadata(Entity entity, ConfigBag setup, String value) {
Map<String, String> podMetadata = Maps.newLinkedHashMap();
podMetadata.put(isDockerContainer(entity) ? IMMUTABLE_CONTAINER_KEY : SSHABLE_CONTAINER, value);
Map<String, Object> metadata = MutableMap.<String, Object>builder()
.putAll(MutableMap.copyOf(setup.get(KubernetesPod.METADATA)))
.putAll(MutableMap.copyOf(entity.config().get(KubernetesPod.METADATA)))
.putAll(podMetadata)
.build();
return Maps.transformValues(metadata, Functions.toStringFunction());
}
/**
* Sets the {@code BROOKLYN_ROOT_PASSWORD} variable in the container environment if appropriate.
* This is (approximately) the same behaviour as the {@link DockerJcloudsLocation} used for
* Swarm.
* <p>
* Side-effects the location {@code config} to set the {@link KubernetesLocationConfig#LOGIN_USER_PASSWORD loginUser.password}
* if one is auto-generated. Note that this injected value overrides any other settings configured for the
* container environment.
*/
protected Map<String, String> findEnvironmentVariables(Entity entity, ConfigBag setup, String imageName) {
String loginUser = setup.get(LOGIN_USER);
String loginPassword = setup.get(LOGIN_USER_PASSWORD);
Map<String, String> injections = Maps.newLinkedHashMap();
// Check if login credentials should be injected
Boolean injectLoginCredentials = setup.get(INJECT_LOGIN_CREDENTIAL);
if (injectLoginCredentials == null) {
for (String regex : IMAGE_DESCRIPTION_REGEXES_REQUIRING_INJECTED_LOGIN_CREDS) {
if (imageName != null && imageName.matches(regex)) {
injectLoginCredentials = true;
break;
}
}
}
if (Boolean.TRUE.equals(injectLoginCredentials)) {
if ((Strings.isBlank(loginUser) || "root".equals(loginUser))) {
loginUser = "root";
setup.configure(LOGIN_USER, loginUser);
if (Strings.isBlank(loginPassword)) {
loginPassword = Identifiers.makeRandomPassword(12);
setup.configure(LOGIN_USER_PASSWORD, loginPassword);
}
injections.put(BROOKLYN_ROOT_PASSWORD, loginPassword);
}
}
Map<String, Object> rawEnv = MutableMap.<String, Object>builder()
.putAll(MutableMap.copyOf(setup.get(ENV)))
.putAll(MutableMap.copyOf(entity.config().get(DockerContainer.CONTAINER_ENVIRONMENT)))
.putAll(injections)
.build();
return Maps.transformValues(rawEnv, Functions.toStringFunction());
}
protected Iterable<Integer> findInboundPorts(Entity entity, ConfigBag setup) {
Iterable<String> inboundTcpPorts = entity.config().get(DockerContainer.INBOUND_TCP_PORTS);
if(inboundTcpPorts == null)
return setup.containsKey(INBOUND_PORTS) ? toIntPortList(setup.get(INBOUND_PORTS)) : ImmutableList.of(22);
List<Integer> inboundPorts = Lists.newArrayList();
List<String> portRanges = MutableList.copyOf(entity.config().get(DockerContainer.INBOUND_TCP_PORTS));
for (String portRange : portRanges) {
for (Integer port : PortRanges.fromString(portRange)) {
inboundPorts.add(port);
}
}
return inboundPorts;
}
protected List<Integer> toIntPortList(Object v) {
if (v == null) return ImmutableList.of();
PortRange portRange = PortRanges.fromIterable(ImmutableList.of(v));
return ImmutableList.copyOf(portRange);
}
protected String findImageName(Entity entity, ConfigBag setup) {
String result = entity.config().get(DockerContainer.IMAGE_NAME);
if (Strings.isNonBlank(result)) return result;
result = setup.get(IMAGE);
if (Strings.isNonBlank(result)) return result;
String osFamily = setup.get(OS_FAMILY);
String osVersion = setup.get(OS_VERSION_REGEX);
Optional<String> imageName = new ImageChooser().chooseImage(osFamily, osVersion);
if (imageName.isPresent()) return imageName.get();
throw new IllegalStateException("No matching image found for " + entity
+ " (no explicit image name, osFamily=" + osFamily + "; osVersion=" + osVersion + ")");
}
protected boolean isDockerContainer(Entity entity) {
return implementsInterface(entity, DockerContainer.class);
}
protected boolean isKubernetesPod(Entity entity) {
return implementsInterface(entity, KubernetesPod.class);
}
protected boolean isKubernetesResource(Entity entity) {
return implementsInterface(entity, KubernetesResource.class);
}
public boolean implementsInterface(Entity entity, Class<?> type) {
return Iterables.tryFind(Arrays.asList(entity.getClass().getInterfaces()), iface -> iface.isAssignableFrom(type)).isPresent();
}
@Override
public MachineProvisioningLocation<KubernetesMachineLocation> newSubLocation(Map<?, ?> newFlags) {
throw new UnsupportedOperationException();
}
/** @see {@link #lookup(ConfigKey, Entity, ConfigBag, Object)} */
public <T> T lookup(ConfigKey<T> config, Entity entity, ConfigBag setup) {
return lookup(config, entity, setup, config.getDefaultValue());
}
/**
* Looks up {@link ConfigKey configuration} with the entity value taking precedence over the
* location, and returning a default value (normally {@literal null}) if neither is present.
*/
public <T> T lookup(final ConfigKey<T> config, Entity entity, ConfigBag setup, T defaultValue) {
boolean entityConfigPresent = !entity.config().findKeysPresent(config::equals).isEmpty();
boolean setupBagConfigPresent = setup.containsKey(config);
if (entityConfigPresent) {
return entity.config().get(config);
} else if (setupBagConfigPresent) {
return setup.get(config);
}
return defaultValue;
}
public void waitForExitCondition(ExitCondition exitCondition) {
waitForExitCondition(exitCondition, Duration.ONE_SECOND, Duration.FIVE_MINUTES);
}
public void waitForExitCondition(ExitCondition exitCondition, Duration initial, Duration duration) {
ReferenceWithError<Boolean> result = Repeater.create()
.backoff(initial, 1.2, duration)
.limitTimeTo(duration)
.until(exitCondition)
.runKeepingError();
if (!Boolean.TRUE.equals(result.get())) {
String err = String.format("Exit condition unsatisfied after %s: %s", duration, exitCondition.getFailureMessage());
LOG.info("{} (rethrowing)", err);
throw new IllegalStateException(err);
}
}
public interface ExitCondition extends Callable<Boolean> {
String getFailureMessage();
}
}