blob: d330ddeb772f168b9f284f8bbfa00a5942481949 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.stratos.cloud.controller.iaases.kubernetes;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
import org.apache.stratos.cloud.controller.domain.*;
import org.apache.stratos.cloud.controller.domain.kubernetes.KubernetesCluster;
import org.apache.stratos.cloud.controller.domain.kubernetes.KubernetesClusterContext;
import org.apache.stratos.cloud.controller.domain.kubernetes.KubernetesHost;
import org.apache.stratos.cloud.controller.domain.kubernetes.PortRange;
import org.apache.stratos.cloud.controller.exception.*;
import org.apache.stratos.cloud.controller.iaases.Iaas;
import org.apache.stratos.cloud.controller.iaases.PartitionValidator;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.common.Property;
import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.common.domain.NameValuePair;
import org.apache.stratos.kubernetes.client.KubernetesApiClient;
import org.apache.stratos.kubernetes.client.KubernetesConstants;
import org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException;
import org.apache.stratos.messaging.domain.topology.KubernetesService;
import java.util.*;
import java.util.concurrent.locks.Lock;
/**
* Kubernetes IaaS implementation.
*/
public class KubernetesIaas extends Iaas {
// Logger used by every method of this class.
private static final Log log = LogFactory.getLog(KubernetesIaas.class);
// Fallback pod activation timeout in milliseconds, applied by the constructor when the
// "stratos.pod.activation.timeout" system property yields no value.
private static final long DEFAULT_POD_ACTIVATION_TIMEOUT = 60000; // 1 min
// Delimiter between the individual entries of the dynamic payload string.
private static final String PAYLOAD_PARAMETER_SEPARATOR = ",";
// Delimiter between the name and the value of a single payload entry.
private static final String PAYLOAD_PARAMETER_NAME_VALUE_SEPARATOR = "=";
// Kubernetes cluster properties whose name starts with this prefix are copied into the payload.
private static final String PAYLOAD_PARAMETER_PREFIX = "payload_parameter.";
// Name of the payload parameter that carries the generated port mapping string.
private static final String PORT_MAPPINGS = "PORT_MAPPINGS";
// Cartridge property names that override the container cpu / memory values.
private static final String KUBERNETES_CONTAINER_CPU = "KUBERNETES_CONTAINER_CPU";
private static final String KUBERNETES_CONTAINER_MEMORY = "KUBERNETES_CONTAINER_MEMORY";
// Cartridge property name holding the session affinity passed to created kubernetes services.
private static final String KUBERNETES_SERVICE_SESSION_AFFINITY = "KUBERNETES_SERVICE_SESSION_AFFINITY";
// System property names providing the default container cpu / memory values.
private static final String KUBERNETES_CONTAINER_CPU_DEFAULT = "kubernetes.container.cpu.default";
private static final String KUBERNETES_CONTAINER_MEMORY_DEFAULT = "kubernetes.container.memory.default";
// Prefixes used when generating pod ids ("pod-<seq>") and service ids ("service-<seq>").
public static final String POD_ID_PREFIX = "pod";
public static final String SERVICE_NAME_PREFIX = "service";
// Validator handed out by getPartitionValidator().
private PartitionValidator partitionValidator;
// Dynamic payload parameters collected for the container being started.
private List<NameValuePair> payload;
// Maximum time in milliseconds to wait for a pod to reach the running state.
private Long podActivationTimeout;
public KubernetesIaas(IaasProvider iaasProvider) {
super(iaasProvider);
partitionValidator = new KubernetesPartitionValidator();
payload = new ArrayList<NameValuePair>();
podActivationTimeout = Long.getLong("stratos.pod.activation.timeout");
if (podActivationTimeout == null) {
podActivationTimeout = DEFAULT_POD_ACTIVATION_TIMEOUT;
if (log.isInfoEnabled()) {
log.info("Pod activation timeout was set: " + podActivationTimeout);
}
}
}
/**
* Initialization hook of the IaaS; this implementation performs no work.
*/
@Override
public void initialize() {
// Nothing to initialize here.
}
/**
 * Replaces the dynamic payload that is later handed to the containers as environment variables.
 * <p>
 * The byte array is decoded into a string, split into entries on {@code ","} and each entry is
 * split once on {@code "="}. Entries that do not contain a {@code "="} are skipped. Any
 * previously stored payload parameters are discarded first, even when the argument is null.
 *
 * @param payloadByteArray raw payload in {@code name=value,name=value} form; may be null
 */
@Override
public void setDynamicPayload(byte[] payloadByteArray) {
    // Always start from an empty payload
    payload.clear();
    if (payloadByteArray == null) {
        return;
    }

    String rawPayload = new String(payloadByteArray);
    for (String entry : rawPayload.split(PAYLOAD_PARAMETER_SEPARATOR)) {
        // Limit of 2 keeps any further "=" characters inside the value
        String[] nameAndValue = entry.split(PAYLOAD_PARAMETER_NAME_VALUE_SEPARATOR, 2);
        if (nameAndValue.length != 2) {
            continue;
        }
        payload.add(new NameValuePair(nameAndValue[0], nameAndValue[1]));
    }

    if (log.isDebugEnabled()) {
        log.debug("Dynamic payload is set: " + payload.toString());
    }
}
/**
* Starts an instance for the given member: stores the payload via
* {@link #setDynamicPayload(byte[])} and then delegates to
* {@link #startContainer(MemberContext)}.
*
* @param memberContext member to start
* @param payload raw dynamic payload; note that this parameter shadows the payload field
* @return the value returned by startContainer
* @throws CartridgeNotFoundException declared by startContainer
*/
@Override
public MemberContext startInstance(MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException {
setDynamicPayload(payload);
return startContainer(memberContext);
}
/**
* Returns the partition validator created in the constructor.
*
* @return the partition validator of this IaaS
*/
@Override
public PartitionValidator getPartitionValidator() {
return partitionValidator;
}
/**
* Terminates the instance of the given member by delegating to terminateContainer;
* the value returned by terminateContainer is discarded.
*
* @param memberContext member whose container should be terminated
* @throws InvalidCartridgeTypeException declared by the overridden method
* @throws InvalidMemberException declared by the overridden method
* @throws MemberTerminationFailedException declared by terminateContainer
*/
@Override
public void terminateInstance(MemberContext memberContext) throws InvalidCartridgeTypeException,
InvalidMemberException, MemberTerminationFailedException {
terminateContainer(memberContext);
}
/**
 * Starts a container via kubernetes for the given member context.
 * <p>
 * While holding the member context write lock the method validates the member, cluster,
 * partition, cartridge and kubernetes cluster, copies the kubernetes cluster's
 * {@code payload_parameter.*} properties into the payload, generates service ports, creates
 * the kubernetes services and the pod, waits for the pod to run and finally updates the
 * member context with the pod details.
 * <p>
 * Every exception raised inside the method, including {@link CartridgeNotFoundException},
 * is logged and rethrown wrapped in a {@link RuntimeException}.
 *
 * @param memberContext member to start a container for
 * @return the same member context, updated with pod and network information
 * @throws CartridgeNotFoundException declared for callers; see the wrapping note above
 */
public MemberContext startContainer(MemberContext memberContext)
        throws CartridgeNotFoundException {
    Lock lock = null;
    try {
        lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
        handleNullObject(memberContext, "member context is null");
        log.info(String.format("Starting container: [application] %s [cartridge] %s [member] %s",
                memberContext.getApplicationId(), memberContext.getCartridgeType(),
                memberContext.getMemberId()));

        // Validate cluster id
        String clusterId = memberContext.getClusterId();
        String memberId = memberContext.getMemberId();
        handleNullObject(clusterId, "cluster id is null in member context");

        // Validate cluster context
        ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
        handleNullObject(clusterContext,
                String.format("Cluster context not found: [application] %s [cartridge] %s " +
                                "[cluster] %s", memberContext.getApplicationId(), memberContext.getCartridgeType(),
                        clusterId));

        // Validate partition
        Partition partition = memberContext.getPartition();
        handleNullObject(partition, String.format("partition not found in member context: [application] %s " +
                        "[cartridge] %s [member] %s", memberContext.getApplicationId(),
                memberContext.getCartridgeType(),
                memberContext.getMemberId()));

        // Validate cartridge
        String cartridgeType = clusterContext.getCartridgeType();
        Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
        if (cartridge == null) {
            String msg = String.format("Cartridge not found: [application] %s [cartridge] %s",
                    memberContext.getApplicationId(), memberContext.getCartridgeType());
            log.error(msg);
            throw new CartridgeNotFoundException(msg);
        }

        String kubernetesClusterId = partition.getKubernetesClusterId();
        KubernetesCluster kubernetesCluster = CloudControllerContext.getInstance().
                getKubernetesCluster(kubernetesClusterId);
        handleNullObject(kubernetesCluster, "kubernetes cluster not found: " +
                "[kubernetes-cluster] " + kubernetesClusterId + " [cluster] " + clusterId +
                " [member] " + memberId);

        // Prepare kubernetes context
        String kubernetesMasterIp = kubernetesCluster.getKubernetesMaster().getPrivateIPAddress();
        PortRange kubernetesPortRange = kubernetesCluster.getPortRange();
        String kubernetesMasterPort = CloudControllerUtil.getProperty(
                kubernetesCluster.getKubernetesMaster().getProperties(), StratosConstants.KUBERNETES_MASTER_PORT,
                StratosConstants.KUBERNETES_MASTER_DEFAULT_PORT);

        // Add kubernetes cluster payload parameters to payload
        if ((kubernetesCluster.getProperties() != null) &&
                (kubernetesCluster.getProperties().getProperties() != null)) {
            for (Property property : kubernetesCluster.getProperties().getProperties()) {
                if (property == null) {
                    continue;
                }
                String propertyName = property.getName();
                if ((propertyName != null) && propertyName.startsWith(PAYLOAD_PARAMETER_PREFIX)) {
                    // Strip only the leading prefix; later occurrences belong to the parameter name
                    String name = propertyName.substring(PAYLOAD_PARAMETER_PREFIX.length());
                    payload.add(new NameValuePair(name, property.getValue()));
                }
            }
        }

        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterContext(kubernetesClusterId,
                kubernetesMasterIp, kubernetesMasterPort, kubernetesPortRange.getUpper(),
                kubernetesPortRange.getLower());

        // Generate kubernetes service ports and update port mappings in cartridge
        generateKubernetesServicePorts(clusterContext.getApplicationId(), clusterContext.getClusterId(),
                kubernetesClusterContext, cartridge);

        // Create kubernetes services for port mappings
        KubernetesApiClient kubernetesApi = kubernetesClusterContext.getKubApi();
        createKubernetesServices(kubernetesApi, clusterContext, kubernetesCluster, kubernetesClusterContext,
                memberContext);

        // Create pod
        createPod(clusterContext, memberContext, kubernetesApi, kubernetesClusterContext);

        // Wait for pod status to be changed to running
        Pod pod = waitForPodToBeActivated(memberContext, kubernetesApi);

        // Update member context
        updateMemberContext(memberContext, pod, kubernetesCluster);

        log.info(String.format("Container started successfully: [application] %s [cartridge] %s [member] %s " +
                        "[pod] %s [cpu] %s [memory] %s",
                memberContext.getApplicationId(), memberContext.getCartridgeType(),
                memberContext.getMemberId(), memberContext.getKubernetesPodId(),
                memberContext.getInstanceMetadata().getCpu(), memberContext.getInstanceMetadata().getRam()));
        return memberContext;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the caller's thread after wrapping the exception
            Thread.currentThread().interrupt();
        }
        // memberContext may be the very thing that was null; do not dereference it blindly
        String msg;
        if (memberContext == null) {
            msg = "Could not start container: member context is null";
        } else {
            msg = String.format("Could not start container: [application] %s [cartridge] %s [member] %s",
                    memberContext.getApplicationId(), memberContext.getCartridgeType(),
                    memberContext.getMemberId());
        }
        log.error(msg, e);
        throw new RuntimeException(msg, e);
    } finally {
        if (lock != null) {
            CloudControllerContext.getInstance().releaseWriteLock(lock);
        }
    }
}
/**
 * Copies the pod's identity and network details into the member context.
 * <p>
 * The private IP is the pod IP. The public IP is the public address of the kubernetes host
 * whose private address matches the pod's host IP; when no such non-blank address is found
 * the pod's host IP itself is used.
 *
 * @param memberContext member context to update
 * @param pod running pod created for the member
 * @param kubernetesCluster cluster definition used to look up the host's public address
 */
private void updateMemberContext(MemberContext memberContext, Pod pod, KubernetesCluster kubernetesCluster) {
    String podIp = pod.getStatus().getPodIP();
    String hostIp = pod.getStatus().getHostIP();
    String hostPublicIp = findKubernetesHostPublicIPAddress(kubernetesCluster, hostIp);

    String publicIp;
    if (StringUtils.isBlank(hostPublicIp)) {
        publicIp = hostIp;
    } else {
        publicIp = hostPublicIp;
        if (log.isInfoEnabled()) {
            log.info(String.format("Member public IP address set to kubernetes host public IP address:" +
                    "[pod-host-ip] %s [kubernetes-host-public-ip] %s", hostIp, hostPublicIp));
        }
    }

    memberContext.setInstanceId(pod.getMetadata().getName());
    memberContext.setDefaultPrivateIP(podIp);
    memberContext.setPrivateIPs(new String[]{podIp});
    memberContext.setDefaultPublicIP(publicIp);
    memberContext.setPublicIPs(new String[]{publicIp});
    // The two calls below write back the values the context already holds
    memberContext.setInitTime(memberContext.getInitTime());
    memberContext.setProperties(memberContext.getProperties());
}
/**
 * Looks up the public IP address of the kubernetes host whose private IP address equals the
 * given pod host IP.
 *
 * @param kubernetesCluster cluster whose hosts are searched; may be null
 * @param podHostIP host IP reported by the pod; may be blank
 * @return the matching host's public IP address, or null when the cluster is null, the IP is
 *         blank or no host matches
 */
private String findKubernetesHostPublicIPAddress(KubernetesCluster kubernetesCluster, String podHostIP) {
    if ((kubernetesCluster == null) || StringUtils.isBlank(podHostIP)) {
        return null;
    }
    for (KubernetesHost candidate : kubernetesCluster.getKubernetesHosts()) {
        if ((candidate != null) && podHostIP.equals(candidate.getPrivateIPAddress())) {
            return candidate.getPublicIPAddress();
        }
    }
    return null;
}
/**
 * Polls kubernetes every five seconds until the member's pod reports the running phase or the
 * pod activation timeout elapses.
 * <p>
 * A pod whose status or phase is not populated yet is treated as "not running" instead of
 * failing with a NullPointerException.
 *
 * @param memberContext member whose pod id is polled
 * @param kubernetesApi client used to fetch the pod
 * @return the pod once its phase equals {@code KubernetesConstants.POD_STATUS_RUNNING}
 * @throws KubernetesClientException declared by the pod lookup
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 * @throws RuntimeException if the pod was not created or did not start running in time
 */
private Pod waitForPodToBeActivated(MemberContext memberContext, KubernetesApiClient kubernetesApi)
        throws KubernetesClientException, InterruptedException {
    boolean podCreated = false;
    long startTime = System.currentTimeMillis();
    while (true) {
        Pod pod = kubernetesApi.getPod(memberContext.getKubernetesPodId());
        if (pod != null) {
            podCreated = true;
            // Status/phase can be absent right after creation; compare null-safely
            String phase = (pod.getStatus() != null) ? pod.getStatus().getPhase() : null;
            if (KubernetesConstants.POD_STATUS_RUNNING.equals(phase)) {
                log.info(String.format(
                        "Pod status changed to running: [application] %s [cartridge] %s [member] %s " +
                                "[pod] %s", memberContext.getApplicationId(), memberContext.getCartridgeType(),
                        memberContext.getMemberId(), pod.getMetadata().getName()));
                return pod;
            }
            log.info(String.format("Waiting pod status to be changed to running: [application] %s " +
                            "[cartridge] %s [member] %s [pod] %s", memberContext.getApplicationId(),
                    memberContext.getCartridgeType(), memberContext.getMemberId(),
                    pod.getMetadata().getName()));
        } else {
            log.info(String.format("Waiting for pod to be created: [application] %s " +
                            "[cartridge] %s [member] %s [pod] %s", memberContext.getApplicationId(),
                    memberContext.getCartridgeType(), memberContext.getMemberId(),
                    memberContext.getKubernetesPodId()));
        }

        if ((System.currentTimeMillis() - startTime) > podActivationTimeout) {
            break;
        }
        Thread.sleep(5000);
    }

    // Timed out: report whether the pod never appeared or merely never reached running.
    // Long arithmetic avoids truncating large timeouts in the message.
    long timeoutSeconds = podActivationTimeout / 1000;
    String message;
    if (podCreated) {
        message = String.format("Pod status did not change to running within %d sec: " +
                        "[application] %s [cartridge] %s [member] %s [pod] %s",
                timeoutSeconds,
                memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(),
                memberContext.getKubernetesPodId());
    } else {
        message = String.format("Pod did not create within %d sec: " +
                        "[application] %s [cartridge] %s [member] %s [pod] %s",
                timeoutSeconds,
                memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(),
                memberContext.getKubernetesPodId());
    }
    log.error(message);
    throw new RuntimeException(message);
}
/**
* Creates a new kubernetes pod for the given member and passes the collected dynamic payload
* to it as environment variables.
* <p>
* Steps, in order: validate partition, cartridge and iaas provider; resolve cpu and memory
* (system property defaults overridden by cartridge properties); store the payload on the
* member context; pick an unused pod id; create the pod with labels and annotations; record
* pod id, pod name and instance metadata on the member context; persist the cloud controller
* context.
*
* @param clusterContext cluster context used when preparing the environment variables
* @param memberContext member the pod is created for; updated with pod details
* @param kubernetesApi client used to look up and create pods
* @param kubernetesClusterContext source of pod sequence numbers
* @throws KubernetesClientException declared by the kubernetes API client calls
* @throws RuntimeException if the partition, cartridge or iaas provider cannot be found
*/
private void createPod(ClusterContext clusterContext, MemberContext memberContext,
KubernetesApiClient kubernetesApi, KubernetesClusterContext kubernetesClusterContext)
throws KubernetesClientException {
String applicationId = memberContext.getApplicationId();
String cartridgeType = memberContext.getCartridgeType();
String clusterId = memberContext.getClusterId();
String memberId = memberContext.getMemberId();
if (log.isInfoEnabled()) {
log.info(String.format("Creating kubernetes pod: [application] %s [cartridge] %s [member] %s",
applicationId, cartridgeType, memberId));
}
Partition partition = memberContext.getPartition();
if (partition == null) {
String message = String.format("Partition not found in member context: [application] %s [cartridge] %s " +
"[member] %s ", applicationId, cartridgeType,
memberId);
log.error(message);
throw new RuntimeException(message);
}
Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
if (cartridge == null) {
String message = "Could not find cartridge: [cartridge] " + cartridgeType;
log.error(message);
throw new RuntimeException(message);
}
// Set default values to zero to avoid cpu and memory restrictions
String cpu = System.getProperty(KUBERNETES_CONTAINER_CPU_DEFAULT, "0");
String memory = System.getProperty(KUBERNETES_CONTAINER_MEMORY_DEFAULT, "0");
// Cartridge level properties take precedence over the system property defaults
Property cpuProperty = cartridge.getProperties().getProperty(KUBERNETES_CONTAINER_CPU);
if (cpuProperty != null) {
cpu = cpuProperty.getValue();
}
Property memoryProperty = cartridge.getProperties().getProperty(KUBERNETES_CONTAINER_MEMORY);
if (memoryProperty != null) {
memory = memoryProperty.getValue();
}
IaasProvider iaasProvider =
CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridge.getType(), partition.getId());
if (iaasProvider == null) {
String message = "Could not find iaas provider: [partition] " + partition.getId();
log.error(message);
throw new RuntimeException(message);
}
// Add dynamic payload to the member context
memberContext.setDynamicPayload(payload.toArray(new NameValuePair[payload.size()]));
// Find next available sequence number
long podSeqNo = kubernetesClusterContext.getNextPodSeqNo();
String podId = preparePodId(podSeqNo);
// Keep drawing sequence numbers until kubernetes reports no pod with the candidate id
while (kubernetesApi.getPod(podId) != null) {
podSeqNo = kubernetesClusterContext.getNextPodSeqNo();
podId = preparePodId(podSeqNo);
}
// Create pod
// The pod name is the MD5 hex digest of the cluster id; it is also stored below under the
// service selector label.
String podName = DigestUtils.md5Hex(clusterId);
String dockerImage = iaasProvider.getImage();
List<EnvVar> environmentVariables = KubernetesIaasUtil.prepareEnvironmentVariables(
clusterContext, memberContext);
List<ContainerPort> ports = KubernetesIaasUtil.convertPortMappings(Arrays.asList(cartridge.getPortMappings()));
log.info(String.format("Starting pod: [application] %s [cartridge] %s [member] %s " +
"[cpu] %s [memory] %s",
memberContext.getApplicationId(), memberContext.getCartridgeType(),
memberContext.getMemberId(), cpu, memory));
// Label values are passed through trimLabel, which shortens values longer than
// KubernetesConstants.MAX_LABEL_LENGTH
Map<String, String> podLabels = new HashMap<>();
podLabels.put(KubernetesConstants.SERVICE_SELECTOR_LABEL, podName);
podLabels.put(CloudControllerConstants.APPLICATION_ID_LABEL,
trimLabel(CloudControllerConstants.APPLICATION_ID_LABEL, memberContext.getApplicationId()));
podLabels.put(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL,
trimLabel(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL, memberContext.getClusterInstanceId()));
podLabels.put(CloudControllerConstants.MEMBER_ID_LABEL,
trimLabel(CloudControllerConstants.MEMBER_ID_LABEL, memberContext.getMemberId()));
// Annotations carry the untrimmed identifiers
Map<String, String> podAnnotations = new HashMap<>();
podAnnotations.put(CloudControllerConstants.APPLICATION_ID_LABEL, memberContext.getApplicationId());
podAnnotations.put(CloudControllerConstants.CARTRIDGE_TYPE_LABEL, memberContext.getCartridgeType());
podAnnotations.put(CloudControllerConstants.CLUSTER_ID_LABEL, memberContext.getClusterId());
podAnnotations.put(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL, memberContext.getClusterInstanceId());
podAnnotations.put(CloudControllerConstants.MEMBER_ID_LABEL, memberContext.getMemberId());
kubernetesApi.createPod(podId, podName, podLabels, podAnnotations, dockerImage, cpu, memory, ports,
environmentVariables);
log.info(String.format("Pod started successfully: [application] %s [cartridge] %s [member] %s " +
"[pod] %s [pod-label] %s [cpu] %s [memory] %s",
memberContext.getApplicationId(), memberContext.getCartridgeType(),
memberContext.getMemberId(), podId, podName, cpu, memory));
// Add pod id to member context
memberContext.setKubernetesPodId(podId);
memberContext.setKubernetesPodName(podName);
// Create instance metadata
InstanceMetadata instanceMetadata = new InstanceMetadata();
instanceMetadata.setImageId(dockerImage);
instanceMetadata.setCpu(cpu);
instanceMetadata.setRam(memory);
memberContext.setInstanceMetadata(instanceMetadata);
// Persist cloud controller context
CloudControllerContext.getInstance().persist();
}
/**
 * Builds the pod id for a sequence number, in the form {@code <POD_ID_PREFIX>-<podSeqNo>}.
 *
 * @param podSeqNo pod sequence number
 * @return the pod id
 */
