// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.kubernetes.cluster.actionworkers;
import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.inject.Inject;
import com.cloud.offering.NetworkOffering;
import com.cloud.offerings.dao.NetworkOfferingDao;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.BaseCmd;
import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd;
import org.apache.cloudstack.api.command.user.network.CreateNetworkACLCmd;
import org.apache.cloudstack.api.command.user.vm.StartVMCmd;
import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import com.cloud.capacity.CapacityManager;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
import com.cloud.dc.ClusterVO;
import com.cloud.dc.DataCenter;
import com.cloud.dc.dao.ClusterDao;
import com.cloud.deploy.DeployDestination;
import com.cloud.exception.InsufficientAddressCapacityException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientServerCapacityException;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.ManagementServerException;
import com.cloud.exception.NetworkRuleConflictException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.kubernetes.cluster.KubernetesCluster;
import com.cloud.kubernetes.cluster.KubernetesClusterDetailsVO;
import com.cloud.kubernetes.cluster.KubernetesClusterManagerImpl;
import com.cloud.kubernetes.cluster.KubernetesClusterVO;
import com.cloud.kubernetes.cluster.utils.KubernetesClusterUtil;
import com.cloud.network.IpAddress;
import com.cloud.network.Network;
import com.cloud.network.dao.FirewallRulesDao;
import com.cloud.network.dao.LoadBalancerDao;
import com.cloud.network.dao.LoadBalancerVO;
import com.cloud.network.firewall.FirewallService;
import com.cloud.network.lb.LoadBalancingRulesService;
import com.cloud.network.rules.FirewallRule;
import com.cloud.network.rules.FirewallRuleVO;
import com.cloud.network.rules.LoadBalancer;
import com.cloud.network.rules.PortForwardingRuleVO;
import com.cloud.network.rules.RulesService;
import com.cloud.network.rules.dao.PortForwardingRulesDao;
import com.cloud.network.vpc.NetworkACL;
import com.cloud.network.vpc.NetworkACLItem;
import com.cloud.network.vpc.NetworkACLItemDao;
import com.cloud.network.vpc.NetworkACLItemVO;
import com.cloud.network.vpc.NetworkACLService;
import com.cloud.offering.ServiceOffering;
import com.cloud.resource.ResourceManager;
import com.cloud.storage.Volume;
import com.cloud.storage.VolumeApiService;
import com.cloud.storage.VolumeVO;
import com.cloud.storage.dao.LaunchPermissionDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.user.Account;
import com.cloud.user.SSHKeyPairVO;
import com.cloud.uservm.UserVm;
import com.cloud.utils.Pair;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionCallbackWithException;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.net.Ip;
import com.cloud.utils.net.NetUtils;
import com.cloud.utils.ssh.SshHelper;
import com.cloud.vm.Nic;
import com.cloud.vm.UserVmManager;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VmDetailConstants;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.logging.log4j.Level;
public class KubernetesClusterResourceModifierActionWorker extends KubernetesClusterActionWorker {
@Inject
protected CapacityManager capacityManager;
@Inject
protected ClusterDao clusterDao;
@Inject
protected ClusterDetailsDao clusterDetailsDao;
@Inject
protected HostDao hostDao;
@Inject
protected FirewallRulesDao firewallRulesDao;
@Inject
protected FirewallService firewallService;
@Inject
protected NetworkACLService networkACLService;
@Inject
protected NetworkACLItemDao networkACLItemDao;
@Inject
protected LoadBalancingRulesService lbService;
@Inject
protected RulesService rulesService;
@Inject
protected PortForwardingRulesDao portForwardingRulesDao;
@Inject
protected ResourceManager resourceManager;
@Inject
protected LoadBalancerDao loadBalancerDao;
@Inject
protected VMInstanceDao vmInstanceDao;
@Inject
protected UserVmManager userVmManager;
@Inject
protected LaunchPermissionDao launchPermissionDao;
@Inject
protected VolumeApiService volumeService;
@Inject
protected VolumeDao volumeDao;
@Inject
protected NetworkOfferingDao networkOfferingDao;
protected String kubernetesClusterNodeNamePrefix;
protected KubernetesClusterResourceModifierActionWorker(final KubernetesCluster kubernetesCluster, final KubernetesClusterManagerImpl clusterManager) {
super(kubernetesCluster, clusterManager);
}
protected void init() {
super.init();
kubernetesClusterNodeNamePrefix = getKubernetesClusterNodeNamePrefix();
}
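/**
* Builds the cloud-init configuration for a cluster node from the /conf/k8s-node.yml template by
* substituting the management server and cluster SSH public keys, the control node join IP, the
* generated cluster token and the ISO ejection flag, then applying any private registry details.
*/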
private String getKubernetesNodeConfig(final String joinIp, final boolean ejectIso) throws IOException {
String k8sNodeConfig = readResourceFile("/conf/k8s-node.yml");
final String sshPubKey = "{{ k8s.ssh.pub.key }}";
final String joinIpKey = "{{ k8s_control_node.join_ip }}";
final String clusterTokenKey = "{{ k8s_control_node.cluster.token }}";
final String ejectIsoKey = "{{ k8s.eject.iso }}";
String pubKey = "- \"" + configurationDao.getValue("ssh.publickey") + "\"";
String sshKeyPair = kubernetesCluster.getKeyPair();
if (StringUtils.isNotEmpty(sshKeyPair)) {
SSHKeyPairVO sshkp = sshKeyPairDao.findByName(owner.getAccountId(), owner.getDomainId(), sshKeyPair);
if (sshkp != null) {
pubKey += "\n - \"" + sshkp.getPublicKey() + "\"";
}
}
k8sNodeConfig = k8sNodeConfig.replace(sshPubKey, pubKey);
k8sNodeConfig = k8sNodeConfig.replace(joinIpKey, joinIp);
k8sNodeConfig = k8sNodeConfig.replace(clusterTokenKey, KubernetesClusterUtil.generateClusterToken(kubernetesCluster));
k8sNodeConfig = k8sNodeConfig.replace(ejectIsoKey, String.valueOf(ejectIso));
k8sNodeConfig = updateKubeConfigWithRegistryDetails(k8sNodeConfig);
return k8sNodeConfig;
}
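/**
* Replaces the private registry placeholders ({{registry.url}}, {{registry.url.endpoint}},
* {{registry.username}}, {{registry.password}} and {{registry.token}}) in the node cloud-init
* configuration when Docker registry details are stored for the cluster, and swaps the default
* containerd configuration command for the setup-containerd script. The token is the Base64
* encoding of "username:password"; for example (illustrative values only), a registry URL of
* "https://registry.example.com" yields the endpoint "registry.example.com".
*/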
protected String updateKubeConfigWithRegistryDetails(String k8sConfig) {
/* Generate the /etc/containerd/config.toml file on the nodes only if the Kubernetes cluster is
* created to use a private Docker registry */
String registryUsername = null;
String registryPassword = null;
String registryUrl = null;
List<KubernetesClusterDetailsVO> details = kubernetesClusterDetailsDao.listDetails(kubernetesCluster.getId());
for (KubernetesClusterDetailsVO detail : details) {
if (detail.getName().equals(ApiConstants.DOCKER_REGISTRY_USER_NAME)) {
registryUsername = detail.getValue();
}
if (detail.getName().equals(ApiConstants.DOCKER_REGISTRY_PASSWORD)) {
registryPassword = detail.getValue();
}
if (detail.getName().equals(ApiConstants.DOCKER_REGISTRY_URL)) {
registryUrl = detail.getValue();
}
}
if (StringUtils.isNoneEmpty(registryUsername, registryPassword, registryUrl)) {
// Update runcmd in the cloud-init configuration to run a script that updates the containerd config with provided registry details
String runCmd = "- bash -x /opt/bin/setup-containerd";
String registryEp = registryUrl.split("://")[1];
k8sConfig = k8sConfig.replace("- containerd config default > /etc/containerd/config.toml", runCmd);
final String registryUrlKey = "{{registry.url}}";
final String registryUrlEpKey = "{{registry.url.endpoint}}";
final String registryAuthKey = "{{registry.token}}";
final String registryUname = "{{registry.username}}";
final String registryPsswd = "{{registry.password}}";
final String usernamePasswordKey = registryUsername + ":" + registryPassword;
String base64Auth = Base64.encodeBase64String(usernamePasswordKey.getBytes(com.cloud.utils.StringUtils.getPreferredCharset()));
k8sConfig = k8sConfig.replace(registryUrlKey, registryUrl);
k8sConfig = k8sConfig.replace(registryUrlEpKey, registryEp);
k8sConfig = k8sConfig.replace(registryUname, registryUsername);
k8sConfig = k8sConfig.replace(registryPsswd, registryPassword);
k8sConfig = k8sConfig.replace(registryAuthKey, base64Auth);
}
return k8sConfig;
}
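/**
* Performs a best-effort capacity check before deployment: iterates the zone's routing hosts of
* the cluster template's hypervisor type (honouring the offering's host tag and the cluster's
* CPU/memory overcommit ratios) and reserves capacity for each requested node. Returns a
* zone-scoped DeployDestination when every node can be placed, otherwise throws an
* InsufficientServerCapacityException.
*/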
protected DeployDestination plan(final long nodesCount, final DataCenter zone, final ServiceOffering offering) throws InsufficientServerCapacityException {
final int cpu_requested = offering.getCpu() * offering.getSpeed();
final long ram_requested = offering.getRamSize() * 1024L * 1024L;
List<HostVO> hosts = resourceManager.listAllHostsInOneZoneByType(Host.Type.Routing, zone.getId());
final Map<String, Pair<HostVO, Integer>> hosts_with_reserved_capacity = new ConcurrentHashMap<String, Pair<HostVO, Integer>>();
for (HostVO h : hosts) {
hosts_with_reserved_capacity.put(h.getUuid(), new Pair<HostVO, Integer>(h, 0));
}
boolean suitable_host_found = false;
for (int i = 1; i <= nodesCount; i++) {
suitable_host_found = false;
for (Map.Entry<String, Pair<HostVO, Integer>> hostEntry : hosts_with_reserved_capacity.entrySet()) {
Pair<HostVO, Integer> hp = hostEntry.getValue();
HostVO h = hp.first();
if (!h.getHypervisorType().equals(clusterTemplate.getHypervisorType())) {
continue;
}
hostDao.loadHostTags(h);
if (StringUtils.isNotEmpty(offering.getHostTag()) && !(h.getHostTags() != null && h.getHostTags().contains(offering.getHostTag()))) {
continue;
}
int reserved = hp.second();
reserved++;
ClusterVO cluster = clusterDao.findById(h.getClusterId());
ClusterDetailsVO cluster_detail_cpu = clusterDetailsDao.findDetail(cluster.getId(), "cpuOvercommitRatio");
ClusterDetailsVO cluster_detail_ram = clusterDetailsDao.findDetail(cluster.getId(), "memoryOvercommitRatio");
Float cpuOvercommitRatio = Float.parseFloat(cluster_detail_cpu.getValue());
Float memoryOvercommitRatio = Float.parseFloat(cluster_detail_ram.getValue());
if (logger.isDebugEnabled()) {
logger.debug(String.format("Checking host : %s for capacity already reserved %d", h.getName(), reserved));
}
if (capacityManager.checkIfHostHasCapacity(h.getId(), cpu_requested * reserved, ram_requested * reserved, false, cpuOvercommitRatio, memoryOvercommitRatio, true)) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Found host : %s for with enough capacity, CPU=%d RAM=%s", h.getName(), cpu_requested * reserved, toHumanReadableSize(ram_requested * reserved)));
}
hostEntry.setValue(new Pair<HostVO, Integer>(h, reserved));
suitable_host_found = true;
break;
}
}
if (!suitable_host_found) {
if (logger.isInfoEnabled()) {
logger.info(String.format("Suitable hosts not found in datacenter : %s for node %d, with offering : %s and hypervisor: %s",
zone.getName(), i, offering.getName(), clusterTemplate.getHypervisorType().toString()));
}
break;
}
}
if (suitable_host_found) {
if (logger.isInfoEnabled()) {
logger.info(String.format("Suitable hosts found in datacenter : %s, creating deployment destination", zone.getName()));
}
return new DeployDestination(zone, null, null, null);
}
String msg = String.format("Cannot find enough capacity for Kubernetes cluster(requested cpu=%d memory=%s) with offering : %s and hypervisor: %s",
cpu_requested * nodesCount, toHumanReadableSize(ram_requested * nodesCount), offering.getName(), clusterTemplate.getHypervisorType().toString());
logger.warn(msg);
throw new InsufficientServerCapacityException(msg, DataCenter.class, zone.getId());
}
protected DeployDestination plan() throws InsufficientServerCapacityException {
ServiceOffering offering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
DataCenter zone = dataCenterDao.findById(kubernetesCluster.getZoneId());
if (logger.isDebugEnabled()) {
logger.debug(String.format("Checking deployment destination for Kubernetes cluster : %s in zone : %s", kubernetesCluster.getName(), zone.getName()));
}
return plan(kubernetesCluster.getTotalNodeCount(), zone, offering);
}
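/**
* Resizes the ROOT volume of a node VM to the cluster's configured node root disk size. Only
* applies to VMware deploy-as-is templates; the ResizeVolumeCmd's private fields are populated
* via reflection before the resize is issued through the volume API service.
*/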
protected void resizeNodeVolume(final UserVm vm) throws ManagementServerException {
try {
if (vm.getHypervisorType() == Hypervisor.HypervisorType.VMware && templateDao.findById(vm.getTemplateId()).isDeployAsIs()) {
List<VolumeVO> vmVols = volumeDao.findByInstance(vm.getId());
for (VolumeVO volumeVO : vmVols) {
if (volumeVO.getVolumeType() == Volume.Type.ROOT) {
ResizeVolumeCmd resizeVolumeCmd = new ResizeVolumeCmd();
resizeVolumeCmd = ComponentContext.inject(resizeVolumeCmd);
Field f = resizeVolumeCmd.getClass().getDeclaredField("size");
Field f1 = resizeVolumeCmd.getClass().getDeclaredField("id");
f.setAccessible(true);
f1.setAccessible(true);
f1.set(resizeVolumeCmd, volumeVO.getId());
f.set(resizeVolumeCmd, kubernetesCluster.getNodeRootDiskSize());
volumeService.resizeVolume(resizeVolumeCmd);
}
}
}
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new ManagementServerException(String.format("Failed to resize volume of VM in the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
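/**
* Starts a node VM through the VM orchestrator and verifies that it reached the Running state,
* wrapping any failure in a ManagementServerException.
*/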
protected void startKubernetesVM(final UserVm vm) throws ManagementServerException {
try {
StartVMCmd startVm = new StartVMCmd();
startVm = ComponentContext.inject(startVm);
Field f = startVm.getClass().getDeclaredField("id");
f.setAccessible(true);
f.set(startVm, vm.getId());
itMgr.advanceStart(vm.getUuid(), null, null);
if (logger.isInfoEnabled()) {
logger.info(String.format("Started VM : %s in the Kubernetes cluster : %s", vm.getDisplayName(), kubernetesCluster.getName()));
}
} catch (IllegalAccessException | NoSuchFieldException | OperationTimedoutException | ResourceUnavailableException | InsufficientCapacityException ex) {
throw new ManagementServerException(String.format("Failed to start VM in the Kubernetes cluster : %s", kubernetesCluster.getName()), ex);
}
UserVm startVm = userVmDao.findById(vm.getId());
if (!startVm.getState().equals(VirtualMachine.State.Running)) {
throw new ManagementServerException(String.format("Failed to start VM in the Kubernetes cluster : %s", kubernetesCluster.getName()));
}
}
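/**
* Creates, optionally resizes and starts node VMs (offset + 1) through nodeCount, registering
* each with the cluster. The offset allows additional nodes to be appended to an existing
* cluster, for example during scale-up.
*/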
protected List<UserVm> provisionKubernetesClusterNodeVms(final long nodeCount, final int offset, final String publicIpAddress) throws ManagementServerException,
ResourceUnavailableException, InsufficientCapacityException {
List<UserVm> nodes = new ArrayList<>();
for (int i = offset + 1; i <= nodeCount; i++) {
UserVm vm = createKubernetesNode(publicIpAddress);
addKubernetesClusterVm(kubernetesCluster.getId(), vm.getId(), false);
if (kubernetesCluster.getNodeRootDiskSize() > 0) {
resizeNodeVolume(vm);
}
startKubernetesVM(vm);
vm = userVmDao.findById(vm.getId());
if (vm == null) {
throw new ManagementServerException(String.format("Failed to provision worker VM for Kubernetes cluster : %s" , kubernetesCluster.getName()));
}
nodes.add(vm);
if (logger.isInfoEnabled()) {
logger.info(String.format("Provisioned node VM : %s in to the Kubernetes cluster : %s", vm.getDisplayName(), kubernetesCluster.getName()));
}
}
return nodes;
}
protected List<UserVm> provisionKubernetesClusterNodeVms(final long nodeCount, final String publicIpAddress) throws ManagementServerException,
ResourceUnavailableException, InsufficientCapacityException {
return provisionKubernetesClusterNodeVms(nodeCount, 0, publicIpAddress);
}
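/**
* Deploys a single node VM in the cluster's network using the cluster template, service offering
* and SSH key pair. The hostname is the node name prefix plus "-node-" and a hex timestamp
* suffix, and the generated cloud-init configuration is passed as Base64-encoded user data.
* Security-group-enabled zones additionally use the cluster's security group.
*/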
protected UserVm createKubernetesNode(String joinIp) throws ManagementServerException,
ResourceUnavailableException, InsufficientCapacityException {
UserVm nodeVm = null;
DataCenter zone = dataCenterDao.findById(kubernetesCluster.getZoneId());
ServiceOffering serviceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
List<Long> networkIds = new ArrayList<Long>();
networkIds.add(kubernetesCluster.getNetworkId());
Account owner = accountDao.findById(kubernetesCluster.getAccountId());
Network.IpAddresses addrs = new Network.IpAddresses(null, null);
long rootDiskSize = kubernetesCluster.getNodeRootDiskSize();
Map<String, String> customParameterMap = new HashMap<String, String>();
if (rootDiskSize > 0) {
customParameterMap.put("rootdisksize", String.valueOf(rootDiskSize));
}
if (Hypervisor.HypervisorType.VMware.equals(clusterTemplate.getHypervisorType())) {
customParameterMap.put(VmDetailConstants.ROOT_DISK_CONTROLLER, "scsi");
}
String suffix = Long.toHexString(System.currentTimeMillis());
String hostName = String.format("%s-node-%s", kubernetesClusterNodeNamePrefix, suffix);
String k8sNodeConfig = null;
try {
k8sNodeConfig = getKubernetesNodeConfig(joinIp, Hypervisor.HypervisorType.VMware.equals(clusterTemplate.getHypervisorType()));
} catch (IOException e) {
logAndThrow(Level.ERROR, "Failed to read Kubernetes node configuration file", e);
}
String base64UserData = Base64.encodeBase64String(k8sNodeConfig.getBytes(com.cloud.utils.StringUtils.getPreferredCharset()));
List<String> keypairs = new ArrayList<String>();
if (StringUtils.isNotBlank(kubernetesCluster.getKeyPair())) {
keypairs.add(kubernetesCluster.getKeyPair());
}
if (zone.isSecurityGroupEnabled()) {
List<Long> securityGroupIds = new ArrayList<>();
securityGroupIds.add(kubernetesCluster.getSecurityGroupId());
nodeVm = userVmService.createAdvancedSecurityGroupVirtualMachine(zone, serviceOffering, clusterTemplate, networkIds, securityGroupIds, owner,
hostName, hostName, null, null, null, Hypervisor.HypervisorType.None, BaseCmd.HTTPMethod.POST, base64UserData, null, null, keypairs,
null, addrs, null, null, null, customParameterMap, null, null, null,
null, true, null, UserVmManager.CKS_NODE);
} else {
nodeVm = userVmService.createAdvancedVirtualMachine(zone, serviceOffering, clusterTemplate, networkIds, owner,
hostName, hostName, null, null, null,
Hypervisor.HypervisorType.None, BaseCmd.HTTPMethod.POST, base64UserData, null, null, keypairs,
null, addrs, null, null, null, customParameterMap, null, null, null, null, true, UserVmManager.CKS_NODE, null);
}
if (logger.isInfoEnabled()) {
logger.info(String.format("Created node VM : %s, %s in the Kubernetes cluster : %s", hostName, nodeVm.getUuid(), kubernetesCluster.getName()));
}
return nodeVm;
}
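/**
* Opens an ingress TCP firewall rule on the public IP for the given port range from 0.0.0.0/0.
* The CreateFirewallRuleCmd's private fields are populated via reflection before the rule is
* created and applied.
*/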
protected void provisionFirewallRules(final IpAddress publicIp, final Account account, int startPort, int endPort) throws NoSuchFieldException,
IllegalAccessException, ResourceUnavailableException, NetworkRuleConflictException {
List<String> sourceCidrList = new ArrayList<String>();
sourceCidrList.add("0.0.0.0/0");
CreateFirewallRuleCmd rule = new CreateFirewallRuleCmd();
rule = ComponentContext.inject(rule);
Field addressField = rule.getClass().getDeclaredField("ipAddressId");
addressField.setAccessible(true);
addressField.set(rule, publicIp.getId());
Field protocolField = rule.getClass().getDeclaredField("protocol");
protocolField.setAccessible(true);
protocolField.set(rule, "TCP");
Field startPortField = rule.getClass().getDeclaredField("publicStartPort");
startPortField.setAccessible(true);
startPortField.set(rule, startPort);
Field endPortField = rule.getClass().getDeclaredField("publicEndPort");
endPortField.setAccessible(true);
endPortField.set(rule, endPort);
Field cidrField = rule.getClass().getDeclaredField("cidrlist");
cidrField.setAccessible(true);
cidrField.set(rule, sourceCidrList);
firewallService.createIngressFirewallRule(rule);
firewallService.applyIngressFwRules(publicIp.getId(), account);
}
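/**
* Creates and applies a TCP port forwarding rule from sourcePort on the public IP to destPort on
* the given VM's private IP in the cluster network.
*/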
protected void provisionPublicIpPortForwardingRule(IpAddress publicIp, Network network, Account account,
final long vmId, final int sourcePort, final int destPort) throws NetworkRuleConflictException, ResourceUnavailableException {
final long publicIpId = publicIp.getId();
final long networkId = network.getId();
final long accountId = account.getId();
final long domainId = account.getDomainId();
Nic vmNic = networkModel.getNicInNetwork(vmId, networkId);
final Ip vmIp = new Ip(vmNic.getIPv4Address());
PortForwardingRuleVO pfRule = Transaction.execute((TransactionCallbackWithException<PortForwardingRuleVO, NetworkRuleConflictException>) status -> {
PortForwardingRuleVO newRule =
new PortForwardingRuleVO(null, publicIpId,
sourcePort, sourcePort,
vmIp,
destPort, destPort,
"tcp", networkId, accountId, domainId, vmId);
newRule.setDisplay(true);
newRule.setState(FirewallRule.State.Add);
newRule = portForwardingRulesDao.persist(newRule);
return newRule;
});
rulesService.applyPortForwardingRules(publicIp.getId(), account);
if (logger.isInfoEnabled()) {
logger.info(String.format("Provisioned SSH port forwarding rule: %s from port %d to %d on %s to the VM IP : %s in Kubernetes cluster : %s", pfRule.getUuid(), sourcePort, destPort, publicIp.getAddress().addr(), vmIp.toString(), kubernetesCluster.getName()));
}
}
/**
* Provisions SSH port forwarding rules for the given virtual machines of the
* Kubernetes cluster.
*
* @param publicIp
* @param network
* @param account
* @param clusterVMIds the VMs to create rules for; when empty the method is being
* called during down-scaling of the Kubernetes cluster, so no
* new rules are added
* @throws ResourceUnavailableException
* @throws NetworkRuleConflictException
*/
protected void provisionSshPortForwardingRules(IpAddress publicIp, Network network, Account account,
List<Long> clusterVMIds) throws ResourceUnavailableException,
NetworkRuleConflictException {
if (!CollectionUtils.isEmpty(clusterVMIds)) {
for (int i = 0; i < clusterVMIds.size(); ++i) {
provisionPublicIpPortForwardingRule(publicIp, network, account, clusterVMIds.get(i), CLUSTER_NODES_DEFAULT_START_SSH_PORT + i, DEFAULT_SSH_PORT);
}
}
}
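/**
* Revokes the ingress firewall rule that opens the Kubernetes API port (CLUSTER_API_PORT) on the
* public IP and returns the revoked rule, or null if none was found.
*/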
protected FirewallRule removeApiFirewallRule(final IpAddress publicIp) {
FirewallRule rule = null;
List<FirewallRuleVO> firewallRules = firewallRulesDao.listByIpAndPurposeAndNotRevoked(publicIp.getId(), FirewallRule.Purpose.Firewall);
for (FirewallRuleVO firewallRule : firewallRules) {
if (firewallRule.getSourcePortStart() == CLUSTER_API_PORT &&
firewallRule.getSourcePortEnd() == CLUSTER_API_PORT) {
rule = firewallRule;
firewallService.revokeIngressFwRule(firewallRule.getId(), true);
break;
}
}
return rule;
}
protected FirewallRule removeSshFirewallRule(final IpAddress publicIp) {
FirewallRule rule = null;
List<FirewallRuleVO> firewallRules = firewallRulesDao.listByIpAndPurposeAndNotRevoked(publicIp.getId(), FirewallRule.Purpose.Firewall);
for (FirewallRuleVO firewallRule : firewallRules) {
if (firewallRule.getSourcePortStart() == CLUSTER_NODES_DEFAULT_START_SSH_PORT) {
rule = firewallRule;
firewallService.revokeIngressFwRule(firewallRule.getId(), true);
break;
}
}
return rule;
}
protected void removePortForwardingRules(final IpAddress publicIp, final Network network, final Account account, final List<Long> removedVMIds) throws ResourceUnavailableException {
if (!CollectionUtils.isEmpty(removedVMIds)) {
for (Long vmId : removedVMIds) {
List<PortForwardingRuleVO> pfRules = portForwardingRulesDao.listByNetwork(network.getId());
for (PortForwardingRuleVO pfRule : pfRules) {
if (pfRule.getVirtualMachineId() == vmId) {
portForwardingRulesDao.remove(pfRule.getId());
break;
}
}
}
rulesService.applyPortForwardingRules(publicIp.getId(), account);
}
}
protected void removePortForwardingRules(final IpAddress publicIp, final Network network, final Account account, int startPort, int endPort)
throws ResourceUnavailableException {
List<PortForwardingRuleVO> pfRules = portForwardingRulesDao.listByNetwork(network.getId());
for (PortForwardingRuleVO pfRule : pfRules) {
if (startPort <= pfRule.getSourcePortStart() && pfRule.getSourcePortStart() <= endPort) {
portForwardingRulesDao.remove(pfRule.getId());
}
}
rulesService.applyPortForwardingRules(publicIp.getId(), account);
}
protected void removeLoadBalancingRule(final IpAddress publicIp, final Network network,
final Account account) throws ResourceUnavailableException {
List<LoadBalancerVO> rules = loadBalancerDao.listByIpAddress(publicIp.getId());
for (LoadBalancerVO rule : rules) {
if (rule.getNetworkId() == network.getId() &&
rule.getAccountId() == account.getId() &&
rule.getSourcePortStart() == CLUSTER_API_PORT &&
rule.getSourcePortEnd() == CLUSTER_API_PORT) {
lbService.deleteLoadBalancerRule(rule.getId(), true);
break;
}
}
}
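/**
* Adds an ingress Allow ACL item for the given TCP port range to the VPC tier's network ACL,
* moves it to the top of the list and applies the ACL. The CreateNetworkACLCmd's private fields
* are populated via reflection.
*/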
protected void provisionVpcTierAllowPortACLRule(final Network network, int startPort, int endPorts) throws NoSuchFieldException,
IllegalAccessException, ResourceUnavailableException {
List<NetworkACLItemVO> aclItems = networkACLItemDao.listByACL(network.getNetworkACLId());
aclItems = aclItems.stream().filter(x -> !NetworkACLItem.State.Revoke.equals(x.getState())).collect(Collectors.toList());
CreateNetworkACLCmd rule = new CreateNetworkACLCmd();
rule = ComponentContext.inject(rule);
Map<String, Object> fieldValues = Map.of(
"protocol", "TCP",
"publicStartPort", startPort,
"publicEndPort", endPorts,
"trafficType", NetworkACLItem.TrafficType.Ingress.toString(),
"networkId", network.getId(),
"aclId", network.getNetworkACLId(),
"action", NetworkACLItem.Action.Allow.toString()
);
for (Map.Entry<String, Object> entry : fieldValues.entrySet()) {
Field field = rule.getClass().getDeclaredField(entry.getKey());
field.setAccessible(true);
field.set(rule, entry.getValue());
}
NetworkACLItem aclRule = networkACLService.createNetworkACLItem(rule);
networkACLService.moveRuleToTheTopInACLList(aclRule);
networkACLService.applyNetworkACL(aclRule.getAclId());
}
protected void removeVpcTierAllowPortACLRule(final Network network, int startPort, int endPort) throws NoSuchFieldException,
IllegalAccessException, ResourceUnavailableException {
List<NetworkACLItemVO> aclItems = networkACLItemDao.listByACL(network.getNetworkACLId());
aclItems = aclItems.stream().filter(x -> (x.getProtocol() != null &&
x.getProtocol().equals("TCP") &&
x.getSourcePortStart() != null &&
x.getSourcePortStart().equals(startPort) &&
x.getSourcePortEnd() != null &&
x.getSourcePortEnd().equals(endPort) &&
x.getAction().equals(NetworkACLItem.Action.Allow)))
.collect(Collectors.toList());
for (NetworkACLItemVO aclItem : aclItems) {
networkACLService.revokeNetworkACLItem(aclItem.getId());
}
}
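/**
* Creates a round-robin public load balancer rule ("api-lb") on the given port and assigns the
* control node VMs' private IPs to it, so that Kubernetes API traffic on the public IP is spread
* across the control plane nodes.
*/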
protected void provisionLoadBalancerRule(final IpAddress publicIp, final Network network,
final Account account, final List<Long> clusterVMIds, final int port) throws NetworkRuleConflictException,
InsufficientAddressCapacityException {
LoadBalancer lb = lbService.createPublicLoadBalancerRule(null, "api-lb", "LB rule for API access",
port, port, port, port,
publicIp.getId(), NetUtils.TCP_PROTO, "roundrobin", network.getId(),
account.getId(), false, NetUtils.TCP_PROTO, true);
Map<Long, List<String>> vmIdIpMap = new HashMap<>();
for (int i = 0; i < kubernetesCluster.getControlNodeCount(); ++i) {
List<String> ips = new ArrayList<>();
Nic controlVmNic = networkModel.getNicInNetwork(clusterVMIds.get(i), kubernetesCluster.getNetworkId());
ips.add(controlVmNic.getIPv4Address());
vmIdIpMap.put(clusterVMIds.get(i), ips);
}
lbService.assignToLoadBalancer(lb.getId(), null, vmIdIpMap, false);
}
protected void createFirewallRules(IpAddress publicIp, List<Long> clusterVMIds, boolean apiRule) throws ManagementServerException {
// Firewall rule for SSH access on each node VM
try {
int endPort = CLUSTER_NODES_DEFAULT_START_SSH_PORT + clusterVMIds.size() - 1;
provisionFirewallRules(publicIp, owner, CLUSTER_NODES_DEFAULT_START_SSH_PORT, endPort);
if (logger.isInfoEnabled()) {
logger.info(String.format("Provisioned firewall rule to open up port %d to %d on %s for Kubernetes cluster : %s", CLUSTER_NODES_DEFAULT_START_SSH_PORT, endPort, publicIp.getAddress().addr(), kubernetesCluster.getName()));
}
} catch (NoSuchFieldException | IllegalAccessException | ResourceUnavailableException | NetworkRuleConflictException e) {
throw new ManagementServerException(String.format("Failed to provision firewall rules for SSH access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
if (!apiRule) {
return;
}
// Firewall rule for API access for control node VMs
try {
provisionFirewallRules(publicIp, owner, CLUSTER_API_PORT, CLUSTER_API_PORT);
if (logger.isInfoEnabled()) {
logger.info(String.format("Provisioned firewall rule to open up port %d on %s for Kubernetes cluster %s",
CLUSTER_API_PORT, publicIp.getAddress().addr(), kubernetesCluster.getName()));
}
} catch (NoSuchFieldException | IllegalAccessException | ResourceUnavailableException | NetworkRuleConflictException e) {
throw new ManagementServerException(String.format("Failed to provision firewall rules for API access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
/**
* Sets up network rules for the Kubernetes cluster.
* Opens up firewall port CLUSTER_API_PORT, the secure port on which the Kubernetes
* API server is running, and creates a load balancing rule to forward public
* IP traffic to the control VMs' private IPs.
* Opens up firewall ports CLUSTER_NODES_DEFAULT_START_SSH_PORT to
* CLUSTER_NODES_DEFAULT_START_SSH_PORT+n for SSH access, and creates port-forwarding
* rules to forward public IP traffic to each node VM's private IP.
* @param publicIp
* @param network
* @param clusterVMIds
* @param apiRule
* @throws ManagementServerException
*/
protected void setupKubernetesClusterIsolatedNetworkRules(IpAddress publicIp, Network network, List<Long> clusterVMIds, boolean apiRule) throws ManagementServerException {
createFirewallRules(publicIp, clusterVMIds, apiRule);
// Port forwarding rule for SSH access on each node VM
try {
provisionSshPortForwardingRules(publicIp, network, owner, clusterVMIds);
} catch (ResourceUnavailableException | NetworkRuleConflictException e) {
throw new ManagementServerException(String.format("Failed to activate SSH port forwarding rules for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
if (!apiRule) {
return;
}
// Load balancer rule for API access for control node VMs
try {
provisionLoadBalancerRule(publicIp, network, owner, clusterVMIds, CLUSTER_API_PORT);
} catch (NetworkRuleConflictException | InsufficientAddressCapacityException e) {
throw new ManagementServerException(String.format("Failed to provision load balancer rule for API access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
protected void createVpcTierAclRules(Network network) throws ManagementServerException {
if (network.getNetworkACLId() == NetworkACL.DEFAULT_ALLOW) {
return;
}
// ACL rule for API access for control node VMs
try {
provisionVpcTierAllowPortACLRule(network, CLUSTER_API_PORT, CLUSTER_API_PORT);
if (logger.isInfoEnabled()) {
logger.info(String.format("Provisioned ACL rule to open up port %d on %s for Kubernetes cluster %s",
CLUSTER_API_PORT, publicIpAddress, kubernetesCluster.getName()));
}
} catch (NoSuchFieldException | IllegalAccessException | ResourceUnavailableException | InvalidParameterValueException | PermissionDeniedException e) {
throw new ManagementServerException(String.format("Failed to provision firewall rules for API access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
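// ACL rule for SSH access for all node VMs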
try {
provisionVpcTierAllowPortACLRule(network, DEFAULT_SSH_PORT, DEFAULT_SSH_PORT);
if (logger.isInfoEnabled()) {
logger.info(String.format("Provisioned ACL rule to open up port %d on %s for Kubernetes cluster %s",
DEFAULT_SSH_PORT, publicIpAddress, kubernetesCluster.getName()));
}
} catch (NoSuchFieldException | IllegalAccessException | ResourceUnavailableException | InvalidParameterValueException | PermissionDeniedException e) {
throw new ManagementServerException(String.format("Failed to provision firewall rules for API access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
protected void removeVpcTierAclRules(Network network) throws ManagementServerException {
if (network.getNetworkACLId() == NetworkACL.DEFAULT_ALLOW) {
return;
}
// ACL rule for API access for control node VMs
try {
removeVpcTierAllowPortACLRule(network, CLUSTER_API_PORT, CLUSTER_API_PORT);
if (logger.isInfoEnabled()) {
logger.info(String.format("Removed network ACL rule to open up port %d on %s for Kubernetes cluster %s",
CLUSTER_API_PORT, publicIpAddress, kubernetesCluster.getName()));
}
} catch (NoSuchFieldException | IllegalAccessException | ResourceUnavailableException e) {
throw new ManagementServerException(String.format("Failed to remove network ACL rule for API access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
// ACL rule for SSH access for all node VMs
try {
removeVpcTierAllowPortACLRule(network, DEFAULT_SSH_PORT, DEFAULT_SSH_PORT);
if (logger.isInfoEnabled()) {
logger.info(String.format("Removed network ACL rule to open up port %d on %s for Kubernetes cluster %s",
DEFAULT_SSH_PORT, publicIpAddress, kubernetesCluster.getName()));
}
} catch (NoSuchFieldException | IllegalAccessException | ResourceUnavailableException e) {
throw new ManagementServerException(String.format("Failed to remove network ACL rules for SSH access for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
protected void setupKubernetesClusterVpcTierRules(IpAddress publicIp, Network network, List<Long> clusterVMIds) throws ManagementServerException {
// Create ACL rules
createVpcTierAclRules(network);
NetworkOffering offering = networkOfferingDao.findById(network.getNetworkOfferingId());
if (offering.isConserveMode()) {
// Add load balancing for API access
try {
provisionLoadBalancerRule(publicIp, network, owner, clusterVMIds, CLUSTER_API_PORT);
} catch (InsufficientAddressCapacityException e) {
throw new ManagementServerException(String.format("Failed to activate API load balancing rules for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
} else {
// Add port forwarding for API access
try {
provisionPublicIpPortForwardingRule(publicIp, network, owner, clusterVMIds.get(0), CLUSTER_API_PORT, CLUSTER_API_PORT);
} catch (ResourceUnavailableException | NetworkRuleConflictException e) {
throw new ManagementServerException(String.format("Failed to activate API port forwarding rules for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
// Add port forwarding rule for SSH access on each node VM
try {
provisionSshPortForwardingRules(publicIp, network, owner, clusterVMIds);
} catch (ResourceUnavailableException | NetworkRuleConflictException e) {
throw new ManagementServerException(String.format("Failed to activate SSH port forwarding rules for the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
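/**
* Derives the hostname prefix for node VMs from the cluster name. If the name is not a valid
* DNS label, characters other than letters, digits and '-' are stripped and a "k8s-" prefix is
* added; an empty result falls back to the cluster UUID. The prefix is capped at 40 characters.
* For example (illustrative value only), a cluster named "My Cluster_1" would yield
* "k8s-MyCluster1".
*/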
protected String getKubernetesClusterNodeNamePrefix() {
String prefix = kubernetesCluster.getName();
if (!NetUtils.verifyDomainNameLabel(prefix, true)) {
prefix = prefix.replaceAll("[^a-zA-Z0-9-]", "");
if (prefix.length() == 0) {
prefix = kubernetesCluster.getUuid();
}
prefix = "k8s-" + prefix;
}
if (prefix.length() > 40) {
prefix = prefix.substring(0, 40);
}
return prefix;
}
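/**
* Persists updated cluster sizing details (cores, memory, node count, service offering and
* autoscaling settings) in a single transaction; null values for cores, memory, size,
* serviceOfferingId and autoscaleEnabled leave the existing values untouched, while minSize and
* maxSize are always overwritten.
*/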
protected KubernetesClusterVO updateKubernetesClusterEntry(final Long cores, final Long memory, final Long size,
final Long serviceOfferingId, final Boolean autoscaleEnabled, final Long minSize, final Long maxSize) {
return Transaction.execute(new TransactionCallback<KubernetesClusterVO>() {
@Override
public KubernetesClusterVO doInTransaction(TransactionStatus status) {
KubernetesClusterVO updatedCluster = kubernetesClusterDao.findById(kubernetesCluster.getId());
if (cores != null) {
updatedCluster.setCores(cores);
}
if (memory != null) {
updatedCluster.setMemory(memory);
}
if (size != null) {
updatedCluster.setNodeCount(size);
}
if (serviceOfferingId != null) {
updatedCluster.setServiceOfferingId(serviceOfferingId);
}
if (autoscaleEnabled != null) {
updatedCluster.setAutoscalingEnabled(autoscaleEnabled.booleanValue());
}
updatedCluster.setMinSize(minSize);
updatedCluster.setMaxSize(maxSize);
return kubernetesClusterDao.persist(updatedCluster);
}
});
}
private KubernetesClusterVO updateKubernetesClusterEntry(final Boolean autoscaleEnabled, final Long minSize, final Long maxSize) throws CloudRuntimeException {
KubernetesClusterVO kubernetesClusterVO = updateKubernetesClusterEntry(null, null, null, null, autoscaleEnabled, minSize, maxSize);
if (kubernetesClusterVO == null) {
logTransitStateAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster %s failed, unable to update Kubernetes cluster",
kubernetesCluster.getName()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed);
}
return kubernetesClusterVO;
}
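/**
* Enables or disables the cluster autoscaler by running /opt/bin/autoscale-kube-cluster on the
* control node over SSH. When enabling, a failed first attempt is retried after copying the
* autoscaling scripts and creating the CloudStack secret; the cluster entry is then updated with
* the new autoscaling settings.
*/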
protected boolean autoscaleCluster(boolean enable, Long minSize, Long maxSize) {
if (!kubernetesCluster.getState().equals(KubernetesCluster.State.Scaling)) {
stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.AutoscaleRequested);
}
File pkFile = getManagementServerSshPublicKeyFile();
Pair<String, Integer> publicIpSshPort = getKubernetesClusterServerIpSshPort(null);
publicIpAddress = publicIpSshPort.first();
sshPort = publicIpSshPort.second();
try {
if (enable) {
String command = String.format("sudo /opt/bin/autoscale-kube-cluster -i %s -e -M %d -m %d",
kubernetesCluster.getUuid(), maxSize, minSize);
Pair<Boolean, String> result = SshHelper.sshExecute(publicIpAddress, sshPort, getControlNodeLoginUser(),
pkFile, null, command, 10000, 10000, 60000);
// Maybe the file isn't present. Try and copy it
if (!result.first()) {
logMessage(Level.INFO, "Autoscaling files missing. Adding them now", null);
retrieveScriptFiles();
copyScripts(publicIpAddress, sshPort);
if (!createCloudStackSecret(keys)) {
logTransitStateAndThrow(Level.ERROR, String.format("Failed to setup keys for Kubernetes cluster %s",
kubernetesCluster.getName()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed);
}
// If at first you don't succeed ...
result = SshHelper.sshExecute(publicIpAddress, sshPort, getControlNodeLoginUser(),
pkFile, null, command, 10000, 10000, 60000);
if (!result.first()) {
throw new CloudRuntimeException(result.second());
}
}
updateKubernetesClusterEntry(true, minSize, maxSize);
} else {
Pair<Boolean, String> result = SshHelper.sshExecute(publicIpAddress, sshPort, getControlNodeLoginUser(),
pkFile, null, String.format("sudo /opt/bin/autoscale-kube-cluster -d"),
10000, 10000, 60000);
if (!result.first()) {
throw new CloudRuntimeException(result.second());
}
updateKubernetesClusterEntry(false, null, null);
}
return true;
} catch (Exception e) {
String msg = String.format("Failed to autoscale Kubernetes cluster: %s : %s", kubernetesCluster.getName(), e.getMessage());
logAndThrow(Level.ERROR, msg);
return false;
} finally {
// Deploying the autoscaler might fail but it can be deployed manually too, so no need to go to an alert state
updateLoginUserDetails(null);
}
}
}