private String preparePodId(long podSeqNo) {
    StringBuilder podId = new StringBuilder(POD_ID_PREFIX);
    podId.append("-").append(podSeqNo);
    return podId.toString();
}
/**
 * Creates one kubernetes service per cluster port mapping of the cluster, skipping port
 * mappings for which a matching service is already recorded and still present in kubernetes.
 * <p>
 * Each created service is read back, converted into a {@link KubernetesService}, added to the
 * cluster context under the member's cluster instance id and persisted. Nothing is created
 * when the cluster has no port mappings.
 *
 * @param kubernetesApi client used to look up and create services
 * @param clusterContext cluster the services belong to
 * @param kubernetesCluster cluster definition supplying the minion public IP addresses
 * @param kubernetesClusterContext source of service sequence numbers
 * @param memberContext member being started; supplies cluster instance id and partition
 * @throws KubernetesClientException if a service cannot be read back after creation, or as
 *                                   declared by the kubernetes API client calls
 * @throws RuntimeException if the cartridge of the cluster cannot be found
 */
private void createKubernetesServices(KubernetesApiClient kubernetesApi, ClusterContext clusterContext,
                                      KubernetesCluster kubernetesCluster, KubernetesClusterContext
                                              kubernetesClusterContext, MemberContext memberContext)
        throws KubernetesClientException {
    String clusterId = clusterContext.getClusterId();
    String cartridgeType = clusterContext.getCartridgeType();
    Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
    if (cartridge == null) {
        String message = "Could not create kubernetes services, cartridge not found: [cartridge] " +
                cartridgeType;
        log.error(message);
        throw new RuntimeException(message);
    }

    String sessionAffinity = null;
    Property sessionAffinityProperty = cartridge.getProperties().getProperty(KUBERNETES_SERVICE_SESSION_AFFINITY);
    if (sessionAffinityProperty != null) {
        sessionAffinity = sessionAffinityProperty.getValue();
    }

    // Prepare minion public IP addresses
    List<String> minionPublicIPList = prepareMinionIPAddresses(kubernetesCluster);
    if (log.isDebugEnabled()) {
        log.debug(String.format("Minion public IPs: %s", minionPublicIPList));
    }

    Collection<ClusterPortMapping> clusterPortMappings = CloudControllerContext.getInstance()
            .getClusterPortMappings(clusterContext.getApplicationId(), clusterId);
    if (clusterPortMappings == null) {
        log.info("No cluster port mappings found. Stratos will not attempt to create Kubernetes services");
        return;
    }

    // Service name is the MD5 hex digest of the cluster id
    String serviceName = DigestUtils.md5Hex(clusterId);
    Collection<KubernetesService> kubernetesServices =
            clusterContext.getKubernetesServices(memberContext.getClusterInstanceId());

    for (ClusterPortMapping clusterPortMapping : clusterPortMappings) {
        // Skip if already created
        int containerPort = clusterPortMapping.getPort();
        KubernetesService existingService = findKubernetesService(kubernetesServices, containerPort);
        if ((existingService != null) && serviceExistsInCluster(
                existingService.getId(), kubernetesClusterContext,
                memberContext, clusterPortMapping.getName())) {
            log.info(String.format("Kubernetes service already exists: [kubernetes-cluster] %s " +
                            "[cluster] %s [service-name] %s [container-port] %d ",
                    kubernetesCluster.getClusterId(), clusterId, serviceName, containerPort));
            continue;
        }

        // Find next available service sequence number
        long serviceSeqNo = kubernetesClusterContext.getNextServiceSeqNo();
        String serviceId =
                KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo));
        while (kubernetesApi.getService(serviceId) != null) {
            serviceSeqNo = kubernetesClusterContext.getNextServiceSeqNo();
            serviceId = KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo));
        }

        if (log.isInfoEnabled()) {
            log.info(String.format("Creating kubernetes service: [cluster] %s [service-id] %s [service-name] " +
                            "%s " + "[protocol] %s [service-port] %d [container-port] %s", clusterId,
                    serviceId, serviceName, clusterPortMapping.getProtocol(),
                    clusterPortMapping.getKubernetesServicePort(), containerPort));
        }

        // Create kubernetes service for port mapping
        int servicePort = clusterPortMapping.getKubernetesServicePort();
        String serviceType = clusterPortMapping.getKubernetesPortType();
        String containerPortName = KubernetesIaasUtil.preparePortNameFromPortMapping(clusterPortMapping);

        // Label values are trimmed to the maximum label length; annotations keep full values
        Map<String, String> serviceLabels = new HashMap<>();
        serviceLabels.put(CloudControllerConstants.APPLICATION_ID_LABEL,
                trimLabel(CloudControllerConstants.APPLICATION_ID_LABEL, clusterContext.getApplicationId()));
        serviceLabels.put(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL,
                trimLabel(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL,
                        memberContext.getClusterInstanceId()));
        serviceLabels.put(CloudControllerConstants.PORT_NAME_LABEL,
                trimLabel(CloudControllerConstants.PORT_NAME_LABEL, clusterPortMapping.getName()));

        Map<String, String> serviceAnnotations = new HashMap<>();
        serviceAnnotations
                .put(CloudControllerConstants.APPLICATION_ID_LABEL, clusterContext.getApplicationId());
        serviceAnnotations.put(CloudControllerConstants.CLUSTER_ID_LABEL, clusterContext.getClusterId());
        serviceAnnotations.put(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL,
                memberContext.getClusterInstanceId());
        serviceAnnotations.put(CloudControllerConstants.PORT_NAME_LABEL, clusterPortMapping.getName());
        serviceAnnotations.put(CloudControllerConstants.PROTOCOL_LABEL, clusterPortMapping.getProtocol());
        serviceAnnotations.put(CloudControllerConstants.PORT_TYPE_LABEL,
                clusterPortMapping.getKubernetesPortType());
        serviceAnnotations.put(CloudControllerConstants.SERVICE_PORT_LABEL, String.valueOf(clusterPortMapping
                .getKubernetesServicePort()));
        serviceAnnotations
                .put(CloudControllerConstants.PORT_LABEL, String.valueOf(clusterPortMapping.getPort()));
        serviceAnnotations.put(CloudControllerConstants.PROXY_PORT_LABEL,
                String.valueOf(clusterPortMapping.getProxyPort()));

        kubernetesApi.createService(serviceId, serviceName, serviceLabels, serviceAnnotations, servicePort,
                serviceType, containerPortName, containerPort, sessionAffinity);

        // Short pause before reading the service back
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Do not abort the half-finished service registration, but keep the interrupt
            // flag set so that the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }

        Service service = kubernetesApi.getService(serviceId);
        if (service == null) {
            throw new KubernetesClientException("Kubernetes service was not created: [service] " + serviceId);
        }

        KubernetesService kubernetesService = new KubernetesService();
        kubernetesService.setId(service.getMetadata().getName());
        kubernetesService.setPortalIP(service.getSpec().getClusterIP());
        // Expose minions public IP addresses as they need to be accessed by external networks
        String[] minionPublicIPArray = minionPublicIPList.toArray(new String[minionPublicIPList.size()]);
        kubernetesService.setPublicIPs(minionPublicIPArray);
        kubernetesService.setProtocol(clusterPortMapping.getProtocol());
        kubernetesService.setPortName(clusterPortMapping.getName());
        String kubernetesPortType = service.getSpec().getType();
        kubernetesService.setServiceType(kubernetesPortType);
        kubernetesService.setKubernetesClusterId(memberContext.getPartition().getKubernetesClusterId());
        // Node port services are reached through the node port, all others through the service port
        if (kubernetesPortType.equals(KubernetesConstants.NODE_PORT)) {
            kubernetesService.setPort(service.getSpec().getPorts().get(0).getNodePort());
        } else {
            kubernetesService.setPort(service.getSpec().getPorts().get(0).getPort());
        }
        kubernetesService.setContainerPort(containerPort);

        clusterContext.addKubernetesService(memberContext.getClusterInstanceId(), kubernetesService);
        CloudControllerContext.getInstance().persist();

        if (log.isInfoEnabled()) {
            log.info(String.format(
                    "Kubernetes service successfully created: [cluster] %s [service-id] %s [protocol] %s " +
                            "[node-port] %d [container-port] %s", clusterId, serviceId,
                    clusterPortMapping.getProtocol(), servicePort, containerPort));
        }
    }
}
/**
 * Checks whether the given kubernetes service exists in the kubernetes cluster and belongs to
 * the member's application, cluster instance and port name.
 * <p>
 * Ownership is decided from the service annotations. A service without metadata or without
 * annotations is reported as not existing rather than causing a NullPointerException.
 *
 * @param serviceId id of the service to look up
 * @param kubernetesClusterContext context providing the kubernetes API client
 * @param memberContext member supplying the expected application id and cluster instance id
 * @param portName expected port name annotation
 * @return true only if the service is found and all three annotations are non-empty and match
 * @throws KubernetesClientException declared by the service lookup
 */
private boolean serviceExistsInCluster(String serviceId, KubernetesClusterContext kubernetesClusterContext,
                                       MemberContext memberContext, String portName)
        throws KubernetesClientException {
    KubernetesApiClient kubernetesApi = kubernetesClusterContext.getKubApi();
    Service service = kubernetesApi.getService(serviceId);
    if ((service == null) || (service.getMetadata() == null)) {
        return false;
    }

    Map<String, String> annotations = service.getMetadata().getAnnotations();
    if (annotations == null) {
        // No annotations to identify the owner, hence not a service recorded for this member
        return false;
    }

    String applicationIdLabel = annotations.get(CloudControllerConstants.APPLICATION_ID_LABEL);
    String clusterInstanceIdLabel = annotations.get(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL);
    String portNameLabel = annotations.get(CloudControllerConstants.PORT_NAME_LABEL);

    return StringUtils.isNotEmpty(applicationIdLabel) &&
            StringUtils.isNotEmpty(clusterInstanceIdLabel) &&
            StringUtils.isNotEmpty(portNameLabel) &&
            applicationIdLabel.equals(memberContext.getApplicationId()) &&
            clusterInstanceIdLabel.equals(memberContext.getClusterInstanceId()) &&
            portNameLabel.equals(portName);
}
/**
 * Shortens a label value that exceeds {@code KubernetesConstants.MAX_LABEL_LENGTH}.
 * <p>
 * An over-long value is cut to its first {@code MAX_LABEL_LENGTH - 2} characters followed by
 * {@code "X"}, and a warning is logged. Empty, null and short enough values are returned as is.
 *
 * @param key label key, used only in the warning message
 * @param value label value to check
 * @return the original value or its trimmed form
 */
private String trimLabel(String key, String value) {
    if (StringUtils.isEmpty(value) || (value.length() <= KubernetesConstants.MAX_LABEL_LENGTH)) {
        return value;
    }
    String shortened = value.substring(0, KubernetesConstants.MAX_LABEL_LENGTH - 2) + "X";
    log.warn(String.format("Kubernetes label trimmed: [key] %s [original] %s [trimmed] %s",
            key, value, shortened));
    return shortened;
}
/**
 * Builds the service name for a sequence number, in the form
 * {@code <SERVICE_NAME_PREFIX>-<serviceSeqNo>}.
 *
 * @param serviceSeqNo service sequence number
 * @return the service name
 */
private String prepareServiceName(long serviceSeqNo) {
    StringBuilder serviceName = new StringBuilder(SERVICE_NAME_PREFIX);
    serviceName.append("-").append(serviceSeqNo);
    return serviceName.toString();
}
/**
 * Collects the public IP addresses of all non-null hosts of the kubernetes cluster.
 *
 * @param kubernetesCluster cluster whose hosts are read
 * @return public IP addresses in host order
 * @throws RuntimeException if the host array is null, empty or its first element is null
 */
private List<String> prepareMinionIPAddresses(KubernetesCluster kubernetesCluster) {
    KubernetesHost[] hosts = kubernetesCluster.getKubernetesHosts();
    boolean hostsMissing = (hosts == null) || (hosts.length == 0) || (hosts[0] == null);
    if (hostsMissing) {
        throw new RuntimeException("Hosts not found in kubernetes cluster: [cluster] "
                + kubernetesCluster.getClusterId());
    }

    List<String> publicIps = new ArrayList<String>();
    for (int i = 0; i < hosts.length; i++) {
        KubernetesHost current = hosts[i];
        if (current == null) {
            continue;
        }
        publicIps.add(current.getPublicIPAddress());
    }
    return publicIps;
}
/**
 * Finds the first kubernetes service whose container port equals the given port.
 *
 * @param kubernetesServices services to search; may be null
 * @param containerPort container port to match
 * @return the matching service, or null if the collection is null or nothing matches
 */
private KubernetesService findKubernetesService(Collection<KubernetesService> kubernetesServices,
                                                int containerPort) {
    if (kubernetesServices == null) {
        return null;
    }
    Iterator<KubernetesService> candidates = kubernetesServices.iterator();
    while (candidates.hasNext()) {
        KubernetesService candidate = candidates.next();
        if (candidate.getContainerPort() == containerPort) {
            return candidate;
        }
    }
    return null;
}
/**
 * Generates kubernetes service ports for the cluster's port mappings and adds the resulting
 * {@code PORT_MAPPINGS} parameter to the payload.
 * <p>
 * For every cartridge port mapping the corresponding cluster port mapping is looked up by
 * name. If it has no service port yet (value 0), node port services receive the next free
 * port from the kubernetes cluster context that no existing service uses as node port, and
 * all other service types reuse the container port. The whole operation is serialized on the
 * {@code KubernetesIaas} class and ends by persisting the cloud controller context. Nothing
 * happens when the cartridge is null.
 *
 * @param applicationId application the cluster belongs to
 * @param clusterId cluster whose port mappings are updated
 * @param kubernetesClusterContext source of service ports and of the kubernetes API client
 * @param cartridge cartridge supplying the port mappings; may be null
 * @throws KubernetesClientException declared by the service listing
 * @throws CloudControllerException if the cluster port mappings, a matching cluster port
 *                                  mapping or its kubernetes port type cannot be found
 * @throws RuntimeException if the kubernetes cluster context cannot supply a service port
 */
private void generateKubernetesServicePorts(String applicationId, String clusterId,
                                            KubernetesClusterContext kubernetesClusterContext,
                                            Cartridge cartridge) throws KubernetesClientException {
    synchronized (KubernetesIaas.class) {
        if (cartridge == null) {
            return;
        }

        StringBuilder portMappingStrBuilder = new StringBuilder();
        for (PortMapping portMapping : cartridge.getPortMappings()) {
            Collection<ClusterPortMapping> clusterPortMappings =
                    CloudControllerContext.getInstance().getClusterPortMappings(applicationId, clusterId);
            if (clusterPortMappings == null) {
                throw new CloudControllerException(String.format("Cluster port mappings not found: " +
                        "[application-id] %s [cluster-id] %s", applicationId, clusterId));
            }

            ClusterPortMapping clusterPortMapping = findClusterPortMapping(clusterPortMappings, portMapping);
            if (clusterPortMapping == null) {
                throw new CloudControllerException(String.format("Cluster port mapping not found: " +
                                "[application-id] %s [cluster-id] %s [transport] %s", applicationId, clusterId,
                        portMapping.getName()));
            }
            if (clusterPortMapping.getKubernetesPortType() == null) {
                throw new CloudControllerException(String.format("Kubernetes service type not " +
                                "found [application-id] %s [cluster-id] %s [cartridge] %s", applicationId,
                        clusterId, cartridge));
            }

            String serviceType = portMapping.getKubernetesPortType();
            clusterPortMapping.setKubernetesPortType(serviceType);

            // If kubernetes service port is already set, skip setting a new one
            if (clusterPortMapping.getKubernetesServicePort() == 0) {
                if (serviceType.equals(KubernetesConstants.NODE_PORT)) {
                    // Find next available service port. The exhaustion marker (-1) is checked
                    // for every candidate, not only the first one, so that it can never be
                    // stored as a service port.
                    int nextServicePort = kubernetesClusterContext.getNextServicePort();
                    List<Service> services = null;
                    while (true) {
                        if (nextServicePort == -1) {
                            throw new RuntimeException(
                                    String.format("Could not generate service port: [cluster-id] %s " +
                                            "[port] %d", clusterId, portMapping.getPort()));
                        }
                        if (services == null) {
                            // Fetched once, and only after a usable candidate port exists
                            KubernetesApiClient kubernetesApi = kubernetesClusterContext.getKubApi();
                            services = kubernetesApi.getServices();
                        }
                        if (nodePortAvailable(services, nextServicePort)) {
                            break;
                        }
                        nextServicePort = kubernetesClusterContext.getNextServicePort();
                    }
                    clusterPortMapping.setKubernetesServicePort(nextServicePort);
                } else {
                    clusterPortMapping.setKubernetesServicePort(portMapping.getPort());
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Kubernetes service port is already set: [application-id] %s " +
                                    "[cluster-id] %s [port] %d [service-port] %d",
                            applicationId, clusterId, clusterPortMapping.getPort(),
                            clusterPortMapping.getKubernetesServicePort()));
                }
            }

            // Add port mappings to payload
            if (portMappingStrBuilder.length() > 0) {
                portMappingStrBuilder.append(";");
            }
            portMappingStrBuilder.append(String.format("NAME:%s|PROTOCOL:%s|PORT:%d|PROXY_PORT:%d|TYPE:%s",
                    clusterPortMapping.getName(), clusterPortMapping.getProtocol(),
                    clusterPortMapping.getKubernetesServicePort(), clusterPortMapping.getProxyPort(),
                    clusterPortMapping.getKubernetesPortType()));

            if (log.isInfoEnabled()) {
                log.info(String.format("Kubernetes service port generated: [application-id] %s " +
                                "[cluster-id] %s [port] %d [service-port] %d",
                        applicationId, clusterId, clusterPortMapping.getPort(),
                        clusterPortMapping.getKubernetesServicePort()));
            }
        }

        NameValuePair nameValuePair = new NameValuePair(PORT_MAPPINGS, portMappingStrBuilder.toString());
        payload.add(nameValuePair);

        // Persist service ports added to cluster port mappings
        CloudControllerContext.getInstance().persist();
    }
}
/**
 * Check whether a node port is free, i.e. not used by any of the given services.
 *
 * @param services services to inspect
 * @param nodePort node port to look for
 * @return {@code false} if any port of any given service uses {@code nodePort}, {@code true} otherwise
 * @throws KubernetesClientException declared in the signature; nothing in this method throws it explicitly
 */
private boolean nodePortAvailable(List<Service> services, int nodePort)
        throws KubernetesClientException {
    for (Service service : services) {
        // Skip services that carry no port information at all
        if (service.getSpec() == null || service.getSpec().getPorts() == null) {
            continue;
        }
        for (ServicePort servicePort : service.getSpec().getPorts()) {
            // The node port may be absent (null) for a service port; comparing it
            // directly against an int would unbox null and throw a NullPointerException
            Integer usedNodePort = servicePort.getNodePort();
            if (usedNodePort != null && usedNodePort.intValue() == nodePort) {
                return false;
            }
        }
    }
    return true;
}
/**
 * Look up the cluster port mapping whose name equals the name of the given
 * cartridge port mapping.
 *
 * @param clusterPortMappings cluster port mappings to search
 * @param portMapping         cartridge port mapping whose name is matched
 * @return the first cluster port mapping with the same name, or {@code null} if there is none
 */
private ClusterPortMapping findClusterPortMapping(Collection<ClusterPortMapping> clusterPortMappings,
                                                  PortMapping portMapping) {
    ClusterPortMapping match = null;
    for (ClusterPortMapping candidate : clusterPortMappings) {
        if (!candidate.getName().equals(portMapping.getName())) {
            continue;
        }
        // First name match wins
        match = candidate;
        break;
    }
    return match;
}
/**
 * Terminate a container by deleting the kubernetes pod of the given member.
 * <p>
 * The member context write lock is held for the duration of the call and is
 * released in all cases. Pod deletion is best-effort: if the kubernetes client
 * fails, a warning is logged and the member context is still returned.
 *
 * @param memberContext member context of the container to terminate
 * @return the given member context
 * @throws IllegalArgumentException         if the member context, its kubernetes cluster id or the
 *                                          corresponding kubernetes cluster context is null
 * @throws RuntimeException                 if the member context has no partition
 * @throws MemberTerminationFailedException declared in the signature; no statement in this
 *                                          method throws it explicitly
 */
public MemberContext terminateContainer(MemberContext memberContext) throws MemberTerminationFailedException {
    Lock lock = null;
    try {
        lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
        handleNullObject(memberContext, "Could not terminate container, member context not found");

        Partition partition = memberContext.getPartition();
        if (partition == null) {
            String message = String.format("Partition not found in member context: [member] %s ",
                    memberContext.getMemberId());
            log.error(message);
            throw new RuntimeException(message);
        }

        String kubernetesClusterId = partition.getKubernetesClusterId();
        handleNullObject(kubernetesClusterId, String.format("Could not terminate container, kubernetes cluster " +
                        "context id is null: [partition-id] %s [member-id] %s", partition.getId(),
                memberContext.getMemberId()));

        KubernetesClusterContext kubernetesClusterContext =
                CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
        handleNullObject(kubernetesClusterContext,
                String.format("Could not terminate container, kubernetes cluster " +
                                "context not found: [partition-id] %s [member-id] %s", partition.getId(),
                        memberContext.getMemberId()));

        KubernetesApiClient kubApi = kubernetesClusterContext.getKubApi();
        try {
            log.info(String.format("Removing kubernetes pod: [application] %s [cartridge] %s [member] %s [pod] %s",
                    memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(),
                    memberContext.getKubernetesPodId()));

            // Remove pod
            kubApi.deletePod(memberContext.getKubernetesPodId());

            // Persist changes
            CloudControllerContext.getInstance().persist();

            log.info(String.format("Kubernetes pod removed successfully: [application] %s [cartridge] %s " +
                            "[member] %s [pod] %s",
                    memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(),
                    memberContext.getKubernetesPodId()));
        } catch (KubernetesClientException e) {
            // Deliberately not rethrown: termination continues even if the pod could not be
            // deleted. The cause is logged so the failure can still be diagnosed.
            log.warn(String.format("Could not delete pod: [pod-id] %s", memberContext.getKubernetesPodId()), e);
        }
        return memberContext;
    } finally {
        if (lock != null) {
            CloudControllerContext.getInstance().releaseWriteLock(lock);
        }
    }
}
/**
 * Return the kubernetes cluster context registered under the given cluster id,
 * creating and registering a new one if none exists yet.
 *
 * @param kubernetesClusterId  id of the kubernetes cluster
 * @param kubernetesMasterIp   master ip, used only when a new context is created
 * @param kubernetesMasterPort master port, used only when a new context is created
 * @param upperPort            upper port bound, used only when a new context is created
 * @param lowerPort            lower port bound, used only when a new context is created
 * @return the existing or newly created kubernetes cluster context
 */
private KubernetesClusterContext getKubernetesClusterContext(String kubernetesClusterId, String kubernetesMasterIp,
                                                             String kubernetesMasterPort, int upperPort,
                                                             int lowerPort) {
    CloudControllerContext controllerContext = CloudControllerContext.getInstance();
    KubernetesClusterContext context = controllerContext.getKubernetesClusterContext(kubernetesClusterId);
    if (context == null) {
        // Bounds are handed to the constructor as (lowerPort, upperPort)
        context = new KubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp,
                kubernetesMasterPort, lowerPort, upperPort);
        controllerContext.addKubernetesClusterContext(context);
    }
    return context;
}
/**
 * Read a mandatory property value.
 *
 * @param property   name of the property to read
 * @param properties properties to read from
 * @param object     description of the object owning the properties, used in the error message
 * @return the property value, never {@code null}
 * @throws IllegalArgumentException if no value is found for the property
 */
private String readProperty(String property, org.apache.stratos.common.Properties properties, String object) {
    String propVal = CloudControllerUtil.getProperty(properties, property);
    // The property name is wrapped in a balanced pair of quotes in the message
    handleNullObject(propVal,
            "Property validation failed. Could not find property: '" + property + "' in " + object);
    return propVal;
}
/**
 * Fail with the given message when the given object is null.
 *
 * @param obj      object to check
 * @param errorMsg message that is logged at error level and carried by the exception
 * @throws IllegalArgumentException if {@code obj} is null
 */
private void handleNullObject(Object obj, String errorMsg) {
    if (obj != null) {
        return;
    }
    log.error(errorMsg);
    throw new IllegalArgumentException(errorMsg);
}
/**
 * No-op in this implementation: nothing is done with the given address.
 *
 * @param ip ip address (ignored)
 */
@Override
public void releaseAddress(String ip) {
}
/**
 * Region validation is not applicable here; every region is accepted.
 *
 * @param region region name (ignored)
 * @return always {@code true}
 */
@Override
public boolean isValidRegion(String region) throws InvalidRegionException {
// No regions in kubernetes cluster
return true;
}
/**
 * Zone validation is not applicable here; every zone is accepted.
 *
 * @param region region name (ignored)
 * @param zone   zone name (ignored)
 * @return always {@code true}
 */
@Override
public boolean isValidZone(String region, String zone) throws InvalidZoneException, InvalidRegionException {
// No zones in kubernetes cluster
return true;
}
/**
 * Host validation is not performed here; every host is accepted.
 *
 * @param zone zone name (ignored)
 * @param host host name (ignored)
 * @return always {@code true}
 */
@Override
public boolean isValidHost(String zone, String host) throws InvalidHostException {
// No host validation is performed for a kubernetes cluster
return true;
}
/**
 * Not implemented in this class.
 *
 * @param sizeGB     volume size (ignored)
 * @param snapshotId snapshot id (ignored)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public String createVolume(int sizeGB, String snapshotId) {
throw new NotImplementedException();
}
/**
 * Not implemented in this class.
 *
 * @param instanceId instance id (ignored)
 * @param volumeId   volume id (ignored)
 * @param deviceName device name (ignored)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public String attachVolume(String instanceId, String volumeId, String deviceName) {
throw new NotImplementedException();
}
/**
 * Not implemented in this class.
 *
 * @param instanceId instance id (ignored)
 * @param volumeId   volume id (ignored)
 * @throws NotImplementedException always
 */
@Override
public void detachVolume(String instanceId, String volumeId) {
throw new NotImplementedException();
}
/**
 * Not implemented in this class.
 *
 * @param volumeId volume id (ignored)
 * @throws NotImplementedException always
 */
@Override
public void deleteVolume(String volumeId) {
throw new NotImplementedException();
}
/**
 * Not implemented in this class.
 *
 * @param device device name (ignored)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public String getIaasDevice(String device) {
throw new NotImplementedException();
}
/**
 * No-op in this implementation: none of the arguments are used.
 *
 * @param clusterId     cluster id (ignored)
 * @param memberContext member context (ignored)
 * @param partition     partition (ignored)
 */
@Override
public void allocateIpAddresses(String clusterId, MemberContext memberContext, Partition partition) {
}
/**
 * Remove the kubernetes services registered for a cluster instance, if any.
 * <p>
 * For each service the kubernetes service is deleted, its port is deallocated
 * in the kubernetes cluster context and the service is removed from the cluster
 * context. A failure on one service is logged and does not prevent the
 * remaining services from being processed.
 *
 * @param clusterContext    cluster context holding the services; nothing is done if it is null
 * @param clusterInstanceId id of the cluster instance whose services are removed
 */
public static void removeKubernetesServices(ClusterContext clusterContext, String clusterInstanceId) {
    if (clusterContext == null) {
        return;
    }

    // Work on a copy, since services are removed from the cluster context inside the loop
    List<KubernetesService> kubernetesServices =
            Lists.newArrayList(clusterContext.getKubernetesServices(clusterInstanceId));

    for (KubernetesService kubernetesService : kubernetesServices) {
        String serviceId = kubernetesService.getId();
        KubernetesClusterContext kubernetesClusterContext =
                CloudControllerContext.getInstance()
                        .getKubernetesClusterContext(kubernetesService.getKubernetesClusterId());
        if (kubernetesClusterContext == null) {
            // Without the cluster context there is no API client to delete the service with;
            // skip this service instead of failing with a NullPointerException
            log.error(String.format("Could not delete kubernetes service, kubernetes cluster context " +
                            "not found: [application-id] %s [kubernetes-cluster-id] %s [service-id] %s",
                    clusterContext.getApplicationId(), kubernetesService.getKubernetesClusterId(), serviceId));
            continue;
        }

        KubernetesApiClient kubernetesApiClient = kubernetesClusterContext.getKubApi();
        log.info(String.format("Deleting kubernetes service: [application-id] %s " +
                "[service-id] %s", clusterContext.getApplicationId(), serviceId));
        try {
            kubernetesApiClient.deleteService(serviceId);
            kubernetesClusterContext.deallocatePort(kubernetesService.getPort());
            clusterContext.removeKubernetesService(clusterInstanceId, serviceId);
        } catch (KubernetesClientException e) {
            log.error(String.format("Could not delete kubernetes service: [application-id] %s " +
                    "[service-id] %s", clusterContext.getApplicationId(), serviceId), e);
        }
    }
}
}